001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network.jms; 018 019 import java.util.concurrent.atomic.AtomicBoolean; 020 021 import javax.jms.Connection; 022 import javax.jms.Destination; 023 import javax.jms.JMSException; 024 import javax.jms.Message; 025 import javax.jms.MessageConsumer; 026 import javax.jms.MessageListener; 027 import javax.jms.MessageProducer; 028 import javax.naming.NamingException; 029 030 import org.apache.activemq.Service; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 /** 035 * A Destination bridge is used to bridge between to different JMS systems 036 * 037 * @version $Revision: 1.1.1.1 $ 038 */ 039 public abstract class DestinationBridge implements Service, MessageListener { 040 private static final Log LOG = LogFactory.getLog(DestinationBridge.class); 041 protected MessageConsumer consumer; 042 protected AtomicBoolean started = new AtomicBoolean(false); 043 protected JmsMesageConvertor jmsMessageConvertor; 044 protected boolean doHandleReplyTo = true; 045 protected JmsConnector jmsConnector; 046 private int maximumRetries = 10; 047 048 /** 049 * @return Returns the consumer. 050 */ 051 public MessageConsumer getConsumer() { 052 return consumer; 053 } 054 055 /** 056 * @param consumer The consumer to set. 057 */ 058 public void setConsumer(MessageConsumer consumer) { 059 this.consumer = consumer; 060 } 061 062 /** 063 * @param connector 064 */ 065 public void setJmsConnector(JmsConnector connector) { 066 this.jmsConnector = connector; 067 } 068 069 /** 070 * @return Returns the inboundMessageConvertor. 071 */ 072 public JmsMesageConvertor getJmsMessageConvertor() { 073 return jmsMessageConvertor; 074 } 075 076 /** 077 * @param jmsMessageConvertor 078 */ 079 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 080 this.jmsMessageConvertor = jmsMessageConvertor; 081 } 082 083 public int getMaximumRetries() { 084 return maximumRetries; 085 } 086 087 /** 088 * Sets the maximum number of retries if a send fails before closing the 089 * bridge 090 */ 091 public void setMaximumRetries(int maximumRetries) { 092 this.maximumRetries = maximumRetries; 093 } 094 095 protected Destination processReplyToDestination(Destination destination) { 096 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); 097 } 098 099 public void start() throws Exception { 100 if (started.compareAndSet(false, true)) { 101 MessageConsumer consumer = createConsumer(); 102 consumer.setMessageListener(this); 103 createProducer(); 104 } 105 } 106 107 public void stop() throws Exception { 108 started.set(false); 109 } 110 111 public void onMessage(Message message) { 112 if (started.get() && message != null) { 113 int attempt = 0; 114 try { 115 if (attempt > 0) { 116 restartProducer(); 117 } 118 Message converted; 119 if (doHandleReplyTo) { 120 Destination replyTo = message.getJMSReplyTo(); 121 if (replyTo != null) { 122 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); 123 } else { 124 converted = jmsMessageConvertor.convert(message); 125 } 126 } else { 127 message.setJMSReplyTo(null); 128 converted = jmsMessageConvertor.convert(message); 129 } 130 sendMessage(converted); 131 message.acknowledge(); 132 } catch (Exception e) { 133 LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); 134 if (maximumRetries > 0 && attempt >= maximumRetries) { 135 try { 136 stop(); 137 } catch (Exception e1) { 138 LOG.warn("Failed to stop cleanly", e1); 139 } 140 } 141 } 142 } 143 } 144 145 /** 146 * @return Returns the doHandleReplyTo. 147 */ 148 protected boolean isDoHandleReplyTo() { 149 return doHandleReplyTo; 150 } 151 152 /** 153 * @param doHandleReplyTo The doHandleReplyTo to set. 154 */ 155 protected void setDoHandleReplyTo(boolean doHandleReplyTo) { 156 this.doHandleReplyTo = doHandleReplyTo; 157 } 158 159 protected abstract MessageConsumer createConsumer() throws JMSException; 160 161 protected abstract MessageProducer createProducer() throws JMSException; 162 163 protected abstract void sendMessage(Message message) throws JMSException; 164 165 protected abstract Connection getConnnectionForConsumer(); 166 167 protected abstract Connection getConnectionForProducer(); 168 169 protected void restartProducer() throws JMSException, NamingException { 170 try { 171 getConnectionForProducer().close(); 172 } catch (Exception e) { 173 LOG.debug("Ignoring failure to close producer connection: " + e, e); 174 } 175 jmsConnector.restartProducerConnection(); 176 createProducer(); 177 } 178 }