Skip to main content
summaryrefslogtreecommitdiffstats
blob: cb12d805d2dc512e81cfdd5a204ceeb36904c22a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/*******************************************************************************
 * Copyright (c) 2013 Boeing.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Boeing - initial API and implementation
 *******************************************************************************/
package org.eclipse.osee.ote.message.event.send;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.osee.ote.OTEException;
import org.eclipse.osee.ote.endpoint.OteUdpEndpoint;
import org.eclipse.osee.ote.internal.message.event.send.NotifyOnResponse;
import org.eclipse.osee.ote.internal.message.event.send.OteEventMessageFutureImpl;
import org.eclipse.osee.ote.internal.message.event.send.OteEventMessageFutureMultipleResponseImpl;
import org.eclipse.osee.ote.internal.message.event.send.OteEventMessageResponseFutureImpl;
import org.eclipse.osee.ote.message.event.OteEventMessage;

public class OteEndpointSendEventMessage {

   private final OteUdpEndpoint endpoint;
   private final Lock lock;
   private final Condition responseReceived;
   private final InetSocketAddress destination;

   public OteEndpointSendEventMessage(OteUdpEndpoint eventAdmin, InetSocketAddress destination){
      this.endpoint = eventAdmin;
      this.destination = destination;
      lock = new ReentrantLock();
      responseReceived = lock.newCondition();
   }

   /**
    * sends a message and returns immediately
    */
   public void asynchSend(OteEventMessage message) {
      updateHeaderInfo(message);
      endpoint.getOteEndpointThreadedSender(destination).send(message);
   }

   /**
    * Registers for a callback of the given message type as specified by the RESPONSE_TOPIC element in the sent message
    * and the class type passed in, then sends the given message and returns immediately.  The returned value can be used to
    * wait for the response using waitForCompletion().  The callback expects you to handle both the response and the timeout case.
    *
    * @param clazz - Type of OteEventMessage for the response
    * @param message - message to send
    * @param callable - callback executed when the response is recieved or if a timeout occurs or called immediately after the send if
    *                   no response is expected
    * @param timeout - amount of time in milliseconds to wait for response before calling timeout on the passed in OteEventMessageCallable
    * @return   <T extends OteEventMessage> Future<T> - a future that contains the response message
    */
   public <T extends OteEventMessage, R extends OteEventMessage> OteEventMessageFuture<T, R> asynchSendAndResponse(Class<R> clazz, T message, OteEventMessageCallable<T, R> callable, long timeout){
      int responseId = updateHeaderInfo(message);
      String responseTopic = message.getHeader().RESPONSE_TOPIC.getValue();
      OteEventMessageFutureImpl<T, R> response = new OteEventMessageFutureImpl<>(clazz, callable, message, responseTopic, responseId, timeout);
      endpoint.getOteEndpointThreadedSender(destination).send(message);
      return response;
   }

   /**
    * Registers for a callback of the given message type as specified by the RESPONSE_TOPIC element in the sent message
    * and the class type passed in, then sends the given message and returns immediately.  The returned value can be used to
    * wait for the response using waitForCompletion().  The callback expects you to handle both the response and the timeout case and to determine
    * when the appropriate number of responses has been received, by specifying it is complete.
    *
    * @param clazz - Type of OteEventMessage for the response
    * @param message - message to send
    * @param callable - callback executed when the response is recieved or if a timeout occurs or called immediately after the send if
    *                   no response is expected
    * @param timeout - amount of time in milliseconds to wait for response before calling timeout on the passed in OteEventMessageCallable
    * @return   <T extends OteEventMessage> Future<T> - a future that contains the response message
    */
   public <T extends OteEventMessage, R extends OteEventMessage> OteEventMessageFuture<T, R> asynchSendAndMultipleResponse(Class<R> clazz, T message, OteEventMessageCallable<T, R> callable, long timeout){
      int responseId = updateHeaderInfo(message);
      String responseTopic = message.getHeader().RESPONSE_TOPIC.getValue();
      OteEventMessageFutureImpl<T, R> response = new OteEventMessageFutureMultipleResponseImpl<>(clazz, callable, message, responseTopic, responseId, timeout);
      endpoint.getOteEndpointThreadedSender(destination).send(message);
      return response;
   }

   /**
    * Registers for a callback of the given message type and topic.
    *
    * @param clazz - Type of OteEventMessage for the response
    * @param callable - callback executed when the response is recieved
    * @return   a future that you should cancel when done listening so resources can be cleaned up.
    */
   public <R extends OteEventMessage> OteEventMessageResponseFuture<R> asynchResponse(Class<R> clazz, String topic, OteEventMessageResponseCallable<R> callable){
      OteEventMessageResponseFutureImpl<R> response = new OteEventMessageResponseFutureImpl<>(clazz, callable, topic);
      return response;
   }

