001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.advisory; 018 019 import java.util.Set; 020 import java.util.concurrent.CopyOnWriteArraySet; 021 import java.util.concurrent.atomic.AtomicBoolean; 022 023 import javax.jms.Connection; 024 import javax.jms.JMSException; 025 import javax.jms.Message; 026 import javax.jms.MessageConsumer; 027 import javax.jms.MessageListener; 028 import javax.jms.Session; 029 030 import org.apache.activemq.Service; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ActiveMQMessage; 033 import org.apache.activemq.command.ActiveMQQueue; 034 import org.apache.activemq.command.ActiveMQTempQueue; 035 import org.apache.activemq.command.ActiveMQTempTopic; 036 import org.apache.activemq.command.ActiveMQTopic; 037 import org.apache.activemq.command.DestinationInfo; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 /** 042 * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them 043 * being created or deleted. 044 * 045 * @version $Revision: 681153 $ 046 */ 047 public class DestinationSource implements MessageListener { 048 private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class); 049 private AtomicBoolean started = new AtomicBoolean(false); 050 private final Connection connection; 051 private Session session; 052 private MessageConsumer queueConsumer; 053 private MessageConsumer topicConsumer; 054 private MessageConsumer tempTopicConsumer; 055 private MessageConsumer tempQueueConsumer; 056 private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>(); 057 private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>(); 058 private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>(); 059 private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>(); 060 private DestinationListener listener; 061 062 public DestinationSource(Connection connection) throws JMSException { 063 this.connection = connection; 064 } 065 066 public DestinationListener getListener() { 067 return listener; 068 } 069 070 public void setDestinationListener(DestinationListener listener) { 071 this.listener = listener; 072 } 073 074 /** 075 * Returns the current queues available on the broker 076 */ 077 public Set<ActiveMQQueue> getQueues() { 078 return queues; 079 } 080 081 /** 082 * Returns the current topics on the broker 083 */ 084 public Set<ActiveMQTopic> getTopics() { 085 return topics; 086 } 087 088 /** 089 * Returns the current temporary topics available on the broker 090 */ 091 public Set<ActiveMQTempQueue> getTemporaryQueues() { 092 return temporaryQueues; 093 } 094 095 /** 096 * Returns the current temporary queues available on the broker 097 */ 098 public Set<ActiveMQTempTopic> getTemporaryTopics() { 099 return temporaryTopics; 100 } 101 102 public void start() throws JMSException { 103 if (started.compareAndSet(false, true)) { 104 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 105 queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC); 106 queueConsumer.setMessageListener(this); 107 108 topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC); 109 topicConsumer.setMessageListener(this); 110 111 tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC); 112 tempQueueConsumer.setMessageListener(this); 113 114 tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC); 115 tempTopicConsumer.setMessageListener(this); 116 } 117 } 118 119 public void stop() throws JMSException { 120 if (started.compareAndSet(true, false)) { 121 if (session != null) { 122 session.close(); 123 } 124 } 125 } 126 127 public void onMessage(Message message) { 128 if (message instanceof ActiveMQMessage) { 129 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 130 Object command = activeMessage.getDataStructure(); 131 if (command instanceof DestinationInfo) { 132 DestinationInfo destinationInfo = (DestinationInfo) command; 133 DestinationEvent event = new DestinationEvent(this, destinationInfo); 134 fireDestinationEvent(event); 135 } 136 else { 137 LOG.warn("Unknown dataStructure: " + command); 138 } 139 } 140 else { 141 LOG.warn("Unknown message type: " + message + ". Message ignored"); 142 } 143 } 144 145 protected void fireDestinationEvent(DestinationEvent event) { 146 // now lets update the data structures 147 ActiveMQDestination destination = event.getDestination(); 148 boolean add = event.isAddOperation(); 149 if (destination instanceof ActiveMQQueue) { 150 ActiveMQQueue queue = (ActiveMQQueue) destination; 151 if (add) { 152 queues.add(queue); 153 } 154 else { 155 queues.remove(queue); 156 } 157 } 158 else if (destination instanceof ActiveMQTopic) { 159 ActiveMQTopic topic = (ActiveMQTopic) destination; 160 if (add) { 161 topics.add(topic); 162 } 163 else { 164 topics.remove(topic); 165 } 166 } 167 else if (destination instanceof ActiveMQTempQueue) { 168 ActiveMQTempQueue queue = (ActiveMQTempQueue) destination; 169 if (add) { 170 temporaryQueues.add(queue); 171 } 172 else { 173 temporaryQueues.remove(queue); 174 } 175 } 176 else if (destination instanceof ActiveMQTempTopic) { 177 ActiveMQTempTopic topic = (ActiveMQTempTopic) destination; 178 if (add) { 179 temporaryTopics.add(topic); 180 } 181 else { 182 temporaryTopics.remove(topic); 183 } 184 } 185 else { 186 LOG.warn("Unknown destination type: " + destination); 187 } 188 if (listener != null) { 189 listener.onDestinationEvent(event); 190 } 191 } 192 }