001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.plugin; 028 import org.opends.messages.Message; 029 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.messages.ReplicationMessages.*; 032 033 import java.util.ArrayList; 034 import java.util.LinkedHashSet; 035 import java.util.LinkedList; 036 import java.util.List; 037 import java.util.Iterator; 038 039 import org.opends.server.core.DirectoryServer; 040 import org.opends.server.core.ModifyOperationBasis; 041 import org.opends.server.protocols.asn1.ASN1OctetString; 042 import org.opends.server.protocols.internal.InternalClientConnection; 043 import org.opends.server.protocols.internal.InternalSearchOperation; 044 import org.opends.server.protocols.ldap.LDAPAttribute; 045 import org.opends.server.protocols.ldap.LDAPFilter; 046 import org.opends.server.protocols.ldap.LDAPModification; 047 import org.opends.server.replication.common.ChangeNumber; 048 import org.opends.server.replication.common.ServerState; 049 import org.opends.server.types.Attribute; 050 import org.opends.server.types.AttributeType; 051 import org.opends.server.types.AttributeValue; 052 import org.opends.server.types.Control; 053 import org.opends.server.types.DN; 054 import org.opends.server.types.DereferencePolicy; 055 import org.opends.server.types.DirectoryException; 056 import org.opends.server.types.LDAPException; 057 import org.opends.server.types.ModificationType; 058 import org.opends.server.types.RawModification; 059 import org.opends.server.types.ResultCode; 060 import org.opends.server.types.SearchFilter; 061 import org.opends.server.types.SearchResultEntry; 062 import org.opends.server.types.SearchScope; 063 064 /** 065 * This class implements a ServerState that is stored on the backends 066 * used to store the synchronized data and that is therefore persistent 067 * accross server reboot. 068 */ 069 public class PersistentServerState extends ServerState 070 { 071 private DN baseDn; 072 private boolean savedStatus = true; 073 private InternalClientConnection conn = 074 InternalClientConnection.getRootConnection(); 075 private ASN1OctetString asn1BaseDn; 076 private short serverId; 077 078 /** 079 * The attribute name used to store the state in the backend. 080 */ 081 protected static final String REPLICATION_STATE = "ds-sync-state"; 082 083 /** 084 * create a new ServerState. 085 * @param baseDn The baseDN for which the ServerState is created 086 * @param serverId The serverId 087 */ 088 public PersistentServerState(DN baseDn, short serverId) 089 { 090 this.baseDn = baseDn; 091 this.serverId = serverId; 092 asn1BaseDn = new ASN1OctetString(baseDn.toString()); 093 loadState(); 094 } 095 096 /** 097 * {@inheritDoc} 098 */ 099 @Override 100 public boolean update(ChangeNumber changeNumber) 101 { 102 savedStatus = false; 103 return super.update(changeNumber); 104 } 105 106 /** 107 * Save this object to persistent storage. 108 */ 109 public void save() 110 { 111 if (savedStatus) 112 return; 113 114 savedStatus = true; 115 ResultCode resultCode = updateStateEntry(); 116 if (resultCode != ResultCode.SUCCESS) 117 { 118 savedStatus = false; 119 } 120 } 121 122 /** 123 * Load the ServerState from the backing entry in database to memory. 124 */ 125 public void loadState() 126 { 127 SearchResultEntry stateEntry = null; 128 129 // try to load the state from the base entry. 130 stateEntry = searchBaseEntry(); 131 132 if (stateEntry == null) 133 { 134 // The base entry does not exist yet 135 // in the database or was deleted. Try to read the ServerState 136 // from the configuration instead. 137 stateEntry = searchConfigEntry(); 138 } 139 140 if (stateEntry != null) 141 { 142 updateStateFromEntry(stateEntry); 143 } 144 145 /* 146 * In order to make sure that the replication never looses changes, 147 * the server needs to search all the entries that have been 148 * updated after the last write of the ServerState. 149 * Inconsistencies may append after a crash. 150 */ 151 checkAndUpdateServerState(); 152 } 153 154 /** 155 * Run a search operation to find the base entry 156 * of the replication domain for which this ServerState was created. 157 * 158 * @return Thebasen Entry or null if no entry was found; 159 */ 160 private SearchResultEntry searchBaseEntry() 161 { 162 LDAPFilter filter; 163 164 try 165 { 166 filter = LDAPFilter.decode("objectclass=*"); 167 } catch (LDAPException e) 168 { 169 // can not happen 170 return null; 171 } 172 173 /* 174 * Search the database entry that is used to periodically 175 * save the ServerState 176 */ 177 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); 178 attributes.add(REPLICATION_STATE); 179 InternalSearchOperation search = conn.processSearch(asn1BaseDn, 180 SearchScope.BASE_OBJECT, 181 DereferencePolicy.DEREF_ALWAYS, 0, 0, false, 182 filter,attributes); 183 if (((search.getResultCode() != ResultCode.SUCCESS)) && 184 ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) 185 { 186 Message message = ERR_ERROR_SEARCHING_RUV. 187 get(search.getResultCode().getResultCodeName(), search.toString(), 188 search.getErrorMessage(), baseDn.toString()); 189 logError(message); 190 return null; 191 } 192 193 SearchResultEntry stateEntry = null; 194 if (search.getResultCode() == ResultCode.SUCCESS) 195 { 196 /* 197 * Read the serverState from the REPLICATION_STATE attribute 198 */ 199 LinkedList<SearchResultEntry> result = search.getSearchEntries(); 200 if (!result.isEmpty()) 201 { 202 stateEntry = result.getFirst(); 203 } 204 } 205 return stateEntry; 206 } 207 208 /** 209 * Run a search operation to find the entry with the configuration 210 * of the replication domain for which this ServerState was created. 211 * 212 * @return The configuration Entry or null if no entry was found; 213 */ 214 private SearchResultEntry searchConfigEntry() 215 { 216 try 217 { 218 SearchFilter filter = 219 SearchFilter.createFilterFromString( 220 "(&(objectclass=ds-cfg-replication-domain)" 221 +"(ds-cfg-base-dn="+baseDn+"))"); 222 223 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); 224 attributes.add(REPLICATION_STATE); 225 InternalSearchOperation op = 226 conn.processSearch(DN.decode("cn=config"), 227 SearchScope.SUBORDINATE_SUBTREE, 228 DereferencePolicy.NEVER_DEREF_ALIASES, 229 1, 0, false, filter, attributes); 230 231 if (op.getResultCode() == ResultCode.SUCCESS) 232 { 233 /* 234 * Read the serverState from the REPLICATION_STATE attribute 235 */ 236 LinkedList<SearchResultEntry> resultEntries = 237 op.getSearchEntries(); 238 if (!resultEntries.isEmpty()) 239 { 240 SearchResultEntry resultEntry = resultEntries.getFirst(); 241 return resultEntry; 242 } 243 } 244 return null; 245 } catch (DirectoryException e) 246 { 247 // can not happen 248 return null; 249 } 250 } 251 252 /** 253 * Update this ServerState from the provided entry. 254 * 255 * @param resultEntry The entry that should be used to update this 256 * ServerState. 257 */ 258 private void updateStateFromEntry(SearchResultEntry resultEntry) 259 { 260 AttributeType synchronizationStateType = 261 DirectoryServer.getAttributeType(REPLICATION_STATE); 262 List<Attribute> attrs = 263 resultEntry.getAttribute(synchronizationStateType); 264 if (attrs != null) 265 { 266 Attribute attr = attrs.get(0); 267 LinkedHashSet<AttributeValue> values = attr.getValues(); 268 for (AttributeValue value : values) 269 { 270 ChangeNumber changeNumber = 271 new ChangeNumber(value.getStringValue()); 272 update(changeNumber); 273 } 274 } 275 } 276 277 /** 278 * Save the current values of this PersistentState object 279 * in the appropiate entry of the database. 280 * 281 * @return a ResultCode indicating if the method was successfull. 282 */ 283 private ResultCode updateStateEntry() 284 { 285 /* 286 * Generate a modify operation on the Server State baseD Entry. 287 */ 288 ResultCode result = runUpdateStateEntry(baseDn); 289 290 if (result == ResultCode.NO_SUCH_OBJECT) 291 { 292 // The base entry does not exist yet in the database or 293 // has been deleted, save the state to the config entry instead. 294 SearchResultEntry configEntry = searchConfigEntry(); 295 if (configEntry != null) 296 { 297 DN configDN = configEntry.getDN(); 298 result = runUpdateStateEntry(configDN); 299 } 300 } 301 return result; 302 } 303 304 /** 305 * Run a modify operation to update the entry whose DN is given as 306 * a parameter with the serverState information. 307 * 308 * @param serverStateEntryDN The DN of the entry to be updated. 309 * 310 * @return A ResultCode indicating if the operation was successful. 311 */ 312 private ResultCode runUpdateStateEntry(DN serverStateEntryDN) 313 { 314 ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); 315 316 LDAPAttribute attr = 317 new LDAPAttribute(REPLICATION_STATE, values); 318 LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); 319 ArrayList<RawModification> mods = new ArrayList<RawModification>(1); 320 mods.add(mod); 321 322 ModifyOperationBasis op = 323 new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(), 324 InternalClientConnection.nextMessageID(), 325 new ArrayList<Control>(0), 326 new ASN1OctetString(serverStateEntryDN.toString()), 327 mods); 328 op.setInternalOperation(true); 329 op.setSynchronizationOperation(true); 330 op.setDontSynchronize(true); 331 op.run(); 332 if (op.getResultCode() != ResultCode.SUCCESS) 333 { 334 Message message = DEBUG_ERROR_UPDATING_RUV.get( 335 op.getResultCode().getResultCodeName().toString(), 336 op.toString(), 337 op.getErrorMessage().toString(), 338 baseDn.toString()); 339 logError(message); 340 } 341 return op.getResultCode(); 342 } 343 344 /** 345 * Empty the ServerState. 346 * After this call the Server State will be in the same state 347 * as if it was just created. 348 */ 349 public void clearInMemory() 350 { 351 super.clear(); 352 this.savedStatus = false; 353 } 354 355 /** 356 * Empty the ServerState. 357 * After this call the Server State will be in the same state 358 * as if it was just created. 359 */ 360 public void clear() 361 { 362 clearInMemory(); 363 save(); 364 } 365 366 /** 367 * The ServerState is saved to the database periodically, 368 * therefore in case of crash it is possible that is does not contain 369 * the latest changes that have been processed and saved to the 370 * database. 371 * In order to make sure that we don't loose them, search all the entries 372 * that have been updated after this entry. 373 * This is done by using the HistoricalCsnOrderingMatchingRule 374 * and an ordering index for historical attribute 375 */ 376 public final void checkAndUpdateServerState() { 377 Message message; 378 InternalSearchOperation op; 379 ChangeNumber serverStateMaxCn; 380 ChangeNumber dbMaxCn; 381 final AttributeType histType = 382 DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME); 383 384 // Retrieves the entries that have changed since the 385 // maxCn stored in the serverState 386 synchronized (this) 387 { 388 serverStateMaxCn = this.getMaxChangeNumber(serverId); 389 390 if (serverStateMaxCn == null) 391 return; 392 393 try { 394 op = ReplicationBroker.searchForChangedEntries(baseDn, 395 serverStateMaxCn, null); 396 } 397 catch (Exception e) 398 { 399 return; 400 } 401 if (op.getResultCode() != ResultCode.SUCCESS) 402 { 403 // An error happened trying to search for the updates 404 // Log an error 405 message = ERR_CANNOT_RECOVER_CHANGES.get( 406 baseDn.toNormalizedString()); 407 logError(message); 408 } 409 else 410 { 411 dbMaxCn = serverStateMaxCn; 412 for (SearchResultEntry resEntry : op.getSearchEntries()) 413 { 414 List<Attribute> attrs = resEntry.getAttribute(histType); 415 Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator(); 416 try 417 { 418 while (true) 419 { 420 AttributeValue attrVal = iav.next(); 421 HistVal histVal = new HistVal(attrVal.getStringValue()); 422 ChangeNumber cn = histVal.getCn(); 423 424 if ((cn != null) && (cn.getServerId() == serverId)) 425 { 426 // compare the csn regarding the maxCn we know and 427 // store the biggest 428 if (ChangeNumber.compare(dbMaxCn, cn) < 0) 429 { 430 dbMaxCn = cn; 431 } 432 } 433 } 434 } 435 catch(Exception e) 436 { 437 } 438 } 439 440 if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0) 441 { 442 // Update the serverState with the new maxCn 443 // present in the database 444 this.update(dbMaxCn); 445 message = NOTE_SERVER_STATE_RECOVERY.get( 446 baseDn.toNormalizedString(), dbMaxCn.toString()); 447 logError(message); 448 } 449 } 450 } 451 } 452 }