001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import javax.jms.JMSException;
027    import javax.transaction.xa.XAException;
028    
029    import org.apache.activemq.ActiveMQMessageAudit;
030    import org.apache.activemq.command.ConnectionInfo;
031    import org.apache.activemq.command.LocalTransactionId;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.MessageAck;
034    import org.apache.activemq.command.ProducerInfo;
035    import org.apache.activemq.command.TransactionId;
036    import org.apache.activemq.command.XATransactionId;
037    import org.apache.activemq.state.ProducerState;
038    import org.apache.activemq.store.TransactionRecoveryListener;
039    import org.apache.activemq.store.TransactionStore;
040    import org.apache.activemq.transaction.LocalTransaction;
041    import org.apache.activemq.transaction.Synchronization;
042    import org.apache.activemq.transaction.Transaction;
043    import org.apache.activemq.transaction.XATransaction;
044    import org.apache.activemq.util.IOExceptionSupport;
045    import org.apache.activemq.util.WrappedException;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    
049    /**
050     * This broker filter handles the transaction related operations in the Broker
051     * interface.
052     * 
053     * @version $Revision: 1.10 $
054     */
055    public class TransactionBroker extends BrokerFilter {
056    
057        private static final Log LOG = LogFactory.getLog(TransactionBroker.class);
058    
059        // The prepared XA transactions.
060        private TransactionStore transactionStore;
061        private Map<TransactionId, Transaction> xaTransactions = new LinkedHashMap<TransactionId, Transaction>();
062        private ActiveMQMessageAudit audit;
063    
064        public TransactionBroker(Broker next, TransactionStore transactionStore) {
065            super(next);
066            this.transactionStore = transactionStore;
067        }
068    
069        // ////////////////////////////////////////////////////////////////////////////
070        //
071        // Life cycle Methods
072        //
073        // ////////////////////////////////////////////////////////////////////////////
074    
075        /**
076         * Recovers any prepared transactions.
077         */
078        public void start() throws Exception {
079            transactionStore.start();
080            try {
081                final ConnectionContext context = new ConnectionContext();
082                context.setBroker(this);
083                context.setInRecoveryMode(true);
084                context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
085                context.setProducerFlowControl(false);
086                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
087                producerExchange.setMutable(true);
088                producerExchange.setConnectionContext(context);
089                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
090                final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
091                consumerExchange.setConnectionContext(context);
092                transactionStore.recover(new TransactionRecoveryListener() {
093                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
094                        try {
095                            beginTransaction(context, xid);
096                            for (int i = 0; i < addedMessages.length; i++) {
097                                send(producerExchange, addedMessages[i]);
098                            }
099                            for (int i = 0; i < aks.length; i++) {
100                                acknowledge(consumerExchange, aks[i]);
101                            }
102                            prepareTransaction(context, xid);
103                        } catch (Throwable e) {
104                            throw new WrappedException(e);
105                        }
106                    }
107                });
108            } catch (WrappedException e) {
109                Throwable cause = e.getCause();
110                throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
111            }
112            next.start();
113        }
114    
115        public void stop() throws Exception {
116            transactionStore.stop();
117            next.stop();
118        }
119    
120        // ////////////////////////////////////////////////////////////////////////////
121        //
122        // BrokerFilter overrides
123        //
124        // ////////////////////////////////////////////////////////////////////////////
125        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
126            List<TransactionId> txs = new ArrayList<TransactionId>();
127            synchronized (xaTransactions) {
128                for (Iterator<Transaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
129                    Transaction tx = iter.next();
130                    if (tx.isPrepared()) {
131                        txs.add(tx.getTransactionId());
132                    }
133                }
134            }
135            XATransactionId rc[] = new XATransactionId[txs.size()];
136            txs.toArray(rc);
137            return rc;
138        }
139    
140        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
141            // the transaction may have already been started.
142            if (xid.isXATransaction()) {
143                Transaction transaction = null;
144                synchronized (xaTransactions) {
145                    transaction = xaTransactions.get(xid);
146                    if (transaction != null) {
147                        return;
148                    }
149                    transaction = new XATransaction(transactionStore, (XATransactionId)xid, this);
150                    xaTransactions.put(xid, transaction);
151                }
152            } else {
153                Map<TransactionId, Transaction> transactionMap = context.getTransactions();
154                Transaction transaction = transactionMap.get(xid);
155                if (transaction != null) {
156                    throw new JMSException("Transaction '" + xid + "' has already been started.");
157                }
158                transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
159                transactionMap.put(xid, transaction);
160            }
161        }
162    
163        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
164            Transaction transaction = getTransaction(context, xid, false);
165            return transaction.prepare();
166        }
167    
168        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
169            Transaction transaction = getTransaction(context, xid, true);
170            transaction.commit(onePhase);
171        }
172    
173        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
174            Transaction transaction = getTransaction(context, xid, true);
175            transaction.rollback();
176        }
177    
178        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
179            Transaction transaction = getTransaction(context, xid, true);
180            transaction.rollback();
181        }
182    
183        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
184            // This method may be invoked recursively.
185            // Track original tx so that it can be restored.
186            final ConnectionContext context = consumerExchange.getConnectionContext();
187            Transaction originalTx = context.getTransaction();
188            Transaction transaction = null;
189            if (ack.isInTransaction()) {
190                transaction = getTransaction(context, ack.getTransactionId(), false);
191            }
192            context.setTransaction(transaction);
193            try {
194                next.acknowledge(consumerExchange, ack);
195            } finally {
196                context.setTransaction(originalTx);
197            }
198        }
199    
200        public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
201            // This method may be invoked recursively.
202            // Track original tx so that it can be restored.
203            final ConnectionContext context = producerExchange.getConnectionContext();
204            Transaction originalTx = context.getTransaction();
205            Transaction transaction = null;
206            Synchronization sync = null;
207            if (message.getTransactionId() != null) {
208                transaction = getTransaction(context, message.getTransactionId(), false);
209                if (transaction != null) {
210                    sync = new Synchronization() {
211    
212                        public void afterRollback() {
213                            if (audit != null) {
214                                audit.rollback(message);
215                            }
216                        }
217                    };
218                    transaction.addSynchronization(sync);
219                }
220            }
221            if (audit == null || !audit.isDuplicate(message)) {
222                context.setTransaction(transaction);
223                try {
224                    next.send(producerExchange, message);
225                } finally {
226                    context.setTransaction(originalTx);
227                }
228            } else {
229                if (sync != null && transaction != null) {
230                    transaction.removeSynchronization(sync);
231                }
232                if (LOG.isDebugEnabled()) {
233                    LOG.debug("IGNORING duplicate message " + message);
234                }
235            }
236        }
237    
238        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
239            for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
240                try {
241                    Transaction transaction = iter.next();
242                    transaction.rollback();
243                } catch (Exception e) {
244                    LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
245                }
246                iter.remove();
247            }
248            next.removeConnection(context, info, error);
249        }
250    
251        // ////////////////////////////////////////////////////////////////////////////
252        //
253        // Implementation help methods.
254        //
255        // ////////////////////////////////////////////////////////////////////////////
256        public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
257            Map transactionMap = null;
258            synchronized (xaTransactions) {
259                transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
260            }
261            Transaction transaction = (Transaction)transactionMap.get(xid);
262            if (transaction != null) {
263                return transaction;
264            }
265            if (xid.isXATransaction()) {
266                XAException e = new XAException("Transaction '" + xid + "' has not been started.");
267                e.errorCode = XAException.XAER_NOTA;
268                throw e;
269            } else {
270                throw new JMSException("Transaction '" + xid + "' has not been started.");
271            }
272        }
273    
274        public void removeTransaction(XATransactionId xid) {
275            synchronized (xaTransactions) {
276                xaTransactions.remove(xid);
277            }
278        }
279    
280        public synchronized void brokerServiceStarted() {
281            super.brokerServiceStarted();
282            if (getBrokerService().isSupportFailOver() && audit == null) {
283                audit = new ActiveMQMessageAudit();
284            }
285        }
286    
287    }