/**
 *
 * Copyright 2004 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.ra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;

import org.activemq.ActiveMQQueueSession;
import org.activemq.ActiveMQSession;
import org.activemq.ActiveMQTopicSession;
import org.activemq.message.ActiveMQMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link ServerSessionPool} that hands out {@link ServerSessionImpl}
 * instances, each wrapping a session created on the endpoint worker's
 * connection together with a message endpoint obtained from the worker's
 * endpoint factory.
 * <p>
 * Sessions are tracked in two collections: <code>idleSessions</code> (returned
 * to the pool and ready for reuse) and <code>activeSessions</code> (handed out
 * and not yet returned). All access to that state happens while holding the
 * monitor of this pool instance.
 *
 * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:11 $
 */
public class ServerSessionPoolImpl implements ServerSessionPool {

    private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class);

    private final ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker;
    private final int maxSessions;

    /** Sessions that were returned to the pool; used as a stack (last in, first out). */
    private final ArrayList idleSessions = new ArrayList();
    /** Sessions currently handed out; rotated front-to-back when they are reused. */
    private final LinkedList activeSessions = new LinkedList();
    /** Set once {@link #close()} has been called; no session is handed out afterwards. */
    private boolean closing = false;

    /**
     * @param activeMQAsfEndpointWorker the worker supplying the connection,
     *        work manager, endpoint factory and activation spec
     * @param maxSessions the number of active sessions above which already
     *        active sessions are reused instead of creating new ones
     */
    public ServerSessionPoolImpl(ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
        this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
        this.maxSessions = maxSessions;
    }

    /**
     * Creates a new session on the worker's connection and wraps it, together
     * with a freshly created endpoint, in a {@link ServerSessionImpl}.
     *
     * @return the new server session, or <code>null</code> when the endpoint
     *         factory refused to create an endpoint (the JMS session created
     *         for it is closed again in that case)
     * @throws JMSException if the session cannot be created or closed
     */
    private ServerSessionImpl createServerSessionImpl() throws JMSException {
        ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
        int acknowledge = (activeMQAsfEndpointWorker.transacted)
                ? Session.SESSION_TRANSACTED
                : activationSpec.getAcknowledgeModeForSession();
        final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(
                activeMQAsfEndpointWorker.transacted, acknowledge);
        MessageEndpoint endpoint;
        try {
            // A batch size of 0 is passed on when batching is not enabled.
            int batchSize = 0;
            if (activationSpec.getEnableBatchBooleanValue()) {
                batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
            }
            if (activationSpec.isUseRAManagedTransactionEnabled()) {
                // The RA will manage the transaction commit.
                endpoint = createEndpoint(null);
                return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
            } else {
                // Give the container an object to manage the transaction with.
                endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
                return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
            }
        } catch (UnavailableException e) {
            // The container could be limiting us on the number of endpoints
            // that are being created.
            log.debug("Could not create an endpoint.", e);
            session.close();
            return null;
        }
    }

    /**
     * Asks the worker's endpoint factory for an endpoint and wraps it in a
     * {@link MessageEndpointProxy}.
     *
     * @param txResourceProxy the transaction resource handed to the endpoint
     *        factory; <code>null</code> is passed when the RA manages the
     *        transaction itself
     * @return the proxied endpoint
     * @throws UnavailableException if the factory cannot create an endpoint
     */
    private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
        MessageEndpoint endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
        return new MessageEndpointProxy(endpoint);
    }

    /**
     * Hands out a server session, in this order of preference: an idle
     * session, a newly created session (while fewer than
     * <code>maxSessions</code> are active), or an already active session that
     * is reused.
     *
     * @return a server session, never <code>null</code>
     * @throws JMSException if the pool is closing, if a session cannot be
     *         created, or if no endpoint could be created and there is no
     *         active session to fall back on
     */
    public synchronized ServerSession getServerSession() throws JMSException {
        log.debug("ServerSession requested.");
        if (closing) {
            throw new JMSException("Session Pool Shutting Down.");
        }

        if (idleSessions.size() > 0) {
            ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
            activeSessions.addLast(ss);
            log.debug("Using idle session: " + ss);
            return ss;
        }

        // Are we at the upper limit? Then reuse the already created sessions.
        // This is going to queue up messages into a session for processing.
        // The emptiness check keeps a non-positive maxSessions from sending us
        // into getExistingServerSession() with nothing to reuse.
        if (!activeSessions.isEmpty() && activeSessions.size() >= maxSessions) {
            return getExistingServerSession();
        }

        ServerSessionImpl ss = createServerSessionImpl();
        // We may not be able to create a session due to the container
        // restricting us.
        if (ss == null) {
            // The idle list is known to be empty here, so the only possible
            // fallback is an active session; fail when there is none.
            if (activeSessions.isEmpty()) {
                throw new JMSException("Endpoint factory did not allows to any endpoints.");
            }
            return getExistingServerSession();
        }
        activeSessions.addLast(ss);
        log.debug("Created a new session: " + ss);
        return ss;
    }

    /**
     * Obtains a server session from this pool, dispatches the message to the
     * underlying {@link ActiveMQSession} and starts the server session.
     *
     * @param message the message to dispatch
     * @throws JMSException if no server session is available or its session
     *         is not one of the supported ActiveMQ session types
     */
    private void dispatchToSession(ActiveMQMessage message) throws JMSException {

        ServerSession serverSession = getServerSession();
        Session nestedSession = serverSession.getSession();
        ActiveMQSession session = null;
        if (nestedSession instanceof ActiveMQSession) {
            session = (ActiveMQSession) nestedSession;
        } else if (nestedSession instanceof ActiveMQTopicSession) {
            ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
            session = (ActiveMQSession) topicSession.getNext();
        } else if (nestedSession instanceof ActiveMQQueueSession) {
            ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
            session = (ActiveMQSession) queueSession.getNext();
        } else {
            throw new JMSException("Invalid instance of session obtained from server session." +
                    "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
                    "Found instance of " + nestedSession.getClass().getName());
        }
        session.dispatch(message);
        serverSession.start();
    }


    /**
     * Rotates the active list: takes the session at the front, moves it to
     * the back and returns it. Callers must make sure at least one session is
     * active.
     *
     * @return the active session that was at the front of the list
     */
    private ServerSession getExistingServerSession() {
        ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
        activeSessions.addLast(ss);
        log.debug("Reusing an active session: " + ss);
        return ss;
    }

    /**
     * Moves a session from the active list to the idle list and wakes up a
     * thread waiting on this pool (see {@link #close()}).
     *
     * @param ss the session being returned
     */
    public synchronized void returnToPool(ServerSessionImpl ss) {
        log.debug("Session returned to pool: " + ss);
        activeSessions.remove(ss);
        idleSessions.add(ss);
        notify();
    }

    /**
     * Takes a session out of the pool for good: its unconsumed messages are
     * re-dispatched through other sessions of this pool, then the session is
     * closed. Failures during re-dispatch are logged and do not prevent the
     * close.
     *
     * @param ss the session to remove
     */
    public synchronized void removeFromPool(ServerSessionImpl ss) {
        activeSessions.remove(ss);
        try {
            ActiveMQSession session = (ActiveMQSession) ss.getSession();
            List l = session.getUnconsumedMessages();
            for (Iterator i = l.iterator(); i.hasNext();) {
                dispatchToSession((ActiveMQMessage) i.next());
            }
        } catch (Throwable t) {
            log.error("Error redispatching unconsumed messages from stale session", t);
        }
        ss.close();
        notify();
    }

    /**
     * Marks the pool as closing, closes the idle sessions and then waits
     * until every active session has been returned or removed, closing
     * sessions as they become idle. If the waiting thread is interrupted, the
     * interrupt flag is restored and the method returns without waiting for
     * the remaining active sessions.
     */
    public void close() {
        synchronized (this) {
            closing = true;
            closeIdleSessions();
            while (activeSessions.size() > 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                closeIdleSessions();
            }
        }
    }

    /**
     * Closes every idle session and drops it from the idle list, so that
     * repeated invocations from {@link #close()} do not close the same
     * session again.
     */
    private void closeIdleSessions() {
        for (Iterator iter = idleSessions.iterator(); iter.hasNext();) {
            ServerSessionImpl ss = (ServerSessionImpl) iter.next();
            ss.close();
            iter.remove();
        }
    }

}