1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mitosis.store.derby;
21
22
23 import org.apache.commons.dbcp.BasicDataSource;
24 import org.apache.directory.mitosis.common.CSN;
25 import org.apache.directory.mitosis.common.CSNVector;
26 import org.apache.directory.mitosis.common.DefaultCSN;
27 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
28 import org.apache.directory.mitosis.operation.Operation;
29 import org.apache.directory.mitosis.operation.OperationCodec;
30 import org.apache.directory.mitosis.store.ReplicationLogIterator;
31 import org.apache.directory.mitosis.store.ReplicationStore;
32 import org.apache.directory.mitosis.store.ReplicationStoreException;
33 import org.apache.directory.server.core.DirectoryService;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import javax.naming.Name;
38 import javax.naming.ldap.LdapName;
39 import java.io.File;
40 import java.sql.Connection;
41 import java.sql.DriverManager;
42 import java.sql.PreparedStatement;
43 import java.sql.ResultSet;
44 import java.sql.SQLException;
45 import java.sql.Statement;
46 import java.util.HashSet;
47 import java.util.Set;
48 import java.util.UUID;
49
50
51 public class DerbyReplicationStore implements ReplicationStore
52 {
53 private static final Logger LOG = LoggerFactory.getLogger( DerbyReplicationStore.class );
54
55 private static final String DEFAULT_TABLE_PREFIX = "REPLICATION_";
56 private static final String KEY_REPLICA_ID = "replicaId";
57
58 private static final String DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
59 private static final String DB_URI_PREFIX = "jdbc:derby:";
60
61 private String dbURI;
62 private BasicDataSource dataSource;
63 private String replicaId;
64 private String tablePrefix = DEFAULT_TABLE_PREFIX;
65 private String metadataTableName;
66 private String uuidTableName;
67 private String logTableName;
68 private Set<String> knownReplicaIds;
69 private final Object knownReplicaIdsLock = new Object();
70 private final OperationCodec operationCodec = new OperationCodec();
71
72
73 public String getTablePrefix()
74 {
75 return tablePrefix;
76 }
77
78
79 public void setTablePrefix( String tablePrefix )
80 {
81 if ( tablePrefix == null )
82 {
83 tablePrefix = DEFAULT_TABLE_PREFIX;
84 }
85
86 tablePrefix = tablePrefix.trim();
87 if ( tablePrefix.length() == 0 )
88 {
89 tablePrefix = DEFAULT_TABLE_PREFIX;
90 }
91
92 this.tablePrefix = tablePrefix;
93 }
94
95
96 public void open( DirectoryService serviceCfg, ReplicationConfiguration cfg )
97 {
98 replicaId = cfg.getReplicaId();
99
100
101 dbURI = DB_URI_PREFIX + serviceCfg.getWorkingDirectory().getPath() + File.separator
102 + "replication";
103
104
105 try
106 {
107 Class.forName( DRIVER_NAME );
108 Connection con = DriverManager.getConnection( dbURI + ";create=true" );
109 con.close();
110 }
111 catch ( Exception e )
112 {
113 throw new ReplicationStoreException( "Failed to initialize Derby database.", e );
114 }
115
116
117 BasicDataSource dataSource = new BasicDataSource();
118 dataSource.setDriverClassName( DRIVER_NAME );
119 dataSource.setUrl( dbURI );
120 dataSource.setUsername( "sa" );
121 dataSource.setPassword( "" );
122 this.dataSource = dataSource;
123
124
125 metadataTableName = tablePrefix + "METADATA";
126 uuidTableName = tablePrefix + "UUID";
127 logTableName = tablePrefix + "LOG";
128
129 initSchema();
130 loadMetadata();
131 }
132
133
134 private void initSchema()
135 {
136 Connection con = null;
137 Statement stmt = null;
138 ResultSet rs = null;
139
140 try
141 {
142 con = dataSource.getConnection();
143 con.setAutoCommit( true );
144
145 stmt = con.createStatement();
146
147 try
148 {
149 rs = stmt.executeQuery( "SELECT M_KEY FROM " + metadataTableName + " WHERE M_KEY IS NULL" );
150 rs.close();
151 rs = null;
152 }
153 catch ( SQLException e )
154 {
155 stmt.executeUpdate( "CREATE TABLE " + metadataTableName + " ("
156 + " M_KEY VARCHAR(30) NOT NULL PRIMARY KEY," + " M_VALUE VARCHAR(100) NOT NULL )" );
157 }
158
159 try
160 {
161 rs = stmt.executeQuery( "SELECT UUID FROM " + uuidTableName + " WHERE UUID IS NULL" );
162 rs.close();
163 rs = null;
164 }
165 catch ( SQLException e )
166 {
167 stmt.executeUpdate( "CREATE TABLE " + uuidTableName + " (" + " UUID CHAR(36) NOT NULL PRIMARY KEY,"
168 + " DN CLOB NOT NULL" + ")" );
169 }
170
171 try
172 {
173 rs = stmt.executeQuery( "SELECT CSN_REPLICA_ID FROM " + logTableName + " WHERE CSN_REPLICA_ID IS NULL" );
174 rs.close();
175 rs = null;
176 }
177 catch ( SQLException e )
178 {
179 stmt.executeUpdate( "CREATE TABLE " + logTableName + " (" + " CSN_REPLICA_ID VARCHAR(16) NOT NULL,"
180 + " CSN_TIMESTAMP BIGINT NOT NULL," + " CSN_OP_SEQ INTEGER NOT NULL,"
181 + " OPERATION BLOB NOT NULL," + "CONSTRAINT " + logTableName + "_PK PRIMARY KEY ("
182 + " CSN_REPLICA_ID," + " CSN_TIMESTAMP," + " CSN_OP_SEQ)" + ")" );
183 }
184 }
185 catch ( SQLException e )
186 {
187 throw new ReplicationStoreException( "Failed to initialize DB schema.", e );
188 }
189 finally
190 {
191 SQLUtil.cleanup( con, stmt, rs );
192 }
193 }
194
195
196 private void loadMetadata()
197 {
198 Connection con = null;
199 PreparedStatement ps = null;
200 ResultSet rs = null;
201
202 try
203 {
204 con = dataSource.getConnection();
205 con.setAutoCommit( true );
206 con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
207 con.setReadOnly( true );
208
209
210 ps = con.prepareStatement( "SELECT M_VALUE FROM " + metadataTableName + " WHERE M_KEY=?" );
211 ps.setString( 1, KEY_REPLICA_ID );
212 rs = ps.executeQuery();
213
214 if ( rs.next() )
215 {
216
217 String actualReplicaId = rs.getString( 1 );
218
219 if ( !replicaId.equalsIgnoreCase( actualReplicaId ) )
220 {
221 throw new ReplicationStoreException( "Replica ID mismatches: " + actualReplicaId + " (expected: "
222 + replicaId + ")" );
223 }
224 }
225 else
226 {
227 rs.close();
228 rs = null;
229 ps.close();
230 ps = null;
231
232 con.setReadOnly( false );
233
234 ps = con.prepareStatement( "INSERT INTO " + metadataTableName + " (M_KEY, M_VALUE) VALUES (?,?)" );
235 ps.setString( 1, KEY_REPLICA_ID );
236 ps.setString( 2, replicaId );
237 ps.executeUpdate();
238 }
239
240 if ( rs != null )
241 {
242 rs.close();
243 rs = null;
244 }
245 ps.close();
246 ps = null;
247
248
249 ps = con.prepareStatement( "SELECT DISTINCT CSN_REPLICA_ID FROM " + logTableName );
250 rs = ps.executeQuery();
251 knownReplicaIds = new HashSet<String>();
252 while ( rs.next() )
253 {
254 knownReplicaIds.add( rs.getString( 1 ) );
255 }
256 }
257 catch ( Exception e )
258 {
259 if ( e instanceof ReplicationStoreException )
260 {
261 throw ( ReplicationStoreException ) e;
262 }
263 throw new ReplicationStoreException( e );
264 }
265 finally
266 {
267 SQLUtil.cleanup( con, ps, rs );
268 }
269 }
270
271
272 public void close()
273 {
274 try
275 {
276 dataSource.close();
277 }
278 catch ( SQLException e )
279 {
280 LOG.warn( "Failed to close the dataSource.", e );
281 }
282 dataSource = null;
283 replicaId = null;
284
285 try
286 {
287 DriverManager.getConnection( dbURI + ";shutdown=true" );
288 }
289 catch ( Exception e )
290 {
291
292 }
293 }
294
295
296 public String getReplicaId()
297 {
298 return replicaId;
299 }
300
301
302 public Set<String> getKnownReplicaIds()
303 {
304 return new HashSet<String>( knownReplicaIds );
305 }
306
307
308 public Name getDN( UUID uuid )
309 {
310 Connection con = null;
311 PreparedStatement ps = null;
312 ResultSet rs = null;
313
314 try
315 {
316 con = dataSource.getConnection();
317 con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
318 con.setReadOnly( true );
319 ps = con.prepareStatement( "SELECT DN FROM " + uuidTableName + " WHERE UUID=?" );
320 ps.setString( 1, uuid.toString() );
321 rs = ps.executeQuery();
322 if ( rs.next() )
323 {
324 return new LdapName( rs.getString( 1 ) );
325 }
326 else
327 {
328 return null;
329 }
330 }
331 catch ( Exception e )
332 {
333 throw new ReplicationStoreException( e );
334 }
335 finally
336 {
337 SQLUtil.cleanup( con, ps, rs );
338 }
339 }
340
341
342 public boolean putUUID( UUID uuid, Name dn )
343 {
344 String uuidString = uuid.toString();
345 Connection con = null;
346 PreparedStatement ps = null;
347 ResultSet rs = null;
348
349 try
350 {
351 con = dataSource.getConnection();
352 con.setAutoCommit( false );
353 con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
354 con.setReadOnly( true );
355
356
357 ps = con.prepareStatement( "SELECT UUID FROM " + uuidTableName + " WHERE UUID=?" );
358 ps.setString( 1, uuidString );
359 rs = ps.executeQuery();
360 if ( rs.next() )
361 {
362 return false;
363 }
364
365 rs.close();
366 rs = null;
367
368
369 con.setReadOnly( false );
370 ps = con.prepareStatement( "INSERT INTO " + uuidTableName + " (UUID, DN) VALUES(?,?)" );
371 ps.setString( 1, uuidString );
372 ps.setString( 2, dn.toString() );
373
374 int updateCnt = ps.executeUpdate();
375 con.commit();
376 return updateCnt == 1;
377 }
378 catch ( Exception e )
379 {
380 try
381 {
382 con.rollback();
383 }
384 catch ( SQLException e1 )
385 {
386 LOG.error( "Failed to rollback transaction.", e );
387 }
388
389 throw new ReplicationStoreException( e );
390 }
391 finally
392 {
393 SQLUtil.cleanup( con, ps, rs );
394 }
395 }
396
397
398 public boolean removeUUID( UUID uuid )
399 {
400 String uuidString = uuid.toString();
401 Connection con = null;
402 PreparedStatement ps = null;
403
404 try
405 {
406 con = dataSource.getConnection();
407 con.setAutoCommit( true );
408 con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
409 con.setReadOnly( false );
410
411
412 ps = con.prepareStatement( "DELETE FROM " + uuidTableName + " WHERE UUID=?" );
413 ps.setString( 1, uuidString );
414 return ps.executeUpdate() == 1;
415 }
416 catch ( Exception e )
417 {
418 throw new ReplicationStoreException( e );
419 }
420 finally
421 {
422 SQLUtil.cleanup( con, ps, null );
423 }
424 }
425
426
427 public void putLog( Operation op )
428 {
429 CSN csn = op.getCSN();
430 byte[] encodedOp = operationCodec.encode( op );
431 Connection con = null;
432 PreparedStatement ps = null;
433
434 try
435 {
436 con = dataSource.getConnection();
437 con.setAutoCommit( true );
438 con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
439 con.setReadOnly( false );
440
441
442 ps = con.prepareStatement( "INSERT INTO " + logTableName
443 + " (CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION) VALUES(?,?,?,?)" );
444 ps.setString( 1, csn.getReplicaId() );
445 ps.setLong( 2, csn.getTimestamp() );
446 ps.setInt( 3, csn.getOperationSequence() );
447 ps.setBytes( 4, encodedOp );
448 if ( ps.executeUpdate() != 1 )
449 {
450 throw new ReplicationStoreException( "Failed to insert a row." );
451 }
452 }
453 catch ( Exception e )
454 {
455 if ( e instanceof ReplicationStoreException )
456 {
457 throw ( ReplicationStoreException ) e;
458 }
459
460 throw new ReplicationStoreException( e );
461 }
462 finally
463 {
464 SQLUtil.cleanup( con, ps, null );
465 }
466
467 if ( !knownReplicaIds.contains( csn.getReplicaId() ) )
468 {
469 synchronized ( knownReplicaIdsLock )
470 {
471 Set<String> newKnownReplicaIds = new HashSet<String>( knownReplicaIds );
472 newKnownReplicaIds.add( csn.getReplicaId() );
473 knownReplicaIds = newKnownReplicaIds;
474 }
475 }
476 }
477
478
479 public ReplicationLogIterator getLogs( CSNVector updateVector, boolean inclusive )
480 {
481 Connection con;
482 PreparedStatement ps;
483 ResultSet rs;
484
485 updateVector = getNormalizedUpdateVector( updateVector );
486
487 StringBuffer buf = new StringBuffer( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM "
488 + logTableName + " " );
489
490 if ( updateVector.size() > 0 )
491 {
492 buf.append( "WHERE " );
493 for ( int i = updateVector.size();; )
494 {
495 buf.append( "( CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ >" + ( inclusive ? "=" : "" )
496 + " ? OR CSN_TIMESTAMP > ?) ) " );
497 i--;
498 if ( i == 0 )
499 {
500 break;
501 }
502 else
503 {
504 buf.append( "OR " );
505 }
506
507 }
508 }
509 buf.append( "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
510
511 String query = buf.toString();
512
513 try
514 {
515 con = dataSource.getConnection();
516 con.setAutoCommit( true );
517 con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
518 con.setReadOnly( true );
519
520
521 ps = con.prepareStatement( query );
522
523 int paramIdx = 1;
524
525 for ( String replicaId:updateVector.getReplicaIds() )
526 {
527 CSN csn = updateVector.getCSN( replicaId );
528 ps.setString( paramIdx++, replicaId );
529 ps.setLong( paramIdx++, csn.getTimestamp() );
530 ps.setInt( paramIdx++, csn.getOperationSequence() );
531 ps.setLong( paramIdx++, csn.getTimestamp() );
532 }
533
534 rs = ps.executeQuery();
535
536 return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
537 }
538 catch ( Exception e )
539 {
540 throw new ReplicationStoreException( e );
541 }
542 }
543
544
545 private CSNVector getNormalizedUpdateVector( CSNVector updateVector )
546 {
547 CSNVector newUV = new CSNVector();
548
549 synchronized ( knownReplicaIds )
550 {
551 for ( String knownReplicaId : knownReplicaIds )
552 {
553 newUV.setCSN( new DefaultCSN( 0, knownReplicaId, 0 ) );
554 }
555 }
556
557 newUV.setAllCSN( updateVector );
558 return newUV;
559 }
560
561
562 public ReplicationLogIterator getLogs( CSN fromCSN, boolean inclusive )
563 {
564 Connection con;
565 PreparedStatement ps;
566 ResultSet rs;
567
568 try
569 {
570 con = dataSource.getConnection();
571 con.setAutoCommit( true );
572 con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
573 con.setReadOnly( true );
574
575
576 ps = con
577 .prepareStatement( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM " + logTableName
578 + " " + "WHERE CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ >"
579 + ( inclusive ? "=" : "" ) + " ? OR CSN_TIMESTAMP > ?) "
580 + "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
581 ps.setString( 1, fromCSN.getReplicaId() );
582 ps.setLong( 2, fromCSN.getTimestamp() );
583 ps.setInt( 3, fromCSN.getOperationSequence() );
584 ps.setLong( 4, fromCSN.getTimestamp() );
585 rs = ps.executeQuery();
586
587 return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
588 }
589 catch ( Exception e )
590 {
591 throw new ReplicationStoreException( e );
592 }
593 }
594
595
596 public int removeLogs( CSN toCSN, boolean inclusive )
597 {
598 Connection con = null;
599 PreparedStatement ps = null;
600
601 try
602 {
603 con = dataSource.getConnection();
604 con.setAutoCommit( true );
605 con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
606 con.setReadOnly( false );
607
608
609 ps = con.prepareStatement( "DELETE FROM " + logTableName + " WHERE "
610 + "CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ <" + ( inclusive ? "=" : "" )
611 + " ? OR CSN_TIMESTAMP < ?)" );
612 ps.setString( 1, toCSN.getReplicaId() );
613 ps.setLong( 2, toCSN.getTimestamp() );
614 ps.setInt( 3, toCSN.getOperationSequence() );
615 ps.setLong( 4, toCSN.getTimestamp() );
616 return ps.executeUpdate();
617 }
618 catch ( Exception e )
619 {
620 throw new ReplicationStoreException( e );
621 }
622 finally
623 {
624 SQLUtil.cleanup( con, ps, null );
625 }
626 }
627
628
629 public int getLogSize()
630 {
631 Connection con = null;
632 Statement stmt = null;
633 ResultSet rs = null;
634
635 try
636 {
637 con = dataSource.getConnection();
638 con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
639 con.setReadOnly( true );
640 stmt = con.createStatement();
641 rs = stmt.executeQuery( "SELECT COUNT(*) FROM " + logTableName );
642 rs.next();
643 return rs.getInt( 1 );
644 }
645 catch ( Exception e )
646 {
647 throw new ReplicationStoreException( e );
648 }
649 finally
650 {
651 SQLUtil.cleanup( con, stmt, rs );
652 }
653 }
654
655
656 public int getLogSize( String replicaId )
657 {
658 Connection con = null;
659 PreparedStatement ps = null;
660 ResultSet rs = null;
661
662 try
663 {
664 con = dataSource.getConnection();
665 con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
666 con.setReadOnly( true );
667 ps = con.prepareStatement( "SELECT COUNT(*) FROM " + logTableName + " WHERE CSN_REPLICA_ID=?" );
668 ps.setString( 1, replicaId );
669 rs = ps.executeQuery();
670 rs.next();
671 return rs.getInt( 1 );
672 }
673 catch ( Exception e )
674 {
675 throw new ReplicationStoreException( e );
676 }
677 finally
678 {
679 SQLUtil.cleanup( con, ps, rs );
680 }
681 }
682
683
684 public CSNVector getUpdateVector()
685 {
686 return getVector( false );
687 }
688
689
690 public CSNVector getPurgeVector()
691 {
692 return getVector( true );
693 }
694
695
696 private CSNVector getVector( boolean min )
697 {
698 final String ORDER = min ? "ASC" : "DESC";
699
700 Connection con = null;
701 PreparedStatement ps = null;
702 ResultSet rs = null;
703 CSNVector result = new CSNVector();
704
705 try
706 {
707 con = dataSource.getConnection();
708 con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
709 con.setReadOnly( true );
710 ps = con.prepareStatement( "SELECT CSN_TIMESTAMP, CSN_OP_SEQ FROM " + logTableName
711 + " WHERE CSN_REPLICA_ID=? ORDER BY CSN_TIMESTAMP " + ORDER + ", CSN_OP_SEQ " + ORDER );
712
713 for ( String replicaId:knownReplicaIds )
714 {
715 ps.setString( 1, replicaId );
716 rs = ps.executeQuery();
717
718 if ( rs.next() )
719 {
720 result.setCSN( new DefaultCSN( rs.getLong( 1 ), replicaId, rs.getInt( 2 ) ) );
721 }
722
723 rs.close();
724 rs = null;
725 ps.clearParameters();
726 }
727
728 return result;
729 }
730 catch ( Exception e )
731 {
732 throw new ReplicationStoreException( e );
733 }
734 finally
735 {
736 SQLUtil.cleanup( con, ps, rs );
737 }
738 }
739 }