/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch;

/**
 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
 * queue without removing them.
 * <P>
 * The <CODE>getEnumeration</CODE> method returns a
 * <CODE>java.util.Enumeration</CODE> that is used to scan the queue's
 * messages. It may be an enumeration of the entire content of a queue, or it
 * may contain only the messages matching a message selector.
 * <P>
 * Messages may be arriving and expiring while the scan is done. The JMS API
 * does not require the content of an enumeration to be a static snapshot of
 * queue content. Whether these changes are visible or not depends on the JMS
 * provider.
 * <P>
 * A <CODE>QueueBrowser</CODE> can be created from either a
 * <CODE>Session</CODE> or a <CODE>QueueSession</CODE>.
 * <P>
 * This class is its own enumeration: {@link #getEnumeration()} returns
 * <CODE>this</CODE>, so every enumeration obtained from one browser shares
 * the same underlying consumer.
 *
 * @see javax.jms.Session#createBrowser
 * @see javax.jms.QueueSession#createBrowser
 * @see javax.jms.QueueBrowser
 * @see javax.jms.QueueReceiver
 */
public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {

    private final ActiveMQSession session;
    private final ActiveMQDestination destination;
    private final String selector;

    /** Consumer backing the current browse; null once the browse is finished or the browser is closed. */
    private ActiveMQMessageConsumer consumer;
    private boolean closed;
    private final ConsumerId consumerId;
    /** Set to true when a dispatch without a message arrives; reset to false whenever a consumer is created. */
    private final AtomicBoolean browseDone = new AtomicBoolean(true);
    private final boolean dispatchAsync;
    /** Monitor used to park the browsing thread until a dispatch arrives. Final so the lock identity never changes. */
    private final Object semaphore = new Object();

    /**
     * Constructor for an ActiveMQQueueBrowser - used internally. Creates the
     * underlying consumer immediately.
     *
     * @param session the session this browser belongs to
     * @param consumerId the id passed to the underlying consumer
     * @param destination the destination to browse
     * @param selector the message selector, passed through to the consumer
     * @param dispatchAsync passed through to the underlying consumer
     * @throws JMSException if the underlying consumer cannot be created
     */
    protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
        this.session = session;
        this.consumerId = consumerId;
        this.destination = destination;
        this.selector = selector;
        this.dispatchAsync = dispatchAsync;
        this.consumer = createConsumer();
    }

    /**
     * Creates the consumer used for browsing and marks the browse as not yet
     * done. Prefetch and pending-message limits are taken from the
     * connection's prefetch policy.
     *
     * @return a new consumer whose dispatch hook wakes up waiting browsers
     * @throws JMSException if the consumer cannot be created
     */
    private ActiveMQMessageConsumer createConsumer() throws JMSException {
        browseDone.set(false);
        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
        // NOTE(review): the two boolean literals are presumably noLocal=false and
        // browser=true - confirm against the ActiveMQMessageConsumer constructor.
        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector,
            prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(),
            false, true, dispatchAsync, null) {
            @Override
            public void dispatch(MessageDispatch md) {
                if (md.getMessage() == null) {
                    // A dispatch carrying no message is treated as the end of the browse.
                    browseDone.set(true);
                } else {
                    super.dispatch(md);
                }
                // Wake any thread parked in waitForMessage(), in both cases.
                notifyMessageAvailable();
            }
        };
    }

    /**
     * Returns the current consumer, read under this object's lock so that a
     * concurrent {@link #close()} cannot null the field between the check and
     * the use.
     *
     * @return the current consumer, or null if there is none
     */
    private synchronized ActiveMQMessageConsumer currentConsumer() {
        return consumer;
    }

    /**
     * Detaches and closes the current consumer, if any. If the session is
     * transacted it is committed first. The consumer is closed even when the
     * commit fails, and failures are reported to the connection rather than
     * thrown, because this is also called from the {@code Enumeration}
     * methods, which cannot throw {@code JMSException}.
     */
    private void destroyConsumer() {
        final ActiveMQMessageConsumer detached;
        synchronized (this) {
            // Take ownership atomically so only one caller closes the consumer.
            detached = consumer;
            consumer = null;
        }
        if (detached == null) {
            return;
        }
        try {
            if (session.getTransacted()) {
                session.commit();
            }
        } catch (JMSException e) {
            session.connection.onClientInternalException(e);
        } finally {
            closeConsumer(detached);
        }
    }

    /**
     * Closes the given consumer, reporting any failure to the connection.
     */
    private void closeConsumer(ActiveMQMessageConsumer target) {
        try {
            target.close();
        } catch (JMSException e) {
            session.connection.onClientInternalException(e);
        }
    }

    /**
     * Gets an enumeration for browsing the current queue messages in the order
     * they would be received. The returned enumeration is this browser itself;
     * a new underlying consumer is created if the previous browse has finished.
     *
     * @return an enumeration for browsing the messages
     * @throws JMSException if the JMS provider fails to get the enumeration for
     *                 this browser due to some internal error, or if the
     *                 browser has been closed.
     */
    public Enumeration getEnumeration() throws JMSException {
        checkClosed();
        if (consumer == null) {
            consumer = createConsumer();
        }
        return this;
    }

    /**
     * @throws IllegalStateException if {@link #close()} has been called
     */
    private void checkClosed() throws IllegalStateException {
        if (closed) {
            throw new IllegalStateException("The Consumer is closed");
        }
    }

    /**
     * Blocks until a message is available, the browse is done, or the session
     * stops running. When the browse ends the underlying consumer is
     * destroyed.
     *
     * @return true if more messages to process
     */
    public boolean hasMoreElements() {
        while (true) {
            final ActiveMQMessageConsumer current = currentConsumer();
            if (current == null) {
                return false;
            }

            if (current.getMessageSize() > 0) {
                return true;
            }

            if (browseDone.get() || !session.isRunning()) {
                destroyConsumer();
                return false;
            }

            waitForMessage();
        }
    }

    /**
     * Blocks until a message can be returned, the browse is done, or the
     * session stops running.
     *
     * @return the next message, or null when there is no consumer, the browse
     *         has ended, or receiving failed (the failure is reported to the
     *         connection). Note that null is returned instead of throwing
     *         {@code NoSuchElementException}.
     */
    public Object nextElement() {
        while (true) {
            final ActiveMQMessageConsumer current = currentConsumer();
            if (current == null) {
                return null;
            }

            try {
                Message answer = current.receiveNoWait();
                if (answer != null) {
                    return answer;
                }
            } catch (JMSException e) {
                this.session.connection.onClientInternalException(e);
                return null;
            }

            if (browseDone.get() || !session.isRunning()) {
                destroyConsumer();
                return null;
            }

            waitForMessage();
        }
    }

    /**
     * Destroys the underlying consumer and marks this browser closed, after
     * which {@link #getEnumeration()} throws.
     *
     * @throws JMSException declared by the JMS interface; failures while
     *                 destroying the consumer are reported to the connection
     *                 instead of being thrown.
     */
    public synchronized void close() throws JMSException {
        destroyConsumer();
        closed = true;
    }

    /**
     * Gets the queue associated with this queue browser.
     *
     * @return the queue
     * @throws JMSException if the JMS provider fails to get the queue
     *                 associated with this browser due to some internal error.
     */
    public Queue getQueue() throws JMSException {
        return (Queue)destination;
    }

    /**
     * @return the message selector this browser was created with
     * @throws JMSException declared by the JMS interface; not thrown here
     */
    public String getMessageSelector() throws JMSException {
        return selector;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Wait on a semaphore for a fixed amount of time (two seconds) for a
     * message to come in. If the thread is interrupted the interrupt flag is
     * restored and the method returns.
     * <P>
     * NOTE(review): the callers loop and call this again, and
     * {@code Object.wait} throws immediately while the interrupt flag is set,
     * so an interrupted browsing thread polls without pausing until the browse
     * ends - confirm whether interruption should terminate the browse.
     */
    protected void waitForMessage() {
        try {
            synchronized (semaphore) {
                semaphore.wait(2000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Wakes every thread currently parked in {@link #waitForMessage()}.
     */
    protected void notifyMessageAvailable() {
        synchronized (semaphore) {
            semaphore.notifyAll();
        }
    }

    @Override
    public String toString() {
        return "ActiveMQQueueBrowser { value=" + consumerId + " }";
    }

}