001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.InterruptedIOException;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.List;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.jms.JMSException;
026    import javax.jms.TransactionInProgressException;
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAException;
029    import javax.transaction.xa.XAResource;
030    import javax.transaction.xa.Xid;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.command.ConnectionId;
034    import org.apache.activemq.command.DataArrayResponse;
035    import org.apache.activemq.command.DataStructure;
036    import org.apache.activemq.command.IntegerResponse;
037    import org.apache.activemq.command.LocalTransactionId;
038    import org.apache.activemq.command.Response;
039    import org.apache.activemq.command.TransactionId;
040    import org.apache.activemq.command.TransactionInfo;
041    import org.apache.activemq.command.XATransactionId;
042    import org.apache.activemq.transaction.Synchronization;
043    import org.apache.activemq.util.JMSExceptionSupport;
044    import org.apache.activemq.util.LongSequenceGenerator;
045    import org.apache.commons.logging.Log;
046    import org.apache.commons.logging.LogFactory;
047    
048    /**
049     * A TransactionContext provides the means to control a JMS transaction. It
050     * provides a local transaction interface and also an XAResource interface. <p/>
051     * An application server controls the transactional assignment of an XASession
052     * by obtaining its XAResource. It uses the XAResource to assign the session to
053     * a transaction, prepare and commit work on the transaction, and so on. <p/> An
054     * XAResource provides some fairly sophisticated facilities for interleaving
055     * work on multiple transactions, recovering a list of transactions in progress,
056     * and so on. A JTA aware JMS provider must fully implement this functionality.
057     * This could be done by using the services of a database that supports XA, or a
058     * JMS provider may choose to implement this functionality from scratch. <p/>
059     * 
060     * @version $Revision: 1.10 $
061     * @see javax.jms.Session
062     * @see javax.jms.QueueSession
063     * @see javax.jms.TopicSession
064     * @see javax.jms.XASession
065     */
066    public class TransactionContext implements XAResource {
067    
068        private static final Log LOG = LogFactory.getLog(TransactionContext.class);
069    
070        // XATransactionId -> ArrayList of TransactionContext objects
071        private final static ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
072    
073        private final ActiveMQConnection connection;
074        private final LongSequenceGenerator localTransactionIdGenerator;
075        private final ConnectionId connectionId;
076        private List<Synchronization> synchronizations;
077    
078        // To track XA transactions.
079        private Xid associatedXid;
080        private TransactionId transactionId;
081        private LocalTransactionEventListener localTransactionEventListener;
082        private int beforeEndIndex;
083    
084        public TransactionContext(ActiveMQConnection connection) {
085            this.connection = connection;
086            this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
087            this.connectionId = connection.getConnectionInfo().getConnectionId();
088        }
089    
090        public boolean isInXATransaction() {
091            return transactionId != null && transactionId.isXATransaction();
092        }
093    
094        public boolean isInLocalTransaction() {
095            return transactionId != null && transactionId.isLocalTransaction();
096        }
097    
098        public boolean isInTransaction() {
099            return transactionId != null;
100        }
101        
102        /**
103         * @return Returns the localTransactionEventListener.
104         */
105        public LocalTransactionEventListener getLocalTransactionEventListener() {
106            return localTransactionEventListener;
107        }
108    
109        /**
110         * Used by the resource adapter to listen to transaction events.
111         * 
112         * @param localTransactionEventListener The localTransactionEventListener to
113         *                set.
114         */
115        public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
116            this.localTransactionEventListener = localTransactionEventListener;
117        }
118    
119        // ///////////////////////////////////////////////////////////
120        //
121        // Methods that work with the Synchronization objects registered with
122        // the transaction.
123        //
124        // ///////////////////////////////////////////////////////////
125    
126        public void addSynchronization(Synchronization s) {
127            if (synchronizations == null) {
128                synchronizations = new ArrayList<Synchronization>(10);
129            }
130            synchronizations.add(s);
131        }
132    
133        private void afterRollback() throws JMSException {
134            if (synchronizations == null) {
135                return;
136            }
137    
138            int size = synchronizations.size();
139            try {
140                for (int i = 0; i < size; i++) {
141                    synchronizations.get(i).afterRollback();
142                }
143            } catch (JMSException e) {
144                throw e;
145            } catch (Throwable e) {
146                throw JMSExceptionSupport.create(e);
147            } finally {
148                synchronizations = null;
149            }
150        }
151    
152        private void afterCommit() throws JMSException {
153            if (synchronizations == null) {
154                return;
155            }
156    
157            int size = synchronizations.size();
158            try {
159                for (int i = 0; i < size; i++) {
160                    synchronizations.get(i).afterCommit();
161                }
162            } catch (JMSException e) {
163                throw e;
164            } catch (Throwable e) {
165                throw JMSExceptionSupport.create(e);
166            } finally {
167                    synchronizations = null;
168            }
169        }
170    
171        private void beforeEnd() throws JMSException {
172            if (synchronizations == null) {
173                return;
174            }
175    
176            int size = synchronizations.size();
177            try {
178                for (;beforeEndIndex < size;) {
179                    synchronizations.get(beforeEndIndex++).beforeEnd();
180                }
181            } catch (JMSException e) {
182                throw e;
183            } catch (Throwable e) {
184                throw JMSExceptionSupport.create(e);
185            }
186        }
187    
188        public TransactionId getTransactionId() {
189            return transactionId;
190        }
191    
192        // ///////////////////////////////////////////////////////////
193        //
194        // Local transaction interface.
195        //
196        // ///////////////////////////////////////////////////////////
197    
198        /**
199         * Start a local transaction.
200         * @throws javax.jms.JMSException on internal error
201         */
202        public void begin() throws JMSException {
203    
204            if (isInXATransaction()) {
205                throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
206            }
207            
208            if (transactionId == null) {
209                synchronizations = null;
210                beforeEndIndex = 0;
211                this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
212                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
213                this.connection.ensureConnectionInfoSent();
214                this.connection.asyncSendPacket(info);
215    
216                // Notify the listener that the tx was started.
217                if (localTransactionEventListener != null) {
218                    localTransactionEventListener.beginEvent();
219                }
220                if (LOG.isDebugEnabled()) {
221                    LOG.debug("Begin:" + transactionId);
222                }
223            }
224            
225        }
226    
227        /**
228         * Rolls back any work done in this transaction and releases any locks
229         * currently held.
230         * 
231         * @throws JMSException if the JMS provider fails to roll back the
232         *                 transaction due to some internal error.
233         * @throws javax.jms.IllegalStateException if the method is not called by a
234         *                 transacted session.
235         */
236        public void rollback() throws JMSException {
237            if (isInXATransaction()) {
238                throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
239            }
240            
241            try {
242                beforeEnd();
243            } catch (TransactionRolledBackException canOcurrOnFailover) {
244                LOG.warn("rollback processing error", canOcurrOnFailover);
245            }
246            if (transactionId != null) {
247                if (LOG.isDebugEnabled()) {
248                    LOG.debug("Rollback: "  + transactionId
249                    + " syncCount: " 
250                    + (synchronizations != null ? synchronizations.size() : 0));
251                }
252    
253                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
254                this.transactionId = null;
255                //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
256                this.connection.syncSendPacket(info);
257                // Notify the listener that the tx was rolled back
258                if (localTransactionEventListener != null) {
259                    localTransactionEventListener.rollbackEvent();
260                }
261            }
262    
263            afterRollback();
264        }
265    
266        /**
267         * Commits all work done in this transaction and releases any locks
268         * currently held.
269         * 
270         * @throws JMSException if the JMS provider fails to commit the transaction
271         *                 due to some internal error.
272         * @throws javax.jms.IllegalStateException if the method is not called by a
273         *                 transacted session.
274         */
275        public void commit() throws JMSException {
276            if (isInXATransaction()) {
277                throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
278            }
279            
280            try {
281                beforeEnd();
282            } catch (JMSException e) {
283                rollback();
284                throw e;
285            }
286    
287            // Only send commit if the transaction was started.
288            if (transactionId != null) {
289                if (LOG.isDebugEnabled()) {
290                    LOG.debug("Commit: "  + transactionId
291                            + " syncCount: " 
292                            + (synchronizations != null ? synchronizations.size() : 0));
293                }
294    
295                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
296                this.transactionId = null;
297                // Notify the listener that the tx was committed back
298                try {
299                    syncSendPacketWithInterruptionHandling(info);
300                    if (localTransactionEventListener != null) {
301                        localTransactionEventListener.commitEvent();
302                    }
303                    afterCommit();
304                } catch (JMSException cause) {
305                    LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
306                    if (localTransactionEventListener != null) {
307                        localTransactionEventListener.rollbackEvent();
308                    }
309                    afterRollback();
310                    throw cause;
311                }
312                
313            }
314        }
315    
316        // ///////////////////////////////////////////////////////////
317        //
318        // XAResource Implementation
319        //
320        // ///////////////////////////////////////////////////////////
321        /**
322         * Associates a transaction with the resource.
323         */
324        public void start(Xid xid, int flags) throws XAException {
325    
326            if (LOG.isDebugEnabled()) {
327                LOG.debug("Start: " + xid);
328            }
329            if (isInLocalTransaction()) {
330                throw new XAException(XAException.XAER_PROTO);
331            }
332            // Are we already associated?
333            if (associatedXid != null) {
334                throw new XAException(XAException.XAER_PROTO);
335            }
336    
337            // if ((flags & TMJOIN) == TMJOIN) {
338            // // TODO: verify that the server has seen the xid
339            // }
340            // if ((flags & TMJOIN) == TMRESUME) {
341            // // TODO: verify that the xid was suspended.
342            // }
343    
344            // associate
345            synchronizations = null;
346            beforeEndIndex = 0;
347            setXid(xid);
348        }
349    
350        /**
351         * @return connectionId for connection
352         */
353        private ConnectionId getConnectionId() {
354            return connection.getConnectionInfo().getConnectionId();
355        }
356    
357        public void end(Xid xid, int flags) throws XAException {
358    
359            if (LOG.isDebugEnabled()) {
360                LOG.debug("End: " + xid);
361            }
362            
363            if (isInLocalTransaction()) {
364                throw new XAException(XAException.XAER_PROTO);
365            }
366            
367            if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
368                // You can only suspend the associated xid.
369                if (!equals(associatedXid, xid)) {
370                    throw new XAException(XAException.XAER_PROTO);
371                }
372    
373                // TODO: we may want to put the xid in a suspended list.
374                try {
375                    beforeEnd();
376                } catch (JMSException e) {
377                    throw toXAException(e);
378                }
379                setXid(null);
380            } else if ((flags & TMSUCCESS) == TMSUCCESS) {
381                // set to null if this is the current xid.
382                // otherwise this could be an asynchronous success call
383                if (equals(associatedXid, xid)) {
384                    try {
385                        beforeEnd();
386                    } catch (JMSException e) {
387                        throw toXAException(e);
388                    }
389                    setXid(null);
390                }
391            } else {
392                throw new XAException(XAException.XAER_INVAL);
393            }
394        }
395    
396        private boolean equals(Xid xid1, Xid xid2) {
397            if (xid1 == xid2) {
398                return true;
399            }
400            if (xid1 == null ^ xid2 == null) {
401                return false;
402            }
403            return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
404                   && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
405        }
406    
407        public int prepare(Xid xid) throws XAException {
408            if (LOG.isDebugEnabled()) {
409                LOG.debug("Prepare: " + xid);
410            }
411            
412            // We allow interleaving multiple transactions, so
413            // we don't limit prepare to the associated xid.
414            XATransactionId x;
415            // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
416            // called first
417            if (xid == null || (equals(associatedXid, xid))) {
418                throw new XAException(XAException.XAER_PROTO);
419            } else {
420                // TODO: cache the known xids so we don't keep recreating this one??
421                x = new XATransactionId(xid);
422            }
423    
424            try {
425                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
426    
427                // Find out if the server wants to commit or rollback.
428                IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
429                return response.getResult();
430    
431            } catch (JMSException e) {
432                throw toXAException(e);
433            }
434        }
435    
436        public void rollback(Xid xid) throws XAException {
437    
438            if (LOG.isDebugEnabled()) {
439                LOG.debug("Rollback: " + xid);
440            }
441            
442            // We allow interleaving multiple transactions, so
443            // we don't limit rollback to the associated xid.
444            XATransactionId x;
445            if (xid == null) {
446                throw new XAException(XAException.XAER_PROTO);
447            }
448            if (equals(associatedXid, xid)) {
449                // I think this can happen even without an end(xid) call. Need to
450                // check spec.
451                x = (XATransactionId)transactionId;
452            } else {
453                x = new XATransactionId(xid);
454            }
455    
456            try {
457                this.connection.checkClosedOrFailed();
458                this.connection.ensureConnectionInfoSent();
459    
460                // Let the server know that the tx is rollback.
461                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
462                syncSendPacketWithInterruptionHandling(info);
463    
464                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
465                if (l != null && !l.isEmpty()) {
466                    for (TransactionContext ctx : l) {
467                        ctx.afterRollback();
468                    }
469                }
470    
471            } catch (JMSException e) {
472                throw toXAException(e);
473            }
474        }
475    
476        // XAResource interface
477        public void commit(Xid xid, boolean onePhase) throws XAException {
478    
479            if (LOG.isDebugEnabled()) {
480                LOG.debug("Commit: " + xid);
481            }
482            
483            // We allow interleaving multiple transactions, so
484            // we don't limit commit to the associated xid.
485            XATransactionId x;
486            if (xid == null || (equals(associatedXid, xid))) {
487                // should never happen, end(xid,TMSUCCESS) must have been previously
488                // called
489                throw new XAException(XAException.XAER_PROTO);
490            } else {
491                x = new XATransactionId(xid);
492            }
493    
494            try {
495                this.connection.checkClosedOrFailed();
496                this.connection.ensureConnectionInfoSent();
497    
498                // Notify the server that the tx was committed back
499                TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
500    
501                syncSendPacketWithInterruptionHandling(info);
502    
503                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
504                if (l != null && !l.isEmpty()) {
505                    for (TransactionContext ctx : l) {
506                        ctx.afterCommit();
507                    }
508                }
509    
510            } catch (JMSException e) {
511                throw toXAException(e);
512            }
513    
514        }
515    
516        public void forget(Xid xid) throws XAException {
517            if (LOG.isDebugEnabled()) {
518                LOG.debug("Forget: " + xid);
519            }
520            
521            // We allow interleaving multiple transactions, so
522            // we don't limit forget to the associated xid.
523            XATransactionId x;
524            if (xid == null) {
525                throw new XAException(XAException.XAER_PROTO);
526            }
527            if (equals(associatedXid, xid)) {
528                // TODO determine if this can happen... I think not.
529                x = (XATransactionId)transactionId;
530            } else {
531                x = new XATransactionId(xid);
532            }
533    
534            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
535    
536            try {
537                // Tell the server to forget the transaction.
538                syncSendPacketWithInterruptionHandling(info);
539            } catch (JMSException e) {
540                throw toXAException(e);
541            }
542        }
543    
544        public boolean isSameRM(XAResource xaResource) throws XAException {
545            if (xaResource == null) {
546                return false;
547            }
548            if (!(xaResource instanceof TransactionContext)) {
549                return false;
550            }
551            TransactionContext xar = (TransactionContext)xaResource;
552            try {
553                return getResourceManagerId().equals(xar.getResourceManagerId());
554            } catch (Throwable e) {
555                throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
556            }
557        }
558    
559        public Xid[] recover(int flag) throws XAException {
560            if (LOG.isDebugEnabled()) {
561                LOG.debug("Recover: " + flag);
562            }
563            
564            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
565            try {
566                this.connection.checkClosedOrFailed();
567                this.connection.ensureConnectionInfoSent();
568    
569                DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
570                DataStructure[] data = receipt.getData();
571                XATransactionId[] answer;
572                if (data instanceof XATransactionId[]) {
573                    answer = (XATransactionId[])data;
574                } else {
575                    answer = new XATransactionId[data.length];
576                    System.arraycopy(data, 0, answer, 0, data.length);
577                }
578                return answer;
579            } catch (JMSException e) {
580                throw toXAException(e);
581            }
582        }
583    
584        public int getTransactionTimeout() throws XAException {
585            return 0;
586        }
587    
588        public boolean setTransactionTimeout(int seconds) throws XAException {
589            return false;
590        }
591    
592        // ///////////////////////////////////////////////////////////
593        //
594        // Helper methods.
595        //
596        // ///////////////////////////////////////////////////////////
597        private String getResourceManagerId() throws JMSException {
598            return this.connection.getResourceManagerId();
599        }
600    
601        private void setXid(Xid xid) throws XAException {
602    
603            try {
604                this.connection.checkClosedOrFailed();
605                this.connection.ensureConnectionInfoSent();
606            } catch (JMSException e) {
607                throw toXAException(e);
608            }
609    
610            if (xid != null) {
611                // associate
612                associatedXid = xid;
613                transactionId = new XATransactionId(xid);
614    
615                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
616                try {
617                    this.connection.asyncSendPacket(info);
618                    if (LOG.isDebugEnabled()) {
619                        LOG.debug("Started XA transaction: " + transactionId);
620                    }
621                } catch (JMSException e) {
622                    throw toXAException(e);
623                }
624    
625            } else {
626    
627                if (transactionId != null) {
628                    TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
629                    try {
630                        syncSendPacketWithInterruptionHandling(info);
631                        if (LOG.isDebugEnabled()) {
632                            LOG.debug("Ended XA transaction: " + transactionId);
633                        }
634                    } catch (JMSException e) {
635                        throw toXAException(e);
636                    }
637    
638                    // Add our self to the list of contexts that are interested in
639                    // post commit/rollback events.
640                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
641                    if (l == null) {
642                        l = new ArrayList<TransactionContext>(3);
643                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
644                        l.add(this);
645                    } else if (!l.contains(this)) {
646                        l.add(this);
647                    }
648                }
649    
650                // dis-associate
651                associatedXid = null;
652                transactionId = null;
653            }
654        }
655    
656        /**
657         * Sends the given command. Also sends the command in case of interruption,
658         * so that important commands like rollback and commit are never interrupted.
659         * If interruption occurred, set the interruption state of the current 
660         * after performing the action again. 
661         * 
662         * @return the response
663         */
664        private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
665            try {
666                            return this.connection.syncSendPacket(command);
667                    } catch (JMSException e) {
668                            if (e.getLinkedException() instanceof InterruptedIOException) {
669                                    try {
670                                            Thread.interrupted();
671                                            return this.connection.syncSendPacket(command);
672                                    } finally {
673                                            Thread.currentThread().interrupt();
674                                    }                               
675                            }
676                            
677                            throw e;
678                    }
679        }
680    
681        /**
682         * Converts a JMSException from the server to an XAException. if the
683         * JMSException contained a linked XAException that is returned instead.
684         * 
685         * @param e JMSException to convert
686         * @return XAException wrapping original exception or its message
687         */
688        private XAException toXAException(JMSException e) {
689            if (e.getCause() != null && e.getCause() instanceof XAException) {
690                XAException original = (XAException)e.getCause();
691                XAException xae = new XAException(original.getMessage());
692                xae.errorCode = original.errorCode;
693                xae.initCause(original);
694                return xae;
695            }
696    
697            XAException xae = new XAException(e.getMessage());
698            xae.errorCode = XAException.XAER_RMFAIL;
699            xae.initCause(e);
700            return xae;
701        }
702    
703        public ActiveMQConnection getConnection() {
704            return connection;
705        }
706    
707        public void cleanup() {
708            associatedXid = null;
709            transactionId = null;
710        }
711    }