001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.IOException; 020 import java.sql.Connection; 021 import java.sql.PreparedStatement; 022 import java.sql.SQLException; 023 024 import javax.sql.DataSource; 025 026 import org.apache.activemq.util.Handler; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 030 /** 031 * Represents an exclusive lock on a database to avoid multiple brokers running 032 * against the same logical database. 033 * 034 * @org.apache.xbean.XBean element="database-locker" 035 * @version $Revision: $ 036 */ 037 public class DefaultDatabaseLocker implements DatabaseLocker { 038 public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000; 039 private static final Log LOG = LogFactory.getLog(DefaultDatabaseLocker.class); 040 protected DataSource dataSource; 041 protected Statements statements; 042 protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; 043 044 protected Connection connection; 045 protected boolean stopping; 046 protected Handler<Exception> exceptionHandler; 047 048 public DefaultDatabaseLocker() { 049 } 050 051 public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException { 052 setPersistenceAdapter(persistenceAdapter); 053 } 054 055 public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException { 056 this.dataSource = adapter.getLockDataSource(); 057 this.statements = adapter.getStatements(); 058 } 059 060 public void start() throws Exception { 061 stopping = false; 062 063 LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); 064 String sql = statements.getLockCreateStatement(); 065 LOG.debug("Locking Query is "+sql); 066 067 PreparedStatement statement = null; 068 while (true) { 069 try { 070 connection = dataSource.getConnection(); 071 connection.setAutoCommit(false); 072 statement = connection.prepareStatement(sql); 073 statement.execute(); 074 break; 075 } catch (Exception e) { 076 try { 077 if (stopping) { 078 throw new Exception( 079 "Cannot start broker as being asked to shut down. " 080 + "Interrupted attempt to acquire lock: " 081 + e, e); 082 } 083 if (exceptionHandler != null) { 084 try { 085 exceptionHandler.handle(e); 086 } catch (Throwable handlerException) { 087 LOG.error( "The exception handler " 088 + exceptionHandler.getClass().getCanonicalName() 089 + " threw this exception: " 090 + handlerException 091 + " while trying to handle this excpetion: " 092 + e, handlerException); 093 } 094 095 } else { 096 LOG.debug("Lock failure: "+ e, e); 097 } 098 } finally { 099 // Let's make sure the database connection is properly 100 // closed when an error occurs so that we're not leaking 101 // connections 102 if (null != connection) { 103 try { 104 connection.close(); 105 } catch (SQLException e1) { 106 LOG.error("Caught exception while closing connection: " + e1, e1); 107 } 108 109 connection = null; 110 } 111 } 112 } finally { 113 if (null != statement) { 114 try { 115 statement.close(); 116 } catch (SQLException e1) { 117 LOG.debug("Caught while closing statement: " + e1, e1); 118 } 119 statement = null; 120 } 121 } 122 123 LOG.info("Failed to acquire lock. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again..."); 124 try { 125 Thread.sleep(lockAcquireSleepInterval); 126 } catch (InterruptedException ie) { 127 LOG.warn("Master lock retry sleep interrupted", ie); 128 } 129 } 130 131 LOG.info("Becoming the master on dataSource: " + dataSource); 132 } 133 134 public void stop() throws Exception { 135 stopping = true; 136 try { 137 if (connection != null && !connection.isClosed()) { 138 try { 139 connection.rollback(); 140 } catch (SQLException sqle) { 141 LOG.warn("Exception while rollbacking the connection on shutdown", sqle); 142 } finally { 143 try { 144 connection.close(); 145 } catch (SQLException ignored) { 146 LOG.debug("Exception while closing connection on shutdown", ignored); 147 } 148 } 149 } 150 } catch (SQLException sqle) { 151 LOG.warn("Exception while checking close status of connection on shutdown", sqle); 152 } 153 } 154 155 public boolean keepAlive() { 156 PreparedStatement statement = null; 157 boolean result = false; 158 try { 159 statement = connection.prepareStatement(statements.getLockUpdateStatement()); 160 statement.setLong(1, System.currentTimeMillis()); 161 int rows = statement.executeUpdate(); 162 if (rows == 1) { 163 result=true; 164 } 165 } catch (Exception e) { 166 LOG.error("Failed to update database lock: " + e, e); 167 } finally { 168 if (statement != null) { 169 try { 170 statement.close(); 171 } catch (SQLException e) { 172 LOG.error("Failed to close statement",e); 173 } 174 } 175 } 176 return result; 177 } 178 179 public long getLockAcquireSleepInterval() { 180 return lockAcquireSleepInterval; 181 } 182 183 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { 184 this.lockAcquireSleepInterval = lockAcquireSleepInterval; 185 } 186 187 public Handler getExceptionHandler() { 188 return exceptionHandler; 189 } 190 191 public void setExceptionHandler(Handler exceptionHandler) { 192 this.exceptionHandler = exceptionHandler; 193 } 194 195 }