View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.tika.utils;
18  
19  import java.io.BufferedInputStream;
20  import java.io.BufferedOutputStream;
21  import java.io.ByteArrayInputStream;
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.FileOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.OutputStream;
28  
29  
30  /**
31   * Wraps an input stream, reading it only once, but making it available
32   * for rereading an arbitrary number of times.  The stream's bytes are
33   * stored in memory up to a user specified maximum, and then stored in a
34   * temporary file which is deleted when this class' close() method is called.
35   */
public class RereadableInputStream extends InputStream {

    /**
     * Input stream originally passed to the constructor.
     */
    private final InputStream originalInputStream;

    /**
     * The inputStream currently being used by this object to read contents;
     * may be the original stream passed in, or a stream that reads
     * the saved copy.
     */
    private InputStream inputStream;

    /**
     * Maximum number of bytes that can be stored in memory before
     * storage will be moved to a temporary file.
     */
    private final int maxBytesInMemory;

    /**
     * True when the original stream is being read; set to false when
     * reading is set to use the stored data instead.
     */
    private boolean firstPass = true;

    /**
     * Whether or not the stream's contents are being stored in a file
     * as opposed to memory.
     */
    private boolean bufferIsInFile;

    /**
     * The buffer used to store the stream's content; this storage is moved
     * to a file when the stored data's size exceeds maxBytesInMemory.
     * Set to null once the content has been moved to a file or when
     * close() is called.
     */
    private byte[] byteBuffer;

    /**
     * The total number of data bytes read from the original stream so far
     * (the end-of-stream marker is not counted).
     */
    private int size;

    /**
     * File used to store the stream's contents; is null until the stored
     * content's size exceeds maxBytesInMemory.
     */
    private File storeFile;

    /**
     * OutputStream used to save the content of the input stream in a
     * temporary file; null when not writing to the file.
     */
    private OutputStream storeOutputStream;

    /**
     * Specifies whether or not to read to the end of stream on first
     * rewind.  If this is set to false, then the first time when rewind()
     * is called, only those bytes already read from the original stream
     * will be available from then on.
     */
    private final boolean readToEndOfStreamOnFirstRewind;

    /**
     * Specifies whether or not to close the original input stream
     * when close() (or the first rewind()) is called.
     */
    private final boolean closeOriginalStreamOnClose;

    /**
     * True once close() has been called; later read() or rewind() calls
     * fail with an IOException.
     */
    private boolean closed;

    // TODO: At some point it would be better to replace the current approach
    // (specifying the above) with more automated behavior.  The stream could
    // keep the original stream open until EOF was reached.  For example, if:
    //
    // the original stream is 10 bytes, and
    // only 2 bytes are read on the first pass
    // rewind() is called
    // 5 bytes are read
    //
    // In this case, this instance gets the first 2 from its store,
    // and the next 3 from the original stream, saving those additional 3
    // bytes in the store.  In this way, only the maximum number of bytes
    // ever needed must be saved in the store; unused bytes are never read.
    // The original stream is closed when EOF is reached, or when close()
    // is called, whichever comes first.  Using this approach eliminates
    // the need to specify the flag (though makes implementation more complex).

    /**
     * Creates a rereadable input stream.
     *
     * @param inputStream stream containing the source of data
     * @param maxBytesInMemory maximum number of bytes to use to store
     *     the stream's contents in memory before switching to disk; note that
     *     the instance will preallocate a byte array whose size is
     *     maxBytesInMemory.  This byte array will be made available for
     *     garbage collection (i.e. its reference set to null) when the
     *     content size exceeds the array's size, when close() is called, or
     *     when there are no more references to the instance.
     * @param readToEndOfStreamOnFirstRewind Specifies whether or not to
     *     read to the end of stream on first rewind.  If this is set to false,
     *     then when rewind() is first called, only those bytes already read
     *     from the original stream will be available from then on.
     * @param closeOriginalStreamOnClose Specifies whether or not the
     *     original stream is closed when this instance stops reading from it
     *     (on the first rewind() or on close()).
     */
    public RereadableInputStream(InputStream inputStream, int maxBytesInMemory,
            boolean readToEndOfStreamOnFirstRewind,
            boolean closeOriginalStreamOnClose) {
        this.inputStream = inputStream;
        this.originalInputStream = inputStream;
        this.maxBytesInMemory = maxBytesInMemory;
        this.byteBuffer = new byte[maxBytesInMemory];
        this.readToEndOfStreamOnFirstRewind = readToEndOfStreamOnFirstRewind;
        this.closeOriginalStreamOnClose = closeOriginalStreamOnClose;
    }

