001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.pool;
019    
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import javax.jms.JMSException;
027    import javax.jms.Session;
028    import javax.transaction.RollbackException;
029    import javax.transaction.Status;
030    import javax.transaction.SystemException;
031    import javax.transaction.TransactionManager;
032    import javax.transaction.xa.XAResource;
033    
034    import org.apache.activemq.ActiveMQConnection;
035    import org.apache.activemq.transport.TransportListener;
036    import org.apache.commons.pool.ObjectPoolFactory;
037    
038    /**
039     * Holds a real JMS connection along with the session pools associated with it.
040     * 
041     * @version $Revision: 668559 $
042     */
043    public class ConnectionPool {
044    
045        private ActiveMQConnection connection;
046        private Map<SessionKey, SessionPool> cache;
047        private AtomicBoolean started = new AtomicBoolean(false);
048        private int referenceCount;
049        private ObjectPoolFactory poolFactory;
050        private long lastUsed = System.currentTimeMillis();
051        private boolean hasFailed;
052        private boolean hasExpired;
053        private int idleTimeout = 30 * 1000;
054    
055        public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
056            this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
057            // Add a transport Listener so that we can notice if this connection
058            // should be expired due to
059            // a connection failure.
060            connection.addTransportListener(new TransportListener() {
061                public void onCommand(Object command) {
062                }
063    
064                public void onException(IOException error) {
065                    synchronized (ConnectionPool.this) {
066                        hasFailed = true;
067                    }
068                }
069    
070                public void transportInterupted() {
071                }
072    
073                public void transportResumed() {
074                }
075            });       
076            //
077            // make sure that we set the hasFailed flag, in case the transport already failed
078            // prior to the addition of our new TransportListener
079            //
080            if(connection.isTransportFailed()) {
081                hasFailed = true;
082            }
083        }
084    
085        public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
086            this.connection = connection;
087            this.cache = cache;
088            this.poolFactory = poolFactory;
089        }
090    
091        public void start() throws JMSException {
092            if (started.compareAndSet(false, true)) {
093                connection.start();
094            }
095        }
096    
097        public synchronized ActiveMQConnection getConnection() {
098            return connection;
099        }
100    
101        public Session createSession(boolean transacted, int ackMode) throws JMSException {
102            SessionKey key = new SessionKey(transacted, ackMode);
103            SessionPool pool = cache.get(key);
104            if (pool == null) {
105                pool = createSessionPool(key);
106                cache.put(key, pool);
107            }
108            PooledSession session = pool.borrowSession();
109            return session;
110        }
111    
112        public synchronized void close() {
113            if (connection != null) {
114                try {
115                    Iterator<SessionPool> i = cache.values().iterator();
116                    while (i.hasNext()) {
117                        SessionPool pool = i.next();
118                        i.remove();
119                        try {
120                            pool.close();
121                        } catch (Exception e) {
122                        }
123                    }
124                } finally {
125                    try {
126                        connection.close();
127                    } catch (Exception e) {
128                    } finally {
129                        connection = null;
130                    }
131                }
132            }
133        }
134    
135        public synchronized void incrementReferenceCount() {
136            referenceCount++;
137            lastUsed = System.currentTimeMillis();
138        }
139    
140        public synchronized void decrementReferenceCount() {
141            referenceCount--;
142            lastUsed = System.currentTimeMillis();
143            if (referenceCount == 0) {
144                expiredCheck();
145            }
146        }
147    
148        /**
149         * @return true if this connection has expired.
150         */
151        public synchronized boolean expiredCheck() {
152            if (connection == null) {
153                return true;
154            }
155            if (hasExpired) {
156                if (referenceCount == 0) {
157                    close();
158                }
159                return true;
160            }
161            if (hasFailed || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)) {
162                hasExpired = true;
163                if (referenceCount == 0) {
164                    close();
165                }
166                return true;
167            }
168            return false;
169        }
170    
171        public int getIdleTimeout() {
172            return idleTimeout;
173        }
174    
175        public void setIdleTimeout(int idleTimeout) {
176            this.idleTimeout = idleTimeout;
177        }
178    
179        protected SessionPool createSessionPool(SessionKey key) {
180            return new SessionPool(this, key, poolFactory.createPool());
181        }
182    
183    }