001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 
026 */ 027 package org.opends.server.replication.server; 028 029 import org.opends.messages.*; 030 031 import static org.opends.server.loggers.ErrorLogger.logError; 032 import static org.opends.server.loggers.debug.DebugLogger.*; 033 034 import org.opends.server.loggers.debug.DebugTracer; 035 import static org.opends.messages.ReplicationMessages.*; 036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 037 038 import java.io.IOException; 039 import java.util.Date; 040 import java.util.List; 041 import java.util.ArrayList; 042 import java.util.HashMap; 043 import java.util.LinkedHashSet; 044 import java.util.Map; 045 import java.util.Set; 046 import java.util.SortedSet; 047 import java.util.TreeSet; 048 import java.util.concurrent.ConcurrentHashMap; 049 import java.util.concurrent.Semaphore; 050 import java.util.concurrent.TimeUnit; 051 052 import org.opends.server.admin.std.server.MonitorProviderCfg; 053 import org.opends.server.api.MonitorProvider; 054 import org.opends.server.config.ConfigException; 055 import org.opends.server.core.DirectoryServer; 056 import org.opends.server.replication.common.ChangeNumber; 057 import org.opends.server.replication.common.ServerState; 058 import org.opends.server.replication.protocol.*; 059 import org.opends.server.types.Attribute; 060 import org.opends.server.types.AttributeType; 061 import org.opends.server.types.AttributeValue; 062 import org.opends.server.types.DN; 063 import org.opends.server.types.InitializationException; 064 import org.opends.server.util.TimeThread; 065 066 /** 067 * This class defines a server handler, which handles all interaction with a 068 * replication server. 069 */ 070 public class ServerHandler extends MonitorProvider<MonitorProviderCfg> 071 { 072 /** 073 * The tracer object for the debug logger. 
/**
 * The tracer object for the debug logger.
 */
private static final DebugTracer TRACER = getTracer();

/**
 * Time during which the server will wait for existing thread to stop
 * during the shutdown.
 */
private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;

private short serverId;
private ProtocolSession session;
private final MsgQueue msgQueue = new MsgQueue();
private MsgQueue lateQueue = new MsgQueue();
private final Map<ChangeNumber, AckMessageList> waitingAcks =
    new HashMap<ChangeNumber, AckMessageList>();
private ReplicationServerDomain replicationServerDomain = null;
private String serverURL;

/**
 * Number of updates sent to the server.
 */
private int outCount = 0;

/**
 * Number of updates received from the server.
 */
private int inCount = 0;

private int inAckCount = 0;
private int outAckCount = 0;
private int maxReceiveQueue = 0;
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
private int maxQueueSize = 10000;
private int restartReceiveQueue;
private int restartSendQueue;
private int restartReceiveDelay;
private int restartSendDelay;
private boolean serverIsLDAPserver;
private boolean following = false;
private ServerState serverState;
private boolean active = true;
private ServerWriter writer = null;
private DN baseDn = null;
private String serverAddressURL;
private int rcvWindow;
private int rcvWindowSizeHalf;
private int maxRcvWindow;
private ServerReader reader;
private Semaphore sendWindow;
private int sendWindowSize;

/**
 * Indicates that the server is flow controlled and should be stopped
 * from sending messages.
 */
private boolean flowControl = false;

private int saturationCount = 0;
private short replicationServerId;

private short protocolVersion;
private long generationId = -1;

/**
 * When this Handler is related to a remote replication server
 * this collection will contain as many elements as there are
 * LDAP servers connected to the remote replication server.
 */
private final Map<Short, LightweightServerHandler> connectedServers =
    new ConcurrentHashMap<Short, LightweightServerHandler>();

/**
 * The time in milliseconds between heartbeats from the replication
 * server. Zero means heartbeats are off.
 */
private long heartbeatInterval = 0;

/**
 * The thread that will send heartbeats.
 */
HeartbeatThread heartbeatThread = null;

/**
 * Set when ServerHandler is stopping.
 */
private boolean shutdown = false;

private static final Map<ChangeNumber, ReplServerAckMessageList>
    changelogsWaitingAcks =
        new HashMap<ChangeNumber, ReplServerAckMessageList>();

/**
 * Creates a new server handler instance bound to the provided session.
 * The protocol version is initialised to the current version.
 *
 * @param session   The ProtocolSession used by the ServerHandler to
 *                  communicate with the remote entity.
 * @param queueSize The maximum number of updates that will be kept
 *                  in memory by this ServerHandler.
 */
public ServerHandler(ProtocolSession session, int queueSize)
{
  super("Server Handler");
  this.protocolVersion = ProtocolVersion.currentVersion();
  this.maxQueueSize = queueSize;
  this.session = session;
}
178 * @param replicationServerId The identifier of the replicationServer that 179 * creates this server handler. 180 * @param replicationServerURL The URL of the replicationServer that creates 181 * this server handler. 182 * @param windowSize the window size that this server handler must use. 183 * @param sslEncryption For outgoing connections indicates whether encryption 184 * should be used after the exchange of start messages. 185 * Ignored for incoming connections. 186 * @param replicationServer the ReplicationServer that created this server 187 * handler. 188 */ 189 public void start(DN baseDn, short replicationServerId, 190 String replicationServerURL, 191 int windowSize, boolean sslEncryption, 192 ReplicationServer replicationServer) 193 { 194 if (debugEnabled()) 195 TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + 196 " starts a new LS or RS " + 197 ((baseDn == null)?"incoming connection":"outgoing connection")); 198 199 this.replicationServerId = replicationServerId; 200 rcvWindowSizeHalf = windowSize/2; 201 maxRcvWindow = windowSize; 202 rcvWindow = windowSize; 203 long localGenerationId = -1; 204 boolean handshakeOnly = false; 205 206 try 207 { 208 if (baseDn != null) 209 { 210 // This is an outgoing connection. Publish our start message. 
211 this.baseDn = baseDn; 212 213 // Get or create the ReplicationServerDomain 214 replicationServerDomain = 215 replicationServer.getReplicationServerDomain(baseDn, true); 216 localGenerationId = replicationServerDomain.getGenerationId(); 217 218 ServerState localServerState = 219 replicationServerDomain.getDbServerState(); 220 ReplServerStartMessage msg = 221 new ReplServerStartMessage(replicationServerId, replicationServerURL, 222 baseDn, windowSize, localServerState, 223 protocolVersion, localGenerationId, 224 sslEncryption); 225 226 session.publish(msg); 227 } 228 229 // Wait and process ServerStart or ReplServerStart 230 ReplicationMessage msg = session.receive(); 231 if (msg instanceof ServerStartMessage) 232 { 233 // The remote server is an LDAP Server. 234 ServerStartMessage receivedMsg = (ServerStartMessage) msg; 235 236 generationId = receivedMsg.getGenerationId(); 237 protocolVersion = ProtocolVersion.minWithCurrent( 238 receivedMsg.getVersion()); 239 serverId = receivedMsg.getServerId(); 240 serverURL = receivedMsg.getServerURL(); 241 this.baseDn = receivedMsg.getBaseDn(); 242 this.serverState = receivedMsg.getServerState(); 243 244 maxReceiveDelay = receivedMsg.getMaxReceiveDelay(); 245 maxReceiveQueue = receivedMsg.getMaxReceiveQueue(); 246 maxSendDelay = receivedMsg.getMaxSendDelay(); 247 maxSendQueue = receivedMsg.getMaxSendQueue(); 248 heartbeatInterval = receivedMsg.getHeartbeatInterval(); 249 250 handshakeOnly = receivedMsg.isHandshakeOnly(); 251 252 // The session initiator decides whether to use SSL. 253 sslEncryption = receivedMsg.getSSLEncryption(); 254 255 if (maxReceiveQueue > 0) 256 restartReceiveQueue = (maxReceiveQueue > 1000 ? 257 maxReceiveQueue - 200 : 258 maxReceiveQueue*8/10); 259 else 260 restartReceiveQueue = 0; 261 262 if (maxSendQueue > 0) 263 restartSendQueue = (maxSendQueue > 1000 ? 
maxSendQueue - 200 : 264 maxSendQueue*8/10); 265 else 266 restartSendQueue = 0; 267 268 if (maxReceiveDelay > 0) 269 restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 : 270 maxReceiveDelay); 271 else 272 restartReceiveDelay = 0; 273 274 if (maxSendDelay > 0) 275 restartSendDelay = (maxSendDelay > 10 ? 276 maxSendDelay -1 : 277 maxSendDelay); 278 else 279 restartSendDelay = 0; 280 281 if (heartbeatInterval < 0) 282 { 283 heartbeatInterval = 0; 284 } 285 286 serverIsLDAPserver = true; 287 288 // Get or Create the ReplicationServerDomain 289 replicationServerDomain = 290 replicationServer.getReplicationServerDomain(this.baseDn, true); 291 292 replicationServerDomain.waitDisconnection(receivedMsg.getServerId()); 293 replicationServerDomain.mayResetGenerationId(); 294 295 localGenerationId = replicationServerDomain.getGenerationId(); 296 297 ServerState localServerState = 298 replicationServerDomain.getDbServerState(); 299 // This an incoming connection. Publish our start message 300 ReplServerStartMessage myStartMsg = 301 new ReplServerStartMessage(replicationServerId, replicationServerURL, 302 this.baseDn, windowSize, localServerState, 303 protocolVersion, localGenerationId, 304 sslEncryption); 305 session.publish(myStartMsg); 306 sendWindowSize = receivedMsg.getWindowSize(); 307 308 /* Until here session is encrypted then it depends on the negociation */ 309 if (!sslEncryption) 310 { 311 session.stopEncryption(); 312 } 313 314 if (debugEnabled()) 315 { 316 Set<String> ss = this.serverState.toStringSet(); 317 Set<String> lss = 318 replicationServerDomain.getDbServerState().toStringSet(); 319 TRACER.debugInfo("In " + replicationServerDomain. 
320 getReplicationServer().getMonitorInstanceName() + 321 ", SH received START from LS serverId=" + serverId + 322 " baseDN=" + this.baseDn + 323 " generationId=" + generationId + 324 " localGenerationId=" + localGenerationId + 325 " state=" + ss + 326 " and sent ReplServerStart with state=" + lss); 327 } 328 329 /* 330 * If we have already a generationID set for the domain 331 * then 332 * if the connecting replica has not the same 333 * then it is degraded locally and notified by an error message 334 * else 335 * we set the generationID from the one received 336 * (unsaved yet on disk . will be set with the 1rst change received) 337 */ 338 if (localGenerationId>0) 339 { 340 if (generationId != localGenerationId) 341 { 342 Message message = NOTE_BAD_GENERATION_ID.get( 343 receivedMsg.getBaseDn().toNormalizedString(), 344 Short.toString(receivedMsg.getServerId()), 345 Long.toString(generationId), 346 Long.toString(localGenerationId)); 347 348 ErrorMessage errorMsg = 349 new ErrorMessage(replicationServerId, serverId, message); 350 session.publish(errorMsg); 351 } 352 } 353 else 354 { 355 // We are an empty Replicationserver 356 if ((generationId>0)&&(!serverState.isEmpty())) 357 { 358 // If the LDAP server has already sent changes 359 // it is not expected to connect to an empty RS 360 Message message = NOTE_BAD_GENERATION_ID.get( 361 receivedMsg.getBaseDn().toNormalizedString(), 362 Short.toString(receivedMsg.getServerId()), 363 Long.toString(generationId), 364 Long.toString(localGenerationId)); 365 366 ErrorMessage errorMsg = 367 new ErrorMessage(replicationServerId, serverId, message); 368 session.publish(errorMsg); 369 } 370 else 371 { 372 replicationServerDomain.setGenerationId(generationId, false); 373 } 374 } 375 } 376 else if (msg instanceof ReplServerStartMessage) 377 { 378 // The remote server is a replication server 379 ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; 380 protocolVersion = ProtocolVersion.minWithCurrent( 381 
receivedMsg.getVersion()); 382 generationId = receivedMsg.getGenerationId(); 383 serverId = receivedMsg.getServerId(); 384 serverURL = receivedMsg.getServerURL(); 385 int separator = serverURL.lastIndexOf(':'); 386 serverAddressURL = 387 session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); 388 serverIsLDAPserver = false; 389 this.baseDn = receivedMsg.getBaseDn(); 390 if (baseDn == null) 391 { 392 // Get or create the ReplicationServerDomain 393 replicationServerDomain = replicationServer. 394 getReplicationServerDomain(this.baseDn, true); 395 localGenerationId = replicationServerDomain.getGenerationId(); 396 ServerState serverState = replicationServerDomain.getDbServerState(); 397 398 // The session initiator decides whether to use SSL. 399 sslEncryption = receivedMsg.getSSLEncryption(); 400 401 // Publish our start message 402 ReplServerStartMessage outMsg = 403 new ReplServerStartMessage(replicationServerId, 404 replicationServerURL, 405 this.baseDn, windowSize, serverState, 406 protocolVersion, 407 localGenerationId, 408 sslEncryption); 409 session.publish(outMsg); 410 } 411 else 412 { 413 this.baseDn = baseDn; 414 } 415 this.serverState = receivedMsg.getServerState(); 416 sendWindowSize = receivedMsg.getWindowSize(); 417 418 /* Until here session is encrypted then it depends on the negociation */ 419 if (!sslEncryption) 420 { 421 session.stopEncryption(); 422 } 423 424 if (debugEnabled()) 425 { 426 Set<String> ss = this.serverState.toStringSet(); 427 Set<String> lss = 428 replicationServerDomain.getDbServerState().toStringSet(); 429 TRACER.debugInfo("In " + replicationServerDomain. 
430 getReplicationServer().getMonitorInstanceName() + 431 ", SH received START from RS serverId=" + serverId + 432 " baseDN=" + this.baseDn + 433 " generationId=" + generationId + 434 " localGenerationId=" + localGenerationId + 435 " state=" + ss + 436 " and sent ReplServerStart with state=" + lss); 437 } 438 439 // if the remote RS and the local RS have the same genID 440 // then it's ok and nothing else to do 441 if (generationId == localGenerationId) 442 { 443 if (debugEnabled()) 444 { 445 TRACER.debugInfo("In " + 446 replicationServerDomain.getReplicationServer(). 447 getMonitorInstanceName() + " RS with serverID=" + serverId + 448 " is connected with the right generation ID"); 449 } 450 } 451 else 452 { 453 if (localGenerationId>0) 454 { 455 // if the local RS is initialized 456 if (generationId>0) 457 { 458 // if the remote RS is initialized 459 if (generationId != localGenerationId) 460 { 461 // if the 2 RS have different generationID 462 if (replicationServerDomain.getGenerationIdSavedStatus()) 463 { 464 // it the present RS has received changes regarding its 465 // gen ID and so won't change without a reset 466 // then we are just degrading the peer. 467 Message message = NOTE_BAD_GENERATION_ID.get( 468 this.baseDn.toNormalizedString(), 469 Short.toString(receivedMsg.getServerId()), 470 Long.toString(generationId), 471 Long.toString(localGenerationId)); 472 473 ErrorMessage errorMsg = 474 new ErrorMessage(replicationServerId, serverId, message); 475 session.publish(errorMsg); 476 } 477 else 478 { 479 // The present RS has never received changes regarding its 480 // gen ID. 481 // 482 // Example case: 483 // - we are in RS1 484 // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) 485 // - RS1 has genId1 from LS1 /genId1 comes from data in suffix 486 // - we are in RS1 and we receive a START msg from RS2 487 // - Each RS keeps its genID / is degraded and when LS2 will 488 // be populated from LS1 everything will becomes ok. 
489 // 490 // Issue: 491 // FIXME : Would it be a good idea in some cases to just 492 // set the gen ID received from the peer RS 493 // specially if the peer has a non nul state and 494 // we have a nul state ? 495 // replicationServerDomain. 496 // setGenerationId(generationId, false); 497 Message message = NOTE_BAD_GENERATION_ID.get( 498 this.baseDn.toNormalizedString(), 499 Short.toString(receivedMsg.getServerId()), 500 Long.toString(generationId), 501 Long.toString(localGenerationId)); 502 503 ErrorMessage errorMsg = 504 new ErrorMessage(replicationServerId, serverId, message); 505 session.publish(errorMsg); 506 } 507 } 508 } 509 else 510 { 511 // The remote has no genId. We don't change anything for the 512 // current RS. 513 } 514 } 515 else 516 { 517 // The local RS is not initialized - take the one received 518 replicationServerDomain.setGenerationId(generationId, false); 519 } 520 } 521 } 522 else 523 { 524 // TODO : log error 525 return; // we did not recognize the message, ignore it 526 } 527 528 // Get or create the ReplicationServerDomain 529 replicationServerDomain = replicationServer. 530 getReplicationServerDomain(this.baseDn,true); 531 532 if (!handshakeOnly) 533 { 534 boolean started; 535 if (serverIsLDAPserver) 536 { 537 started = replicationServerDomain.startServer(this); 538 } 539 else 540 { 541 started = replicationServerDomain.startReplicationServer(this); 542 } 543 544 if (started) 545 { 546 // sendWindow MUST be created before starting the writer 547 sendWindow = new Semaphore(sendWindowSize); 548 549 writer = new ServerWriter(session, serverId, 550 this, replicationServerDomain); 551 reader = new ServerReader(session, serverId, 552 this, replicationServerDomain); 553 554 reader.start(); 555 writer.start(); 556 557 // Create a thread to send heartbeat messages. 
558 if (heartbeatInterval > 0) 559 { 560 heartbeatThread = new HeartbeatThread( 561 "replication Heartbeat to " + serverURL + 562 " for " + this.baseDn, 563 session, heartbeatInterval/3); 564 heartbeatThread.start(); 565 } 566 567 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); 568 DirectoryServer.registerMonitorProvider(this); 569 } 570 else 571 { 572 // the connection is not valid, close it. 573 try 574 { 575 if (debugEnabled()) 576 { 577 TRACER.debugInfo("In " + 578 replicationServerDomain.getReplicationServer(). 579 getMonitorInstanceName() + " RS failed to start locally " + 580 " the connection from serverID="+serverId); 581 } 582 session.close(); 583 } catch (IOException e1) 584 { 585 // ignore 586 } 587 } 588 } 589 else 590 { 591 // For a hanshakeOnly connection, let's only create a reader 592 // in order to detect the connection closure. 593 reader = new ServerReader(session, serverId, 594 this, replicationServerDomain); 595 reader.start(); 596 } 597 } 598 catch (Exception e) 599 { 600 // some problem happened, reject the connection 601 MessageBuilder mb = new MessageBuilder(); 602 mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get( 603 this.getMonitorInstanceName())); 604 mb.append(stackTraceToSingleLineString(e)); 605 logError(mb.toMessage()); 606 try 607 { 608 session.close(); 609 } catch (IOException e1) 610 { 611 // ignore 612 } 613 } 614 } 615 616 /** 617 * get the Server Id. 618 * 619 * @return the ID of the server to which this object is linked 620 */ 621 public short getServerId() 622 { 623 return serverId; 624 } 625 626 /** 627 * Retrieves the Address URL for this server handler. 628 * 629 * @return The Address URL for this server handler, 630 * in the form of an IP address and port separated by a colon. 631 */ 632 public String getServerAddressURL() 633 { 634 return serverAddressURL; 635 } 636 637 /** 638 * Retrieves the URL for this server handler. 
639 * 640 * @return The URL for this server handler, in the form of an address and 641 * port separated by a colon. 642 */ 643 public String getServerURL() 644 { 645 return serverURL; 646 } 647 648 /** 649 * Increase the counter of updates sent to the server. 650 */ 651 public void incrementOutCount() 652 { 653 outCount++; 654 } 655 656 /** 657 * Increase the counter of update received from the server. 658 */ 659 public void incrementInCount() 660 { 661 inCount++; 662 } 663 664 /** 665 * Get the count of updates received from the server. 666 * @return the count of update received from the server. 667 */ 668 public int getInCount() 669 { 670 return inCount; 671 } 672 673 /** 674 * Get the count of updates sent to this server. 675 * @return The count of update sent to this server. 676 */ 677 public int getOutCount() 678 { 679 return outCount; 680 } 681 682 /** 683 * Get the number of Ack received from the server managed by this handler. 684 * 685 * @return Returns the inAckCount. 686 */ 687 public int getInAckCount() 688 { 689 return inAckCount; 690 } 691 692 /** 693 * Get the number of Ack sent to the server managed by this handler. 694 * 695 * @return Returns the outAckCount. 696 */ 697 public int getOutAckCount() 698 { 699 return outAckCount; 700 } 701 702 /** 703 * Check is this server is saturated (this server has already been 704 * sent a bunch of updates and has not processed them so they are staying 705 * in the message queue for this server an the size of the queue 706 * for this server is above the configured limit. 707 * 708 * The limit can be defined in number of updates or with a maximum delay 709 * 710 * @param changeNumber The changenumber to use to make the delay calculations. 711 * @param sourceHandler The ServerHandler which is sending the update. 712 * @return true is saturated false if not saturated. 
713 */ 714 public boolean isSaturated(ChangeNumber changeNumber, 715 ServerHandler sourceHandler) 716 { 717 synchronized (msgQueue) 718 { 719 int size = msgQueue.size(); 720 721 if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) 722 return true; 723 724 if ((sourceHandler.maxSendQueue > 0) && 725 (size >= sourceHandler.maxSendQueue)) 726 return true; 727 728 if (!msgQueue.isEmpty()) 729 { 730 UpdateMessage firstUpdate = msgQueue.first(); 731 732 if (firstUpdate != null) 733 { 734 long timeDiff = changeNumber.getTimeSec() - 735 firstUpdate.getChangeNumber().getTimeSec(); 736 737 if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay)) 738 return true; 739 740 if ((sourceHandler.maxSendDelay > 0) && 741 (timeDiff >= sourceHandler.maxSendDelay)) 742 return true; 743 } 744 } 745 return false; 746 } 747 } 748 749 /** 750 * Check that the size of the Server Handler messages Queue has lowered 751 * below the limit and therefore allowing the reception of messages 752 * from other servers to restart. 753 * @param source The ServerHandler which was sending the update. 754 * can be null. 
755 * @return true if the processing can restart 756 */ 757 public boolean restartAfterSaturation(ServerHandler source) 758 { 759 synchronized (msgQueue) 760 { 761 int queueSize = msgQueue.size(); 762 if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) 763 return false; 764 if ((source != null) && (source.maxSendQueue > 0) && 765 (queueSize >= source.restartSendQueue)) 766 return false; 767 768 if (!msgQueue.isEmpty()) 769 { 770 UpdateMessage firstUpdate = msgQueue.first(); 771 UpdateMessage lastUpdate = msgQueue.last(); 772 773 if ((firstUpdate != null) && (lastUpdate != null)) 774 { 775 long timeDiff = lastUpdate.getChangeNumber().getTimeSec() - 776 firstUpdate.getChangeNumber().getTimeSec(); 777 if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay)) 778 return false; 779 if ((source != null) && (source.maxSendDelay > 0) 780 && (timeDiff >= source.restartSendDelay)) 781 return false; 782 } 783 } 784 } 785 return true; 786 } 787 788 /** 789 * Check if the server associated to this ServerHandler is a replication 790 * server. 791 * @return true if the server associated to this ServerHandler is a 792 * replication server. 793 */ 794 public boolean isReplicationServer() 795 { 796 return (!serverIsLDAPserver); 797 } 798 799 /** 800 * Get the number of message in the receive message queue. 801 * @return Size of the receive message queue. 802 */ 803 public int getRcvMsgQueueSize() 804 { 805 synchronized (msgQueue) 806 { 807 /* 808 * When the server is up to date or close to be up to date, 809 * the number of updates to be sent is the size of the receive queue. 810 */ 811 if (isFollowing()) 812 return msgQueue.size(); 813 else 814 { 815 /* 816 * When the server is not able to follow, the msgQueue 817 * may become too large and therefore won't contain all the 818 * changes. Some changes may only be stored in the backing DB 819 * of the servers. 
820 * The total size of teh receieve queue is calculated by doing 821 * the sum of the number of missing changes for every dbHandler. 822 */ 823 int totalCount = 0; 824 ServerState dbState = replicationServerDomain.getDbServerState(); 825 for (short id : dbState) 826 { 827 totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id), 828 serverState.getMaxChangeNumber(id)); 829 } 830 return totalCount; 831 } 832 } 833 } 834 835 /** 836 * Get an approximation of the delay by looking at the age of the oldest 837 * message that has not been sent to this server. 838 * This is an approximation because the age is calculated using the 839 * clock of the servee where the replicationServer is currently running 840 * while it should be calculated using the clock of the server 841 * that originally processed the change. 842 * 843 * The approximation error is therefore the time difference between 844 * 845 * @return the approximate delay for the connected server. 846 */ 847 public long getApproxDelay() 848 { 849 long olderUpdateTime = getOlderUpdateTime(); 850 if (olderUpdateTime == 0) 851 return 0; 852 853 long currentTime = TimeThread.getTime(); 854 return ((currentTime - olderUpdateTime)/1000); 855 } 856 857 /** 858 * Get the age of the older change that has not yet been replicated 859 * to the server handled by this ServerHandler. 860 * @return The age if the older change has not yet been replicated 861 * to the server handled by this ServerHandler. 862 */ 863 public Long getApproxFirstMissingDate() 864 { 865 Long result = (long)0; 866 867 // Get the older CN received 868 ChangeNumber olderUpdateCN = getOlderUpdateCN(); 869 if (olderUpdateCN != null) 870 { 871 // If not present in the local RS db, 872 // then approximate with the older update time 873 result=olderUpdateCN.getTime(); 874 } 875 return result; 876 } 877 878 /** 879 * Get the older update time for that server. 880 * @return The older update time. 
881 */ 882 public long getOlderUpdateTime() 883 { 884 ChangeNumber olderUpdateCN = getOlderUpdateCN(); 885 if (olderUpdateCN == null) 886 return 0; 887 return olderUpdateCN.getTime(); 888 } 889 890 /** 891 * Get the older Change Number for that server. 892 * Returns null when the queue is empty. 893 * @return The older change number. 894 */ 895 public ChangeNumber getOlderUpdateCN() 896 { 897 ChangeNumber result = null; 898 synchronized (msgQueue) 899 { 900 if (isFollowing()) 901 { 902 if (msgQueue.isEmpty()) 903 { 904 result=null; 905 } 906 else 907 { 908 UpdateMessage msg = msgQueue.first(); 909 result = msg.getChangeNumber(); 910 } 911 } 912 else 913 { 914 if (lateQueue.isEmpty()) 915 { 916 // isFollowing is false AND lateQueue is empty 917 // We may be at the very moment when the writer has emptyed the 918 // lateQueue when it sent the last update. The writer will fill again 919 // the lateQueue when it will send the next update but we are not yet 920 // there. So let's take the last change not sent directly from 921 // the db. 922 923 ReplicationIteratorComparator comparator = 924 new ReplicationIteratorComparator(); 925 SortedSet<ReplicationIterator> iteratorSortedSet = 926 new TreeSet<ReplicationIterator>(comparator); 927 try 928 { 929 // Build a list of candidates iterator (i.e. db i.e. server) 930 for (short serverId : replicationServerDomain.getServers()) 931 { 932 // get the last already sent CN from that server 933 ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); 934 // get an iterator in this server db from that last change 935 ReplicationIterator iterator = 936 replicationServerDomain.getChangelogIterator(serverId, lastCsn); 937 // if that iterator has changes, then it is a candidate 938 // it is added in the sorted list at a position given by its 939 // current change (see ReplicationIteratorComparator). 
940 if ((iterator != null) && (iterator.getChange() != null)) 941 { 942 iteratorSortedSet.add(iterator); 943 } 944 } 945 UpdateMessage msg = iteratorSortedSet.first().getChange(); 946 result = msg.getChangeNumber(); 947 } 948 catch(Exception e) 949 { 950 result=null; 951 } 952 finally 953 { 954 for (ReplicationIterator iterator : iteratorSortedSet) 955 { 956 iterator.releaseCursor(); 957 } 958 } 959 } 960 else 961 { 962 UpdateMessage msg = lateQueue.first(); 963 result = msg.getChangeNumber(); 964 } 965 } 966 } 967 return result; 968 } 969 970 /** 971 * Check if the LDAP server can follow the speed of the other servers. 972 * @return true when the server has all the not yet sent changes 973 * in its queue. 974 */ 975 public boolean isFollowing() 976 { 977 return following; 978 } 979 980 /** 981 * Set the following flag of this server. 982 * @param following the value that should be set. 983 */ 984 public void setFollowing(boolean following) 985 { 986 this.following = following; 987 } 988 989 /** 990 * Add an update the list of updates that must be sent to the server 991 * managed by this ServerHandler. 992 * 993 * @param update The update that must be added to the list of updates. 994 * @param sourceHandler The server that sent the update. 
995 */ 996 public void add(UpdateMessage update, ServerHandler sourceHandler) 997 { 998 /* 999 * Ignore updates from a server that is degraded due to 1000 * its inconsistent generationId 1001 */ 1002 long referenceGenerationId = replicationServerDomain.getGenerationId(); 1003 if ((referenceGenerationId>0) && 1004 (referenceGenerationId != generationId)) 1005 { 1006 logError(ERR_IGNORING_UPDATE_TO.get( 1007 update.getDn(), 1008 this.getMonitorInstanceName())); 1009 1010 return; 1011 } 1012 1013 synchronized (msgQueue) 1014 { 1015 /* 1016 * If queue was empty the writer thread was probably asleep 1017 * waiting for some changes, wake it up 1018 */ 1019 if (msgQueue.isEmpty()) 1020 msgQueue.notify(); 1021 1022 msgQueue.add(update); 1023 1024 /* TODO : size should be configurable 1025 * and larger than max-receive-queue-size 1026 */ 1027 while (msgQueue.size() > maxQueueSize) 1028 { 1029 setFollowing(false); 1030 msgQueue.removeFirst(); 1031 } 1032 } 1033 1034 if (isSaturated(update.getChangeNumber(), sourceHandler)) 1035 { 1036 sourceHandler.setSaturated(true); 1037 } 1038 1039 } 1040 1041 private void setSaturated(boolean value) 1042 { 1043 flowControl = value; 1044 } 1045 1046 /** 1047 * Select the next update that must be sent to the server managed by this 1048 * ServerHandler. 1049 * 1050 * @return the next update that must be sent to the server managed by this 1051 * ServerHandler. 1052 */ 1053 public UpdateMessage take() 1054 { 1055 boolean interrupted = true; 1056 UpdateMessage msg = getnextMessage(); 1057 1058 /* 1059 * When we remove a message from the queue we need to check if another 1060 * server is waiting in flow control because this queue was too long. 1061 * This check might cause a performance penalty an therefore it 1062 * is not done for every message removed but only every few messages. 
1063 */ 1064 if (++saturationCount > 10) 1065 { 1066 saturationCount = 0; 1067 try 1068 { 1069 replicationServerDomain.checkAllSaturation(); 1070 } 1071 catch (IOException e) 1072 { 1073 } 1074 } 1075 boolean acquired = false; 1076 do 1077 { 1078 try 1079 { 1080 acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS); 1081 interrupted = false; 1082 } catch (InterruptedException e) 1083 { 1084 // loop until not interrupted 1085 } 1086 } while (((interrupted) || (!acquired )) && (!shutdown)); 1087 this.incrementOutCount(); 1088 return msg; 1089 } 1090 1091 /** 1092 * Get the next update that must be sent to the server 1093 * from the message queue or from the database. 1094 * 1095 * @return The next update that must be sent to the server. 1096 */ 1097 private UpdateMessage getnextMessage() 1098 { 1099 UpdateMessage msg; 1100 while (active == true) 1101 { 1102 if (following == false) 1103 { 1104 /* this server is late with regard to some other masters 1105 * in the topology or just joined the topology. 1106 * In such cases, we can't keep all changes in the queue 1107 * without saturating the memory, we therefore use 1108 * a lateQueue that is filled with a few changes from the changelogDB 1109 * If this server is able to close the gap, it will start using again 1110 * the regular msgQueue later. 1111 */ 1112 if (lateQueue.isEmpty()) 1113 { 1114 /* 1115 * Start from the server State 1116 * Loop until the queue high mark or until no more changes 1117 * for each known LDAP master 1118 * get the next CSN after this last one : 1119 * - try to get next from the file 1120 * - if not found in the file 1121 * - try to get the next from the queue 1122 * select the smallest of changes 1123 * check if it is in the memory tree 1124 * yes : lock memory tree. 
1125 * check all changes from the list, remove the ones that 1126 * are already sent 1127 * unlock memory tree 1128 * restart as usual 1129 * load this change on the delayList 1130 * 1131 */ 1132 ReplicationIteratorComparator comparator = 1133 new ReplicationIteratorComparator(); 1134 SortedSet<ReplicationIterator> iteratorSortedSet = 1135 new TreeSet<ReplicationIterator>(comparator); 1136 /* fill the lateQueue */ 1137 for (short serverId : replicationServerDomain.getServers()) 1138 { 1139 ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); 1140 ReplicationIterator iterator = 1141 replicationServerDomain.getChangelogIterator(serverId, lastCsn); 1142 if (iterator != null) 1143 { 1144 if (iterator.getChange() != null) 1145 { 1146 iteratorSortedSet.add(iterator); 1147 } 1148 else 1149 { 1150 iterator.releaseCursor(); 1151 } 1152 } 1153 } 1154 1155 // The loop below relies on the fact that it is sorted based 1156 // on the currentChange of each iterator to consider the next 1157 // change accross all servers. 1158 // Hence it is necessary to remove and eventual add again an iterator 1159 // when looping in order to keep consistent the order of the 1160 // iterators (see ReplicationIteratorComparator. 
1161 while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) 1162 { 1163 ReplicationIterator iterator = iteratorSortedSet.first(); 1164 iteratorSortedSet.remove(iterator); 1165 lateQueue.add(iterator.getChange()); 1166 if (iterator.next()) 1167 iteratorSortedSet.add(iterator); 1168 else 1169 iterator.releaseCursor(); 1170 } 1171 for (ReplicationIterator iterator : iteratorSortedSet) 1172 { 1173 iterator.releaseCursor(); 1174 } 1175 /* 1176 * Check if the first change in the lateQueue is also on the regular 1177 * queue 1178 */ 1179 if (lateQueue.isEmpty()) 1180 { 1181 synchronized (msgQueue) 1182 { 1183 if (msgQueue.size() < maxQueueSize) 1184 { 1185 setFollowing(true); 1186 } 1187 } 1188 } 1189 else 1190 { 1191 msg = lateQueue.first(); 1192 synchronized (msgQueue) 1193 { 1194 if (msgQueue.contains(msg)) 1195 { 1196 /* we finally catched up with the regular queue */ 1197 setFollowing(true); 1198 lateQueue.clear(); 1199 UpdateMessage msg1; 1200 do 1201 { 1202 msg1 = msgQueue.removeFirst(); 1203 } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); 1204 this.updateServerState(msg); 1205 return msg; 1206 } 1207 } 1208 } 1209 } 1210 else 1211 { 1212 /* get the next change from the lateQueue */ 1213 msg = lateQueue.removeFirst(); 1214 this.updateServerState(msg); 1215 return msg; 1216 } 1217 } 1218 synchronized (msgQueue) 1219 { 1220 if (following == true) 1221 { 1222 try 1223 { 1224 while (msgQueue.isEmpty()) 1225 { 1226 msgQueue.wait(500); 1227 if (!active) 1228 return null; 1229 } 1230 } catch (InterruptedException e) 1231 { 1232 return null; 1233 } 1234 msg = msgQueue.removeFirst(); 1235 if (this.updateServerState(msg)) 1236 { 1237 /* 1238 * Only push the message if it has not yet been seen 1239 * by the other server. 1240 * Otherwise just loop to select the next message. 
1241 */ 1242 return msg; 1243 } 1244 } 1245 } 1246 /* 1247 * Need to loop because following flag may have gone to false between 1248 * the first check at the beginning of this method 1249 * and the second check just above. 1250 */ 1251 } 1252 return null; 1253 } 1254 1255 /** 1256 * Update the serverState with the last message sent. 1257 * 1258 * @param msg the last update sent. 1259 * @return boolean indicating if the update was meaningfull. 1260 */ 1261 public boolean updateServerState(UpdateMessage msg) 1262 { 1263 return serverState.update(msg.getChangeNumber()); 1264 } 1265 1266 /** 1267 * Get the state of this server. 1268 * 1269 * @return ServerState the state for this server.. 1270 */ 1271 public ServerState getServerState() 1272 { 1273 return serverState; 1274 } 1275 1276 /** 1277 * Stop this server handler processing. 1278 */ 1279 public void stopHandler() 1280 { 1281 active = false; 1282 1283 // Stop the remote LSHandler 1284 for (LightweightServerHandler lsh : connectedServers.values()) 1285 { 1286 lsh.stopHandler(); 1287 } 1288 connectedServers.clear(); 1289 1290 try 1291 { 1292 session.close(); 1293 } catch (IOException e) 1294 { 1295 // ignore. 1296 } 1297 1298 synchronized (msgQueue) 1299 { 1300 /* wake up the writer thread on an empty queue so that it disappear */ 1301 msgQueue.clear(); 1302 msgQueue.notify(); 1303 msgQueue.notifyAll(); 1304 } 1305 1306 // Stop the heartbeat thread. 1307 if (heartbeatThread != null) 1308 { 1309 heartbeatThread.shutdown(); 1310 } 1311 1312 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); 1313 } 1314 1315 /** 1316 * Send the ack to the server that did the original modification. 1317 * 1318 * @param changeNumber The ChangeNumber of the update that is acked. 1319 * @throws IOException In case of Exception thrown sending the ack. 
1320 */ 1321 public void sendAck(ChangeNumber changeNumber) throws IOException 1322 { 1323 AckMessage ack = new AckMessage(changeNumber); 1324 session.publish(ack); 1325 outAckCount++; 1326 } 1327 1328 /** 1329 * Do the work when an ack message has been received from another server. 1330 * 1331 * @param message The ack message that was received. 1332 * @param ackingServerId The id of the server that acked the change. 1333 */ 1334 public void ack(AckMessage message, short ackingServerId) 1335 { 1336 ChangeNumber changeNumber = message.getChangeNumber(); 1337 AckMessageList ackList; 1338 boolean completedFlag; 1339 synchronized (waitingAcks) 1340 { 1341 ackList = waitingAcks.get(changeNumber); 1342 if (ackList == null) 1343 return; 1344 ackList.addAck(ackingServerId); 1345 completedFlag = ackList.completed(); 1346 if (completedFlag) 1347 { 1348 waitingAcks.remove(changeNumber); 1349 } 1350 } 1351 if (completedFlag) 1352 { 1353 replicationServerDomain.sendAck(changeNumber, true); 1354 } 1355 } 1356 1357 /** 1358 * Process reception of an for an update that was received from a 1359 * ReplicationServer. 1360 * 1361 * @param message the ack message that was received. 1362 * @param ackingServerId The id of the server that acked the change. 
1363 */ 1364 public static void ackChangelog(AckMessage message, short ackingServerId) 1365 { 1366 ChangeNumber changeNumber = message.getChangeNumber(); 1367 ReplServerAckMessageList ackList; 1368 boolean completedFlag; 1369 synchronized (changelogsWaitingAcks) 1370 { 1371 ackList = changelogsWaitingAcks.get(changeNumber); 1372 if (ackList == null) 1373 return; 1374 ackList.addAck(ackingServerId); 1375 completedFlag = ackList.completed(); 1376 if (completedFlag) 1377 { 1378 changelogsWaitingAcks.remove(changeNumber); 1379 } 1380 } 1381 if (completedFlag) 1382 { 1383 ReplicationServerDomain replicationServerDomain = 1384 ackList.getChangelogCache(); 1385 replicationServerDomain.sendAck(changeNumber, false, 1386 ackList.getReplicationServerId()); 1387 } 1388 } 1389 1390 /** 1391 * Add an update to the list of update waiting for acks. 1392 * 1393 * @param update the update that must be added to the list 1394 * @param nbWaitedAck The number of ack that must be received before 1395 * the update is fully acked. 1396 */ 1397 public void addWaitingAck(UpdateMessage update, int nbWaitedAck) 1398 { 1399 AckMessageList ackList = new AckMessageList(update.getChangeNumber(), 1400 nbWaitedAck); 1401 synchronized(waitingAcks) 1402 { 1403 waitingAcks.put(update.getChangeNumber(), ackList); 1404 } 1405 } 1406 1407 /** 1408 * Add an update to the list of update received from a replicationServer and 1409 * waiting for acks. 1410 * 1411 * @param update The update that must be added to the list. 1412 * @param ChangelogServerId The identifier of the replicationServer that sent 1413 * the update. 1414 * @param replicationServerDomain The ReplicationServerDomain from which the 1415 * change was processed and to which the ack 1416 * must later be sent. 1417 * @param nbWaitedAck The number of ack that must be received before 1418 * the update is fully acked. 
1419 */ 1420 public static void addWaitingAck( 1421 UpdateMessage update, 1422 short ChangelogServerId, ReplicationServerDomain replicationServerDomain, 1423 int nbWaitedAck) 1424 { 1425 ReplServerAckMessageList ackList = 1426 new ReplServerAckMessageList(update.getChangeNumber(), 1427 nbWaitedAck, 1428 ChangelogServerId, 1429 replicationServerDomain); 1430 synchronized(changelogsWaitingAcks) 1431 { 1432 changelogsWaitingAcks.put(update.getChangeNumber(), ackList); 1433 } 1434 } 1435 1436 /** 1437 * Get the size of the list of update waiting for acks. 1438 * 1439 * @return the size of the list of update waiting for acks. 1440 */ 1441 public int getWaitingAckSize() 1442 { 1443 synchronized (waitingAcks) 1444 { 1445 return waitingAcks.size(); 1446 } 1447 } 1448 1449 /** 1450 * Increment the count of Acks received from this server. 1451 */ 1452 public void incrementInAckCount() 1453 { 1454 inAckCount++; 1455 } 1456 1457 /** 1458 * Check type of server handled. 1459 * 1460 * @return true if the handled server is an LDAP server. 1461 * false if the handled server is a replicationServer 1462 */ 1463 public boolean isLDAPserver() 1464 { 1465 return serverIsLDAPserver; 1466 } 1467 1468 /** 1469 * {@inheritDoc} 1470 */ 1471 @Override 1472 public void initializeMonitorProvider(MonitorProviderCfg configuration) 1473 throws ConfigException,InitializationException 1474 { 1475 // Nothing to do for now 1476 } 1477 1478 /** 1479 * Retrieves the name of this monitor provider. It should be unique among all 1480 * monitor providers, including all instances of the same monitor provider. 1481 * 1482 * @return The name of this monitor provider. 
1483 */ 1484 @Override 1485 public String getMonitorInstanceName() 1486 { 1487 String str = baseDn.toString() + 1488 " " + serverURL + " " + String.valueOf(serverId); 1489 1490 if (serverIsLDAPserver) 1491 return "Direct LDAP Server " + str; 1492 else 1493 return "Remote Repl Server " + str; 1494 } 1495 1496 /** 1497 * Retrieves the length of time in milliseconds that should elapse between 1498 * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero 1499 * return value indicates that the <CODE>updateMonitorData()</CODE> method 1500 * should not be periodically invoked. 1501 * 1502 * @return The length of time in milliseconds that should elapse between 1503 * calls to the <CODE>updateMonitorData()</CODE> method. 1504 */ 1505 @Override 1506 public long getUpdateInterval() 1507 { 1508 /* we don't wont to do polling on this monitor */ 1509 return 0; 1510 } 1511 1512 /** 1513 * Performs any processing periodic processing that may be desired to update 1514 * the information associated with this monitor. Note that best-effort 1515 * attempts will be made to ensure that calls to this method come 1516 * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will 1517 * be made. 1518 */ 1519 @Override 1520 public void updateMonitorData() 1521 { 1522 // As long as getUpdateInterval() returns 0, this will never get called 1523 1524 } 1525 1526 /** 1527 * Retrieves a set of attributes containing monitor data that should be 1528 * returned to the client if the corresponding monitor entry is requested. 1529 * 1530 * @return A set of attributes containing monitor data that should be 1531 * returned to the client if the corresponding monitor entry is 1532 * requested. 
1533 */ 1534 @Override 1535 public ArrayList<Attribute> getMonitorData() 1536 { 1537 ArrayList<Attribute> attributes = new ArrayList<Attribute>(); 1538 if (serverIsLDAPserver) 1539 { 1540 attributes.add(new Attribute("LDAP-Server", serverURL)); 1541 attributes.add(new Attribute("connected-to", this.replicationServerDomain. 1542 getReplicationServer().getMonitorInstanceName())); 1543 1544 } 1545 else 1546 { 1547 attributes.add(new Attribute("ReplicationServer-Server", serverURL)); 1548 } 1549 attributes.add(new Attribute("server-id", 1550 String.valueOf(serverId))); 1551 attributes.add(new Attribute("base-dn", 1552 baseDn.toString())); 1553 1554 if (serverIsLDAPserver) 1555 { 1556 MonitorData md; 1557 try 1558 { 1559 md = replicationServerDomain.getMonitorData(); 1560 1561 // Oldest missing update 1562 Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); 1563 if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0)) 1564 { 1565 Date date = new Date(approxFirstMissingDate); 1566 attributes.add(new Attribute("approx-older-change-not-synchronized", 1567 date.toString())); 1568 attributes.add( 1569 new Attribute("approx-older-change-not-synchronized-millis", 1570 String.valueOf(approxFirstMissingDate))); 1571 } 1572 1573 // Missing changes 1574 long missingChanges = md.getMissingChanges(serverId); 1575 attributes.add(new Attribute("missing-changes", 1576 String.valueOf(missingChanges))); 1577 1578 // Replication delay 1579 long delay = md.getApproxDelay(serverId); 1580 attributes.add(new Attribute("approximate-delay", 1581 String.valueOf(delay))); 1582 } 1583 catch(Exception e) 1584 { 1585 // TODO: improve the log 1586 // We failed retrieving the remote monitor data. 
1587 attributes.add(new Attribute("error", 1588 stackTraceToSingleLineString(e))); 1589 } 1590 } 1591 1592 // Deprecated 1593 attributes.add(new Attribute("max-waiting-changes", 1594 String.valueOf(maxQueueSize))); 1595 attributes.add(new Attribute("update-sent", 1596 String.valueOf(getOutCount()))); 1597 attributes.add(new Attribute("update-received", 1598 String.valueOf(getInCount()))); 1599 1600 // Deprecated as long as assured is not exposed 1601 attributes.add(new Attribute("update-waiting-acks", 1602 String.valueOf(getWaitingAckSize()))); 1603 attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount()))); 1604 attributes.add(new Attribute("ack-received", 1605 String.valueOf(getInAckCount()))); 1606 1607 // Window stats 1608 attributes.add(new Attribute("max-send-window", 1609 String.valueOf(sendWindowSize))); 1610 attributes.add(new Attribute("current-send-window", 1611 String.valueOf(sendWindow.availablePermits()))); 1612 attributes.add(new Attribute("max-rcv-window", 1613 String.valueOf(maxRcvWindow))); 1614 attributes.add(new Attribute("current-rcv-window", 1615 String.valueOf(rcvWindow))); 1616 1617 /* 1618 * FIXME:PGB DEPRECATED 1619 * 1620 // Missing changes 1621 attributes.add(new Attribute("waiting-changes", 1622 String.valueOf(getRcvMsgQueueSize()))); 1623 // Age of oldest missing change 1624 1625 // Date of the oldest missing change 1626 long olderUpdateTime = getOlderUpdateTime(); 1627 if (olderUpdateTime != 0) 1628 { 1629 Date date = new Date(getOlderUpdateTime()); 1630 attributes.add(new Attribute("older-change-not-synchronized", 1631 String.valueOf(date.toString()))); 1632 } 1633 */ 1634 1635 /* get the Server State */ 1636 final String ATTR_SERVER_STATE = "server-state"; 1637 AttributeType type = 1638 DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); 1639 LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); 1640 for (String str : serverState.toStringSet()) 1641 { 1642 values.add(new 
AttributeValue(type,str)); 1643 } 1644 Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); 1645 attributes.add(attr); 1646 1647 // Encryption 1648 attributes.add(new Attribute("ssl-encryption", 1649 String.valueOf(session.isEncrypted()))); 1650 1651 // Data generation 1652 attributes.add(new Attribute("generation-id", 1653 String.valueOf(generationId))); 1654 1655 return attributes; 1656 } 1657 1658 /** 1659 * Shutdown This ServerHandler. 1660 */ 1661 public void shutdown() 1662 { 1663 shutdown = true; 1664 try 1665 { 1666 session.close(); 1667 } catch (IOException e) 1668 { 1669 // Service is closing. 1670 } 1671 1672 stopHandler(); 1673 1674 try 1675 { 1676 if (writer != null) { 1677 writer.join(SHUTDOWN_JOIN_TIMEOUT); 1678 } 1679 if (reader != null) { 1680 reader.join(SHUTDOWN_JOIN_TIMEOUT); 1681 } 1682 } catch (InterruptedException e) 1683 { 1684 // don't try anymore to join and return. 1685 } 1686 } 1687 1688 /** 1689 * {@inheritDoc} 1690 */ 1691 @Override 1692 public String toString() 1693 { 1694 String localString; 1695 if (serverId != 0) 1696 { 1697 if (serverIsLDAPserver) 1698 localString = "Directory Server "; 1699 else 1700 localString = "Replication Server "; 1701 1702 1703 localString += serverId + " " + serverURL + " " + baseDn; 1704 } 1705 else 1706 localString = "Unknown server"; 1707 1708 return localString; 1709 } 1710 1711 /** 1712 * Decrement the protocol window, then check if it is necessary 1713 * to send a WindowMessage and send it. 1714 * 1715 * @throws IOException when the session becomes unavailable. 1716 */ 1717 public synchronized void decAndCheckWindow() throws IOException 1718 { 1719 rcvWindow--; 1720 checkWindow(); 1721 } 1722 1723 /** 1724 * Check the protocol window and send WindowMessage if necessary. 1725 * 1726 * @throws IOException when the session becomes unavailable. 
1727 */ 1728 public synchronized void checkWindow() throws IOException 1729 { 1730 if (rcvWindow < rcvWindowSizeHalf) 1731 { 1732 if (flowControl) 1733 { 1734 if (replicationServerDomain.restartAfterSaturation(this)) 1735 { 1736 flowControl = false; 1737 } 1738 } 1739 if (!flowControl) 1740 { 1741 WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); 1742 session.publish(msg); 1743 outAckCount++; 1744 rcvWindow += rcvWindowSizeHalf; 1745 } 1746 } 1747 } 1748 1749 /** 1750 * Update the send window size based on the credit specified in the 1751 * given window message. 1752 * 1753 * @param windowMsg The Window Message containing the information 1754 * necessary for updating the window size. 1755 */ 1756 public void updateWindow(WindowMessage windowMsg) 1757 { 1758 sendWindow.release(windowMsg.getNumAck()); 1759 } 1760 1761 /** 1762 * Get our heartbeat interval. 1763 * @return Our heartbeat interval. 1764 */ 1765 public long getHeartbeatInterval() 1766 { 1767 return heartbeatInterval; 1768 } 1769 1770 /** 1771 * Processes a routable message. 1772 * 1773 * @param msg The message to be processed. 1774 */ 1775 public void process(RoutableMessage msg) 1776 { 1777 if (debugEnabled()) 1778 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). 1779 getMonitorInstanceName() + 1780 " SH for remote server " + this.getMonitorInstanceName() + 1781 " processes received msg=" + msg); 1782 replicationServerDomain.process(msg, this); 1783 } 1784 1785 /** 1786 * Sends the provided ReplServerInfoMessage. 1787 * 1788 * @param info The ReplServerInfoMessage message to be sent. 1789 * @throws IOException When it occurs while sending the message, 1790 * 1791 */ 1792 public void sendInfo(ReplServerInfoMessage info) 1793 throws IOException 1794 { 1795 if (debugEnabled()) 1796 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). 
1797 getMonitorInstanceName() + 1798 " SH for remote server " + this.getMonitorInstanceName() + 1799 " sends message=" + info); 1800 1801 session.publish(info); 1802 } 1803 1804 /** 1805 * 1806 * Sets the replication server from the message provided. 1807 * 1808 * @param infoMsg The information message. 1809 */ 1810 public void receiveReplServerInfo(ReplServerInfoMessage infoMsg) 1811 { 1812 if (debugEnabled()) 1813 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). 1814 getMonitorInstanceName() + 1815 " SH for remote server " + this.getMonitorInstanceName() + 1816 " sets replServerInfo " + "<" + infoMsg + ">"); 1817 1818 List<String> newRemoteLDAPservers = infoMsg.getConnectedServers(); 1819 generationId = infoMsg.getGenerationId(); 1820 1821 synchronized(connectedServers) 1822 { 1823 // Removes the existing structures 1824 for (LightweightServerHandler lsh : connectedServers.values()) 1825 { 1826 lsh.stopHandler(); 1827 } 1828 connectedServers.clear(); 1829 1830 // Creates the new structure according to the message received. 1831 for (String newConnectedServer : newRemoteLDAPservers) 1832 { 1833 LightweightServerHandler lsh 1834 = new LightweightServerHandler(newConnectedServer, this); 1835 lsh.startHandler(); 1836 connectedServers.put(lsh.getServerId(), lsh); 1837 } 1838 } 1839 } 1840 1841 /** 1842 * When this handler is connected to a replication server, specifies if 1843 * a wanted server is connected to this replication server. 1844 * 1845 * @param wantedServer The server we want to know if it is connected 1846 * to the replication server represented by this handler. 1847 * @return boolean True is the wanted server is connected to the server 1848 * represented by this handler. 
1849 */ 1850 public boolean isRemoteLDAPServer(short wantedServer) 1851 { 1852 synchronized(connectedServers) 1853 { 1854 for (LightweightServerHandler server : connectedServers.values()) 1855 { 1856 if (wantedServer == server.getServerId()) 1857 { 1858 return true; 1859 } 1860 } 1861 return false; 1862 } 1863 } 1864 1865 /** 1866 * When the handler is connected to a replication server, specifies the 1867 * replication server has remote LDAP servers connected to it. 1868 * 1869 * @return boolean True is the replication server has remote LDAP servers 1870 * connected to it. 1871 */ 1872 public boolean hasRemoteLDAPServers() 1873 { 1874 return !connectedServers.isEmpty(); 1875 } 1876 1877 /** 1878 * Send an InitializeRequestMessage to the server connected through this 1879 * handler. 1880 * 1881 * @param msg The message to be processed 1882 * @throws IOException when raised by the underlying session 1883 */ 1884 public void send(RoutableMessage msg) throws IOException 1885 { 1886 if (debugEnabled()) 1887 TRACER.debugInfo("In " + 1888 replicationServerDomain.getReplicationServer(). 1889 getMonitorInstanceName() + 1890 " SH for remote server " + this.getMonitorInstanceName() + 1891 " sends message=" + msg); 1892 session.publish(msg); 1893 } 1894 1895 /** 1896 * Send an ErrorMessage to the peer. 1897 * 1898 * @param errorMsg The message to be sent 1899 * @throws IOException when raised by the underlying session 1900 */ 1901 public void sendError(ErrorMessage errorMsg) throws IOException 1902 { 1903 session.publish(errorMsg); 1904 } 1905 1906 /** 1907 * Process the reception of a WindowProbe message. 1908 * 1909 * @param windowProbeMsg The message to process. 1910 * 1911 * @throws IOException When the session becomes unavailable. 
1912 */ 1913 public void process(WindowProbe windowProbeMsg) throws IOException 1914 { 1915 if (rcvWindow > 0) 1916 { 1917 // The LDAP server believes that its window is closed 1918 // while it is not, this means that some problem happened in the 1919 // window exchange procedure ! 1920 // lets update the LDAP server with out current window size and hope 1921 // that everything will work better in the futur. 1922 // TODO also log an error message. 1923 WindowMessage msg = new WindowMessage(rcvWindow); 1924 session.publish(msg); 1925 outAckCount++; 1926 } 1927 else 1928 { 1929 // Both the LDAP server and the replication server believes that the 1930 // window is closed. Lets check the flowcontrol in case we 1931 // can now resume operations and send a windowMessage if necessary. 1932 checkWindow(); 1933 } 1934 } 1935 1936 /** 1937 * Returns the value of generationId for that handler. 1938 * @return The value of the generationId. 1939 */ 1940 public long getGenerationId() 1941 { 1942 return generationId; 1943 } 1944 1945 /** 1946 * Resets the generationId for this domain. 1947 */ 1948 public void warnBadGenerationId() 1949 { 1950 // Notify the peer that it is now invalid regarding the generationId 1951 // We are now waiting a startServer message from this server with 1952 // a valid generationId. 1953 try 1954 { 1955 Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString()); 1956 ErrorMessage errorMsg = 1957 new ErrorMessage(serverId, replicationServerId, message); 1958 session.publish(errorMsg); 1959 } 1960 catch (Exception e) 1961 { 1962 // FIXME Log exception when sending reset error message 1963 } 1964 } 1965 1966 /** 1967 * Sends a message containing a generationId to a peer server. 1968 * The peer is expected to be a replication server. 1969 * 1970 * @param msg The GenerationIdMessage message to be sent. 
* @throws IOException When it occurs while sending the message.
   */
  public void forwardGenerationIdToRS(ResetGenerationId msg)
  throws IOException
  {
    this.session.publish(msg);
  }

  /**
   * Record a new generation ID for this handler.
   *
   * @param generationId The new generation ID
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }

  /**
   * Returns the Replication Server Domain to which this server handler
   * belongs.
   *
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain()
  {
    return replicationServerDomain;
  }

  /**
   * Return a Set containing the servers known by this replicationServer.
   * The returned set is the key set of the map of connected servers, not
   * a copy of it.
   *
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getConnectedServerIds()
  {
    Set<Short> serverIds = connectedServers.keySet();
    return serverIds;
  }
}