Skip to main content
summaryrefslogtreecommitdiffstats
blob: 65d9af4d16cd52c6e3b5c03cd89a0449fffc3bc3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package org.eclipse.osee.ote.endpoint;

import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import org.eclipse.osee.framework.jdk.core.util.Lib;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.collections.ObjectPool;

/**
 * Worker that takes {@link AddressBuffer}s off a shared queue and sends each one through its own
 * {@link DatagramChannel}.
 * <p>
 * The worker stops when it dequeues {@link OteUdpEndpointSender#POISON_PILL}, when it is
 * interrupted while waiting for work, or when no work arrives within
 * {@link #IDLE_TIMEOUT_SECONDS} seconds. Every buffer taken from the queue is handed back to the
 * supplied {@link ObjectPool} once the batch it belongs to has been processed, whether or not the
 * send succeeded.
 */
final class OteEndpointSendRunnable implements Runnable {

   private static final int SEND_BUFFER_SIZE = 1024 * 512;

   /** How long to wait for new work before this worker terminates itself. */
   private static final long IDLE_TIMEOUT_SECONDS = 15;

   private final ArrayBlockingQueue<AddressBuffer> toSend;
   private final ObjectPool<AddressBuffer> buffers;
   private final boolean debug;

   /**
    * @param toSend queue of buffers waiting to be sent
    * @param buffers pool that processed buffers are returned to
    * @param debug when {@code true}, closed-channel errors are logged; other I/O errors are always
    * logged
    */
   OteEndpointSendRunnable(ArrayBlockingQueue<AddressBuffer> toSend, ObjectPool<AddressBuffer> buffers, boolean debug) {
      this.toSend = toSend;
      this.buffers = buffers;
      this.debug = debug;
   }

   /**
    * Opens a channel and sends queued buffers until a stop condition is reached. The channel is
    * always closed before this method returns.
    */
   @Override
   public void run() {
      DatagramChannel threadChannel = null;
      try {
         threadChannel = openAndInitializeDatagramChannel();
         boolean keepRunning = true;
         final List<AddressBuffer> dataToSend = new ArrayList<>(32);
         disableSecurityManagerIfInstalled();
         while (keepRunning) {
            try {
               dataToSend.clear();
               if (toSend.drainTo(dataToSend) < 1) {
                  try {
                     // block until something is available
                     AddressBuffer addrBuf = toSend.poll(IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                     if (addrBuf == null) {
                        // no activity for a while so lets clean ourselves up. Our master will restart
                        // a new thread if another event comes along after we self terminate
                        keepRunning = false;
                     } else {
                        dataToSend.add(addrBuf);
                     }
                  } catch (InterruptedException e) {
                     // Restore the flag so whoever owns this thread can still observe the interrupt.
                     Thread.currentThread().interrupt();
                     keepRunning = false;
                     continue;
                  }
               }
               for (AddressBuffer data : dataToSend) {
                  if (data == OteUdpEndpointSender.POISON_PILL) {
                     // Anything queued behind the pill in this batch is intentionally not sent.
                     keepRunning = false;
                     break;
                  }
                  threadChannel.send(data.getBuffer(), data.getAddress());
               }
            } catch (ClosedChannelException ex) {
               // Also covers AsynchronousCloseException and ClosedByInterruptException, which are
               // subclasses. The channel is unusable, so replace it; the rest of the current batch
               // is dropped.
               if (debug) {
                  OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
               }
               Lib.close(threadChannel);
               threadChannel = openAndInitializeDatagramChannel();
            } catch (IOException ex) {
               OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
            } finally {
               returnBuffersToPool(dataToSend);
            }
         }
      } catch (IOException ex) {
         OseeLog.log(getClass(), Level.SEVERE, "Error opening DatagramChannel.  Ending OteEndpointSendRunnable unexpectedly.", ex);
      } finally {
         Lib.close(threadChannel);
      }

   }

   /**
    * Returns every buffer of the processed batch to the pool, except the poison pill.
    * <p>
    * NOTE(review): assumes {@link OteUdpEndpointSender#POISON_PILL} is a shared sentinel that was
    * not obtained from the pool. Pooling it would let it be handed out as a normal buffer and stop
    * a sender when re-queued -- confirm against OteUdpEndpointSender.
    */
   private void returnBuffersToPool(List<AddressBuffer> processed) {
      for (AddressBuffer buffer : processed) {
         if (buffer != OteUdpEndpointSender.POISON_PILL) {
            buffers.returnObj(buffer);
         }
      }
   }

   /**
    * Removes the JVM-wide security manager if one is installed; failure to do so is not fatal.
    * <p>
    * Newer JDKs reject {@code System.setSecurityManager} with an
    * {@link UnsupportedOperationException}, and an installed manager may refuse with a
    * {@link SecurityException}; neither should prevent this worker from sending.
    * <p>
    * NOTE(review): this is a process-wide side effect. It is presumably here to avoid permission
    * checks on each send -- confirm it is still wanted.
    */
   private void disableSecurityManagerIfInstalled() {
      if (System.getSecurityManager() == null) {
         return;
      }
      try {
         System.setSecurityManager(null);
      } catch (UnsupportedOperationException | SecurityException ex) {
         // Only reported in debug mode: this runs every time a worker starts.
         if (debug) {
            OseeLog.log(getClass(), Level.WARNING, "Unable to remove the security manager", ex);
         }
      }
   }

   /**
    * Opens a blocking {@link DatagramChannel} with address reuse enabled and a send buffer of at
    * least {@link #SEND_BUFFER_SIZE} bytes requested.
    *
    * @return the configured channel
    * @throws IOException if the channel cannot be opened or configured; a channel that was opened
    * but could not be configured is closed before the exception propagates
    */
   private DatagramChannel openAndInitializeDatagramChannel() throws IOException {
      DatagramChannel channel = DatagramChannel.open();
      try {
         if (channel.socket().getSendBufferSize() < SEND_BUFFER_SIZE) {
            channel.socket().setSendBufferSize(SEND_BUFFER_SIZE);
         }
         channel.socket().setReuseAddress(true);
         channel.configureBlocking(true);
         return channel;
      } catch (IOException | RuntimeException ex) {
         // Do not leak the half-configured channel.
         Lib.close(channel);
         throw ex;
      }
   }
}

Back to the top