    /**
     * Reads a byte from the stream, saving it in the store if it is being
     * read from the original stream.  Implements the abstract
     * InputStream.read().
     *
     * @return the read byte, or -1 on end of stream.
     * @throws IOException if the underlying stream or the store fails, or
     *     if this stream has been closed
     */
    @Override
    public int read() throws IOException {
        ensureOpen();
        int inputByte = inputStream.read();
        // -1 is the end-of-stream marker, not data; storing it would append
        // a bogus 0xFF byte to the saved copy and inflate the size.
        if (firstPass && inputByte != -1) {
            saveByte(inputByte);
        }
        return inputByte;
    }

    /**
     * "Rewinds" the stream to the beginning for rereading.
     *
     * @throws IOException if reading, closing, or opening the store fails,
     *     or if this stream has been closed
     */
    public void rewind() throws IOException {
        ensureOpen();

        if (firstPass && readToEndOfStreamOnFirstRewind) {
            // Force read to end of stream to fill store with any
            // remaining bytes from original stream.
            while (read() != -1) {
                // empty loop
            }
        }

        closeStream();
        if (storeOutputStream != null) {
            // Closing flushes the buffered writer so the file is complete.
            storeOutputStream.close();
            storeOutputStream = null;
        }
        firstPass = false;
        // Decide by where the data actually lives: a stream of exactly
        // maxBytesInMemory bytes is still held in memory (no file exists).
        // Only the first 'size' bytes of the preallocated array are valid.
        inputStream = bufferIsInFile
                ? new BufferedInputStream(new FileInputStream(storeFile))
                : new ByteArrayInputStream(byteBuffer, 0, size);
    }

    /**
     * Closes the input stream currently used for reading (may either be
     * the original stream or a memory or file stream after the first pass).
     * The original stream is left open unless closeOriginalStreamOnClose
     * was set.
     *
     * @throws IOException if closing the stream fails
     */
    // Does anyone need/want for this to be public?
    private void closeStream() throws IOException {
        if (inputStream != null
                && (inputStream != originalInputStream
                        || closeOriginalStreamOnClose)) {
            inputStream.close();
            inputStream = null;
        }
    }

    /**
     * Closes the input stream and removes the temporary file if one was
     * created.  Calling this method more than once has no further effect.
     *
     * @throws IOException if closing one of the underlying streams fails;
     *     the temporary file is still deleted in that case
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            closeStream();
        } finally {
            try {
                // Still open if close() happens during the first pass after
                // the content was moved to a file.
                if (storeOutputStream != null) {
                    storeOutputStream.close();
                    storeOutputStream = null;
                }
            } finally {
                byteBuffer = null; // release for garbage collection
                if (storeFile != null && !storeFile.delete()) {
                    storeFile.deleteOnExit();
                }
            }
        }
    }

    /**
     * Returns the number of bytes read from the original stream.
     *
     * @return number of bytes read
     */
    public int getSize() {
        return size;
    }

    /**
     * Throws if close() has already been called.
     *
     * @throws IOException if this stream is closed
     */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
    }

    /**
     * Saves the byte read from the original stream to the store, moving
     * the store from memory to a temporary file when the in-memory buffer
     * is full.
     *
     * @param inputByte byte read from original stream (0-255)
     * @throws IOException if the temporary file cannot be created or written
     */
    private void saveByte(int inputByte) throws IOException {
        if (bufferIsInFile) {
            storeOutputStream.write(inputByte);
        } else if (size == maxBytesInMemory) {
            // Memory buffer is full: spill its contents plus this byte to disk.
            storeFile = File.createTempFile("TIKA_streamstore_", ".tmp");
            bufferIsInFile = true;
            storeOutputStream = new BufferedOutputStream(
                    new FileOutputStream(storeFile));
            storeOutputStream.write(byteBuffer, 0, size);
            storeOutputStream.write(inputByte);
            byteBuffer = null; // release for garbage collection
        } else {
            byteBuffer[size] = (byte) inputByte;
        }
        ++size;
    }
}