001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.broker.impl;
019    
020    import org.activemq.broker.BrokerClient;
021    import org.activemq.broker.BrokerConnector;
022    import org.activemq.broker.BrokerContainer;
023    import org.activemq.io.WireFormat;
024    import org.activemq.message.ActiveMQMessage;
025    import org.activemq.message.ActiveMQXid;
026    import org.activemq.message.BrokerInfo;
027    import org.activemq.message.ConnectionInfo;
028    import org.activemq.message.ConsumerInfo;
029    import org.activemq.message.DurableUnsubscribe;
030    import org.activemq.message.MessageAck;
031    import org.activemq.message.ProducerInfo;
032    import org.activemq.message.SessionInfo;
033    import org.activemq.transport.TransportChannel;
034    import org.activemq.transport.TransportChannelListener;
035    import org.activemq.transport.TransportServerChannel;
036    import org.activemq.transport.TransportServerChannelProvider;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    import javax.jms.JMSException;
041    import javax.jms.JMSSecurityException;
042    import javax.transaction.xa.XAException;
043    import java.net.URI;
044    import java.net.URISyntaxException;
045    import java.util.Collections;
046    import java.util.HashMap;
047    import java.util.Map;
048    
049    /**
050     * An implementation of the broker (the JMS server)
051     *
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
055        
056        private TransportServerChannel serverChannel;
057        private Log log;
058        private BrokerContainer container;
059        private Map clients = Collections.synchronizedMap(new HashMap());
060    
061        /**
062         * Helper constructor for TCP protocol with the given bind address
063         *
064         * @param container
065         * @param bindAddress
066         * @throws JMSException
067         */
068        public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
069            this(container, createTransportServerChannel(wireFormat, bindAddress));
070        }
071    
072        /**
073         * @param container
074         * @param serverChannel
075         */
076        public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
077            this(container);
078            this.serverChannel = serverChannel;
079            serverChannel.setTransportChannelListener(this);
080        }
081        
082        /**
083        * @param container
084        * @param serverChannel
085        */
086       public BrokerConnectorImpl(BrokerContainer container) {
087           assert container != null;
088           this.log = LogFactory.getLog(getClass().getName());
089           this.container = container;
090           this.container.addConnector(this);
091           
092       }
093    
094        /**
095         * @return infomation about the Broker
096         */
097        public BrokerInfo getBrokerInfo() {
098            return container.getBroker().getBrokerInfo();
099        }
100    
101        /**
102         * Get a hint about the broker capacity for more messages
103         *
104         * @return percentage value (0-100) about how much capacity the
105         *         broker has
106         */
107        public int getBrokerCapacity() {
108            return container.getBroker().getRoundedCapacity();
109        }
110    
111        /**
112         * @return Get the server channel
113         */
114        public TransportServerChannel getServerChannel() {
115            return serverChannel;
116        }
117    
118        /**
119         * start the Broker
120         *
121         * @throws JMSException
122         */
123        public void start() throws JMSException {
124            if (this.serverChannel != null){
125                this.serverChannel.start();
126            }
127            log.info("ActiveMQ connector started: " + serverChannel);
128        }
129    
130        /**
131         * Stop the Broker
132         *
133         * @throws JMSException
134         */
135        public void stop() throws JMSException {
136            this.container.removeConnector(this);
137            if (this.serverChannel != null){
138                this.serverChannel.stop();
139            }
140            log.info("ActiveMQ connector stopped: " + serverChannel);
141        }
142    
143        /**
144         * Register a Broker Client
145         *
146         * @param client
147         * @param info   contains infomation about the Connection this Client represents
148         * @throws JMSException
149         * @throws javax.jms.InvalidClientIDException
150         *                              if the JMS client specifies an invalid or duplicate client ID.
151         * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
152         */
153        public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
154            this.container.registerConnection(client, info);
155        }
156    
157        /**
158         * Deregister a Broker Client
159         *
160         * @param client
161         * @param info
162         * @throws JMSException if some internal error occurs
163         */
164        public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
165            this.container.deregisterConnection(client, info);
166        }
167    
168        /**
169         * Registers a MessageConsumer
170         *
171         * @param client
172         * @param info
173         * @throws JMSException
174         * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
175         */
176        public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
177            if (info.getDestination() == null) {
178                throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
179            }
180            this.container.registerMessageConsumer(client, info);
181    
182        }
183    
184        /**
185         * De-register a MessageConsumer from the Broker
186         *
187         * @param client
188         * @param info
189         * @throws JMSException
190         */
191        public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
192            this.container.deregisterMessageConsumer(client, info);
193        }
194    
195        /**
196         * Registers a MessageProducer
197         *
198         * @param client
199         * @param info
200         * @throws JMSException
201         * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
202         */
203        public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
204            this.container.registerMessageProducer(client, info);
205        }
206    
207        /**
208         * De-register a MessageProducer from the Broker
209         *
210         * @param client
211         * @param info
212         * @throws JMSException
213         */
214        public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
215            this.container.deregisterMessageProducer(client, info);
216        }
217    
218        /**
219         * Register a client-side Session (used for Monitoring)
220         *
221         * @param client
222         * @param info
223         * @throws JMSException
224         */
225        public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
226            this.container.registerSession(client, info);
227        }
228    
229        /**
230         * De-register a client-side Session from the Broker (used for monitoring)
231         *
232         * @param client
233         * @param info
234         * @throws JMSException
235         */
236        public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
237            this.container.deregisterSession(client, info);
238        }
239    
240        /**
241         * Start a transaction from the Client session
242         *
243         * @param client
244         * @param transactionId
245         * @throws JMSException
246         */
247        public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
248            this.container.startTransaction(client, transactionId);
249        }
250    
251        /**
252         * Rollback a transacton
253         *
254         * @param client
255         * @param transactionId
256         * @throws JMSException
257         */
258        public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
259            this.container.rollbackTransaction(client, transactionId);
260        }
261    
262        /**
263         * Commit a transaction
264         *
265         * @param client
266         * @param transactionId
267         * @throws JMSException
268         */
269        public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
270            this.container.commitTransaction(client, transactionId);
271        }
272    
273        /**
274         * Send a non-transacted message to the Broker
275         *
276         * @param client
277         * @param message
278         * @throws JMSException
279         */
280        public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
281            this.container.sendMessage(client, message);
282        }
283    
284        /**
285         * Acknowledge reciept of a message
286         *
287         * @param client
288         * @param ack
289         * @throws JMSException
290         */
291        public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
292            this.container.acknowledgeMessage(client, ack);
293        }
294    
295        /**
296         * Command to delete a durable topic subscription
297         *
298         * @param client
299         * @param ds
300         * @throws JMSException
301         */
302        public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
303            this.container.durableUnsubscribe(client, ds);
304        }
305    
306    
307        /**
308         * @param channel - client to add
309         */
310        public void addClient(TransportChannel channel) {
311            try {
312                BrokerClient client = new BrokerClientImpl();
313                client.initialize(this, channel);
314                if (log.isDebugEnabled()) {
315                    log.debug("Starting new client: " + client);
316                }
317                channel.setServerSide(true);
318                channel.start();
319                clients.put(channel, client);
320            }
321            catch (JMSException e) {
322                log.error("Failed to add client due to: " + e, e);
323            }
324        }
325    
326        /**
327         * @param channel - client to remove
328         */
329        public void removeClient(TransportChannel channel) {
330            BrokerClient client = (BrokerClient) clients.remove(channel);
331            if (client != null) {
332                if (log.isDebugEnabled()) {
333                    log.debug("Client leaving client: " + client);
334                }
335    
336                // we may have already been closed, if not then lets simulate a normal shutdown
337                client.cleanUp();
338            }
339            else {
340                // might have got a duplicate callback
341                log.warn("No such client for channel: " + channel);
342            }
343        }
344    
345        /**
346         * @return the BrokerContainer for this Connector
347         */
348        public BrokerContainer getBrokerContainer() {
349            return this.container;
350        }
351    
352        /**
353         * Start an XA transaction.
354         *
355         * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
356         */
357        public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
358            this.container.startTransaction(client, xid);
359        }
360    
361        /**
362         * Gets the prepared XA transactions.
363         *
364         * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient)
365         */
366        public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
367            return this.container.getPreparedTransactions(client);
368        }
369    
370        /**
371         * Prepare an XA transaction.
372         *
373         * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
374         */
375        public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
376            return this.container.prepareTransaction(client, xid);
377        }
378    
379        /**
380         * Rollback an XA transaction.
381         *
382         * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
383         */
384        public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
385            this.container.rollbackTransaction(client, xid);
386        }
387    
388        /**
389         * Commit an XA transaction.
390         *
391         * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
392         */
393        public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
394            this.container.commitTransaction(client, xid, onePhase);
395        }
396    
397        /**
398         * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient)
399         */
400        public String getResourceManagerId(BrokerClient client) {
401            // TODO: I think we need to return a better (more unique) RM id.
402            return getBrokerInfo().getBrokerName();
403        }
404    
405    
406        // Implementation methods
407        //-------------------------------------------------------------------------
408        /**
409         * Factory method ot create a transport channel
410         *
411         * @param bindAddress
412         * @return @throws JMSException
413         * @throws JMSException
414         */
415        protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
416            URI url;
417            try {
418                url = new URI(bindAddress);
419            }
420            catch (URISyntaxException e) {
421                JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
422                jmsEx.setLinkedException(e);
423                throw jmsEx;
424            }
425            return TransportServerChannelProvider.create(wireFormat, url);
426        }
427    
428    }