/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.pool;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQSession;
import org.activemq.AlreadyClosedException;

/**
 * A proxy {@link Connection} (both a {@link TopicConnection} and a
 * {@link QueueConnection}) that delegates to a wrapped
 * {@link ActiveMQConnection}.
 * <p>
 * Calling {@link #close()} or {@link #stop()} only changes the state of this
 * proxy; neither call is forwarded to the wrapped connection. Sessions are
 * handed out from {@link SessionPool} instances that are cached per
 * {@link SessionKey} (transacted flag + acknowledge mode). The cache map is
 * shared by every proxy created through {@link #newInstance()}.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class PooledConnection implements TopicConnection, QueueConnection {

    /** The wrapped connection; set to <code>null</code> once this proxy is closed. */
    private ActiveMQConnection connection;

    /** Maps {@link SessionKey} to {@link SessionPool}; shared with proxies made by {@link #newInstance()}. */
    private Map cache;

    /** Set by {@link #stop()} and cleared again by {@link #start()}. */
    private boolean stopped;

    /**
     * Creates a proxy around the given connection with a new, empty session
     * pool cache.
     *
     * @param connection the connection to delegate to
     */
    public PooledConnection(ActiveMQConnection connection) {
        this(connection, new HashMap());
    }

    /**
     * Creates a proxy around the given connection using the supplied session
     * pool cache.
     *
     * @param connection the connection to delegate to
     * @param cache      map from {@link SessionKey} to {@link SessionPool};
     *                   it is used as-is (not copied)
     */
    public PooledConnection(ActiveMQConnection connection, Map cache) {
        this.connection = connection;
        this.cache = cache;
    }

    /**
     * Factory method to create a new instance.
     *
     * @return a new proxy that wraps the same connection and shares the same
     *         session pool cache as this one
     */
    public PooledConnection newInstance() {
        return new PooledConnection(connection, cache);
    }

    /**
     * Closes this proxy by dropping its reference to the wrapped connection.
     * The wrapped connection itself is not closed here. Subsequent calls that
     * go through {@link #getConnection()} throw {@link AlreadyClosedException}.
     */
    public void close() throws JMSException {
        connection = null;
    }

    /**
     * Starts the wrapped connection. If this proxy was previously
     * {@link #stop() stopped} (but not closed) it becomes usable again, so a
     * stop/start cycle works as the JMS {@link Connection} contract describes.
     *
     * @throws JMSException if this proxy has been closed, or if the wrapped
     *                      connection fails to start
     */
    public void start() throws JMSException {
        // TODO should we start connections first before pooling them?
        // Only a proxy that is still open may be revived; a closed proxy
        // falls through to getConnection(), which rejects it.
        if (connection != null) {
            stopped = false;
        }
        getConnection().start();
    }

    /**
     * Marks this proxy as stopped. The wrapped connection is not stopped.
     * Until {@link #start()} is called, calls that go through
     * {@link #getConnection()} throw {@link AlreadyClosedException}.
     */
    public void stop() throws JMSException {
        stopped = true;
    }

    /** Delegates to the wrapped connection. */
    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
    }

    /** Delegates to the wrapped connection. */
    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
    }

    /** Delegates to the wrapped connection. */
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
    }

    /** Delegates to the wrapped connection. */
    public String getClientID() throws JMSException {
        return getConnection().getClientID();
    }

    /** Delegates to the wrapped connection. */
    public ExceptionListener getExceptionListener() throws JMSException {
        return getConnection().getExceptionListener();
    }

    /** Delegates to the wrapped connection. */
    public ConnectionMetaData getMetaData() throws JMSException {
        return getConnection().getMetaData();
    }

    /** Delegates to the wrapped connection. */
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        getConnection().setExceptionListener(exceptionListener);
    }

    /** Delegates to the wrapped connection. */
    public void setClientID(String clientID) throws JMSException {
        getConnection().setClientID(clientID);
    }

    /** Delegates to the wrapped connection. */
    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
    }


    // Session factory methods
    //-------------------------------------------------------------------------

    /**
     * Same as {@link #createSession(boolean, int)}, with the result cast to
     * {@link QueueSession}.
     */
    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
        return (QueueSession) createSession(transacted, ackMode);
    }

    /**
     * Same as {@link #createSession(boolean, int)}, with the result cast to
     * {@link TopicSession}.
     */
    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
        return (TopicSession) createSession(transacted, ackMode);
    }

    /**
     * Borrows a session from the {@link SessionPool} cached for the given
     * transacted/acknowledge-mode combination, creating and caching that pool
     * on first use.
     *
     * @param transacted whether the session is transacted
     * @param ackMode    the acknowledge mode
     * @return the session handed out by the pool
     * @throws JMSException if a new pool is needed and this proxy is closed or
     *                      stopped, or if the pool fails to supply a session
     */
    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        SessionKey key = new SessionKey(transacted, ackMode);
        SessionPool pool;
        // The cache is shared with every proxy created by newInstance(), so
        // the lookup-then-insert must be atomic to avoid corrupting the map
        // or creating two pools for the same key.
        synchronized (cache) {
            pool = (SessionPool) cache.get(key);
            if (pool == null) {
                // NOTE(review): the closed/stopped check in getConnection()
                // only runs on this path; a pool that is already cached is
                // used without it -- confirm whether that is intended.
                pool = new SessionPool(getConnection(), key);
                cache.put(key, pool);
            }
        }
        // Borrow outside the lock so borrowing does not serialise on the cache.
        return pool.borrowSession();
    }


    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Returns the wrapped connection.
     *
     * @return the wrapped connection, never <code>null</code>
     * @throws JMSException an {@link AlreadyClosedException} if this proxy
     *                      has been closed or is currently stopped
     */
    protected ActiveMQConnection getConnection() throws JMSException {
        if (stopped || connection == null) {
            throw new AlreadyClosedException();
        }
        return connection;
    }

    /**
     * Creates a new session directly on the wrapped connection, bypassing the
     * session pool cache.
     *
     * @param key supplies the transacted flag and acknowledge mode
     * @return the newly created session, cast to {@link ActiveMQSession}
     * @throws JMSException if this proxy is closed or stopped, or if the
     *                      wrapped connection fails to create the session
     */
    protected ActiveMQSession createSession(SessionKey key) throws JMSException {
        return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
    }

}