1 /*
2 * Copyright 2003-2004 The Apache Software Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.apache.commons.net.telnet;
17
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.InterruptedIOException;
22
23 /***
24 *
25 * <p>
26 *
27 * <p>
28 * <p>
29 * @author Daniel F. Savarese
30 * @author Bruno D'Avanzo
31 ***/
32
33
34 final class TelnetInputStream extends BufferedInputStream implements Runnable
35 {
36 static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
37 _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
38 _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
39
40 private boolean __hasReachedEOF, __isClosed;
41 private boolean __readIsWaiting;
42 private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
43 private int[] __queue;
44 private TelnetClient __client;
45 private Thread __thread;
46 private IOException __ioException;
47
48 /* TERMINAL-TYPE option (start)*/
49 private int __suboption[] = new int[256];
50 private int __suboption_count = 0;
51 /* TERMINAL-TYPE option (end)*/
52
53 private boolean __threaded;
54
55 TelnetInputStream(InputStream input, TelnetClient client,
56 boolean readerThread)
57 {
58 super(input);
59 __client = client;
60 __receiveState = _STATE_DATA;
61 __isClosed = true;
62 __hasReachedEOF = false;
63 // Make it 2049, because when full, one slot will go unused, and we
64 // want a 2048 byte buffer just to have a round number (base 2 that is)
65 __queue = new int[2049];
66 __queueHead = 0;
67 __queueTail = 0;
68 __bytesAvailable = 0;
69 __ioException = null;
70 __readIsWaiting = false;
71 __threaded = false;
72 if(readerThread)
73 __thread = new Thread(this);
74 else
75 __thread = null;
76 }
77
78 TelnetInputStream(InputStream input, TelnetClient client) {
79 this(input, client, true);
80 }
81
82 void _start()
83 {
84 if(__thread == null)
85 return;
86
87 int priority;
88 __isClosed = false;
89 // Need to set a higher priority in case JVM does not use pre-emptive
90 // threads. This should prevent scheduler induced deadlock (rather than
91 // deadlock caused by a bug in this code).
92 priority = Thread.currentThread().getPriority() + 1;
93 if (priority > Thread.MAX_PRIORITY)
94 priority = Thread.MAX_PRIORITY;
95 __thread.setPriority(priority);
96 __thread.setDaemon(true);
97 __thread.start();
98 __threaded = true;
99 }
100
101
102 // synchronized(__client) critical sections are to protect against
103 // TelnetOutputStream writing through the telnet client at same time
104 // as a processDo/Will/etc. command invoked from TelnetInputStream
105 // tries to write.
106 private int __read() throws IOException
107 {
108 int ch;
109
110 _loop:
111 while (true)
112 {
113 // Exit only when we reach end of stream.
114 if ((ch = super.read()) < 0)
115 return -1;
116
117 ch = (ch & 0xff);
118
119 /* Code Section added for supporting AYT (start)*/
120 synchronized (__client)
121 {
122 __client._processAYTResponse();
123 }
124 /* Code Section added for supporting AYT (end)*/
125
126 /* Code Section added for supporting spystreams (start)*/
127 __client._spyRead(ch);
128 /* Code Section added for supporting spystreams (end)*/
129
130 _mainSwitch:
131 switch (__receiveState)
132 {
133
134 case _STATE_CR:
135 if (ch == '\0')
136 {
137 // Strip null
138 continue;
139 }
140 // How do we handle newline after cr?
141 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
142
143 // Handle as normal data by falling through to _STATE_DATA case
144
145 case _STATE_DATA:
146 if (ch == TelnetCommand.IAC)
147 {
148 __receiveState = _STATE_IAC;
149 continue;
150 }
151
152
153 if (ch == '\r')
154 {
155 synchronized (__client)
156 {
157 if (__client._requestedDont(TelnetOption.BINARY))
158 __receiveState = _STATE_CR;
159 else
160 __receiveState = _STATE_DATA;
161 }
162 }
163 else
164 __receiveState = _STATE_DATA;
165 break;
166
167 case _STATE_IAC:
168 switch (ch)
169 {
170 case TelnetCommand.WILL:
171 __receiveState = _STATE_WILL;
172 continue;
173 case TelnetCommand.WONT:
174 __receiveState = _STATE_WONT;
175 continue;
176 case TelnetCommand.DO:
177 __receiveState = _STATE_DO;
178 continue;
179 case TelnetCommand.DONT:
180 __receiveState = _STATE_DONT;
181 continue;
182 /* TERMINAL-TYPE option (start)*/
183 case TelnetCommand.SB:
184 __suboption_count = 0;
185 __receiveState = _STATE_SB;
186 continue;
187 /* TERMINAL-TYPE option (end)*/
188 case TelnetCommand.IAC:
189 __receiveState = _STATE_DATA;
190 break;
191 default:
192 break;
193 }
194 __receiveState = _STATE_DATA;
195 continue;
196 case _STATE_WILL:
197 synchronized (__client)
198 {
199 __client._processWill(ch);
200 __client._flushOutputStream();
201 }
202 __receiveState = _STATE_DATA;
203 continue;
204 case _STATE_WONT:
205 synchronized (__client)
206 {
207 __client._processWont(ch);
208 __client._flushOutputStream();
209 }
210 __receiveState = _STATE_DATA;
211 continue;
212 case _STATE_DO:
213 synchronized (__client)
214 {
215 __client._processDo(ch);
216 __client._flushOutputStream();
217 }
218 __receiveState = _STATE_DATA;
219 continue;
220 case _STATE_DONT:
221 synchronized (__client)
222 {
223 __client._processDont(ch);
224 __client._flushOutputStream();
225 }
226 __receiveState = _STATE_DATA;
227 continue;
228 /* TERMINAL-TYPE option (start)*/
229 case _STATE_SB:
230 switch (ch)
231 {
232 case TelnetCommand.IAC:
233 __receiveState = _STATE_IAC_SB;
234 continue;
235 default:
236 // store suboption char
237 __suboption[__suboption_count++] = ch;
238 break;
239 }
240 __receiveState = _STATE_SB;
241 continue;
242 case _STATE_IAC_SB:
243 switch (ch)
244 {
245 case TelnetCommand.SE:
246 synchronized (__client)
247 {
248 __client._processSuboption(__suboption, __suboption_count);
249 __client._flushOutputStream();
250 }
251 __receiveState = _STATE_DATA;
252 continue;
253 default:
254 __receiveState = _STATE_SB;
255 break;
256 }
257 __receiveState = _STATE_DATA;
258 continue;
259 /* TERMINAL-TYPE option (end)*/
260 }
261
262 break;
263 }
264
265 return ch;
266 }
267
268 // synchronized(__client) critical sections are to protect against
269 // TelnetOutputStream writing through the telnet client at same time
270 // as a processDo/Will/etc. command invoked from TelnetInputStream
271 // tries to write.
272 private void __processChar(int ch) throws InterruptedException
273 {
274 // Critical section because we're altering __bytesAvailable,
275 // __queueTail, and the contents of _queue.
276 synchronized (__queue)
277 {
278 while (__bytesAvailable >= __queue.length - 1)
279 {
280 if(__threaded)
281 {
282 __queue.notify();
283 try
284 {
285 __queue.wait();
286 }
287 catch (InterruptedException e)
288 {
289 throw e;
290 }
291 }
292 }
293
294 // Need to do this in case we're not full, but block on a read
295 if (__readIsWaiting && __threaded)
296 {
297 __queue.notify();
298 }
299
300 __queue[__queueTail] = ch;
301 ++__bytesAvailable;
302
303 if (++__queueTail >= __queue.length)
304 __queueTail = 0;
305 }
306 }
307
308 public int read() throws IOException
309 {
310 // Critical section because we're altering __bytesAvailable,
311 // __queueHead, and the contents of _queue in addition to
312 // testing value of __hasReachedEOF.
313 synchronized (__queue)
314 {
315
316 while (true)
317 {
318 if (__ioException != null)
319 {
320 IOException e;
321 e = __ioException;
322 __ioException = null;
323 throw e;
324 }
325
326 if (__bytesAvailable == 0)
327 {
328 // Return -1 if at end of file
329 if (__hasReachedEOF)
330 return -1;
331
332 // Otherwise, we have to wait for queue to get something
333 if(__threaded)
334 {
335 __queue.notify();
336 try
337 {
338 __readIsWaiting = true;
339 __queue.wait();
340 __readIsWaiting = false;
341 }
342 catch (InterruptedException e)
343 {
344 throw new IOException("Fatal thread interruption during read.");
345 }
346 }
347 else
348 {
349 //__alreadyread = false;
350 __readIsWaiting = true;
351 int ch;
352
353 do
354 {
355 try
356 {
357 if ((ch = __read()) < 0)
358 if(ch != -2)
359 return (ch);
360 }
361 catch (InterruptedIOException e)
362 {
363 synchronized (__queue)
364 {
365 __ioException = e;
366 __queue.notifyAll();
367 try
368 {
369 __queue.wait(100);
370 }
371 catch (InterruptedException interrupted)
372 {
373 }
374 }
375 return (-1);
376 }
377
378
379 try
380 {
381 if(ch != -2)
382 {
383 __processChar(ch);
384 }
385 }
386 catch (InterruptedException e)
387 {
388 if (__isClosed)
389 return (-1);
390 }
391 }
392 while (super.available() > 0);
393
394 __readIsWaiting = false;
395 }
396 continue;
397 }
398 else
399 {
400 int ch;
401
402 ch = __queue[__queueHead];
403
404 if (++__queueHead >= __queue.length)
405 __queueHead = 0;
406
407 --__bytesAvailable;
408
409 // Need to explicitly notify() so available() works properly
410 if(__bytesAvailable == 0 && __threaded) {
411 __queue.notify();
412 }
413
414 return ch;
415 }
416 }
417 }
418 }
419
420
421 /***
422 * Reads the next number of bytes from the stream into an array and
423 * returns the number of bytes read. Returns -1 if the end of the
424 * stream has been reached.
425 * <p>
426 * @param buffer The byte array in which to store the data.
427 * @return The number of bytes read. Returns -1 if the
428 * end of the message has been reached.
429 * @exception IOException If an error occurs in reading the underlying
430 * stream.
431 ***/
432 public int read(byte buffer[]) throws IOException
433 {
434 return read(buffer, 0, buffer.length);
435 }
436
437
438 /***
439 * Reads the next number of bytes from the stream into an array and returns
440 * the number of bytes read. Returns -1 if the end of the
441 * message has been reached. The characters are stored in the array
442 * starting from the given offset and up to the length specified.
443 * <p>
444 * @param buffer The byte array in which to store the data.
445 * @param offset The offset into the array at which to start storing data.
446 * @param length The number of bytes to read.
447 * @return The number of bytes read. Returns -1 if the
448 * end of the stream has been reached.
449 * @exception IOException If an error occurs while reading the underlying
450 * stream.
451 ***/
452 public int read(byte buffer[], int offset, int length) throws IOException
453 {
454 int ch, off;
455
456 if (length < 1)
457 return 0;
458
459 // Critical section because run() may change __bytesAvailable
460 synchronized (__queue)
461 {
462 if (length > __bytesAvailable)
463 length = __bytesAvailable;
464 }
465
466 if ((ch = read()) == -1)
467 return -1;
468
469 off = offset;
470
471 do
472 {
473 buffer[offset++] = (byte)ch;
474 }
475 while (--length > 0 && (ch = read()) != -1);
476
477 //__client._spyRead(buffer, off, offset - off);
478 return (offset - off);
479 }
480
481
482 /*** Returns false. Mark is not supported. ***/
483 public boolean markSupported()
484 {
485 return false;
486 }
487
488 public int available() throws IOException
489 {
490 // Critical section because run() may change __bytesAvailable
491 synchronized (__queue)
492 {
493 return __bytesAvailable;
494 }
495 }
496
497
498 // Cannot be synchronized. Will cause deadlock if run() is blocked
499 // in read because BufferedInputStream read() is synchronized.
500 public void close() throws IOException
501 {
502 // Completely disregard the fact thread may still be running.
503 // We can't afford to block on this close by waiting for
504 // thread to terminate because few if any JVM's will actually
505 // interrupt a system read() from the interrupt() method.
506 super.close();
507
508 synchronized (__queue)
509 {
510 __hasReachedEOF = true;
511 __isClosed = true;
512
513 if (__thread != null && __thread.isAlive())
514 {
515 __thread.interrupt();
516 }
517
518 __queue.notifyAll();
519 }
520
521 __threaded = false;
522 }
523
524 public void run()
525 {
526 int ch;
527
528 try
529 {
530 _outerLoop:
531 while (!__isClosed)
532 {
533 try
534 {
535 if ((ch = __read()) < 0)
536 break;
537 }
538 catch (InterruptedIOException e)
539 {
540 synchronized (__queue)
541 {
542 __ioException = e;
543 __queue.notifyAll();
544 try
545 {
546 __queue.wait(100);
547 }
548 catch (InterruptedException interrupted)
549 {
550 if (__isClosed)
551 break _outerLoop;
552 }
553 continue;
554 }
555 } catch(RuntimeException re) {
556 // We treat any runtime exceptions as though the
557 // stream has been closed. We close the
558 // underlying stream just to be sure.
559 super.close();
560 // Breaking the loop has the effect of setting
561 // the state to closed at the end of the method.
562 break _outerLoop;
563 }
564
565 try
566 {
567 __processChar(ch);
568 }
569 catch (InterruptedException e)
570 {
571 if (__isClosed)
572 break _outerLoop;
573 }
574 }
575 }
576 catch (IOException ioe)
577 {
578 synchronized (__queue)
579 {
580 __ioException = ioe;
581 }
582 }
583
584 synchronized (__queue)
585 {
586 __isClosed = true; // Possibly redundant
587 __hasReachedEOF = true;
588 __queue.notify();
589 }
590
591 __threaded = false;
592 }
593 }
594
595 /* Emacs configuration
596 * Local variables: **
597 * mode: java **
598 * c-basic-offset: 4 **
599 * indent-tabs-mode: nil **
600 * End: **
601 */