001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.impl;
020    
021    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
022    import org.activemq.broker.BrokerClient;
023    import org.activemq.service.Dispatcher;
024    import org.activemq.service.MessageContainerManager;
025    import org.activemq.service.Subscription;
026    
027    /**
028     * A dispatcher of messages to some JMS connection.
029     * <p/>
030     * Typically this uses either IO or NIO to shovel the messages down
031     * a socket as fast as possible - in either a push or pull way.
032     *
033     * @version $Revision: 1.1.1.1 $
034     */
035    public class DispatcherImpl implements Dispatcher {
036    
037        private SynchronizedBoolean started = new SynchronizedBoolean(false);
038        private DispatchWorker worker = new DispatchWorker(); //this should be pooled
039        private Thread runner;
040    
041    
042        /**
043         * Register the MessageContainerManager for the Dispatcher
044         *
045         * @param mcm
046         */
047        public void register(MessageContainerManager mcm) {
048            worker.register(mcm);
049        }
050    
051        /**
052         * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is
053         * waiting for messages to dispatch
054         *
055         * @param sub the Subscription that now has messages to dispatch
056         */
057        public void wakeup(Subscription sub) {
058            worker.wakeup();
059        }
060    
061        /**
062         * Called to indicate that there is work to do this will wake up a Dispatch Worker if it is
063         * waiting for messages to dispatch
064         */
065        public void wakeup() {
066            worker.wakeup();
067        }
068    
069        /**
070         * Add an active subscription
071         *
072         * @param client
073         * @param sub
074         */
075        public void addActiveSubscription(BrokerClient client, Subscription sub) {
076            worker.addActiveSubscription(client, sub);
077        }
078    
079        /**
080         * remove an active subscription
081         *
082         * @param client
083         * @param sub
084         */
085        public void removeActiveSubscription(BrokerClient client, Subscription sub) {
086            worker.removeActiveSubscription(client, sub);
087        }
088    
089        /**
090         * start the DispatchWorker
091         *
092         * @see org.activemq.service.Service#start()
093         */
094        public void start() {
095            if (started.commit(false, true)) {
096                worker.start();
097                runner = new Thread(worker, "Dispatch Worker");
098                runner.setDaemon(true);
099                runner.setPriority(Thread.NORM_PRIORITY + 1);
100                runner.start();
101            }
102        }
103    
104        /**
105         * stop the DispatchWorker
106         *
107         * @see org.activemq.service.Service#stop()
108         */
109        public void stop() {
110            worker.stop();
111            started.set(false);
112        }
113    }