001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.loggers; 028 029 030 031 import java.util.ArrayList; 032 import java.util.concurrent.LinkedBlockingQueue; 033 import java.util.concurrent.TimeUnit; 034 import java.util.concurrent.atomic.AtomicBoolean; 035 036 import org.opends.messages.Message; 037 import org.opends.server.api.DirectoryThread; 038 import org.opends.server.api.ServerShutdownListener; 039 import org.opends.server.core.DirectoryServer; 040 041 042 043 /** 044 * A Text Writer which writes log records asynchronously to 045 * character-based stream. 046 */ 047 public class AsyncronousTextWriter 048 implements ServerShutdownListener, TextWriter 049 { 050 /** 051 * The wrapped Text Writer. 052 */ 053 private final TextWriter writer; 054 055 /** Queue to store unpublished records. */ 056 private final LinkedBlockingQueue<String> queue; 057 058 /** The capacity for the queue. */ 059 private final int capacity; 060 061 private String name; 062 private AtomicBoolean stopRequested; 063 private WriterThread writerThread; 064 065 private boolean autoFlush; 066 067 /** 068 * Construct a new AsyncronousTextWriter wrapper. 069 * 070 * @param name the name of the thread. 071 * @param capacity the size of the queue before it gets flushed. 072 * @param autoFlush indicates if the underlying writer should be flushed 073 * after the queue is flushed. 074 * @param writer a character stream used for output. 075 */ 076 public AsyncronousTextWriter(String name, int capacity, boolean autoFlush, 077 TextWriter writer) 078 { 079 this.name = name; 080 this.autoFlush = autoFlush; 081 this.writer = writer; 082 083 this.queue = new LinkedBlockingQueue<String>(capacity); 084 this.capacity = capacity; 085 this.writerThread = null; 086 this.stopRequested = new AtomicBoolean(false); 087 088 writerThread = new WriterThread(); 089 writerThread.start(); 090 091 DirectoryServer.registerShutdownListener(this); 092 } 093 094 /** 095 * The publisher thread is responsible for emptying the queue of log records 096 * waiting to published. 097 */ 098 private class WriterThread extends DirectoryThread 099 { 100 public WriterThread() 101 { 102 super(name); 103 } 104 /** 105 * the run method of the writerThread. Run until queue is empty 106 * AND we've been asked to terminate 107 */ 108 public void run() 109 { 110 ArrayList<String> drainList = new ArrayList<String>(capacity); 111 112 String message = null; 113 while (!stopRequested.get() || !queue.isEmpty()) { 114 try 115 { 116 queue.drainTo(drainList, capacity); 117 if (drainList.isEmpty()) 118 { 119 message = queue.poll(10, TimeUnit.SECONDS); 120 if(message != null) 121 { 122 do 123 { 124 writer.writeRecord(message); 125 message = queue.poll(); 126 } 127 while(message != null); 128 129 if(autoFlush) 130 { 131 flush(); 132 } 133 } 134 } 135 else 136 { 137 for (String record : drainList) 138 { 139 writer.writeRecord(record); 140 } 141 drainList.clear(); 142 143 if (autoFlush) 144 { 145 flush(); 146 } 147 } 148 } 149 catch (InterruptedException ex) { 150 // Ignore. We'll rerun the loop 151 // and presumably fall out. 152 } 153 } 154 } 155 } 156 157 /** 158 * Write the log record asyncronously. 159 * 160 * @param record the log record to write. 161 */ 162 public void writeRecord(String record) 163 { 164 // No writer? Off to the bit bucket. 165 if (writer != null) { 166 while (!stopRequested.get()) 167 { 168 // Put request on queue for writer 169 try 170 { 171 queue.put(record); 172 break; 173 } 174 catch(InterruptedException e) 175 { 176 // We expect this to happen. Just ignore it and hopefully 177 // drop out in the next try. 178 } 179 } 180 } 181 } 182 183 /** 184 * {@inheritDoc} 185 */ 186 public void flush() 187 { 188 writer.flush(); 189 } 190 191 /** 192 * {@inheritDoc} 193 */ 194 public long getBytesWritten() 195 { 196 return writer.getBytesWritten(); 197 } 198 199 /** 200 * Retrieves the wrapped writer. 201 * 202 * @return The wrapped writer used by this asyncronous writer. 203 */ 204 public TextWriter getWrappedWriter() 205 { 206 return writer; 207 } 208 209 /** 210 * {@inheritDoc} 211 */ 212 public String getShutdownListenerName() 213 { 214 return "AsyncronousTextWriter Thread " + name; 215 } 216 217 /** 218 * {@inheritDoc} 219 */ 220 public void processServerShutdown(Message reason) 221 { 222 // Don't shutdown the wrapped writer on server shutdown as it 223 // might get more write requests before the log publishers are 224 // manually shutdown just before the server process exists. 225 shutdown(false); 226 } 227 228 /** 229 * {@inheritDoc} 230 */ 231 public void shutdown() 232 { 233 shutdown(true); 234 } 235 236 /** 237 * Releases any resources held by the writer. 238 * 239 * @param shutdownWrapped If the wrapped writer should be closed as well. 240 */ 241 public void shutdown(boolean shutdownWrapped) 242 { 243 stopRequested.set(true); 244 245 // Wait for publisher thread to terminate 246 while (writerThread != null && writerThread.isAlive()) { 247 try { 248 // Interrupt the thread if its blocking 249 writerThread.interrupt(); 250 writerThread.join(); 251 } 252 catch (InterruptedException ex) { 253 // Ignore; we gotta wait.. 254 } 255 } 256 257 // The writer writerThread SHOULD have drained the queue. 258 // If not, handle outstanding requests ourselves, 259 // and push them to the writer. 260 while (!queue.isEmpty()) { 261 String message = queue.poll(); 262 writer.writeRecord(message); 263 } 264 265 // Shutdown the wrapped writer. 266 if (shutdownWrapped && writer != null) writer.shutdown(); 267 268 DirectoryServer.deregisterShutdownListener(this); 269 } 270 271 /** 272 * Set the auto flush setting for this writer. 273 * 274 * @param autoFlush If the writer should flush the buffer after every line. 275 */ 276 public void setAutoFlush(boolean autoFlush) 277 { 278 this.autoFlush = autoFlush; 279 } 280 }