diff options
author | slewis | 2011-03-08 22:21:23 +0000 |
---|---|---|
committer | slewis | 2011-03-08 22:21:23 +0000 |
commit | f98066a5f7953c7d5593aff89b6e7f82f2ac2db8 (patch) | |
tree | b645bda0071ffcc31ed4a6ee28bd62b76374502d | |
parent | eff240cc1862e1b6f8de22d47edf7068ac8d3cba (diff) | |
parent | be8228a120c82ea7f7cab13e46a26ef529f8b36b (diff) | |
download | org.eclipse.ecf-R-Release_HEAD-sdk_feature-13_2011-03-09_23-23-52.tar.gz org.eclipse.ecf-R-Release_HEAD-sdk_feature-13_2011-03-09_23-23-52.tar.xz org.eclipse.ecf-R-Release_HEAD-sdk_feature-13_2011-03-09_23-23-52.zip |
Merge branch 'refs/heads/master' of ssh://git.eclipse.org/gitroot/ecf/org.eclipse.ecf.git into HEADR-Release_HEAD-sdk_feature-9_2011-03-09_18-00-33R-Release_HEAD-sdk_feature-8_2011-03-09_16-57-28R-Release_HEAD-sdk_feature-13_2011-03-09_23-23-52
2 files changed, 122 insertions, 86 deletions
diff --git a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java index 692ea14ed..0e892df1a 100644 --- a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java +++ b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/core/internal/IService.java @@ -12,23 +12,45 @@ package org.eclipse.ecf.provider.zookeeper.core.internal; import org.eclipse.ecf.discovery.IServiceInfo; +import org.eclipse.ecf.discovery.identity.IServiceTypeID; public interface IService extends IServiceInfo { - String LOCATION = "discovery.service.location"; //$NON-NLS-1$ + /** + * Holds the service location ( {@link IServiceInfo#getLocation()} ) in the + * zooKeeper node + **/ + String LOCATION = "discovery.service.location"; //$NON-NLS-1$ + + /** + * Holds the service weight ( {@link IServiceInfo#getWeight()} ) in the + * zooKeeper node + **/ String WEIGHT = "discovery.service.weight"; //$NON-NLS-1$ + + /** + * Holds the service priority ({@link IServiceInfo#getPriority()()} ) in the + * zooKeeper node + **/ String PRIORITY = "discovery.service.priority"; //$NON-NLS-1$ + + /** + * Holds the service-type protocols ({@link IServiceTypeID#getProtocols()} ) + * in the zooKeeper node + **/ String PROTOCOLS = "discovery.service.protocol"; //$NON-NLS-1$ - Object SERVICE_NAME = "discovery.service.name";; - // Properties getProperties(); + /** + * Holds the service name ({@link IServiceInfo#getServiceName()}) in the + * zooKeeper node + **/ + String SERVICE_NAME = "discovery.service.name"; //$NON-NLS-1$ /** - * Stored as value of a zookeeper node + * The byte representation of the service properties, appropriate to be + * stored in the zooKeeper node * - * @return byte value of string + * @return byte representation of the properties */ byte[] getPropertiesAsBytes(); - - // String getPropertiesAsString(); -} +}
\ No newline at end of file diff --git a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java index 8e6449c77..5006b800c 100644 --- a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java +++ b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/ReadRoot.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com). + * Copyright (c)2011 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com). * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -11,15 +11,16 @@ *******************************************************************************/ package org.eclipse.ecf.provider.zookeeper.node.internal; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.eclipse.core.runtime.Assert; import org.eclipse.ecf.provider.zookeeper.core.DiscoverdService; import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer; @@ -36,6 +37,7 @@ public class ReadRoot implements Watcher, ChildrenCallback { .synchronizedMap(new HashMap<String, NodeReader>()); private Map<String, DiscoverdService> discoverdServices; private Map<String, List<DiscoverdService>> perTypeDiscoverdServices; + private Object connectionLock = new Object(); ReadRoot(String ip, WatchManager watchManager) { Assert.isNotNull(ip); @@ -49,106 +51,119 @@ public class ReadRoot implements Watcher, ChildrenCallback { connect(); } - public synchronized void process(final WatchedEvent event) { - if (watchManager.isDisposed()) - return; + public void process(final WatchedEvent event) { ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() { public void run() { - switch (event.getState()) { - case Disconnected: - ReadRoot.this.isConnected = false; - connect(); - break; - case Expired: - ReadRoot.this.isConnected = false; - connect(); - break; - case SyncConnected: - if (!ReadRoot.this.isConnected) { - ReadRoot.this.isConnected = true; - ReadRoot.this.watchManager - .addZooKeeper(ReadRoot.this.readKeeper); - ReadRoot.this.readKeeper.exists(INode.ROOT, - ReadRoot.this, null, null); - ReadRoot.this.readKeeper.getChildren(INode.ROOT, - ReadRoot.this, ReadRoot.this, null); - } - break; + synchronized (connectionLock) { + if (watchManager.isDisposed()) + return; + + switch (event.getState()) { + case Disconnected: + ReadRoot.this.isConnected = false; + connect(); + break; + case Expired: + ReadRoot.this.isConnected = false; + connect(); + break; + case SyncConnected: + if (!ReadRoot.this.isConnected) { + ReadRoot.this.isConnected = true; + ReadRoot.this.watchManager + .addZooKeeper(ReadRoot.this.readKeeper); + ReadRoot.this.readKeeper.exists(INode.ROOT, + ReadRoot.this, null, null); + ReadRoot.this.readKeeper.getChildren(INode.ROOT, + ReadRoot.this, ReadRoot.this, null); + } + break; - // ignore @deprecated cases - } - switch (event.getType()) { - case NodeDeleted: - if (event.getPath() == null - || event.getPath().equals(INode.ROOT)) + // ignore @deprecated cases + } + switch (event.getType()) { + case NodeDeleted: + if (event.getPath() == null + || event.getPath().equals(INode.ROOT)) + break; + ReadRoot.this.nodeReaders.remove(event.getPath()); + break; + case NodeChildrenChanged: + if (ReadRoot.this.isConnected) { + ReadRoot.this.readKeeper.exists(INode.ROOT, + ReadRoot.this, null, null); + ReadRoot.this.readKeeper.getChildren(INode.ROOT, + ReadRoot.this, ReadRoot.this, null); + } break; - ReadRoot.this.nodeReaders.remove(event.getPath()); - break; - case NodeChildrenChanged: - if (ReadRoot.this.isConnected) { - ReadRoot.this.readKeeper.exists(INode.ROOT, - ReadRoot.this, null, null); - ReadRoot.this.readKeeper.getChildren(INode.ROOT, - ReadRoot.this, ReadRoot.this, null); } - break; } } }); } - private synchronized void connect() { - if (this.isConnected || watchManager.isDisposed()) { - return; - } - try { + private void connect() { + synchronized (connectionLock) { + if (this.isConnected || watchManager.isDisposed()) + return; + this.nodeReaders.clear(); if (this.readKeeper != null) { // discard the current stale reader - this.readKeeper.close(); - this.readKeeper = null; + try { + this.readKeeper.close(); + } catch (InterruptedException e) { + Logger.log( + LogService.LOG_ERROR, + "Error while closing the current ZooKeeper: " + + e.getMessage(), e); + } this.watchManager.removeZooKeeper(this.readKeeper); + this.readKeeper = null; + } + try { + // try reconnecting + this.readKeeper = new ZooKeeper(this.ip, 3000, this); + } catch (IOException ioe) { + Logger.log(LogService.LOG_ERROR, + "Cannot initiate a new ZooKeeper: " + ioe.getMessage(), + ioe); } - // try reconnecting - this.readKeeper = new ZooKeeper(this.ip, 3000, this); - - } catch (Exception e) { - Logger.log(LogService.LOG_DEBUG, e.getMessage(), e); } } - public synchronized void processResult(int rc, final String path, - Object ctx, final List<String> children) { - - if (watchManager.isDisposed()) { - return; - } - + public void processResult(int rc, final String path, Object ctx, + final List<String> children) { ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() { public void run() { - if (path == null || children == null || children.size() == 0) { - /* No children available yet, set a watch on it */ - ReadRoot.this.readKeeper.getChildren(INode.ROOT, - ReadRoot.this, ReadRoot.this, null); - return; - } - for (String p : children) { - if (Geo.isOwnPublication(p)) { - /* own publications need not to be discovered */ - continue; + synchronized (connectionLock) { + if (watchManager.isDisposed()) + return; + if (path == null || children == null + || children.size() == 0) { + /* No children available yet, set a watch on it */ + ReadRoot.this.readKeeper.getChildren(INode.ROOT, + ReadRoot.this, ReadRoot.this, null); + return; } - if (!ReadRoot.this.nodeReaders.containsKey(p)) { - /* launch a new reader to handle this node's data */ - NodeReader nr = new NodeReader(p, ReadRoot.this); - /* watch this very path for deletion */ - ReadRoot.this.readKeeper.exists(nr.getAbsolutePath(), - ReadRoot.this, null, null); - ReadRoot.this.nodeReaders.put(nr.getPath(), nr); + for (String p : children) { + if (Geo.isOwnPublication(p)) { + /* own publications need not to be discovered */ + continue; + } + if (!ReadRoot.this.nodeReaders.containsKey(p)) { + /* launch a new reader to handle this node's data */ + NodeReader nr = new NodeReader(p, ReadRoot.this); + /* watch this very path for deletion */ + ReadRoot.this.readKeeper.exists( + nr.getAbsolutePath(), ReadRoot.this, null, + null); + ReadRoot.this.nodeReaders.put(nr.getPath(), nr); + } } } } }); - } public ZooKeeper getReadKeeper() { @@ -166,5 +181,4 @@ public class ReadRoot implements Watcher, ChildrenCallback { public Map<String, List<DiscoverdService>> getPerTypeDiscoverdServices() { return perTypeDiscoverdServices; } - }
\ No newline at end of file |