/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.advisories;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ConsumerInfo;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A helper class for listening for MessageConsumer advisories.
 * <p>
 * While started, the advisor consumes {@link ObjectMessage}s from the topic
 * returned by <code>destination.getTopicForConsumerAdvisory()</code>, keeps a
 * per-destination record of the {@link ConsumerInfo} objects that reported
 * themselves as started, and forwards every advisory to the registered
 * {@link ConsumerAdvisoryEventListener}s.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class ConsumerAdvisor implements MessageListener {
    private static final Log log = LogFactory.getLog(ConsumerAdvisor.class);
    private final Connection connection;
    private final ActiveMQDestination destination;
    /** Session created by {@link #start()}; null while the advisor is not running. */
    private Session session;
    private final List listeners = new CopyOnWriteArrayList();
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** Maps a destination (as reported by ConsumerInfo.getDestination()) to a Set of ConsumerInfo. */
    private final Map activeSubscribers = new ConcurrentHashMap();

    /**
     * Construct a ConsumerAdvisor
     *
     * @param connection the connection used to create the advisory session in {@link #start()}
     * @param destination the destination to listen for Consumer events
     * @throws JMSException if the destination cannot be transformed into an ActiveMQDestination
     */
    public ConsumerAdvisor(Connection connection, Destination destination) throws JMSException {
        this.connection = connection;
        this.destination = ActiveMQDestination.transformDestination(destination);
    }

    /**
     * start listening for advisories
     * <p>
     * Does nothing if the advisor is already started. If the session or the
     * consumer cannot be set up, the advisor is returned to the stopped state
     * (and any half-created session is closed) so that a later call can retry.
     *
     * @throws JMSException if the session or advisory consumer cannot be created
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
                consumer.setMessageListener(this);
            }
            catch (JMSException e) {
                abortStart();
                throw e;
            }
            catch (RuntimeException e) {
                abortStart();
                throw e;
            }
        }
    }

    /**
     * stop listening for advisories
     * <p>
     * Does nothing if the advisor is not started.
     *
     * @throws JMSException if closing the session fails
     */
    public void stop() throws JMSException {
        if (started.commit(true, false)) {
            // Drop the reference first so a failed close() does not leave a dead session behind.
            Session current = session;
            session = null;
            if (current != null) {
                current.close();
            }
        }
    }

    /**
     * Add a listener
     *
     * @param l the listener to notify of every consumer advisory event
     */
    public void addListener(ConsumerAdvisoryEventListener l) {
        listeners.add(l);
    }

    /**
     * Remove a listener
     *
     * @param l the listener to remove
     */
    public void removeListener(ConsumerAdvisoryEventListener l) {
        listeners.remove(l);
    }

    /**
     * returns true if there is an active subscriber for the destination
     * <p>
     * NOTE(review): the lookup uses the argument as the map key as-is, so it
     * presumably must be equal (equals/hashCode) to the value returned by
     * ConsumerInfo.getDestination() - confirm against callers.
     *
     * @param destination the destination to look up
     * @return true if a subscriber for the destination
     */
    public boolean isActive(Destination destination) {
        return activeSubscribers.containsKey(destination);
    }

    /**
     * return a set of active ConsumerInfo's for a particular destination
     *
     * @param destination the destination to look up
     * @return the set of currently active ConsumerInfo objects; this is the
     *         advisor's own set when one is recorded, otherwise a new empty set
     */
    public Set activeConsumers(Destination destination) {
        Set set = (Set) activeSubscribers.get(destination);
        return set != null ? set : new CopyOnWriteArraySet();
    }

    /**
     * OnMessage() implementation
     * <p>
     * Messages that are not ObjectMessages, or whose payload is not a
     * ConsumerInfo, are ignored.
     *
     * @param msg the advisory message
     */
    public void onMessage(Message msg) {
        if (!(msg instanceof ObjectMessage)) {
            return;
        }
        try {
            Object payload = ((ObjectMessage) msg).getObject();
            if (!(payload instanceof ConsumerInfo)) {
                // Also covers a null payload; an unchecked cast here would throw out of the listener.
                log.warn("Ignoring advisory message with unexpected payload: " + msg);
                return;
            }
            ConsumerInfo info = (ConsumerInfo) payload;
            updateActiveConsumers(info);
            ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
            fireEvent(event);
        }
        catch (JMSException e) {
            // Pass the exception along so the cause and stack trace reach the log.
            log.error("Failed to process message: " + msg, e);
        }
    }

    /**
     * Undo a partially completed start(): close any session that was created
     * and reset the started flag so start() can be attempted again.
     */
    private void abortStart() {
        Session failed = session;
        session = null;
        if (failed != null) {
            try {
                failed.close();
            }
            catch (JMSException closeFailure) {
                // The original start() failure is the one reported to the caller.
                log.warn("Failed to close session after unsuccessful start", closeFailure);
            }
        }
        started.set(false);
    }

    /**
     * Deliver the event to every registered listener, in registration order.
     */
    private void fireEvent(ConsumerAdvisoryEvent event) {
        for (Iterator i = listeners.iterator(); i.hasNext();) {
            ConsumerAdvisoryEventListener l = (ConsumerAdvisoryEventListener) i.next();
            l.onEvent(event);
        }
    }

    /**
     * Record a started consumer, or forget one that is no longer started,
     * removing the destination entry once its set becomes empty.
     */
    private void updateActiveConsumers(ConsumerInfo info) {
        Set set = (Set) activeSubscribers.get(info.getDestination());
        if (info.isStarted()) {
            if (set == null) {
                set = new CopyOnWriteArraySet();
                activeSubscribers.put(info.getDestination(), set);
            }
            set.add(info);
        }
        else {
            if (set != null) {
                set.remove(info);
                if (set.isEmpty()) {
                    activeSubscribers.remove(info.getDestination());
                }
            }
        }
    }
}