Skip to main content
summaryrefslogtreecommitdiffstats
blob: 37f59ca64a6aef61b5de90cbbe2fd28456d8e39a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/*******************************************************************************
 * Copyright (c) 2004, 2007 Boeing.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Boeing - initial API and implementation
 *******************************************************************************/
package org.eclipse.osee.framework.jdk.core.util.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Sole purpose is to read input from a specified stream and save it into a buffer providing some access methods to the
 * buffer.
 * 
 * @author Michael P. Masterson
 */
public class InputBufferThread extends Thread {
   private final byte[] charBuffer = new byte[1024];
   private final InputStream input;
   private final StringBuilder buffer;
   private volatile boolean shouldStopRunning;
   private long lastRead;

   /**
    * @param input The stream to read from
    */
   public InputBufferThread(InputStream input) {
      super("Stream input buffer thread");
      shouldStopRunning = false;
      this.input = input;
      buffer = new StringBuilder(8196);
   }

   /**
    * Overridden Thread.run method. Reads from the input stream on character at a time until the end of available input
    * or until the bold shouldStopRunning is set by an outside source.
    */
   @Override
   public void run() {
      System.out.println("thread started");
      int count = 0;
      try {
         int size = input.read(charBuffer);
         while (size >= 0) {
            count++;
            synchronized (this) {
               lastRead = System.currentTimeMillis();
               if (shouldStopRunning) {
                  break;
               }
               append(charBuffer, size);
            }
            size = input.read(charBuffer);
         }
      } catch (InterruptedIOException e) {
         if (shouldStopRunning != true) {
            // this is an error
            e.printStackTrace(System.err);
         }
      } catch (IOException e) {
         if (!shouldStopRunning) {
            System.err.println("error at count " + count);
            e.printStackTrace(System.err);
         }
      } finally {
         try {
            input.close();
         } catch (IOException e) {
            e.printStackTrace(System.out);
            System.out.flush();
         } finally {
            // wake up anyone waiting for data or else they will be stuck forever
            synchronized (this) {
               notifyAll();
            }
         }
      }
   }

   /**
    * Appends one character to the buffer.
    * 
    * @param line The character to append
    */
   private void append(byte[] line, int size) {
      buffer.append(new String(line, 0, size));
      notify();
   }

   /**
    * Checks if the string passed is contained in the buffer so far
    * 
    * @param matcher The string to look for
    * @return a positive value representing the index at which it was found or negative 1 if it was not found
    */
   public synchronized int contains(String matcher, boolean remove) {
      int index = buffer.lastIndexOf(matcher);
      if (remove && index >= 0) {
         buffer.delete(0, index + matcher.length());
      }
      return index;
   }

   /**
    * Checks if the string passed is contained in the buffer so far
    * 
    * @param matcher The string to look for
    * @return a positive value representing the index at which it was found or negative 1 if it was not found
    */
   public synchronized int contains(String matcher) {
      return contains(matcher, false);
   }

   public synchronized int waitFor(String matcher, int millis) throws InterruptedException {
      return waitFor(matcher, false, millis);
   }

   public synchronized int waitFor(String matcher, boolean remove, int millis) throws InterruptedException {
      if (shouldStopRunning) {
         throw new IllegalStateException("stream processing terminated");
      }
      long time = System.currentTimeMillis();
      int result = contains(matcher, false);
      long timeRemaining = millis;
      while (result < 0 && timeRemaining > 0) {
         wait(timeRemaining);
         if (shouldStopRunning) {
            // we were told to stop or the stream was closed on the other end
            throw new InterruptedException("stream processing terminated");
         }
         result = contains(matcher, false);
         timeRemaining = millis - (System.currentTimeMillis() - time);
      }
      if (remove && result >= 0) {
         buffer.delete(0, result + matcher.length());
      }
      return result;
   }

   public synchronized MatchResult waitFor(Pattern pattern, boolean remove, int millis) throws InterruptedException {
      if (shouldStopRunning) {
         throw new IllegalStateException("stream processing terminated");
      }
      Matcher matcher = pattern.matcher(buffer.toString());
      long time = System.currentTimeMillis();
      long timeRemaining = millis;
      boolean result = matcher.matches();
      while (!result && timeRemaining > 0) {
         wait(timeRemaining);
         if (shouldStopRunning) {
            // we were told to stop or the stream was closed on the other end
            throw new InterruptedException("stream processing terminated");
         }
         matcher = matcher.reset(buffer.toString());
         result = matcher.matches();
         timeRemaining = millis - (System.currentTimeMillis() - time);
      }
      if (remove && result) {
         buffer.delete(0, matcher.end());
      }
      return result ? matcher.toMatchResult() : null;
   }

   /**
    * @param millis
    * @return true if a transmission occurred false otherwise
    * @throws InterruptedException
    */
   public synchronized boolean waitFor(int millis) throws InterruptedException {
      if (shouldStopRunning) {
         throw new IllegalStateException("stream processing terminated");
      }

      long currentTime = System.currentTimeMillis();
      long savedLastRead = lastRead;
      while (savedLastRead != lastRead) {
         wait(millis);
         // make sure we did not reach ourr timeout limit, also we need to handle potential 'spurious' wakeups
         long next = System.currentTimeMillis();
         millis -= next - currentTime;
         currentTime = next;
         if (savedLastRead == lastRead && millis <= 0) {
            return false;
         }
      }
      return true;
   }

   /**
    * @return The entire buffered input.
    */
   public synchronized String getBuffer() {
      return buffer.toString();
   }

   public synchronized String subString(int beginIndex) {
      return buffer.substring(beginIndex);
   }

   public synchronized String subString(int beginIndex, int endIndex) {
      return buffer.substring(beginIndex, endIndex);
   }

   /**
    * @return The entire buffered input.
    */
   public synchronized int getLength() {
      return buffer.length();
   }

   /**
    * Tells this thread whether to stop on the next cycle or not
    * 
    * @param b True if the thread should stop on the next run cycle.
    */
   public void stopOnNextRun(boolean b) {
      this.shouldStopRunning = b;
   }

   public void stopNow() throws InterruptedException {
      this.shouldStopRunning = true;
      interrupt();
      join(5000);
   }

   public synchronized long getLastRead() {
      if (lastRead == 0) {
         return System.currentTimeMillis();
      } else {
         return lastRead;
      }
   }

   public synchronized void clear() {
      buffer.delete(0, buffer.length());
   }

}

Back to the top