Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWim Jongman2011-03-08 10:23:42 +0000
committerWim Jongman2011-03-08 10:23:42 +0000
commitbe8228a120c82ea7f7cab13e46a26ef529f8b36b (patch)
tree6febe8e2d1905a5928a80ff9fbb6161c3be53027 /providers/bundles/org.eclipse.ecf.provider.zookeeper
parent331aa3909bf8579b6ba5f8abc3fd4113fc18c4b5 (diff)
downloadorg.eclipse.ecf-be8228a120c82ea7f7cab13e46a26ef529f8b36b.tar.gz
org.eclipse.ecf-be8228a120c82ea7f7cab13e46a26ef529f8b36b.tar.xz
org.eclipse.ecf-be8228a120c82ea7f7cab13e46a26ef529f8b36b.zip
[zoodiscovery] Remote service undiscovered and not
rediscovered due to concurrency bug https://bugs.eclipse.org/bugs/show_bug.cgi?id=337973 Fixed ReadRoot to better support concurrency and added some documentation to IService which is not related to this bug. Signed-off-by: Wim Jongman <wim.jongman@remainsoftware.com>
Diffstat (limited to 'providers/bundles/org.eclipse.ecf.provider.zookeeper')
-rw-r--r--providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java38
-rw-r--r--providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java170
2 files changed, 122 insertions, 86 deletions
diff --git a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java
index 692ea14ed..0e892df1a 100644
--- a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java
+++ b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java
@@ -12,23 +12,45 @@
package org.eclipse.ecf.provider.zookeeper.core.internal;
import org.eclipse.ecf.discovery.IServiceInfo;
+import org.eclipse.ecf.discovery.identity.IServiceTypeID;
public interface IService extends IServiceInfo {
- String LOCATION = "discovery.service.location"; //$NON-NLS-1$
+ /**
+ * Holds the service location ( {@link IServiceInfo#getLocation()} ) in the
+ * zooKeeper node
+ **/
+ String LOCATION = "discovery.service.location"; //$NON-NLS-1$
+
+ /**
+ * Holds the service weight ( {@link IServiceInfo#getWeight()} ) in the
+ * zooKeeper node
+ **/
String WEIGHT = "discovery.service.weight"; //$NON-NLS-1$
+
+ /**
+ * Holds the service priority ({@link IServiceInfo#getPriority()()} ) in the
+ * zooKeeper node
+ **/
String PRIORITY = "discovery.service.priority"; //$NON-NLS-1$
+
+ /**
+ * Holds the service-type protocols ({@link IServiceTypeID#getProtocols()} )
+ * in the zooKeeper node
+ **/
String PROTOCOLS = "discovery.service.protocol"; //$NON-NLS-1$
- Object SERVICE_NAME = "discovery.service.name";;
- // Properties getProperties();
+ /**
+ * Holds the service name ({@link IServiceInfo#getServiceName()}) in the
+ * zooKeeper node
+ **/
+ String SERVICE_NAME = "discovery.service.name"; //$NON-NLS-1$
/**
- * Stored as value of a zookeeper node
+ * The byte representation of the service properties, appropriate to be
+ * stored in the zooKeeper node
*
- * @return byte value of string
+ * @return byte representation of the properties
*/
byte[] getPropertiesAsBytes();
-
- // String getPropertiesAsString();
-}
+} \ No newline at end of file
diff --git a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java
index 8e6449c77..5006b800c 100644
--- a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java
+++ b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
+ * Copyright (c)2011 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -11,15 +11,16 @@
*******************************************************************************/
package org.eclipse.ecf.provider.zookeeper.node.internal;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.provider.zookeeper.core.DiscoverdService;
import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer;
@@ -36,6 +37,7 @@ public class ReadRoot implements Watcher, ChildrenCallback {
.synchronizedMap(new HashMap<String, NodeReader>());
private Map<String, DiscoverdService> discoverdServices;
private Map<String, List<DiscoverdService>> perTypeDiscoverdServices;
+ private Object connectionLock = new Object();
ReadRoot(String ip, WatchManager watchManager) {
Assert.isNotNull(ip);
@@ -49,106 +51,119 @@ public class ReadRoot implements Watcher, ChildrenCallback {
connect();
}
- public synchronized void process(final WatchedEvent event) {
- if (watchManager.isDisposed())
- return;
+ public void process(final WatchedEvent event) {
ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
public void run() {
- switch (event.getState()) {
- case Disconnected:
- ReadRoot.this.isConnected = false;
- connect();
- break;
- case Expired:
- ReadRoot.this.isConnected = false;
- connect();
- break;
- case SyncConnected:
- if (!ReadRoot.this.isConnected) {
- ReadRoot.this.isConnected = true;
- ReadRoot.this.watchManager
- .addZooKeeper(ReadRoot.this.readKeeper);
- ReadRoot.this.readKeeper.exists(INode.ROOT,
- ReadRoot.this, null, null);
- ReadRoot.this.readKeeper.getChildren(INode.ROOT,
- ReadRoot.this, ReadRoot.this, null);
- }
- break;
+ synchronized (connectionLock) {
+ if (watchManager.isDisposed())
+ return;
+
+ switch (event.getState()) {
+ case Disconnected:
+ ReadRoot.this.isConnected = false;
+ connect();
+ break;
+ case Expired:
+ ReadRoot.this.isConnected = false;
+ connect();
+ break;
+ case SyncConnected:
+ if (!ReadRoot.this.isConnected) {
+ ReadRoot.this.isConnected = true;
+ ReadRoot.this.watchManager
+ .addZooKeeper(ReadRoot.this.readKeeper);
+ ReadRoot.this.readKeeper.exists(INode.ROOT,
+ ReadRoot.this, null, null);
+ ReadRoot.this.readKeeper.getChildren(INode.ROOT,
+ ReadRoot.this, ReadRoot.this, null);
+ }
+ break;
- // ignore @deprecated cases
- }
- switch (event.getType()) {
- case NodeDeleted:
- if (event.getPath() == null
- || event.getPath().equals(INode.ROOT))
+ // ignore @deprecated cases
+ }
+ switch (event.getType()) {
+ case NodeDeleted:
+ if (event.getPath() == null
+ || event.getPath().equals(INode.ROOT))
+ break;
+ ReadRoot.this.nodeReaders.remove(event.getPath());
+ break;
+ case NodeChildrenChanged:
+ if (ReadRoot.this.isConnected) {
+ ReadRoot.this.readKeeper.exists(INode.ROOT,
+ ReadRoot.this, null, null);
+ ReadRoot.this.readKeeper.getChildren(INode.ROOT,
+ ReadRoot.this, ReadRoot.this, null);
+ }
break;
- ReadRoot.this.nodeReaders.remove(event.getPath());
- break;
- case NodeChildrenChanged:
- if (ReadRoot.this.isConnected) {
- ReadRoot.this.readKeeper.exists(INode.ROOT,
- ReadRoot.this, null, null);
- ReadRoot.this.readKeeper.getChildren(INode.ROOT,
- ReadRoot.this, ReadRoot.this, null);
}
- break;
}
}
});
}
- private synchronized void connect() {
- if (this.isConnected || watchManager.isDisposed()) {
- return;
- }
- try {
+ private void connect() {
+ synchronized (connectionLock) {
+ if (this.isConnected || watchManager.isDisposed())
+ return;
+
this.nodeReaders.clear();
if (this.readKeeper != null) {
// discard the current stale reader
- this.readKeeper.close();
- this.readKeeper = null;
+ try {
+ this.readKeeper.close();
+ } catch (InterruptedException e) {
+ Logger.log(
+ LogService.LOG_ERROR,
+ "Error while closing the current ZooKeeper: "
+ + e.getMessage(), e);
+ }
this.watchManager.removeZooKeeper(this.readKeeper);
+ this.readKeeper = null;
+ }
+ try {
+ // try reconnecting
+ this.readKeeper = new ZooKeeper(this.ip, 3000, this);
+ } catch (IOException ioe) {
+ Logger.log(LogService.LOG_ERROR,
+ "Cannot initiate a new ZooKeeper: " + ioe.getMessage(),
+ ioe);
}
- // try reconnecting
- this.readKeeper = new ZooKeeper(this.ip, 3000, this);
-
- } catch (Exception e) {
- Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
}
}
- public synchronized void processResult(int rc, final String path,
- Object ctx, final List<String> children) {
-
- if (watchManager.isDisposed()) {
- return;
- }
-
+ public void processResult(int rc, final String path, Object ctx,
+ final List<String> children) {
ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
public void run() {
- if (path == null || children == null || children.size() == 0) {
- /* No children available yet, set a watch on it */
- ReadRoot.this.readKeeper.getChildren(INode.ROOT,
- ReadRoot.this, ReadRoot.this, null);
- return;
- }
- for (String p : children) {
- if (Geo.isOwnPublication(p)) {
- /* own publications need not to be discovered */
- continue;
+ synchronized (connectionLock) {
+ if (watchManager.isDisposed())
+ return;
+ if (path == null || children == null
+ || children.size() == 0) {
+ /* No children available yet, set a watch on it */
+ ReadRoot.this.readKeeper.getChildren(INode.ROOT,
+ ReadRoot.this, ReadRoot.this, null);
+ return;
}
- if (!ReadRoot.this.nodeReaders.containsKey(p)) {
- /* launch a new reader to handle this node's data */
- NodeReader nr = new NodeReader(p, ReadRoot.this);
- /* watch this very path for deletion */
- ReadRoot.this.readKeeper.exists(nr.getAbsolutePath(),
- ReadRoot.this, null, null);
- ReadRoot.this.nodeReaders.put(nr.getPath(), nr);
+ for (String p : children) {
+ if (Geo.isOwnPublication(p)) {
+ /* own publications need not to be discovered */
+ continue;
+ }
+ if (!ReadRoot.this.nodeReaders.containsKey(p)) {
+ /* launch a new reader to handle this node's data */
+ NodeReader nr = new NodeReader(p, ReadRoot.this);
+ /* watch this very path for deletion */
+ ReadRoot.this.readKeeper.exists(
+ nr.getAbsolutePath(), ReadRoot.this, null,
+ null);
+ ReadRoot.this.nodeReaders.put(nr.getPath(), nr);
+ }
}
}
}
});
-
}
public ZooKeeper getReadKeeper() {
@@ -166,5 +181,4 @@ public class ReadRoot implements Watcher, ChildrenCallback {
public Map<String, List<DiscoverdService>> getPerTypeDiscoverdServices() {
return perTypeDiscoverdServices;
}
-
} \ No newline at end of file

Back to the top