001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import org.opends.messages.Message; 029 import org.opends.messages.MessageBuilder; 030 031 import static org.opends.server.loggers.debug.DebugLogger.*; 032 033 import org.opends.server.loggers.debug.DebugTracer; 034 import static org.opends.server.loggers.ErrorLogger.logError; 035 import static org.opends.messages.ReplicationMessages.*; 036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 037 038 import java.io.IOException; 039 import java.util.ArrayList; 040 import java.util.LinkedHashSet; 041 import java.util.List; 042 import java.util.Map; 043 import java.util.Set; 044 import java.util.concurrent.ConcurrentHashMap; 045 import java.util.concurrent.Semaphore; 046 import java.util.concurrent.TimeUnit; 047 import java.util.Iterator; 048 049 import org.opends.server.replication.common.ChangeNumber; 050 import org.opends.server.replication.common.ServerState; 051 import org.opends.server.replication.protocol.AckMessage; 052 import org.opends.server.replication.protocol.ErrorMessage; 053 import org.opends.server.replication.protocol.RoutableMessage; 054 import org.opends.server.replication.protocol.UpdateMessage; 055 import org.opends.server.replication.protocol.ReplServerInfoMessage; 056 import org.opends.server.replication.protocol.MonitorMessage; 057 import org.opends.server.replication.protocol.MonitorRequestMessage; 058 import org.opends.server.replication.protocol.ResetGenerationId; 059 import org.opends.server.types.DN; 060 import org.opends.server.types.DirectoryException; 061 import org.opends.server.types.ResultCode; 062 import org.opends.server.util.TimeThread; 063 import com.sleepycat.je.DatabaseException; 064 065 /** 066 * This class define an in-memory cache that will be used to store 067 * the messages that have been received from an LDAP server or 068 * from another replication server and that should be forwarded to 069 * other servers. 070 * 071 * The size of the cache is set by configuration. 072 * If the cache becomes bigger than the configured size, the older messages 073 * are removed and should they be needed again must be read from the backing 074 * file 075 * 076 * 077 * it runs a thread that is responsible for saving the messages 078 * received to the disk and for trimming them 079 * Decision to trim can be based on disk space or age of the message 080 */ 081 public class ReplicationServerDomain 082 { 083 private final Object flowControlLock = new Object(); 084 private final DN baseDn; 085 086 /* 087 * The following map contains one balanced tree for each replica ID 088 * to which we are currently publishing 089 * the first update in the balanced tree is the next change that we 090 * must push to this particular server 091 * 092 * We add new TreeSet in the HashMap when a new server register 093 * to this replication server. 094 * 095 */ 096 private final Map<Short, ServerHandler> connectedServers = 097 new ConcurrentHashMap<Short, ServerHandler>(); 098 099 /* 100 * This map contains one ServerHandler for each replication servers 101 * with which we are connected (so normally all the replication servers) 102 * the first update in the balanced tree is the next change that we 103 * must push to this particular server 104 * 105 * We add new TreeSet in the HashMap when a new replication server register 106 * to this replication server. 107 */ 108 109 private final Map<Short, ServerHandler> replicationServers = 110 new ConcurrentHashMap<Short, ServerHandler>(); 111 112 /* 113 * This map contains the List of updates received from each 114 * LDAP server 115 */ 116 private final Map<Short, DbHandler> sourceDbHandlers = 117 new ConcurrentHashMap<Short, DbHandler>(); 118 private ReplicationServer replicationServer; 119 120 /* GenerationId management */ 121 private long generationId = -1; 122 private boolean generationIdSavedStatus = false; 123 124 /** 125 * The tracer object for the debug logger. 126 */ 127 private static final DebugTracer TRACER = getTracer(); 128 129 /* Monitor data management */ 130 131 // TODO: Remote monitor data cache lifetime is 500ms/should be configurable 132 private long monitorDataLifeTime = 500; 133 134 /* Search op on monitor data is processed by a worker thread. 135 * Requests are sent to the other RS,and responses are received by the 136 * listener threads. 137 * The worker thread is awoke on this semaphore, or on timeout. 138 */ 139 Semaphore remoteMonitorResponsesSemaphore; 140 141 /** 142 * The monitor data consolidated over the topology. 143 */ 144 private MonitorData monitorData = new MonitorData(); 145 private MonitorData wrkMonitorData; 146 147 /** 148 * Creates a new ReplicationServerDomain associated to the DN baseDn. 149 * 150 * @param baseDn The baseDn associated to the ReplicationServerDomain. 151 * @param replicationServer the ReplicationServer that created this 152 * replicationServer cache. 153 */ 154 public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer) 155 { 156 this.baseDn = baseDn; 157 this.replicationServer = replicationServer; 158 } 159 160 /** 161 * Add an update that has been received to the list of 162 * updates that must be forwarded to all other servers. 163 * 164 * @param update The update that has been received. 165 * @param sourceHandler The ServerHandler for the server from which the 166 * update was received 167 * @throws IOException When an IO exception happens during the update 168 * processing. 169 */ 170 public void put(UpdateMessage update, ServerHandler sourceHandler) 171 throws IOException 172 { 173 /* 174 * TODO : In case that the source server is a LDAP server this method 175 * should check that change did get pushed to at least one 176 * other replication server before pushing it to the LDAP servers 177 */ 178 179 short id = update.getChangeNumber().getServerId(); 180 sourceHandler.updateServerState(update); 181 sourceHandler.incrementInCount(); 182 183 if (update.isAssured()) 184 { 185 int count = this.NumServers(); 186 if (count > 1) 187 { 188 if (sourceHandler.isReplicationServer()) 189 ServerHandler.addWaitingAck(update, sourceHandler.getServerId(), 190 this, count - 1); 191 else 192 sourceHandler.addWaitingAck(update, count - 1); 193 } 194 else 195 { 196 sourceHandler.sendAck(update.getChangeNumber()); 197 } 198 } 199 200 if (generationId < 0) 201 { 202 generationId = sourceHandler.getGenerationId(); 203 } 204 205 // look for the dbHandler that is responsible for the LDAP server which 206 // generated the change. 207 DbHandler dbHandler = null; 208 synchronized (sourceDbHandlers) 209 { 210 dbHandler = sourceDbHandlers.get(id); 211 if (dbHandler == null) 212 { 213 try 214 { 215 dbHandler = replicationServer.newDbHandler(id, baseDn); 216 generationIdSavedStatus = true; 217 } 218 catch (DatabaseException e) 219 { 220 /* 221 * Because of database problem we can't save any more changes 222 * from at least one LDAP server. 223 * This replicationServer therefore can't do it's job properly anymore 224 * and needs to close all its connections and shutdown itself. 225 */ 226 MessageBuilder mb = new MessageBuilder(); 227 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 228 mb.append(stackTraceToSingleLineString(e)); 229 logError(mb.toMessage()); 230 replicationServer.shutdown(); 231 return; 232 } 233 sourceDbHandlers.put(id, dbHandler); 234 } 235 } 236 237 // Publish the messages to the source handler 238 dbHandler.add(update); 239 240 241 /* 242 * Push the message to the replication servers 243 */ 244 if (!sourceHandler.isReplicationServer()) 245 { 246 for (ServerHandler handler : replicationServers.values()) 247 { 248 handler.add(update, sourceHandler); 249 } 250 } 251 252 /* 253 * Push the message to the LDAP servers 254 */ 255 for (ServerHandler handler : connectedServers.values()) 256 { 257 // don't forward the change to the server that just sent it 258 if (handler == sourceHandler) 259 { 260 continue; 261 } 262 263 handler.add(update, sourceHandler); 264 } 265 266 } 267 268 /** 269 * Wait a short while for ServerId disconnection. 270 * 271 * @param serverId the serverId to be checked. 272 */ 273 public void waitDisconnection(short serverId) 274 { 275 if (connectedServers.containsKey(serverId)) 276 { 277 // try again 278 try 279 { 280 Thread.sleep(100); 281 } catch (InterruptedException e) 282 { 283 } 284 } 285 } 286 287 /** 288 * Create initialize context necessary for finding the changes 289 * that must be sent to a given LDAP or replication server. 290 * 291 * @param handler handler for the server that must be started 292 * @throws Exception when method has failed 293 * @return A boolean indicating if the start was successfull. 294 */ 295 public boolean startServer(ServerHandler handler) throws Exception 296 { 297 /* 298 * create the balanced tree that will be used to forward changes 299 */ 300 synchronized (connectedServers) 301 { 302 ServerHandler oldHandler = connectedServers.get(handler.getServerId()); 303 304 if (connectedServers.containsKey(handler.getServerId())) 305 { 306 // looks like two LDAP servers have the same serverId 307 // log an error message and drop this connection. 308 Message message = ERR_DUPLICATE_SERVER_ID.get( 309 oldHandler.toString(), handler.toString(), handler.getServerId()); 310 logError(message); 311 return false; 312 } 313 connectedServers.put(handler.getServerId(), handler); 314 315 // It can be that the server that connects here is the 316 // first server connected for a domain. 317 // In that case, we will establish the appriopriate connections 318 // to the other repl servers for this domain and receive 319 // their ReplServerInfo messages. 320 // FIXME: Is it necessary to end this above processing BEFORE listening 321 // to incoming messages for that domain ? But the replica 322 // would raise Read Timeout for replica that connects. 323 324 // Update the remote replication servers with our list 325 // of connected LDAP servers 326 sendReplServerInfo(); 327 328 return true; 329 } 330 } 331 332 /** 333 * Stop operations with a given server. 334 * 335 * @param handler the server for which we want to stop operations 336 */ 337 public void stopServer(ServerHandler handler) 338 { 339 if (debugEnabled()) 340 TRACER.debugInfo( 341 "In RS " + this.replicationServer.getMonitorInstanceName() + 342 " for " + baseDn + " " + 343 " stopServer " + handler.getMonitorInstanceName()); 344 345 346 if (handler.isReplicationServer()) 347 { 348 if (replicationServers.containsValue(handler)) 349 { 350 replicationServers.remove(handler.getServerId()); 351 handler.stopHandler(); 352 353 // Update the remote replication servers with our list 354 // of connected LDAP servers 355 sendReplServerInfo(); 356 } 357 } 358 else 359 { 360 if (connectedServers.containsValue(handler)) 361 { 362 connectedServers.remove(handler.getServerId()); 363 handler.stopHandler(); 364 365 // Update the remote replication servers with our list 366 // of connected LDAP servers 367 sendReplServerInfo(); 368 } 369 } 370 } 371 372 /** 373 * Resets the generationId for this domain if there is no LDAP 374 * server currently connected and if the generationId has never 375 * been saved. 376 */ 377 protected void mayResetGenerationId() 378 { 379 if (debugEnabled()) 380 TRACER.debugInfo( 381 "In RS " + this.replicationServer.getMonitorInstanceName() + 382 " for " + baseDn + " " + 383 " mayResetGenerationId generationIdSavedStatus=" + 384 generationIdSavedStatus); 385 386 // If there is no more any LDAP server connected to this domain in the 387 // topology and the generationId has never been saved, then we can reset 388 // it and the next LDAP server to connect will become the new reference. 389 boolean lDAPServersConnectedInTheTopology = false; 390 if (connectedServers.isEmpty()) 391 { 392 for (ServerHandler rsh : replicationServers.values()) 393 { 394 if (generationId != rsh.getGenerationId()) 395 { 396 if (debugEnabled()) 397 TRACER.debugInfo( 398 "In RS " + this.replicationServer.getMonitorInstanceName() + 399 " for " + baseDn + " " + 400 " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() + 401 " thas different genId"); 402 } 403 else 404 { 405 if (rsh.hasRemoteLDAPServers()) 406 { 407 lDAPServersConnectedInTheTopology = true; 408 409 if (debugEnabled()) 410 TRACER.debugInfo( 411 "In RS " + this.replicationServer.getMonitorInstanceName() + 412 " for " + baseDn + " " + 413 " mayResetGenerationId RS" + rsh.getMonitorInstanceName() + 414 " has servers connected to it - will not reset generationId"); 415 } 416 } 417 } 418 } 419 else 420 { 421 lDAPServersConnectedInTheTopology = true; 422 if (debugEnabled()) 423 TRACER.debugInfo( 424 "In RS " + this.replicationServer.getMonitorInstanceName() + 425 " for " + baseDn + " " + 426 " has servers connected to it - will not reset generationId"); 427 } 428 429 if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus) 430 && (generationId != -1)) 431 { 432 setGenerationId(-1, false); 433 } 434 } 435 436 /** 437 * Create initialize context necessary for finding the changes 438 * that must be sent to a given replication server. 439 * 440 * @param handler the server ID to which we want to forward changes 441 * @throws Exception in case of errors 442 * @return A boolean indicating if the start was successfull. 443 */ 444 public boolean startReplicationServer(ServerHandler handler) throws Exception 445 { 446 /* 447 * create the balanced tree that will be used to forward changes 448 */ 449 synchronized (replicationServers) 450 { 451 ServerHandler oldHandler = replicationServers.get(handler.getServerId()); 452 if ((oldHandler != null)) 453 { 454 if (oldHandler.getServerAddressURL().equals( 455 handler.getServerAddressURL())) 456 { 457 // this is the same server, this means that our ServerStart messages 458 // have been sent at about the same time and 2 connections 459 // have been established. 460 // Silently drop this connection. 461 } 462 else 463 { 464 // looks like two replication servers have the same serverId 465 // log an error message and drop this connection. 466 Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID. 467 get(oldHandler.getServerAddressURL(), 468 handler.getServerAddressURL(), handler.getServerId()); 469 logError(message); 470 } 471 return false; 472 } 473 replicationServers.put(handler.getServerId(), handler); 474 475 // Update this server with the list of LDAP servers 476 // already connected 477 handler.sendInfo( 478 new ReplServerInfoMessage(getConnectedLDAPservers(),generationId)); 479 480 return true; 481 } 482 } 483 484 /** 485 * Get the next update that need to be sent to a given LDAP server. 486 * This call is blocking when no update is available or when dependencies 487 * do not allow to send the next available change 488 * 489 * @param handler The server handler for the target directory server. 490 * 491 * @return the update that must be forwarded 492 */ 493 public UpdateMessage take(ServerHandler handler) 494 { 495 UpdateMessage msg; 496 /* 497 * Get the balanced tree that we use to sort the changes to be 498 * sent to the replica from the cookie 499 * 500 * The next change to send is always the first one in the tree 501 * So this methods simply need to check that dependencies are OK 502 * and update this replicaId RUV 503 * 504 * TODO : dependency : 505 * before forwarding change, we should check that the dependency 506 * that is indicated in this change is OK (change already in the RUV) 507 */ 508 msg = handler.take(); 509 synchronized (flowControlLock) 510 { 511 if (handler.restartAfterSaturation(null)) 512 flowControlLock.notifyAll(); 513 } 514 return msg; 515 } 516 517 /** 518 * Return a Set of String containing the lists of Replication servers 519 * connected to this server. 520 * @return the set of connected servers 521 */ 522 public Set<String> getChangelogs() 523 { 524 LinkedHashSet<String> mySet = new LinkedHashSet<String>(); 525 526 for (ServerHandler handler : replicationServers.values()) 527 { 528 mySet.add(handler.getServerAddressURL()); 529 } 530 531 return mySet; 532 } 533 534 535 /** 536 * Return a Set containing the servers known by this replicationServer. 537 * @return a set containing the servers known by this replicationServer. 538 */ 539 public Set<Short> getServers() 540 { 541 return sourceDbHandlers.keySet(); 542 } 543 544 /** 545 * Returns as a set of String the list of LDAP servers connected to us. 546 * Each string is the serverID of a connected LDAP server. 547 * 548 * @return The set of connected LDAP servers 549 */ 550 public List<String> getConnectedLDAPservers() 551 { 552 List<String> mySet = new ArrayList<String>(0); 553 554 for (ServerHandler handler : connectedServers.values()) 555 { 556 mySet.add(String.valueOf(handler.getServerId())); 557 } 558 return mySet; 559 } 560 561 /** 562 * Creates and returns an iterator. 563 * When the iterator is not used anymore, the caller MUST call the 564 * ReplicationIterator.releaseCursor() method to free the ressources 565 * and locks used by the ReplicationIterator. 566 * 567 * @param serverId Identifier of the server for which the iterator is created. 568 * @param changeNumber Starting point for the iterator. 569 * @return the created ReplicationIterator. Null when no DB is available 570 * for the provided server Id. 571 */ 572 public ReplicationIterator getChangelogIterator(short serverId, 573 ChangeNumber changeNumber) 574 { 575 DbHandler handler = sourceDbHandlers.get(serverId); 576 if (handler == null) 577 return null; 578 579 try 580 { 581 return handler.generateIterator(changeNumber); 582 } 583 catch (Exception e) 584 { 585 return null; 586 } 587 } 588 589 /** 590 * Returns the change count for that ReplicationServerDomain. 591 * 592 * @return the change count. 593 */ 594 public long getChangesCount() 595 { 596 long entryCount = 0; 597 for (DbHandler dbHandler : sourceDbHandlers.values()) 598 { 599 entryCount += dbHandler.getChangesCount(); 600 } 601 return entryCount; 602 } 603 604 /** 605 * Get the baseDn. 606 * @return Returns the baseDn. 607 */ 608 public DN getBaseDn() 609 { 610 return baseDn; 611 } 612 613 /** 614 * Sets the provided DbHandler associated to the provided serverId. 615 * 616 * @param serverId the serverId for the server to which is 617 * associated the Dbhandler. 618 * @param dbHandler the dbHandler associated to the serverId. 619 * 620 * @throws DatabaseException If a database error happened. 621 */ 622 public void setDbHandler(short serverId, DbHandler dbHandler) 623 throws DatabaseException 624 { 625 synchronized (sourceDbHandlers) 626 { 627 sourceDbHandlers.put(serverId , dbHandler); 628 } 629 } 630 631 /** 632 * Get the number of currently connected servers. 633 * 634 * @return the number of currently connected servers. 635 */ 636 private int NumServers() 637 { 638 return replicationServers.size() + connectedServers.size(); 639 } 640 641 642 /** 643 * Add an ack to the list of ack received for a given change. 644 * 645 * @param message The ack message received. 646 * @param fromServerId The identifier of the server that sent the ack. 647 */ 648 public void ack(AckMessage message, short fromServerId) 649 { 650 /* 651 * there are 2 possible cases here : 652 * - the message that was acked comes from a server to which 653 * we are directly connected. 654 * In this case, we can find the handler from the connectedServers map 655 * - the message that was acked comes from a server to which we are not 656 * connected. 657 * In this case we need to find the replication server that forwarded 658 * the change and send back the ack to this server. 659 */ 660 ServerHandler handler = connectedServers.get( 661 message.getChangeNumber().getServerId()); 662 if (handler != null) 663 handler.ack(message, fromServerId); 664 else 665 { 666 ServerHandler.ackChangelog(message, fromServerId); 667 } 668 } 669 670 /** 671 * Retrieves the destination handlers for a routable message. 672 * 673 * @param msg The message to route. 674 * @param senderHandler The handler of the server that published this message. 675 * @return The list of destination handlers. 676 */ 677 protected List<ServerHandler> getDestinationServers(RoutableMessage msg, 678 ServerHandler senderHandler) 679 { 680 List<ServerHandler> servers = 681 new ArrayList<ServerHandler>(); 682 683 if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) 684 { 685 // TODO Import from the "closest server" to be implemented 686 } 687 else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) 688 { 689 if (!senderHandler.isReplicationServer()) 690 { 691 // Send to all replication servers with a least one remote 692 // server connected 693 for (ServerHandler rsh : replicationServers.values()) 694 { 695 if (rsh.hasRemoteLDAPServers()) 696 { 697 servers.add(rsh); 698 } 699 } 700 } 701 702 // Sends to all connected LDAP servers 703 for (ServerHandler destinationHandler : connectedServers.values()) 704 { 705 // Don't loop on the sender 706 if (destinationHandler == senderHandler) 707 continue; 708 servers.add(destinationHandler); 709 } 710 } 711 else 712 { 713 // Destination is one server 714 ServerHandler destinationHandler = 715 connectedServers.get(msg.getDestination()); 716 if (destinationHandler != null) 717 { 718 servers.add(destinationHandler); 719 } 720 else 721 { 722 // the targeted server is NOT connected 723 // Let's search for THE changelog server that MAY 724 // have the targeted server connected. 725 if (senderHandler.isLDAPserver()) 726 { 727 for (ServerHandler h : replicationServers.values()) 728 { 729 // Send to all replication servers with a least one remote 730 // server connected 731 if (h.isRemoteLDAPServer(msg.getDestination())) 732 { 733 servers.add(h); 734 } 735 } 736 } 737 } 738 } 739 return servers; 740 } 741 742 /** 743 * Processes a message coming from one server in the topology 744 * and potentially forwards it to one or all other servers. 745 * 746 * @param msg The message received and to be processed. 747 * @param senderHandler The server handler of the server that emitted 748 * the message. 749 */ 750 public void process(RoutableMessage msg, ServerHandler senderHandler) 751 { 752 753 // Test the message for which a ReplicationServer is expected 754 // to be the destination 755 if (msg.getDestination() == this.replicationServer.getServerId()) 756 { 757 if (msg instanceof ErrorMessage) 758 { 759 ErrorMessage errorMsg = (ErrorMessage)msg; 760 logError(ERR_ERROR_MSG_RECEIVED.get( 761 errorMsg.getDetails())); 762 } 763 else if (msg instanceof MonitorRequestMessage) 764 { 765 MonitorRequestMessage replServerMonitorRequestMsg = 766 (MonitorRequestMessage) msg; 767 768 MonitorMessage monitorMsg = 769 new MonitorMessage( 770 replServerMonitorRequestMsg.getDestination(), 771 replServerMonitorRequestMsg.getsenderID()); 772 773 // Populate for each connected LDAP Server 774 // from the states stored in the serverHandler. 775 // - the server state 776 // - the older missing change 777 for (ServerHandler lsh : this.connectedServers.values()) 778 { 779 monitorMsg.setServerState( 780 lsh.getServerId(), 781 lsh.getServerState(), 782 lsh.getApproxFirstMissingDate(), 783 true); 784 } 785 786 // Same for the connected RS 787 for (ServerHandler rsh : this.replicationServers.values()) 788 { 789 monitorMsg.setServerState( 790 rsh.getServerId(), 791 rsh.getServerState(), 792 rsh.getApproxFirstMissingDate(), 793 false); 794 } 795 796 // Populate the RS state in the msg from the DbState 797 monitorMsg.setReplServerDbState(this.getDbServerState()); 798 799 800 try 801 { 802 senderHandler.send(monitorMsg); 803 } 804 catch(Exception e) 805 { 806 // We log the error. The requestor will detect a timeout or 807 // any other failure on the connection. 808 logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( 809 Short.toString((msg.getDestination())))); 810 } 811 } 812 else if (msg instanceof MonitorMessage) 813 { 814 MonitorMessage monitorMsg = 815 (MonitorMessage) msg; 816 817 receivesMonitorDataResponse(monitorMsg); 818 } 819 else 820 { 821 logError(NOTE_ERR_ROUTING_TO_SERVER.get( 822 msg.getClass().getCanonicalName())); 823 } 824 return; 825 } 826 827 List<ServerHandler> servers = getDestinationServers(msg, senderHandler); 828 829 if (servers.isEmpty()) 830 { 831 MessageBuilder mb = new MessageBuilder(); 832 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); 833 mb.append(" In Replication Server=" + this.replicationServer. 834 getMonitorInstanceName()); 835 mb.append(" domain =" + this.baseDn); 836 mb.append(" unroutable message =" + msg.toString()); 837 mb.append(" routing table is empty"); 838 ErrorMessage errMsg = new ErrorMessage( 839 this.replicationServer.getServerId(), 840 msg.getsenderID(), 841 mb.toMessage()); 842 logError(mb.toMessage()); 843 try 844 { 845 senderHandler.send(errMsg); 846 } 847 catch(IOException ioe) 848 { 849 // TODO Handle error properly (sender timeout in addition) 850 /* 851 * An error happened trying to send an error msg to this server. 852 * Log an error and close the connection to this server. 853 */ 854 MessageBuilder mb2 = new MessageBuilder(); 855 mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); 856 mb2.append(stackTraceToSingleLineString(ioe)); 857 logError(mb2.toMessage()); 858 senderHandler.shutdown(); 859 } 860 } 861 else 862 { 863 for (ServerHandler targetHandler : servers) 864 { 865 try 866 { 867 targetHandler.send(msg); 868 } 869 catch(IOException ioe) 870 { 871 /* 872 * An error happened trying the send a routabled message 873 * to its destination server. 874 * Send back an error to the originator of the message. 875 */ 876 MessageBuilder mb = new MessageBuilder(); 877 mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString())); 878 mb.append(stackTraceToSingleLineString(ioe)); 879 mb.append(" "); 880 mb.append(msg.getClass().getCanonicalName()); 881 logError(mb.toMessage()); 882 883 MessageBuilder mb1 = new MessageBuilder(); 884 mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); 885 mb1.append("serverID:" + msg.getDestination()); 886 ErrorMessage errMsg = new ErrorMessage( 887 msg.getsenderID(), mb1.toMessage()); 888 try 889 { 890 senderHandler.send(errMsg); 891 } 892 catch(IOException ioe1) 893 { 894 // an error happened on the sender session trying to recover 895 // from an error on the receiver session. 896 // We don't have much solution left beside closing the sessions. 897 senderHandler.shutdown(); 898 targetHandler.shutdown(); 899 } 900 // TODO Handle error properly (sender timeout in addition) 901 } 902 } 903 } 904 905 } 906 907 /** 908 * Send back an ack to the server that sent the change. 909 * 910 * @param changeNumber The ChangeNumber of the change that must be acked. 911 * @param isLDAPserver This boolean indicates if the server that sent the 912 * change was an LDAP server or a ReplicationServer. 913 */ 914 public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) 915 { 916 short serverId = changeNumber.getServerId(); 917 sendAck(changeNumber, isLDAPserver, serverId); 918 } 919 920 /** 921 * 922 * Send back an ack to a server that sent the change. 923 * 924 * @param changeNumber The ChangeNumber of the change that must be acked. 925 * @param isLDAPserver This boolean indicates if the server that sent the 926 * change was an LDAP server or a ReplicationServer. 927 * @param serverId The identifier of the server from which we 928 * received the change.. 929 */ 930 public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, 931 short serverId) 932 { 933 ServerHandler handler; 934 if (isLDAPserver) 935 handler = connectedServers.get(serverId); 936 else 937 handler = replicationServers.get(serverId); 938 939 // TODO : check for null handler and log error 940 try 941 { 942 handler.sendAck(changeNumber); 943 } catch (IOException e) 944 { 945 /* 946 * An error happened trying the send back an ack to this server. 947 * Log an error and close the connection to this server. 948 */ 949 MessageBuilder mb = new MessageBuilder(); 950 mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString())); 951 mb.append(stackTraceToSingleLineString(e)); 952 logError(mb.toMessage()); 953 handler.shutdown(); 954 } 955 } 956 957 /** 958 * Shutdown this ReplicationServerDomain. 959 */ 960 public void shutdown() 961 { 962 // Close session with other changelogs 963 for (ServerHandler serverHandler : replicationServers.values()) 964 { 965 serverHandler.shutdown(); 966 } 967 968 // Close session with other LDAP servers 969 for (ServerHandler serverHandler : connectedServers.values()) 970 { 971 serverHandler.shutdown(); 972 } 973 974 // Shutdown the dbHandlers 975 synchronized (sourceDbHandlers) 976 { 977 for (DbHandler dbHandler : sourceDbHandlers.values()) 978 { 979 dbHandler.shutdown(); 980 } 981 sourceDbHandlers.clear(); 982 } 983 } 984 985 /** 986 * Returns the ServerState describing the last change from this replica. 987 * 988 * @return The ServerState describing the last change from this replica. 989 */ 990 public ServerState getDbServerState() 991 { 992 ServerState serverState = new ServerState(); 993 for (DbHandler db : sourceDbHandlers.values()) 994 { 995 serverState.update(db.getLastChange()); 996 } 997 return serverState; 998 } 999 1000 /** 1001 * {@inheritDoc} 1002 */ 1003 @Override 1004 public String toString() 1005 { 1006 return "ReplicationServerDomain " + baseDn; 1007 } 1008 1009 /** 1010 * Check if some server Handler should be removed from flow control state. 1011 * @throws IOException If an error happened. 1012 */ 1013 public void checkAllSaturation() throws IOException 1014 { 1015 for (ServerHandler handler : replicationServers.values()) 1016 { 1017 handler.checkWindow(); 1018 } 1019 1020 for (ServerHandler handler : connectedServers.values()) 1021 { 1022 handler.checkWindow(); 1023 } 1024 } 1025 1026 /** 1027 * Check if a server that was in flow control can now restart 1028 * sending updates. 1029 * @param sourceHandler The server that must be checked. 1030 * @return true if the server can restart sending changes. 1031 * false if the server can't restart sending changes. 1032 */ 1033 public boolean restartAfterSaturation(ServerHandler sourceHandler) 1034 { 1035 for (ServerHandler handler : replicationServers.values()) 1036 { 1037 if (!handler.restartAfterSaturation(sourceHandler)) 1038 return false; 1039 } 1040 1041 for (ServerHandler handler : connectedServers.values()) 1042 { 1043 if (!handler.restartAfterSaturation(sourceHandler)) 1044 return false; 1045 } 1046 return true; 1047 } 1048 1049 /** 1050 * Send a ReplServerInfoMessage to all the connected replication servers 1051 * in order to let them know our connected LDAP servers. 1052 */ 1053 private void sendReplServerInfo() 1054 { 1055 ReplServerInfoMessage info = 1056 new ReplServerInfoMessage(getConnectedLDAPservers(), generationId); 1057 for (ServerHandler handler : replicationServers.values()) 1058 { 1059 try 1060 { 1061 handler.sendInfo(info); 1062 } 1063 catch (IOException e) 1064 { 1065 /* 1066 * An error happened trying the send back an ack to this server. 1067 * Log an error and close the connection to this server. 1068 */ 1069 MessageBuilder mb = new MessageBuilder(); 1070 mb.append(ERR_CHANGELOG_ERROR_SENDING_INFO.get(this.toString())); 1071 mb.append(stackTraceToSingleLineString(e)); 1072 logError(mb.toMessage()); 1073 handler.shutdown(); 1074 } 1075 } 1076 } 1077 1078 /** 1079 * Get the generationId associated to this domain. 1080 * 1081 * @return The generationId 1082 */ 1083 public long getGenerationId() 1084 { 1085 return generationId; 1086 } 1087 1088 /** 1089 * Get the generationId saved status. 1090 * 1091 * @return The generationId saved status. 1092 */ 1093 public boolean getGenerationIdSavedStatus() 1094 { 1095 return generationIdSavedStatus; 1096 } 1097 1098 /** 1099 * Sets the provided value as the new in memory generationId. 1100 * 1101 * @param generationId The new value of generationId. 1102 * @param savedStatus The saved status of the generationId. 1103 */ 1104 synchronized public void setGenerationId(long generationId, 1105 boolean savedStatus) 1106 { 1107 if (debugEnabled()) 1108 TRACER.debugInfo( 1109 "In " + this.replicationServer.getMonitorInstanceName() + 1110 " baseDN=" + baseDn + 1111 " RCache.set GenerationId=" + generationId); 1112 1113 if (this.generationId != generationId) 1114 { 1115 // we are changing of genId 1116 clearDbs(); 1117 1118 this.generationId = generationId; 1119 this.generationIdSavedStatus = savedStatus; 1120 1121 // they have a generationId different from the reference one 1122 for (ServerHandler handler : connectedServers.values()) 1123 { 1124 if (generationId != handler.getGenerationId()) 1125 { 1126 // Notify our remote LS that from now on have a different genID 1127 handler.warnBadGenerationId(); 1128 } 1129 } 1130 } 1131 } 1132 1133 /** 1134 * Resets the generationID. 1135 * 1136 * @param senderHandler The handler associated to the server 1137 * that requested to reset the generationId. 1138 * @param genIdMsg The reset generation ID msg received. 1139 */ 1140 public void resetGenerationId(ServerHandler senderHandler, 1141 ResetGenerationId genIdMsg) 1142 { 1143 long newGenId = genIdMsg.getGenerationId(); 1144 1145 if (newGenId != this.generationId) 1146 { 1147 this.setGenerationId(newGenId, false); 1148 } 1149 1150 // If we are the first replication server warned, 1151 // then forwards the reset message to the remote replication servers 1152 for (ServerHandler rsHandler : replicationServers.values()) 1153 { 1154 try 1155 { 1156 // After we'll have send the mssage , the remote RS will adopt 1157 // the new genId 1158 rsHandler.setGenerationId(newGenId); 1159 if (senderHandler.isLDAPserver()) 1160 { 1161 rsHandler.forwardGenerationIdToRS(genIdMsg); 1162 } 1163 } 1164 catch (IOException e) 1165 { 1166 logError(ERR_CHANGELOG_ERROR_SENDING_INFO. 1167 get(rsHandler.getMonitorInstanceName())); 1168 } 1169 } 1170 } 1171 1172 /** 1173 * Clears the Db associated with that cache. 1174 */ 1175 public void clearDbs() 1176 { 1177 // Reset the localchange and state db for the current domain 1178 synchronized (sourceDbHandlers) 1179 { 1180 for (DbHandler dbHandler : sourceDbHandlers.values()) 1181 { 1182 try 1183 { 1184 dbHandler.clear(); 1185 } 1186 catch (Exception e) 1187 { 1188 // TODO: i18n 1189 MessageBuilder mb = new MessageBuilder(); 1190 mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), 1191 e.getMessage() + " " + 1192 stackTraceToSingleLineString(e))); 1193 logError(mb.toMessage()); 1194 } 1195 } 1196 sourceDbHandlers.clear(); 1197 1198 if (debugEnabled()) 1199 TRACER.debugInfo( 1200 "In " + this.replicationServer.getMonitorInstanceName() + 1201 " baseDN=" + baseDn + 1202 " The source db handler has been cleared"); 1203 } 1204 try 1205 { 1206 replicationServer.clearGenerationId(baseDn); 1207 } 1208 catch (Exception e) 1209 { 1210 // TODO: i18n 1211 logError(Message.raw( 1212 "Exception caught while clearing generationId:" + 1213 e.getLocalizedMessage())); 1214 } 1215 } 1216 1217 /** 1218 * Returns whether the provided server is in degraded 1219 * state due to the fact that the peer server has an invalid 1220 * generationId for this domain. 1221 * 1222 * @param serverId The serverId for which we want to know the 1223 * the state. 1224 * @return Whether it is degraded or not. 1225 */ 1226 1227 public boolean isDegradedDueToGenerationId(short serverId) 1228 { 1229 if (debugEnabled()) 1230 TRACER.debugInfo( 1231 "In " + this.replicationServer.getMonitorInstanceName() + 1232 " baseDN=" + baseDn + 1233 " isDegraded serverId=" + serverId + 1234 " given local generation Id=" + this.generationId); 1235 1236 ServerHandler handler = replicationServers.get(serverId); 1237 if (handler == null) 1238 { 1239 handler = connectedServers.get(serverId); 1240 if (handler == null) 1241 { 1242 return false; 1243 } 1244 } 1245 1246 if (debugEnabled()) 1247 TRACER.debugInfo( 1248 "In " + this.replicationServer.getMonitorInstanceName() + 1249 " baseDN=" + baseDn + 1250 " Compute degradation of serverId=" + serverId + 1251 " LS server generation Id=" + handler.getGenerationId()); 1252 return (handler.getGenerationId() != this.generationId); 1253 } 1254 1255 /** 1256 * Return the associated replication server. 1257 * @return The replication server. 1258 */ 1259 public ReplicationServer getReplicationServer() 1260 { 1261 return replicationServer; 1262 } 1263 1264 /** 1265 * Process reception of a ReplServerInfoMessage. 1266 * 1267 * @param infoMsg The received message. 1268 * @param handler The handler that received the message. 1269 * @throws IOException when raised by the underlying session. 1270 */ 1271 public void receiveReplServerInfo( 1272 ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException 1273 { 1274 if (debugEnabled()) 1275 { 1276 if (handler.isReplicationServer()) 1277 TRACER.debugInfo( 1278 "In RS " + getReplicationServer().getServerId() + 1279 " Receiving replServerInfo from " + handler.getServerId() + 1280 " baseDn=" + baseDn + 1281 " genId=" + infoMsg.getGenerationId()); 1282 } 1283 1284 mayResetGenerationId(); 1285 if (generationId < 0) 1286 generationId = handler.getGenerationId(); 1287 if (generationId > 0 && (generationId != infoMsg.getGenerationId())) 1288 { 1289 Message message = NOTE_BAD_GENERATION_ID.get( 1290 baseDn.toNormalizedString(), 1291 Short.toString(handler.getServerId()), 1292 Long.toString(infoMsg.getGenerationId()), 1293 Long.toString(generationId)); 1294 1295 ErrorMessage errorMsg = new ErrorMessage( 1296 getReplicationServer().getServerId(), 1297 handler.getServerId(), 1298 message); 1299 handler.sendError(errorMsg); 1300 } 1301 } 1302 1303 /* ======================= 1304 * Monitor Data generation 1305 * ======================= 1306 */ 1307 1308 /** 1309 * Retrieves the global monitor data. 1310 * @return The monitor data. 1311 * @throws DirectoryException When an error occurs. 1312 */ 1313 synchronized protected MonitorData getMonitorData() 1314 throws DirectoryException 1315 { 1316 if (monitorData.getBuildDate() + monitorDataLifeTime 1317 > TimeThread.getTime()) 1318 { 1319 if (debugEnabled()) 1320 TRACER.debugInfo( 1321 "In " + this.replicationServer.getMonitorInstanceName() + 1322 " baseDn=" + baseDn + " getRemoteMonitorData in cache"); 1323 // The current data are still valid. No need to renew them. 1324 return monitorData; 1325 } 1326 1327 wrkMonitorData = new MonitorData(); 1328 synchronized(wrkMonitorData) 1329 { 1330 if (debugEnabled()) 1331 TRACER.debugInfo( 1332 "In " + this.replicationServer.getMonitorInstanceName() + 1333 " baseDn=" + baseDn + " Computing monitor data "); 1334 1335 // Let's process our directly connected LSes 1336 // - in the ServerHandler for a given LS1, the stored state contains : 1337 // - the max CN produced by LS1 1338 // - the last CN consumed by LS1 from LS2..n 1339 // - in the RSdomain/dbHandler, the built-in state contains : 1340 // - the max CN produced by each server 1341 // So for a given LS connected we can take the state and the max from 1342 // the LS/state. 1343 1344 for (ServerHandler directlsh : connectedServers.values()) 1345 { 1346 short serverID = directlsh.getServerId(); 1347 1348 // the state comes from the state stored in the SH 1349 ServerState directlshState = directlsh.getServerState().duplicate(); 1350 1351 // the max CN sent by that LS also comes from the SH 1352 ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID); 1353 if (maxcn == null) 1354 { 1355 // This directly connected LS has never produced any change 1356 maxcn = new ChangeNumber(0, 0 , serverID); 1357 } 1358 wrkMonitorData.setMaxCN(serverID, maxcn); 1359 wrkMonitorData.setLDAPServerState(serverID, directlshState); 1360 wrkMonitorData.setFirstMissingDate(serverID, directlsh. 1361 getApproxFirstMissingDate()); 1362 } 1363 1364 // Then initialize the max CN for the LS that produced something 1365 // - from our own local db state 1366 // - whatever they are directly or undirectly connected 1367 ServerState dbServerState = getDbServerState(); 1368 Iterator<Short> it = dbServerState.iterator(); 1369 while (it.hasNext()) 1370 { 1371 short sid = it.next(); 1372 ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid); 1373 wrkMonitorData.setMaxCN(sid, storedCN); 1374 } 1375 1376 // Now we have used all available local informations 1377 // and we need the remote ones. 1378 if (debugEnabled()) 1379 TRACER.debugInfo( 1380 "In " + this.replicationServer.getMonitorInstanceName() + 1381 " baseDn=" + baseDn + " Local monitor data: " + 1382 wrkMonitorData.toString()); 1383 } 1384 1385 // Send Request to the other Replication Servers 1386 if (remoteMonitorResponsesSemaphore == null) 1387 { 1388 remoteMonitorResponsesSemaphore = new Semaphore(0); 1389 short requestCnt = sendMonitorDataRequest(); 1390 // Wait reponses from them or timeout 1391 waitMonitorDataResponses(requestCnt); 1392 } 1393 else 1394 { 1395 // The processing of renewing the monitor cache is already running 1396 // We'll make it sleeping until the end 1397 // TODO: unit test for this case. 1398 while (remoteMonitorResponsesSemaphore!=null) 1399 { 1400 waitMonitorDataResponses(1); 1401 } 1402 } 1403 1404 wrkMonitorData.completeComputing(); 1405 1406 // Store the new computed data as the reference 1407 synchronized(monitorData) 1408 { 1409 // Now we have the expected answers or an error occured 1410 monitorData = wrkMonitorData; 1411 wrkMonitorData = null; 1412 if (debugEnabled()) 1413 TRACER.debugInfo( 1414 "In " + this.replicationServer.getMonitorInstanceName() + 1415 " baseDn=" + baseDn + " *** Computed MonitorData: " + 1416 monitorData.toString()); 1417 } 1418 return monitorData; 1419 } 1420 1421 1422 /** 1423 * Sends a MonitorRequest message to all connected RS. 1424 * @return the number of requests sent. 1425 * @throws DirectoryException when a problem occurs. 1426 */ 1427 protected short sendMonitorDataRequest() 1428 throws DirectoryException 1429 { 1430 short sent=0; 1431 try 1432 { 1433 for (ServerHandler rs : replicationServers.values()) 1434 { 1435 MonitorRequestMessage msg = new 1436 MonitorRequestMessage(this.replicationServer.getServerId(), 1437 rs.getServerId()); 1438 rs.send(msg); 1439 sent++; 1440 } 1441 } 1442 catch(Exception e) 1443 { 1444 Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get(); 1445 logError(message); 1446 throw new DirectoryException(ResultCode.OTHER, 1447 message, e); 1448 } 1449 return sent; 1450 } 1451 1452 /** 1453 * Wait for the expected count of received MonitorMessage. 1454 * @param expectedResponses The number of expected answers. 1455 * @throws DirectoryException When an error occurs. 1456 */ 1457 protected void waitMonitorDataResponses(int expectedResponses) 1458 throws DirectoryException 1459 { 1460 try 1461 { 1462 if (debugEnabled()) 1463 TRACER.debugInfo( 1464 "In " + this.replicationServer.getMonitorInstanceName() + 1465 " baseDn=" + baseDn + 1466 " waiting for " + expectedResponses 1467 + " expected monitor messages"); 1468 1469 boolean allPermitsAcquired = 1470 remoteMonitorResponsesSemaphore.tryAcquire( 1471 expectedResponses, 1472 (long) 5000, TimeUnit.MILLISECONDS); 1473 1474 if (!allPermitsAcquired) 1475 { 1476 logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); 1477 // let's go on in best effort even with limited data received. 1478 } 1479 else 1480 { 1481 if (debugEnabled()) 1482 TRACER.debugInfo( 1483 "In " + this.replicationServer.getMonitorInstanceName() + 1484 " baseDn=" + baseDn + 1485 " Successfully received all " + expectedResponses 1486 + " expected monitor messages"); 1487 } 1488 } 1489 catch(Exception e) 1490 { 1491 logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); 1492 } 1493 finally 1494 { 1495 remoteMonitorResponsesSemaphore = null; 1496 } 1497 } 1498 1499 /** 1500 * Processes a Monitor message receives from a remote Replication Server 1501 * and stores the data received. 1502 * 1503 * @param msg The message to be processed. 1504 */ 1505 public void receivesMonitorDataResponse(MonitorMessage msg) 1506 { 1507 if (debugEnabled()) 1508 TRACER.debugInfo( 1509 "In " + this.replicationServer.getMonitorInstanceName() + 1510 "Receiving " + msg + " from " + msg.getsenderID() + 1511 remoteMonitorResponsesSemaphore); 1512 1513 if (remoteMonitorResponsesSemaphore == null) 1514 { 1515 // Let's ignore the remote monitor data just received 1516 // since the computing processing has been ended. 1517 // An error - probably a timemout - occured that was already logged 1518 logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get( 1519 Short.toString(msg.getsenderID()))); 1520 return; 1521 } 1522 1523 try 1524 { 1525 synchronized(wrkMonitorData) 1526 { 1527 // Here is the RS state : list <serverID, lastChangeNumber> 1528 // For each LDAP Server, we keep the max CN accross the RSes 1529 ServerState replServerState = msg.getReplServerDbState(); 1530 wrkMonitorData.setMaxCNs(replServerState); 1531 1532 // Store the remote LDAP servers states 1533 Iterator<Short> lsidIterator = msg.ldapIterator(); 1534 while (lsidIterator.hasNext()) 1535 { 1536 short sid = lsidIterator.next(); 1537 wrkMonitorData.setLDAPServerState(sid, 1538 msg.getLDAPServerState(sid).duplicate()); 1539 wrkMonitorData.setFirstMissingDate(sid, 1540 msg.getLDAPApproxFirstMissingDate(sid)); 1541 } 1542 1543 // Process the latency reported by the remote RSi on its connections 1544 // to the other RSes 1545 Iterator<Short> rsidIterator = msg.rsIterator(); 1546 while (rsidIterator.hasNext()) 1547 { 1548 short rsid = rsidIterator.next(); 1549 if (rsid == replicationServer.getServerId()) 1550 { 1551 // this is the latency of the remote RSi regarding the current RS 1552 // let's update the fmd of my connected LS 1553 for (ServerHandler connectedlsh : connectedServers.values()) 1554 { 1555 short connectedlsid = connectedlsh.getServerId(); 1556 Long newfmd = msg.getRSApproxFirstMissingDate(rsid); 1557 wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd); 1558 } 1559 } 1560 else 1561 { 1562 // this is the latency of the remote RSi regarding another RSj 1563 // let's update the latency of the LSes connected to RSj 1564 ServerHandler rsjHdr = replicationServers.get(rsid); 1565 if (rsjHdr != null) 1566 { 1567 for(short remotelsid : rsjHdr.getConnectedServerIds()) 1568 { 1569 Long newfmd = msg.getRSApproxFirstMissingDate(rsid); 1570 wrkMonitorData.setFirstMissingDate(remotelsid, newfmd); 1571 } 1572 } 1573 } 1574 } 1575 if (debugEnabled()) 1576 { 1577 if (debugEnabled()) 1578 TRACER.debugInfo( 1579 "In " + this.replicationServer.getMonitorInstanceName() + 1580 " baseDn=" + baseDn + 1581 " Processed msg from " + msg.getsenderID() + 1582 " New monitor data: " + wrkMonitorData.toString()); 1583 } 1584 } 1585 1586 // Decreases the number of expected responses and potentially 1587 // wakes up the waiting requestor thread. 1588 remoteMonitorResponsesSemaphore.release(); 1589 1590 } 1591 catch (Exception e) 1592 { 1593 logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() + 1594 stackTraceToSingleLineString(e))); 1595 1596 // If an exception occurs while processing one of the expected message, 1597 // the processing is aborted and the waiting thread is awoke. 1598 remoteMonitorResponsesSemaphore.notifyAll(); 1599 } 1600 } 1601 1602 /** 1603 * Set the purge delay on all the db Handlers for this Domain 1604 * of Replicaiton. 1605 * 1606 * @param delay The new purge delay to use. 1607 */ 1608 void setPurgeDelay(long delay) 1609 { 1610 for (DbHandler handler : sourceDbHandlers.values()) 1611 { 1612 handler.setPurgeDelay(delay); 1613 } 1614 } 1615 }