diff options
author | kaguilar | 2011-11-17 21:22:18 +0000 |
---|---|---|
committer | Ryan D. Brooks | 2011-11-17 21:22:18 +0000 |
commit | 9aea7fa7b170260da80a02df1b5b1c1f00416a1d (patch) | |
tree | 3f424bee35fb830d5537c3d0cb27e3f7cd85052a /plugins/org.eclipse.osee.ote.message | |
parent | fd16031f3e0115c4322cb6cfcdef1f574a2fd2d6 (diff) | |
download | org.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.tar.gz org.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.tar.xz org.eclipse.osee-9aea7fa7b170260da80a02df1b5b1c1f00416a1d.zip |
bug: Fix handling of thread.interrupted in some code
Diffstat (limited to 'plugins/org.eclipse.osee.ote.message')
-rw-r--r-- | plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java | 61 |
1 files changed, 45 insertions, 16 deletions
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java index d72a2247fb8..e0c6843a385 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java @@ -11,10 +11,13 @@ package org.eclipse.osee.ote.message.tool; import java.io.IOException; +import java.net.BindException; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.DatagramChannel; import java.rmi.RemoteException; import java.util.ArrayList; @@ -69,7 +72,7 @@ public class AbstractMessageToolService implements IRemoteMessageService { private static final boolean debugEnabled = false; private final HashMap<String, Throwable> cancelledSubscriptions = new HashMap<String, Throwable>(40); - private final DatagramChannel channel; + private DatagramChannel channel; private final HashMap<String, Map<DataType, EnumMap<MessageMode, SubscriptionRecord>>> messageMap = new HashMap<String, Map<DataType, EnumMap<MessageMode, SubscriptionRecord>>>(100); @@ -78,6 +81,8 @@ public class AbstractMessageToolService implements IRemoteMessageService { private final DatagramChannel recorderOutputChannel; private volatile boolean terminated = false; private final AtomicInteger idCounter = new AtomicInteger(0x0DEF0000); + + private InetSocketAddress xmitAddress; private static final class ClientInfo { private final IMsgToolServiceClient remoteReference; @@ -134,7 +139,6 @@ public class AbstractMessageToolService implements IRemoteMessageService { private ByteBuffer msgUpdatePart; private long updateCount = 0; private final SubscriptionKey key; - private final StringBuilder strBuilder = new StringBuilder(); /** * Creates a new listener. A listener is a one to one mapping of a message to a list of client addresses @@ -329,20 +333,20 @@ public class AbstractMessageToolService implements IRemoteMessageService { msg.getName() + " scheduling has changed to " + isScheduled); } - /** - * @return Returns the msg. - */ - public Message<?, ?, ?> getMsg() { - return msg; - } - } /* end of MsgListener */ /** * Constructs a new message manager service */ public AbstractMessageToolService(IMessageManager messageManager) throws IOException { + openXmitChannel(); + recorderOutputChannel = DatagramChannel.open(); + recorderOutputChannel.socket().bind( + new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort())); + messageRequestor = messageManager.createMessageRequestor(getClass().getName()); + } + private void openXmitChannel() throws IOException { channel = DatagramChannel.open(); if (channel.socket().getSendBufferSize() < SEND_BUFFER_SIZE) { channel.socket().setSendBufferSize(SEND_BUFFER_SIZE); @@ -350,12 +354,26 @@ public class AbstractMessageToolService implements IRemoteMessageService { "message tooling service send buffer size is now " + channel.socket().getSendBufferSize()); } - channel.socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort())); + // we want to reuse any existing address if possible + if (xmitAddress == null) { + // no prior address was allocated so get one + xmitAddress = getXmitAddress(); + } + + try { + channel.socket().bind(xmitAddress); + } catch (BindException ex) { + // seems someone stole our address try to get a new one + xmitAddress = getXmitAddress(); + // re-bind, if we fail then give up + channel.socket().bind(xmitAddress); + } channel.configureBlocking(true); - recorderOutputChannel = DatagramChannel.open(); - recorderOutputChannel.socket().bind( - new InetSocketAddress(InetAddress.getLocalHost(), PortUtil.getInstance().getValidPort())); - messageRequestor = messageManager.createMessageRequestor(getClass().getName()); + } + + private InetSocketAddress getXmitAddress() throws UnknownHostException, IOException{ + int xmitPort = PortUtil.getInstance().getValidPort(); + return new InetSocketAddress(InetAddress.getLocalHost(), xmitPort); } @Override @@ -632,9 +650,9 @@ public class AbstractMessageToolService implements IRemoteMessageService { type.name())); } final SubscriptionRecord record = modeMap.get(cmd.getMode()); - ClientInfo client = record.findClient(cmd.getAddress()); if (record != null) { + ClientInfo client = record.findClient(cmd.getAddress()); /* remove the client address from the listener's client list */ record.removeClient(client); @@ -837,7 +855,18 @@ public class AbstractMessageToolService implements IRemoteMessageService { client.toString()); } else { - channel.send(buffer, client.getIpAddress()); + try { + channel.send(buffer, client.getIpAddress()); + } catch (ClosedByInterruptException ex) { + if (!terminated) { + // we got interrupted indirectly we should reopen the channel. This can happen since multiple threads + // can pass through this method. We don't want to lose our socket just because a random thread with the + // interrupt flag set comes through here and hits the channel.send() method. + openXmitChannel(); + } + // re-assert interrupt status + Thread.currentThread().interrupt(); + } } /* * rewind the buffer for next address since we are sending the same data to each client |