Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF2
-rw-r--r--examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java129
2 files changed, 93 insertions, 38 deletions
diff --git a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF
index 353764446..144cb42d3 100644
--- a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF
+++ b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF
@@ -8,8 +8,10 @@ Bundle-Vendor: %bundleProvider
Bundle-RequiredExecutionEnvironment: J2SE-1.5
Import-Package: org.eclipse.ecf.core,
org.eclipse.ecf.core.identity;version="3.0.0",
+ org.eclipse.ecf.core.util,
org.eclipse.ecf.examples.loadbalancing,
org.eclipse.ecf.remoteservice,
+ org.eclipse.ecf.remoteservice.util.tracker,
org.eclipse.equinox.app;version="1.0.0",
org.osgi.framework;version="1.3.0",
org.osgi.util.tracker
diff --git a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java
index 1789b2ccf..55aca0381 100644
--- a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java
+++ b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java
@@ -7,15 +7,15 @@
****************************************************************************/
package org.eclipse.ecf.internal.examples.loadbalancing.consumer;
-import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.core.IContainer;
import org.eclipse.ecf.core.IContainerManager;
-import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.identity.IDFactory;
import org.eclipse.ecf.examples.loadbalancing.IDataProcessor;
import org.eclipse.ecf.remoteservice.IRemoteService;
import org.eclipse.ecf.remoteservice.IRemoteServiceContainerAdapter;
import org.eclipse.ecf.remoteservice.IRemoteServiceReference;
+import org.eclipse.ecf.remoteservice.util.tracker.IRemoteServiceTrackerCustomizer;
+import org.eclipse.ecf.remoteservice.util.tracker.RemoteServiceTracker;
import org.eclipse.equinox.app.IApplication;
import org.eclipse.equinox.app.IApplicationContext;
import org.osgi.framework.BundleContext;
@@ -24,60 +24,93 @@ import org.osgi.util.tracker.ServiceTracker;
public class DataProcessorConsumerApplication implements IApplication {
private static final String LB_SVCCONSUMER_CONTAINER_TYPE = "ecf.jms.activemq.tcp.client";
- private static final String DEFAULT_TARGET_ID = "tcp://localhost:61616/exampleTopic";
+ private static final String DEFAULT_TOPIC_ID = "tcp://localhost:61616/exampleTopic";
private static final String DEFAULT_INPUT_DATA = "hello there";
-
+
private BundleContext bundleContext;
private ServiceTracker containerManagerServiceTracker;
-
+
// JMS topic URI that we will connect to in order to lookup/get/use the
- // data processor remote service. Note that this topicId can be
+ // data processor remote service. Note that this topicId can be
// changed by using the -topicId launch parameter...e.g.
// -topicId tcp://myjmdnsbrokerdnsname:61616/myTopicName
- private String targetId = DEFAULT_TARGET_ID;
-
+ private String topicId = DEFAULT_TOPIC_ID;
+ // Container type is the load balancing service consumer container type, which is normal client
+ private String containerType = LB_SVCCONSUMER_CONTAINER_TYPE;
+
// Container instance that connects us with the ActiveMQ queue as a message
// producer and publishes the service on the topicId
private IContainer container;
-
+ private IRemoteServiceContainerAdapter remoteServiceAdapter;
+
// Input data that is passed to the data processor
private String inputData = DEFAULT_INPUT_DATA;
+ // Lock and flag for synchronization
+ private final Object remoteServiceReceivedLock = new Object();
+ private boolean remoteServiceReceived = false;
+
+ // Remote service. The RemoteServiceTrackerCustomizer sets this
+ IRemoteService remoteService;
+
+ class RemoteServiceTrackerCustomizer implements
+ IRemoteServiceTrackerCustomizer {
+
+ public IRemoteService addingService(IRemoteServiceReference reference) {
+ remoteService = remoteServiceAdapter
+ .getRemoteService(reference);
+ synchronized (remoteServiceReceivedLock) {
+ remoteServiceReceived = true;
+ remoteServiceReceivedLock.notify();
+ }
+ return remoteService;
+ }
+
+ public void modifiedService(IRemoteServiceReference reference,
+ IRemoteService remoteService) {
+ }
+
+ public void removedService(IRemoteServiceReference reference,
+ IRemoteService remoteService) {
+ }
+
+ }
+
public Object start(IApplicationContext appContext) throws Exception {
bundleContext = Activator.getContext();
// Process Arguments...i.e. set queueId and topicId if specified
processArgs(appContext);
- // Create container
- container = getContainerManagerService().getContainerFactory().createContainer(LB_SVCCONSUMER_CONTAINER_TYPE);
- // Create targetID
- ID targetID = IDFactory.getDefault().createID(container.getConnectNamespace(),targetId);
-
- // Get remoteServiceContainerAdapter
- IRemoteServiceContainerAdapter remoteServiceAdapter = (IRemoteServiceContainerAdapter) container
- .getAdapter(IRemoteServiceContainerAdapter.class);
-
- // We'll get the remote service references directly with a blocking call to getRemoteServiceReferences
- // Note that we could also use non-blocking method: asyncGetRemoteServiceReferences OR we could us
- // the org.eclipse.ecf.remoteservice.util.tracker.RemoteServiceTracker. For this case we'll just invoke it
- // directly in caller thread.
- IRemoteServiceReference [] dataProcessorRefs = remoteServiceAdapter.getRemoteServiceReferences(targetID, IDataProcessor.class.getName(), null);
- // Get IRemoteService for first reference (must have at least one)
- Assert.isNotNull(dataProcessorRefs);
- Assert.isLegal(dataProcessorRefs.length > 0);
+ // Create container of appropriate type
+ container = getContainerManagerService().getContainerFactory()
+ .createContainer(containerType);
+ // Get appropriate adapter
+ remoteServiceAdapter = (IRemoteServiceContainerAdapter) container
+ .getAdapter(IRemoteServiceContainerAdapter.class);
+
+ // Create remote service tracker, and then open it
+ RemoteServiceTracker tracker = new RemoteServiceTracker(
+ remoteServiceAdapter, null, IDataProcessor.class.getName(),
+ new RemoteServiceTrackerCustomizer());
+ // Open it
+ tracker.open();
+
+ // Connect to topic
+ container.connect(
+ IDFactory.getDefault().createID(
+ container.getConnectNamespace(), topicId), null);
- // Get remote service associated with the first reference
- IRemoteService remoteService = remoteServiceAdapter.getRemoteService(dataProcessorRefs[0]);
- // At this point, the client can choose how to invoke the remote service...e.g. via IRemoteService.callAsync/1 or callSync/2
- // For this example consumer, we'll just get the proxy and invoke it in this thread
+ // Wait for remote service tracker to receive proxy
+ waitForRemoteService();
+
IDataProcessor dataProcessorProxy = (IDataProcessor) remoteService.getProxy();
- System.out.println("Calling remote service ref="+dataProcessorRefs[0]+"\n\tinput data="+inputData);
- // And then simply call it
+ System.out.println("Calling remote service with input data="
+ + inputData);
+ // And then call it
String result = dataProcessorProxy.processData(inputData);
// And print out results
- System.out.println("\tresult="+result);
- // And we're done
- stop();
+ System.out.println("\tremote service result=" + result);
+
return IApplication.EXIT_OK;
}
@@ -91,6 +124,10 @@ public class DataProcessorConsumerApplication implements IApplication {
containerManagerServiceTracker.close();
containerManagerServiceTracker = null;
}
+ synchronized (remoteServiceReceivedLock) {
+ remoteServiceReceived = true;
+ remoteServiceReceivedLock.notifyAll();
+ }
bundleContext = null;
}
@@ -100,16 +137,19 @@ public class DataProcessorConsumerApplication implements IApplication {
if (originalArgs == null)
return;
for (int i = 0; i < originalArgs.length; i++) {
- if (originalArgs[i].equals("-targetId")) {
- targetId = originalArgs[i + 1];
+ if (originalArgs[i].equals("-topicId")) {
+ topicId = originalArgs[i + 1];
i++;
} else if (originalArgs[i].equals("-inputData")) {
StringBuffer buf = new StringBuffer();
- for(int j=i+1; j < originalArgs.length; j++) {
+ for (int j = i + 1; j < originalArgs.length; j++) {
buf.append(originalArgs[j]).append(" ");
}
inputData = buf.toString();
return;
+ } else if (originalArgs[i].equals("-containerType")) {
+ containerType = originalArgs[i + 1];
+ i++;
}
}
}
@@ -123,4 +163,17 @@ public class DataProcessorConsumerApplication implements IApplication {
return (IContainerManager) containerManagerServiceTracker.getService();
}
+ private void waitForRemoteService() {
+ // then just wait here
+ synchronized (remoteServiceReceivedLock) {
+ while (!remoteServiceReceived) {
+ try {
+ remoteServiceReceivedLock.wait();
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ }
+ }
+ }
+
}

Back to the top