Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkaguilar2011-11-17 21:22:18 +0000
committerRyan D. Brooks2011-11-17 21:22:18 +0000
commit9aea7fa7b170260da80a02df1b5b1c1f00416a1d (patch)
tree3f424bee35fb830d5537c3d0cb27e3f7cd85052a /plugins/org.eclipse.osee.ote.message
parentfd16031f3e0115c4322cb6cfcdef1f574a2fd2d6 (diff)
downloadorg.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.tar.gz
org.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.tar.xz
org.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.zip
bug: Fix handling of thread.interrupted in some code
Diffstat (limited to 'plugins/org.eclipse.osee.ote.message')
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java61
1 files changed, 45 insertions, 16 deletions
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
index d72a2247fb8..e0c6843a385 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
@@ -11,10 +11,13 @@
package org.eclipse.osee.ote.message.tool;
import java.io.IOException;
+import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.rmi.RemoteException;
import java.util.ArrayList;
@@ -69,7 +72,7 @@ public class AbstractMessageToolService implements IRemoteMessageService {
private static final boolean debugEnabled = false;
private final HashMap<String, Throwable> cancelledSubscriptions = new HashMap<String, Throwable>(40);
- private final DatagramChannel channel;
+ private DatagramChannel channel;
private final HashMap<String, Map<DataType, EnumMap<MessageMode, SubscriptionRecord>>> messageMap =
new HashMap<String, Map<DataType, EnumMap<MessageMode, SubscriptionRecord>>>(100);
@@ -78,6 +81,8 @@ public class AbstractMessageToolService implements IRemoteMessageService {
private final DatagramChannel recorderOutputChannel;
private volatile boolean terminated = false;
private final AtomicInteger idCounter = new AtomicInteger(0x0DEF0000);
+
+ private InetSocketAddress xmitAddress;
private static final class ClientInfo {
private final IMsgToolServiceClient remoteReference;
@@ -134,7 +139,6 @@ public class AbstractMessageToolService implements IRemoteMessageService {
private ByteBuffer msgUpdatePart;
private long updateCount = 0;
private final SubscriptionKey key;
- private final StringBuilder strBuilder = new StringBuilder();
/**
* Creates a new listener. A listener is a one to one mapping of a message to a list of client addresses
@@ -329,20 +333,20 @@ public class AbstractMessageToolService implements IRemoteMessageService {
msg.getName() + " scheduling has changed to " + isScheduled);
}
- /**
- * @return Returns the msg.
- */
- public Message<?, ?, ?> getMsg() {
- return msg;
- }
-
} /* end of MsgListener */
/**
* Constructs a new message manager service
*/
public AbstractMessageToolService(IMessageManager messageManager) throws IOException {
+ openXmitChannel();
+ recorderOutputChannel = DatagramChannel.open();
+ recorderOutputChannel.socket().bind(
+ new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort()));
+ messageRequestor = messageManager.createMessageRequestor(getClass().getName());
+ }
+ private void openXmitChannel() throws IOException {
channel = DatagramChannel.open();
if (channel.socket().getSendBufferSize() < SEND_BUFFER_SIZE) {
channel.socket().setSendBufferSize(SEND_BUFFER_SIZE);
@@ -350,12 +354,26 @@ public class AbstractMessageToolService implements IRemoteMessageService {
"message tooling service send buffer size is now " + channel.socket().getSendBufferSize());
}
- channel.socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort()));
+ // we want to reuse any existing address if possible
+ if (xmitAddress == null) {
+ // no prior address was allocated so get one
+ xmitAddress = getXmitAddress();
+ }
+
+ try {
+ channel.socket().bind(xmitAddress);
+ } catch (BindException ex) {
+ // seems someone stole our address try to get a new one
+ xmitAddress = getXmitAddress();
+ // re-bind, if we fail then give up
+ channel.socket().bind(xmitAddress);
+ }
channel.configureBlocking(true);
- recorderOutputChannel = DatagramChannel.open();
- recorderOutputChannel.socket().bind(
- new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort()));
- messageRequestor = messageManager.createMessageRequestor(getClass().getName());
+ }
+
+ private InetSocketAddress getXmitAddress() throws UnknownHostException, IOException{
+ int xmitPort = PortUtil.getInstance().getValidPort();
+ return new InetSocketAddress(InetAddress.getLocalHost(), xmitPort);
}
@Override
@@ -632,9 +650,9 @@ public class AbstractMessageToolService implements IRemoteMessageService {
type.name()));
}
final SubscriptionRecord record = modeMap.get(cmd.getMode());
- ClientInfo client = record.findClient(cmd.getAddress());
if (record != null) {
+ ClientInfo client = record.findClient(cmd.getAddress());
/* remove the client address from the listener's client list */
record.removeClient(client);
@@ -837,7 +855,18 @@ public class AbstractMessageToolService implements IRemoteMessageService {
client.toString());
} else {
- channel.send(buffer, client.getIpAddress());
+ try {
+ channel.send(buffer, client.getIpAddress());
+ } catch (ClosedByInterruptException ex) {
+ if (!terminated) {
+ // we got interrupted indirectly we should reopen the channel. This can happen since multiple threads
+ // can pass through this method. We don't want to lose our socket just because a random thread with the
+ // interrupt flag set comes through here and hits the channel.send() method.
+ openXmitChannel();
+ }
+ // re-assert interrupt status
+ Thread.currentThread().interrupt();
+ }
}
/*
* rewind the buffer for next address since we are sending the same data to each client

Back to the top