001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.Enumeration;
020    import java.util.concurrent.atomic.AtomicBoolean;
021    
022    import javax.jms.IllegalStateException;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.Queue;
026    import javax.jms.QueueBrowser;
027    
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ConsumerId;
030    import org.apache.activemq.command.MessageDispatch;
031    
032    /**
033     * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
034     * queue without removing them. <p/>
035     * <P>
036     * The <CODE>getEnumeration</CODE> method returns a <CODE>
037     * java.util.Enumeration</CODE>
038     * that is used to scan the queue's messages. It may be an enumeration of the
039     * entire content of a queue, or it may contain only the messages matching a
040     * message selector. <p/>
041     * <P>
042     * Messages may be arriving and expiring while the scan is done. The JMS API
043     * does not require the content of an enumeration to be a static snapshot of
044     * queue content. Whether these changes are visible or not depends on the JMS
045     * provider. <p/>
046     * <P>
047     * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048     * </CODE>
049     * or a <CODE>QueueSession</CODE>.
050     * 
051     * @see javax.jms.Session#createBrowser
052     * @see javax.jms.QueueSession#createBrowser
053     * @see javax.jms.QueueBrowser
054     * @see javax.jms.QueueReceiver
055     */
056    
057    public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
058    
059        private final ActiveMQSession session;
060        private final ActiveMQDestination destination;
061        private final String selector;
062    
063        private ActiveMQMessageConsumer consumer;
064        private boolean closed;
065        private final ConsumerId consumerId;
066        private final AtomicBoolean browseDone = new AtomicBoolean(true);
067        private final boolean dispatchAsync;
068        private Object semaphore = new Object();
069    
070        /**
071         * Constructor for an ActiveMQQueueBrowser - used internally
072         * 
073         * @param theSession
074         * @param dest
075         * @param selector
076         * @throws JMSException
077         */
078        protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
079            this.session = session;
080            this.consumerId = consumerId;
081            this.destination = destination;
082            this.selector = selector;
083            this.dispatchAsync = dispatchAsync;
084            this.consumer = createConsumer();
085        }
086    
087        /**
088         * @param session
089         * @param originalDestination
090         * @param selectorExpression
091         * @param cnum
092         * @return
093         * @throws JMSException
094         */
095        private ActiveMQMessageConsumer createConsumer() throws JMSException {
096            browseDone.set(false);
097            ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
098            return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
099                .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
100                public void dispatch(MessageDispatch md) {
101                    if (md.getMessage() == null) {
102                        browseDone.set(true);
103                    } else {
104                        super.dispatch(md);
105                    }
106                    notifyMessageAvailable();
107                }
108            };
109        }
110    
111        private void destroyConsumer() {
112            if (consumer == null) {
113                return;
114            }
115            try {
116                if (session.getTransacted()) {
117                    session.commit();
118                }
119                consumer.close();
120                consumer = null;
121            } catch (JMSException e) {
122                e.printStackTrace();
123            }
124        }
125    
126        /**
127         * Gets an enumeration for browsing the current queue messages in the order
128         * they would be received.
129         * 
130         * @return an enumeration for browsing the messages
131         * @throws JMSException if the JMS provider fails to get the enumeration for
132         *                 this browser due to some internal error.
133         */
134    
135        public Enumeration getEnumeration() throws JMSException {
136            checkClosed();
137            if (consumer == null) {
138                consumer = createConsumer();
139            }
140            return this;
141        }
142    
143        private void checkClosed() throws IllegalStateException {
144            if (closed) {
145                throw new IllegalStateException("The Consumer is closed");
146            }
147        }
148    
149        /**
150         * @return true if more messages to process
151         */
152        public boolean hasMoreElements() {
153            while (true) {
154    
155                synchronized (this) {
156                    if (consumer == null) {
157                        return false;
158                    }
159                }
160    
161                if (consumer.getMessageSize() > 0) {
162                    return true;
163                }
164    
165                if (browseDone.get() || !session.isRunning()) {
166                    destroyConsumer();
167                    return false;
168                }
169    
170                waitForMessage();
171            }
172        }
173    
174        /**
175         * @return the next message
176         */
177        public Object nextElement() {
178            while (true) {
179    
180                synchronized (this) {
181                    if (consumer == null) {
182                        return null;
183                    }
184                }
185    
186                try {
187                    Message answer = consumer.receiveNoWait();
188                    if (answer != null) {
189                        return answer;
190                    }
191                } catch (JMSException e) {
192                    this.session.connection.onClientInternalException(e);
193                    return null;
194                }
195    
196                if (browseDone.get() || !session.isRunning()) {
197                    destroyConsumer();
198                    return null;
199                }
200    
201                waitForMessage();
202            }
203        }
204    
205        public synchronized void close() throws JMSException {
206            destroyConsumer();
207            closed = true;
208        }
209    
210        /**
211         * Gets the queue associated with this queue browser.
212         * 
213         * @return the queue
214         * @throws JMSException if the JMS provider fails to get the queue
215         *                 associated with this browser due to some internal error.
216         */
217    
218        public Queue getQueue() throws JMSException {
219            return (Queue)destination;
220        }
221    
222        public String getMessageSelector() throws JMSException {
223            return selector;
224        }
225    
226        // Implementation methods
227        // -------------------------------------------------------------------------
228    
229        /**
230         * Wait on a semaphore for a fixed amount of time for a message to come in.
231         */
232        protected void waitForMessage() {
233            try {
234                synchronized (semaphore) {
235                    semaphore.wait(2000);
236                }
237            } catch (InterruptedException e) {
238                Thread.currentThread().interrupt();
239            }
240        }
241    
242        protected void notifyMessageAvailable() {
243            synchronized (semaphore) {
244                semaphore.notifyAll();
245            }
246        }
247    
248        public String toString() {
249            return "ActiveMQQueueBrowser { value=" + consumerId + " }";
250        }
251    
252    }