1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25
26
27
28
29
30
31
32
33
34
35 final class TelnetInputStream extends BufferedInputStream implements Runnable
36 {
37
/** Value returned by the read methods to signal end of stream. */
private static final int EOF = -1;

/**
 * Internal sentinel returned by {@code __read(false)} when the underlying
 * stream has no byte available and the caller asked not to block.
 */
private static final int WOULD_BLOCK = -2;

// Receive states of the protocol parser implemented in __read().
// NOTE(review): _STATE_SE is declared but not referenced by the code visible in this class.
static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
_STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
_STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;

// Set once the stream is closed or the reader thread has ended; read() then
// returns EOF as soon as the queue is empty.
private boolean __hasReachedEOF;
// True until _start() is called, and again after close() or the end of run().
private volatile boolean __isClosed;
// True while a read() call is waiting for (or fetching) data; __processChar()
// uses it to decide whether to notify the queue monitor.
private boolean __readIsWaiting;
// __receiveState: current parser state (one of the _STATE_* constants).
// __queueHead / __queueTail: read and write indices into the circular __queue.
// __bytesAvailable: number of decoded bytes currently stored in __queue.
private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
// Circular buffer of decoded data bytes; it is also the monitor guarding the
// queue indices, __bytesAvailable, __hasReachedEOF and __ioException.
private final int[] __queue;
// Owning client; receives option negotiation, command and suboption callbacks.
private final TelnetClient __client;
// Background reader thread, or null when constructed with readerThread == false.
private final Thread __thread;
// Exception recorded by the producing side; rethrown (once) by the next read().
private IOException __ioException;

// Accumulates the bytes of the suboption currently being received
// (between IAC SB and IAC SE); bytes beyond the array length are dropped.
private final int __suboption[] = new int[512];
private int __suboption_count = 0;

// True while the background reader thread is responsible for filling the
// queue; when false, read() pulls data from the underlying stream itself.
private volatile boolean __threaded;
63
/**
 * Creates a Telnet input stream wrapping {@code input}.
 * <p>
 * The stream starts out in the "closed" state with an empty queue; when a
 * reader thread is requested it is created here but not started (see
 * {@code _start()}).
 *
 * @param input        the raw stream to read protocol bytes from
 * @param client       the client that receives negotiation callbacks
 * @param readerThread {@code true} to create a background reader thread,
 *                     {@code false} to have {@code read()} pull data directly
 */
TelnetInputStream(InputStream input, TelnetClient client,
                  boolean readerThread)
{
    super(input);

    // Collaborators and storage.
    __client = client;
    // One slot more than the 2048 bytes the queue is ever allowed to hold.
    __queue = new int[2049];
    __thread = readerThread ? new Thread(this) : null;

    // Parser state.
    __receiveState = _STATE_DATA;

    // Queue bookkeeping.
    __queueHead = 0;
    __queueTail = 0;
    __bytesAvailable = 0;

    // Lifecycle and signalling flags.
    __isClosed = true;
    __hasReachedEOF = false;
    __readIsWaiting = false;
    __threaded = false;
    __ioException = null;
}
87
/**
 * Creates a Telnet input stream that uses a background reader thread.
 * Equivalent to the three-argument constructor with {@code readerThread}
 * set to {@code true}.
 *
 * @param input  the raw stream to read protocol bytes from
 * @param client the client that receives negotiation callbacks
 */
TelnetInputStream(InputStream input, TelnetClient client)
{
    this(input, client, true);
}
91
92 void _start()
93 {
94 if(__thread == null) {
95 return;
96 }
97
98 int priority;
99 __isClosed = false;
100
101
102
103
104 priority = Thread.currentThread().getPriority() + 1;
105 if (priority > Thread.MAX_PRIORITY) {
106 priority = Thread.MAX_PRIORITY;
107 }
108 __thread.setPriority(priority);
109 __thread.setDaemon(true);
110 __thread.start();
111 __threaded = true;
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * Reads bytes from the underlying stream, running them through the protocol
 * state machine, until a byte that is plain data is found.
 * <p>
 * Option negotiation (WILL/WONT/DO/DONT), other commands and suboptions are
 * handed to the client and consumed; they are never returned to the caller.
 *
 * @param mayBlock if {@code false}, return {@code WOULD_BLOCK} instead of
 *                 reading when the underlying stream reports no available bytes
 * @return the next data byte (0-255), {@code EOF} if the underlying stream
 *         ended, or {@code WOULD_BLOCK} (only when {@code mayBlock} is false)
 * @throws IOException if reading the underlying stream fails, or if one of
 *                     the client callbacks that performs I/O fails
 */
private int __read(boolean mayBlock) throws IOException
{
    int ch;

    while (true)
    {

        // Checked on every iteration: a non-blocking caller must not get
        // stuck after a command sequence has been consumed.
        if(!mayBlock && super.available() == 0) {
            return WOULD_BLOCK;
        }

        // Underlying stream exhausted.
        if ((ch = super.read()) < 0) {
            return EOF;
        }

        ch = (ch & 0xff);

        // Tell the client that a byte arrived.
        // NOTE(review): presumably this wakes a pending "Are You There" waiter — confirm in TelnetClient.
        synchronized (__client)
        {
            __client._processAYTResponse();
        }

        // Hand every raw byte (data and protocol alike) to the client's spy hook.
        __client._spyRead(ch);

        switch (__receiveState)
        {

        case _STATE_CR:
            if (ch == '\0')
            {
                // Drop a NUL that follows a CR.
                // NOTE(review): the state is left at _STATE_CR here, so further
                // consecutive NULs are dropped as well — confirm this is intended.
                continue;
            }
            // Anything else after CR is treated as ordinary data:
            // intentional fall-through into _STATE_DATA.

        case _STATE_DATA:
            if (ch == TelnetCommand.IAC)
            {
                __receiveState = _STATE_IAC;
                continue;
            }


            if (ch == '\r')
            {
                synchronized (__client)
                {
                    // Only enter the CR state (which strips a following NUL)
                    // when the client reports _requestedDont(BINARY).
                    if (__client._requestedDont(TelnetOption.BINARY)) {
                        __receiveState = _STATE_CR;
                    } else {
                        __receiveState = _STATE_DATA;
                    }
                }
            } else {
                __receiveState = _STATE_DATA;
            }
            // Leave the switch; ch is returned as data below.
            break;

        case _STATE_IAC:
            // Byte following an IAC: selects the command being received.
            switch (ch)
            {
            case TelnetCommand.WILL:
                __receiveState = _STATE_WILL;
                continue;
            case TelnetCommand.WONT:
                __receiveState = _STATE_WONT;
                continue;
            case TelnetCommand.DO:
                __receiveState = _STATE_DO;
                continue;
            case TelnetCommand.DONT:
                __receiveState = _STATE_DONT;
                continue;
            /* TERMINAL-TYPE option (start)*/
            case TelnetCommand.SB:
                // Start collecting a new suboption.
                __suboption_count = 0;
                __receiveState = _STATE_SB;
                continue;
            /* TERMINAL-TYPE option (end)*/
            case TelnetCommand.IAC:
                // Doubled IAC: a literal 0xFF data byte. Break out to the
                // enclosing switch so that it is returned as data.
                __receiveState = _STATE_DATA;
                break;
            case TelnetCommand.SE:
                // SE outside a suboption: ignored.
                __receiveState = _STATE_DATA;
                continue;
            default:
                // Any other command byte is passed to the client.
                __receiveState = _STATE_DATA;
                __client._processCommand(ch);
                continue;
            }
            break;
        case _STATE_WILL:
            // ch is the option code; the client handles it and its output
            // stream is flushed under the client lock.
            synchronized (__client)
            {
                __client._processWill(ch);
                __client._flushOutputStream();
            }
            __receiveState = _STATE_DATA;
            continue;
        case _STATE_WONT:
            synchronized (__client)
            {
                __client._processWont(ch);
                __client._flushOutputStream();
            }
            __receiveState = _STATE_DATA;
            continue;
        case _STATE_DO:
            synchronized (__client)
            {
                __client._processDo(ch);
                __client._flushOutputStream();
            }
            __receiveState = _STATE_DATA;
            continue;
        case _STATE_DONT:
            synchronized (__client)
            {
                __client._processDont(ch);
                __client._flushOutputStream();
            }
            __receiveState = _STATE_DATA;
            continue;
        /* TERMINAL-TYPE option (start)*/
        case _STATE_SB:
            // Inside a suboption: collect bytes until an IAC is seen.
            switch (ch)
            {
            case TelnetCommand.IAC:
                __receiveState = _STATE_IAC_SB;
                continue;
            default:
                // Store the suboption byte; bytes beyond the buffer size are dropped.
                if (__suboption_count < __suboption.length) {
                    __suboption[__suboption_count++] = ch;
                }
                break;
            }
            __receiveState = _STATE_SB;
            continue;
        case _STATE_IAC_SB:
            // An IAC was received while inside a suboption.
            switch (ch)
            {
            case TelnetCommand.SE:
                // IAC SE: suboption complete; hand the collected bytes to the client.
                synchronized (__client)
                {
                    __client._processSuboption(__suboption, __suboption_count);
                    __client._flushOutputStream();
                }
                __receiveState = _STATE_DATA;
                continue;
            case TelnetCommand.IAC:
                // IAC IAC: a single literal IAC byte belonging to the suboption.
                if (__suboption_count < __suboption.length) {
                    __suboption[__suboption_count++] = ch;
                }
                break;
            default:
                // Any other byte after IAC inside a suboption is ignored.
                break;
            }
            __receiveState = _STATE_SB;
            continue;
        /* TERMINAL-TYPE option (end)*/
        }

        // Reached only via "break" from the switch: ch is a data byte.
        break;
    }

    return ch;
}
306
307
308
309
310
311 private boolean __processChar(int ch) throws InterruptedException
312 {
313
314
315 boolean bufferWasEmpty;
316 synchronized (__queue)
317 {
318 bufferWasEmpty = (__bytesAvailable == 0);
319 while (__bytesAvailable >= __queue.length - 1)
320 {
321
322
323 if(__threaded)
324 {
325 __queue.notify();
326 try
327 {
328 __queue.wait();
329 }
330 catch (InterruptedException e)
331 {
332 throw e;
333 }
334 }
335 else
336 {
337
338
339 throw new IllegalStateException("Queue is full! Cannot process another character.");
340 }
341 }
342
343
344 if (__readIsWaiting && __threaded)
345 {
346 __queue.notify();
347 }
348
349 __queue[__queueTail] = ch;
350 ++__bytesAvailable;
351
352 if (++__queueTail >= __queue.length) {
353 __queueTail = 0;
354 }
355 }
356 return bufferWasEmpty;
357 }
358
359 @Override
/**
 * Returns the next decoded data byte, blocking until one is available.
 * <p>
 * With an active reader thread this method waits on the queue monitor until
 * the thread has queued data. Without one it reads from the underlying stream
 * itself (blocking only for the first byte) and fills the queue before
 * returning a byte from it.
 *
 * @return the next data byte, or {@code EOF} (-1) at end of stream
 * @throws IOException            an exception previously recorded by the
 *                                producing side; it is thrown once and then cleared
 * @throws InterruptedIOException if the calling thread is interrupted while
 *                                waiting for the reader thread
 */
@Override
public int read() throws IOException
{
    // The queue monitor guards __bytesAvailable, __queueHead, the queue
    // contents, __hasReachedEOF and __ioException.
    synchronized (__queue)
    {

        while (true)
        {
            // Surface a pending exception exactly once.
            if (__ioException != null)
            {
                IOException e;
                e = __ioException;
                __ioException = null;
                throw e;
            }

            if (__bytesAvailable == 0)
            {
                // Nothing queued and nothing more will arrive.
                if (__hasReachedEOF) {
                    return EOF;
                }

                // Otherwise wait for, or fetch, more data.
                if(__threaded)
                {
                    // Wake the reader thread in case it is waiting on a full
                    // queue, then wait for it to produce something.
                    __queue.notify();
                    try
                    {
                        __readIsWaiting = true;
                        __queue.wait();
                        __readIsWaiting = false;
                    }
                    catch (InterruptedException e)
                    {
                        // NOTE(review): __readIsWaiting stays true on this path
                        // and the interrupt status is not restored.
                        throw new InterruptedIOException("Fatal thread interruption during read.");
                    }
                }
                else
                {
                    // No reader thread: this caller performs the protocol
                    // parsing itself.
                    __readIsWaiting = true;
                    int ch;
                    boolean mayBlock = true; // only the first __read() may block

                    do
                    {
                        try
                        {
                            if ((ch = __read(mayBlock)) < 0) {
                                // Negative means EOF or WOULD_BLOCK; only EOF
                                // is returned to the caller.
                                if(ch != WOULD_BLOCK) {
                                    return (ch);
                                }
                            }
                        }
                        catch (InterruptedIOException e)
                        {
                            // Record the exception for the next read() call
                            // and report EOF for this one.
                            // (The monitor is already held; this inner
                            // synchronized block is re-entrant.)
                            synchronized (__queue)
                            {
                                __ioException = e;
                                __queue.notifyAll();
                                try
                                {
                                    __queue.wait(100);
                                }
                                catch (InterruptedException interrupted)
                                {
                                    // Ignored.
                                }
                            }
                            return EOF;
                        }


                        try
                        {
                            if(ch != WOULD_BLOCK)
                            {
                                __processChar(ch);
                            }
                        }
                        catch (InterruptedException e)
                        {
                            if (__isClosed) {
                                return EOF;
                            }
                        }

                        // Later iterations must not block: the remaining
                        // buffered input might contain no data bytes at all.
                        mayBlock = false;

                    }
                    // Keep draining while input is available and the queue has room.
                    while (super.available() > 0 && __bytesAvailable < __queue.length - 1);

                    __readIsWaiting = false;
                }
                // Re-evaluate the exception / EOF / data conditions.
                continue;
            }
            else
            {
                int ch;

                ch = __queue[__queueHead];

                // Advance the head index, wrapping at the end of the array.
                if (++__queueHead >= __queue.length) {
                    __queueHead = 0;
                }

                --__bytesAvailable;

                // Queue drained: wake the reader thread in case it is waiting.
                if(__bytesAvailable == 0 && __threaded) {
                    __queue.notify();
                }

                return ch;
            }
        }
    }
}
483
484
485
486
487
488
489
490
491
492
493
494
495
496 @Override
497 public int read(byte buffer[]) throws IOException
498 {
499 return read(buffer, 0, buffer.length);
500 }
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517 @Override
518 public int read(byte buffer[], int offset, int length) throws IOException
519 {
520 int ch, off;
521
522 if (length < 1) {
523 return 0;
524 }
525
526
527 synchronized (__queue)
528 {
529 if (length > __bytesAvailable) {
530 length = __bytesAvailable;
531 }
532 }
533
534 if ((ch = read()) == EOF) {
535 return EOF;
536 }
537
538 off = offset;
539
540 do
541 {
542 buffer[offset++] = (byte)ch;
543 }
544 while (--length > 0 && (ch = read()) != EOF);
545
546
547 return (offset - off);
548 }
549
550
551
552 @Override
553 public boolean markSupported()
554 {
555 return false;
556 }
557
558 @Override
559 public int available() throws IOException
560 {
561
562 synchronized (__queue)
563 {
564 if (__threaded) {
565 return __bytesAvailable;
566 } else {
567 return __bytesAvailable + super.available();
568 }
569 }
570 }
571
572
573
574
575 @Override
576 public void close() throws IOException
577 {
578
579
580
581
582 super.close();
583
584 synchronized (__queue)
585 {
586 __hasReachedEOF = true;
587 __isClosed = true;
588
589 if (__thread != null && __thread.isAlive())
590 {
591 __thread.interrupt();
592 }
593
594 __queue.notifyAll();
595 }
596
597 }
598
599
600 public void run()
601 {
602 int ch;
603
604 try
605 {
606 _outerLoop:
607 while (!__isClosed)
608 {
609 try
610 {
611 if ((ch = __read(true)) < 0) {
612 break;
613 }
614 }
615 catch (InterruptedIOException e)
616 {
617 synchronized (__queue)
618 {
619 __ioException = e;
620 __queue.notifyAll();
621 try
622 {
623 __queue.wait(100);
624 }
625 catch (InterruptedException interrupted)
626 {
627 if (__isClosed) {
628 break _outerLoop;
629 }
630 }
631 continue;
632 }
633 } catch(RuntimeException re) {
634
635
636
637 super.close();
638
639
640 break _outerLoop;
641 }
642
643
644 boolean notify = false;
645 try
646 {
647 notify = __processChar(ch);
648 }
649 catch (InterruptedException e)
650 {
651 if (__isClosed) {
652 break _outerLoop;
653 }
654 }
655
656
657 if (notify) {
658 __client.notifyInputListener();
659 }
660 }
661 }
662 catch (IOException ioe)
663 {
664 synchronized (__queue)
665 {
666 __ioException = ioe;
667 }
668 __client.notifyInputListener();
669 }
670
671 synchronized (__queue)
672 {
673 __isClosed = true;
674 __hasReachedEOF = true;
675 __queue.notify();
676 }
677
678 __threaded = false;
679 }
680 }
681
682
683
684
685
686
687
688