001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import javax.jms.ConnectionConsumer;
022    import javax.jms.IllegalStateException;
023    import javax.jms.JMSException;
024    import javax.jms.ServerSession;
025    import javax.jms.ServerSessionPool;
026    import javax.jms.Session;
027    
028    import org.activemq.io.util.MemoryBoundedQueue;
029    import org.activemq.message.ActiveMQMessage;
030    import org.activemq.message.ConsumerInfo;
031    
032    /**
033     * For application servers, <CODE>Connection</CODE> objects provide a special
034     * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
035     * messages it is to consume are specified by a <CODE>Destination</CODE> and a
036     * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
037     * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
038     * <p/>
039     * <P>
040     * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
041     * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
042     * and starts it. As traffic picks up, messages can back up. If this happens, a
043     * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
044     * with more than one message. This reduces the thread context switches and
045     * minimizes resource use at the expense of some serialization of message
046     * processing.
047     * 
048     * @see javax.jms.Connection#createConnectionConsumer
049     * @see javax.jms.Connection#createDurableConnectionConsumer
050     * @see javax.jms.QueueConnection#createConnectionConsumer
051     * @see javax.jms.TopicConnection#createConnectionConsumer
052     * @see javax.jms.TopicConnection#createDurableConnectionConsumer
053     */
054    
055    public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {
056    
057        private ActiveMQConnection connection;
058    
059        private ServerSessionPool sessionPool;
060    
061        private ConsumerInfo consumerInfo;
062    
063        private boolean closed;
064    
065        protected MemoryBoundedQueue messageQueue;
066    
067        /**
068         * Create a ConnectionConsumer
069         * 
070         * @param theConnection
071         * @param theSessionPool
072         * @param theConsumerInfo
073         * @param theMaximumMessages
074         * @throws JMSException
075         */
076        protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool,
077                ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
078            this.connection = theConnection;
079            this.sessionPool = theSessionPool;
080            this.consumerInfo = theConsumerInfo;
081            this.connection.addConnectionConsumer(this);
082            this.consumerInfo.setStarted(true);
083            this.consumerInfo.setPrefetchNumber(theMaximumMessages);
084            this.connection.syncSendPacket(this.consumerInfo);
085    
086            String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
087                    + theConsumerInfo.getConsumerNo();
088            this.messageQueue = connection.getMemoryBoundedQueue(queueName);
089        }
090    
091        /**
092         * Tests to see if the Message Dispatcher is a target for this message
093         * 
094         * @param message
095         *            the message to test
096         * @return true if the Message Dispatcher can dispatch the message
097         */
098        public boolean isTarget(ActiveMQMessage message) {
099            return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
100        }
101    
102        /**
103         * Dispatch an ActiveMQMessage
104         * 
105         * @param message
106         */
107        public void dispatch(ActiveMQMessage message) {
108            if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
109                message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
110                message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
111                        && !this.consumerInfo.getDestination().isQueue());
112                try {
113                    if (sessionPool != null)
114                        dispatchToSession(message);
115                    else
116                        dispatchToQueue(message);
117                } catch (JMSException jmsEx) {
118                    this.connection.handleAsyncException(jmsEx);
119                }
120            }
121        }
122    
123        /**
124         * @param message
125         * @throws JMSException
126         */
127        private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
128            messageQueue.enqueue(message);
129        }
130    
131        /**
132         * Receives the next message that arrives within the specified timeout
133         * interval.
134         * 
135         * @throws JMSException
136         */
137        public ActiveMQMessage receive(long timeout) throws JMSException {
138            try {
139                ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
140                return message;
141            } catch (InterruptedException ioe) {
142                return null;
143            }
144        }
145    
146        /**
147         * @param message
148         * @throws JMSException
149         */
150        private void dispatchToSession(ActiveMQMessage message) throws JMSException {
151    
152            ServerSession serverSession = sessionPool.getServerSession();
153            Session nestedSession = serverSession.getSession();
154            ActiveMQSession session = null;
155            if (nestedSession instanceof ActiveMQSession) {
156                session = (ActiveMQSession) nestedSession;
157            } else if (nestedSession instanceof ActiveMQTopicSession) {
158                ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
159                session = (ActiveMQSession) topicSession.getNext();
160            } else if (nestedSession instanceof ActiveMQQueueSession) {
161                ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
162                session = (ActiveMQSession) queueSession.getNext();
163            } else {
164                throw new JMSException("Invalid instance of session obtained from server session." +
165                "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
166                "Found instance of " + nestedSession.getClass().getName());
167            }
168            session.dispatch(message);
169            serverSession.start();
170        }
171    
172        /**
173         * Gets the server session pool associated with this connection consumer.
174         * 
175         * @return the server session pool used by this connection consumer
176         * @throws JMSException
177         *             if the JMS provider fails to get the server session pool
178         *             associated with this consumer due to some internal error.
179         */
180    
181        public ServerSessionPool getServerSessionPool() throws JMSException {
182            if (closed) {
183                throw new IllegalStateException("The Connection Consumer is closed");
184            }
185            return this.sessionPool;
186        }
187    
188        /**
189         * Closes the connection consumer. <p/>
190         * <P>
191         * Since a provider may allocate some resources on behalf of a connection
192         * consumer outside the Java virtual machine, clients should close these
193         * resources when they are not needed. Relying on garbage collection to
194         * eventually reclaim these resources may not be timely enough.
195         * 
196         * @throws JMSException
197         */
198    
199        public void close() throws JMSException {
200            if (!closed) {
201                closed = true;
202                this.consumerInfo.setStarted(false);
203                this.connection.asyncSendPacket(this.consumerInfo);
204                this.connection.removeConnectionConsumer(this);
205            }
206    
207        }
208    }