001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transaction;
018    
019    import java.io.IOException;
020    
021    import javax.transaction.xa.XAException;
022    
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.command.LocalTransactionId;
025    import org.apache.activemq.command.TransactionId;
026    import org.apache.activemq.store.TransactionStore;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * @version $Revision: 1.3 $
032     */
033    public class LocalTransaction extends Transaction {
034    
035        private static final Log LOG = LogFactory.getLog(LocalTransaction.class);
036    
037        private final TransactionStore transactionStore;
038        private final LocalTransactionId xid;
039        private final ConnectionContext context;
040    
041        public LocalTransaction(TransactionStore transactionStore, LocalTransactionId xid, ConnectionContext context) {
042            this.transactionStore = transactionStore;
043            this.xid = xid;
044            this.context = context;
045        }
046    
047        public void commit(boolean onePhase) throws XAException, IOException {
048            if (LOG.isDebugEnabled()) {
049                LOG.debug("commit: "  + xid
050                        + " syncCount: " + size());
051            }
052            
053            // Get ready for commit.
054            try {
055                prePrepare();
056            } catch (XAException e) {
057                throw e;
058            } catch (Throwable e) {
059                LOG.warn("COMMIT FAILED: ", e);
060                rollback();
061                // Let them know we rolled back.
062                XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
063                xae.errorCode = XAException.XA_RBOTHER;
064                xae.initCause(e);
065                throw xae;
066            }
067    
068            setState(Transaction.FINISHED_STATE);
069            context.getTransactions().remove(xid);
070            // Sync on transaction store to avoid out of order messages in the cursor
071            // https://issues.apache.org/activemq/browse/AMQ-2594
072            synchronized (transactionStore) {
073                transactionStore.commit(getTransactionId(), false);
074    
075                try {
076                    fireAfterCommit();
077                } catch (Throwable e) {
078                    // I guess this could happen. Post commit task failed
079                    // to execute properly.
080                    LOG.warn("POST COMMIT FAILED: ", e);
081                    XAException xae = new XAException("POST COMMIT FAILED");
082                    xae.errorCode = XAException.XAER_RMERR;
083                    xae.initCause(e);
084                    throw xae;
085                }
086            }
087        }
088    
089        public void rollback() throws XAException, IOException {
090    
091            if (LOG.isDebugEnabled()) {
092                LOG.debug("rollback: "  + xid
093                        + " syncCount: " + size());
094            }
095            setState(Transaction.FINISHED_STATE);
096            context.getTransactions().remove(xid);
097            // Sync on transaction store to avoid out of order messages in the cursor
098            // https://issues.apache.org/activemq/browse/AMQ-2594
099            synchronized (transactionStore) {
100               transactionStore.rollback(getTransactionId());
101    
102                try {
103                    fireAfterRollback();
104                } catch (Throwable e) {
105                    LOG.warn("POST ROLLBACK FAILED: ", e);
106                    XAException xae = new XAException("POST ROLLBACK FAILED");
107                    xae.errorCode = XAException.XAER_RMERR;
108                    xae.initCause(e);
109                    throw xae;
110                }
111            }
112        }
113    
114        public int prepare() throws XAException {
115            XAException xae = new XAException("Prepare not implemented on Local Transactions.");
116            xae.errorCode = XAException.XAER_RMERR;
117            throw xae;
118        }
119    
120        public TransactionId getTransactionId() {
121            return xid;
122        }
123    
124    }