diff options
2 files changed, 93 insertions, 38 deletions
diff --git a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF index 353764446..144cb42d3 100644 --- a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF +++ b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/META-INF/MANIFEST.MF @@ -8,8 +8,10 @@ Bundle-Vendor: %bundleProvider Bundle-RequiredExecutionEnvironment: J2SE-1.5 Import-Package: org.eclipse.ecf.core, org.eclipse.ecf.core.identity;version="3.0.0", + org.eclipse.ecf.core.util, org.eclipse.ecf.examples.loadbalancing, org.eclipse.ecf.remoteservice, + org.eclipse.ecf.remoteservice.util.tracker, org.eclipse.equinox.app;version="1.0.0", org.osgi.framework;version="1.3.0", org.osgi.util.tracker diff --git a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java index 1789b2ccf..55aca0381 100644 --- a/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java +++ b/examples/bundles/org.eclipse.ecf.examples.loadbalancing.consumer/src/org/eclipse/ecf/internal/examples/loadbalancing/consumer/DataProcessorConsumerApplication.java @@ -7,15 +7,15 @@ ****************************************************************************/ package org.eclipse.ecf.internal.examples.loadbalancing.consumer; -import org.eclipse.core.runtime.Assert; import org.eclipse.ecf.core.IContainer; import org.eclipse.ecf.core.IContainerManager; -import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.identity.IDFactory; import org.eclipse.ecf.examples.loadbalancing.IDataProcessor; import org.eclipse.ecf.remoteservice.IRemoteService; import org.eclipse.ecf.remoteservice.IRemoteServiceContainerAdapter; import org.eclipse.ecf.remoteservice.IRemoteServiceReference; +import org.eclipse.ecf.remoteservice.util.tracker.IRemoteServiceTrackerCustomizer; +import org.eclipse.ecf.remoteservice.util.tracker.RemoteServiceTracker; import org.eclipse.equinox.app.IApplication; import org.eclipse.equinox.app.IApplicationContext; import org.osgi.framework.BundleContext; @@ -24,60 +24,93 @@ import org.osgi.util.tracker.ServiceTracker; public class DataProcessorConsumerApplication implements IApplication { private static final String LB_SVCCONSUMER_CONTAINER_TYPE = "ecf.jms.activemq.tcp.client"; - private static final String DEFAULT_TARGET_ID = "tcp://localhost:61616/exampleTopic"; + private static final String DEFAULT_TOPIC_ID = "tcp://localhost:61616/exampleTopic"; private static final String DEFAULT_INPUT_DATA = "hello there"; - + private BundleContext bundleContext; private ServiceTracker containerManagerServiceTracker; - + // JMS topic URI that we will connect to in order to lookup/get/use the - // data processor remote service. Note that this topicId can be + // data processor remote service. Note that this topicId can be // changed by using the -topicId launch parameter...e.g. // -topicId tcp://myjmdnsbrokerdnsname:61616/myTopicName - private String targetId = DEFAULT_TARGET_ID; - + private String topicId = DEFAULT_TOPIC_ID; + // Container type is the load balancing service consumer container type, which is normal client + private String containerType = LB_SVCCONSUMER_CONTAINER_TYPE; + // Container instance that connects us with the ActiveMQ queue as a message // producer and publishes the service on the topicId private IContainer container; - + private IRemoteServiceContainerAdapter remoteServiceAdapter; + // Input data that is passed to the data processor private String inputData = DEFAULT_INPUT_DATA; + // Lock and flag for synchronization + private final Object remoteServiceReceivedLock = new Object(); + private boolean remoteServiceReceived = false; + + // Remote service. The RemoteServiceTrackerCustomizer sets this + IRemoteService remoteService; + + class RemoteServiceTrackerCustomizer implements + IRemoteServiceTrackerCustomizer { + + public IRemoteService addingService(IRemoteServiceReference reference) { + remoteService = remoteServiceAdapter + .getRemoteService(reference); + synchronized (remoteServiceReceivedLock) { + remoteServiceReceived = true; + remoteServiceReceivedLock.notify(); + } + return remoteService; + } + + public void modifiedService(IRemoteServiceReference reference, + IRemoteService remoteService) { + } + + public void removedService(IRemoteServiceReference reference, + IRemoteService remoteService) { + } + + } + public Object start(IApplicationContext appContext) throws Exception { bundleContext = Activator.getContext(); // Process Arguments...i.e. set queueId and topicId if specified processArgs(appContext); - // Create container - container = getContainerManagerService().getContainerFactory().createContainer(LB_SVCCONSUMER_CONTAINER_TYPE); - // Create targetID - ID targetID = IDFactory.getDefault().createID(container.getConnectNamespace(),targetId); - - // Get remoteServiceContainerAdapter - IRemoteServiceContainerAdapter remoteServiceAdapter = (IRemoteServiceContainerAdapter) container - .getAdapter(IRemoteServiceContainerAdapter.class); - - // We'll get the remote service references directly with a blocking call to getRemoteServiceReferences - // Note that we could also use non-blocking method: asyncGetRemoteServiceReferences OR we could us - // the org.eclipse.ecf.remoteservice.util.tracker.RemoteServiceTracker. For this case we'll just invoke it - // directly in caller thread. - IRemoteServiceReference [] dataProcessorRefs = remoteServiceAdapter.getRemoteServiceReferences(targetID, IDataProcessor.class.getName(), null); - // Get IRemoteService for first reference (must have at least one) - Assert.isNotNull(dataProcessorRefs); - Assert.isLegal(dataProcessorRefs.length > 0); + // Create container of appropriate type + container = getContainerManagerService().getContainerFactory() + .createContainer(containerType); + // Get appropriate adapter + remoteServiceAdapter = (IRemoteServiceContainerAdapter) container + .getAdapter(IRemoteServiceContainerAdapter.class); + + // Create remote service tracker, and then open it + RemoteServiceTracker tracker = new RemoteServiceTracker( + remoteServiceAdapter, null, IDataProcessor.class.getName(), + new RemoteServiceTrackerCustomizer()); + // Open it + tracker.open(); + + // Connect to topic + container.connect( + IDFactory.getDefault().createID( + container.getConnectNamespace(), topicId), null); - // Get remote service associated with the first reference - IRemoteService remoteService = remoteServiceAdapter.getRemoteService(dataProcessorRefs[0]); - // At this point, the client can choose how to invoke the remote service...e.g. via IRemoteService.callAsync/1 or callSync/2 - // For this example consumer, we'll just get the proxy and invoke it in this thread + // Wait for remote service tracker to receive proxy + waitForRemoteService(); + IDataProcessor dataProcessorProxy = (IDataProcessor) remoteService.getProxy(); - System.out.println("Calling remote service ref="+dataProcessorRefs[0]+"\n\tinput data="+inputData); - // And then simply call it + System.out.println("Calling remote service with input data=" + + inputData); + // And then call it String result = dataProcessorProxy.processData(inputData); // And print out results - System.out.println("\tresult="+result); - // And we're done - stop(); + System.out.println("\tremote service result=" + result); + return IApplication.EXIT_OK; } @@ -91,6 +124,10 @@ public class DataProcessorConsumerApplication implements IApplication { containerManagerServiceTracker.close(); containerManagerServiceTracker = null; } + synchronized (remoteServiceReceivedLock) { + remoteServiceReceived = true; + remoteServiceReceivedLock.notifyAll(); + } bundleContext = null; } @@ -100,16 +137,19 @@ public class DataProcessorConsumerApplication implements IApplication { if (originalArgs == null) return; for (int i = 0; i < originalArgs.length; i++) { - if (originalArgs[i].equals("-targetId")) { - targetId = originalArgs[i + 1]; + if (originalArgs[i].equals("-topicId")) { + topicId = originalArgs[i + 1]; i++; } else if (originalArgs[i].equals("-inputData")) { StringBuffer buf = new StringBuffer(); - for(int j=i+1; j < originalArgs.length; j++) { + for (int j = i + 1; j < originalArgs.length; j++) { buf.append(originalArgs[j]).append(" "); } inputData = buf.toString(); return; + } else if (originalArgs[i].equals("-containerType")) { + containerType = originalArgs[i + 1]; + i++; } } } @@ -123,4 +163,17 @@ public class DataProcessorConsumerApplication implements IApplication { return (IContainerManager) containerManagerServiceTracker.getService(); } + private void waitForRemoteService() { + // then just wait here + synchronized (remoteServiceReceivedLock) { + while (!remoteServiceReceived) { + try { + remoteServiceReceivedLock.wait(); + } catch (InterruptedException e) { + // do nothing + } + } + } + } + } |