001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.advisories;
020    
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import javax.jms.Connection;
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.jms.Message;
029    import javax.jms.MessageConsumer;
030    import javax.jms.MessageListener;
031    import javax.jms.ObjectMessage;
032    import javax.jms.Session;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    import org.activemq.message.ActiveMQDestination;
036    import org.activemq.message.ProducerInfo;
037    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
038    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
039    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
040    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
041    /**
042     * A helper class for listening for MessageProducer advisories
043     * 
044     * * @version $Revision: 1.1.1.1 $
045     */
046    
047    public class ProducerAdvisor implements MessageListener {
048        private static final Log log = LogFactory.getLog(ProducerAdvisor.class);
049        private Connection connection;
050        private ActiveMQDestination destination;
051        private Session session;
052        private List listeners = new CopyOnWriteArrayList();
053        private SynchronizedBoolean started = new SynchronizedBoolean(false);
054        private Map activeProducers = new ConcurrentHashMap();
055        
056        /**
057         * Construct a ProducerAdvisor
058         * @param connection
059         * @param destination the destination to listen for Producer events
060         * @throws JMSException
061         */
062        public ProducerAdvisor(Connection connection, Destination destination) throws JMSException{
063            this.connection = connection;
064            this.destination = ActiveMQDestination.transformDestination(destination);
065        }
066        
067        /**
068         * start listening for advisories
069         * @throws JMSException
070         *
071         */
072        public void start() throws JMSException {
073            if (started.commit(false, true)) {
074                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
075                MessageConsumer consumer = session.createConsumer(destination.getTopicForProducerAdvisory());
076                consumer.setMessageListener(this);
077            }
078        }
079        
080        /**
081         * stop listening for advisories
082         * @throws JMSException
083         */
084        public void stop() throws JMSException{
085            if (started.commit(true,false)){
086                if (session != null){
087                    session.close();
088                }
089            }
090        }
091        
092        /**
093         * Add a listener
094         * @param l
095         */
096        public void addListener(ProducerAdvisoryEventListener l){
097            listeners.add(l);
098        }
099        
100        /**
101         * Remove a listener
102         * @param l
103         */
104        public void removeListener(ProducerAdvisoryEventListener l){
105            listeners.remove(l);
106        }
107        
108        
109        /**
110         * returns true if there is an active producer for the destination
111         * 
112         * @param destination
113         * @return true if a producer for the destination
114         */
115        public boolean isActive(Destination destination) {
116            return activeProducers.containsKey(destination);
117        }
118    
119        /**
120         * return a set of active ProducerInfo's for a particular destination
121         * @param destination
122         * @return the set of ProducerInfo objects currently active
123         */
124        public Set activeProducers(Destination destination) {
125            Set set = (Set) activeProducers.get(destination);
126            return set != null ? set : new CopyOnWriteArraySet();
127        }
128    
129        
130        
131        
132        /**
133         * OnMessage() implementation
134         * @param msg
135         */
136        public void onMessage(Message msg){
137            if (msg instanceof ObjectMessage){
138                try {
139                    ProducerInfo info = (ProducerInfo)((ObjectMessage)msg).getObject();
140                    updateActiveProducers(info);
141                    ProducerAdvisoryEvent event = new ProducerAdvisoryEvent(info);
142                    fireEvent(event);
143                }
144                catch (JMSException e) {
145                    log.error("Failed to process message: " + msg);
146                }
147            }
148        }
149        
150        private void fireEvent(ProducerAdvisoryEvent event){
151            for (Iterator i = listeners.iterator(); i.hasNext(); ){
152                ProducerAdvisoryEventListener l = (ProducerAdvisoryEventListener)i.next();
153                l.onEvent(event);
154            }
155        }
156        
157        private void updateActiveProducers(ProducerInfo info) {
158            Set set = (Set) activeProducers.get(info.getDestination());
159            if (info.isStarted()) {
160                if (set == null) {
161                    set = new CopyOnWriteArraySet();
162                    activeProducers.put(info.getDestination(), set);
163                }
164                set.add(info);
165            }
166            else {
167                if (set != null) {
168                    set.remove(info);
169                    if (set.isEmpty()) {
170                        activeProducers.remove(set);
171                    }
172                }
173            }
174        }
175    
176    
177    }