   /**
    * Sends a message and waits for a response.
    *
    * @param class - return type
    * @param message - message to send
    * @param timeout - timeout in milliseconds
    * @return <T extends OteEventMessage> T - NULL if the timeout occurs before a response, otherwise returns the
    *          message specified by the RESPONSE_TOPIC field in the passed in message header.
    * @throws Exception
    */
   public <T extends OteEventMessage> T synchSendAndResponse(Class<T> clazz, String responseTopic, OteEventMessage message, long timeout) throws OTEException {
      lock.lock();
      try{
         int responseId = updateHeaderInfo(message);
         message.getHeader().RESPONSE_TOPIC.setValue(responseTopic);
         NotifyOnResponse<T> response = new NotifyOnResponse<>(clazz, responseTopic, responseId, lock, responseReceived);
         try{
            endpoint.getOteEndpointThreadedSender(destination).send(message);
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            while(nanos > 0 && !response.hasResponse()) {
               try {
                  nanos = responseReceived.awaitNanos(nanos);
               } catch (InterruptedException e) {
                  e.printStackTrace();
               }
            }
         } finally {
            response.dispose();
         }
         return response.getMessage();
      } finally {
         lock.unlock();
      }
   }

   /**
    * Sends a message and waits for a response.
    *
    * @param T - response Message to populate type
    * @param message - message to send
    * @param timeout - timeout in milliseconds
    * @return <T extends OteEventMessage> T - NULL if the timeout occurs before a response, otherwise returns the
    *          message specified by the RESPONSE_TOPIC field in the passed in message header.
    * @throws Exception
    */
   public <T extends OteEventMessage> T synchSendAndResponse(T responseMessage, OteEventMessage sendMessage, long timeout) throws OTEException {
      lock.lock();
      try{
         int responseId = updateHeaderInfo(sendMessage);
         sendMessage.getHeader().RESPONSE_TOPIC.setValue(responseMessage.getHeader().TOPIC.getValue());
         NotifyOnResponse<T> response = new NotifyOnResponse<>(responseMessage, responseId, lock, responseReceived);
         try{
            endpoint.getOteEndpointThreadedSender(destination).send(sendMessage);
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            while(nanos > 0 && !response.hasResponse()) {
               try {
                  nanos = responseReceived.awaitNanos(nanos);
               } catch (InterruptedException e) {
                  e.printStackTrace();
               }
            }
         } finally {
            response.dispose();
         }
         return response.getMessage();
      } finally {
         lock.unlock();
      }
   }


   /**
    * Increment the sequence number and set the source InetSocketAddress
    *
    * @param message
    * @return returns the sequence number that was set
    */
   private int updateHeaderInfo(OteEventMessage message){
      int responseId = message.getHeader().MESSAGE_SEQUENCE_NUMBER.getValue();
      if(responseId >= Integer.MAX_VALUE){
         responseId = 1;
      } else {
         responseId++;
      }
      message.getHeader().MESSAGE_SEQUENCE_NUMBER.setValue(responseId);

      message.getHeader().ADDRESS.setAddress(endpoint.getLocalEndpoint().getAddress());
      message.getHeader().ADDRESS.setPort(endpoint.getLocalEndpoint().getPort());
      return responseId;
   }

   /**
    * Sends a message and waits for a response without creating a sender thread
    *
    * @param T - response Message to populate type
    * @param message - message to send
    * @param timeout - timeout in milliseconds
    * @return <T extends OteEventMessage> T - NULL if the timeout occurs before a response, otherwise returns the
    *          message specified by the RESPONSE_TOPIC field in the passed in message header.
    * @throws Exception
    */
   public <T extends OteEventMessage> T synchSendAndResponseInline(T responseMessage, OteEventMessage sendMessage, long timeout) {
      lock.lock();
      try{
         int responseId = updateHeaderInfo(sendMessage);
         sendMessage.getHeader().RESPONSE_TOPIC.setValue(responseMessage.getHeader().TOPIC.getValue());
         NotifyOnResponse<T> response = new NotifyOnResponse<>(responseMessage, responseId, lock, responseReceived);
         try{
            endpoint.getOteEndpointInlineSender(destination).send(sendMessage);
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            while(nanos > 0 && !response.hasResponse()) {
               try {
                  nanos = responseReceived.awaitNanos(nanos);
               } catch (InterruptedException e) {
                  e.printStackTrace();
               }
            }
         } finally {
            response.dispose();
         }
         return response.getMessage();
      } finally {
         lock.unlock();
      }
   }

}

Back to the top