diff options
18 files changed, 666 insertions, 23 deletions
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java index 4b24c6e0a8..1eede47451 100644 --- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java +++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java @@ -77,6 +77,12 @@ public interface CDOProtocolConstants public static final short SIGNAL_OBJECT_LOCKED = 27; + public static final short SIGNAL_GET_REMOTE_SESSIONS = 28; + + public static final short SIGNAL_UNSUBSCRIBE_REMOTE_SESSIONS = 29; + + public static final short SIGNAL_REMOTE_SESSION_NOTIFICATION = 30; + // ////////////////////////////////////////////////////////////////////// // Session Management @@ -108,4 +114,17 @@ public interface CDOProtocolConstants // Locking Objects public static final int RELEASE_ALL_LOCKS = -1; + + // ////////////////////////////////////////////////////////////////////// + // Remote Sessions + + public static final int NO_MORE_REMOTE_SESSIONS = -1; + + public static final byte REMOTE_SESSION_OPENED = 1; + + public static final byte REMOTE_SESSION_CLOSED = 2; + + public static final byte REMOTE_SESSION_SUBSCRIBED = 3; + + public static final byte REMOTE_SESSION_UNSUBSCRIBED = 4; } diff --git a/plugins/org.eclipse.emf.cdo.server/.settings/.api_filters b/plugins/org.eclipse.emf.cdo.server/.settings/.api_filters index a492541c0d..02392d2c2a 100644 --- a/plugins/org.eclipse.emf.cdo.server/.settings/.api_filters +++ b/plugins/org.eclipse.emf.cdo.server/.settings/.api_filters @@ -101,13 +101,6 @@ <message_argument value="Session(SessionManager, CDOServerProtocol, int, boolean)"/> </message_arguments> </filter> - <filter id="574660632"> - <message_arguments> - <message_argument value="ISession"/> - <message_argument value="CDOProtocolSession"/> - <message_argument value="Session"/> - </message_arguments> - </filter> </resource> <resource path="src/org/eclipse/emf/cdo/internal/server/PackageManager.java" type="org.eclipse.emf.cdo.internal.server.PackageManager"> <filter id="576720909"> @@ -133,11 +126,5 @@ <message_argument value="ISession"/> </message_arguments> </filter> - <filter id="574619656"> - <message_arguments> - <message_argument value="CDOProtocolSession"/> - <message_argument value="ISession"/> - </message_arguments> - </filter> </resource> </component> diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Session.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Session.java index dd2d0dceda..115055cb5f 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Session.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Session.java @@ -22,11 +22,13 @@ import org.eclipse.emf.cdo.common.id.CDOIDUtil; import org.eclipse.emf.cdo.common.model.CDOClass; import org.eclipse.emf.cdo.common.model.CDOFeature; import org.eclipse.emf.cdo.common.model.CDOPackageURICompressor; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; import org.eclipse.emf.cdo.common.revision.CDORevision; import org.eclipse.emf.cdo.common.revision.delta.CDORevisionDelta; import org.eclipse.emf.cdo.internal.server.bundle.OM; import org.eclipse.emf.cdo.internal.server.protocol.CDOServerProtocol; import org.eclipse.emf.cdo.internal.server.protocol.CommitNotificationRequest; +import org.eclipse.emf.cdo.internal.server.protocol.RemoteSessionNotificationRequest; import org.eclipse.emf.cdo.server.IAudit; import org.eclipse.emf.cdo.server.ISession; import org.eclipse.emf.cdo.server.ITransaction; @@ -84,6 +86,8 @@ public class Session extends Container<IView> implements ISession, CDOIDProvider } }; + private boolean subscribed; + /** * @since 2.0 */ @@ -142,6 +146,29 @@ public class Session extends Container<IView> implements ISession, CDOIDProvider /** * @since 2.0 */ + public boolean isSubscribed() + { + return subscribed; + } + + /** + * @since 2.0 + */ + public void setSubscribed(boolean subscribed) + { + checkActive(); + if (this.subscribed != subscribed) + { + this.subscribed = subscribed; + byte opcode = subscribed ? CDOProtocolConstants.REMOTE_SESSION_SUBSCRIBED + : CDOProtocolConstants.REMOTE_SESSION_UNSUBSCRIBED; + sessionManager.handleRemoteSessionNotification(opcode, this); + } + } + + /** + * @since 2.0 + */ public boolean isPassiveUpdateEnabled() { return passiveUpdateEnabled; @@ -303,6 +330,29 @@ public class Session extends Container<IView> implements ISession, CDOIDProvider } } + /** + * @since 2.0 + */ + public void handleRemoteSessionNotification(byte opcode, ISession session) + { + try + { + IChannel channel = protocol.getChannel(); + if (LifecycleUtil.isActive(channel)) + { + new RemoteSessionNotificationRequest(channel, opcode, session).sendAsync(); + } + else + { + OM.LOG.warn("Session channel is inactive: " + this); + } + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + } + public CDOID provideCDOID(Object idObject) { return (CDOID)idObject; diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java index ebfa19f9f3..751185ad99 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java @@ -14,6 +14,7 @@ package org.eclipse.emf.cdo.internal.server; import org.eclipse.emf.cdo.common.id.CDOID; import org.eclipse.emf.cdo.common.id.CDOIDAndVersion; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; import org.eclipse.emf.cdo.common.revision.delta.CDORevisionDelta; import org.eclipse.emf.cdo.internal.server.bundle.OM; import org.eclipse.emf.cdo.internal.server.protocol.CDOServerProtocol; @@ -118,6 +119,7 @@ public class SessionManager extends Container<ISession> implements ISessionManag } fireElementAddedEvent(session); + handleRemoteSessionNotification(CDOProtocolConstants.REMOTE_SESSION_OPENED, session); return session; } @@ -133,6 +135,7 @@ public class SessionManager extends Container<ISession> implements ISessionManag if (removeSession != null) { fireElementRemovedEvent(session); + handleRemoteSessionNotification(CDOProtocolConstants.REMOTE_SESSION_CLOSED, session); } } @@ -165,4 +168,18 @@ public class SessionManager extends Container<ISession> implements ISessionManag } } } + + /** + * @since 2.0 + */ + public void handleRemoteSessionNotification(byte opcode, Session excludedSession) + { + for (Session session : getSessions()) + { + if (session != excludedSession && session.isSubscribed()) + { + session.handleRemoteSessionNotification(opcode, excludedSession); + } + } + } } diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/CDOServerProtocol.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/CDOServerProtocol.java index ed6b7afe96..70db8e6c23 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/CDOServerProtocol.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/CDOServerProtocol.java @@ -124,6 +124,12 @@ public class CDOServerProtocol extends CDOProtocolImpl case CDOProtocolConstants.SIGNAL_OBJECT_LOCKED: return new ObjectLockedIndication(this); + case CDOProtocolConstants.SIGNAL_GET_REMOTE_SESSIONS: + return new GetRemoteSessionsIndication(this); + + case CDOProtocolConstants.SIGNAL_UNSUBSCRIBE_REMOTE_SESSIONS: + return new UnsubscribeRemoteSessionsIndication(this); + default: return super.createSignalReactor(signalID); } diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/GetRemoteSessionsIndication.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/GetRemoteSessionsIndication.java new file mode 100644 index 0000000000..865f77f565 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/GetRemoteSessionsIndication.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2004 - 2009 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.server.protocol; + +import org.eclipse.emf.cdo.common.io.CDODataInput; +import org.eclipse.emf.cdo.common.io.CDODataOutput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; +import org.eclipse.emf.cdo.internal.server.Session; +import org.eclipse.emf.cdo.internal.server.bundle.OM; + +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import java.io.IOException; + +/** + * @author Eike Stepper + */ +public class GetRemoteSessionsIndication extends CDOReadIndication +{ + private static final ContextTracer PROTOCOL_TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, + GetRemoteSessionsIndication.class); + + private boolean subscribe; + + public GetRemoteSessionsIndication(CDOServerProtocol protocol) + { + super(protocol, CDOProtocolConstants.SIGNAL_GET_REMOTE_SESSIONS); + } + + @Override + protected void indicating(CDODataInput in) throws IOException + { + subscribe = in.readBoolean(); + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Read subscribe: {0}", subscribe); + } + } + + @Override + protected void responding(CDODataOutput out) throws IOException + { + Session localSession = getSession(); + Session[] sessions = getSessionManager().getSessions(); + for (Session session : sessions) + { + if (session != localSession) + { + out.writeInt(session.getSessionID()); + out.writeString(session.getUserID()); + out.writeBoolean(session.isSubscribed()); + } + } + + out.writeInt(CDOProtocolConstants.NO_MORE_REMOTE_SESSIONS); + localSession.setSubscribed(subscribe); + } +} diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/RemoteSessionNotificationRequest.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/RemoteSessionNotificationRequest.java new file mode 100644 index 0000000000..8784503478 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/RemoteSessionNotificationRequest.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2004 - 2009 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Simon McDuff - http://bugs.eclipse.org/201266 + * Simon McDuff - http://bugs.eclipse.org/233490 + */ +package org.eclipse.emf.cdo.internal.server.protocol; + +import org.eclipse.emf.cdo.common.io.CDODataOutput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; +import org.eclipse.emf.cdo.server.ISession; + +import org.eclipse.net4j.channel.IChannel; + +import java.io.IOException; + +/** + * @author Eike Stepper + */ +public class RemoteSessionNotificationRequest extends CDOServerRequest +{ + private byte opcode; + + private ISession session; + + public RemoteSessionNotificationRequest(IChannel channel, byte opcode, ISession session) + { + super(channel, CDOProtocolConstants.SIGNAL_REMOTE_SESSION_NOTIFICATION); + this.opcode = opcode; + this.session = session; + } + + @Override + protected void requesting(CDODataOutput out) throws IOException + { + out.writeByte(opcode); + out.writeInt(session.getSessionID()); + if (opcode == CDOProtocolConstants.REMOTE_SESSION_OPENED) + { + out.writeString(session.getUserID()); + } + } +} diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/UnsubscribeRemoteSessionsIndication.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/UnsubscribeRemoteSessionsIndication.java new file mode 100644 index 0000000000..1c2b1cb7f8 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/UnsubscribeRemoteSessionsIndication.java @@ -0,0 +1,102 @@ +/** + * Copyright (c) 2004 - 2009 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.server.protocol; + +import org.eclipse.emf.cdo.common.id.CDOID; +import org.eclipse.emf.cdo.common.io.CDODataInput; +import org.eclipse.emf.cdo.common.io.CDODataOutput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; +import org.eclipse.emf.cdo.common.revision.CDORevision; +import org.eclipse.emf.cdo.internal.server.bundle.OM; +import org.eclipse.emf.cdo.server.IAudit; +import org.eclipse.emf.cdo.server.IView; + +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Eike Stepper + */ +public class UnsubscribeRemoteSessionsIndication extends CDOReadIndication +{ + private static final ContextTracer PROTOCOL_TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, UnsubscribeRemoteSessionsIndication.class); + + private List<CDORevision> revisions; + + public UnsubscribeRemoteSessionsIndication(CDOServerProtocol protocol) + { + super(protocol, CDOProtocolConstants.SIGNAL_SET_AUDIT); + } + + @Override + protected void indicating(CDODataInput in) throws IOException + { + int viewID = in.readInt(); + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Read viewID: {0}", viewID); + } + + long timeStamp = in.readLong(); + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Read timeStamp: {0,date} {0,time}", timeStamp); + } + + int size = in.readInt(); + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Reading {0} IDs", size); + } + + List<CDOID> invalidObjects = new ArrayList<CDOID>(size); + for (int i = 0; i < size; i++) + { + CDOID id = in.readCDOID(); + invalidObjects.add(id); + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Read ID: {0}", id); + } + } + + IView view = getSession().getView(viewID); + if (view instanceof IAudit) + { + IAudit audit = (IAudit)view; + revisions = audit.setTimeStamp(timeStamp, invalidObjects); + } + } + + @Override + protected void responding(CDODataOutput out) throws IOException + { + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Writing {0} existanceFlags", revisions.size()); + } + + out.writeInt(revisions.size()); + for (CDORevision revision : revisions) + { + boolean existanceFlag = revision != null; + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Writing existanceFlag: {0}", existanceFlag); + } + + out.writeBoolean(existanceFlag); + } + } +} diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/ISession.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/ISession.java index f7a1b22c01..cc6ba3300e 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/ISession.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/ISession.java @@ -26,6 +26,11 @@ public interface ISession extends CDOCommonSession, IContainer<IView> /** * @since 2.0 */ + public boolean isSubscribed(); + + /** + * @since 2.0 + */ public IView openView(int viewID); /** diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionEvent.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionEvent.java index e3ece652c0..e69f160bd4 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionEvent.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionEvent.java @@ -19,4 +19,20 @@ import org.eclipse.emf.cdo.util.CDOEvent; public interface CDORemoteSessionEvent extends CDOEvent { public CDORemoteSession getRemoteSession(); + + /** + * @author Eike Stepper + */ + public interface SubscriptionChanged extends CDORemoteSessionEvent + { + public boolean isSubscribed(); + } + + /** + * @author Eike Stepper + */ + public interface CustomData extends CDORemoteSessionEvent + { + public byte[] getData(); + } } diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionManager.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionManager.java index 19ff8f672f..8b6c57d4be 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionManager.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/session/remote/CDORemoteSessionManager.java @@ -22,8 +22,6 @@ public interface CDORemoteSessionManager extends IContainer<CDORemoteSession> { public CDOSession getLocalSession(); - public CDORemoteSession getRemoteSession(int sessionID); - public CDORemoteSession[] getRemoteSessions(); public boolean isSubscribed(); diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/CDOClientProtocol.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/CDOClientProtocol.java index 356cfa8bde..55271ca0dd 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/CDOClientProtocol.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/CDOClientProtocol.java @@ -19,6 +19,7 @@ import org.eclipse.emf.cdo.common.revision.CDORevision; import org.eclipse.emf.cdo.common.util.TransportException; import org.eclipse.emf.cdo.internal.common.protocol.CDOProtocolImpl; import org.eclipse.emf.cdo.session.CDOSession; +import org.eclipse.emf.cdo.session.remote.CDORemoteSession; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; import org.eclipse.emf.cdo.transaction.CDOTimeStampContext; import org.eclipse.emf.cdo.view.CDOView; @@ -37,6 +38,7 @@ import org.eclipse.net4j.util.om.trace.PerfTracer; import org.eclipse.emf.spi.cdo.AbstractQueryIterator; import org.eclipse.emf.spi.cdo.CDOSessionProtocol; import org.eclipse.emf.spi.cdo.InternalCDOObject; +import org.eclipse.emf.spi.cdo.InternalCDORemoteSessionManager; import org.eclipse.emf.spi.cdo.InternalCDOTransaction.InternalCDOCommitContext; import org.eclipse.emf.spi.cdo.InternalCDOXATransaction.InternalCDOXACommitContext; @@ -230,6 +232,16 @@ public class CDOClientProtocol extends CDOProtocolImpl implements CDOSessionProt return send(new CommitTransactionCancelRequest(this, xaContext), monitor); } + public List<CDORemoteSession> getRemoteSessions(InternalCDORemoteSessionManager manager, boolean subscribe) + { + return send(new GetRemoteSessionsRequest(this, manager, subscribe)); + } + + public void unsubscribeRemoteSessions() + { + send(new UnsubscribeRemoteSessionsRequest(this)); + } + @Override protected SignalReactor createSignalReactor(short signalID) { @@ -238,6 +250,9 @@ public class CDOClientProtocol extends CDOProtocolImpl implements CDOSessionProt case CDOProtocolConstants.SIGNAL_COMMIT_NOTIFICATION: return new CommitNotificationIndication(this); + case CDOProtocolConstants.SIGNAL_REMOTE_SESSION_NOTIFICATION: + return new RemoteSessionNotificationIndication(this); + default: return super.createSignalReactor(signalID); } diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/GetRemoteSessionsRequest.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/GetRemoteSessionsRequest.java new file mode 100644 index 0000000000..e6dae5fbee --- /dev/null +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/GetRemoteSessionsRequest.java @@ -0,0 +1,78 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.emf.internal.cdo.net4j.protocol; + +import org.eclipse.emf.cdo.common.io.CDODataInput; +import org.eclipse.emf.cdo.common.io.CDODataOutput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; +import org.eclipse.emf.cdo.session.remote.CDORemoteSession; + +import org.eclipse.emf.internal.cdo.bundle.OM; + +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.emf.spi.cdo.InternalCDORemoteSessionManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Eike Stepper + */ +public class GetRemoteSessionsRequest extends CDOClientRequest<List<CDORemoteSession>> +{ + private static final ContextTracer PROTOCOL_TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, + GetRemoteSessionsRequest.class); + + private InternalCDORemoteSessionManager manager; + + private boolean subscribe; + + public GetRemoteSessionsRequest(CDOClientProtocol protocol, InternalCDORemoteSessionManager manager, boolean subscribe) + { + super(protocol, CDOProtocolConstants.SIGNAL_GET_REMOTE_SESSIONS); + this.manager = manager; + this.subscribe = subscribe; + } + + @Override + protected void requesting(CDODataOutput out) throws IOException + { + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.format("Writing subscribe: {0}", subscribe); + } + + out.writeBoolean(subscribe); + } + + @Override + protected List<CDORemoteSession> confirming(CDODataInput in) throws IOException + { + List<CDORemoteSession> result = new ArrayList<CDORemoteSession>(); + for (;;) + { + int sessionID = in.readInt(); + if (sessionID == CDOProtocolConstants.NO_MORE_REMOTE_SESSIONS) + { + break; + } + + String userID = in.readString(); + boolean subscribed = in.readBoolean(); + CDORemoteSession remoteSession = manager.createRemoteSession(sessionID, userID, subscribed); + result.add(remoteSession); + } + + return result; + } +} diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/RemoteSessionNotificationIndication.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/RemoteSessionNotificationIndication.java new file mode 100644 index 0000000000..613b4b11fd --- /dev/null +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/RemoteSessionNotificationIndication.java @@ -0,0 +1,59 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Simon McDuff - http://bugs.eclipse.org/233490 + **************************************************************************/ +package org.eclipse.emf.internal.cdo.net4j.protocol; + +import org.eclipse.emf.cdo.common.io.CDODataInput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; + +import org.eclipse.emf.spi.cdo.InternalCDORemoteSessionManager; + +import java.io.IOException; + +/** + * @author Eike Stepper + */ +public class RemoteSessionNotificationIndication extends CDOClientIndication +{ + public RemoteSessionNotificationIndication(CDOClientProtocol protocol) + { + super(protocol, CDOProtocolConstants.SIGNAL_REMOTE_SESSION_NOTIFICATION); + } + + @Override + protected void indicating(CDODataInput in) throws IOException + { + byte opcode = in.readByte(); + int sessionID = in.readInt(); + InternalCDORemoteSessionManager manager = (InternalCDORemoteSessionManager)getSession().getRemoteSessionManager(); + + switch (opcode) + { + case CDOProtocolConstants.REMOTE_SESSION_OPENED: + String userID = in.readString(); + manager.handleRemoteSessionOpened(sessionID, userID); + break; + + case CDOProtocolConstants.REMOTE_SESSION_CLOSED: + manager.handleRemoteSessionClosed(sessionID); + break; + + case CDOProtocolConstants.REMOTE_SESSION_SUBSCRIBED: + manager.handleRemoteSessionSubscribed(sessionID, true); + break; + + case CDOProtocolConstants.REMOTE_SESSION_UNSUBSCRIBED: + manager.handleRemoteSessionSubscribed(sessionID, false); + break; + + } + } +} diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/UnsubscribeRemoteSessionsRequest.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/UnsubscribeRemoteSessionsRequest.java new file mode 100644 index 0000000000..73d35f1ba3 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/net4j/protocol/UnsubscribeRemoteSessionsRequest.java @@ -0,0 +1,50 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.emf.internal.cdo.net4j.protocol; + +import org.eclipse.emf.cdo.common.io.CDODataInput; +import org.eclipse.emf.cdo.common.io.CDODataOutput; +import org.eclipse.emf.cdo.common.protocol.CDOProtocolConstants; + +import org.eclipse.emf.internal.cdo.bundle.OM; + +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import java.io.IOException; + +/** + * @author Eike Stepper + */ +public class UnsubscribeRemoteSessionsRequest extends CDOClientRequest<Boolean> +{ + private static final ContextTracer PROTOCOL_TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, + UnsubscribeRemoteSessionsRequest.class); + + public UnsubscribeRemoteSessionsRequest(CDOClientProtocol protocol) + { + super(protocol, CDOProtocolConstants.SIGNAL_UNSUBSCRIBE_REMOTE_SESSIONS); + } + + @Override + protected void requesting(CDODataOutput out) throws IOException + { + if (PROTOCOL_TRACER.isEnabled()) + { + PROTOCOL_TRACER.trace("Unsubscribing"); + } + } + + @Override + protected Boolean confirming(CDODataInput in) throws IOException + { + return in.readBoolean(); + } +} diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/remote/CDORemoteSessionManagerImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/remote/CDORemoteSessionManagerImpl.java index bb6204ee18..ab1a010f5b 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/remote/CDORemoteSessionManagerImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/remote/CDORemoteSessionManagerImpl.java @@ -11,13 +11,20 @@ package org.eclipse.emf.internal.cdo.session.remote; import org.eclipse.emf.cdo.session.remote.CDORemoteSession; +import org.eclipse.emf.cdo.session.remote.CDORemoteSessionEvent; import org.eclipse.net4j.util.container.Container; +import org.eclipse.net4j.util.container.ContainerEvent; +import org.eclipse.net4j.util.container.IContainerDelta; +import org.eclipse.net4j.util.container.IContainerEvent; +import org.eclipse.net4j.util.event.INotifier; import org.eclipse.emf.spi.cdo.InternalCDORemoteSessionManager; import org.eclipse.emf.spi.cdo.InternalCDOSession; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -43,14 +50,19 @@ public class CDORemoteSessionManagerImpl extends Container<CDORemoteSession> imp return localSession; } - public synchronized CDORemoteSession getRemoteSession(int sessionId) - { - return remoteSessions.get(sessionId); - } - public synchronized CDORemoteSession[] getRemoteSessions() { - return remoteSessions.values().toArray(new CDORemoteSession[remoteSessions.size()]); + Collection<CDORemoteSession> remoteSessions; + if (subscribed) + { + remoteSessions = this.remoteSessions.values(); + } + else + { + remoteSessions = localSession.getSessionProtocol().getRemoteSessions(this, false); + } + + return remoteSessions.toArray(new CDORemoteSession[remoteSessions.size()]); } public CDORemoteSession[] getElements() @@ -73,11 +85,112 @@ public class CDORemoteSessionManagerImpl extends Container<CDORemoteSession> imp this.forceSubscription = forceSubscription; if (forceSubscription && !subscribed) { - subscribe(); + IContainerEvent<CDORemoteSession> event = subscribe(); + // TODO don't fire inside sync block! + fireEvent(event); + } + } + + public CDORemoteSession createRemoteSession(int sessionID, String userID, boolean subscribed) + { + CDORemoteSessionImpl remoteSession = new CDORemoteSessionImpl(this, sessionID, userID); + remoteSession.setSubscribed(subscribed); + return remoteSession; + } + + public synchronized void handleRemoteSessionOpened(int sessionID, String userID) + { + CDORemoteSession remoteSession = createRemoteSession(sessionID, userID, false); + remoteSessions.put(sessionID, remoteSession); + // TODO don't fire inside sync block! + fireElementAddedEvent(remoteSession); + } + + public synchronized void handleRemoteSessionClosed(int sessionID) + { + CDORemoteSession remoteSession = remoteSessions.remove(sessionID); + if (remoteSession != null) + { + // TODO don't fire inside sync block! + fireElementRemovedEvent(remoteSession); + } + } + + public synchronized void handleRemoteSessionSubscribed(int sessionID, final boolean subscribed) + { + final CDORemoteSessionImpl remoteSession = (CDORemoteSessionImpl)remoteSessions.get(sessionID); + if (remoteSession != null) + { + remoteSession.setSubscribed(subscribed); + // TODO don't fire inside sync block! + fireEvent(new CDORemoteSessionEvent.SubscriptionChanged() + { + public INotifier getSource() + { + return CDORemoteSessionManagerImpl.this; + } + + public CDORemoteSession getRemoteSession() + { + return remoteSession; + } + + public boolean isSubscribed() + { + return subscribed; + } + }); + } + } + + @Override + protected synchronized void listenersEmptyChanged(boolean empty) + { + if (empty) + { + if (!forceSubscription) + { + IContainerEvent<CDORemoteSession> event = unsubscribe(); + // TODO don't fire inside sync block! + fireEvent(event); + } + } + else + { + if (!subscribed) + { + IContainerEvent<CDORemoteSession> event = subscribe(); + // TODO don't fire inside sync block! + fireEvent(event); + } + } + } + + private IContainerEvent<CDORemoteSession> subscribe() + { + List<CDORemoteSession> result = localSession.getSessionProtocol().getRemoteSessions(this, true); + ContainerEvent<CDORemoteSession> event = new ContainerEvent<CDORemoteSession>(this); + for (CDORemoteSession remoteSession : result) + { + remoteSessions.put(remoteSession.getSessionID(), remoteSession); + event.addDelta(remoteSession, IContainerDelta.Kind.ADDED); } + + subscribed = true; + return event; } - private void subscribe() + private IContainerEvent<CDORemoteSession> unsubscribe() { + localSession.getSessionProtocol().unsubscribeRemoteSessions(); + ContainerEvent<CDORemoteSession> event = new ContainerEvent<CDORemoteSession>(this); + for (CDORemoteSession remoteSession : remoteSessions.values()) + { + event.addDelta(remoteSession, IContainerDelta.Kind.REMOVED); + } + + remoteSessions.clear(); + subscribed = false; + return event; } } diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/CDOSessionProtocol.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/CDOSessionProtocol.java index d6afebaeac..7df6860cf2 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/CDOSessionProtocol.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/CDOSessionProtocol.java @@ -22,6 +22,7 @@ import org.eclipse.emf.cdo.common.model.CDOPackageInfo; import org.eclipse.emf.cdo.common.model.CDOPackageURICompressor; import org.eclipse.emf.cdo.common.revision.CDOReferenceAdjuster; import org.eclipse.emf.cdo.common.revision.CDORevision; +import org.eclipse.emf.cdo.session.remote.CDORemoteSession; import org.eclipse.emf.cdo.spi.common.revision.CDOIDMapper; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; import org.eclipse.emf.cdo.transaction.CDOTimeStampContext; @@ -105,6 +106,10 @@ public interface CDOSessionProtocol public CommitTransactionResult commitTransactionCancel(InternalCDOXACommitContext xaContext, OMMonitor monitor); + public List<CDORemoteSession> getRemoteSessions(InternalCDORemoteSessionManager manager, boolean subscribe); + + public void unsubscribeRemoteSessions(); + /** * @author Eike Stepper */ diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/InternalCDORemoteSessionManager.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/InternalCDORemoteSessionManager.java index 3dee1e99e9..7437da3a6f 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/InternalCDORemoteSessionManager.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/spi/cdo/InternalCDORemoteSessionManager.java @@ -10,6 +10,7 @@ */ package org.eclipse.emf.spi.cdo; +import org.eclipse.emf.cdo.session.remote.CDORemoteSession; import org.eclipse.emf.cdo.session.remote.CDORemoteSessionManager; /** @@ -18,4 +19,11 @@ import org.eclipse.emf.cdo.session.remote.CDORemoteSessionManager; */ public interface InternalCDORemoteSessionManager extends CDORemoteSessionManager { + public CDORemoteSession createRemoteSession(int sessionID, String userID, boolean subscribed); + + public void handleRemoteSessionOpened(int sessionID, String userID); + + public void handleRemoteSessionClosed(int sessionID); + + public void handleRemoteSessionSubscribed(int sessionID, boolean subscribed); } |