001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import org.opends.messages.*; 029 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 032 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 033 import org.opends.server.loggers.debug.DebugTracer; 034 import static org.opends.messages.ReplicationMessages.*; 035 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 036 037 import java.io.File; 038 import java.io.UnsupportedEncodingException; 039 040 import org.opends.server.types.DN; 041 import org.opends.server.types.DirectoryException; 042 043 import com.sleepycat.je.Cursor; 044 import com.sleepycat.je.Database; 045 import com.sleepycat.je.DatabaseConfig; 046 import com.sleepycat.je.DatabaseEntry; 047 import com.sleepycat.je.DatabaseException; 048 import com.sleepycat.je.Environment; 049 import com.sleepycat.je.EnvironmentConfig; 050 import com.sleepycat.je.LockMode; 051 import com.sleepycat.je.OperationStatus; 052 import com.sleepycat.je.Transaction; 053 054 /** 055 * This class is used to represent a Db environement that can be used 056 * to create ReplicationDB. 057 */ 058 public class ReplicationDbEnv 059 { 060 private Environment dbEnvironment = null; 061 private Database stateDb = null; 062 private ReplicationServer replicationServer = null; 063 private static final String GENERATION_ID_TAG = "GENID"; 064 private static final String FIELD_SEPARATOR = " "; 065 /** 066 * The tracer object for the debug logger. 067 */ 068 private static final DebugTracer TRACER = getTracer(); 069 070 /** 071 * Initialize this class. 072 * Creates Db environment that will be used to create databases. 073 * It also reads the currently known databases from the "changelogstate" 074 * database. 075 * @param path Path where the backing files must be created. 076 * @param replicationServer the ReplicationServer that creates this 077 * ReplicationDbEnv. 078 * @throws DatabaseException If a DatabaseException occurred that prevented 079 * the initialization to happen. 080 * @throws ReplicationDBException If a replicationServer internal error caused 081 * a failure of the replicationServer processing. 082 */ 083 public ReplicationDbEnv(String path, ReplicationServer replicationServer) 084 throws DatabaseException, ReplicationDBException 085 { 086 this.replicationServer = replicationServer; 087 EnvironmentConfig envConfig = new EnvironmentConfig(); 088 089 /* Create the DB Environment that will be used for all 090 * the ReplicationServer activities related to the db 091 */ 092 envConfig.setAllowCreate(true); 093 envConfig.setTransactional(true); 094 envConfig.setConfigParam("je.cleaner.expunge", "true"); 095 // TODO : the DB cache size should be configurable 096 // For now set 5M is OK for being efficient in 64M total for the JVM 097 envConfig.setConfigParam("je.maxMemory", "5000000"); 098 dbEnvironment = new Environment(new File(path), envConfig); 099 100 /* 101 * One database is created to store the update from each LDAP 102 * server in the topology. 103 * The database "changelogstate" is used to store the list of all 104 * the servers that have been seen in the past. 105 */ 106 DatabaseConfig dbConfig = new DatabaseConfig(); 107 dbConfig.setAllowCreate(true); 108 dbConfig.setTransactional(true); 109 110 stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); 111 start(); 112 113 } 114 115 /** 116 * Read the list of known servers from the database and start dbHandler 117 * for each of them. 118 * 119 * @throws DatabaseException in case of underlying DatabaseException 120 * @throws ReplicationDBException when the information from the database 121 * cannot be decoded correctly. 122 */ 123 private void start() throws DatabaseException, ReplicationDBException 124 { 125 Cursor cursor = stateDb.openCursor(null, null); 126 DatabaseEntry key = new DatabaseEntry(); 127 DatabaseEntry data = new DatabaseEntry(); 128 129 try 130 { 131 /* 132 * Get the domain base DN/ generationIDs records 133 */ 134 OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); 135 while (status == OperationStatus.SUCCESS) 136 { 137 try 138 { 139 String stringData = new String(data.getData(), "UTF-8"); 140 141 if (debugEnabled()) 142 TRACER.debugInfo( 143 "In " + this.replicationServer.getMonitorInstanceName() + 144 " Read tag baseDn generationId=" + stringData); 145 146 String[] str = stringData.split(FIELD_SEPARATOR, 3); 147 if (str[0].equals(GENERATION_ID_TAG)) 148 { 149 long generationId=-1; 150 151 DN baseDn; 152 153 try 154 { 155 // <generationId> 156 generationId = new Long(str[1]); 157 } 158 catch (NumberFormatException e) 159 { 160 // should never happen 161 // TODO: i18n 162 throw new ReplicationDBException(Message.raw( 163 "replicationServer state database has a wrong format: " + 164 e.getLocalizedMessage() 165 + "<" + str[1] + ">")); 166 } 167 168 // <baseDn> 169 baseDn = null; 170 try 171 { 172 baseDn = DN.decode(str[2]); 173 } catch (DirectoryException e) 174 { 175 Message message = 176 ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); 177 logError(message); 178 179 } 180 181 if (debugEnabled()) 182 TRACER.debugInfo( 183 "In " + this.replicationServer.getMonitorInstanceName() + 184 " Has read baseDn=" + baseDn 185 + " generationId=" + generationId); 186 187 replicationServer.getReplicationServerDomain(baseDn, true). 188 setGenerationId(generationId, true); 189 } 190 } 191 catch (UnsupportedEncodingException e) 192 { 193 // should never happens 194 // TODO: i18n 195 throw new ReplicationDBException(Message.raw("need UTF-8 support")); 196 } 197 status = cursor.getNext(key, data, LockMode.DEFAULT); 198 } 199 200 /* 201 * Get the server Id / domain base DN records 202 */ 203 status = cursor.getFirst(key, data, LockMode.DEFAULT); 204 while (status == OperationStatus.SUCCESS) 205 { 206 String stringData = null; 207 try 208 { 209 stringData = new String(data.getData(), "UTF-8"); 210 } 211 catch (UnsupportedEncodingException e) 212 { 213 // should never happens 214 // TODO: i18n 215 throw new ReplicationDBException(Message.raw( 216 "need UTF-8 support")); 217 } 218 219 if (debugEnabled()) 220 TRACER.debugInfo( 221 "In " + this.replicationServer.getMonitorInstanceName() + 222 " Read serverId BaseDN=" + stringData); 223 224 String[] str = stringData.split(FIELD_SEPARATOR, 2); 225 if (!str[0].equals(GENERATION_ID_TAG)) 226 { 227 short serverId = -1; 228 try 229 { 230 // <serverId> 231 serverId = new Short(str[0]); 232 } catch (NumberFormatException e) 233 { 234 // should never happen 235 // TODO: i18n 236 throw new ReplicationDBException(Message.raw( 237 "replicationServer state database has a wrong format: " + 238 e.getLocalizedMessage() 239 + "<" + str[0] + ">")); 240 } 241 // <baseDn> 242 DN baseDn = null; 243 try 244 { 245 baseDn = DN.decode(str[1]); 246 } catch (DirectoryException e) 247 { 248 Message message = 249 ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); 250 logError(message); 251 } 252 253 if (debugEnabled()) 254 TRACER.debugInfo( 255 "In " + this.replicationServer.getMonitorInstanceName() + 256 " Has read: baseDn=" + baseDn 257 + " serverId=" + serverId); 258 259 DbHandler dbHandler = 260 new DbHandler(serverId, baseDn, replicationServer, this); 261 262 replicationServer.getReplicationServerDomain(baseDn, true). 263 setDbHandler(serverId, dbHandler); 264 } 265 266 status = cursor.getNext(key, data, LockMode.DEFAULT); 267 } 268 cursor.close(); 269 270 } 271 catch (DatabaseException dbe) 272 { 273 cursor.close(); 274 throw dbe; 275 } 276 } 277 278 /** 279 * Finds or creates the database used to store changes from the server 280 * with the given serverId and the given baseDn. 281 * 282 * @param serverId The server id that identifies the server. 283 * @param baseDn The baseDn that identifies the domain. 284 * @param generationId The generationId associated to this domain. 285 * @return the Database. 286 * @throws DatabaseException in case of underlying Exception. 287 */ 288 public Database getOrAddDb(Short serverId, DN baseDn, Long generationId) 289 throws DatabaseException 290 { 291 if (debugEnabled()) 292 TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " + 293 serverId + " " + baseDn + " " + generationId); 294 try 295 { 296 String stringId = serverId.toString() + FIELD_SEPARATOR 297 + baseDn.toNormalizedString(); 298 299 // Opens the database for the changes received from this server 300 // on this domain. Create it if it does not already exist. 301 DatabaseConfig dbConfig = new DatabaseConfig(); 302 dbConfig.setAllowCreate(true); 303 dbConfig.setTransactional(true); 304 Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); 305 306 // Creates the record serverId/domain base Dn in the stateDb 307 // if it does not already exist. 308 byte[] byteId; 309 byteId = stringId.getBytes("UTF-8"); 310 DatabaseEntry key = new DatabaseEntry(); 311 key.setData(byteId); 312 DatabaseEntry data = new DatabaseEntry(); 313 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); 314 if (status == OperationStatus.NOTFOUND) 315 { 316 Transaction txn = dbEnvironment.beginTransaction(null, null); 317 try { 318 data.setData(byteId); 319 if (debugEnabled()) 320 TRACER.debugInfo("getOrAddDb() Created in the state Db record " + 321 " serverId/Domain=<"+stringId+">"); 322 stateDb.put(txn, key, data); 323 txn.commitWriteNoSync(); 324 } catch (DatabaseException dbe) 325 { 326 // Abort the txn and propagate the Exception to the caller 327 txn.abort(); 328 throw dbe; 329 } 330 } 331 332 // Creates the record domain base Dn/ generationId in the stateDb 333 // if it does not already exist. 334 stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + 335 baseDn.toNormalizedString(); 336 String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR + 337 generationId.toString() + FIELD_SEPARATOR + 338 baseDn.toNormalizedString(); 339 byteId = stringId.getBytes("UTF-8"); 340 byte[] dataByteId; 341 dataByteId = dataStringId.getBytes("UTF-8"); 342 key = new DatabaseEntry(); 343 key.setData(byteId); 344 data = new DatabaseEntry(); 345 status = stateDb.get(null, key, data, LockMode.DEFAULT); 346 if (status == OperationStatus.NOTFOUND) 347 { 348 Transaction txn = dbEnvironment.beginTransaction(null, null); 349 try { 350 data.setData(dataByteId); 351 if (debugEnabled()) 352 TRACER.debugInfo( 353 "Created in the state Db record Tag/Domain/GenId key=" + 354 stringId + " value=" + dataStringId); 355 stateDb.put(txn, key, data); 356 txn.commitWriteNoSync(); 357 } catch (DatabaseException dbe) 358 { 359 // Abort the txn and propagate the Exception to the caller 360 txn.abort(); 361 throw dbe; 362 } 363 } 364 return db; 365 } 366 catch (UnsupportedEncodingException e) 367 { 368 // can't happen 369 return null; 370 } 371 } 372 373 /** 374 * Creates a new transaction. 375 * 376 * @return the transaction. 377 * @throws DatabaseException in case of underlying database Exception. 378 */ 379 public Transaction beginTransaction() throws DatabaseException 380 { 381 return dbEnvironment.beginTransaction(null, null); 382 } 383 384 /** 385 * Shutdown the Db environment. 386 */ 387 public void shutdown() 388 { 389 try 390 { 391 stateDb.close(); 392 dbEnvironment.close(); 393 } catch (DatabaseException e) 394 { 395 MessageBuilder mb = new MessageBuilder(); 396 mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); 397 mb.append(stackTraceToSingleLineString(e)); 398 logError(mb.toMessage()); 399 } 400 } 401 402 /** 403 * Clears the provided generationId associated to the provided baseDn 404 * from the state Db. 405 * 406 * @param baseDn The baseDn for which the generationID must be cleared. 407 * 408 */ 409 public void clearGenerationId(DN baseDn) 410 { 411 if (debugEnabled()) 412 TRACER.debugInfo( 413 "In " + this.replicationServer.getMonitorInstanceName() + 414 " clearGenerationId " + baseDn); 415 try 416 { 417 // Deletes the record domain base Dn/ generationId in the stateDb 418 String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + 419 baseDn.toNormalizedString(); 420 byte[] byteId = stringId.getBytes("UTF-8"); 421 DatabaseEntry key = new DatabaseEntry(); 422 key.setData(byteId); 423 DatabaseEntry data = new DatabaseEntry(); 424 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); 425 if ((status == OperationStatus.SUCCESS) || 426 (status == OperationStatus.KEYEXIST)) 427 { 428 Transaction txn = dbEnvironment.beginTransaction(null, null); 429 try 430 { 431 stateDb.delete(txn, key); 432 txn.commitWriteNoSync(); 433 if (debugEnabled()) 434 TRACER.debugInfo( 435 "In " + this.replicationServer.getMonitorInstanceName() + 436 " clearGenerationId (" + 437 baseDn +") succeeded."); 438 } 439 catch (DatabaseException dbe) 440 { 441 // Abort the txn and propagate the Exception to the caller 442 txn.abort(); 443 throw dbe; 444 } 445 } 446 else 447 { 448 // TODO : should have a better error logging 449 if (debugEnabled()) 450 TRACER.debugInfo( 451 "In " + this.replicationServer.getMonitorInstanceName() + 452 " clearGenerationId ("+ baseDn + " failed" + status.toString()); 453 } 454 } 455 catch (UnsupportedEncodingException e) 456 { 457 // can't happen 458 } 459 catch (DatabaseException dbe) 460 { 461 // can't happen 462 } 463 } 464 465 /** 466 * Clears the provided serverId associated to the provided baseDn 467 * from the state Db. 468 * 469 * @param baseDn The baseDn for which the generationID must be cleared. 470 * @param serverId The serverId to remove from the Db. 471 * 472 */ 473 public void clearServerId(DN baseDn, Short serverId) 474 { 475 if (debugEnabled()) 476 TRACER.debugInfo( 477 "In " + this.replicationServer.getMonitorInstanceName() + 478 "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId); 479 try 480 { 481 String stringId = serverId.toString() + FIELD_SEPARATOR 482 + baseDn.toNormalizedString(); 483 484 // Deletes the record serverId/domain base Dn in the stateDb 485 byte[] byteId; 486 byteId = stringId.getBytes("UTF-8"); 487 DatabaseEntry key = new DatabaseEntry(); 488 key.setData(byteId); 489 DatabaseEntry data = new DatabaseEntry(); 490 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); 491 if (status != OperationStatus.NOTFOUND) 492 { 493 Transaction txn = dbEnvironment.beginTransaction(null, null); 494 try { 495 data.setData(byteId); 496 stateDb.delete(txn, key); 497 txn.commitWriteNoSync(); 498 if (debugEnabled()) 499 TRACER.debugInfo( 500 " In " + this.replicationServer.getMonitorInstanceName() + 501 " clearServerId() succeeded " + baseDn + " " + 502 serverId); 503 } 504 catch (DatabaseException dbe) 505 { 506 // Abort the txn and propagate the Exception to the caller 507 txn.abort(); 508 throw dbe; 509 } 510 } 511 } 512 catch (UnsupportedEncodingException e) 513 { 514 // can't happen 515 } 516 catch (DatabaseException dbe) 517 { 518 // can't happen 519 } 520 } 521 522 /** 523 * Clears the database. 524 * 525 * @param databaseName The name of the database to clear. 526 */ 527 public final void clearDb(String databaseName) 528 { 529 Transaction txn = null; 530 try 531 { 532 txn = dbEnvironment.beginTransaction(null, null); 533 dbEnvironment.truncateDatabase(txn, databaseName, false); 534 txn.commitWriteNoSync(); 535 txn = null; 536 } 537 catch (DatabaseException e) 538 { 539 MessageBuilder mb = new MessageBuilder(); 540 mb.append(ERR_ERROR_CLEARING_DB.get(databaseName, 541 e.getMessage() + " " + 542 stackTraceToSingleLineString(e))); 543 logError(mb.toMessage()); 544 } 545 finally 546 { 547 try 548 { 549 if (txn != null) 550 txn.abort(); 551 } 552 catch(Exception e) 553 {} 554 } 555 } 556 }