001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.pool; 018 019 import javax.jms.Connection; 020 import javax.jms.ConnectionConsumer; 021 import javax.jms.ConnectionMetaData; 022 import javax.jms.Destination; 023 import javax.jms.ExceptionListener; 024 import javax.jms.JMSException; 025 import javax.jms.Queue; 026 import javax.jms.QueueConnection; 027 import javax.jms.QueueSession; 028 import javax.jms.ServerSessionPool; 029 import javax.jms.Session; 030 import javax.jms.Topic; 031 import javax.jms.TopicConnection; 032 import javax.jms.TopicSession; 033 034 import org.apache.activemq.ActiveMQConnection; 035 import org.apache.activemq.ActiveMQSession; 036 import org.apache.activemq.AlreadyClosedException; 037 import org.apache.activemq.EnhancedConnection; 038 import org.apache.activemq.advisory.DestinationSource; 039 040 /** 041 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and 042 * {@link QueueConnection} which is pooled and on {@link #close()} will return 043 * itself to the sessionPool. 044 * 045 * <b>NOTE</b> this implementation is only intended for use when sending 046 * messages. It does not deal with pooling of consumers; for that look at a 047 * library like <a href="http://jencks.org/">Jencks</a> such as in <a 048 * href="http://jencks.org/Message+Driven+POJOs">this example</a> 049 * 050 * @version $Revision: 1.1.1.1 $ 051 */ 052 public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection { 053 054 private ConnectionPool pool; 055 private boolean stopped; 056 057 public PooledConnection(ConnectionPool pool) { 058 this.pool = pool; 059 this.pool.incrementReferenceCount(); 060 } 061 062 /** 063 * Factory method to create a new instance. 064 */ 065 public PooledConnection newInstance() { 066 return new PooledConnection(pool); 067 } 068 069 public void close() throws JMSException { 070 if (this.pool != null) { 071 this.pool.decrementReferenceCount(); 072 this.pool = null; 073 } 074 } 075 076 public void start() throws JMSException { 077 assertNotClosed(); 078 pool.start(); 079 } 080 081 public void stop() throws JMSException { 082 stopped = true; 083 } 084 085 public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, 086 ServerSessionPool serverSessionPool, int maxMessages) 087 throws JMSException { 088 return getConnection() 089 .createConnectionConsumer(destination, selector, serverSessionPool, maxMessages); 090 } 091 092 public ConnectionConsumer createConnectionConsumer(Topic topic, String s, 093 ServerSessionPool serverSessionPool, int maxMessages) 094 throws JMSException { 095 return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages); 096 } 097 098 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, 099 ServerSessionPool serverSessionPool, int i) 100 throws JMSException { 101 return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i); 102 } 103 104 public String getClientID() throws JMSException { 105 return getConnection().getClientID(); 106 } 107 108 public ExceptionListener getExceptionListener() throws JMSException { 109 return getConnection().getExceptionListener(); 110 } 111 112 public ConnectionMetaData getMetaData() throws JMSException { 113 return getConnection().getMetaData(); 114 } 115 116 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { 117 getConnection().setExceptionListener(exceptionListener); 118 } 119 120 public void setClientID(String clientID) throws JMSException { 121 getConnection().setClientID(clientID); 122 } 123 124 public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, 125 ServerSessionPool serverSessionPool, int maxMessages) 126 throws JMSException { 127 return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages); 128 } 129 130 // Session factory methods 131 // ------------------------------------------------------------------------- 132 public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException { 133 return (QueueSession)createSession(transacted, ackMode); 134 } 135 136 public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException { 137 return (TopicSession)createSession(transacted, ackMode); 138 } 139 140 public Session createSession(boolean transacted, int ackMode) throws JMSException { 141 return pool.createSession(transacted, ackMode); 142 } 143 144 // EnhancedCollection API 145 // ------------------------------------------------------------------------- 146 147 public DestinationSource getDestinationSource() throws JMSException { 148 return getConnection().getDestinationSource(); 149 } 150 151 // Implementation methods 152 // ------------------------------------------------------------------------- 153 154 ActiveMQConnection getConnection() throws JMSException { 155 assertNotClosed(); 156 return pool.getConnection(); 157 } 158 159 protected void assertNotClosed() throws AlreadyClosedException { 160 if (stopped || pool == null) { 161 throw new AlreadyClosedException(); 162 } 163 } 164 165 protected ActiveMQSession createSession(SessionKey key) throws JMSException { 166 return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode()); 167 } 168 169 public String toString() { 170 return "PooledConnection { " + pool + " }"; 171 } 172 173 }