001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import EDU.oswego.cs.dl.util.concurrent.Latch; 021 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.activemq.ActiveMQConnectionConsumer; 025 import org.activemq.ActiveMQSession; 026 import org.activemq.message.ActiveMQMessage; 027 import org.activemq.message.ActiveMQQueue; 028 import org.activemq.message.ActiveMQTopic; 029 030 import javax.jms.Connection; 031 import javax.jms.Destination; 032 import javax.jms.JMSException; 033 import javax.jms.Session; 034 import javax.jms.Topic; 035 import javax.jms.XASession; 036 import javax.resource.ResourceException; 037 import javax.resource.spi.endpoint.MessageEndpoint; 038 import javax.resource.spi.work.Work; 039 import javax.resource.spi.work.WorkEvent; 040 import javax.resource.spi.work.WorkException; 041 import javax.resource.spi.work.WorkListener; 042 import javax.resource.spi.work.WorkManager; 043 import javax.transaction.xa.XAResource; 044 045 /** 046 * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:10 $ 047 */ 048 public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker implements Work { 049 050 private static final Log log = LogFactory.getLog(ActiveMQPollingEndpointWorker.class); 051 private static final int MAX_WORKERS = 10; 052 053 private SynchronizedBoolean started = new SynchronizedBoolean(false); 054 private SynchronizedBoolean stopping = new SynchronizedBoolean(false); 055 private Latch stopLatch = new Latch(); 056 057 private ActiveMQConnectionConsumer consumer; 058 059 private CircularQueue workers; 060 061 static WorkListener debugingWorkListener = new WorkListener() { 062 //The work listener is useful only for debugging... 063 public void workAccepted(WorkEvent event) { 064 } 065 066 public void workRejected(WorkEvent event) { 067 log.warn("Work rejected: " + event, event.getException()); 068 } 069 070 public void workStarted(WorkEvent event) { 071 } 072 073 public void workCompleted(WorkEvent event) { 074 } 075 }; 076 private Connection connection; 077 078 /** 079 * @param adapter 080 * @param key 081 * @throws ResourceException 082 */ 083 public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { 084 super(adapter, key); 085 } 086 087 public void start() throws WorkException, ResourceException { 088 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); 089 boolean ok = false; 090 try { 091 092 connection = adapter.makeConnection(activationSpec); 093 connection.start(); 094 095 workers = new CircularQueue(MAX_WORKERS, stopping); 096 for (int i = 0; i < workers.size(); i++) { 097 098 099 int acknowledge = (transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); 100 Session session = connection.createSession(transacted, acknowledge); 101 XAResource xaresource = null; 102 if (session instanceof XASession) { 103 if (!transacted) { 104 throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint."); 105 } 106 xaresource = ((XASession) session).getXAResource(); 107 } 108 109 MessageEndpoint endpoint = endpointFactory.createEndpoint(xaresource); 110 workers.returnObject(new InboundEndpointWork((ActiveMQSession) session, endpoint, workers)); 111 } 112 113 Destination dest = null; 114 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) { 115 dest = new ActiveMQQueue(activationSpec.getDestination()); 116 } 117 else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) { 118 dest = new ActiveMQTopic(activationSpec.getDestination()); 119 } 120 else { 121 throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType()); 122 } 123 124 if (emptyToNull(activationSpec.getSubscriptionName()) != null) { 125 consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer((Topic) dest, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), null, 0); 126 } 127 else { 128 consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), null, 0); 129 } 130 131 ok = true; 132 log.debug("Started"); 133 134 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, debugingWorkListener); 135 ok = true; 136 137 } 138 catch (JMSException e) { 139 throw new ResourceException("Could not start the endpoint.", e); 140 } 141 finally { 142 143 // We don't want to leak sessions on errors. Keep them around only if 144 // there were no errors. 145 if (!ok) { 146 safeClose(consumer); 147 safeClose(connection); 148 } 149 } 150 151 } 152 153 private String emptyToNull(String value) { 154 if ("".equals(value)) { 155 return null; 156 } 157 return value; 158 } 159 160 /** 161 * 162 */ 163 public void stop() throws InterruptedException { 164 stopping.set(true); 165 workers.notifyWaiting(); 166 if (started.compareTo(true) == 0) { 167 stopLatch.acquire(); 168 } 169 safeClose(consumer); 170 safeClose(connection); 171 } 172 173 /** 174 * @see javax.resource.spi.work.Work#release() 175 */ 176 public void release() { 177 } 178 179 /** 180 * The WorkManager has started up and we now need to pull message off 181 * the destination and push them to an endpoint. 182 * 183 * @see java.lang.Runnable#run() 184 */ 185 public void run() { 186 started.set(true); 187 try { 188 189 while (!stopping.get()) { 190 ActiveMQMessage message = consumer.receive(500); 191 if (message != null) { 192 InboundEndpointWork worker = (InboundEndpointWork) workers.get(); 193 // Did we get stopped? 194 if (worker == null) { 195 break; 196 } 197 worker.setMessage(message); 198 workManager.scheduleWork(worker, WorkManager.INDEFINITE, null, debugingWorkListener); 199 } 200 } 201 202 // Try to collect the workers so that none are running. 203 workers.drain(); 204 205 } 206 catch (Throwable e) { 207 log.info("dispatcher: ", e); 208 } 209 finally { 210 stopLatch.release(); 211 } 212 } 213 }