001    /**
002     *
003     * Copyright 2004 Hiram Chirino
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.ra;
019    
020    import EDU.oswego.cs.dl.util.concurrent.Latch;
021    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.activemq.ActiveMQConnectionConsumer;
025    import org.activemq.ActiveMQSession;
026    import org.activemq.message.ActiveMQMessage;
027    import org.activemq.message.ActiveMQQueue;
028    import org.activemq.message.ActiveMQTopic;
029    
030    import javax.jms.Connection;
031    import javax.jms.Destination;
032    import javax.jms.JMSException;
033    import javax.jms.Session;
034    import javax.jms.Topic;
035    import javax.jms.XASession;
036    import javax.resource.ResourceException;
037    import javax.resource.spi.endpoint.MessageEndpoint;
038    import javax.resource.spi.work.Work;
039    import javax.resource.spi.work.WorkEvent;
040    import javax.resource.spi.work.WorkException;
041    import javax.resource.spi.work.WorkListener;
042    import javax.resource.spi.work.WorkManager;
043    import javax.transaction.xa.XAResource;
044    
045    /**
046     * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:10 $
047     */
048    public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker implements Work {
049    
050        private static final Log log = LogFactory.getLog(ActiveMQPollingEndpointWorker.class);
051        private static final int MAX_WORKERS = 10;
052    
053        private SynchronizedBoolean started = new SynchronizedBoolean(false);
054        private SynchronizedBoolean stopping = new SynchronizedBoolean(false);
055        private Latch stopLatch = new Latch();
056    
057        private ActiveMQConnectionConsumer consumer;
058    
059        private CircularQueue workers;
060    
061        static WorkListener debugingWorkListener = new WorkListener() {
062            //The work listener is useful only for debugging...
063            public void workAccepted(WorkEvent event) {
064            }
065    
066            public void workRejected(WorkEvent event) {
067                log.warn("Work rejected: " + event, event.getException());
068            }
069    
070            public void workStarted(WorkEvent event) {
071            }
072    
073            public void workCompleted(WorkEvent event) {
074            }
075        };
076        private Connection connection;
077    
078        /**
079         * @param adapter
080         * @param key
081         * @throws ResourceException
082         */
083        public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
084            super(adapter, key);
085        }
086    
087        public void start() throws WorkException, ResourceException {
088            ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();        
089            boolean ok = false;
090            try {
091                
092                connection = adapter.makeConnection(activationSpec);
093                connection.start();
094                
095                workers = new CircularQueue(MAX_WORKERS, stopping);
096                for (int i = 0; i < workers.size(); i++) {
097                    
098                    
099                    int acknowledge = (transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
100                    Session session = connection.createSession(transacted, acknowledge);
101                    XAResource xaresource = null;
102                    if (session instanceof XASession) {
103                        if (!transacted) {
104                            throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
105                        }
106                        xaresource = ((XASession) session).getXAResource();
107                    }
108    
109                    MessageEndpoint endpoint = endpointFactory.createEndpoint(xaresource);
110                    workers.returnObject(new InboundEndpointWork((ActiveMQSession) session, endpoint, workers));
111                }
112    
113                Destination dest = null;
114                if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
115                    dest = new ActiveMQQueue(activationSpec.getDestination());
116                }
117                else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
118                    dest = new ActiveMQTopic(activationSpec.getDestination());
119                }
120                else {
121                    throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
122                }
123    
124                if (emptyToNull(activationSpec.getSubscriptionName()) != null) {
125                    consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer((Topic) dest, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), null, 0);
126                }
127                else {
128                    consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), null, 0);
129                }
130    
131                ok = true;
132                log.debug("Started");
133    
134                workManager.scheduleWork(this, WorkManager.INDEFINITE, null, debugingWorkListener);
135                ok = true;
136    
137            }
138            catch (JMSException e) {
139                throw new ResourceException("Could not start the endpoint.", e);
140            }
141            finally {
142    
143                // We don't want to leak sessions on errors.  Keep them around only if
144                // there were no errors.
145                if (!ok) {
146                    safeClose(consumer);
147                    safeClose(connection);
148                }
149            }
150    
151        }
152    
153        private String emptyToNull(String value) {
154            if ("".equals(value)) {
155                return null;
156            }
157            return value;
158        }
159    
160        /**
161         *
162         */
163        public void stop() throws InterruptedException {
164            stopping.set(true);
165            workers.notifyWaiting();
166            if (started.compareTo(true) == 0) {
167                stopLatch.acquire();
168            }
169            safeClose(consumer);
170            safeClose(connection);
171        }
172    
173        /**
174         * @see javax.resource.spi.work.Work#release()
175         */
176        public void release() {
177        }
178    
179        /**
180         * The WorkManager has started up and we now need to pull message off
181         * the destination and push them to an endpoint.
182         *
183         * @see java.lang.Runnable#run()
184         */
185        public void run() {
186            started.set(true);
187            try {
188    
189                while (!stopping.get()) {
190                    ActiveMQMessage message = consumer.receive(500);
191                    if (message != null) {
192                        InboundEndpointWork worker = (InboundEndpointWork) workers.get();
193                        // Did we get stopped?
194                        if (worker == null) {
195                            break;
196                        }
197                        worker.setMessage(message);
198                        workManager.scheduleWork(worker, WorkManager.INDEFINITE, null, debugingWorkListener);
199                    }
200                }   
201                
202                // Try to collect the workers so that none are running.
203                workers.drain();
204    
205            }
206            catch (Throwable e) {
207                log.info("dispatcher: ", e);
208            }
209            finally {
210                stopLatch.release();
211            }
212        }
213    }