001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.ra;
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    
023    import javax.jms.Connection;
024    import javax.jms.ConnectionConsumer;
025    import javax.jms.ConnectionMetaData;
026    import javax.jms.Destination;
027    import javax.jms.ExceptionListener;
028    import javax.jms.IllegalStateException;
029    import javax.jms.JMSException;
030    import javax.jms.Queue;
031    import javax.jms.QueueConnection;
032    import javax.jms.QueueSession;
033    import javax.jms.ServerSessionPool;
034    import javax.jms.Session;
035    import javax.jms.Topic;
036    import javax.jms.TopicConnection;
037    import javax.jms.TopicSession;
038    
039    import org.activemq.ActiveMQQueueSession;
040    import org.activemq.ActiveMQSession;
041    import org.activemq.ActiveMQTopicSession;
042    
043    
044    /**
045     * Acts as a pass through proxy for a JMS Connection object.
046     * It intercepts events that are of interest to the ActiveMQManagedConnection.
047     *
048     * @version $Revision: 1.1.1.1 $
049     */
050    public class JMSConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
051    
052        private ActiveMQManagedConnection managedConnection;
053        private ArrayList sessions = new ArrayList();
054        private ExceptionListener exceptionListener;
055    
056        public JMSConnectionProxy(ActiveMQManagedConnection managedConnection) {
057            this.managedConnection = managedConnection;
058        }
059    
060        /**
061         * Used to let the ActiveMQManagedConnection that this connection
062         * handel is not needed by the app.
063         *
064         * @throws JMSException
065         */
066        public void close() throws JMSException {
067            if( managedConnection!=null ) {
068                managedConnection.proxyClosedEvent(this);
069            }
070        }
071    
072        /**
073         * Called by the ActiveMQManagedConnection to invalidate this proxy.
074         */
075        public void cleanup() {
076            exceptionListener=null;
077            managedConnection = null;
078            for (Iterator iter = sessions.iterator(); iter.hasNext();) {
079                JMSSessionProxy p = (JMSSessionProxy) iter.next();
080                try {
081                    p.cleanup();
082                } catch (JMSException ignore) {
083                }
084                iter.remove();
085            }
086        }
087    
088        /**
089         * 
090         */
091        private Connection getConnection() throws JMSException {
092            if (managedConnection == null) {
093                throw new IllegalStateException("The Connection is closed");
094            }
095            return managedConnection.getPhysicalConnection();
096        }
097    
098        /**
099         * @param transacted
100         * @param acknowledgeMode
101         * @return
102         * @throws JMSException
103         */
104        public Session createSession(boolean transacted, int acknowledgeMode)
105                throws JMSException {
106            return createSessionProxy(transacted, acknowledgeMode);
107        }
108    
109        /**
110         * @param acknowledgeMode
111         * @param transacted
112         * @return
113         * @throws JMSException
114         */
115        private JMSSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {        
116            ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
117            RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext());
118            session.setTransactionContext(txContext);        
119            JMSSessionProxy p = new JMSSessionProxy(session);        
120            p.setUseSharedTxContext(managedConnection.isInManagedTx());
121            sessions.add(p);        
122            return p;
123        }
124    
125        public void setUseSharedTxContext(boolean enable) throws JMSException {
126            for (Iterator iter = sessions.iterator(); iter.hasNext();) {
127                JMSSessionProxy p = (JMSSessionProxy) iter.next();
128                p.setUseSharedTxContext(enable);
129            }
130        }
131    
132        /**
133         * @param transacted
134         * @param acknowledgeMode
135         * @return
136         * @throws JMSException
137         */
138        public QueueSession createQueueSession(boolean transacted,
139                                               int acknowledgeMode) throws JMSException {
140            return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
141        }
142    
143        /**
144         * @param transacted
145         * @param acknowledgeMode
146         * @return
147         * @throws JMSException
148         */
149        public TopicSession createTopicSession(boolean transacted,
150                                               int acknowledgeMode) throws JMSException {
151            return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
152        }
153    
154        /**
155         * @return
156         * @throws JMSException
157         */
158        public String getClientID() throws JMSException {
159            return getConnection().getClientID();
160        }
161    
162        /**
163         * @return
164         * @throws JMSException
165         */
166        public ExceptionListener getExceptionListener() throws JMSException {
167            return getConnection().getExceptionListener();
168        }
169    
170        /**
171         * @return
172         * @throws JMSException
173         */
174        public ConnectionMetaData getMetaData() throws JMSException {
175            return getConnection().getMetaData();
176        }
177    
178        /**
179         * @param clientID
180         * @throws JMSException
181         */
182        public void setClientID(String clientID) throws JMSException {
183            getConnection().setClientID(clientID);
184        }
185    
186        /**
187         * @param listener
188         * @throws JMSException
189         */
190        public void setExceptionListener(ExceptionListener listener)
191                throws JMSException {
192            getConnection();
193            exceptionListener = listener;
194        }
195    
196        /**
197         * @throws JMSException
198         */
199        public void start() throws JMSException {
200            getConnection().start();
201        }
202    
203        /**
204         * @throws JMSException
205         */
206        public void stop() throws JMSException {
207            getConnection().stop();
208        }
209    
210    
211        /**
212         * @param queue
213         * @param messageSelector
214         * @param sessionPool
215         * @param maxMessages
216         * @return
217         * @throws JMSException
218         */
219        public ConnectionConsumer createConnectionConsumer(Queue queue,
220                                                           String messageSelector, ServerSessionPool sessionPool,
221                                                           int maxMessages) throws JMSException {
222            throw new JMSException("Not Supported.");
223        }
224    
225        /**
226         * @param topic
227         * @param messageSelector
228         * @param sessionPool
229         * @param maxMessages
230         * @return
231         * @throws JMSException
232         */
233        public ConnectionConsumer createConnectionConsumer(Topic topic,
234                                                           String messageSelector, ServerSessionPool sessionPool,
235                                                           int maxMessages) throws JMSException {
236            throw new JMSException("Not Supported.");
237        }
238    
239        /**
240         * @param destination
241         * @param messageSelector
242         * @param sessionPool
243         * @param maxMessages
244         * @return
245         * @throws JMSException
246         */
247        public ConnectionConsumer createConnectionConsumer(Destination destination,
248                                                           String messageSelector, ServerSessionPool sessionPool,
249                                                           int maxMessages) throws JMSException {
250            throw new JMSException("Not Supported.");
251        }
252    
253        /**
254         * @param topic
255         * @param subscriptionName
256         * @param messageSelector
257         * @param sessionPool
258         * @param maxMessages
259         * @return
260         * @throws JMSException
261         */
262        public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
263                                                                  String subscriptionName, String messageSelector,
264                                                                  ServerSessionPool sessionPool, int maxMessages) throws JMSException {
265            throw new JMSException("Not Supported.");
266        }
267    
268        /**
269         * @return Returns the managedConnection.
270         */
271        public ActiveMQManagedConnection getManagedConnection() {
272            return managedConnection;
273        }
274    
275        public void onException(JMSException e) {
276            if(exceptionListener!=null && managedConnection!=null) {
277                try {
278                    exceptionListener.onException(e);
279                } catch (Throwable ignore) {
280                    // We can never trust user code so ignore any exceptions.
281                }
282            }
283        }
284    
285    }