001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.List;
021    
022    import javax.jms.JMSException;
023    
024    import org.apache.activemq.command.ConsumerId;
025    import org.apache.activemq.command.MessageDispatch;
026    import org.apache.activemq.thread.Task;
027    import org.apache.activemq.thread.TaskRunner;
028    import org.apache.activemq.util.JMSExceptionSupport;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A utility class used by the Session for dispatching messages asynchronously
034     * to consumers
035     * 
036     * @version $Revision$
037     * @see javax.jms.Session
038     */
039    public class ActiveMQSessionExecutor implements Task {
040        private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
041    
042        private ActiveMQSession session;
043        private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
044        private boolean dispatchedBySessionPool;
045        private volatile TaskRunner taskRunner;
046        private boolean startedOrWarnedThatNotStarted;
047    
048        ActiveMQSessionExecutor(ActiveMQSession session) {
049            this.session = session;
050        }
051    
052        void setDispatchedBySessionPool(boolean value) {
053            dispatchedBySessionPool = value;
054            wakeup();
055        }
056    
057        void execute(MessageDispatch message) throws InterruptedException {
058    
059            if (!startedOrWarnedThatNotStarted) {
060    
061                ActiveMQConnection connection = session.connection;
062                long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
063                if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
064                    startedOrWarnedThatNotStarted = true;
065                } else {
066                    long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
067    
068                    // lets only warn when a significant amount of time has passed
069                    // just in case its normal operation
070                    if (elapsedTime > aboutUnstartedConnectionTimeout) {
071                        LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
072                                 + " Received: " + message);
073                        startedOrWarnedThatNotStarted = true;
074                    }
075                }
076            }
077    
078            if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
079                dispatch(message);
080            } else {
081                messageQueue.enqueue(message);
082                wakeup();
083            }
084        }
085    
086        public void wakeup() {
087            if (!dispatchedBySessionPool) {
088                if (session.isSessionAsyncDispatch()) {
089                    try {
090                        TaskRunner taskRunner = this.taskRunner;
091                        if (taskRunner == null) {
092                            synchronized (this) {
093                                if (this.taskRunner == null) {
094                                    if (!isRunning()) {
095                                        // stop has been called
096                                        return;
097                                    }
098                                    this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
099                                            "ActiveMQ Session: " + session.getSessionId());
100                                }
101                                taskRunner = this.taskRunner;
102                            }
103                        }
104                        taskRunner.wakeup();
105                    } catch (InterruptedException e) {
106                        Thread.currentThread().interrupt();
107                    }
108                } else {
109                    while (iterate()) {
110                    }
111                }
112            }
113        }
114    
115        void executeFirst(MessageDispatch message) {
116            messageQueue.enqueueFirst(message);
117            wakeup();
118        }
119    
120        public boolean hasUncomsumedMessages() {
121            return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
122        }
123    
124        void dispatch(MessageDispatch message) {
125    
126            // TODO - we should use a Map for this indexed by consumerId
127    
128            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
129                ConsumerId consumerId = message.getConsumerId();
130                if (consumerId.equals(consumer.getConsumerId())) {
131                    consumer.dispatch(message);
132                    break;
133                }
134            }
135        }
136    
137        synchronized void start() {
138            if (!messageQueue.isRunning()) {
139                messageQueue.start();
140                if (hasUncomsumedMessages()) {
141                    wakeup();
142                }
143            }
144        }
145    
146        void stop() throws JMSException {
147            try {
148                if (messageQueue.isRunning()) {
149                    synchronized(this) {
150                        messageQueue.stop();
151                        if (this.taskRunner != null) {
152                            this.taskRunner.shutdown();
153                            this.taskRunner = null;
154                        }
155                    }
156                }
157            } catch (InterruptedException e) {
158                Thread.currentThread().interrupt();
159                throw JMSExceptionSupport.create(e);
160            }
161        }
162    
163        boolean isRunning() {
164            return messageQueue.isRunning();
165        }
166    
167        void close() {
168            messageQueue.close();
169        }
170    
171        void clear() {
172            messageQueue.clear();
173        }
174    
175        MessageDispatch dequeueNoWait() {
176            return messageQueue.dequeueNoWait();
177        }
178    
179        protected void clearMessagesInProgress() {
180            messageQueue.clear();
181        }
182    
183        public boolean isEmpty() {
184            return messageQueue.isEmpty();
185        }
186    
187        public boolean iterate() {
188    
189            // Deliver any messages queued on the consumer to their listeners.
190            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
191                if (consumer.iterate()) {
192                    return true;
193                }
194            }
195    
196            // No messages left queued on the listeners.. so now dispatch messages
197            // queued on the session
198            MessageDispatch message = messageQueue.dequeueNoWait();
199            if (message == null) {
200                return false;
201            } else {
202                dispatch(message);
203                return !messageQueue.isEmpty();
204            }
205        }
206    
207        List getUnconsumedMessages() {
208            return messageQueue.removeAll();
209        }
210    
211    }