001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Queue;
023    import javax.jms.QueueConnection;
024    import javax.jms.QueueConnectionFactory;
025    import javax.jms.QueueSession;
026    import javax.jms.Session;
027    import javax.naming.NamingException;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A Bridge to other JMS Queue providers
034     * 
035     * @org.apache.xbean.XBean
036     * 
037     * @version $Revision: 1.1.1.1 $
038     */
039    public class JmsQueueConnector extends JmsConnector {
040        private static final Log LOG = LogFactory.getLog(JmsQueueConnector.class);
041        private String outboundQueueConnectionFactoryName;
042        private String localConnectionFactoryName;
043        private QueueConnectionFactory outboundQueueConnectionFactory;
044        private QueueConnectionFactory localQueueConnectionFactory;
045        private QueueConnection outboundQueueConnection;
046        private QueueConnection localQueueConnection;
047        private InboundQueueBridge[] inboundQueueBridges;
048        private OutboundQueueBridge[] outboundQueueBridges;
049    
050        public boolean init() {
051            boolean result = super.init();
052            if (result) {
053                try {
054                    initializeForeignQueueConnection();
055                    initializeLocalQueueConnection();
056                    initializeInboundJmsMessageConvertor();
057                    initializeOutboundJmsMessageConvertor();
058                    initializeInboundQueueBridges();
059                    initializeOutboundQueueBridges();
060                } catch (Exception e) {
061                    LOG.error("Failed to initialize the JMSConnector", e);
062                }
063            }
064            return result;
065        }
066    
067        /**
068         * @return Returns the inboundQueueBridges.
069         */
070        public InboundQueueBridge[] getInboundQueueBridges() {
071            return inboundQueueBridges;
072        }
073    
074        /**
075         * @param inboundQueueBridges The inboundQueueBridges to set.
076         */
077        public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
078            this.inboundQueueBridges = inboundQueueBridges;
079        }
080    
081        /**
082         * @return Returns the outboundQueueBridges.
083         */
084        public OutboundQueueBridge[] getOutboundQueueBridges() {
085            return outboundQueueBridges;
086        }
087    
088        /**
089         * @param outboundQueueBridges The outboundQueueBridges to set.
090         */
091        public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
092            this.outboundQueueBridges = outboundQueueBridges;
093        }
094    
095        /**
096         * @return Returns the localQueueConnectionFactory.
097         */
098        public QueueConnectionFactory getLocalQueueConnectionFactory() {
099            return localQueueConnectionFactory;
100        }
101    
102        /**
103         * @param localQueueConnectionFactory The localQueueConnectionFactory to
104         *                set.
105         */
106        public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
107            this.localQueueConnectionFactory = localConnectionFactory;
108        }
109    
110        /**
111         * @return Returns the outboundQueueConnectionFactory.
112         */
113        public QueueConnectionFactory getOutboundQueueConnectionFactory() {
114            return outboundQueueConnectionFactory;
115        }
116    
117        /**
118         * @return Returns the outboundQueueConnectionFactoryName.
119         */
120        public String getOutboundQueueConnectionFactoryName() {
121            return outboundQueueConnectionFactoryName;
122        }
123    
124        /**
125         * @param outboundQueueConnectionFactoryName The
126         *                outboundQueueConnectionFactoryName to set.
127         */
128        public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
129            this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
130        }
131    
132        /**
133         * @return Returns the localConnectionFactoryName.
134         */
135        public String getLocalConnectionFactoryName() {
136            return localConnectionFactoryName;
137        }
138    
139        /**
140         * @param localConnectionFactoryName The localConnectionFactoryName to set.
141         */
142        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143            this.localConnectionFactoryName = localConnectionFactoryName;
144        }
145    
146        /**
147         * @return Returns the localQueueConnection.
148         */
149        public QueueConnection getLocalQueueConnection() {
150            return localQueueConnection;
151        }
152    
153        /**
154         * @param localQueueConnection The localQueueConnection to set.
155         */
156        public void setLocalQueueConnection(QueueConnection localQueueConnection) {
157            this.localQueueConnection = localQueueConnection;
158        }
159    
160        /**
161         * @return Returns the outboundQueueConnection.
162         */
163        public QueueConnection getOutboundQueueConnection() {
164            return outboundQueueConnection;
165        }
166    
167        /**
168         * @param outboundQueueConnection The outboundQueueConnection to set.
169         */
170        public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
171            this.outboundQueueConnection = foreignQueueConnection;
172        }
173    
174        /**
175         * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
176         *                to set.
177         */
178        public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
179            this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
180        }
181    
182        public void restartProducerConnection() throws NamingException, JMSException {
183            outboundQueueConnection = null;
184            initializeForeignQueueConnection();
185        }
186    
187        protected void initializeForeignQueueConnection() throws NamingException, JMSException {
188            if (outboundQueueConnection == null) {
189                // get the connection factories
190                if (outboundQueueConnectionFactory == null) {
191                    // look it up from JNDI
192                    if (outboundQueueConnectionFactoryName != null) {
193                        outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
194                            .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
195                        if (outboundUsername != null) {
196                            outboundQueueConnection = outboundQueueConnectionFactory
197                                .createQueueConnection(outboundUsername, outboundPassword);
198                        } else {
199                            outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
200                        }
201                    } else {
202                        throw new JMSException("Cannot create localConnection - no information");
203                    }
204                } else {
205                    if (outboundUsername != null) {
206                        outboundQueueConnection = outboundQueueConnectionFactory
207                            .createQueueConnection(outboundUsername, outboundPassword);
208                    } else {
209                        outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
210                    }
211                }
212            }
213            if (localClientId != null && localClientId.length() > 0) {
214                outboundQueueConnection.setClientID(getOutboundClientId());
215            }
216            outboundQueueConnection.start();
217        }
218    
219        protected void initializeLocalQueueConnection() throws NamingException, JMSException {
220            if (localQueueConnection == null) {
221                // get the connection factories
222                if (localQueueConnectionFactory == null) {
223                    if (embeddedConnectionFactory == null) {
224                        // look it up from JNDI
225                        if (localConnectionFactoryName != null) {
226                            localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
227                                .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
228                            if (localUsername != null) {
229                                localQueueConnection = localQueueConnectionFactory
230                                    .createQueueConnection(localUsername, localPassword);
231                            } else {
232                                localQueueConnection = localQueueConnectionFactory.createQueueConnection();
233                            }
234                        } else {
235                            throw new JMSException("Cannot create localConnection - no information");
236                        }
237                    } else {
238                        localQueueConnection = embeddedConnectionFactory.createQueueConnection();
239                    }
240                } else {
241                    if (localUsername != null) {
242                        localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername,
243                                                                                                 localPassword);
244                    } else {
245                        localQueueConnection = localQueueConnectionFactory.createQueueConnection();
246                    }
247                }
248            }
249            if (localClientId != null && localClientId.length() > 0) {
250                localQueueConnection.setClientID(getLocalClientId());
251            }
252            localQueueConnection.start();
253        }
254    
255        protected void initializeInboundJmsMessageConvertor() {
256            inboundMessageConvertor.setConnection(localQueueConnection);
257        }
258    
259        protected void initializeOutboundJmsMessageConvertor() {
260            outboundMessageConvertor.setConnection(outboundQueueConnection);
261        }
262    
263        protected void initializeInboundQueueBridges() throws JMSException {
264            if (inboundQueueBridges != null) {
265                QueueSession outboundSession = outboundQueueConnection
266                    .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
267                QueueSession localSession = localQueueConnection.createQueueSession(false,
268                                                                                    Session.AUTO_ACKNOWLEDGE);
269                for (int i = 0; i < inboundQueueBridges.length; i++) {
270                    InboundQueueBridge bridge = inboundQueueBridges[i];
271                    String localQueueName = bridge.getLocalQueueName();
272                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
273                    String queueName = bridge.getInboundQueueName();
274                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
275                    bridge.setConsumerQueue(foreignQueue);
276                    bridge.setProducerQueue(activemqQueue);
277                    bridge.setProducerConnection(localQueueConnection);
278                    bridge.setConsumerConnection(outboundQueueConnection);
279                    if (bridge.getJmsMessageConvertor() == null) {
280                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
281                    }
282                    bridge.setJmsConnector(this);
283                    addInboundBridge(bridge);
284                }
285                outboundSession.close();
286                localSession.close();
287            }
288        }
289    
290        protected void initializeOutboundQueueBridges() throws JMSException {
291            if (outboundQueueBridges != null) {
292                QueueSession outboundSession = outboundQueueConnection
293                    .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
294                QueueSession localSession = localQueueConnection.createQueueSession(false,
295                                                                                    Session.AUTO_ACKNOWLEDGE);
296                for (int i = 0; i < outboundQueueBridges.length; i++) {
297                    OutboundQueueBridge bridge = outboundQueueBridges[i];
298                    String localQueueName = bridge.getLocalQueueName();
299                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
300                    String queueName = bridge.getOutboundQueueName();
301                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
302                    bridge.setConsumerQueue(activemqQueue);
303                    bridge.setProducerQueue(foreignQueue);
304                    bridge.setProducerConnection(outboundQueueConnection);
305                    bridge.setConsumerConnection(localQueueConnection);
306                    if (bridge.getJmsMessageConvertor() == null) {
307                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
308                    }
309                    bridge.setJmsConnector(this);
310                    addOutboundBridge(bridge);
311                }
312                outboundSession.close();
313                localSession.close();
314            }
315        }
316    
317        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
318                                                  Connection replyToConsumerConnection) {
319            Queue replyToProducerQueue = (Queue)destination;
320            boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
321    
322            if (isInbound) {
323                InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
324                if (bridge == null) {
325                    bridge = new InboundQueueBridge() {
326                        protected Destination processReplyToDestination(Destination destination) {
327                            return null;
328                        }
329                    };
330                    try {
331                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
332                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
333                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
334                        replyToConsumerSession.close();
335                        bridge.setConsumerQueue(replyToConsumerQueue);
336                        bridge.setProducerQueue(replyToProducerQueue);
337                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
338                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
339                        bridge.setDoHandleReplyTo(false);
340                        if (bridge.getJmsMessageConvertor() == null) {
341                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
342                        }
343                        bridge.setJmsConnector(this);
344                        bridge.start();
345                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
346                    } catch (Exception e) {
347                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
348                        return null;
349                    }
350                    replyToBridges.put(replyToProducerQueue, bridge);
351                }
352                return bridge.getConsumerQueue();
353            } else {
354                OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
355                if (bridge == null) {
356                    bridge = new OutboundQueueBridge() {
357                        protected Destination processReplyToDestination(Destination destination) {
358                            return null;
359                        }
360                    };
361                    try {
362                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
363                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
364                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
365                        replyToConsumerSession.close();
366                        bridge.setConsumerQueue(replyToConsumerQueue);
367                        bridge.setProducerQueue(replyToProducerQueue);
368                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
369                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
370                        bridge.setDoHandleReplyTo(false);
371                        if (bridge.getJmsMessageConvertor() == null) {
372                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
373                        }
374                        bridge.setJmsConnector(this);
375                        bridge.start();
376                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
377                    } catch (Exception e) {
378                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
379                        return null;
380                    }
381                    replyToBridges.put(replyToProducerQueue, bridge);
382                }
383                return bridge.getConsumerQueue();
384            }
385        }
386    
387        protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
388            return session.createQueue(queueName);
389        }
390    
391        protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
392            Queue result = null;
393            try {
394                result = session.createQueue(queueName);
395            } catch (JMSException e) {
396                // look-up the Queue
397                try {
398                    result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
399                } catch (NamingException e1) {
400                    String errStr = "Failed to look-up Queue for name: " + queueName;
401                    LOG.error(errStr, e);
402                    JMSException jmsEx = new JMSException(errStr);
403                    jmsEx.setLinkedException(e1);
404                    throw jmsEx;
405                }
406            }
407            return result;
408        }
409    
410    }