001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.loggers;
028    
029    
030    
031    import java.util.ArrayList;
032    import java.util.concurrent.LinkedBlockingQueue;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicBoolean;
035    
036    import org.opends.messages.Message;
037    import org.opends.server.api.DirectoryThread;
038    import org.opends.server.api.ServerShutdownListener;
039    import org.opends.server.core.DirectoryServer;
040    
041    
042    
043    /**
044     * A Text Writer which writes log records asynchronously to
045     * character-based stream.
046     */
047    public class AsyncronousTextWriter
048        implements ServerShutdownListener, TextWriter
049    {
050      /**
051       * The wrapped Text Writer.
052       */
053      private final TextWriter writer;
054    
055      /** Queue to store unpublished records. */
056      private final LinkedBlockingQueue<String> queue;
057    
058      /** The capacity for the queue. */
059      private final int capacity;
060    
061      private String name;
062      private AtomicBoolean stopRequested;
063      private WriterThread writerThread;
064    
065      private boolean autoFlush;
066    
067      /**
068       * Construct a new AsyncronousTextWriter wrapper.
069       *
070       * @param name      the name of the thread.
071       * @param capacity      the size of the queue before it gets flushed.
072       * @param autoFlush indicates if the underlying writer should be flushed
073       *                  after the queue is flushed.
074       * @param writer    a character stream used for output.
075       */
076      public AsyncronousTextWriter(String name, int capacity, boolean autoFlush,
077                                   TextWriter writer)
078      {
079        this.name = name;
080        this.autoFlush = autoFlush;
081        this.writer = writer;
082    
083        this.queue = new LinkedBlockingQueue<String>(capacity);
084        this.capacity = capacity;
085        this.writerThread = null;
086        this.stopRequested = new AtomicBoolean(false);
087    
088        writerThread = new WriterThread();
089        writerThread.start();
090    
091        DirectoryServer.registerShutdownListener(this);
092      }
093    
094      /**
095       * The publisher thread is responsible for emptying the queue of log records
096       * waiting to published.
097       */
098      private class WriterThread extends DirectoryThread
099      {
100        public WriterThread()
101        {
102          super(name);
103        }
104        /**
105         * the run method of the writerThread. Run until queue is empty
106         * AND we've been asked to terminate
107         */
108        public void run()
109        {
110          ArrayList<String> drainList = new ArrayList<String>(capacity);
111    
112          String message = null;
113          while (!stopRequested.get() || !queue.isEmpty()) {
114            try
115            {
116              queue.drainTo(drainList, capacity);
117              if (drainList.isEmpty())
118              {
119                message = queue.poll(10, TimeUnit.SECONDS);
120                if(message != null)
121                {
122                  do
123                  {
124                    writer.writeRecord(message);
125                    message = queue.poll();
126                  }
127                  while(message != null);
128    
129                  if(autoFlush)
130                  {
131                    flush();
132                  }
133                }
134              }
135              else
136              {
137                for (String record : drainList)
138                {
139                  writer.writeRecord(record);
140                }
141                drainList.clear();
142    
143                if (autoFlush)
144                {
145                  flush();
146                }
147              }
148            }
149            catch (InterruptedException ex) {
150              // Ignore. We'll rerun the loop
151              // and presumably fall out.
152            }
153          }
154        }
155      }
156    
157      /**
158       * Write the log record asyncronously.
159       *
160       * @param record the log record to write.
161       */
162      public void writeRecord(String record)
163      {
164        // No writer?  Off to the bit bucket.
165        if (writer != null) {
166          while (!stopRequested.get())
167          {
168            // Put request on queue for writer
169            try
170            {
171              queue.put(record);
172              break;
173            }
174            catch(InterruptedException e)
175            {
176              // We expect this to happen. Just ignore it and hopefully
177              // drop out in the next try.
178            }
179          }
180        }
181      }
182    
183      /**
184       * {@inheritDoc}
185       */
186      public void flush()
187      {
188        writer.flush();
189      }
190    
191      /**
192       * {@inheritDoc}
193       */
194      public long getBytesWritten()
195      {
196        return writer.getBytesWritten();
197      }
198    
199      /**
200       * Retrieves the wrapped writer.
201       *
202       * @return The wrapped writer used by this asyncronous writer.
203       */
204      public TextWriter getWrappedWriter()
205      {
206        return writer;
207      }
208    
209      /**
210       * {@inheritDoc}
211       */
212      public String getShutdownListenerName()
213      {
214        return "AsyncronousTextWriter Thread " + name;
215      }
216    
217      /**
218       * {@inheritDoc}
219       */
220      public void processServerShutdown(Message reason)
221      {
222        // Don't shutdown the wrapped writer on server shutdown as it
223        // might get more write requests before the log publishers are
224        // manually shutdown just before the server process exists.
225        shutdown(false);
226      }
227    
228      /**
229       * {@inheritDoc}
230       */
231      public void shutdown()
232      {
233        shutdown(true);
234      }
235    
236      /**
237       * Releases any resources held by the writer.
238       *
239       * @param shutdownWrapped If the wrapped writer should be closed as well.
240       */
241      public void shutdown(boolean shutdownWrapped)
242      {
243        stopRequested.set(true);
244    
245        // Wait for publisher thread to terminate
246        while (writerThread != null && writerThread.isAlive()) {
247          try {
248            // Interrupt the thread if its blocking
249            writerThread.interrupt();
250            writerThread.join();
251          }
252          catch (InterruptedException ex) {
253            // Ignore; we gotta wait..
254          }
255        }
256    
257        // The writer writerThread SHOULD have drained the queue.
258        // If not, handle outstanding requests ourselves,
259        // and push them to the writer.
260        while (!queue.isEmpty()) {
261          String message = queue.poll();
262          writer.writeRecord(message);
263        }
264    
265        // Shutdown the wrapped writer.
266        if (shutdownWrapped && writer != null) writer.shutdown();
267    
268        DirectoryServer.deregisterShutdownListener(this);
269      }
270    
271      /**
272       * Set the auto flush setting for this writer.
273       *
274       * @param autoFlush If the writer should flush the buffer after every line.
275       */
276      public void setAutoFlush(boolean autoFlush)
277      {
278        this.autoFlush = autoFlush;
279      }
280    }