001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2008 Sun Microsystems, Inc. 026 */ 027 028 package org.opends.server.replication.protocol; 029 030 import org.opends.server.api.DirectoryThread; 031 import static org.opends.server.loggers.debug.DebugLogger.*; 032 033 import org.opends.server.loggers.debug.DebugTracer; 034 035 import java.io.IOException; 036 037 /** 038 * This thread publishes a heartbeat message on a given protocol session at 039 * regular intervals when there are no other replication messages being 040 * published. 041 */ 042 public class HeartbeatThread extends DirectoryThread 043 { 044 /** 045 * The tracer object for the debug logger. 046 */ 047 private static final DebugTracer TRACER = getTracer(); 048 049 050 /** 051 * For test purposes only to simulate loss of heartbeats. 052 */ 053 static private boolean heartbeatsDisabled = false; 054 055 /** 056 * The session on which heartbeats are to be sent. 057 */ 058 private ProtocolSession session; 059 060 061 /** 062 * The time in milliseconds between heartbeats. 063 */ 064 private long heartbeatInterval; 065 066 067 /** 068 * Set this to stop the thread. 069 */ 070 private Boolean shutdown = false; 071 private final Object shutdown_lock = new Object(); 072 073 074 /** 075 * Create a heartbeat thread. 076 * @param threadName The name of the heartbeat thread. 077 * @param session The session on which heartbeats are to be sent. 078 * @param heartbeatInterval The desired interval between heartbeats in 079 * milliseconds. 080 */ 081 public HeartbeatThread(String threadName, ProtocolSession session, 082 long heartbeatInterval) 083 { 084 super(threadName); 085 this.session = session; 086 this.heartbeatInterval = heartbeatInterval; 087 } 088 089 /** 090 * {@inheritDoc} 091 */ 092 @Override 093 public void run() 094 { 095 try 096 { 097 if (debugEnabled()) 098 { 099 TRACER.debugInfo("Heartbeat thread is starting, interval is %d", 100 heartbeatInterval); 101 } 102 HeartbeatMessage heartbeatMessage = new HeartbeatMessage(); 103 104 while (!shutdown) 105 { 106 long now = System.currentTimeMillis(); 107 if (debugEnabled()) 108 { 109 TRACER.debugVerbose("Heartbeat thread awoke at %d, last message " + 110 "was sent at %d", now, session.getLastPublishTime()); 111 } 112 113 if (now > session.getLastPublishTime() + heartbeatInterval) 114 { 115 if (!heartbeatsDisabled) 116 { 117 if (debugEnabled()) 118 { 119 TRACER.debugVerbose("Heartbeat sent at %d", now); 120 } 121 session.publish(heartbeatMessage); 122 } 123 } 124 125 try 126 { 127 long sleepTime = session.getLastPublishTime() + 128 heartbeatInterval - now; 129 if (sleepTime <= 0) 130 { 131 sleepTime = heartbeatInterval; 132 } 133 134 if (debugEnabled()) 135 { 136 TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime); 137 } 138 139 synchronized (shutdown_lock) 140 { 141 if (!shutdown) 142 { 143 shutdown_lock.wait(sleepTime); 144 } 145 } 146 } 147 catch (InterruptedException e) 148 { 149 // Keep looping. 150 } 151 } 152 } 153 catch (IOException e) 154 { 155 if (debugEnabled()) 156 { 157 TRACER.debugInfo("Heartbeat thread could not send a heartbeat."); 158 } 159 // This will be caught in another thread. 160 } 161 finally 162 { 163 if (debugEnabled()) 164 { 165 TRACER.debugInfo("Heartbeat thread is exiting."); 166 } 167 } 168 } 169 170 171 /** 172 * Call this method to stop the thread. 173 * This method is blocking until the thread has stopped. 174 */ 175 public void shutdown() 176 { 177 synchronized (shutdown_lock) 178 { 179 shutdown = true; 180 shutdown_lock.notifyAll(); 181 if (debugEnabled()) 182 { 183 TRACER.debugInfo("Going to notify Heartbeat thread."); 184 } 185 } 186 if (debugEnabled()) 187 { 188 TRACER.debugInfo("Returning from Heartbeat shutdown."); 189 } 190 } 191 192 193 /** 194 * For testing purposes only to simulate loss of heartbeats. 195 * @param heartbeatsDisabled Set true to prevent heartbeats from being sent. 196 */ 197 public static void setHeartbeatsDisabled(boolean heartbeatsDisabled) 198 { 199 HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled; 200 } 201 }