001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.MessageBuilder;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.messages.ReplicationMessages.*;
032    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
033    
034    import java.util.List;
035    import java.io.UnsupportedEncodingException;
036    
037    import org.opends.server.types.DN;
038    import org.opends.server.replication.common.ChangeNumber;
039    import org.opends.server.replication.protocol.UpdateMessage;
040    import java.util.concurrent.locks.ReentrantReadWriteLock;
041    
042    import com.sleepycat.je.Cursor;
043    import com.sleepycat.je.DatabaseEntry;
044    import com.sleepycat.je.DatabaseException;
045    import com.sleepycat.je.Database;
046    import com.sleepycat.je.DeadlockException;
047    import com.sleepycat.je.LockMode;
048    import com.sleepycat.je.OperationStatus;
049    import com.sleepycat.je.Transaction;
050    
051    /**
052     * This class implements the interface between the underlying database
053     * and the dbHandler class.
054     * This is the only class that should have code using the BDB interfaces.
055     */
056    public class ReplicationDB
057    {
058      private Database db = null;
059      private ReplicationDbEnv dbenv = null;
060      private ReplicationServer replicationServer;
061      private Short serverId;
062      private DN baseDn;
063    
064      // The maximum number of retries in case of DatabaseDeadlock Exception.
065      private static final int DEADLOCK_RETRIES = 10;
066    
067      // The lock used to provide exclusive access to the thread that
068      // close the db (shutdown or clear).
069      private ReentrantReadWriteLock dbCloseLock;
070    
071     /**
072       * Creates a new database or open existing database that will be used
073       * to store and retrieve changes from an LDAP server.
074       * @param serverId The identifier of the LDAP server.
075       * @param baseDn The baseDn of the replication domain.
076       * @param replicationServer The ReplicationServer that needs to be shutdown.
077       * @param dbenv The Db environment to use to create the db.
078       * @throws DatabaseException If a database problem happened.
079       */
080      public ReplicationDB(Short serverId, DN baseDn,
081                         ReplicationServer replicationServer,
082                         ReplicationDbEnv dbenv)
083                         throws DatabaseException
084      {
085        this.serverId = serverId;
086        this.baseDn = baseDn;
087        this.dbenv = dbenv;
088        this.replicationServer = replicationServer;
089    
090        // Get or create the associated ReplicationServerDomain and Db.
091        db = dbenv.getOrAddDb(serverId, baseDn,
092            replicationServer.getReplicationServerDomain(baseDn,
093            true).getGenerationId());
094    
095        dbCloseLock = new ReentrantReadWriteLock(true);
096      }
097    
098      /**
099       * add a list of changes to the underlying db.
100       *
101       * @param changes The list of changes to add to the underlying db.
102       */
103      public void addEntries(List<UpdateMessage> changes)
104      {
105        Transaction txn = null;
106    
107        try
108        {
109          int tries = 0;
110          boolean done = false;
111    
112          // The database can return a Deadlock Exception if several threads are
113          // accessing the database at the same time. This Exception is a
114          // transient state, when it happens the transaction is aborted and
115          // the operation is attempted again up to DEADLOCK_RETRIES times.
116          while ((tries++ < DEADLOCK_RETRIES) && (!done))
117          {
118            dbCloseLock.readLock().lock();
119            try
120            {
121              txn = dbenv.beginTransaction();
122    
123              for (UpdateMessage change : changes)
124              {
125                DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
126                DatabaseEntry data = new ReplicationData(change);
127                db.put(txn, key, data);
128              }
129    
130              txn.commitWriteNoSync();
131              txn = null;
132              done = true;
133            }
134            catch (DeadlockException e)
135            {
136              txn.abort();
137              txn = null;
138            }
139            finally
140            {
141              dbCloseLock.readLock().unlock();
142            }
143          }
144          if (!done)
145          {
146            // Could not write to the DB after DEADLOCK_RETRIES tries.
147            // This ReplicationServer is not reliable and will be shutdown.
148            MessageBuilder mb = new MessageBuilder();
149            mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
150            logError(mb.toMessage());
151            if (txn != null)
152            {
153               txn.abort();
154            }
155            replicationServer.shutdown();
156          }
157        }
158        catch (DatabaseException e)
159        {
160          MessageBuilder mb = new MessageBuilder();
161          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
162          mb.append(stackTraceToSingleLineString(e));
163          logError(mb.toMessage());
164          if (txn != null)
165          {
166            try
167            {
168              txn.abort();
169            } catch (DatabaseException e1)
170            {
171              // can't do much more. The ReplicationServer is shuting down.
172            }
173          }
174          replicationServer.shutdown();
175        }
176        catch (UnsupportedEncodingException e)
177        {
178          MessageBuilder mb = new MessageBuilder();
179          mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
180          mb.append(stackTraceToSingleLineString(e));
181          logError(mb.toMessage());
182          replicationServer.shutdown();
183          if (txn != null)
184          {
185            try
186            {
187              txn.abort();
188            } catch (DatabaseException e1)
189            {
190              // can't do much more. The ReplicationServer is shuting down.
191            }
192          }
193          replicationServer.shutdown();
194        }
195      }
196    
197    
198      /**
199       * Shutdown the database.
200       */
201      public void shutdown()
202      {
203        try
204        {
205          dbCloseLock.writeLock().lock();
206          try
207          {
208            db.close();
209          }
210          finally
211          {
212            dbCloseLock.writeLock().unlock();
213          }
214        }
215        catch (DatabaseException e)
216        {
217          MessageBuilder mb = new MessageBuilder();
218          mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
219          mb.append(stackTraceToSingleLineString(e));
220          logError(mb.toMessage());
221        }
222      }
223    
224      /**
225       * Create a cursor that can be used to search or iterate on this
226       * ReplicationServer DB.
227       *
228       * @param changeNumber The ChangeNumber from which the cursor must start.
229       * @throws DatabaseException If a database error prevented the cursor
230       *                           creation.
231       * @throws Exception if the ReplServerDBCursor creation failed.
232       * @return The ReplServerDBCursor.
233       */
234      public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
235                    throws DatabaseException, Exception
236      {
237        return new ReplServerDBCursor(changeNumber);
238      }
239    
240      /**
241       * Create a cursor that can be used to delete some record from this
242       * ReplicationServer database.
243       *
244       * @throws DatabaseException If a database error prevented the cursor
245       *                           creation.
246       * @throws Exception if the ReplServerDBCursor creation failed.
247       *
248       * @return The ReplServerDBCursor.
249       */
250      public ReplServerDBCursor openDeleteCursor()
251                    throws DatabaseException, Exception
252      {
253        return new ReplServerDBCursor();
254      }
255    
256      private void closeLockedCursor(Cursor cursor)
257        throws DatabaseException
258      {
259        try
260        {
261          if (cursor != null)
262            cursor.close();
263        }
264        finally
265        {
266          dbCloseLock.readLock().unlock();
267        }
268      }
269    
270      /**
271       * Read the first Change from the database.
272       * @return the first ChangeNumber.
273       */
274      public ChangeNumber readFirstChange()
275      {
276        Cursor cursor = null;
277        String str = null;
278    
279        try
280        {
281          dbCloseLock.readLock().lock();
282          cursor = db.openCursor(null, null);
283        }
284        catch (DatabaseException e1)
285        {
286          dbCloseLock.readLock().unlock();
287          return null;
288        }
289        try
290        {
291          try
292          {
293            DatabaseEntry key = new DatabaseEntry();
294            DatabaseEntry data = new DatabaseEntry();
295            OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
296            if (status != OperationStatus.SUCCESS)
297            {
298              /* database is empty */
299              return null;
300            }
301            try
302            {
303              str = new String(key.getData(), "UTF-8");
304            } catch (UnsupportedEncodingException e)
305            {
306              // never happens
307            }
308            return new ChangeNumber(str);
309          }
310          finally
311          {
312            closeLockedCursor(cursor);
313          }
314        }
315        catch (DatabaseException e)
316        {
317          /* database is faulty */
318          MessageBuilder mb = new MessageBuilder();
319          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
320          mb.append(stackTraceToSingleLineString(e));
321          logError(mb.toMessage());
322          replicationServer.shutdown();
323          return null;
324        }
325      }
326    
327      /**
328       * Read the last Change from the database.
329       * @return the last ChangeNumber.
330       */
331      public ChangeNumber readLastChange()
332      {
333        Cursor cursor = null;
334        String str = null;
335    
336        try
337        {
338          dbCloseLock.readLock().lock();
339          try
340          {
341            cursor = db.openCursor(null, null);
342            DatabaseEntry key = new DatabaseEntry();
343            DatabaseEntry data = new DatabaseEntry();
344            OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
345            if (status != OperationStatus.SUCCESS)
346            {
347              /* database is empty */
348              return null;
349            }
350            try
351            {
352              str = new String(key.getData(), "UTF-8");
353            }
354            catch (UnsupportedEncodingException e)
355            {
356              // never happens
357            }
358            return new ChangeNumber(str);
359          }
360          finally
361          {
362            closeLockedCursor(cursor);
363          }
364        }
365        catch (DatabaseException e)
366        {
367          MessageBuilder mb = new MessageBuilder();
368          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
369          mb.append(stackTraceToSingleLineString(e));
370          logError(mb.toMessage());
371          replicationServer.shutdown();
372          return null;
373        }
374      }
375    
376      /**
377       * {@inheritDoc}
378       */
379      @Override
380      public String toString()
381      {
382        return serverId.toString() + baseDn.toString();
383      }
384    
385      /**
386       * This Class implements a cursor that can be used to browse a
387       * replicationServer database.
388       */
389      public class ReplServerDBCursor
390      {
391        private Cursor cursor = null;
392    
393        // The transaction that will protect the actions done with the cursor
394        // Will be let null for a read cursor
395        // Will be set non null for a write cursor
396        private Transaction txn = null;
397        DatabaseEntry key = new DatabaseEntry();
398        DatabaseEntry data = new DatabaseEntry();
399    
400        /**
401         * Creates a ReplServerDBCursor that can be used for browsing a
402         * replicationServer db.
403         *
404         * @param startingChangeNumber The ChangeNumber from which the cursor must
405         *        start.
406         * @throws Exception When the startingChangeNumber does not exist.
407         */
408        private ReplServerDBCursor(ChangeNumber startingChangeNumber)
409                throws Exception
410        {
411          try
412          {
413            // Take the lock. From now on, whatever error that happen in the life
414            // of this cursor should end by unlocking that lock. We must also
415            // unlock it when throwing an exception.
416            dbCloseLock.readLock().lock();
417    
418            cursor = db.openCursor(txn, null);
419            if (startingChangeNumber != null)
420            {
421              key = new ReplicationKey(startingChangeNumber);
422              data = new DatabaseEntry();
423    
424              if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
425                OperationStatus.SUCCESS)
426              {
427                // We could not move the cursor to the expected startingChangeNumber
428                if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
429                  OperationStatus.SUCCESS)
430                {
431                  // We could not even move the cursor closed to it => failure
432                  throw new Exception("ChangeNumber not available");
433                }
434                else
435                {
436                  // We can move close to the startingChangeNumber.
437                  // Let's create a cursor from that point.
438                  DatabaseEntry key = new DatabaseEntry();
439                  DatabaseEntry data = new DatabaseEntry();
440                  if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
441                    OperationStatus.SUCCESS)
442                  {
443                    closeLockedCursor(cursor);
444                    dbCloseLock.readLock().lock();
445                    cursor = db.openCursor(txn, null);
446                  }
447                }
448              }
449            }
450          }
451          catch (Exception e)
452          {
453           // Unlocking is required before throwing any exception
454            closeLockedCursor(cursor);
455            throw (e);
456          }
457        }
458    
459        private ReplServerDBCursor() throws DatabaseException
460        {
461          try
462          {
463            // We'll go on only if no close or no clear is running
464            dbCloseLock.readLock().lock();
465    
466            // Create the transaction that will protect whatever done with this
467            // write cursor.
468            txn = dbenv.beginTransaction();
469    
470            cursor = db.openCursor(txn, null);
471          }
472          catch(DatabaseException e)
473          {
474            if (txn != null)
475            {
476              try
477              {
478                txn.abort();
479              }
480              catch (DatabaseException dbe)
481              {}
482            }
483            closeLockedCursor(cursor);
484            throw (e);
485          }
486        }
487    
488        /**
489         * Close the ReplicationServer Cursor.
490         */
491        public void close()
492        {
493          try
494          {
495            closeLockedCursor(cursor);
496            cursor = null;
497          }
498          catch (DatabaseException e)
499          {
500            MessageBuilder mb = new MessageBuilder();
501            mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
502            mb.append(stackTraceToSingleLineString(e));
503            logError(mb.toMessage());
504            replicationServer.shutdown();
505          }
506          if (txn != null)
507          {
508            try
509            {
510              txn.commit();
511            } catch (DatabaseException e)
512            {
513              MessageBuilder mb = new MessageBuilder();
514              mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
515              mb.append(stackTraceToSingleLineString(e));
516              logError(mb.toMessage());
517              replicationServer.shutdown();
518            }
519          }
520        }
521    
522        /**
523         * Abort the Cursor after a Deadlock Exception.
524         * This method catch and ignore the DeadlockException because
525         * this must be done when aborting a cursor after a DeadlockException
526         * (per the Cursor documentation).
527         * This should not be used in any other case.
528         */
529        public void abort()
530        {
531          if (cursor == null)
532            return;
533          try
534          {
535            closeLockedCursor(cursor);
536            cursor = null;
537          }
538          catch (DeadlockException e1)
539          {
540            // The DB documentation states that a DeadlockException
541            // on the close method of a cursor that is aborting should
542            // be ignored.
543          }
544          catch (DatabaseException e)
545          {
546            MessageBuilder mb = new MessageBuilder();
547            mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
548            mb.append(stackTraceToSingleLineString(e));
549            logError(mb.toMessage());
550            replicationServer.shutdown();
551          }
552          if (txn != null)
553          {
554            try
555            {
556              txn.abort();
557            } catch (DatabaseException e)
558            {
559              MessageBuilder mb = new MessageBuilder();
560              mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
561              mb.append(stackTraceToSingleLineString(e));
562              logError(mb.toMessage());
563              replicationServer.shutdown();
564            }
565          }
566        }
567    
568        /**
569         * Get the next ChangeNumber in the database from this Cursor.
570         *
571         * @return The next ChangeNumber in the database from this cursor.
572         * @throws DatabaseException In case of underlying database problem.
573         */
574        public ChangeNumber nextChangeNumber() throws DatabaseException
575        {
576          OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
577    
578          if (status != OperationStatus.SUCCESS)
579          {
580            return null;
581          }
582          try
583          {
584            String csnString = new String(key.getData(), "UTF-8");
585            return new ChangeNumber(csnString);
586          } catch (UnsupportedEncodingException e)
587          {
588            // can't happen
589            return null;
590          }
591        }
592    
593        /**
594         * Get the next UpdateMessage from this cursor.
595         *
596         * @return the next UpdateMessage.
597         */
598        public UpdateMessage next()
599        {
600          UpdateMessage currentChange = null;
601          while (currentChange == null)
602          {
603            try
604            {
605              OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
606              if (status != OperationStatus.SUCCESS)
607              {
608                return null;
609              }
610            } catch (DatabaseException e)
611            {
612              return null;
613            }
614            try {
615              currentChange = ReplicationData.generateChange(data.getData());
616            } catch (Exception e) {
617              /*
618               * An error happening trying to convert the data from the
619               * replicationServer database to an Update Message.
620               * This can only happen if the database is corrupted.
621               * There is not much more that we can do at this point except trying
622               * to continue with the next record.
623               * In such case, it is therefore possible that we miss some changes.
624               * TODO. log an error message.
625               * TODO : REPAIR : Such problem should be handled by the
626               *        repair functionality.
627               */
628            }
629          }
630          return currentChange;
631        }
632    
633        /**
634         * Delete the record at the current cursor position.
635         *
636         * @throws DatabaseException In case of database problem.
637         */
638        public void delete() throws DatabaseException
639        {
640          cursor.delete();
641        }
642      } // ReplServerDBCursor
643    
644      /**
645       * Clears this change DB from the changes it contains.
646       *
647       * @throws Exception Throws an exception it occurs.
648       * @throws DatabaseException Throws a DatabaseException when it occurs.
649       */
650      public void clear() throws Exception, DatabaseException
651      {
652        // The coming users will be blocked until the clear is done
653        dbCloseLock.writeLock().lock();
654        try
655        {
656          String dbName = db.getDatabaseName();
657    
658          // Clears the reference to this serverID
659          dbenv.clearServerId(baseDn, serverId);
660    
661          // Closing is requested by the Berkeley DB before truncate
662          db.close();
663    
664          // Clears the changes
665          dbenv.clearDb(dbName);
666    
667          db = null;
668    
669          // RE-create the db
670          db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
671        }
672        catch(Exception e)
673        {
674          MessageBuilder mb = new MessageBuilder();
675          mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
676              e.getMessage() + " " +
677              stackTraceToSingleLineString(e)));
678          logError(mb.toMessage());
679        }
680        finally
681        {
682          // Relax the waiting users
683          dbCloseLock.writeLock().unlock();
684        }
685      }
686    }