001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.*;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033    import org.opends.server.loggers.debug.DebugTracer;
034    import static org.opends.messages.ReplicationMessages.*;
035    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
036    
037    import java.io.File;
038    import java.io.UnsupportedEncodingException;
039    
040    import org.opends.server.types.DN;
041    import org.opends.server.types.DirectoryException;
042    
043    import com.sleepycat.je.Cursor;
044    import com.sleepycat.je.Database;
045    import com.sleepycat.je.DatabaseConfig;
046    import com.sleepycat.je.DatabaseEntry;
047    import com.sleepycat.je.DatabaseException;
048    import com.sleepycat.je.Environment;
049    import com.sleepycat.je.EnvironmentConfig;
050    import com.sleepycat.je.LockMode;
051    import com.sleepycat.je.OperationStatus;
052    import com.sleepycat.je.Transaction;
053    
054    /**
055     * This class is used to represent a Db environement that can be used
056     * to create ReplicationDB.
057     */
058    public class ReplicationDbEnv
059    {
060      private Environment dbEnvironment = null;
061      private Database stateDb = null;
062      private ReplicationServer replicationServer = null;
063      private static final String GENERATION_ID_TAG = "GENID";
064      private static final String FIELD_SEPARATOR = " ";
065      /**
066       * The tracer object for the debug logger.
067       */
068      private static final DebugTracer TRACER = getTracer();
069    
070      /**
071       * Initialize this class.
072       * Creates Db environment that will be used to create databases.
073       * It also reads the currently known databases from the "changelogstate"
074       * database.
075       * @param path Path where the backing files must be created.
076       * @param replicationServer the ReplicationServer that creates this
077       *                          ReplicationDbEnv.
078       * @throws DatabaseException If a DatabaseException occurred that prevented
079       *                           the initialization to happen.
080       * @throws ReplicationDBException If a replicationServer internal error caused
081       *                              a failure of the replicationServer processing.
082       */
083      public ReplicationDbEnv(String path, ReplicationServer replicationServer)
084             throws DatabaseException, ReplicationDBException
085      {
086        this.replicationServer = replicationServer;
087        EnvironmentConfig envConfig = new EnvironmentConfig();
088    
089        /* Create the DB Environment that will be used for all
090         * the ReplicationServer activities related to the db
091         */
092        envConfig.setAllowCreate(true);
093        envConfig.setTransactional(true);
094        envConfig.setConfigParam("je.cleaner.expunge", "true");
095        // TODO : the DB cache size should be configurable
096        // For now set 5M is OK for being efficient in 64M total for the JVM
097        envConfig.setConfigParam("je.maxMemory", "5000000");
098        dbEnvironment = new Environment(new File(path), envConfig);
099    
100        /*
101         * One database is created to store the update from each LDAP
102         * server in the topology.
103         * The database "changelogstate" is used to store the list of all
104         * the servers that have been seen in the past.
105         */
106        DatabaseConfig dbConfig = new DatabaseConfig();
107        dbConfig.setAllowCreate(true);
108        dbConfig.setTransactional(true);
109    
110        stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
111        start();
112    
113      }
114    
115      /**
116       * Read the list of known servers from the database and start dbHandler
117       * for each of them.
118       *
119       * @throws DatabaseException in case of underlying DatabaseException
120       * @throws ReplicationDBException when the information from the database
121       *                              cannot be decoded correctly.
122       */
123      private void start() throws DatabaseException, ReplicationDBException
124      {
125        Cursor cursor = stateDb.openCursor(null, null);
126        DatabaseEntry key = new DatabaseEntry();
127        DatabaseEntry data = new DatabaseEntry();
128    
129        try
130        {
131          /*
132           *  Get the domain base DN/ generationIDs records
133           */
134          OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
135          while (status == OperationStatus.SUCCESS)
136          {
137            try
138            {
139              String stringData = new String(data.getData(), "UTF-8");
140    
141              if (debugEnabled())
142                TRACER.debugInfo(
143                    "In " + this.replicationServer.getMonitorInstanceName() +
144                    " Read tag baseDn generationId=" + stringData);
145    
146              String[] str = stringData.split(FIELD_SEPARATOR, 3);
147              if (str[0].equals(GENERATION_ID_TAG))
148              {
149                long generationId=-1;
150    
151                DN baseDn;
152    
153                try
154                {
155                  // <generationId>
156                  generationId = new Long(str[1]);
157                }
158                catch (NumberFormatException e)
159                {
160                  // should never happen
161                  // TODO: i18n
162                  throw new ReplicationDBException(Message.raw(
163                      "replicationServer state database has a wrong format: " +
164                      e.getLocalizedMessage()
165                      + "<" + str[1] + ">"));
166                }
167    
168                // <baseDn>
169                baseDn = null;
170                try
171                {
172                  baseDn = DN.decode(str[2]);
173                } catch (DirectoryException e)
174                {
175                  Message message =
176                    ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
177                  logError(message);
178    
179                }
180    
181                if (debugEnabled())
182                  TRACER.debugInfo(
183                    "In " + this.replicationServer.getMonitorInstanceName() +
184                    " Has read baseDn=" + baseDn
185                    + " generationId=" + generationId);
186    
187                replicationServer.getReplicationServerDomain(baseDn, true).
188                setGenerationId(generationId, true);
189              }
190            }
191            catch (UnsupportedEncodingException e)
192            {
193              // should never happens
194              // TODO: i18n
195              throw new ReplicationDBException(Message.raw("need UTF-8 support"));
196            }
197            status = cursor.getNext(key, data, LockMode.DEFAULT);
198          }
199    
200          /*
201           * Get the server Id / domain base DN records
202           */
203          status = cursor.getFirst(key, data, LockMode.DEFAULT);
204          while (status == OperationStatus.SUCCESS)
205          {
206            String stringData = null;
207            try
208            {
209              stringData = new String(data.getData(), "UTF-8");
210            }
211            catch (UnsupportedEncodingException e)
212            {
213              // should never happens
214              // TODO: i18n
215              throw new ReplicationDBException(Message.raw(
216              "need UTF-8 support"));
217            }
218    
219            if (debugEnabled())
220              TRACER.debugInfo(
221                "In " + this.replicationServer.getMonitorInstanceName() +
222                " Read serverId BaseDN=" + stringData);
223    
224            String[] str = stringData.split(FIELD_SEPARATOR, 2);
225            if (!str[0].equals(GENERATION_ID_TAG))
226            {
227              short serverId = -1;
228              try
229              {
230                // <serverId>
231                serverId = new Short(str[0]);
232              } catch (NumberFormatException e)
233              {
234                // should never happen
235                // TODO: i18n
236                throw new ReplicationDBException(Message.raw(
237                    "replicationServer state database has a wrong format: " +
238                    e.getLocalizedMessage()
239                    + "<" + str[0] + ">"));
240              }
241              // <baseDn>
242              DN baseDn = null;
243              try
244              {
245                baseDn = DN.decode(str[1]);
246              } catch (DirectoryException e)
247              {
248                Message message =
249                  ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
250                logError(message);
251              }
252    
253              if (debugEnabled())
254                TRACER.debugInfo(
255                  "In " + this.replicationServer.getMonitorInstanceName() +
256                  " Has read: baseDn=" + baseDn
257                  + " serverId=" + serverId);
258    
259              DbHandler dbHandler =
260                new DbHandler(serverId, baseDn, replicationServer, this);
261    
262              replicationServer.getReplicationServerDomain(baseDn, true).
263              setDbHandler(serverId, dbHandler);
264            }
265    
266            status = cursor.getNext(key, data, LockMode.DEFAULT);
267          }
268          cursor.close();
269    
270        }
271        catch (DatabaseException dbe)
272        {
273          cursor.close();
274          throw dbe;
275        }
276      }
277    
278        /**
279         * Finds or creates the database used to store changes from the server
280         * with the given serverId and the given baseDn.
281         *
282         * @param serverId     The server id that identifies the server.
283         * @param baseDn       The baseDn that identifies the domain.
284         * @param generationId The generationId associated to this domain.
285         * @return the Database.
286         * @throws DatabaseException in case of underlying Exception.
287         */
288        public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
289        throws DatabaseException
290        {
291          if (debugEnabled())
292            TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
293              serverId + " " + baseDn + " " + generationId);
294          try
295          {
296            String stringId = serverId.toString() + FIELD_SEPARATOR
297                              + baseDn.toNormalizedString();
298    
299            // Opens the database for the changes received from this server
300            // on this domain. Create it if it does not already exist.
301            DatabaseConfig dbConfig = new DatabaseConfig();
302            dbConfig.setAllowCreate(true);
303            dbConfig.setTransactional(true);
304            Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
305    
306            // Creates the record serverId/domain base Dn in the stateDb
307            // if it does not already exist.
308            byte[] byteId;
309            byteId = stringId.getBytes("UTF-8");
310            DatabaseEntry key = new DatabaseEntry();
311            key.setData(byteId);
312            DatabaseEntry data = new DatabaseEntry();
313            OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
314            if (status == OperationStatus.NOTFOUND)
315            {
316              Transaction txn = dbEnvironment.beginTransaction(null, null);
317              try {
318                data.setData(byteId);
319                if (debugEnabled())
320                  TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
321                    " serverId/Domain=<"+stringId+">");
322                stateDb.put(txn, key, data);
323                txn.commitWriteNoSync();
324              } catch (DatabaseException dbe)
325              {
326                // Abort the txn and propagate the Exception to the caller
327                txn.abort();
328                throw dbe;
329              }
330            }
331    
332            // Creates the record domain base Dn/ generationId in the stateDb
333            // if it does not already exist.
334            stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
335              baseDn.toNormalizedString();
336            String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
337            generationId.toString() + FIELD_SEPARATOR +
338              baseDn.toNormalizedString();
339            byteId = stringId.getBytes("UTF-8");
340            byte[] dataByteId;
341            dataByteId = dataStringId.getBytes("UTF-8");
342            key = new DatabaseEntry();
343            key.setData(byteId);
344            data = new DatabaseEntry();
345            status = stateDb.get(null, key, data, LockMode.DEFAULT);
346            if (status == OperationStatus.NOTFOUND)
347            {
348              Transaction txn = dbEnvironment.beginTransaction(null, null);
349              try {
350                data.setData(dataByteId);
351                if (debugEnabled())
352                  TRACER.debugInfo(
353                      "Created in the state Db record Tag/Domain/GenId key=" +
354                      stringId + " value=" + dataStringId);
355                stateDb.put(txn, key, data);
356                txn.commitWriteNoSync();
357              } catch (DatabaseException dbe)
358              {
359                // Abort the txn and propagate the Exception to the caller
360                txn.abort();
361                throw dbe;
362              }
363            }
364            return db;
365          }
366          catch (UnsupportedEncodingException e)
367          {
368            // can't happen
369            return null;
370          }
371        }
372    
373        /**
374         * Creates a new transaction.
375         *
376         * @return the transaction.
377         * @throws DatabaseException in case of underlying database Exception.
378         */
379        public Transaction beginTransaction() throws DatabaseException
380        {
381          return dbEnvironment.beginTransaction(null, null);
382        }
383    
384        /**
385         * Shutdown the Db environment.
386         */
387        public void shutdown()
388        {
389          try
390          {
391            stateDb.close();
392            dbEnvironment.close();
393          } catch (DatabaseException e)
394          {
395            MessageBuilder mb = new MessageBuilder();
396            mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
397            mb.append(stackTraceToSingleLineString(e));
398            logError(mb.toMessage());
399          }
400        }
401    
402        /**
403         * Clears the provided generationId associated to the provided baseDn
404         * from the state Db.
405         *
406         * @param baseDn The baseDn for which the generationID must be cleared.
407         *
408         */
409        public void clearGenerationId(DN baseDn)
410        {
411          if (debugEnabled())
412            TRACER.debugInfo(
413                "In " + this.replicationServer.getMonitorInstanceName() +
414              " clearGenerationId " + baseDn);
415          try
416          {
417            // Deletes the record domain base Dn/ generationId in the stateDb
418            String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
419              baseDn.toNormalizedString();
420            byte[] byteId = stringId.getBytes("UTF-8");
421            DatabaseEntry key = new DatabaseEntry();
422            key.setData(byteId);
423            DatabaseEntry data = new DatabaseEntry();
424            OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
425            if ((status == OperationStatus.SUCCESS) ||
426                (status == OperationStatus.KEYEXIST))
427            {
428              Transaction txn = dbEnvironment.beginTransaction(null, null);
429              try
430              {
431                stateDb.delete(txn, key);
432                txn.commitWriteNoSync();
433                if (debugEnabled())
434                  TRACER.debugInfo(
435                    "In " + this.replicationServer.getMonitorInstanceName() +
436                    " clearGenerationId (" +
437                    baseDn +") succeeded.");
438              }
439              catch (DatabaseException dbe)
440              {
441                // Abort the txn and propagate the Exception to the caller
442                txn.abort();
443                throw dbe;
444              }
445            }
446            else
447            {
448              // TODO : should have a better error logging
449              if (debugEnabled())
450                TRACER.debugInfo(
451                  "In " + this.replicationServer.getMonitorInstanceName() +
452                  " clearGenerationId ("+ baseDn + " failed" + status.toString());
453            }
454          }
455          catch (UnsupportedEncodingException e)
456          {
457            // can't happen
458          }
459          catch (DatabaseException dbe)
460          {
461            // can't happen
462          }
463        }
464    
465        /**
466         * Clears the provided serverId associated to the provided baseDn
467         * from the state Db.
468         *
469         * @param baseDn The baseDn for which the generationID must be cleared.
470         * @param serverId The serverId to remove from the Db.
471         *
472         */
473        public void clearServerId(DN baseDn, Short serverId)
474        {
475          if (debugEnabled())
476            TRACER.debugInfo(
477                "In " + this.replicationServer.getMonitorInstanceName() +
478                "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
479          try
480          {
481            String stringId = serverId.toString() + FIELD_SEPARATOR
482                             + baseDn.toNormalizedString();
483    
484            // Deletes the record serverId/domain base Dn in the stateDb
485            byte[] byteId;
486            byteId = stringId.getBytes("UTF-8");
487            DatabaseEntry key = new DatabaseEntry();
488            key.setData(byteId);
489            DatabaseEntry data = new DatabaseEntry();
490            OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
491            if (status != OperationStatus.NOTFOUND)
492            {
493              Transaction txn = dbEnvironment.beginTransaction(null, null);
494              try {
495                data.setData(byteId);
496                stateDb.delete(txn, key);
497                txn.commitWriteNoSync();
498                if (debugEnabled())
499                  TRACER.debugInfo(
500                      " In " + this.replicationServer.getMonitorInstanceName() +
501                      " clearServerId() succeeded " + baseDn + " " +
502                      serverId);
503              }
504              catch (DatabaseException dbe)
505              {
506                // Abort the txn and propagate the Exception to the caller
507                txn.abort();
508                throw dbe;
509              }
510            }
511          }
512          catch (UnsupportedEncodingException e)
513          {
514            // can't happen
515          }
516          catch (DatabaseException dbe)
517          {
518            // can't happen
519          }
520        }
521    
522        /**
523         * Clears the database.
524         *
525         * @param databaseName The name of the database to clear.
526         */
527        public final void clearDb(String databaseName)
528        {
529          Transaction txn = null;
530          try
531          {
532            txn = dbEnvironment.beginTransaction(null, null);
533            dbEnvironment.truncateDatabase(txn, databaseName, false);
534            txn.commitWriteNoSync();
535            txn = null;
536          }
537          catch (DatabaseException e)
538          {
539            MessageBuilder mb = new MessageBuilder();
540            mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
541                e.getMessage() + " " +
542                stackTraceToSingleLineString(e)));
543            logError(mb.toMessage());
544          }
545          finally
546          {
547            try
548            {
549              if (txn != null)
550                txn.abort();
551            }
552            catch(Exception e)
553            {}
554          }
555        }
556    }