001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.advisories;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Set;
023    import javax.jms.Connection;
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    import javax.jms.Message;
027    import javax.jms.MessageConsumer;
028    import javax.jms.MessageListener;
029    import javax.jms.ObjectMessage;
030    import javax.jms.Session;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    import org.activemq.ActiveMQConnection;
034    import org.activemq.ActiveMQSession;
035    import org.activemq.message.ActiveMQDestination;
036    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
037    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
038    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
039    
040    /**
041     * A helper class for listening for TempDestination advisories
042     * 
043     * @version $Revision: 1.1.1.1 $
044     */
045    public class TempDestinationAdvisor implements MessageListener {
046        private static final Log log = LogFactory.getLog(TempDestinationAdvisor.class);
047        private Connection connection;
048        private ActiveMQDestination destination;
049        private Session session;
050        private List listeners = new CopyOnWriteArrayList();
051        private Set activeDestinations = new CopyOnWriteArraySet();
052        private SynchronizedBoolean started = new SynchronizedBoolean(false);
053        private long startedAt;
054    
055        /**
056         * Construct a TempDestinationAdvisor
057         * 
058         * @param connection
059         * @param destination the destination to listen for TempDestination events
060         * @throws JMSException
061         */
062        public TempDestinationAdvisor(Connection connection, Destination destination) throws JMSException {
063            this.connection = connection;
064            this.destination = ActiveMQDestination.transformDestination(destination);
065        }
066    
067        /**
068         * start listening for advisories
069         * 
070         * @throws JMSException
071         */
072        public void start() throws JMSException {
073            if (started.commit(false, true)) {
074                if (connection instanceof ActiveMQConnection) {
075                    session = ((ActiveMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE, true);
076                    ((ActiveMQSession) session).setInternalSession(true);
077                }
078                else {
079                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
080                }
081                MessageConsumer consumer = session.createConsumer(destination.getTopicForTempAdvisory());
082                consumer.setMessageListener(this);
083                startedAt = System.currentTimeMillis();
084            }
085        }
086    
087        /**
088         * stop listening for advisories
089         * 
090         * @throws JMSException
091         */
092        public void stop() throws JMSException {
093            if (started.commit(true, false)) {
094                if (session != null) {
095                    session.close();
096                }
097            }
098        }
099    
100        /**
101         * returns true if the temporary destination is active
102         * 
103         * @param destination
104         * @return true if a subscriber for the destination
105         */
106        public boolean isActive(Destination destination) {
107            boolean rtnval = false;
108            synchronized(this)
109            {
110                rtnval = activeDestinations.contains(destination);
111                if (rtnval == false && startedAt > 0)
112                {
113                    // wait a while to see if the advisory event arrives (no longer than 5 seconds)
114                    long waittime = 5000 - (System.currentTimeMillis() - startedAt);
115                    startedAt = 0;
116                    try {
117                        wait(waittime);
118                    } catch (Exception e) {}
119                    rtnval = activeDestinations.contains(destination);
120                }
121            }
122            return rtnval;
123        }
124    
125        /**
126         * Add a listener
127         * 
128         * @param l
129         */
130        public void addListener(TempDestinationAdvisoryEventListener l) {
131            listeners.add(l);
132        }
133    
134        /**
135         * Remove a listener
136         * 
137         * @param l
138         */
139        public void removeListener(TempDestinationAdvisoryEventListener l) {
140            listeners.remove(l);
141        }
142    
143        /**
144         * OnMessage() implementation
145         * 
146         * @param msg
147         */
148        public void onMessage(Message msg) {
149            if (msg instanceof ObjectMessage) {
150                try {
151                    TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) msg).getObject();
152                    if (event.isStarted()) {
153                        activeDestinations.add(event.getDestination());
154                        synchronized (this) {
155                            notifyAll();
156                        }
157                                    }
158                    else {
159                        activeDestinations.remove(event.getDestination());
160                    }
161                    fireEvent(event);
162                }
163                catch (JMSException e) {
164                    log.error("Failed to process message: " + msg);
165                }
166            }
167        }
168    
169        private void fireEvent(TempDestinationAdvisoryEvent event) {
170            for (Iterator i = listeners.iterator();i.hasNext();) {
171                TempDestinationAdvisoryEventListener l = (TempDestinationAdvisoryEventListener) i.next();
172                l.onEvent(event);
173            }
174        }
175    }