001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.sql.SQLException;
022    import java.util.Collections;
023    import java.util.Set;
024    import java.util.concurrent.ScheduledFuture;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.ThreadFactory;
027    import java.util.concurrent.TimeUnit;
028    
029    import javax.sql.DataSource;
030    
031    import org.apache.activemq.ActiveMQMessageAudit;
032    import org.apache.activemq.broker.BrokerService;
033    import org.apache.activemq.broker.BrokerServiceAware;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.command.ActiveMQDestination;
036    import org.apache.activemq.command.ActiveMQQueue;
037    import org.apache.activemq.command.ActiveMQTopic;
038    import org.apache.activemq.command.Message;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.openwire.OpenWireFormat;
041    import org.apache.activemq.store.MessageStore;
042    import org.apache.activemq.store.PersistenceAdapter;
043    import org.apache.activemq.store.TopicMessageStore;
044    import org.apache.activemq.store.TransactionStore;
045    import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
046    import org.apache.activemq.store.memory.MemoryTransactionStore;
047    import org.apache.activemq.usage.SystemUsage;
048    import org.apache.activemq.util.ByteSequence;
049    import org.apache.activemq.util.FactoryFinder;
050    import org.apache.activemq.util.IOExceptionSupport;
051    import org.apache.activemq.util.LongSequenceGenerator;
052    import org.apache.activemq.wireformat.WireFormat;
053    import org.apache.commons.logging.Log;
054    import org.apache.commons.logging.LogFactory;
055    
056    /**
057     * A {@link PersistenceAdapter} implementation using JDBC for persistence
058     * storage.
059     * 
060     * This persistence adapter will correctly remember prepared XA transactions,
061     * but it will not keep track of local transaction commits so that operations
062     * performed against the Message store are done as a single uow.
063     * 
064     * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
065     * 
066     * @version $Revision: 1.9 $
067     */
068    public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
069        BrokerServiceAware {
070    
071        private static final Log LOG = LogFactory.getLog(JDBCPersistenceAdapter.class);
072        private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
073                                                                       "META-INF/services/org/apache/activemq/store/jdbc/");
074        private static FactoryFinder lockFactoryFinder = new FactoryFinder(
075                                                                        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
076    
077        private WireFormat wireFormat = new OpenWireFormat();
078        private BrokerService brokerService;
079        private Statements statements;
080        private JDBCAdapter adapter;
081        private MemoryTransactionStore transactionStore;
082        private ScheduledThreadPoolExecutor clockDaemon;
083        private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
084        private int cleanupPeriod = 1000 * 60 * 5;
085        private boolean useExternalMessageReferences;
086        private boolean useDatabaseLock = true;
087        private long lockKeepAlivePeriod = 1000*30;
088        private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
089        private DatabaseLocker databaseLocker;
090        private boolean createTablesOnStartup = true;
091        private DataSource lockDataSource;
092        private int transactionIsolation;
093        
094        protected int maxProducersToAudit=1024;
095        protected int maxAuditDepth=1000;
096        protected boolean enableAudit=true;
097        protected int auditRecoveryDepth = 1024;
098        protected ActiveMQMessageAudit audit;
099        
100        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
101    
102        public JDBCPersistenceAdapter() {
103        }
104    
105        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
106            super(ds);
107            this.wireFormat = wireFormat;
108        }
109    
110        public Set<ActiveMQDestination> getDestinations() {
111            // Get a connection and insert the message into the DB.
112            TransactionContext c = null;
113            try {
114                c = getTransactionContext();
115                return getAdapter().doGetDestinations(c);
116            } catch (IOException e) {
117                return emptyDestinationSet();
118            } catch (SQLException e) {
119                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
120                return emptyDestinationSet();
121            } finally {
122                if (c != null) {
123                    try {
124                        c.close();
125                    } catch (Throwable e) {
126                    }
127                }
128            }
129        }
130    
131        @SuppressWarnings("unchecked")
132        private Set<ActiveMQDestination> emptyDestinationSet() {
133            return Collections.EMPTY_SET;
134        }
135        
136        protected void createMessageAudit() {
137            if (enableAudit && audit == null) {
138                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
139                TransactionContext c = null;
140                
141                try {
142                    c = getTransactionContext();
143                    getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
144                        public void messageId(MessageId id) {
145                            audit.isDuplicate(id);
146                        }
147                    });
148                } catch (Exception e) {
149                    LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
150                } finally {
151                    if (c != null) {
152                        try {
153                            c.close();
154                        } catch (Throwable e) {
155                        }
156                    }
157                }
158            }
159        }
160        
161        public void initSequenceIdGenerator() {
162            TransactionContext c = null;
163            try {
164                c = getTransactionContext();
165                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
166                    public void messageId(MessageId id) {
167                        audit.isDuplicate(id);
168                    }
169                });
170            } catch (Exception e) {
171                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
172            } finally {
173                if (c != null) {
174                    try {
175                        c.close();
176                    } catch (Throwable e) {
177                    }
178                }
179            }
180            
181        }
182    
183        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
184            MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
185            if (transactionStore != null) {
186                rc = transactionStore.proxy(rc);
187            }
188            return rc;
189        }
190    
191        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
192            TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
193            if (transactionStore != null) {
194                rc = transactionStore.proxy(rc);
195            }
196            return rc;
197        }
198    
199        /**
200         * Cleanup method to remove any state associated with the given destination
201         * No state retained.... nothing to do
202         *
203         * @param destination Destination to forget
204         */
205        public void removeQueueMessageStore(ActiveMQQueue destination) {
206        }
207    
208        /**
209         * Cleanup method to remove any state associated with the given destination
210         * No state retained.... nothing to do
211         *
212         * @param destination Destination to forget
213         */
214        public void removeTopicMessageStore(ActiveMQTopic destination) {
215        }
216    
217        public TransactionStore createTransactionStore() throws IOException {
218            if (transactionStore == null) {
219                transactionStore = new MemoryTransactionStore(this);
220            }
221            return this.transactionStore;
222        }
223    
224        public long getLastMessageBrokerSequenceId() throws IOException {
225            // Get a connection and insert the message into the DB.
226            TransactionContext c = getTransactionContext();
227            try {
228                long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
229                sequenceGenerator.setLastSequenceId(seq);
230                long brokerSeq = 0;
231                if (seq != 0) {
232                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq)));
233                    brokerSeq = last.getMessageId().getBrokerSequenceId();
234                }
235                return brokerSeq;
236            } catch (SQLException e) {
237                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
238                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
239            } finally {
240                c.close();
241            }
242        }
243    
244        public void start() throws Exception {
245            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
246    
247            if (isCreateTablesOnStartup()) {
248                TransactionContext transactionContext = getTransactionContext();
249                transactionContext.begin();
250                try {
251                    try {
252                        getAdapter().doCreateTables(transactionContext);
253                    } catch (SQLException e) {
254                        LOG.warn("Cannot create tables due to: " + e);
255                        JDBCPersistenceAdapter.log("Failure Details: ", e);
256                    }
257                } finally {
258                    transactionContext.commit();
259                }
260            }
261    
262            if (isUseDatabaseLock()) {
263                DatabaseLocker service = getDatabaseLocker();
264                if (service == null) {
265                    LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
266                } else {
267                    service.start();
268                    if (lockKeepAlivePeriod > 0) {
269                        keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
270                            public void run() {
271                                databaseLockKeepAlive();
272                            }
273                        }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
274                    }
275                    if (brokerService != null) {
276                        brokerService.getBroker().nowMasterBroker();
277                    }
278                }
279            }
280    
281            cleanup();
282    
283            // Cleanup the db periodically.
284            if (cleanupPeriod > 0) {
285                cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
286                    public void run() {
287                        cleanup();
288                    }
289                }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
290            }
291            
292            createMessageAudit();
293        }
294    
295        public synchronized void stop() throws Exception {
296            if (cleanupTicket != null) {
297                cleanupTicket.cancel(true);
298                cleanupTicket = null;
299            }
300            if (keepAliveTicket != null) {
301                keepAliveTicket.cancel(false);
302                keepAliveTicket = null;
303            }
304            
305            // do not shutdown clockDaemon as it may kill the thread initiating shutdown
306            DatabaseLocker service = getDatabaseLocker();
307            if (service != null) {
308                service.stop();
309            }
310        }
311    
312        public void cleanup() {
313            TransactionContext c = null;
314            try {
315                LOG.debug("Cleaning up old messages.");
316                c = getTransactionContext();
317                getAdapter().doDeleteOldMessages(c);
318            } catch (IOException e) {
319                LOG.warn("Old message cleanup failed due to: " + e, e);
320            } catch (SQLException e) {
321                LOG.warn("Old message cleanup failed due to: " + e);
322                JDBCPersistenceAdapter.log("Failure Details: ", e);
323            } finally {
324                if (c != null) {
325                    try {
326                        c.close();
327                    } catch (Throwable e) {
328                    }
329                }
330                LOG.debug("Cleanup done.");
331            }
332        }
333    
334        public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
335            this.clockDaemon = clockDaemon;
336        }
337    
338        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
339            if (clockDaemon == null) {
340                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
341                    public Thread newThread(Runnable runnable) {
342                        Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
343                        thread.setDaemon(true);
344                        return thread;
345                    }
346                });
347            }
348            return clockDaemon;
349        }
350    
351        public JDBCAdapter getAdapter() throws IOException {
352            if (adapter == null) {
353                setAdapter(createAdapter());
354            }
355            return adapter;
356        }
357    
358        public DatabaseLocker getDatabaseLocker() throws IOException {
359            if (databaseLocker == null && isUseDatabaseLock()) {
360                setDatabaseLocker(loadDataBaseLocker());
361            }
362            return databaseLocker;
363        }
364    
365        /**
366         * Sets the database locker strategy to use to lock the database on startup
367         * @throws IOException 
368         */
369        public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
370            databaseLocker = locker;
371            databaseLocker.setPersistenceAdapter(this);
372            databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
373        }
374    
375        public DataSource getLockDataSource() throws IOException {
376            if (lockDataSource == null) {
377                lockDataSource = getDataSource();
378                if (lockDataSource == null) {
379                    throw new IllegalArgumentException(
380                            "No dataSource property has been configured");
381                }
382            } else {
383                LOG.info("Using a separate dataSource for locking: "
384                        + lockDataSource);
385            }
386            return lockDataSource;
387        }
388        
389        public void setLockDataSource(DataSource dataSource) {
390            this.lockDataSource = dataSource;
391        }
392    
393        public BrokerService getBrokerService() {
394            return brokerService;
395        }
396    
397        public void setBrokerService(BrokerService brokerService) {
398            this.brokerService = brokerService;
399        }
400    
401        /**
402         * @throws IOException
403         */
404        protected JDBCAdapter createAdapter() throws IOException {
405           
406            adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
407           
408            // Use the default JDBC adapter if the
409            // Database type is not recognized.
410            if (adapter == null) {
411                adapter = new DefaultJDBCAdapter();
412                LOG.debug("Using default JDBC Adapter: " + adapter);
413            }
414            return adapter;
415        }
416    
417        private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
418            Object adapter = null;
419            TransactionContext c = getTransactionContext();
420            try {
421                try {
422                    // Make the filename file system safe.
423                    String dirverName = c.getConnection().getMetaData().getDriverName();
424                    dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
425    
426                    try {
427                        adapter = finder.newInstance(dirverName);
428                        LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
429                    } catch (Throwable e) {
430                        LOG.info("Database " + kind + " driver override not found for : [" + dirverName
431                                 + "].  Will use default implementation.");
432                    }
433                } catch (SQLException e) {
434                    LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
435                              + e.getMessage());
436                    JDBCPersistenceAdapter.log("Failure Details: ", e);
437                }
438            } finally {
439                c.close();
440            }
441            return adapter;
442        }
443    
444        public void setAdapter(JDBCAdapter adapter) {
445            this.adapter = adapter;
446            this.adapter.setStatements(getStatements());
447        }
448    
449        public WireFormat getWireFormat() {
450            return wireFormat;
451        }
452    
453        public void setWireFormat(WireFormat wireFormat) {
454            this.wireFormat = wireFormat;
455        }
456    
457        public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
458            if (context == null) {
459                return getTransactionContext();
460            } else {
461                TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
462                if (answer == null) {
463                    answer = getTransactionContext();
464                    context.setLongTermStoreContext(answer);
465                }
466                return answer;
467            }
468        }
469    
470        public TransactionContext getTransactionContext() throws IOException {
471            TransactionContext answer = new TransactionContext(getDataSource());
472            if (transactionIsolation > 0) {
473                answer.setTransactionIsolation(transactionIsolation);
474            }
475            return answer;
476        }
477    
478        public void beginTransaction(ConnectionContext context) throws IOException {
479            TransactionContext transactionContext = getTransactionContext(context);
480            transactionContext.begin();
481        }
482    
483        public void commitTransaction(ConnectionContext context) throws IOException {
484            TransactionContext transactionContext = getTransactionContext(context);
485            transactionContext.commit();
486        }
487    
488        public void rollbackTransaction(ConnectionContext context) throws IOException {
489            TransactionContext transactionContext = getTransactionContext(context);
490            transactionContext.rollback();
491        }
492    
493        public int getCleanupPeriod() {
494            return cleanupPeriod;
495        }
496    
497        /**
498         * Sets the number of milliseconds until the database is attempted to be
499         * cleaned up for durable topics
500         */
501        public void setCleanupPeriod(int cleanupPeriod) {
502            this.cleanupPeriod = cleanupPeriod;
503        }
504    
505        public void deleteAllMessages() throws IOException {
506            TransactionContext c = getTransactionContext();
507            try {
508                getAdapter().doDropTables(c);
509                getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
510                getAdapter().doCreateTables(c);
511            } catch (SQLException e) {
512                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
513                throw IOExceptionSupport.create(e);
514            } finally {
515                c.close();
516            }
517        }
518    
519        public boolean isUseExternalMessageReferences() {
520            return useExternalMessageReferences;
521        }
522    
523        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
524            this.useExternalMessageReferences = useExternalMessageReferences;
525        }
526    
527        public boolean isCreateTablesOnStartup() {
528            return createTablesOnStartup;
529        }
530    
531        /**
532         * Sets whether or not tables are created on startup
533         */
534        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
535            this.createTablesOnStartup = createTablesOnStartup;
536        }
537    
538        public boolean isUseDatabaseLock() {
539            return useDatabaseLock;
540        }
541    
542        /**
543         * Sets whether or not an exclusive database lock should be used to enable
544         * JDBC Master/Slave. Enabled by default.
545         */
546        public void setUseDatabaseLock(boolean useDatabaseLock) {
547            this.useDatabaseLock = useDatabaseLock;
548        }
549    
550        public static void log(String msg, SQLException e) {
551            String s = msg + e.getMessage();
552            while (e.getNextException() != null) {
553                e = e.getNextException();
554                s += ", due to: " + e.getMessage();
555            }
556            LOG.warn(s, e);
557        }
558    
559        public Statements getStatements() {
560            if (statements == null) {
561                statements = new Statements();
562            }
563            return statements;
564        }
565    
566        public void setStatements(Statements statements) {
567            this.statements = statements;
568        }
569    
570        /**
571         * @param usageManager The UsageManager that is controlling the
572         *                destination's memory usage.
573         */
574        public void setUsageManager(SystemUsage usageManager) {
575        }
576    
577        protected void databaseLockKeepAlive() {
578            boolean stop = false;
579            try {
580                DatabaseLocker locker = getDatabaseLocker();
581                if (locker != null) {
582                    if (!locker.keepAlive()) {
583                        stop = true;
584                    }
585                }
586            } catch (IOException e) {
587                LOG.error("Failed to get database when trying keepalive: " + e, e);
588            }
589            if (stop) {
590                stopBroker();
591            }
592        }
593    
594        protected void stopBroker() {
595            // we can no longer keep the lock so lets fail
596            LOG.info("No longer able to keep the exclusive lock so giving up being a master");
597            try {
598                brokerService.stop();
599            } catch (Exception e) {
600                LOG.warn("Failure occured while stopping broker");
601            }
602        }
603    
604        protected DatabaseLocker loadDataBaseLocker() throws IOException {
605            DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
606            if (locker == null) {
607                locker = new DefaultDatabaseLocker();
608                LOG.debug("Using default JDBC Locker: " + locker);
609            }
610            return locker;
611        }
612    
613        public void setBrokerName(String brokerName) {
614        }
615    
616        public String toString() {
617            return "JDBCPersistenceAdapter(" + super.toString() + ")";
618        }
619    
620        public void setDirectory(File dir) {
621        }
622    
623        public void checkpoint(boolean sync) throws IOException {
624        }
625    
626        public long size(){
627            return 0;
628        }
629    
630        public long getLockKeepAlivePeriod() {
631            return lockKeepAlivePeriod;
632        }
633    
634        public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
635            this.lockKeepAlivePeriod = lockKeepAlivePeriod;
636        }
637    
638        public long getLockAcquireSleepInterval() {
639            return lockAcquireSleepInterval;
640        }
641    
642        /**
643         * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
644         * not applied if DataBaseLocker is injected.
645         */
646        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
647            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
648        }
649        
650        /**
651         * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
652         * This allowable dirty isolation level may not be achievable in clustered DB environments
653         * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ
654         * see isolation level constants in {@link java.sql.Connection}
655         * @param transactionIsolation the isolation level to use
656         */
657        public void setTransactionIsolation(int transactionIsolation) {
658            this.transactionIsolation = transactionIsolation;
659        }
660    
661            public int getMaxProducersToAudit() {
662                    return maxProducersToAudit;
663            }
664    
665            public void setMaxProducersToAudit(int maxProducersToAudit) {
666                    this.maxProducersToAudit = maxProducersToAudit;
667            }
668    
669            public int getMaxAuditDepth() {
670                    return maxAuditDepth;
671            }
672    
673            public void setMaxAuditDepth(int maxAuditDepth) {
674                    this.maxAuditDepth = maxAuditDepth;
675            }
676    
677            public boolean isEnableAudit() {
678                    return enableAudit;
679            }
680    
681            public void setEnableAudit(boolean enableAudit) {
682                    this.enableAudit = enableAudit;
683            }
684    
685        public int getAuditRecoveryDepth() {
686            return auditRecoveryDepth;
687        }
688    
689        public void setAuditRecoveryDepth(int auditRecoveryDepth) {
690            this.auditRecoveryDepth = auditRecoveryDepth;
691        }
692    
693        public long getNextSequenceId() {
694            synchronized(sequenceGenerator) {
695                return sequenceGenerator.getNextSequenceId();
696            }
697        }
698        
699    }