001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.extensions; 028 029 030 031 import java.util.ArrayList; 032 import java.util.Iterator; 033 import java.util.LinkedList; 034 import java.util.List; 035 import java.util.concurrent.LinkedBlockingQueue; 036 import java.util.concurrent.TimeUnit; 037 import java.util.concurrent.atomic.AtomicLong; 038 039 import org.opends.messages.Message; 040 import org.opends.server.admin.server.ConfigurationChangeListener; 041 import org.opends.server.admin.std.server.TraditionalWorkQueueCfg; 042 import org.opends.server.api.WorkQueue; 043 import org.opends.server.config.ConfigException; 044 import org.opends.server.core.DirectoryServer; 045 import org.opends.server.loggers.debug.DebugTracer; 046 import org.opends.server.monitors.TraditionalWorkQueueMonitor; 047 import org.opends.server.types.AbstractOperation; 048 import org.opends.server.types.CancelRequest; 049 import org.opends.server.types.ConfigChangeResult; 050 import org.opends.server.types.DebugLogLevel; 051 import org.opends.server.types.DirectoryException; 052 import org.opends.server.types.DN; 053 import org.opends.server.types.InitializationException; 054 import org.opends.server.types.Operation; 055 import org.opends.server.types.ResultCode; 056 057 import static org.opends.messages.ConfigMessages.*; 058 import static org.opends.messages.CoreMessages.*; 059 import static org.opends.server.loggers.ErrorLogger.*; 060 import static org.opends.server.loggers.debug.DebugLogger.*; 061 062 063 064 /** 065 * This class defines a data structure for storing and interacting with the 066 * Directory Server work queue. 067 */ 068 public class TraditionalWorkQueue 069 extends WorkQueue<TraditionalWorkQueueCfg> 070 implements ConfigurationChangeListener<TraditionalWorkQueueCfg> 071 { 072 /** 073 * The tracer object for the debug logger. 074 */ 075 private static final DebugTracer TRACER = getTracer(); 076 077 078 079 080 /** 081 * The maximum number of times to retry getting the next operation from the 082 * queue if an unexpected failure occurs. 083 */ 084 private static final int MAX_RETRY_COUNT = 5; 085 086 087 088 // The set of worker threads that will be used to process this work queue. 089 private ArrayList<TraditionalWorkerThread> workerThreads; 090 091 // The number of operations that have been submitted to the work queue for 092 // processing. 093 private AtomicLong opsSubmitted; 094 095 // The number of times that an attempt to submit a new request has been 096 // rejected because the work queue is already at its maximum capacity. 097 private AtomicLong queueFullRejects; 098 099 // Indicates whether one or more of the worker threads needs to be killed at 100 // the next convenient opportunity. 101 private boolean killThreads; 102 103 // Indicates whether the Directory Server is shutting down. 104 private boolean shutdownRequested; 105 106 // The DN of the configuration entry with information to use to configure the 107 // work queue. 108 private DN configEntryDN; 109 110 // The thread number used for the last worker thread that was created. 111 private int lastThreadNumber; 112 113 // The maximum number of pending requests that this work queue will allow 114 // before it will start rejecting them. 115 private int maxCapacity; 116 117 // The number of worker threads that should be active (or will be shortly if 118 // a configuration change has not been completely applied). 119 private int numWorkerThreads; 120 121 // The queue that will be used to actually hold the pending operations. 122 private LinkedBlockingQueue<AbstractOperation> opQueue; 123 124 // The lock used to provide threadsafe access for the queue. 125 private Object queueLock; 126 127 128 129 /** 130 * Creates a new instance of this work queue. All initialization should be 131 * performed in the <CODE>initializeWorkQueue</CODE> method. 132 */ 133 public TraditionalWorkQueue() 134 { 135 // No implementation should be performed here. 136 } 137 138 139 140 /** 141 * {@inheritDoc} 142 */ 143 @Override() 144 public void initializeWorkQueue(TraditionalWorkQueueCfg configuration) 145 throws ConfigException, InitializationException 146 { 147 shutdownRequested = false; 148 killThreads = false; 149 opsSubmitted = new AtomicLong(0); 150 queueFullRejects = new AtomicLong(0); 151 queueLock = new Object(); 152 153 154 // Register to be notified of any configuration changes. 155 configuration.addTraditionalChangeListener(this); 156 157 158 // Get the necessary configuration from the provided entry. 159 configEntryDN = configuration.dn(); 160 numWorkerThreads = configuration.getNumWorkerThreads(); 161 maxCapacity = configuration.getMaxWorkQueueCapacity(); 162 163 164 // Create the actual work queue. 165 if (maxCapacity > 0) 166 { 167 opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity); 168 } 169 else 170 { 171 opQueue = new LinkedBlockingQueue<AbstractOperation>(); 172 } 173 174 175 // Create the set of worker threads that should be used to service the 176 // work queue. 177 workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads); 178 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads; 179 lastThreadNumber++) 180 { 181 TraditionalWorkerThread t = 182 new TraditionalWorkerThread(this, lastThreadNumber); 183 t.start(); 184 workerThreads.add(t); 185 } 186 187 188 // Create and register a monitor provider for the work queue. 189 try 190 { 191 TraditionalWorkQueueMonitor monitor = 192 new TraditionalWorkQueueMonitor(this); 193 monitor.initializeMonitorProvider(null); 194 monitor.start(); 195 DirectoryServer.registerMonitorProvider(monitor); 196 } 197 catch (Exception e) 198 { 199 if (debugEnabled()) 200 { 201 TRACER.debugCaught(DebugLogLevel.ERROR, e); 202 } 203 204 Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get( 205 String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e)); 206 logError(message); 207 } 208 } 209 210 211 212 /** 213 * {@inheritDoc} 214 */ 215 @Override() 216 public void finalizeWorkQueue(Message reason) 217 { 218 shutdownRequested = true; 219 220 221 // Send responses to any operations in the pending queue to indicate that 222 // they won't be processed because the server is shutting down. 223 CancelRequest cancelRequest = new CancelRequest(true, reason); 224 ArrayList<Operation> pendingOperations = new ArrayList<Operation>(); 225 opQueue.removeAll(pendingOperations); 226 for (Operation o : pendingOperations) 227 { 228 try 229 { 230 // The operation has no chance of responding to the cancel 231 // request so avoid waiting for a cancel response. 232 if (o.getCancelResult() == null) { 233 o.abort(cancelRequest); 234 } 235 } 236 catch (Exception e) 237 { 238 if (debugEnabled()) 239 { 240 TRACER.debugCaught(DebugLogLevel.ERROR, e); 241 } 242 243 logError(WARN_QUEUE_UNABLE_TO_CANCEL.get( 244 String.valueOf(o), String.valueOf(e))); 245 } 246 } 247 248 249 // Notify all the worker threads of the shutdown. 250 for (TraditionalWorkerThread t : workerThreads) 251 { 252 try 253 { 254 t.shutDown(); 255 } 256 catch (Exception e) 257 { 258 if (debugEnabled()) 259 { 260 TRACER.debugCaught(DebugLogLevel.ERROR, e); 261 } 262 263 logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get( 264 t.getName(), String.valueOf(e))); 265 } 266 } 267 } 268 269 270 271 /** 272 * Indicates whether this work queue has received a request to shut down. 273 * 274 * @return <CODE>true</CODE> if the work queue has recieved a request to shut 275 * down, or <CODE>false</CODE> if not. 276 */ 277 public boolean shutdownRequested() 278 { 279 return shutdownRequested; 280 } 281 282 283 284 /** 285 * Submits an operation to be processed by one of the worker threads 286 * associated with this work queue. 287 * 288 * @param operation The operation to be processed. 289 * 290 * @throws DirectoryException If the provided operation is not accepted for 291 * some reason (e.g., if the server is shutting 292 * down or the pending operation queue is already 293 * at its maximum capacity). 294 */ 295 public void submitOperation(AbstractOperation operation) 296 throws DirectoryException 297 { 298 if (shutdownRequested) 299 { 300 Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 301 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 302 } 303 304 if (! opQueue.offer(operation)) 305 { 306 queueFullRejects.incrementAndGet(); 307 308 Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity); 309 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 310 } 311 312 opsSubmitted.incrementAndGet(); 313 } 314 315 316 317 /** 318 * Retrieves the next operation that should be processed by one of the worker 319 * threads, blocking if necessary until a new request arrives. This method 320 * should only be called by a worker thread associated with this work queue. 321 * 322 * @param workerThread The worker thread that is requesting the operation. 323 * 324 * @return The next operation that should be processed, or <CODE>null</CODE> 325 * if the server is shutting down and no more operations will be 326 * processed. 327 */ 328 public AbstractOperation nextOperation(TraditionalWorkerThread workerThread) 329 { 330 return retryNextOperation(workerThread, 0); 331 } 332 333 334 335 /** 336 * Retrieves the next operation that should be processed by one of the worker 337 * threads following a previous failure attempt. A maximum of five 338 * consecutive failures will be allowed before returning <CODE>null</CODE>, 339 * which will cause the associated thread to exit. 340 * 341 * @param workerThread The worker thread that is requesting the operation. 342 * @param numFailures The number of consecutive failures that the worker 343 * thread has experienced so far. If this gets too 344 * high, then this method will return <CODE>null</CODE> 345 * rather than retrying. 346 * 347 * @return The next operation that should be processed, or <CODE>null</CODE> 348 * if the server is shutting down and no more operations will be 349 * processed, or if there have been too many consecutive failures. 350 */ 351 private AbstractOperation retryNextOperation( 352 TraditionalWorkerThread workerThread, 353 int numFailures) 354 { 355 // See if we should kill off this thread. This could be necessary if the 356 // number of worker threads has been decreased with the server online. If 357 // so, then return null and the thread will exit. 358 if (killThreads) 359 { 360 synchronized (queueLock) 361 { 362 try 363 { 364 int currentThreads = workerThreads.size(); 365 if (currentThreads > numWorkerThreads) 366 { 367 if (workerThreads.remove(Thread.currentThread())) 368 { 369 currentThreads--; 370 } 371 372 if (currentThreads <= numWorkerThreads) 373 { 374 killThreads = false; 375 } 376 377 workerThread.setStoppedByReducedThreadNumber(); 378 return null; 379 } 380 } 381 catch (Exception e) 382 { 383 if (debugEnabled()) 384 { 385 TRACER.debugCaught(DebugLogLevel.ERROR, e); 386 } 387 } 388 } 389 } 390 391 if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT)) 392 { 393 if (numFailures > MAX_RETRY_COUNT) 394 { 395 Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get( 396 Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT); 397 logError(message); 398 } 399 400 return null; 401 } 402 403 try 404 { 405 while (true) 406 { 407 AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS); 408 if (nextOperation == null) 409 { 410 // There was no work to do in the specified length of time. See if 411 // we should shutdown, and if not then just check again. 412 if (shutdownRequested) 413 { 414 return null; 415 } 416 else if (killThreads) 417 { 418 synchronized (queueLock) 419 { 420 try 421 { 422 int currentThreads = workerThreads.size(); 423 if (currentThreads > numWorkerThreads) 424 { 425 if (workerThreads.remove(Thread.currentThread())) 426 { 427 currentThreads--; 428 } 429 430 if (currentThreads <= numWorkerThreads) 431 { 432 killThreads = false; 433 } 434 435 workerThread.setStoppedByReducedThreadNumber(); 436 return null; 437 } 438 } 439 catch (Exception e) 440 { 441 if (debugEnabled()) 442 { 443 TRACER.debugCaught(DebugLogLevel.ERROR, e); 444 } 445 } 446 } 447 } 448 } 449 else 450 { 451 return nextOperation; 452 } 453 } 454 } 455 catch (InterruptedException ie) 456 { 457 // This is somewhat expected so don't log. 458 // assert debugException(CLASS_NAME, "retryNextOperation", ie); 459 460 // If this occurs, then the worker thread must have been interrupted for 461 // some reason. This could be because the Directory Server is shutting 462 // down, in which case we should return null. 463 if (shutdownRequested) 464 { 465 return null; 466 } 467 468 // If we've gotten here, then the worker thread was interrupted for some 469 // other reason. This should not happen, and we need to log a message. 470 logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get( 471 Thread.currentThread().getName(), String.valueOf(ie))); 472 return retryNextOperation(workerThread, numFailures+1); 473 } 474 catch (Exception e) 475 { 476 if (debugEnabled()) 477 { 478 TRACER.debugCaught(DebugLogLevel.ERROR, e); 479 } 480 481 // This should not happen. The only recourse we have is to log a message 482 // and try again. 483 logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get( 484 Thread.currentThread().getName(), String.valueOf(e))); 485 return retryNextOperation(workerThread, numFailures + 1); 486 } 487 } 488 489 490 491 /** 492 * Attempts to remove the specified operation from this queue if it has not 493 * yet been picked up for processing by one of the worker threads. 494 * 495 * @param operation The operation to remove from the queue. 496 * 497 * @return <CODE>true</CODE> if the provided request was present in the queue 498 * and was removed successfully, or <CODE>false</CODE> it not. 499 */ 500 public boolean removeOperation(AbstractOperation operation) 501 { 502 return opQueue.remove(operation); 503 } 504 505 506 507 /** 508 * Retrieves the total number of operations that have been successfully 509 * submitted to this work queue for processing since server startup. This 510 * does not include operations that have been rejected for some reason like 511 * the queue already at its maximum capacity. 512 * 513 * @return The total number of operations that have been successfully 514 * submitted to this work queue since startup. 515 */ 516 public long getOpsSubmitted() 517 { 518 return opsSubmitted.longValue(); 519 } 520 521 522 523 /** 524 * Retrieves the total number of operations that have been rejected because 525 * the work queue was already at its maximum capacity. 526 * 527 * @return The total number of operations that have been rejected because the 528 * work queue was already at its maximum capacity. 529 */ 530 public long getOpsRejectedDueToQueueFull() 531 { 532 return queueFullRejects.longValue(); 533 } 534 535 536 537 /** 538 * Retrieves the number of pending operations in the queue that have not yet 539 * been picked up for processing. Note that this method is not a 540 * constant-time operation and can be relatively inefficient, so it should be 541 * used sparingly. 542 * 543 * @return The number of pending operations in the queue that have not yet 544 * been picked up for processing. 545 */ 546 public int size() 547 { 548 return opQueue.size(); 549 } 550 551 552 553 /** 554 * {@inheritDoc} 555 */ 556 public boolean isConfigurationChangeAcceptable( 557 TraditionalWorkQueueCfg configuration, 558 List<Message> unacceptableReasons) 559 { 560 // The provided configuration will always be acceptable. 561 return true; 562 } 563 564 565 566 /** 567 * {@inheritDoc} 568 */ 569 public ConfigChangeResult applyConfigurationChange( 570 TraditionalWorkQueueCfg configuration) 571 { 572 ArrayList<Message> resultMessages = new ArrayList<Message>(); 573 int newNumThreads = configuration.getNumWorkerThreads(); 574 int newMaxCapacity = configuration.getMaxWorkQueueCapacity(); 575 576 577 // Apply a change to the number of worker threads if appropriate. 578 int currentThreads = workerThreads.size(); 579 if (newNumThreads != currentThreads) 580 { 581 synchronized (queueLock) 582 { 583 try 584 { 585 int threadsToAdd = newNumThreads - currentThreads; 586 if (threadsToAdd > 0) 587 { 588 for (int i=0; i < threadsToAdd; i++) 589 { 590 TraditionalWorkerThread t = 591 new TraditionalWorkerThread(this, lastThreadNumber++); 592 workerThreads.add(t); 593 t.start(); 594 } 595 596 killThreads = false; 597 } 598 else 599 { 600 killThreads = true; 601 } 602 603 numWorkerThreads = newNumThreads; 604 } 605 catch (Exception e) 606 { 607 if (debugEnabled()) 608 { 609 TRACER.debugCaught(DebugLogLevel.ERROR, e); 610 } 611 } 612 } 613 } 614 615 616 // Apply a change to the maximum capacity if appropriate. Since we can't 617 // change capacity on the fly, then we'll have to create a new queue and 618 // transfer any remaining items into it. Any thread that is waiting on the 619 // original queue will time out after at most a few seconds and further 620 // checks will be against the new queue. 621 if (newMaxCapacity != maxCapacity) 622 { 623 synchronized (queueLock) 624 { 625 try 626 { 627 LinkedBlockingQueue<AbstractOperation> newOpQueue; 628 if (newMaxCapacity > 0) 629 { 630 newOpQueue = 631 new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity); 632 } 633 else 634 { 635 newOpQueue = new LinkedBlockingQueue<AbstractOperation>(); 636 } 637 638 LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue; 639 opQueue = newOpQueue; 640 641 LinkedList<AbstractOperation> pendingOps = 642 new LinkedList<AbstractOperation>(); 643 oldOpQueue.drainTo(pendingOps); 644 645 646 // We have to be careful when adding any existing pending operations 647 // because the new capacity could be less than what was already 648 // backlogged in the previous queue. If that happens, we may have to 649 // loop a few times to get everything in there. 650 while (! pendingOps.isEmpty()) 651 { 652 Iterator<AbstractOperation> iterator = pendingOps.iterator(); 653 while (iterator.hasNext()) 654 { 655 AbstractOperation o = iterator.next(); 656 try 657 { 658 if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS)) 659 { 660 iterator.remove(); 661 } 662 } 663 catch (InterruptedException ie) 664 { 665 if (debugEnabled()) 666 { 667 TRACER.debugCaught(DebugLogLevel.ERROR, ie); 668 } 669 } 670 } 671 } 672 673 maxCapacity = newMaxCapacity; 674 } 675 catch (Exception e) 676 { 677 if (debugEnabled()) 678 { 679 TRACER.debugCaught(DebugLogLevel.ERROR, e); 680 } 681 } 682 } 683 } 684 685 686 return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages); 687 } 688 689 690 691 /** 692 * {@inheritDoc} 693 */ 694 @Override() 695 public boolean isIdle() 696 { 697 if (opQueue.size() > 0) 698 { 699 return false; 700 } 701 702 synchronized (queueLock) 703 { 704 for (TraditionalWorkerThread t : workerThreads) 705 { 706 if (t.isActive()) 707 { 708 return false; 709 } 710 } 711 712 return true; 713 } 714 } 715 } 716