001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     * Copyright 2004 Hiram Chirino
005     *
006     * Licensed under the Apache License, Version 2.0 (the "License");
007     * you may not use this file except in compliance with the License.
008     * You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     *
018     **/
019    package org.activemq;
020    
021    import java.util.ArrayList;
022    
023    import javax.jms.JMSException;
024    import javax.jms.TransactionInProgressException;
025    import javax.jms.TransactionRolledBackException;
026    import javax.transaction.xa.XAException;
027    import javax.transaction.xa.XAResource;
028    import javax.transaction.xa.Xid;
029    
030    import org.activemq.message.ActiveMQXid;
031    import org.activemq.message.IntResponseReceipt;
032    import org.activemq.message.ResponseReceipt;
033    import org.activemq.message.TransactionInfo;
034    import org.activemq.message.XATransactionInfo;
035    import org.activemq.util.IdGenerator;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * A TransactionContext provides the means to control a JMS transaction.  It provides
041     * a local transaction interface and also an XAResource interface.
042     * 
043     * <p/>
044     * An application server controls the transactional assignment of an XASession
045     * by obtaining its XAResource. It uses the XAResource to assign the session
046     * to a transaction, prepare and commit work on the transaction, and so on.
047     * <p/>
048     * An XAResource provides some fairly sophisticated facilities for
049     * interleaving work on multiple transactions, recovering a list of
050     * transactions in progress, and so on. A JTA aware JMS provider must fully
051     * implement this functionality. This could be done by using the services of a
052     * database that supports XA, or a JMS provider may choose to implement this
053     * functionality from scratch.
054     * <p/>
055     *
056     * @version $Revision: 1.1.1.1 $
057     * @see javax.jms.Session
058     * @see javax.jms.QueueSession
059     * @see javax.jms.TopicSession
060     * @see javax.jms.XASession
061     */
062    public class TransactionContext implements XAResource {
063        
064        private static final Log log = LogFactory.getLog(TransactionContext.class);
065        
066        private final ActiveMQConnection connection;
067        private final ArrayList sessions = new ArrayList(2);
068        private final IdGenerator localTransactionIdGenerator = new IdGenerator();
069    
070        // To track XA transactions.
071        private Xid associatedXid;
072        private ActiveMQXid activeXid;
073    
074        // To track local transactions.
075        private String localTransactionId;
076    
077        private LocalTransactionEventListener localTransactionEventListener;
078        
079        public TransactionContext(ActiveMQConnection connection) {
080            this.connection = connection;        
081        }
082        
083        public boolean isInXATransaction() {
084            return associatedXid!=null;
085        }
086        
087        public boolean isInLocalTransaction() {
088            return localTransactionId!=null;
089        }
090        
091        /**
092         * @return Returns the localTransactionEventListener.
093         */
094        public LocalTransactionEventListener getLocalTransactionEventListener() {
095            return localTransactionEventListener;
096        }
097    
098        /**
099         * Used by the resource adapter to listen to transaction events.
100         * 
101         * @param localTransactionEventListener The localTransactionEventListener to set.
102         */
103        public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
104            this.localTransactionEventListener = localTransactionEventListener;
105        }
106    
107        /////////////////////////////////////////////////////////////
108        //
109        // Methods that interface with the session
110        //
111        /////////////////////////////////////////////////////////////
112        public void addSession(ActiveMQSession session) {
113            sessions.add(session);
114        }
115        public void removeSession(ActiveMQSession session) {
116            sessions.remove(session);
117        }
118        
119        private void postRollback() {
120            int size = sessions.size();        
121            for(int i=0; i < size; i++ ){
122                ((ActiveMQSession)sessions.get(i)).redeliverUnacknowledgedMessages(true);
123            }
124        }
125    
126        private void postCommit() {
127            int size = sessions.size();        
128            for(int i=0; i < size; i++ ){
129                ((ActiveMQSession)sessions.get(i)).clearDeliveredMessages();
130            }
131        }
132    
133        public Object getTransactionId() {
134            if( localTransactionId!=null )
135                return localTransactionId;
136            return activeXid;
137        }
138            
139        /////////////////////////////////////////////////////////////
140        //
141        // Local transaction interface.
142        //
143        /////////////////////////////////////////////////////////////        
144        
145        /**
146         * Start a local transaction.
147         */
148        public void begin() throws JMSException {        
149            if( associatedXid!=null ) 
150                throw new TransactionInProgressException("Cannot start local transction.  XA transaction is allready in progress.");        
151            
152            if( localTransactionId==null ) {         
153                this.localTransactionId = localTransactionIdGenerator.generateId();
154                TransactionInfo info = new TransactionInfo();
155                info.setTransactionId((String)localTransactionId);
156                info.setType(TransactionInfo.START);
157                this.connection.asyncSendPacket(info);
158                
159                // Notify the listener that the tx was started.
160                if (localTransactionEventListener != null) {
161                    localTransactionEventListener.beginEvent();
162                }
163                if( log.isDebugEnabled() )
164                    log.debug("Started local transaction: "+localTransactionId);
165            }
166        }
167        
168        /**
169         * Rolls back any messages done in this transaction and releases any locks currently held.
170         * 
171         * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
172         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
173         */
174        public void rollback() throws JMSException {
175            if( associatedXid!=null ) 
176                throw new TransactionInProgressException("Cannot rollback() if an XA transaction is allready in progress ");
177            
178            if( localTransactionId!=null ) {            
179                TransactionInfo info = new TransactionInfo();
180                info.setTransactionId((String)localTransactionId);
181                info.setType(TransactionInfo.ROLLBACK);
182                //before we send, update the current transaction id
183                this.localTransactionId = null;
184                this.connection.asyncSendPacket(info);
185                // Notify the listener that the tx was rolled back
186                if (localTransactionEventListener != null) {
187                    localTransactionEventListener.rollbackEvent();
188                }
189                if( log.isDebugEnabled() )
190                    log.debug("Rolledback local transaction: "+localTransactionId);
191            }
192            postRollback();
193        }
194            
195        /**
196         * Commits all messages done in this transaction and releases any locks currently held.
197         * 
198         * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
199         * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
200         * commit.
201         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
202         */
203        public void commit() throws JMSException {
204            if( associatedXid!=null ) 
205                throw new TransactionInProgressException("Cannot commit() if an XA transaction is allready in progress ");
206    
207            // Only send commit if the transaction was started.
208            if (localTransactionId!=null) {
209                TransactionInfo info = new TransactionInfo();
210                info.setTransactionId((String)localTransactionId);
211                info.setType(TransactionInfo.COMMIT);
212                //before we send, update the current transaction id
213                this.localTransactionId = null;
214                // Notify the listener that the tx was commited back
215                this.connection.syncSendPacket(info);
216                if (localTransactionEventListener != null) {
217                    localTransactionEventListener.commitEvent();
218                }
219                if( log.isDebugEnabled() )
220                    log.debug("Committed local transaction: "+localTransactionId);
221            }
222            postCommit();
223        }
224        
225        /////////////////////////////////////////////////////////////
226        //
227        // XAResource Implementation
228        //
229        /////////////////////////////////////////////////////////////
230        /**
231         * Associates a transaction with the resource.
232         */
233        public void start(Xid xid, int flags) throws XAException {
234            if( localTransactionId!=null ) 
235                throw new XAException(XAException.XAER_PROTO);
236            
237            // Are we allready associated?
238            if (associatedXid != null) {
239                throw new XAException(XAException.XAER_PROTO);
240            }
241    
242            if ((flags & TMJOIN) == TMJOIN) {
243                // TODO: verify that the server has seen the xid
244            }
245            if ((flags & TMJOIN) == TMRESUME) {
246                // TODO: verify that the xid was suspended.
247            }
248    
249            // associate
250            setXid(xid);
251    
252        }
253    
254        public void end(Xid xid, int flags) throws XAException {
255            if( localTransactionId!=null ) 
256                throw new XAException(XAException.XAER_PROTO);
257    
258            if ((flags & TMSUSPEND) == TMSUSPEND) {
259                // You can only suspend the associated xid.
260                if (associatedXid == null || !ActiveMQXid.equals(associatedXid,xid)) {
261                    throw new XAException(XAException.XAER_PROTO);
262                }
263    
264                //TODO: we may want to put the xid in a suspended list.
265                setXid(null);
266            } else if ((flags & TMFAIL) == TMFAIL) {
267                //TODO: We need to rollback the transaction??
268                setXid(null);
269            } else if ((flags & TMSUCCESS) == TMSUCCESS) {
270                //set to null if this is the current xid.
271                //otherwise this could be an asynchronous success call
272                if (ActiveMQXid.equals(associatedXid,xid)) {
273                    setXid(null);
274                }
275            } else {
276                throw new XAException(XAException.XAER_INVAL);
277            }
278            if( log.isDebugEnabled() )
279                log.debug("Ended XA transaction: "+activeXid);
280    
281        }
282    
283        public int prepare(Xid xid) throws XAException {
284    
285            // We allow interleaving multiple transactions, so
286            // we don't limit prepare to the associated xid.
287            ActiveMQXid x;
288            //THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
289            if (ActiveMQXid.equals(associatedXid,xid)) {
290                throw new XAException(XAException.XAER_PROTO);
291            } else {
292                //TODO cache the known xids so we don't keep recreating this one??
293                x = new ActiveMQXid(xid);
294            }
295    
296            XATransactionInfo info = new XATransactionInfo();
297            info.setXid(x);
298            info.setType(XATransactionInfo.PRE_COMMIT);
299    
300            try {
301                if( log.isDebugEnabled() )
302                    log.debug("Preparing XA transaction: "+x);
303                
304                // Find out if the server wants to commit or rollback.
305                IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
306                return receipt.getResult();
307            } catch (JMSException e) {
308                throw toXAException(e);
309            }
310        }
311    
312        public void rollback(Xid xid) throws XAException {
313    
314            // We allow interleaving multiple transactions, so
315            // we don't limit rollback to the associated xid.
316            ActiveMQXid x;
317            if (ActiveMQXid.equals(associatedXid,xid)) {
318                //I think this can happen even without an end(xid) call.  Need to check spec.
319                x = activeXid;
320            } else {
321                x = new ActiveMQXid(xid);
322            }
323    
324            XATransactionInfo info = new XATransactionInfo();
325            info.setXid(x);
326            info.setType(XATransactionInfo.ROLLBACK);
327    
328            try {
329                if( log.isDebugEnabled() )
330                    log.debug("Rollingback XA transaction: "+x);
331                
332                // Let the server know that the tx is rollback.
333                this.connection.syncSendPacket(info);
334            } catch (JMSException e) {
335                throw toXAException(e);
336            }
337            
338            postRollback();
339        }
340    
341        // XAResource interface
342        public void commit(Xid xid, boolean onePhase) throws XAException {
343    
344            // We allow interleaving multiple transactions, so
345            // we don't limit commit to the associated xid.
346            ActiveMQXid x;
347            if (ActiveMQXid.equals(associatedXid,xid)) {
348                //should never happen, end(xid,TMSUCCESS) must have been previously called
349                throw new XAException(XAException.XAER_PROTO);
350            } else {
351                x = new ActiveMQXid(xid);
352            }
353    
354            XATransactionInfo info = new XATransactionInfo();
355            info.setXid(x);
356            info.setType(onePhase ? XATransactionInfo.COMMIT_ONE_PHASE : XATransactionInfo.COMMIT);
357    
358            try {
359                if( log.isDebugEnabled() )
360                    log.debug("Committing XA transaction: "+x);
361                
362                // Notify the server that the tx was commited back
363                this.connection.syncSendPacket(info);
364            } catch (JMSException e) {
365                throw toXAException(e);
366            }
367    
368            postCommit();
369        }
370    
371    
372        public void forget(Xid xid) throws XAException {
373    
374            // We allow interleaving multiple transactions, so
375            // we don't limit forget to the associated xid.
376            ActiveMQXid x;
377            if (ActiveMQXid.equals(associatedXid,xid)) {
378                //TODO determine if this can happen... I think not.
379                x = activeXid;
380            } else {
381                x = new ActiveMQXid(xid);
382            }
383    
384            XATransactionInfo info = new XATransactionInfo();
385            info.setXid(x);
386            info.setType(XATransactionInfo.FORGET);
387    
388            try {
389                if( log.isDebugEnabled() )
390                    log.debug("Forgetting XA transaction: "+x);
391                
392                // Tell the server to forget the transaction.
393                this.connection.syncSendPacket(info);
394            } catch (JMSException e) {
395                throw toXAException(e);
396            }
397        }
398    
399        public boolean isSameRM(XAResource xaResource) throws XAException {
400            if (xaResource == null) {
401                return false;
402            }
403            if (!(xaResource instanceof TransactionContext)) {
404                return false;
405            }
406            TransactionContext xar = (TransactionContext) xaResource;
407            try {
408                return getResourceManagerId().equals(xar.getResourceManagerId());
409            } catch (Throwable e) {
410                throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
411            }
412        }
413    
414    
415        public Xid[] recover(int flag) throws XAException {
416    
417            XATransactionInfo info = new XATransactionInfo();
418            info.setType(XATransactionInfo.XA_RECOVER);
419    
420            try {
421                ResponseReceipt receipt = (ResponseReceipt) this.connection.syncSendRequest(info);
422                return (ActiveMQXid[]) receipt.getResult();
423            } catch (JMSException e) {
424                throw toXAException(e);
425            }
426        }
427    
428        public int getTransactionTimeout() throws XAException {
429    
430            XATransactionInfo info = new XATransactionInfo();
431            info.setType(XATransactionInfo.GET_TX_TIMEOUT);
432    
433            try {
434                // get the tx timeout that was set.
435                IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
436                return receipt.getResult();
437            } catch (JMSException e) {
438                throw toXAException(e);
439            }
440        }
441    
442        public boolean setTransactionTimeout(int seconds) throws XAException {
443    
444            XATransactionInfo info = new XATransactionInfo();
445            info.setType(XATransactionInfo.SET_TX_TIMEOUT);
446            info.setTransactionTimeout(seconds);
447    
448            try {
449                // Setup the new tx timeout
450                this.connection.asyncSendPacket(info);
451                return true;
452            } catch (JMSException e) {
453                throw toXAException(e);
454            }
455        }
456        
457        /////////////////////////////////////////////////////////////
458        //
459        // Helper methods.
460        //
461        /////////////////////////////////////////////////////////////
462        private String getResourceManagerId() throws JMSException {
463            return this.connection.getResourceManagerId();
464        }
465        
466        private void setXid(Xid xid) throws XAException {
467            if (xid != null) {
468                // associate
469                associatedXid = xid;
470                activeXid = new ActiveMQXid(xid);
471                
472                XATransactionInfo info = new XATransactionInfo();
473                info.setXid(activeXid);
474                info.setType(XATransactionInfo.START);
475                try {
476                    this.connection.asyncSendPacket(info);
477                    if( log.isDebugEnabled() )
478                        log.debug("Started XA transaction: "+activeXid);
479                } catch (JMSException e) {
480                    throw toXAException(e);
481                }
482                
483            } else {
484                
485                if( activeXid!=null ) {
486                    XATransactionInfo info = new XATransactionInfo();
487                    info.setXid(activeXid);
488                    info.setType(XATransactionInfo.END);
489                    try {
490                        this.connection.syncSendPacket(info);
491                        if( log.isDebugEnabled() )
492                            log.debug("Ended XA transaction: "+activeXid);
493                    } catch (JMSException e) {
494                        throw toXAException(e);
495                    }
496                }
497                
498                // dis-associate
499                associatedXid = null;
500                activeXid = null;
501            }
502        }
503    
504        /**
505         * Converts a JMSException from the server to an XAException.
506         * if the JMSException contained a linked XAException that is
507         * returned instead.
508         *
509         * @param e
510         * @return
511         */
512        private XAException toXAException(JMSException e) {
513            if (e.getCause() != null && e.getCause() instanceof XAException) {
514                XAException original = (XAException) e.getCause();
515                XAException xae = new XAException(original.getMessage());
516                xae.errorCode = original.errorCode;
517                xae.initCause(original);
518                return xae;
519            }
520    
521            XAException xae = new XAException(e.getMessage());
522            xae.errorCode = XAException.XAER_RMFAIL;
523            xae.initCause(e);
524            return xae;
525        }
526    
527        public ActiveMQConnection getConnection() {
528            return connection;
529        }
530    
531    }