001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import javax.jms.JMSException;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.activemq.broker.BrokerContainer;
026    import org.activemq.broker.impl.BrokerConnectorImpl;
027    import org.activemq.io.impl.DefaultWireFormat;
028    import org.activemq.message.BrokerInfo;
029    import org.activemq.transport.composite.CompositeTransportChannel;
030    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
031    
032    /**
033     * Represents a Boondocks broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
034     * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
035     * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
036     * broker.
037     * 
038     * @version $Revision: 1.1.1.1 $
039     */
040    public class RemoteNetworkChannel extends NetworkChannel implements TransportStatusEventListener  {
041        private static final Log log = LogFactory.getLog(RemoteNetworkChannel.class);
042        private TransportChannel boondocksChannel;
043       
044        /**
045         * Default Constructor
046         * 
047         * @param tp
048         */
049        public RemoteNetworkChannel(PooledExecutor tp) {
050            super(tp);
051        }
052    
053        /**
054         * Constructor
055         * 
056         * @param connector
057         * @param brokerContainer
058         * @param uri
059         */
060        public RemoteNetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
061            super(connector,brokerContainer,uri);
062        }
063    
064        
065        /**
066         * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
067         */
068        public void statusChanged(TransportStatusEvent event) {
069            if (event.getTransportChannel() == boondocksChannel) {
070                if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
071                    try {
072                        sendBrokerInfo();
073                    }
074                    catch (JMSException e) {
075                        log.error("Failed to send Broker Info", e);
076                    }
077                }
078            }
079            else {
080                super.statusChanged(event);
081            }
082        }
083    
084        
085        /**
086         * remote:// can only make outgoing connections - we assume we can't
087         * accept incomming (duck!). So we initialize the transport channel
088         * from this side and create the broker client as well
089         * @throws JMSException
090         */
091        
092        protected void initialize() throws JMSException {
093            super.initialize();
094            try {
095                boondocksChannel = TransportChannelProvider.create(new DefaultWireFormat(), new URI(uri));
096                boondocksChannel.addTransportStatusEventListener(this);
097                if (boondocksChannel instanceof CompositeTransportChannel) {
098                    CompositeTransportChannel composite = (CompositeTransportChannel)boondocksChannel;
099                    composite.setMaximumRetries(maximumRetries);
100                    composite.setFailureSleepTime(reconnectSleepTime);
101                    composite.setIncrementTimeout(false);
102                }
103                boondocksChannel.start();
104                //create our own broker connector ...
105                BrokerConnectorImpl connector = new BrokerConnectorImpl(getBrokerContainer(),"vm://uri",new DefaultWireFormat());
106                connector.start();
107                connector.addClient(boondocksChannel);
108                sendBrokerInfo();
109            }
110            catch (URISyntaxException e) {
111             log.error("Could not parse uri: " + uri + " to make remote connector",e);
112            }
113        }
114        
115        private void sendBrokerInfo() throws JMSException{
116            //inform the other side we are a remote channel
117            if (boondocksChannel != null) {
118                BrokerInfo info = new BrokerInfo();
119                info.setBrokerName(brokerContainer.getBroker().getBrokerName());
120                info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
121                info.setRemote(true);
122                boondocksChannel.asyncSend(info);
123            }
124        }
125    
126        
127    }