001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.tools.makeldif;
028    
029    
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.InputStream;
033    import java.io.IOException;
034    import java.nio.ByteBuffer;
035    import java.util.concurrent.LinkedBlockingQueue;
036    import java.util.concurrent.TimeUnit;
037    
038    import org.opends.server.types.Entry;
039    import org.opends.server.types.LDIFExportConfig;
040    import org.opends.server.util.LDIFException;
041    import org.opends.server.util.LDIFWriter;
042    
043    
044    
045    /**
046     * This class creates an input stream that can be used to read entries generated
047     * by MakeLDIF as if they were being read from another source like a file.  It
048     * has a fixed-size queue that dictates how many entries may be held in memory
049     * at any given time.
050     */
051    public class MakeLDIFInputStream
052           extends InputStream
053           implements EntryWriter
054    {
055      // Indicates whether all of the entries have been generated.
056      private boolean allGenerated;
057    
058      // Indicates whether this input stream has been closed.
059      private boolean closed;
060    
061      // The byte array output stream that will be used to convert entries to byte
062      // arrays with their LDIF representations.
063      private ByteArrayOutputStream entryOutputStream;
064    
065      // The byte array that will hold the LDIF representation of the next entry to
066      // be read.
067      private ByteBuffer entryBytes;
068    
069      // The IOException that should be thrown the next time a read is requested.
070      private IOException ioException;
071    
072      // The LDIF writer that will be used to write the entries to LDIF.
073      private LDIFWriter ldifWriter;
074    
075      // The queue used to hold generated entries until they can be read.
076      private LinkedBlockingQueue<Entry> entryQueue;
077    
078      // The background thread being used to actually generate the entries.
079      private MakeLDIFInputStreamThread generatorThread;
080    
081      // The template file to use to generate the entries.
082      private TemplateFile templateFile;
083    
084    
085    
086      /**
087       * Creates a new MakeLDIF input stream that will generate entries based on the
088       * provided template file.
089       *
090       * @param  templateFile  The template file to use to generate the entries.
091       */
092      public MakeLDIFInputStream(TemplateFile templateFile)
093      {
094        this.templateFile = templateFile;
095    
096        allGenerated = false;
097        closed       = false;
098        entryQueue   = new LinkedBlockingQueue<Entry>(10);
099        ioException  = null;
100        entryBytes   = null;
101    
102        entryOutputStream = new ByteArrayOutputStream(8192);
103        LDIFExportConfig exportConfig = new LDIFExportConfig(entryOutputStream);
104    
105        try
106        {
107          ldifWriter = new LDIFWriter(exportConfig);
108        }
109        catch (IOException ioe)
110        {
111          // This should never happen.
112          ioException = ioe;
113        }
114    
115        generatorThread = new MakeLDIFInputStreamThread(this, templateFile);
116        generatorThread.start();
117      }
118    
119    
120    
121      /**
122       * Closes this input stream so that no more data may be read from it.
123       */
124      public void close()
125      {
126        closed      = true;
127        ioException = null;
128      }
129    
130    
131    
132      /**
133       * Reads a single byte of data from this input stream.
134       *
135       * @return  The byte read from the input stream, or -1 if the end of the
136       *          stream has been reached.
137       *
138       * @throws  IOException  If a problem has occurred while generating data for
139       *                       use by this input stream.
140       */
141      public int read()
142             throws IOException
143      {
144        if (closed)
145        {
146          return -1;
147        }
148        else if (ioException != null)
149        {
150          throw ioException;
151        }
152    
153        if ((entryBytes == null) || (! entryBytes.hasRemaining()))
154        {
155          if (! getNextEntry())
156          {
157            closed = true;
158            return -1;
159          }
160        }
161    
162        return (0xFF & entryBytes.get());
163      }
164    
165    
166    
167      /**
168       * Reads data from this input stream.
169       *
170       * @param  b    The array into which the data should be read.
171       * @param  off  The position in the array at which point the data read may be
172       *              placed.
173       * @param  len  The maximum number of bytes that may be read into the
174       *              provided array.
175       *
176       * @return  The number of bytes read from the input stream into the provided
177       *          array, or -1 if the end of the stream has been reached.
178       *
179       * @throws  IOException  If a problem has occurred while generating data for
180       *                       use by this input stream.
181       */
182      public int read(byte[] b, int off, int len)
183             throws IOException
184      {
185        if (closed)
186        {
187          return -1;
188        }
189        else if (ioException != null)
190        {
191          throw ioException;
192        }
193    
194        if ((entryBytes == null) || (! entryBytes.hasRemaining()))
195        {
196          if (! getNextEntry())
197          {
198            closed = true;
199            return -1;
200          }
201        }
202    
203        int bytesRead = Math.min(len, entryBytes.remaining());
204        entryBytes.get(b, off, bytesRead);
205        return bytesRead;
206      }
207    
208    
209    
210      /**
211       * {@inheritDoc}
212       */
213      public boolean writeEntry(Entry entry)
214             throws IOException, MakeLDIFException
215      {
216        while (! closed)
217        {
218          try
219          {
220            if (entryQueue.offer(entry, 500, TimeUnit.MILLISECONDS))
221            {
222              return true;
223            }
224          } catch (InterruptedException ie) {}
225        }
226    
227        return false;
228      }
229    
230    
231    
232      /**
233       * {@inheritDoc}
234       */
235      public void closeEntryWriter()
236      {
237        allGenerated = true;
238      }
239    
240    
241    
242      /**
243       * Sets the I/O exception that should be thrown on any subsequent calls to
244       * <CODE>available</CODE> or <CODE>read</CODE>.
245       *
246       * @param  ioException   The I/O exception that should be thrown.
247       */
248      void setIOException(IOException ioException)
249      {
250        this.ioException = ioException;
251      }
252    
253    
254    
255      /**
256       * Retrieves the next entry and puts it in the entry byte buffer.
257       *
258       * @return  <CODE>true</CODE> if the next entry is available, or
259       *          <CODE>false</CODE> if there are no more entries or if the input
260       *          stream has been closed.
261       */
262      private boolean getNextEntry()
263      {
264        Entry entry = entryQueue.poll();
265        while (entry == null)
266        {
267          if (closed)
268          {
269            return false;
270          }
271          else if (allGenerated)
272          {
273            entry = entryQueue.poll();
274            if (entry == null)
275            {
276              return false;
277            }
278          }
279          else
280          {
281            try
282            {
283              entry = entryQueue.poll(500, TimeUnit.MILLISECONDS);
284            } catch (InterruptedException ie) {}
285          }
286        }
287    
288        try
289        {
290          entryOutputStream.reset();
291          ldifWriter.writeEntry(entry);
292          ldifWriter.flush();
293          entryBytes = ByteBuffer.wrap(entryOutputStream.toByteArray());
294        }
295        catch (LDIFException le)
296        {
297          // This should never happen.
298          ioException = new IOException(le.getMessage());
299          return false;
300        }
301        catch (IOException ioe)
302        {
303          // Neither should this.
304          ioException = ioe;
305          return false;
306        }
307    
308        return true;
309      }
310    }
311