001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    
025    import javax.sql.DataSource;
026    
027    import org.apache.activemq.util.IOExceptionSupport;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * Helps keep track of the current transaction/JDBC connection.
033     * 
034     * @version $Revision: 1.2 $
035     */
036    public class TransactionContext {
037    
038        private static final Log LOG = LogFactory.getLog(TransactionContext.class);
039    
040        private final DataSource dataSource;
041        private Connection connection;
042        private boolean inTx;
043        private PreparedStatement addMessageStatement;
044        private PreparedStatement removedMessageStatement;
045        private PreparedStatement updateLastAckStatement;
046        // a cheap dirty level that we can live with    
047        private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
048        
049        public TransactionContext(DataSource dataSource) {
050            this.dataSource = dataSource;
051        }
052    
053        public Connection getConnection() throws IOException {
054            if (connection == null) {
055                try {
056                    connection = dataSource.getConnection();
057                    boolean autoCommit = !inTx;
058                    if (connection.getAutoCommit() != autoCommit) {
059                        connection.setAutoCommit(autoCommit);
060                    }
061                } catch (SQLException e) {
062                    JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
063                    throw IOExceptionSupport.create(e);
064                }
065    
066                try {
067                    connection.setTransactionIsolation(transactionIsolation);
068                } catch (Throwable e) {
069                }
070            }
071            return connection;
072        }
073    
074        public void executeBatch() throws SQLException {
075            try {
076                executeBatch(addMessageStatement, "Failed add a message");
077            } finally {
078                addMessageStatement = null;
079                try {
080                    executeBatch(removedMessageStatement, "Failed to remove a message");
081                } finally {
082                    removedMessageStatement = null;
083                    try {
084                        executeBatch(updateLastAckStatement, "Failed to ack a message");
085                    } finally {
086                        updateLastAckStatement = null;
087                    }
088                }
089            }
090        }
091    
092        private void executeBatch(PreparedStatement p, String message) throws SQLException {
093            if (p == null) {
094                return;
095            }
096    
097            try {
098                int[] rc = p.executeBatch();
099                for (int i = 0; i < rc.length; i++) {
100                    int code = rc[i];
101                    if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
102                        throw new SQLException(message + ". Response code: " + code);
103                    }
104                }
105            } finally {
106                try {
107                    p.close();
108                } catch (Throwable e) {
109                }
110            }
111        }
112    
113        public void close() throws IOException {
114            if (!inTx) {
115                try {
116    
117                    /**
118                     * we are not in a transaction so should not be committing ??
119                     * This was previously commented out - but had adverse affects
120                     * on testing - so it's back!
121                     * 
122                     */
123                    try {
124                        executeBatch();
125                    } finally {
126                        if (connection != null && !connection.getAutoCommit()) {
127                            connection.commit();
128                        }
129                    }
130    
131                } catch (SQLException e) {
132                    JDBCPersistenceAdapter.log("Error while closing connection: ", e);
133                    throw IOExceptionSupport.create(e);
134                } finally {
135                    try {
136                        if (connection != null) {
137                            connection.close();
138                        }
139                    } catch (Throwable e) {
140                        LOG.warn("Close failed: " + e.getMessage(), e);
141                    } finally {
142                        connection = null;
143                    }
144                }
145            }
146        }
147    
148        public void begin() throws IOException {
149            if (inTx) {
150                throw new IOException("Already started.");
151            }
152            inTx = true;
153            connection = getConnection();
154        }
155    
156        public void commit() throws IOException {
157            if (!inTx) {
158                throw new IOException("Not started.");
159            }
160            try {
161                executeBatch();
162                if (!connection.getAutoCommit()) {
163                    connection.commit();
164                }
165            } catch (SQLException e) {
166                JDBCPersistenceAdapter.log("Commit failed: ", e);
167                
168                this.rollback(); 
169                
170                throw IOExceptionSupport.create(e);
171            } finally {
172                inTx = false;
173                close();
174            }
175        }
176    
177        public void rollback() throws IOException {
178            if (!inTx) {
179                throw new IOException("Not started.");
180            }
181            try {
182                if (addMessageStatement != null) {
183                    addMessageStatement.close();
184                    addMessageStatement = null;
185                }
186                if (removedMessageStatement != null) {
187                    removedMessageStatement.close();
188                    removedMessageStatement = null;
189                }
190                if (updateLastAckStatement != null) {
191                    updateLastAckStatement.close();
192                    updateLastAckStatement = null;
193                }
194                connection.rollback();
195    
196            } catch (SQLException e) {
197                JDBCPersistenceAdapter.log("Rollback failed: ", e);
198                throw IOExceptionSupport.create(e);
199            } finally {
200                inTx = false;
201                close();
202            }
203        }
204    
205        public PreparedStatement getAddMessageStatement() {
206            return addMessageStatement;
207        }
208    
209        public void setAddMessageStatement(PreparedStatement addMessageStatement) {
210            this.addMessageStatement = addMessageStatement;
211        }
212    
213        public PreparedStatement getUpdateLastAckStatement() {
214            return updateLastAckStatement;
215        }
216    
217        public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
218            this.updateLastAckStatement = ackMessageStatement;
219        }
220    
221        public PreparedStatement getRemovedMessageStatement() {
222            return removedMessageStatement;
223        }
224    
225        public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
226            this.removedMessageStatement = removedMessageStatement;
227        }
228        
229        public void setTransactionIsolation(int transactionIsolation) {
230            this.transactionIsolation = transactionIsolation;
231        }
232    
233    }