View Javadoc

1   /*
2    * Copyright 2003-2004 The Apache Software Foundation
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.apache.commons.net.telnet;
17  
18  import java.io.BufferedInputStream;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.InterruptedIOException;
22  
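/***
 * An input stream that decodes the telnet protocol from the stream it wraps.
 * <p>
 * Bytes read from the underlying stream are run through a small state
 * machine: telnet command sequences (IAC followed by WILL, WONT, DO, DONT or
 * a subnegotiation) are removed from the data and handed to the owning
 * {@link TelnetClient}, while ordinary data bytes are placed in a circular
 * queue from which the <code>read</code> methods are served.
 * <p>
 * The stream can either start a daemon reader thread that fills the queue in
 * the background or, when created without one, decode input on the calling
 * thread inside <code>read()</code>.
 * <p>
 * @author Daniel F. Savarese
 * @author Bruno D'Avanzo
 ***/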
32  
33  
34  final class TelnetInputStream extends BufferedInputStream implements Runnable
35  {
    // Values taken by __receiveState; they drive the switch in __read().
    // _STATE_SE is declared but no case in __read() handles it.
    static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
                     _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
                     _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;

    // __hasReachedEOF: set by close() and at the end of run(); once the queue
    // is empty it makes read() return -1.
    // __isClosed: starts out true, cleared by _start(), set again by close()
    // and run(); run() loops while it is false.
    private boolean __hasReachedEOF, __isClosed;
    // True while read() is waiting for data; __processChar() uses it to decide
    // whether to notify the queue monitor.
    private boolean __readIsWaiting;
    // __receiveState: current state of the decoder in __read().
    // __queueHead/__queueTail: next slot to read from / write to in __queue.
    // __bytesAvailable: number of data bytes currently held in __queue.
    private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
    // Circular buffer of decoded data bytes; the array object is also the
    // monitor that guards the queue fields above.
    private int[] __queue;
    // Client that receives option negotiation callbacks from __read().
    private TelnetClient __client;
    // Reader thread running run(), or null when none was requested.
    private Thread __thread;
    // Exception stored by the reading side and rethrown by the next read().
    private IOException __ioException;

    /* TERMINAL-TYPE option (start)*/
    // Bytes collected between IAC SB and IAC SE, and how many of them are
    // valid; both are passed to __client._processSuboption().
    private int __suboption[] = new int[256];
    private int __suboption_count = 0;
    /* TERMINAL-TYPE option (end)*/

    // True from the end of _start() until close() or the end of run().
    private boolean __threaded;
54  
55      TelnetInputStream(InputStream input, TelnetClient client,
56                        boolean readerThread)
57      {
58          super(input);
59          __client = client;
60          __receiveState = _STATE_DATA;
61          __isClosed = true;
62          __hasReachedEOF = false;
63          // Make it 2049, because when full, one slot will go unused, and we
64          // want a 2048 byte buffer just to have a round number (base 2 that is)
65          __queue = new int[2049];
66          __queueHead = 0;
67          __queueTail = 0;
68          __bytesAvailable = 0;
69          __ioException = null;
70          __readIsWaiting = false;
71          __threaded = false;
72          if(readerThread)
73              __thread = new Thread(this);
74          else
75              __thread = null;
76      }
77  
78      TelnetInputStream(InputStream input, TelnetClient client) {
79          this(input, client, true);
80      }
81  
82      void _start()
83      {
84          if(__thread == null)
85              return;
86  
87          int priority;
88          __isClosed = false;
89          // Need to set a higher priority in case JVM does not use pre-emptive
90          // threads.  This should prevent scheduler induced deadlock (rather than
91          // deadlock caused by a bug in this code).
92          priority = Thread.currentThread().getPriority() + 1;
93          if (priority > Thread.MAX_PRIORITY)
94              priority = Thread.MAX_PRIORITY;
95          __thread.setPriority(priority);
96          __thread.setDaemon(true);
97          __thread.start();
98          __threaded = true;
99      }
100 
101 
102     // synchronized(__client) critical sections are to protect against
103     // TelnetOutputStream writing through the telnet client at same time
104     // as a processDo/Will/etc. command invoked from TelnetInputStream
105     // tries to write.
106     private int __read() throws IOException
107     {
108         int ch;
109 
110 _loop:
111         while (true)
112         {
113             // Exit only when we reach end of stream.
114             if ((ch = super.read()) < 0)
115                 return -1;
116 
117             ch = (ch & 0xff);
118 
119             /* Code Section added for supporting AYT (start)*/
120             synchronized (__client)
121             {
122                 __client._processAYTResponse();
123             }
124             /* Code Section added for supporting AYT (end)*/
125 
126             /* Code Section added for supporting spystreams (start)*/
127             __client._spyRead(ch);
128             /* Code Section added for supporting spystreams (end)*/
129 
130 _mainSwitch:
131             switch (__receiveState)
132             {
133 
134             case _STATE_CR:
135                 if (ch == '\0')
136                 {
137                     // Strip null
138                     continue;
139                 }
140                 // How do we handle newline after cr?
141                 //  else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
142 
143                 // Handle as normal data by falling through to _STATE_DATA case
144 
145             case _STATE_DATA:
146                 if (ch == TelnetCommand.IAC)
147                 {
148                     __receiveState = _STATE_IAC;
149                     continue;
150                 }
151 
152 
153                 if (ch == '\r')
154                 {
155                     synchronized (__client)
156                     {
157                         if (__client._requestedDont(TelnetOption.BINARY))
158                             __receiveState = _STATE_CR;
159                         else
160                             __receiveState = _STATE_DATA;
161                     }
162                 }
163                 else
164                     __receiveState = _STATE_DATA;
165                 break;
166 
167             case _STATE_IAC:
168                 switch (ch)
169                 {
170                 case TelnetCommand.WILL:
171                     __receiveState = _STATE_WILL;
172                     continue;
173                 case TelnetCommand.WONT:
174                     __receiveState = _STATE_WONT;
175                     continue;
176                 case TelnetCommand.DO:
177                     __receiveState = _STATE_DO;
178                     continue;
179                 case TelnetCommand.DONT:
180                     __receiveState = _STATE_DONT;
181                     continue;
182                 /* TERMINAL-TYPE option (start)*/
183                 case TelnetCommand.SB:
184                     __suboption_count = 0;
185                     __receiveState = _STATE_SB;
186                     continue;
187                 /* TERMINAL-TYPE option (end)*/
188                 case TelnetCommand.IAC:
189                     __receiveState = _STATE_DATA;
190                     break;
191                 default:
192                     break;
193                 }
194                 __receiveState = _STATE_DATA;
195                 continue;
196             case _STATE_WILL:
197                 synchronized (__client)
198                 {
199                     __client._processWill(ch);
200                     __client._flushOutputStream();
201                 }
202                 __receiveState = _STATE_DATA;
203                 continue;
204             case _STATE_WONT:
205                 synchronized (__client)
206                 {
207                     __client._processWont(ch);
208                     __client._flushOutputStream();
209                 }
210                 __receiveState = _STATE_DATA;
211                 continue;
212             case _STATE_DO:
213                 synchronized (__client)
214                 {
215                     __client._processDo(ch);
216                     __client._flushOutputStream();
217                 }
218                 __receiveState = _STATE_DATA;
219                 continue;
220             case _STATE_DONT:
221                 synchronized (__client)
222                 {
223                     __client._processDont(ch);
224                     __client._flushOutputStream();
225                 }
226                 __receiveState = _STATE_DATA;
227                 continue;
228             /* TERMINAL-TYPE option (start)*/
229             case _STATE_SB:
230                 switch (ch)
231                 {
232                 case TelnetCommand.IAC:
233                     __receiveState = _STATE_IAC_SB;
234                     continue;
235                 default:
236                     // store suboption char
237                     __suboption[__suboption_count++] = ch;
238                     break;
239                 }
240                 __receiveState = _STATE_SB;
241                 continue;
242             case _STATE_IAC_SB:
243                 switch (ch)
244                 {
245                 case TelnetCommand.SE:
246                     synchronized (__client)
247                     {
248                         __client._processSuboption(__suboption, __suboption_count);
249                         __client._flushOutputStream();
250                     }
251                     __receiveState = _STATE_DATA;
252                     continue;
253                 default:
254                     __receiveState = _STATE_SB;
255                     break;
256                 }
257                 __receiveState = _STATE_DATA;
258                 continue;
259             /* TERMINAL-TYPE option (end)*/
260             }
261 
262             break;
263         }
264 
265         return ch;
266     }
267 
268     // synchronized(__client) critical sections are to protect against
269     // TelnetOutputStream writing through the telnet client at same time
270     // as a processDo/Will/etc. command invoked from TelnetInputStream
271     // tries to write.
272     private void __processChar(int ch) throws InterruptedException
273     {
274         // Critical section because we're altering __bytesAvailable,
275         // __queueTail, and the contents of _queue.
276         synchronized (__queue)
277         {
278             while (__bytesAvailable >= __queue.length - 1)
279             {
280                 if(__threaded)
281                 {
282                     __queue.notify();
283                     try
284                     {
285                         __queue.wait();
286                     }
287                     catch (InterruptedException e)
288                     {
289                         throw e;
290                     }
291                 }
292             }
293 
294             // Need to do this in case we're not full, but block on a read
295             if (__readIsWaiting && __threaded)
296             {
297                 __queue.notify();
298             }
299 
300             __queue[__queueTail] = ch;
301             ++__bytesAvailable;
302 
303             if (++__queueTail >= __queue.length)
304                 __queueTail = 0;
305         }
306     }
307 
308     public int read() throws IOException
309     {
310         // Critical section because we're altering __bytesAvailable,
311         // __queueHead, and the contents of _queue in addition to
312         // testing value of __hasReachedEOF.
313         synchronized (__queue)
314         {
315 
316             while (true)
317             {
318                 if (__ioException != null)
319                 {
320                     IOException e;
321                     e = __ioException;
322                     __ioException = null;
323                     throw e;
324                 }
325 
326                 if (__bytesAvailable == 0)
327                 {
328                     // Return -1 if at end of file
329                     if (__hasReachedEOF)
330                         return -1;
331 
332                     // Otherwise, we have to wait for queue to get something
333                     if(__threaded)
334                     {
335                         __queue.notify();
336                         try
337                         {
338                             __readIsWaiting = true;
339                             __queue.wait();
340                             __readIsWaiting = false;
341                         }
342                         catch (InterruptedException e)
343                         {
344                             throw new IOException("Fatal thread interruption during read.");
345                         }
346                     }
347                     else
348                     {
349                         //__alreadyread = false;
350                         __readIsWaiting = true;
351                         int ch;
352 
353                         do
354                         {
355                             try
356                             {
357                                 if ((ch = __read()) < 0)
358                                     if(ch != -2)
359                                         return (ch);
360                             }
361                             catch (InterruptedIOException e)
362                             {
363                                 synchronized (__queue)
364                                 {
365                                     __ioException = e;
366                                     __queue.notifyAll();
367                                     try
368                                     {
369                                         __queue.wait(100);
370                                     }
371                                     catch (InterruptedException interrupted)
372                                     {
373                                     }
374                                 }
375                                 return (-1);
376                             }
377 
378 
379                             try
380                             {
381                                 if(ch != -2)
382                                 {
383                                     __processChar(ch);
384                                 }
385                             }
386                             catch (InterruptedException e)
387                             {
388                                 if (__isClosed)
389                                     return (-1);
390                             }
391                         }
392                         while (super.available() > 0);
393 
394                         __readIsWaiting = false;
395                     }
396                     continue;
397                 }
398                 else
399                 {
400                     int ch;
401 
402                     ch = __queue[__queueHead];
403 
404                     if (++__queueHead >= __queue.length)
405                         __queueHead = 0;
406 
407                     --__bytesAvailable;
408 
409 		    // Need to explicitly notify() so available() works properly
410 		    if(__bytesAvailable == 0 && __threaded) {
411 			    __queue.notify();
412 		    }
413 		    
414                     return ch;
415                 }
416             }
417         }
418     }
419 
420 
421     /***
422      * Reads the next number of bytes from the stream into an array and
423      * returns the number of bytes read.  Returns -1 if the end of the
424      * stream has been reached.
425      * <p>
426      * @param buffer  The byte array in which to store the data.
427      * @return The number of bytes read. Returns -1 if the
428      *          end of the message has been reached.
429      * @exception IOException If an error occurs in reading the underlying
430      *            stream.
431      ***/
432     public int read(byte buffer[]) throws IOException
433     {
434         return read(buffer, 0, buffer.length);
435     }
436 
437 
438     /***
439      * Reads the next number of bytes from the stream into an array and returns
440      * the number of bytes read.  Returns -1 if the end of the
441      * message has been reached.  The characters are stored in the array
442      * starting from the given offset and up to the length specified.
443      * <p>
444      * @param buffer The byte array in which to store the data.
445      * @param offset  The offset into the array at which to start storing data.
446      * @param length   The number of bytes to read.
447      * @return The number of bytes read. Returns -1 if the
448      *          end of the stream has been reached.
449      * @exception IOException If an error occurs while reading the underlying
450      *            stream.
451      ***/
452     public int read(byte buffer[], int offset, int length) throws IOException
453     {
454         int ch, off;
455 
456         if (length < 1)
457             return 0;
458 
459         // Critical section because run() may change __bytesAvailable
460         synchronized (__queue)
461         {
462             if (length > __bytesAvailable)
463                 length = __bytesAvailable;
464         }
465 
466         if ((ch = read()) == -1)
467             return -1;
468 
469         off = offset;
470 
471         do
472         {
473             buffer[offset++] = (byte)ch;
474         }
475         while (--length > 0 && (ch = read()) != -1);
476 
477         //__client._spyRead(buffer, off, offset - off);
478         return (offset - off);
479     }
480 
481 
482     /*** Returns false.  Mark is not supported. ***/
483     public boolean markSupported()
484     {
485         return false;
486     }
487 
488     public int available() throws IOException
489     {
490         // Critical section because run() may change __bytesAvailable
491         synchronized (__queue)
492         {
493             return __bytesAvailable;
494         }
495     }
496 
497 
498     // Cannot be synchronized.  Will cause deadlock if run() is blocked
499     // in read because BufferedInputStream read() is synchronized.
500     public void close() throws IOException
501     {
502         // Completely disregard the fact thread may still be running.
503         // We can't afford to block on this close by waiting for
504         // thread to terminate because few if any JVM's will actually
505         // interrupt a system read() from the interrupt() method.
506         super.close();
507 
508         synchronized (__queue)
509         {
510             __hasReachedEOF = true;
511             __isClosed      = true;
512 
513             if (__thread != null && __thread.isAlive())
514             {
515                 __thread.interrupt();
516             }
517 
518             __queue.notifyAll();
519         }
520 
521         __threaded = false;
522     }
523 
524     public void run()
525     {
526         int ch;
527 
528         try
529         {
530 _outerLoop:
531             while (!__isClosed)
532             {
533                 try
534                 {
535                     if ((ch = __read()) < 0)
536                         break;
537                 }
538                 catch (InterruptedIOException e)
539                 {
540                     synchronized (__queue)
541                     {
542                         __ioException = e;
543                         __queue.notifyAll();
544                         try
545                         {
546                             __queue.wait(100);
547                         }
548                         catch (InterruptedException interrupted)
549                         {
550                             if (__isClosed)
551                                 break _outerLoop;
552                         }
553                         continue;
554                     }
555                 } catch(RuntimeException re) {
556                     // We treat any runtime exceptions as though the
557                     // stream has been closed.  We close the
558                     // underlying stream just to be sure.
559                     super.close();
560                     // Breaking the loop has the effect of setting
561                     // the state to closed at the end of the method.
562                     break _outerLoop;
563                 }
564 
565                 try
566                 {
567                     __processChar(ch);
568                 }
569                 catch (InterruptedException e)
570                 {
571                     if (__isClosed)
572                         break _outerLoop;
573                 }
574             }
575         }
576         catch (IOException ioe)
577         {
578             synchronized (__queue)
579             {
580                 __ioException = ioe;
581             }
582         }
583 
584         synchronized (__queue)
585         {
586             __isClosed      = true; // Possibly redundant
587             __hasReachedEOF = true;
588             __queue.notify();
589         }
590 
591         __threaded = false;
592     }
593 }
594 
595 /* Emacs configuration
596  * Local variables:        **
597  * mode:             java  **
598  * c-basic-offset:   4     **
599  * indent-tabs-mode: nil   **
600  * End:                    **
601  */