View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.mitosis.store.derby;
21  
22  
23  import org.apache.commons.dbcp.BasicDataSource;
24  import org.apache.directory.mitosis.common.CSN;
25  import org.apache.directory.mitosis.common.CSNVector;
26  import org.apache.directory.mitosis.common.DefaultCSN;
27  import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
28  import org.apache.directory.mitosis.operation.Operation;
29  import org.apache.directory.mitosis.operation.OperationCodec;
30  import org.apache.directory.mitosis.store.ReplicationLogIterator;
31  import org.apache.directory.mitosis.store.ReplicationStore;
32  import org.apache.directory.mitosis.store.ReplicationStoreException;
33  import org.apache.directory.server.core.DirectoryService;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import javax.naming.Name;
38  import javax.naming.ldap.LdapName;
39  import java.io.File;
40  import java.sql.Connection;
41  import java.sql.DriverManager;
42  import java.sql.PreparedStatement;
43  import java.sql.ResultSet;
44  import java.sql.SQLException;
45  import java.sql.Statement;
46  import java.util.HashSet;
47  import java.util.Set;
48  import java.util.UUID;
49  
50  
51  public class DerbyReplicationStore implements ReplicationStore
52  {
53      private static final Logger LOG = LoggerFactory.getLogger( DerbyReplicationStore.class );
54  
55      private static final String DEFAULT_TABLE_PREFIX = "REPLICATION_";
56      private static final String KEY_REPLICA_ID = "replicaId";
57  
58      private static final String DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
59      private static final String DB_URI_PREFIX = "jdbc:derby:";
60  
61      private String dbURI;
62      private BasicDataSource dataSource;
63      private String replicaId;
64      private String tablePrefix = DEFAULT_TABLE_PREFIX;
65      private String metadataTableName;
66      private String uuidTableName;
67      private String logTableName;
68      private Set<String> knownReplicaIds;
69      private final Object knownReplicaIdsLock = new Object();
70      private final OperationCodec operationCodec = new OperationCodec();
71  
72  
73      public String getTablePrefix()
74      {
75          return tablePrefix;
76      }
77  
78  
79      public void setTablePrefix( String tablePrefix )
80      {
81          if ( tablePrefix == null )
82          {
83              tablePrefix = DEFAULT_TABLE_PREFIX;
84          }
85  
86          tablePrefix = tablePrefix.trim();
87          if ( tablePrefix.length() == 0 )
88          {
89              tablePrefix = DEFAULT_TABLE_PREFIX;
90          }
91  
92          this.tablePrefix = tablePrefix;
93      }
94  
95  
96      public void open( DirectoryService serviceCfg, ReplicationConfiguration cfg )
97      {
98          replicaId = cfg.getReplicaId();
99  
100         // Calculate DB URI
101         dbURI = DB_URI_PREFIX + serviceCfg.getWorkingDirectory().getPath() + File.separator
102             + "replication";
103 
104         // Create database if not exists.
105         try
106         {
107             Class.forName( DRIVER_NAME );
108             Connection con = DriverManager.getConnection( dbURI + ";create=true" );
109             con.close();
110         }
111         catch ( Exception e )
112         {
113             throw new ReplicationStoreException( "Failed to initialize Derby database.", e );
114         }
115 
116         // Initialize DataSource
117         BasicDataSource dataSource = new BasicDataSource();
118         dataSource.setDriverClassName( DRIVER_NAME );
119         dataSource.setUrl( dbURI );
120         dataSource.setUsername( "sa" );
121         dataSource.setPassword( "" );
122         this.dataSource = dataSource;
123 
124         // Pre-calculate table names
125         metadataTableName = tablePrefix + "METADATA";
126         uuidTableName = tablePrefix + "UUID";
127         logTableName = tablePrefix + "LOG";
128 
129         initSchema();
130         loadMetadata();
131     }
132 
133 
134     private void initSchema()
135     {
136         Connection con = null;
137         Statement stmt = null;
138         ResultSet rs = null;
139 
140         try
141         {
142             con = dataSource.getConnection();
143             con.setAutoCommit( true );
144 
145             stmt = con.createStatement();
146 
147             try
148             {
149                 rs = stmt.executeQuery( "SELECT M_KEY FROM " + metadataTableName + " WHERE M_KEY IS NULL" );
150                 rs.close();
151                 rs = null;
152             }
153             catch ( SQLException e )
154             {
155                 stmt.executeUpdate( "CREATE TABLE " + metadataTableName + " ("
156                     + "    M_KEY VARCHAR(30) NOT NULL PRIMARY KEY," + "    M_VALUE VARCHAR(100) NOT NULL )" );
157             }
158 
159             try
160             {
161                 rs = stmt.executeQuery( "SELECT UUID FROM " + uuidTableName + " WHERE UUID IS NULL" );
162                 rs.close();
163                 rs = null;
164             }
165             catch ( SQLException e )
166             {
167                 stmt.executeUpdate( "CREATE TABLE " + uuidTableName + " (" + "    UUID CHAR(36) NOT NULL PRIMARY KEY,"
168                     + "    DN CLOB NOT NULL" + ")" );
169             }
170 
171             try
172             {
173                 rs = stmt.executeQuery( "SELECT CSN_REPLICA_ID FROM " + logTableName + " WHERE CSN_REPLICA_ID IS NULL" );
174                 rs.close();
175                 rs = null;
176             }
177             catch ( SQLException e )
178             {
179                 stmt.executeUpdate( "CREATE TABLE " + logTableName + " (" + "    CSN_REPLICA_ID VARCHAR(16) NOT NULL,"
180                     + "    CSN_TIMESTAMP BIGINT NOT NULL," + "    CSN_OP_SEQ INTEGER NOT NULL,"
181                     + "    OPERATION BLOB NOT NULL," + "CONSTRAINT " + logTableName + "_PK PRIMARY KEY ("
182                     + "    CSN_REPLICA_ID," + "    CSN_TIMESTAMP," + "    CSN_OP_SEQ)" + ")" );
183             }
184         }
185         catch ( SQLException e )
186         {
187             throw new ReplicationStoreException( "Failed to initialize DB schema.", e );
188         }
189         finally
190         {
191             SQLUtil.cleanup( con, stmt, rs );
192         }
193     }
194 
195 
196     private void loadMetadata()
197     {
198         Connection con = null;
199         PreparedStatement ps = null;
200         ResultSet rs = null;
201 
202         try
203         {
204             con = dataSource.getConnection();
205             con.setAutoCommit( true );
206             con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
207             con.setReadOnly( true );
208 
209             // Check if replicaId is already registered
210             ps = con.prepareStatement( "SELECT M_VALUE FROM " + metadataTableName + " WHERE M_KEY=?" );
211             ps.setString( 1, KEY_REPLICA_ID );
212             rs = ps.executeQuery();
213             
214             if ( rs.next() )
215             {
216                 // If already registered, match it with what user specified.
217                 String actualReplicaId = rs.getString( 1 );
218                 
219                 if ( !replicaId.equalsIgnoreCase( actualReplicaId ) )
220                 {
221                     throw new ReplicationStoreException( "Replica ID mismatches: " + actualReplicaId + " (expected: "
222                         + replicaId + ")" );
223                 }
224             }
225             else
226             {
227                 rs.close();
228                 rs = null;
229                 ps.close();
230                 ps = null;
231 
232                 con.setReadOnly( false );
233                 // If not registered yet, register with what user specified.
234                 ps = con.prepareStatement( "INSERT INTO " + metadataTableName + " (M_KEY, M_VALUE) VALUES (?,?)" );
235                 ps.setString( 1, KEY_REPLICA_ID );
236                 ps.setString( 2, replicaId );
237                 ps.executeUpdate();
238             }
239 
240             if ( rs != null )
241             {
242                 rs.close();
243                 rs = null;
244             }
245             ps.close();
246             ps = null;
247 
248             // Get known replica IDs.
249             ps = con.prepareStatement( "SELECT DISTINCT CSN_REPLICA_ID FROM " + logTableName );
250             rs = ps.executeQuery();
251             knownReplicaIds = new HashSet<String>();
252             while ( rs.next() )
253             {
254                 knownReplicaIds.add( rs.getString( 1 ) );
255             }
256         }
257         catch ( Exception e )
258         {
259             if ( e instanceof ReplicationStoreException )
260             {
261                 throw ( ReplicationStoreException ) e;
262             }
263             throw new ReplicationStoreException( e );
264         }
265         finally
266         {
267             SQLUtil.cleanup( con, ps, rs );
268         }
269     }
270 
271 
272     public void close()
273     {
274         try
275         {
276             dataSource.close();
277         }
278         catch ( SQLException e )
279         {
280             LOG.warn( "Failed to close the dataSource.", e );
281         }
282         dataSource = null;
283         replicaId = null;
284 
285         try
286         {
287             DriverManager.getConnection( dbURI + ";shutdown=true" );
288         }
289         catch ( Exception e )
290         {
291             // An exception is thrown always.
292         }
293     }
294 
295 
296     public String getReplicaId()
297     {
298         return replicaId;
299     }
300 
301 
302     public Set<String> getKnownReplicaIds()
303     {
304         return new HashSet<String>( knownReplicaIds );
305     }
306 
307 
308     public Name getDN( UUID uuid )
309     {
310         Connection con = null;
311         PreparedStatement ps = null;
312         ResultSet rs = null;
313 
314         try
315         {
316             con = dataSource.getConnection();
317             con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
318             con.setReadOnly( true );
319             ps = con.prepareStatement( "SELECT DN FROM " + uuidTableName + " WHERE UUID=?" );
320             ps.setString( 1, uuid.toString() );
321             rs = ps.executeQuery();
322             if ( rs.next() )
323             {
324                 return new LdapName( rs.getString( 1 ) );
325             }
326             else
327             {
328                 return null;
329             }
330         }
331         catch ( Exception e )
332         {
333             throw new ReplicationStoreException( e );
334         }
335         finally
336         {
337             SQLUtil.cleanup( con, ps, rs );
338         }
339     }
340 
341 
342     public boolean putUUID( UUID uuid, Name dn )
343     {
344         String uuidString = uuid.toString();
345         Connection con = null;
346         PreparedStatement ps = null;
347         ResultSet rs = null;
348 
349         try
350         {
351             con = dataSource.getConnection();
352             con.setAutoCommit( false );
353             con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
354             con.setReadOnly( true );
355 
356             // Check if the specified uuid already exists
357             ps = con.prepareStatement( "SELECT UUID FROM " + uuidTableName + " WHERE UUID=?" );
358             ps.setString( 1, uuidString );
359             rs = ps.executeQuery();
360             if ( rs.next() )
361             {
362                 return false;
363             }
364 
365             rs.close();
366             rs = null;
367 
368             // insert
369             con.setReadOnly( false );
370             ps = con.prepareStatement( "INSERT INTO " + uuidTableName + " (UUID, DN) VALUES(?,?)" );
371             ps.setString( 1, uuidString );
372             ps.setString( 2, dn.toString() );
373 
374             int updateCnt = ps.executeUpdate();
375             con.commit();
376             return updateCnt == 1;
377         }
378         catch ( Exception e )
379         {
380             try
381             {
382                 con.rollback();
383             }
384             catch ( SQLException e1 )
385             {
386                 LOG.error( "Failed to rollback transaction.", e );
387             }
388 
389             throw new ReplicationStoreException( e );
390         }
391         finally
392         {
393             SQLUtil.cleanup( con, ps, rs );
394         }
395     }
396 
397 
398     public boolean removeUUID( UUID uuid )
399     {
400         String uuidString = uuid.toString();
401         Connection con = null;
402         PreparedStatement ps = null;
403 
404         try
405         {
406             con = dataSource.getConnection();
407             con.setAutoCommit( true );
408             con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
409             con.setReadOnly( false );
410 
411             // Check if the specified uuid already exists
412             ps = con.prepareStatement( "DELETE FROM " + uuidTableName + " WHERE UUID=?" );
413             ps.setString( 1, uuidString );
414             return ps.executeUpdate() == 1;
415         }
416         catch ( Exception e )
417         {
418             throw new ReplicationStoreException( e );
419         }
420         finally
421         {
422             SQLUtil.cleanup( con, ps, null );
423         }
424     }
425 
426 
427     public void putLog( Operation op )
428     {
429         CSN csn = op.getCSN();
430         byte[] encodedOp = operationCodec.encode( op );
431         Connection con = null;
432         PreparedStatement ps = null;
433 
434         try
435         {
436             con = dataSource.getConnection();
437             con.setAutoCommit( true );
438             con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
439             con.setReadOnly( false );
440 
441             // Check if the specified uuid already exists
442             ps = con.prepareStatement( "INSERT INTO " + logTableName
443                 + " (CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION) VALUES(?,?,?,?)" );
444             ps.setString( 1, csn.getReplicaId() );
445             ps.setLong( 2, csn.getTimestamp() );
446             ps.setInt( 3, csn.getOperationSequence() );
447             ps.setBytes( 4, encodedOp );
448             if ( ps.executeUpdate() != 1 )
449             {
450                 throw new ReplicationStoreException( "Failed to insert a row." );
451             }
452         }
453         catch ( Exception e )
454         {
455             if ( e instanceof ReplicationStoreException )
456             {
457                 throw ( ReplicationStoreException ) e;
458             }
459 
460             throw new ReplicationStoreException( e );
461         }
462         finally
463         {
464             SQLUtil.cleanup( con, ps, null );
465         }
466 
467         if ( !knownReplicaIds.contains( csn.getReplicaId() ) )
468         {
469             synchronized ( knownReplicaIdsLock )
470             {
471                 Set<String> newKnownReplicaIds = new HashSet<String>( knownReplicaIds );
472                 newKnownReplicaIds.add( csn.getReplicaId() );
473                 knownReplicaIds = newKnownReplicaIds;
474             }
475         }
476     }
477 
478 
479     public ReplicationLogIterator getLogs( CSNVector updateVector, boolean inclusive )
480     {
481         Connection con;
482         PreparedStatement ps;
483         ResultSet rs;
484 
485         updateVector = getNormalizedUpdateVector( updateVector );
486 
487         StringBuffer buf = new StringBuffer( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM "
488             + logTableName + " " );
489 
490         if ( updateVector.size() > 0 )
491         {
492             buf.append( "WHERE " );
493             for ( int i = updateVector.size();; )
494             {
495                 buf.append( "( CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ >" + ( inclusive ? "=" : "" )
496                     + " ? OR CSN_TIMESTAMP > ?) ) " );
497                 i--;
498                 if ( i == 0 )
499                 {
500                     break;
501                 }
502                 else
503                 {
504                     buf.append( "OR " );
505                 }
506 
507             }
508         }
509         buf.append( "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
510 
511         String query = buf.toString();
512 
513         try
514         {
515             con = dataSource.getConnection();
516             con.setAutoCommit( true );
517             con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
518             con.setReadOnly( true );
519 
520             // Check if the specified uuid already exists
521             ps = con.prepareStatement( query );
522 
523             int paramIdx = 1;
524             
525             for ( String replicaId:updateVector.getReplicaIds() )
526             {
527                 CSN csn = updateVector.getCSN( replicaId );
528                 ps.setString( paramIdx++, replicaId );
529                 ps.setLong( paramIdx++, csn.getTimestamp() );
530                 ps.setInt( paramIdx++, csn.getOperationSequence() );
531                 ps.setLong( paramIdx++, csn.getTimestamp() );
532             }
533             
534             rs = ps.executeQuery();
535 
536             return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
537         }
538         catch ( Exception e )
539         {
540             throw new ReplicationStoreException( e );
541         }
542     }
543 
544 
545     private CSNVector getNormalizedUpdateVector( CSNVector updateVector )
546     {
547         CSNVector newUV = new CSNVector();
548         
549         synchronized ( knownReplicaIds )
550         {
551             for ( String knownReplicaId : knownReplicaIds )
552             {
553                 newUV.setCSN( new DefaultCSN( 0, knownReplicaId, 0 ) );
554             }
555         }
556 
557         newUV.setAllCSN( updateVector );
558         return newUV;
559     }
560 
561 
562     public ReplicationLogIterator getLogs( CSN fromCSN, boolean inclusive )
563     {
564         Connection con;
565         PreparedStatement ps;
566         ResultSet rs;
567 
568         try
569         {
570             con = dataSource.getConnection();
571             con.setAutoCommit( true );
572             con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
573             con.setReadOnly( true );
574 
575             // Check if the specified uuid already exists
576             ps = con
577                 .prepareStatement( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM " + logTableName
578                     + " " + "WHERE CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ >"
579                     + ( inclusive ? "=" : "" ) + " ? OR CSN_TIMESTAMP > ?) "
580                     + "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
581             ps.setString( 1, fromCSN.getReplicaId() );
582             ps.setLong( 2, fromCSN.getTimestamp() );
583             ps.setInt( 3, fromCSN.getOperationSequence() );
584             ps.setLong( 4, fromCSN.getTimestamp() );
585             rs = ps.executeQuery();
586 
587             return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
588         }
589         catch ( Exception e )
590         {
591             throw new ReplicationStoreException( e );
592         }
593     }
594 
595 
596     public int removeLogs( CSN toCSN, boolean inclusive )
597     {
598         Connection con = null;
599         PreparedStatement ps = null;
600 
601         try
602         {
603             con = dataSource.getConnection();
604             con.setAutoCommit( true );
605             con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
606             con.setReadOnly( false );
607 
608             // Check if the specified uuid already exists
609             ps = con.prepareStatement( "DELETE FROM " + logTableName + " WHERE "
610                 + "CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ <" + ( inclusive ? "=" : "" )
611                 + " ? OR CSN_TIMESTAMP < ?)" );
612             ps.setString( 1, toCSN.getReplicaId() );
613             ps.setLong( 2, toCSN.getTimestamp() );
614             ps.setInt( 3, toCSN.getOperationSequence() );
615             ps.setLong( 4, toCSN.getTimestamp() );
616             return ps.executeUpdate();
617         }
618         catch ( Exception e )
619         {
620             throw new ReplicationStoreException( e );
621         }
622         finally
623         {
624             SQLUtil.cleanup( con, ps, null );
625         }
626     }
627 
628 
629     public int getLogSize()
630     {
631         Connection con = null;
632         Statement stmt = null;
633         ResultSet rs = null;
634 
635         try
636         {
637             con = dataSource.getConnection();
638             con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
639             con.setReadOnly( true );
640             stmt = con.createStatement();
641             rs = stmt.executeQuery( "SELECT COUNT(*) FROM " + logTableName );
642             rs.next();
643             return rs.getInt( 1 );
644         }
645         catch ( Exception e )
646         {
647             throw new ReplicationStoreException( e );
648         }
649         finally
650         {
651             SQLUtil.cleanup( con, stmt, rs );
652         }
653     }
654 
655 
656     public int getLogSize( String replicaId )
657     {
658         Connection con = null;
659         PreparedStatement ps = null;
660         ResultSet rs = null;
661 
662         try
663         {
664             con = dataSource.getConnection();
665             con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
666             con.setReadOnly( true );
667             ps = con.prepareStatement( "SELECT COUNT(*) FROM " + logTableName + " WHERE CSN_REPLICA_ID=?" );
668             ps.setString( 1, replicaId );
669             rs = ps.executeQuery();
670             rs.next();
671             return rs.getInt( 1 );
672         }
673         catch ( Exception e )
674         {
675             throw new ReplicationStoreException( e );
676         }
677         finally
678         {
679             SQLUtil.cleanup( con, ps, rs );
680         }
681     }
682 
683 
684     public CSNVector getUpdateVector()
685     {
686         return getVector( false );
687     }
688 
689 
690     public CSNVector getPurgeVector()
691     {
692         return getVector( true );
693     }
694 
695 
696     private CSNVector getVector( boolean min )
697     {
698         final String ORDER = min ? "ASC" : "DESC";
699 
700         Connection con = null;
701         PreparedStatement ps = null;
702         ResultSet rs = null;
703         CSNVector result = new CSNVector();
704 
705         try
706         {
707             con = dataSource.getConnection();
708             con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
709             con.setReadOnly( true );
710             ps = con.prepareStatement( "SELECT CSN_TIMESTAMP, CSN_OP_SEQ FROM " + logTableName
711                 + " WHERE CSN_REPLICA_ID=? ORDER BY CSN_TIMESTAMP " + ORDER + ", CSN_OP_SEQ " + ORDER );
712 
713             for ( String replicaId:knownReplicaIds )
714             {
715                 ps.setString( 1, replicaId );
716                 rs = ps.executeQuery();
717                 
718                 if ( rs.next() )
719                 {
720                     result.setCSN( new DefaultCSN( rs.getLong( 1 ), replicaId, rs.getInt( 2 ) ) );
721                 }
722                 
723                 rs.close();
724                 rs = null;
725                 ps.clearParameters();
726             }
727 
728             return result;
729         }
730         catch ( Exception e )
731         {
732             throw new ReplicationStoreException( e );
733         }
734         finally
735         {
736             SQLUtil.cleanup( con, ps, rs );
737         }
738     }
739 }