/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;

import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;

import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMessage;

/**
 * Thread that is used to get messages from the replication servers (stored
 * in the updates queue) and replay them in the current server. A configurable
 * number of these threads is created for the whole MultimasterReplication
 * object (i.e. these threads are shared across the ReplicationDomain objects
 * for replaying the updates they receive).
 * <p>
 * {@link #shutdown()} and {@link #waitForShutdown()} are meant to be called
 * from a thread other than the replay thread itself; the flags they share
 * with {@link #run()} are volatile so that writes made by one thread are
 * visible to the other.
 */
public class ReplayThread extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  /**
   * Number of replay threads created so far; used only to build a distinct
   * thread name. Must only be accessed through {@link #nextThreadNumber()}.
   */
  private static int count = 0;

  /**
   * The queue from which the updates to replay are taken.
   */
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;

  /**
   * Set by {@link #shutdown()} to ask the run loop to exit.
   */
  private volatile boolean shutdown = false;

  /**
   * Set by {@link #run()} once the run loop has exited.
   */
  private volatile boolean done = false;

  /**
   * Constructor for the ReplayThread.
   *
   * @param updateToReplayQueue The queue of update messages we have to replay
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
    super("Replication Replay thread " + nextThreadNumber());
    this.updateToReplayQueue = updateToReplayQueue;
  }

  /**
   * Returns the number to append to the name of the next replay thread and
   * advances the counter. Synchronized so that replay threads created
   * concurrently never receive the same number.
   *
   * @return The number to use for the next replay thread name.
   */
  private static synchronized int nextThreadNumber()
  {
    return count++;
  }

  /**
   * Shutdown this replay thread. This only raises the shutdown flag; the
   * run loop notices it the next time it checks the flag, i.e. after the
   * current replay or the current (at most one second) poll completes.
   */
  public void shutdown()
  {
    shutdown = true;
  }

  /**
   * Run method for this class. Repeatedly polls the queue for an update and
   * replays it in its replication domain until {@link #shutdown()} has been
   * called.
   */
  @Override
  public void run()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Replay thread starting.");
    }

    while (!shutdown)
    {
      try
      {
        // Wait at most one second for an update so that the shutdown flag
        // is re-checked regularly even when the queue stays empty.
        UpdateToReplay updateToReplay =
            updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
        if (updateToReplay != null)
        {
          // Replay the update in the replication domain it belongs to.
          UpdateMessage updateMsg = updateToReplay.getUpdateMessage();
          ReplicationDomain domain = updateToReplay.getReplicationDomain();
          domain.replay(updateMsg);
        }
      }
      catch (Exception e)
      {
        /*
         * catch all exceptions happening so that the thread never dies even
         * in case of problems.
         */
        Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
            stackTraceToSingleLineString(e));
        logError(message);
      }
    }

    done = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Replay thread stopping.");
    }
  }

  /**
   * Wait for the completion of this thread. Returns as soon as the run loop
   * has finished or the thread is no longer alive. If the calling thread is
   * interrupted while waiting, the wait is abandoned and the interrupt
   * status of the calling thread is restored so that the caller can still
   * observe it.
   */
  public void waitForShutdown()
  {
    try
    {
      while (!done && isAlive())
      {
        Thread.sleep(50);
      }
    }
    catch (InterruptedException e)
    {
      // Stop waiting, but do not lose the interruption: Thread.sleep
      // cleared the flag when it threw, so set it again for the caller.
      Thread.currentThread().interrupt();
    }
  }
}