001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import org.opends.messages.Message; 029 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 032 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 033 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 034 import static org.opends.messages.ReplicationMessages.*; 035 036 import java.io.IOException; 037 import java.net.SocketException; 038 import java.util.NoSuchElementException; 039 040 import org.opends.server.api.DirectoryThread; 041 import org.opends.server.loggers.debug.DebugTracer; 042 import org.opends.server.replication.protocol.ProtocolSession; 043 import org.opends.server.replication.protocol.UpdateMessage; 044 045 046 /** 047 * This class defines a server writer, which is used to send changes to a 048 * directory server. 049 */ 050 public class ServerWriter extends DirectoryThread 051 { 052 /** 053 * The tracer object for the debug logger. 054 */ 055 private static final DebugTracer TRACER = getTracer(); 056 057 private ProtocolSession session; 058 private ServerHandler handler; 059 private ReplicationServerDomain replicationServerDomain; 060 private short serverId; 061 062 /** 063 * Create a ServerWriter. 064 * Then ServerWriter then waits on the ServerHandler for new updates 065 * and forward them to the server 066 * 067 * @param session the ProtocolSession that will be used to send updates. 068 * @param serverId the Identifier of the server. 069 * @param handler handler for which the ServerWriter is created. 070 * @param replicationServerDomain The ReplicationServerDomain of this 071 * ServerWriter. 072 */ 073 public ServerWriter(ProtocolSession session, short serverId, 074 ServerHandler handler, 075 ReplicationServerDomain replicationServerDomain) 076 { 077 super(handler.toString() + " writer"); 078 079 this.serverId = serverId; 080 this.session = session; 081 this.handler = handler; 082 this.replicationServerDomain = replicationServerDomain; 083 } 084 085 /** 086 * Run method for the ServerWriter. 087 * Loops waiting for changes from the ReplicationServerDomain and forward them 088 * to the other servers 089 */ 090 public void run() 091 { 092 if (debugEnabled()) 093 { 094 if (handler.isReplicationServer()) 095 { 096 TRACER.debugInfo("Replication server writer starting " + serverId); 097 } 098 else 099 { 100 TRACER.debugInfo("LDAP server writer starting " + serverId); 101 } 102 } 103 try 104 { 105 while (true) 106 { 107 UpdateMessage update = replicationServerDomain.take(this.handler); 108 if (update == null) 109 return; /* this connection is closing */ 110 111 // Ignore update to be sent to a replica with a bad generation ID 112 long referenceGenerationId = replicationServerDomain.getGenerationId(); 113 if ((referenceGenerationId != handler.getGenerationId()) 114 || (referenceGenerationId == -1) 115 || (handler.getGenerationId() == -1)) 116 { 117 logError(ERR_IGNORING_UPDATE_TO.get( 118 update.getDn(), 119 this.handler.getMonitorInstanceName())); 120 continue; 121 } 122 123 /* 124 if (debugEnabled()) 125 { 126 TRACER.debugInfo( 127 "In " + replicationServerDomain.getReplicationServer(). 128 getMonitorInstanceName() + 129 ", writer to " + this.handler.getMonitorInstanceName() + 130 " publishes msg=[" + update.toString() + "]"+ 131 " refgenId=" + referenceGenerationId + 132 " server=" + handler.getServerId() + 133 " generationId=" + handler.getGenerationId()); 134 } 135 */ 136 session.publish(update); 137 } 138 } 139 catch (NoSuchElementException e) 140 { 141 /* 142 * The remote host has disconnected and this particular Tree is going to 143 * be removed, just ignore the exception and let the thread die as well 144 */ 145 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); 146 logError(message); 147 } 148 catch (SocketException e) 149 { 150 /* 151 * The remote host has disconnected and this particular Tree is going to 152 * be removed, just ignore the exception and let the thread die as well 153 */ 154 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); 155 logError(message); 156 } 157 catch (Exception e) 158 { 159 /* 160 * An unexpected error happened. 161 * Log an error and close the connection. 162 */ 163 Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() + 164 " " + stackTraceToSingleLineString(e)); 165 logError(message); 166 } 167 finally { 168 try 169 { 170 session.close(); 171 } catch (IOException e) 172 { 173 // Can't do much more : ignore 174 } 175 replicationServerDomain.stopServer(handler); 176 177 if (debugEnabled()) 178 { 179 if (handler.isReplicationServer()) 180 { 181 TRACER.debugInfo("Replication server writer stopping " + serverId); 182 } 183 else 184 { 185 TRACER.debugInfo("LDAP server writer stopping " + serverId); 186 } 187 } 188 } 189 } 190 }