001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 
026 */ 027 package org.opends.server.replication.plugin; 028 029 import org.opends.messages.*; 030 031 import static org.opends.server.loggers.ErrorLogger.logError; 032 import static org.opends.server.loggers.debug.DebugLogger.*; 033 import org.opends.server.loggers.debug.DebugTracer; 034 import static org.opends.messages.ReplicationMessages.*; 035 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 036 037 import java.io.IOException; 038 import java.net.ConnectException; 039 import java.net.InetAddress; 040 import java.net.InetSocketAddress; 041 import java.net.Socket; 042 import java.net.SocketException; 043 import java.net.SocketTimeoutException; 044 import java.util.Collection; 045 import java.util.HashMap; 046 import java.util.Iterator; 047 import java.util.LinkedHashSet; 048 import java.util.TreeSet; 049 import java.util.concurrent.Semaphore; 050 import java.util.concurrent.TimeUnit; 051 052 import org.opends.server.protocols.asn1.ASN1OctetString; 053 import org.opends.server.protocols.internal.InternalClientConnection; 054 import org.opends.server.protocols.internal.InternalSearchListener; 055 import org.opends.server.protocols.internal.InternalSearchOperation; 056 import org.opends.server.protocols.ldap.LDAPFilter; 057 import org.opends.server.replication.common.ChangeNumber; 058 import org.opends.server.replication.common.ServerState; 059 import org.opends.server.replication.protocol.*; 060 import org.opends.server.types.DN; 061 import org.opends.server.types.DereferencePolicy; 062 import org.opends.server.types.ResultCode; 063 import org.opends.server.types.SearchResultEntry; 064 import org.opends.server.types.SearchResultReference; 065 import org.opends.server.types.SearchScope; 066 067 /** 068 * The broker for Multi-master Replication. 069 */ 070 public class ReplicationBroker implements InternalSearchListener 071 { 072 073 /** 074 * The tracer object for the debug logger. 
075 */ 076 private static final DebugTracer TRACER = getTracer(); 077 private boolean shutdown = false; 078 private Collection<String> servers; 079 private boolean connected = false; 080 private String replicationServer = "Not connected"; 081 private TreeSet<FakeOperation> replayOperations; 082 private ProtocolSession session = null; 083 private final ServerState state; 084 private final DN baseDn; 085 private final short serverID; 086 private int maxSendDelay; 087 private int maxReceiveDelay; 088 private int maxSendQueue; 089 private int maxReceiveQueue; 090 private Semaphore sendWindow; 091 private int maxSendWindow; 092 private int rcvWindow; 093 private int halfRcvWindow; 094 private int maxRcvWindow; 095 private int timeout = 0; 096 private short protocolVersion; 097 private long generationId = -1; 098 private ReplSessionSecurity replSessionSecurity; 099 100 // Trick for avoiding a inner class for many parameters return for 101 // performHandshake method. 102 private String tmpReadableServerName = null; 103 /** 104 * The time in milliseconds between heartbeats from the replication 105 * server. Zero means heartbeats are off. 106 */ 107 private long heartbeatInterval = 0; 108 /** 109 * A thread to monitor heartbeats on the session. 110 */ 111 private HeartbeatMonitor heartbeatMonitor = null; 112 /** 113 * The number of times the connection was lost. 114 */ 115 private int numLostConnections = 0; 116 /** 117 * When the broker cannot connect to any replication server 118 * it log an error and keeps continuing every second. 119 * This boolean is set when the first failure happens and is used 120 * to avoid repeating the error message for further failure to connect 121 * and to know that it is necessary to print a new message when the broker 122 * finally succeed to connect. 
123 */ 124 private boolean connectionError = false; 125 private final Object connectPhaseLock = new Object(); 126 127 /** 128 * Creates a new ReplicationServer Broker for a particular ReplicationDomain. 129 * 130 * @param state The ServerState that should be used by this broker 131 * when negociating the session with the replicationServer. 132 * @param baseDn The base DN that should be used by this broker 133 * when negociating the session with the replicationServer. 134 * @param serverID The server ID that should be used by this broker 135 * when negociating the session with the replicationServer. 136 * @param maxReceiveQueue The maximum size of the receive queue to use on 137 * the replicationServer. 138 * @param maxReceiveDelay The maximum replication delay to use on the 139 * replicationServer. 140 * @param maxSendQueue The maximum size of the send queue to use on 141 * the replicationServer. 142 * @param maxSendDelay The maximum send delay to use on the replicationServer. 143 * @param window The size of the send and receive window to use. 144 * @param heartbeatInterval The interval between heartbeats requested of the 145 * replicationServer, or zero if no heartbeats are requested. 146 * 147 * @param generationId The generationId for the server associated to the 148 * provided serverID and for the domain associated to the provided baseDN. 149 * @param replSessionSecurity The session security configuration. 
150 */ 151 public ReplicationBroker(ServerState state, DN baseDn, short serverID, 152 int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, 153 int maxSendDelay, int window, long heartbeatInterval, 154 long generationId, ReplSessionSecurity replSessionSecurity) 155 { 156 this.baseDn = baseDn; 157 this.serverID = serverID; 158 this.maxReceiveDelay = maxReceiveDelay; 159 this.maxSendDelay = maxSendDelay; 160 this.maxReceiveQueue = maxReceiveQueue; 161 this.maxSendQueue = maxSendQueue; 162 this.state = state; 163 replayOperations = 164 new TreeSet<FakeOperation>(new FakeOperationComparator()); 165 this.rcvWindow = window; 166 this.maxRcvWindow = window; 167 this.halfRcvWindow = window / 2; 168 this.heartbeatInterval = heartbeatInterval; 169 this.protocolVersion = ProtocolVersion.currentVersion(); 170 this.generationId = generationId; 171 this.replSessionSecurity = replSessionSecurity; 172 } 173 174 /** 175 * Start the ReplicationBroker. 176 * 177 * @param servers list of servers used 178 */ 179 public void start(Collection<String> servers) 180 { 181 /* 182 * Open Socket to the ReplicationServer 183 * Send the Start message 184 */ 185 shutdown = false; 186 this.servers = servers; 187 if (servers.size() < 1) 188 { 189 Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); 190 logError(message); 191 } 192 193 this.rcvWindow = this.maxRcvWindow; 194 this.connect(); 195 } 196 197 /** 198 * Connect to a ReplicationServer. 199 * 200 * @throws NumberFormatException address was invalid 201 */ 202 private void connect() 203 { 204 HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); 205 206 // Stop any existing heartbeat monitor from a previous session. 207 stopHeartBeat(); 208 209 synchronized (connectPhaseLock) 210 { 211 /* 212 * Connect to each replication server and get their ServerState then find 213 * out which one is the best to connect to. 
214 */ 215 for (String server : servers) 216 { 217 // Connect to server and get reply message 218 ReplServerStartMessage replServerStartMsg = 219 performHandshake(server, false); 220 tmpReadableServerName = null; // Not needed now 221 222 // Store reply message in list 223 if (replServerStartMsg != null) 224 { 225 ServerState rsState = replServerStartMsg.getServerState(); 226 rsStates.put(server, rsState); 227 } 228 } // for servers 229 230 ReplServerStartMessage replServerStartMsg = null; 231 232 if (rsStates.size() > 0) 233 { 234 235 // At least one server answered, find the best one. 236 String bestServer = computeBestReplicationServer(state, rsStates, 237 serverID, baseDn); 238 239 // Best found, now connect to this one 240 replServerStartMsg = performHandshake(bestServer, true); 241 242 if (replServerStartMsg != null) 243 { 244 try 245 { 246 /* 247 * We must not publish changes to a replicationServer that has not 248 * seen all our previous changes because this could cause some 249 * other ldap servers to miss those changes. 250 * Check that the ReplicationServer has seen all our previous 251 * changes. 252 */ 253 ChangeNumber replServerMaxChangeNumber = 254 replServerStartMsg.getServerState().getMaxChangeNumber(serverID); 255 256 if (replServerMaxChangeNumber == null) 257 { 258 replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); 259 } 260 ChangeNumber ourMaxChangeNumber = 261 state.getMaxChangeNumber(serverID); 262 263 if ((ourMaxChangeNumber != null) && 264 (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) 265 { 266 267 // Replication server is missing some of our changes: let's send 268 // them to him. 269 replayOperations.clear(); 270 271 Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); 272 logError(message); 273 274 /* 275 * Get all the changes that have not been seen by this 276 * replication server and populate the replayOperations 277 * list. 
278 */ 279 InternalSearchOperation op = searchForChangedEntries( 280 baseDn, replServerMaxChangeNumber, this); 281 if (op.getResultCode() != ResultCode.SUCCESS) 282 { 283 /* 284 * An error happened trying to search for the updates 285 * This server will start acepting again new updates but 286 * some inconsistencies will stay between servers. 287 * Log an error for the repair tool 288 * that will need to resynchronize the servers. 289 */ 290 message = ERR_CANNOT_RECOVER_CHANGES.get( 291 baseDn.toNormalizedString()); 292 logError(message); 293 } else 294 { 295 for (FakeOperation replayOp : replayOperations) 296 { 297 message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber(). 298 toString()); 299 logError(message); 300 session.publish(replayOp.generateMessage()); 301 } 302 message = DEBUG_CHANGES_SENT.get(); 303 logError(message); 304 } 305 } 306 307 replicationServer = tmpReadableServerName; 308 maxSendWindow = replServerStartMsg.getWindowSize(); 309 connected = true; 310 startHeartBeat(); 311 } catch (IOException e) 312 { 313 Message message = ERR_PUBLISHING_FAKE_OPS.get( 314 baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + 315 stackTraceToSingleLineString(e)); 316 logError(message); 317 } catch (Exception e) 318 { 319 Message message = ERR_COMPUTING_FAKE_OPS.get( 320 baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + 321 stackTraceToSingleLineString(e)); 322 logError(message); 323 } finally 324 { 325 if (connected == false) 326 { 327 if (session != null) 328 { 329 try 330 { 331 session.close(); 332 } catch (IOException e) 333 { 334 // The session was already closed, just ignore. 335 } 336 session = null; 337 } 338 } 339 } 340 } // Could perform handshake with best 341 } // Reached some servers 342 343 if (connected) 344 { 345 // Log a message to let the administrator know that the failure was 346 // resolved. 347 // Wakeup all the thread that were waiting on the window 348 // on the previous connection. 
349 connectionError = false; 350 if (sendWindow != null) 351 { 352 sendWindow.release(Integer.MAX_VALUE); 353 } 354 this.sendWindow = new Semaphore(maxSendWindow); 355 connectPhaseLock.notify(); 356 357 if ((replServerStartMsg.getGenerationId() == this.generationId) || 358 (replServerStartMsg.getGenerationId() == -1)) 359 { 360 Message message = 361 NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( 362 baseDn.toString(), 363 replicationServer, 364 Long.toString(this.generationId)); 365 logError(message); 366 } else 367 { 368 Message message = 369 NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( 370 baseDn.toString(), 371 replicationServer, 372 Long.toString(this.generationId), 373 Long.toString(replServerStartMsg.getGenerationId())); 374 logError(message); 375 } 376 } else 377 { 378 /* 379 * This server could not find any replicationServer. It's going to start 380 * in degraded mode. Log a message. 381 */ 382 if (!connectionError) 383 { 384 connectionError = true; 385 connectPhaseLock.notify(); 386 Message message = 387 NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); 388 logError(message); 389 } 390 } 391 } 392 } 393 394 /** 395 * Connect to the provided server performing the handshake (start messages 396 * exchange) and return the reply message from the replication server. 397 * 398 * @param server Server to connect to. 399 * @param keepConnection Do we keep session opened or not after handshake. 400 * @return The ReplServerStartMessage the server replied. Null if could not 401 * get an answer. 402 */ 403 public ReplServerStartMessage performHandshake(String server, 404 boolean keepConnection) 405 { 406 ReplServerStartMessage replServerStartMsg = null; 407 408 // Parse server string. 409 int separator = server.lastIndexOf(':'); 410 String port = server.substring(separator + 1); 411 String hostname = server.substring(0, separator); 412 413 boolean error = false; 414 try 415 { 416 /* 417 * Open a socket connection to the next candidate. 
418 */ 419 int intPort = Integer.parseInt(port); 420 InetSocketAddress serverAddr = new InetSocketAddress( 421 InetAddress.getByName(hostname), intPort); 422 tmpReadableServerName = serverAddr.toString(); 423 Socket socket = new Socket(); 424 socket.setReceiveBufferSize(1000000); 425 socket.setTcpNoDelay(true); 426 socket.connect(serverAddr, 500); 427 session = replSessionSecurity.createClientSession(server, socket); 428 boolean isSslEncryption = 429 replSessionSecurity.isSslEncryption(server); 430 /* 431 * Send our ServerStartMessage. 432 */ 433 ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, 434 maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, 435 halfRcvWindow * 2, heartbeatInterval, state, 436 protocolVersion, generationId, isSslEncryption, !keepConnection); 437 session.publish(msg); 438 439 /* 440 * Read the ReplServerStartMessage that should come back. 441 */ 442 session.setSoTimeout(1000); 443 replServerStartMsg = (ReplServerStartMessage) session.receive(); 444 445 /* 446 * We have sent our own protocol version to the replication server. 447 * The replication server will use the same one (or an older one 448 * if it is an old replication server). 
449 */ 450 protocolVersion = ProtocolVersion.minWithCurrent( 451 replServerStartMsg.getVersion()); 452 session.setSoTimeout(timeout); 453 454 if (!isSslEncryption) 455 { 456 session.stopEncryption(); 457 } 458 } catch (ConnectException e) 459 { 460 /* 461 * There was no server waiting on this host:port 462 * Log a notice and try the next replicationServer in the list 463 */ 464 if (!connectionError) 465 { 466 // the error message is only logged once to avoid overflowing 467 // the error log 468 Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); 469 logError(message); 470 } 471 error = true; 472 } catch (Exception e) 473 { 474 Message message = ERR_EXCEPTION_STARTING_SESSION.get( 475 baseDn.toNormalizedString(), server, e.getLocalizedMessage() + 476 stackTraceToSingleLineString(e)); 477 logError(message); 478 error = true; 479 } 480 481 // Close session if requested 482 if (!keepConnection || error) 483 { 484 if (session != null) 485 { 486 try 487 { 488 session.close(); 489 } catch (IOException e) 490 { 491 // The session was already closed, just ignore. 492 } 493 session = null; 494 } 495 if (error) 496 { 497 replServerStartMsg = null; 498 } // Be sure to return null. 499 } 500 501 return replServerStartMsg; 502 } 503 504 /** 505 * Returns the replication server that best fits our need so that we can 506 * connect to it. 507 * 508 * Note: this method put as public static for unit testing purpose. 509 * 510 * @param myState The local server state. 511 * @param rsStates The list of available replication servers and their 512 * associated server state. 513 * @param serverId The server id for the suffix we are working for. 514 * @param baseDn The suffix for which we are working for. 515 * @return The computed best replication server. 
516 */ 517 public static String computeBestReplicationServer(ServerState myState, 518 HashMap<String, ServerState> rsStates, short serverId, DN baseDn) 519 { 520 521 /* 522 * Find replication servers who are up to date (or more up to date than us, 523 * if for instance we failed and restarted, having sent some changes to the 524 * RS but without having time to store our own state) regarding our own 525 * server id. Then, among them, choose the server that is the most up to 526 * date regarding the whole topology. 527 * 528 * If no server is up to date regarding our own server id, find the one who 529 * is the most up to date regarding our server id. 530 */ 531 532 // Should never happen (sanity check) 533 if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) || 534 (baseDn == null)) 535 { 536 return null; 537 } 538 539 String bestServer = null; 540 // Servers up to dates with regard to our changes 541 HashMap<String, ServerState> upToDateServers = 542 new HashMap<String, ServerState>(); 543 // Servers late with regard to our changes 544 HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>(); 545 546 /* 547 * Start loop to differenciate up to date servers from late ones. 
548 */ 549 ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId); 550 if (myChangeNumber == null) 551 { 552 myChangeNumber = new ChangeNumber(0, 0, serverId); 553 } 554 for (String repServer : rsStates.keySet()) 555 { 556 557 ServerState rsState = rsStates.get(repServer); 558 ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId); 559 if (rsChangeNumber == null) 560 { 561 rsChangeNumber = new ChangeNumber(0, 0, serverId); 562 } 563 564 // Store state in right list 565 if (myChangeNumber.olderOrEqual(rsChangeNumber)) 566 { 567 upToDateServers.put(repServer, rsState); 568 } else 569 { 570 lateOnes.put(repServer, rsState); 571 } 572 } 573 574 if (upToDateServers.size() > 0) 575 { 576 577 /* 578 * Some up to date servers, among them, choose the one that has the 579 * maximum number of changes to send us. This is the most up to date one 580 * regarding the whole topology. This server is the one which has the less 581 * difference with the topology server state. For comparison, we need to 582 * compute the difference for each server id with the topology server 583 * state. 584 */ 585 586 Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( 587 upToDateServers.size(), 588 baseDn.toNormalizedString()); 589 logError(message); 590 591 /* 592 * First of all, compute the virtual server state for the whole topology, 593 * which is composed of the most up to date change numbers for 594 * each server id in the topology. 
595 */ 596 ServerState topoState = new ServerState(); 597 for (ServerState curState : upToDateServers.values()) 598 { 599 600 Iterator<Short> it = curState.iterator(); 601 while (it.hasNext()) 602 { 603 Short sId = it.next(); 604 ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); 605 if (curSidCn == null) 606 { 607 curSidCn = new ChangeNumber(0, 0, sId); 608 } 609 // Update topology state 610 topoState.update(curSidCn); 611 } 612 } // For up to date servers 613 614 // Min of the max shifts 615 long minShift = -1L; 616 for (String upServer : upToDateServers.keySet()) 617 { 618 619 /* 620 * Compute the maximum difference between the time of a server id's 621 * change number and the time of the matching server id's change 622 * number in the topology server state. 623 * 624 * Note: we could have used the sequence number here instead of the 625 * timestamp, but this would have caused a problem when the sequence 626 * number loops and comes back to 0 (computation would have becomen 627 * meaningless). 
628 */ 629 long shift = -1L; 630 ServerState curState = upToDateServers.get(upServer); 631 Iterator<Short> it = curState.iterator(); 632 while (it.hasNext()) 633 { 634 Short sId = it.next(); 635 ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); 636 if (curSidCn == null) 637 { 638 curSidCn = new ChangeNumber(0, 0, sId); 639 } 640 // Cannot be null as checked at construction time 641 ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId); 642 // Cannot be negative as topoState computed as being the max CN 643 // for each server id in the topology 644 long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime(); 645 if (tmpShift > shift) 646 { 647 shift = tmpShift; 648 } 649 } 650 651 if ((minShift < 0) // First time in loop 652 || (shift < minShift)) 653 { 654 // This sever is even closer to topo state 655 bestServer = upServer; 656 minShift = shift; 657 } 658 } // For up to date servers 659 660 } else 661 { 662 /* 663 * We could not find a replication server that has seen all the 664 * changes that this server has already processed, 665 */ 666 // lateOnes cannot be empty 667 Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( 668 baseDn.toNormalizedString(), lateOnes.size()); 669 logError(message); 670 671 // Min of the shifts 672 long minShift = -1L; 673 for (String lateServer : lateOnes.keySet()) 674 { 675 676 /* 677 * Choose the server who is the closest to us regarding our server id 678 * (this is the most up to date regarding our server id). 
679 */ 680 ServerState curState = lateOnes.get(lateServer); 681 ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId); 682 if (ourSidCn == null) 683 { 684 ourSidCn = new ChangeNumber(0, 0, serverId); 685 } 686 // Cannot be negative as our Cn for our server id is strictly 687 // greater than those of the servers in late server list 688 long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime(); 689 690 if ((minShift < 0) // First time in loop 691 || (tmpShift < minShift)) 692 { 693 // This sever is even closer to topo state 694 bestServer = lateServer; 695 minShift = tmpShift; 696 } 697 } // For late servers 698 } 699 700 return bestServer; 701 } 702 703 /** 704 * Search for the changes that happened since fromChangeNumber 705 * based on the historical attribute. 706 * @param baseDn the base DN 707 * @param fromChangeNumber The change number from which we want the changes 708 * @param resultListener that will process the entries returned. 709 * @return the internal search operation 710 * @throws Exception when raised. 711 */ 712 public static InternalSearchOperation searchForChangedEntries( 713 DN baseDn, 714 ChangeNumber fromChangeNumber, 715 InternalSearchListener resultListener) 716 throws Exception 717 { 718 InternalClientConnection conn = 719 InternalClientConnection.getRootConnection(); 720 LDAPFilter filter = LDAPFilter.decode( 721 "(" + Historical.HISTORICALATTRIBUTENAME + 722 ">=dummy:" + fromChangeNumber + ")"); 723 LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); 724 attrs.add(Historical.HISTORICALATTRIBUTENAME); 725 attrs.add(Historical.ENTRYUIDNAME); 726 return conn.processSearch( 727 new ASN1OctetString(baseDn.toString()), 728 SearchScope.WHOLE_SUBTREE, 729 DereferencePolicy.NEVER_DEREF_ALIASES, 730 0, 0, false, filter, 731 attrs, 732 resultListener); 733 } 734 735 /** 736 * Start the heartbeat monitor thread. 737 */ 738 private void startHeartBeat() 739 { 740 // Start a heartbeat monitor thread. 
741 if (heartbeatInterval > 0) 742 { 743 heartbeatMonitor = 744 new HeartbeatMonitor("Replication Heartbeat Monitor on " + 745 baseDn + " with " + getReplicationServer(), 746 session, heartbeatInterval); 747 heartbeatMonitor.start(); 748 } 749 } 750 751 /** 752 * Stop the heartbeat monitor thread. 753 */ 754 void stopHeartBeat() 755 { 756 if (heartbeatMonitor != null) 757 { 758 heartbeatMonitor.shutdown(); 759 heartbeatMonitor = null; 760 } 761 } 762 763 /** 764 * restart the ReplicationBroker. 765 */ 766 public void reStart() 767 { 768 reStart(this.session); 769 } 770 771 /** 772 * Restart the ReplicationServer broker after a failure. 773 * 774 * @param failingSession the socket which failed 775 */ 776 public void reStart(ProtocolSession failingSession) 777 { 778 try 779 { 780 if (failingSession != null) 781 { 782 failingSession.close(); 783 numLostConnections++; 784 } 785 } catch (IOException e1) 786 { 787 // ignore 788 } 789 790 if (failingSession == session) 791 { 792 this.connected = false; 793 } 794 while (!this.connected && (!this.shutdown)) 795 { 796 try 797 { 798 this.connect(); 799 } catch (Exception e) 800 { 801 MessageBuilder mb = new MessageBuilder(); 802 mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( 803 baseDn.toNormalizedString(), e.getLocalizedMessage())); 804 mb.append(stackTraceToSingleLineString(e)); 805 logError(mb.toMessage()); 806 } 807 if ((!connected) && (!shutdown)) 808 { 809 try 810 { 811 Thread.sleep(500); 812 } catch (InterruptedException e) 813 { 814 // ignore 815 } 816 } 817 } 818 } 819 820 /** 821 * Publish a message to the other servers. 822 * @param msg the message to publish 823 */ 824 public void publish(ReplicationMessage msg) 825 { 826 boolean done = false; 827 828 while (!done && !shutdown) 829 { 830 if (connectionError) 831 { 832 // It was not possible to connect to any replication server. 
833 // Since the operation was already processed, we have no other 834 // choice than to return without sending the ReplicationMessage 835 // and relying on the resend procedure of the connect phase to 836 // fix the problem when we finally connect. 837 838 if (debugEnabled()) 839 { 840 debugInfo("ReplicationBroker.publish() Publishing a " + 841 " message is not possible due to existing connection error."); 842 } 843 844 return; 845 } 846 847 try 848 { 849 boolean credit; 850 ProtocolSession current_session; 851 Semaphore currentWindowSemaphore; 852 853 // save the session at the time when we acquire the 854 // sendwindow credit so that we can make sure later 855 // that the session did not change in between. 856 // This is necessary to make sure that we don't publish a message 857 // on a session with a credit that was acquired from a previous 858 // session. 859 synchronized (connectPhaseLock) 860 { 861 current_session = session; 862 currentWindowSemaphore = sendWindow; 863 } 864 865 if (msg instanceof UpdateMessage) 866 { 867 // Acquiring the window credit must be done outside of the 868 // connectPhaseLock because it can be blocking and we don't 869 // want to hold off reconnection in case the connection dropped. 870 credit = 871 currentWindowSemaphore.tryAcquire( 872 (long) 500, TimeUnit.MILLISECONDS); 873 } else 874 { 875 credit = true; 876 } 877 if (credit) 878 { 879 synchronized (connectPhaseLock) 880 { 881 // check the session. If it has changed, some 882 // deconnection/reconnection happened and we need to restart from 883 // scratch. 884 if (session == current_session) 885 { 886 session.publish(msg); 887 done = true; 888 } 889 } 890 } 891 if (!credit) 892 { 893 // the window is still closed. 894 // Send a WindowProbe message to wakeup the receiver in case the 895 // window update message was lost somehow... 896 // then loop to check again if connection was closed. 
897 session.publish(new WindowProbe()); 898 } 899 } catch (IOException e) 900 { 901 // The receive threads should handle reconnection or 902 // mark this broker in error. Just retry. 903 synchronized (connectPhaseLock) 904 { 905 try 906 { 907 connectPhaseLock.wait(100); 908 } catch (InterruptedException e1) 909 { 910 // ignore 911 if (debugEnabled()) 912 { 913 debugInfo("ReplicationBroker.publish() " + 914 "IO exception raised : " + e.getLocalizedMessage()); 915 } 916 } 917 } 918 } catch (InterruptedException e) 919 { 920 // just loop. 921 if (debugEnabled()) 922 { 923 debugInfo("ReplicationBroker.publish() " + 924 "Interrupted exception raised." + e.getLocalizedMessage()); 925 } 926 } 927 } 928 } 929 930 /** 931 * Receive a message. 932 * This method is not multithread safe and should either always be 933 * called in a single thread or protected by a locking mechanism 934 * before being called. 935 * 936 * @return the received message 937 * @throws SocketTimeoutException if the timeout set by setSoTimeout 938 * has expired 939 */ 940 public ReplicationMessage receive() throws SocketTimeoutException 941 { 942 while (shutdown == false) 943 { 944 if (!connected) 945 { 946 reStart(null); 947 } 948 949 ProtocolSession failingSession = session; 950 try 951 { 952 ReplicationMessage msg = session.receive(); 953 if (msg instanceof WindowMessage) 954 { 955 WindowMessage windowMsg = (WindowMessage) msg; 956 sendWindow.release(windowMsg.getNumAck()); 957 } 958 else 959 { 960 return msg; 961 } 962 } catch (SocketTimeoutException e) 963 { 964 throw e; 965 } catch (Exception e) 966 { 967 if (shutdown == false) 968 { 969 Message message = 970 NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); 971 logError(message); 972 973 debugInfo("ReplicationBroker.receive() " + baseDn + 974 " Exception raised." 
+ e + e.getLocalizedMessage()); 975 this.reStart(failingSession); 976 } 977 } 978 } 979 return null; 980 } 981 982 /** 983 * This method allows to do the necessary computing for the window 984 * management after treatment by the worker threads. 985 * 986 * This should be called once the replay thread have done their job 987 * and the window can be open again. 988 */ 989 public synchronized void updateWindowAfterReplay() 990 { 991 try 992 { 993 rcvWindow--; 994 if (rcvWindow < halfRcvWindow) 995 { 996 session.publish(new WindowMessage(halfRcvWindow)); 997 rcvWindow += halfRcvWindow; 998 } 999 } catch (IOException e) 1000 { 1001 // Any error on the socket will be handled by the thread calling receive() 1002 // just ignore. 1003 } 1004 } 1005 1006 /** 1007 * stop the server. 1008 */ 1009 public void stop() 1010 { 1011 replicationServer = "stopped"; 1012 shutdown = true; 1013 connected = false; 1014 try 1015 { 1016 if (debugEnabled()) 1017 { 1018 debugInfo("ReplicationBroker is stopping. and will" + 1019 " close the connection"); 1020 } 1021 1022 if (session != null) 1023 { 1024 session.close(); 1025 } 1026 } catch (IOException e) 1027 { 1028 } 1029 } 1030 1031 /** 1032 * Set a timeout value. 1033 * With this option set to a non-zero value, calls to the receive() method 1034 * block for only this amount of time after which a 1035 * java.net.SocketTimeoutException is raised. 1036 * The Broker is valid and useable even after such an Exception is raised. 1037 * 1038 * @param timeout the specified timeout, in milliseconds. 1039 * @throws SocketException if there is an error in the underlying protocol, 1040 * such as a TCP error. 1041 */ 1042 public void setSoTimeout(int timeout) throws SocketException 1043 { 1044 this.timeout = timeout; 1045 if (session != null) 1046 { 1047 session.setSoTimeout(timeout); 1048 } 1049 } 1050 1051 /** 1052 * Set the value of the generationId for that broker. 
Normally the 1053 * generationId is set through the constructor but there are cases 1054 * where the value of the generationId must be changed while the broker 1055 * already exist for example after an on-line import. 1056 * 1057 * @param generationId The value of the generationId. 1058 * 1059 */ 1060 public void setGenerationId(long generationId) 1061 { 1062 this.generationId = generationId; 1063 } 1064 1065 /** 1066 * Get the name of the replicationServer to which this broker is currently 1067 * connected. 1068 * 1069 * @return the name of the replicationServer to which this domain 1070 * is currently connected. 1071 */ 1072 public String getReplicationServer() 1073 { 1074 return replicationServer; 1075 } 1076 1077 /** 1078 * {@inheritDoc} 1079 */ 1080 public void handleInternalSearchEntry( 1081 InternalSearchOperation searchOperation, 1082 SearchResultEntry searchEntry) 1083 { 1084 /* 1085 * Only deal with modify operation so far 1086 * TODO : implement code for ADD, DEL, MODDN operation 1087 * 1088 * Parse all ds-sync-hist attribute values 1089 * - for each Changenumber > replication server MaxChangeNumber : 1090 * build an attribute mod 1091 * 1092 */ 1093 Iterable<FakeOperation> updates = 1094 Historical.generateFakeOperations(searchEntry); 1095 for (FakeOperation op : updates) 1096 { 1097 replayOperations.add(op); 1098 } 1099 } 1100 1101 /** 1102 * {@inheritDoc} 1103 */ 1104 public void handleInternalSearchReference( 1105 InternalSearchOperation searchOperation, 1106 SearchResultReference searchReference) 1107 { 1108 // TODO to be implemented 1109 } 1110 1111 /** 1112 * Get the maximum receive window size. 1113 * 1114 * @return The maximum receive window size. 1115 */ 1116 public int getMaxRcvWindow() 1117 { 1118 return maxRcvWindow; 1119 } 1120 1121 /** 1122 * Get the current receive window size. 1123 * 1124 * @return The current receive window size. 
1125 */ 1126 public int getCurrentRcvWindow() 1127 { 1128 return rcvWindow; 1129 } 1130 1131 /** 1132 * Get the maximum send window size. 1133 * 1134 * @return The maximum send window size. 1135 */ 1136 public int getMaxSendWindow() 1137 { 1138 return maxSendWindow; 1139 } 1140 1141 /** 1142 * Get the current send window size. 1143 * 1144 * @return The current send window size. 1145 */ 1146 public int getCurrentSendWindow() 1147 { 1148 if (connected) 1149 { 1150 return sendWindow.availablePermits(); 1151 } else 1152 { 1153 return 0; 1154 } 1155 } 1156 1157 /** 1158 * Get the number of times the connection was lost. 1159 * @return The number of times the connection was lost. 1160 */ 1161 public int getNumLostConnections() 1162 { 1163 return numLostConnections; 1164 } 1165 1166 /** 1167 * Change some config parameters. 1168 * 1169 * @param replicationServers The new list of replication servers. 1170 * @param maxReceiveQueue The max size of receive queue. 1171 * @param maxReceiveDelay The max receive delay. 1172 * @param maxSendQueue The max send queue. 1173 * @param maxSendDelay The max Send Delay. 1174 * @param window The max window size. 1175 * @param heartbeatInterval The heartbeat interval. 1176 */ 1177 public void changeConfig(Collection<String> replicationServers, 1178 int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, 1179 int maxSendDelay, int window, long heartbeatInterval) 1180 { 1181 this.servers = replicationServers; 1182 this.maxRcvWindow = window; 1183 this.heartbeatInterval = heartbeatInterval; 1184 this.maxReceiveDelay = maxReceiveDelay; 1185 this.maxReceiveQueue = maxReceiveQueue; 1186 this.maxSendDelay = maxSendDelay; 1187 this.maxSendQueue = maxSendQueue; 1188 // TODO : Changing those parameters requires to either restart a new 1189 // session with the replicationServer or renegociate the parameters that 1190 // were sent in the ServerStart message 1191 } 1192 1193 /** 1194 * Get the version of the replication protocol. 
* @return The version of the replication protocol.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }

  /**
   * Check if the broker is connected to a ReplicationServer and therefore
   * ready to receive and send Replication Messages.
   *
   * The answer is the negation of the connectionError flag.
   *
   * @return true if the server is connected, false if not.
   */
  public boolean isConnected()
  {
    return !connectionError;
  }

  /**
   * Tells whether debug messages should be produced by this class.
   *
   * NOTE(review): this always answers true, so every debugInfo() call
   * guarded by it is executed. It presumably shadows the statically
   * imported DebugLogger.debugEnabled() - confirm this is intentional.
   *
   * @return always true.
   */
  private boolean debugEnabled()
  {
    return true;
  }

  /**
   * Reports a message both to the error log, as a raw SYNC message of
   * NOTICE severity, and to the debug tracer.
   *
   * @param s The text to report.
   */
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    TRACER.debugInfo(s);
  }

  /**
   * Determine whether the connection to the replication server is encrypted.
   * When there is no session the connection is reported as not encrypted.
   *
   * @return true if the connection is encrypted, false otherwise.
   */
  public boolean isSessionEncrypted()
  {
    return (session != null) && session.isEncrypted();
  }
}