001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.pool; 018 019 import java.util.HashMap; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.Map; 023 import java.util.concurrent.atomic.AtomicBoolean; 024 import javax.jms.Connection; 025 import javax.jms.ConnectionFactory; 026 import javax.jms.JMSException; 027 import org.apache.activemq.ActiveMQConnection; 028 import org.apache.activemq.ActiveMQConnectionFactory; 029 import org.apache.activemq.Service; 030 import org.apache.activemq.util.IOExceptionSupport; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 import org.apache.commons.pool.ObjectPoolFactory; 034 import org.apache.commons.pool.impl.GenericObjectPoolFactory; 035 036 /** 037 * A JMS provider which pools Connection, Session and MessageProducer instances 038 * so it can be used with tools like Spring's <a 039 * href="http://activemq.org/Spring+Support">JmsTemplate</a>. 040 * 041 * <b>NOTE</b> this implementation is only intended for use when sending 042 * messages. It does not deal with pooling of consumers; for that look at a 043 * library like <a href="http://jencks.org/">Jencks</a> such as in <a 044 * href="http://jencks.org/Message+Driven+POJOs">this example</a> 045 * 046 * @org.apache.xbean.XBean element="pooledConnectionFactory" 047 * 048 * @version $Revision: 1.1 $ 049 */ 050 public class PooledConnectionFactory implements ConnectionFactory, Service { 051 private static final transient Log LOG = LogFactory.getLog(PooledConnectionFactory.class); 052 private ConnectionFactory connectionFactory; 053 private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>(); 054 private ObjectPoolFactory poolFactory; 055 private int maximumActive = 500; 056 private int maxConnections = 1; 057 private int idleTimeout = 30 * 1000; 058 private AtomicBoolean stopped = new AtomicBoolean(false); 059 060 public PooledConnectionFactory() { 061 this(new ActiveMQConnectionFactory()); 062 } 063 064 public PooledConnectionFactory(String brokerURL) { 065 this(new ActiveMQConnectionFactory(brokerURL)); 066 } 067 068 public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { 069 this.connectionFactory = connectionFactory; 070 } 071 072 public ConnectionFactory getConnectionFactory() { 073 return connectionFactory; 074 } 075 076 public void setConnectionFactory(ConnectionFactory connectionFactory) { 077 this.connectionFactory = connectionFactory; 078 } 079 080 public Connection createConnection() throws JMSException { 081 return createConnection(null, null); 082 } 083 084 public synchronized Connection createConnection(String userName, String password) throws JMSException { 085 if (stopped.get()) { 086 LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); 087 return null; 088 } 089 090 ConnectionKey key = new ConnectionKey(userName, password); 091 LinkedList<ConnectionPool> pools = cache.get(key); 092 093 if (pools == null) { 094 pools = new LinkedList<ConnectionPool>(); 095 cache.put(key, pools); 096 } 097 098 ConnectionPool connection = null; 099 if (pools.size() == maxConnections) { 100 connection = pools.removeFirst(); 101 } 102 103 // Now.. we might get a connection, but it might be that we need to 104 // dump it.. 105 if (connection != null && connection.expiredCheck()) { 106 connection = null; 107 } 108 109 if (connection == null) { 110 ActiveMQConnection delegate = createConnection(key); 111 connection = createConnectionPool(delegate); 112 } 113 pools.add(connection); 114 return new PooledConnection(connection); 115 } 116 117 protected ConnectionPool createConnectionPool(ActiveMQConnection connection) { 118 ConnectionPool result = new ConnectionPool(connection, getPoolFactory()); 119 result.setIdleTimeout(getIdleTimeout()); 120 return result; 121 } 122 123 protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException { 124 if (key.getUserName() == null && key.getPassword() == null) { 125 return (ActiveMQConnection)connectionFactory.createConnection(); 126 } else { 127 return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword()); 128 } 129 } 130 131 /** 132 * @see org.apache.activemq.service.Service#start() 133 */ 134 public void start() { 135 try { 136 stopped.set(false); 137 createConnection(); 138 } catch (JMSException e) { 139 LOG.warn("Create pooled connection during start failed.", e); 140 IOExceptionSupport.create(e); 141 } 142 } 143 144 public void stop() { 145 LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size()); 146 stopped.set(true); 147 for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) { 148 LinkedList list = iter.next(); 149 for (Iterator i = list.iterator(); i.hasNext();) { 150 ConnectionPool connection = (ConnectionPool) i.next(); 151 try { 152 connection.close(); 153 }catch(Exception e) { 154 LOG.warn("Close connection failed",e); 155 } 156 } 157 } 158 cache.clear(); 159 } 160 161 public ObjectPoolFactory getPoolFactory() { 162 if (poolFactory == null) { 163 poolFactory = createPoolFactory(); 164 } 165 return poolFactory; 166 } 167 168 /** 169 * Sets the object pool factory used to create individual session pools for 170 * each connection 171 */ 172 public void setPoolFactory(ObjectPoolFactory poolFactory) { 173 this.poolFactory = poolFactory; 174 } 175 176 public int getMaximumActive() { 177 return maximumActive; 178 } 179 180 /** 181 * Sets the maximum number of active sessions per connection 182 */ 183 public void setMaximumActive(int maximumActive) { 184 this.maximumActive = maximumActive; 185 } 186 187 /** 188 * @return the maxConnections 189 */ 190 public int getMaxConnections() { 191 return maxConnections; 192 } 193 194 /** 195 * @param maxConnections the maxConnections to set 196 */ 197 public void setMaxConnections(int maxConnections) { 198 this.maxConnections = maxConnections; 199 } 200 201 protected ObjectPoolFactory createPoolFactory() { 202 return new GenericObjectPoolFactory(null, maximumActive); 203 } 204 205 public int getIdleTimeout() { 206 return idleTimeout; 207 } 208 209 public void setIdleTimeout(int idleTimeout) { 210 this.idleTimeout = idleTimeout; 211 } 212 }