001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.Connection;
026    import javax.jms.ConnectionConsumer;
027    import javax.jms.ConnectionMetaData;
028    import javax.jms.DeliveryMode;
029    import javax.jms.Destination;
030    import javax.jms.ExceptionListener;
031    import javax.jms.IllegalStateException;
032    import javax.jms.JMSException;
033    import javax.jms.Queue;
034    import javax.jms.QueueConnection;
035    import javax.jms.QueueSession;
036    import javax.jms.ServerSessionPool;
037    import javax.jms.Session;
038    import javax.jms.Topic;
039    import javax.jms.TopicConnection;
040    import javax.jms.TopicSession;
041    import javax.jms.XAConnection;
042    
043    import org.activemq.advisories.TempDestinationAdvisor;
044    import org.activemq.advisories.TempDestinationAdvisoryEvent;
045    import org.activemq.capacity.CapacityMonitorEvent;
046    import org.activemq.capacity.CapacityMonitorEventListener;
047    import org.activemq.filter.AndFilter;
048    import org.activemq.filter.Filter;
049    import org.activemq.filter.FilterFactory;
050    import org.activemq.filter.FilterFactoryImpl;
051    import org.activemq.filter.NoLocalFilter;
052    import org.activemq.io.util.ByteArray;
053    import org.activemq.io.util.ByteArrayCompression;
054    import org.activemq.io.util.ByteArrayFragmentation;
055    import org.activemq.io.util.MemoryBoundedObjectManager;
056    import org.activemq.io.util.MemoryBoundedQueue;
057    import org.activemq.io.util.MemoryBoundedQueueManager;
058    import org.activemq.management.JMSConnectionStatsImpl;
059    import org.activemq.management.JMSStatsImpl;
060    import org.activemq.management.StatsCapable;
061    import org.activemq.management.StatsImpl;
062    import org.activemq.message.ActiveMQDestination;
063    import org.activemq.message.ActiveMQMessage;
064    import org.activemq.message.ActiveMQObjectMessage;
065    import org.activemq.message.BrokerAdminCommand;
066    import org.activemq.message.CapacityInfo;
067    import org.activemq.message.CleanupConnectionInfo;
068    import org.activemq.message.ConnectionInfo;
069    import org.activemq.message.ConsumerInfo;
070    import org.activemq.message.Packet;
071    import org.activemq.message.PacketListener;
072    import org.activemq.message.ProducerInfo;
073    import org.activemq.message.Receipt;
074    import org.activemq.message.ResponseReceipt;
075    import org.activemq.message.SessionInfo;
076    import org.activemq.message.TransactionInfo;
077    import org.activemq.message.WireFormatInfo;
078    import org.activemq.message.XATransactionInfo;
079    import org.activemq.transport.TransportChannel;
080    import org.activemq.transport.TransportStatusEvent;
081    import org.activemq.transport.TransportStatusEventListener;
082    import org.activemq.util.IdGenerator;
083    import org.activemq.util.JMSExceptionHelper;
084    import org.apache.commons.logging.Log;
085    import org.apache.commons.logging.LogFactory;
086    
087    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
088    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
089    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
090    import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
091    
092    /**
093     * A <CODE>Connection</CODE> object is a client's active connection to its JMS
094     * provider. It typically allocates provider resources outside the Java virtual
095     * machine (JVM).
096     * <P>
097     * Connections support concurrent use.
098     * <P>
099     * A connection serves several purposes:
100     * <UL>
101     * <LI>It encapsulates an open connection with a JMS provider. It typically
102     * represents an open TCP/IP socket between a client and the service provider
103     * software.
104     * <LI>Its creation is where client authentication takes place.
105     * <LI>It can specify a unique client identifier.
106     * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
107     * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
108     * </UL>
109     * <P>
110     * Because the creation of a connection involves setting up authentication and
111     * communication, a connection is a relatively heavyweight object. Most clients
112     * will do all their messaging with a single connection. Other more advanced
113     * applications may use several connections. The JMS API does not architect a
114     * reason for using multiple connections; however, there may be operational
115     * reasons for doing so.
116     * <P>
117     * A JMS client typically creates a connection, one or more sessions, and a
118     * number of message producers and consumers. When a connection is created, it
119     * is in stopped mode. That means that no messages are being delivered.
120     * <P>
121     * It is typical to leave the connection in stopped mode until setup is complete
122     * (that is, until all message consumers have been created). At that point, the
123     * client calls the connection's <CODE>start</CODE> method, and messages begin
124     * arriving at the connection's consumers. This setup convention minimizes any
125     * client confusion that may result from asynchronous message delivery while the
126     * client is still in the process of setting itself up.
127     * <P>
128     * A connection can be started immediately, and the setup can be done
129     * afterwards. Clients that do this must be prepared to handle asynchronous
130     * message delivery while they are still in the process of setting up.
131     * <P>
132     * A message producer can send messages while a connection is stopped. <p/>This
133     * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
134     * object is an active connection to a publish/subscribe JMS provider. A client
135     * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
136     * objects for producing and consuming messages.
137     * <P>
138     * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
139     * from which specialized topic-related objects can be created. A more general,
140     * and recommended approach is to use the <CODE>Connection </CODE> object.
141     * <P>
142     * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A A <CODE>QueueConnection</CODE>
143     * object is an active connection to a point-to-point JMS provider. A client
144     * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
145     * objects for producing and consuming messages.
146     * <P>
147     * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
148     * from which specialized queue-related objects can be created. A more general,
149     * and recommended, approach is to use the <CODE>Connection </CODE> object.
150     * <P>
151     * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
152     * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
153     * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
154     * if used from <CODE>QueueConnection</CODE>. // *
155     * 
156     * @version $Revision: 1.1.1.1 $
157     * @see javax.jms.Connection
158     * @see javax.jms.ConnectionFactory
159     * @see javax.jms.QueueConnection
160     * @see javax.jms.TopicConnection
161     * @see javax.jms.TopicConnectionFactory
162     * @see javax.jms.QueueConnection
163     * @see javax.jms.QueueConnectionFactory
164     */
165    public class ActiveMQConnection implements Connection, PacketListener,
166                    ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
167                    CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
168    
169            /**
170             * Default UserName for the Connection
171             */
172            public static final String DEFAULT_USER = "defaultUser";
173    
174            /**
175             * Default URL for the ActiveMQ Broker
176             */
177            public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
178    
179            /**
180             * Default client URL. If using a message broker in a hub(s)/spoke
181             * architecture - use the DEFAULT_BROKER_URL
182             * 
183             * @see ActiveMQConnection#DEFAULT_BROKER_URL
184             */
185            public static final String DEFAULT_URL = "peer://development";
186    
187            /**
188             * Default Password for the Connection
189             */
190            public static final String DEFAULT_PASSWORD = "defaultPassword";
191    
192            private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
193    
194            private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
195    
196            // properties
197            private ActiveMQConnectionFactory factory;
198    
199            private String userName;
200    
201            private String password;
202    
203            protected String clientID;
204    
205            private int sendCloseTimeout = 2000;
206    
207            private TransportChannel transportChannel;
208    
209            private ExceptionListener exceptionListener;
210    
211            private ActiveMQPrefetchPolicy prefetchPolicy;
212    
213            private JMSStatsImpl factoryStats;
214    
215            private MemoryBoundedObjectManager memoryManager;
216    
217            private MemoryBoundedQueueManager boundedQueueManager;
218    
219            protected IdGenerator handleIdGenerator;
220    
221            private IdGenerator clientIdGenerator;
222    
223            protected IdGenerator packetIdGenerator;
224    
225            private IdGenerator sessionIdGenerator;
226    
227            private JMSConnectionStatsImpl stats;
228    
229            // internal state
230            private CopyOnWriteArrayList sessions;
231    
232            private CopyOnWriteArrayList messageDispatchers;
233    
234            private CopyOnWriteArrayList connectionConsumers;
235    
236            private SynchronizedInt consumerNumberGenerator;
237    
238            private ActiveMQConnectionMetaData connectionMetaData;
239    
240            private boolean closed;
241    
242            private SynchronizedBoolean started;
243    
244            private boolean clientIDSet;
245    
246            private boolean isConnectionInfoSentToBroker;
247    
248            private boolean isTransportOK;
249    
250            private boolean startedTransport;
251    
252            private long startTime;
253    
254            private long flowControlSleepTime = 0;
255            private Object flowControlMutex = new Object();
256    
257            private boolean quickClose;
258    
259            private boolean internalConnection;// used for notifying that the
260                                                                                    // connection is used for networks etc.
261    
262            private boolean userSpecifiedClientID;
263    
264            /**
265             * Should we use an async send for persistent non transacted messages ?
266             */
267            protected boolean useAsyncSend = true;
268    
269            private int sendConnectionInfoTimeout = 30000;
270    
271            private boolean disableTimeStampsByDefault = false;
272    
273            private boolean J2EEcompliant = true;
274    
275            private boolean prepareMessageBodyOnSend = true;
276    
277            private boolean copyMessageOnSend = true;
278    
279            // compression and fragmentation variables
280    
281            private boolean doMessageCompression = true;
282    
283            private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;// data
284                                                                                                                                                                                            // size
285                                                                                                                                                                                            // above
286                                                                                                                                                                                            // which
287                                                                                                                                                                                            // compression
288                                                                                                                                                                                            // will
289                                                                                                                                                                                            // be
290                                                                                                                                                                                            // used
291    
292            private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
293    
294            private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;// default
295                                                                                                                                                                                                    // compression
296                                                                                                                                                                                                    // strategy
297    
298            private boolean doMessageFragmentation = false;
299    
300            private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
301    
302            private boolean cachingEnabled = true;
303    
304            private boolean optimizedMessageDispatch = false;
305    
306            private CopyOnWriteArrayList transientConsumedRedeliverCache;
307    
308            private FilterFactory filterFactory;
309    
310            private Map tempDestinationMap;
311    
312            private Map validDestinationsMap;
313    
314            private String resourceManagerId;
315        //used for assembling message fragments
316        private final ConcurrentHashMap assemblies= new ConcurrentHashMap();
317        private final ByteArrayFragmentation fragmentation = new ByteArrayFragmentation();
318    
319            /**
320             * A static helper method to create a new connection
321             * 
322             * @return an ActiveMQConnection
323             * @throws JMSException
324             */
325            public static ActiveMQConnection makeConnection() throws JMSException {
326                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
327                    return (ActiveMQConnection) factory.createConnection();
328            }
329    
330            /**
331             * A static helper method to create a new connection
332             * 
333             * @param uri
334             * @return and ActiveMQConnection
335             * @throws JMSException
336             */
337            public static ActiveMQConnection makeConnection(String uri)
338                            throws JMSException {
339                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
340                    return (ActiveMQConnection) factory.createConnection();
341            }
342    
343            /**
344             * A static helper method to create a new connection
345             * 
346             * @param user
347             * @param password
348             * @param uri
349             * @return an ActiveMQConnection
350             * @throws JMSException
351             */
352            public static ActiveMQConnection makeConnection(String user,
353                            String password, String uri) throws JMSException {
354                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
355                                    password, uri);
356                    return (ActiveMQConnection) factory.createConnection();
357            }
358    
359            /**
360             * Constructs a connection from an existing TransportChannel and
361             * user/password.
362             * 
363             * @param factory
364             * @param theUserName
365             *            the users name
366             * @param thePassword
367             *            the password
368             * @param transportChannel
369             *            the transport channel to communicate with the server
370             * @throws JMSException
371             */
372            public ActiveMQConnection(ActiveMQConnectionFactory factory,
373                            String theUserName, String thePassword,
374                            TransportChannel transportChannel) throws JMSException {
375                    this(factory, theUserName, thePassword);
376                    this.transportChannel = transportChannel;
377                    this.transportChannel.setPacketListener(this);
378                    this.transportChannel.setExceptionListener(this);
379                    this.transportChannel.addTransportStatusEventListener(this);
380                    this.isTransportOK = true;
381            }
382    
383            protected ActiveMQConnection(ActiveMQConnectionFactory factory,
384                            String theUserName, String thePassword) {
385                    this.factory = factory;
386                    this.userName = theUserName;
387                    this.password = thePassword;
388                    this.clientIdGenerator = new IdGenerator();
389                    this.packetIdGenerator = new IdGenerator();
390                    this.handleIdGenerator = new IdGenerator();
391                    this.sessionIdGenerator = new IdGenerator();
392                    this.consumerNumberGenerator = new SynchronizedInt(0);
393                    this.sessions = new CopyOnWriteArrayList();
394                    this.messageDispatchers = new CopyOnWriteArrayList();
395                    this.connectionConsumers = new CopyOnWriteArrayList();
396                    this.connectionMetaData = new ActiveMQConnectionMetaData();
397                    this.started = new SynchronizedBoolean(false);
398                    this.startTime = System.currentTimeMillis();
399                    this.prefetchPolicy = new ActiveMQPrefetchPolicy();
400                    this.memoryManager = new MemoryBoundedObjectManager(clientID,
401                                    DEFAULT_CONNECTION_MEMORY_LIMIT);
402                    this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
403                    this.memoryManager.addCapacityEventListener(this);
404                    boolean transactional = this instanceof XAConnection;
405                    factoryStats = factory.getFactoryStats();
406                    factoryStats.addConnection(this);
407                    stats = new JMSConnectionStatsImpl(sessions, transactional);
408                    this.transientConsumedRedeliverCache = new CopyOnWriteArrayList();
409                    this.tempDestinationMap = new ConcurrentHashMap();
410                    this.validDestinationsMap = new ConcurrentHashMap();
411                    factory.onConnectionCreate(this);
412            }
413    
414            /**
415             * @return statistics for this Connection
416             */
417            public StatsImpl getStats() {
418                    return stats;
419            }
420    
421            /**
422             * @return a number unique for this connection
423             */
424            public JMSConnectionStatsImpl getConnectionStats() {
425                    return stats;
426            }
427    
428            /**
429             * Creates a <CODE>Session</CODE> object.
430             * 
431             * @param transacted
432             *            indicates whether the session is transacted
433             * @param acknowledgeMode
434             *            indicates whether the consumer or the client will acknowledge
435             *            any messages it receives; ignored if the session is
436             *            transacted. Legal values are
437             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
438             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
439             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
440             * @return a newly created session
441             * @throws JMSException
442             *             if the <CODE>Connection</CODE> object fails to create a
443             *             session due to some internal error or lack of support for the
444             *             specific transaction and acknowledgement mode.
445             * @see Session#AUTO_ACKNOWLEDGE
446             * @see Session#CLIENT_ACKNOWLEDGE
447             * @see Session#DUPS_OK_ACKNOWLEDGE
448             * @since 1.1
449             */
450            public Session createSession(boolean transacted, int acknowledgeMode)
451                            throws JMSException {
452                    checkClosed();
453                    sendConnectionInfoToBroker();
454                    return new ActiveMQSession(
455                                    this,
456                                    (transacted ? Session.SESSION_TRANSACTED
457                                                    : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
458                                                                    : acknowledgeMode)));
459            }
460    
461            /**
462             * Creates a <CODE>Session</CODE> object.
463             * 
464             * @param transacted
465             *            indicates whether the session is transacted
466             * @param acknowledgeMode
467             *            indicates whether the consumer or the client will acknowledge
468             *            any messages it receives; ignored if the session is
469             *            transacted. Legal values are
470             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
471             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
472             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
473             * @param optimizedDispatch
474             * @return a newly created session
475             * @throws JMSException
476             *             if the <CODE>Connection</CODE> object fails to create a
477             *             session due to some internal error or lack of support for the
478             *             specific transaction and acknowledgement mode.
479             * @see Session#AUTO_ACKNOWLEDGE
480             * @see Session#CLIENT_ACKNOWLEDGE
481             * @see Session#DUPS_OK_ACKNOWLEDGE
482             * @since 1.1
483             */
484            public Session createSession(boolean transacted, int acknowledgeMode,
485                            boolean optimizedDispatch) throws JMSException {
486                    checkClosed();
487                    sendConnectionInfoToBroker();
488                    return new ActiveMQSession(this,
489                                    (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
490                                    optimizedDispatch);
491            }
492    
493            /**
494             * Gets the client identifier for this connection.
495             * <P>
496             * This value is specific to the JMS provider. It is either preconfigured by
497             * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
498             * dynamically by the application by calling the <code>setClientID</code>
499             * method.
500             * 
501             * @return the unique client identifier
502             * @throws JMSException
503             *             if the JMS provider fails to return the client ID for this
504             *             connection due to some internal error.
505             */
506            public String getClientID() throws JMSException {
507                    checkClosed();
508                    return this.clientID;
509            }
510    
511            /**
512             * Sets the client identifier for this connection.
513             * <P>
514             * The preferred way to assign a JMS client's client identifier is for it to
515             * be configured in a client-specific <CODE>ConnectionFactory</CODE>
516             * object and transparently assigned to the <CODE>Connection</CODE> object
517             * it creates.
518             * <P>
519             * Alternatively, a client can set a connection's client identifier using a
520             * provider-specific value. The facility to set a connection's client
521             * identifier explicitly is not a mechanism for overriding the identifier
522             * that has been administratively configured. It is provided for the case
523             * where no administratively specified identifier exists. If one does exist,
524             * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
525             * If a client sets the client identifier explicitly, it must do so
526             * immediately after it creates the connection and before any other action
527             * on the connection is taken. After this point, setting the client
528             * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
529             * <P>
530             * The purpose of the client identifier is to associate a connection and its
531             * objects with a state maintained on behalf of the client by a provider.
532             * The only such state identified by the JMS API is that required to support
533             * durable subscriptions.
534             * <P>
535             * If another connection with the same <code>clientID</code> is already
536             * running when this method is called, the JMS provider should detect the
537             * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
538             * 
539             * @param newClientID
540             *            the unique client identifier
541             * @throws JMSException
542             *             if the JMS provider fails to set the client ID for this
543             *             connection due to some internal error.
544             * @throws javax.jms.InvalidClientIDException
545             *             if the JMS client specifies an invalid or duplicate client
546             *             ID.
547             * @throws javax.jms.IllegalStateException
548             *             if the JMS client attempts to set a connection's client ID at
549             *             the wrong time or when it has been administratively
550             *             configured.
551             */
552            public void setClientID(String newClientID) throws JMSException {
553                    if (this.clientIDSet) {
554                            throw new IllegalStateException("The clientID has already been set");
555                    }
556                    if (this.isConnectionInfoSentToBroker) {
557                            throw new IllegalStateException(
558                                            "Setting clientID on a used Connection is not allowed");
559                    }
560                    checkClosed();
561                    this.clientID = newClientID;
562                    this.userSpecifiedClientID = true;
563                    ensureClientIDInitialised();
564            }
565    
566            /**
567             * Gets the metadata for this connection.
568             * 
569             * @return the connection metadata
570             * @throws JMSException
571             *             if the JMS provider fails to get the connection metadata for
572             *             this connection.
573             * @see javax.jms.ConnectionMetaData
574             */
575            public ConnectionMetaData getMetaData() throws JMSException {
576                    checkClosed();
577                    return this.connectionMetaData;
578            }
579    
580            /**
581             * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
582             * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
583             * associated with it.
584             * 
585             * @return the <CODE>ExceptionListener</CODE> for this connection, or
586             *         null. if no <CODE>ExceptionListener</CODE> is associated with
587             *         this connection.
588             * @throws JMSException
589             *             if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
590             *             for this connection.
591             * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
592             */
593            public ExceptionListener getExceptionListener() throws JMSException {
594                    checkClosed();
595                    return this.exceptionListener;
596            }
597    
598            /**
599             * Sets an exception listener for this connection.
600             * <P>
601             * If a JMS provider detects a serious problem with a connection, it informs
602             * the connection's <CODE> ExceptionListener</CODE>, if one has been
603             * registered. It does this by calling the listener's <CODE>onException
604             * </CODE> method, passing it a <CODE>JMSException</CODE> object
605             * describing the problem.
606             * <P>
607             * An exception listener allows a client to be notified of a problem
608             * asynchronously. Some connections only consume messages, so they would
609             * have no other way to learn their connection has failed.
610             * <P>
611             * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
612             * <P>
613             * A JMS provider should attempt to resolve connection problems itself
614             * before it notifies the client of them.
615             * 
616             * @param listener
617             *            the exception listener
618             * @throws JMSException
619             *             if the JMS provider fails to set the exception listener for
620             *             this connection.
621             */
622            public void setExceptionListener(ExceptionListener listener)
623                            throws JMSException {
624                    checkClosed();
625                    this.exceptionListener = listener;
626                    this.transportChannel.setExceptionListener(listener);
627            }
628    
629            /**
630             * Starts (or restarts) a connection's delivery of incoming messages. A call
631             * to <CODE>start</CODE> on a connection that has already been started is
632             * ignored.
633             * 
634             * @throws JMSException
635             *             if the JMS provider fails to start message delivery due to
636             *             some internal error.
637             * @see javax.jms.Connection#stop()
638             */
639            public void start() throws JMSException {
640                    checkClosed();
641                    if (started.commit(false, true)) {
642                            // We have a change in connection info to send.
643                            // send the Connection info again
644                            sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
645                            for (Iterator i = sessions.iterator(); i.hasNext();) {
646                                    ActiveMQSession s = (ActiveMQSession) i.next();
647                                    s.start();
648                            }
649                    }
650            }
651    
652            /**
653             * @return true if this Connection is started
654             */
655            protected boolean isStarted() {
656                    return started.get();
657            }
658    
659            /**
660             * Temporarily stops a connection's delivery of incoming messages. Delivery
661             * can be restarted using the connection's <CODE>start</CODE> method. When
662             * the connection is stopped, delivery to all the connection's message
663             * consumers is inhibited: synchronous receives block, and messages are not
664             * delivered to message listeners.
665             * <P>
666             * This call blocks until receives and/or message listeners in progress have
667             * completed.
668             * <P>
669             * Stopping a connection has no effect on its ability to send messages. A
670             * call to <CODE>stop</CODE> on a connection that has already been stopped
671             * is ignored.
672             * <P>
673             * A call to <CODE>stop</CODE> must not return until delivery of messages
674             * has paused. This means that a client can rely on the fact that none of
675             * its message listeners will be called and that all threads of control
676             * waiting for <CODE>receive</CODE> calls to return will not return with a
677             * message until the connection is restarted. The receive timers for a
678             * stopped connection continue to advance, so receives may time out while
679             * the connection is stopped.
680             * <P>
681             * If message listeners are running when <CODE>stop</CODE> is invoked, the
682             * <CODE>stop</CODE> call must wait until all of them have returned before
683             * it may return. While these message listeners are completing, they must
684             * have the full services of the connection available to them.
685             * 
686             * @throws JMSException
687             *             if the JMS provider fails to stop message delivery due to
688             *             some internal error.
689             * @see javax.jms.Connection#start()
690             */
691            public void stop() throws JMSException {
692                    checkClosed();
693                    if (started.commit(true, false)) {
694                            for (Iterator i = sessions.iterator(); i.hasNext();) {
695                                    ActiveMQSession s = (ActiveMQSession) i.next();
696                                    s.stop();
697                            }
698                            sendConnectionInfoToBroker(2000, true, false);
699                    }
700            }
701    
702            /**
703             * Closes the connection.
704             * <P>
705             * Since a provider typically allocates significant resources outside the
706             * JVM on behalf of a connection, clients should close these resources when
707             * they are not needed. Relying on garbage collection to eventually reclaim
708             * these resources may not be timely enough.
709             * <P>
710             * There is no need to close the sessions, producers, and consumers of a
711             * closed connection.
712             * <P>
713             * Closing a connection causes all temporary destinations to be deleted.
714             * <P>
715             * When this method is invoked, it should not return until message
716             * processing has been shut down in an orderly fashion. This means that all
717             * message listeners that may have been running have returned, and that all
718             * pending receives have returned. A close terminates all pending message
719             * receives on the connection's sessions' consumers. The receives may return
720             * with a message or with null, depending on whether there was a message
721             * available at the time of the close. If one or more of the connection's
722             * sessions' message listeners is processing a message at the time when
723             * connection <CODE>close</CODE> is invoked, all the facilities of the
724             * connection and its sessions must remain available to those listeners
725             * until they return control to the JMS provider.
726             * <P>
727             * Closing a connection causes any of its sessions' transactions in progress
728             * to be rolled back. In the case where a session's work is coordinated by
729             * an external transaction manager, a session's <CODE>commit</CODE> and
730             * <CODE> rollback</CODE> methods are not used and the result of a closed
731             * session's work is determined later by the transaction manager. Closing a
732             * connection does NOT force an acknowledgment of client-acknowledged
733             * sessions.
734             * <P>
735             * Invoking the <CODE>acknowledge</CODE> method of a received message from
736             * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
737             * Closing a closed connection must NOT throw an exception.
738             * 
739             * @throws JMSException
740             *             if the JMS provider fails to close the connection due to some
741             *             internal error. For example, a failure to release resources
742             *             or to close a socket connection can cause this exception to
743             *             be thrown.
744             */
745            public void close() throws JMSException {
746                    this.transportChannel.setPendingStop(true);
747                    synchronized (this) {
748                            if (!closed) {
749                                    memoryManager.removeCapacityEventListener(this);
750                                    try {
751                                            closeTemporaryDestinations();
752                                            for (Iterator i = this.sessions.iterator(); i.hasNext();) {
753                                                    ActiveMQSession s = (ActiveMQSession) i.next();
754                                                    s.close();
755                                            }
756                                            for (Iterator i = this.connectionConsumers.iterator(); i
757                                                            .hasNext();) {
758                                                    ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
759                                                                    .next();
760                                                    c.close();
761                                            }
762                                            try {
763                                                    sendConnectionInfoToBroker(sendCloseTimeout, true, true);
764                                            } catch (TimeoutExpiredException e) {
765                                                    log
766                                                                    .warn("Failed to send close to broker, timeout expired of: "
767                                                                                    + sendCloseTimeout + " millis");
768                                            }
769                                            this.connectionConsumers.clear();
770                                            this.messageDispatchers.clear();
771                                            this.transportChannel.stop();
772                                    } finally {
773                                            this.sessions.clear();
774                                            started.set(false);
775                                            factory.onConnectionClose(this);
776                                    }
777                                    closed = true;
778                                    transientConsumedRedeliverCache.clear();
779                                    validDestinationsMap.clear();
780                     factoryStats.removeConnection(this);
781                            }
782                    }
783    
784            }
785    
786        /**
787         * Tells the broker to terminate its VM.  This can be used to cleanly terminate a broker running in
788         * a standalone java process.  Server must have property enable.vm.shutdown=true defined
789         * to allow this to work.
790         */
791        public void terminateBrokerVM() throws JMSException {
792            BrokerAdminCommand command = new BrokerAdminCommand();
793            command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
794            asyncSendPacket(command);
795        }
796    
797            /**
798             * simply throws an exception if the Connection is already closed
799             * 
800             * @throws JMSException
801             */
802            protected synchronized void checkClosed() throws JMSException {
803                    if (!startedTransport) {
804                            startedTransport = true;
805                            this.transportChannel.setCachingEnabled(isCachingEnabled());
806                            if (useAsyncSend == false) {
807                                    this.transportChannel.setNoDelay(true);
808                            }
809    
810                            this.transportChannel.setUsedInternally(internalConnection);
811                            this.transportChannel.start();
812                            if (transportChannel.doesSupportWireFormatVersioning()) {
813                                    WireFormatInfo info = new WireFormatInfo();
814                                    info.setVersion(transportChannel.getCurrentWireFormatVersion());
815                                    this.asyncSendPacket(info);
816                            }
817                    }
818                    if (this.closed) {
819                            throw new ConnectionClosedException();
820                    }
821            }
822    
823            /**
824             * Creates a connection consumer for this connection (optional operation).
825             * This is an expert facility not used by regular JMS clients.
826             * 
827             * @param destination
828             *            the destination to access
829             * @param messageSelector
830             *            only messages with properties matching the message selector
831             *            expression are delivered. A value of null or an empty string
832             *            indicates that there is no message selector for the message
833             *            consumer.
834             * @param sessionPool
835             *            the server session pool to associate with this connection
836             *            consumer
837             * @param maxMessages
838             *            the maximum number of messages that can be assigned to a
839             *            server session at one time
840             * @return the connection consumer
841             * @throws JMSException
842             *             if the <CODE>Connection</CODE> object fails to create a
843             *             connection consumer due to some internal error or invalid
844             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
845             * @throws javax.jms.InvalidDestinationException
846             *             if an invalid destination is specified.
847             * @throws javax.jms.InvalidSelectorException
848             *             if the message selector is invalid.
849             * @see javax.jms.ConnectionConsumer
850             * @since 1.1
851             */
852            public ConnectionConsumer createConnectionConsumer(Destination destination,
853                            String messageSelector, ServerSessionPool sessionPool,
854                            int maxMessages) throws JMSException {
855                    checkClosed();
856                    ensureClientIDInitialised();
857                    ConsumerInfo info = new ConsumerInfo();
858                    info.setConsumerId(handleIdGenerator.generateId());
859                    info.setDestination(ActiveMQMessageTransformation
860                                    .transformDestination(destination));
861                    info.setSelector(messageSelector);
862                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
863             info.setClientId(clientID);
864                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
865                                    maxMessages);
866            }
867        
868        /**
869         * Creates a connection consumer for this connection (optional operation).
870         * This is an expert facility not used by regular JMS clients.
871         * 
872         * @param destination
873         *            the destination to access
874         * @param messageSelector
875         *            only messages with properties matching the message selector
876         *            expression are delivered. A value of null or an empty string
877         *            indicates that there is no message selector for the message
878         *            consumer.
879         * @param sessionPool
880         *            the server session pool to associate with this connection
881         *            consumer
882         * @param maxMessages
883         *            the maximum number of messages that can be assigned to a
884         *            server session at one time
885         * @param noLocal
886         *            set true if you want to filter out messages published locally
887         *           
888         * @return the connection consumer
889         * @throws JMSException
890         *             if the <CODE>Connection</CODE> object fails to create a
891         *             connection consumer due to some internal error or invalid
892         *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
893         * @throws javax.jms.InvalidDestinationException
894         *             if an invalid destination is specified.
895         * @throws javax.jms.InvalidSelectorException
896         *             if the message selector is invalid.
897         * @see javax.jms.ConnectionConsumer
898         * @since 1.1
899         */
900        public ConnectionConsumer createConnectionConsumer(Destination destination,
901                String messageSelector, ServerSessionPool sessionPool,
902                int maxMessages, boolean noLocal) throws JMSException {
903            
904            checkClosed();
905            ensureClientIDInitialised();
906            ConsumerInfo info = new ConsumerInfo();
907            info.setConsumerId(handleIdGenerator.generateId());
908            info.setDestination(ActiveMQMessageTransformation
909                    .transformDestination(destination));
910            info.setSelector(messageSelector);
911            info.setConsumerNo(handleIdGenerator.getNextShortSequence());
912            info.setNoLocal(noLocal);
913            info.setClientId(clientID);
914            return new ActiveMQConnectionConsumer(this, sessionPool, info,
915                    maxMessages);
916        }
917    
918    
919    
920            /**
921             * Create a durable connection consumer for this connection (optional
922             * operation). This is an expert facility not used by regular JMS clients.
923             * 
924             * @param topic
925             *            topic to access
926             * @param subscriptionName
927             *            durable subscription name
928             * @param messageSelector
929             *            only messages with properties matching the message selector
930             *            expression are delivered. A value of null or an empty string
931             *            indicates that there is no message selector for the message
932             *            consumer.
933             * @param sessionPool
934             *            the server session pool to associate with this durable
935             *            connection consumer
936             * @param maxMessages
937             *            the maximum number of messages that can be assigned to a
938             *            server session at one time
939             * @return the durable connection consumer
940             * @throws JMSException
941             *             if the <CODE>Connection</CODE> object fails to create a
942             *             connection consumer due to some internal error or invalid
943             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
944             * @throws javax.jms.InvalidDestinationException
945             *             if an invalid destination is specified.
946             * @throws javax.jms.InvalidSelectorException
947             *             if the message selector is invalid.
948             * @see javax.jms.ConnectionConsumer
949             * @since 1.1
950             */
951            public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
952                            String subscriptionName, String messageSelector,
953                            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
954                    checkClosed();
955                    ensureClientIDInitialised();
956                    ConsumerInfo info = new ConsumerInfo();
957                    info.setConsumerId(this.handleIdGenerator.generateId());
958                    info.setDestination(ActiveMQMessageTransformation
959                                    .transformDestination(topic));
960                    info.setSelector(messageSelector);
961                    info.setConsumerName(subscriptionName);
962                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
963             info.setClientId(clientID);
964                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
965                                    maxMessages);
966            }
967        
968        /**
969         * Create a durable connection consumer for this connection (optional
970         * operation). This is an expert facility not used by regular JMS clients.
971         * 
972         * @param topic
973         *            topic to access
974         * @param subscriptionName
975         *            durable subscription name
976         * @param messageSelector
977         *            only messages with properties matching the message selector
978         *            expression are delivered. A value of null or an empty string
979         *            indicates that there is no message selector for the message
980         *            consumer.
981         * @param sessionPool
982         *            the server session pool to associate with this durable
983         *            connection consumer
984         * @param maxMessages
985         *            the maximum number of messages that can be assigned to a
986         *            server session at one time
987         * @param noLocal
988         *            set true if you want to filter out messages published locally
989         *            
990         * @return the durable connection consumer
991         * @throws JMSException
992         *             if the <CODE>Connection</CODE> object fails to create a
993         *             connection consumer due to some internal error or invalid
994         *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
995         * @throws javax.jms.InvalidDestinationException
996         *             if an invalid destination is specified.
997         * @throws javax.jms.InvalidSelectorException
998         *             if the message selector is invalid.
999         * @see javax.jms.ConnectionConsumer
1000         * @since 1.1
1001         */
1002        public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
1003                String subscriptionName, String messageSelector,
1004                ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
1005            checkClosed();
1006            ensureClientIDInitialised();
1007            ConsumerInfo info = new ConsumerInfo();
1008            info.setConsumerId(this.handleIdGenerator.generateId());
1009            info.setDestination(ActiveMQMessageTransformation
1010                    .transformDestination(topic));
1011            info.setSelector(messageSelector);
1012            info.setConsumerName(subscriptionName);
1013            info.setNoLocal(noLocal);
1014            info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1015            info.setClientId(clientID);
1016            return new ActiveMQConnectionConsumer(this, sessionPool, info,
1017                    maxMessages);
1018        }
1019    
1020            /**
1021         * Implementation of the PacketListener interface - consume a packet
1022         * 
1023         * @param packet -
1024         *            the Packet to consume
1025         * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1026         */
1027        public void consume(Packet packet) {
1028            if (!closed && packet != null) {
1029                if (packet.isJMSMessage()) {
1030                    ActiveMQMessage message = (ActiveMQMessage) packet;
1031                    message.setReadOnly(true);
1032                    message.setConsumerIdentifer(clientID);
1033    
1034                    // lets check for expired messages which is only relevant for
1035                    // multicast based stuff
1036                    // as a pointcast based network should filter out this stuff
1037                    if (transportChannel.isMulticast()) {
1038                        long expiration = message.getJMSExpiration();
1039                        if (expiration > 0) {
1040                            long timeStamp = System.currentTimeMillis();
1041                            if (timeStamp > expiration) {
1042                                if (log.isDebugEnabled()) {
1043                                    log.debug("Discarding expired message: " + message);
1044                                }
1045                                return;
1046                            }
1047                        }
1048                    }
1049    
1050                    try {                    
1051                        message = assembleMessage(message);
1052                        if( message !=null ) {
1053                            int count = 0;
1054                            for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
1055                                ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
1056                                if (dispatcher.isTarget(message)) {
1057                                    if (count > 0) {
1058                                        // separate message for each Session etc.
1059                                        message = message.deepCopy();
1060                                    }
1061                                    dispatcher.dispatch(message);
1062                                    count++;
1063                                }
1064                            }
1065                        }
1066                    } catch (JMSException jmsEx) {
1067                        handleAsyncException(jmsEx);
1068                    }
1069                } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1070                    CapacityInfo info = (CapacityInfo) packet;
1071                    synchronized(flowControlMutex) {
1072                        flowControlSleepTime = info.getFlowControlTimeout();
1073                    }
1074                    // System.out.println("SET FLOW TIMEOUT = " +
1075                    // flowControlSleepTime + " FOR " + info);
1076                } else if (packet.getPacketType() == Packet.KEEP_ALIVE && packet.isReceiptRequired()) {
1077                    Receipt receipt = new Receipt();
1078                    receipt.setCorrelationId(packet.getId());
1079                    receipt.setReceiptRequired(false);
1080                    try {
1081                        asyncSendPacket(receipt);
1082                    } catch (JMSException jmsEx) {
1083                        handleAsyncException(jmsEx);
1084                    }
1085                }
1086            }
1087        }
1088    
1089        private final ActiveMQMessage assembleMessage(ActiveMQMessage message) {
1090            ActiveMQMessage result = message;
1091            if (message != null && !isInternalConnection() && message.isMessagePart()) {
1092                if (message.getNumberOfParts() == 1) {
1093                    //passed though from another session - i.e.
1094                    //a network or remote connection and now assembled
1095                    message.resetMessagePart();
1096                    result = message;
1097                }
1098                else {
1099                    result = null;
1100                    String parentId = message.getParentMessageID();
1101                    ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId);
1102                    if (array == null) {
1103                        array = new ActiveMQMessage[message.getNumberOfParts()];
1104                        assemblies.put(parentId, array);
1105                    }
1106                    array[message.getPartNumber()] = message;
1107                    boolean complete = true;
1108                    for (int i = 0;i < array.length;i++) {
1109                        complete &= array[i] != null;
1110                    }
1111                    if (complete) {
1112                        result = array[0];
1113                        ByteArray[] bas = new ByteArray[array.length];
1114                        try {
1115                            for (int i = 0;i < bas.length;i++) {
1116                                bas[i] = array[i].getBodyAsBytes();
1117                                if (i >= 1){
1118                                    array[i].clearBody();
1119                                }
1120                            }
1121                            ByteArray ba = fragmentation.assemble(bas);
1122                            result.setBodyAsBytes(ba);
1123                        }
1124                        catch (IOException ioe) {
1125                            JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId);
1126                            jmsEx.setLinkedException(ioe);
1127                            onException(jmsEx);
1128                        }catch(JMSException jmsEx){
1129                            onException(jmsEx);  
1130                        }
1131                    }
1132                }
1133            }
1134            return result;
1135        }
1136    
1137            /**
1138             * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1139             */
1140            public void onException(JMSException jmsEx) {
1141                    // Got an exception propagated up from the transport channel
1142                    handleAsyncException(jmsEx);
1143                    isTransportOK = false;
1144                    try {
1145                            close();
1146                    } catch (JMSException ex) {
1147                            log.debug("Exception closing the connection", ex);
1148                    }
1149            }
1150    
1151            /**
1152             * Creates a <CODE>TopicSession</CODE> object.
1153             * 
1154             * @param transacted
1155             *            indicates whether the session is transacted
1156             * @param acknowledgeMode
1157             *            indicates whether the consumer or the client will acknowledge
1158             *            any messages it receives; ignored if the session is
1159             *            transacted. Legal values are
1160             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1161             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1162             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1163             * @return a newly created topic session
1164             * @throws JMSException
1165             *             if the <CODE>TopicConnection</CODE> object fails to create
1166             *             a session due to some internal error or lack of support for
1167             *             the specific transaction and acknowledgement mode.
1168             * @see Session#AUTO_ACKNOWLEDGE
1169             * @see Session#CLIENT_ACKNOWLEDGE
1170             * @see Session#DUPS_OK_ACKNOWLEDGE
1171             */
1172            public TopicSession createTopicSession(boolean transacted,
1173                            int acknowledgeMode) throws JMSException {
1174          checkClosed();
1175          sendConnectionInfoToBroker();
1176                    return new ActiveMQTopicSession((ActiveMQSession) createSession(
1177                                    transacted, acknowledgeMode));
1178            }
1179    
1180            /**
1181             * Creates a connection consumer for this connection (optional operation).
1182             * This is an expert facility not used by regular JMS clients.
1183             * 
1184             * @param topic
1185             *            the topic to access
1186             * @param messageSelector
1187             *            only messages with properties matching the message selector
1188             *            expression are delivered. A value of null or an empty string
1189             *            indicates that there is no message selector for the message
1190             *            consumer.
1191             * @param sessionPool
1192             *            the server session pool to associate with this connection
1193             *            consumer
1194             * @param maxMessages
1195             *            the maximum number of messages that can be assigned to a
1196             *            server session at one time
1197             * @return the connection consumer
1198             * @throws JMSException
1199             *             if the <CODE>TopicConnection</CODE> object fails to create
1200             *             a connection consumer due to some internal error or invalid
1201             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1202             * @throws InvalidDestinationException
1203             *             if an invalid topic is specified.
1204             * @throws InvalidSelectorException
1205             *             if the message selector is invalid.
1206             * @see javax.jms.ConnectionConsumer
1207             */
1208            public ConnectionConsumer createConnectionConsumer(Topic topic,
1209                            String messageSelector, ServerSessionPool sessionPool,
1210                            int maxMessages) throws JMSException {
1211                    checkClosed();
1212                    ensureClientIDInitialised();
1213                    ConsumerInfo info = new ConsumerInfo();
1214                    info.setConsumerId(this.handleIdGenerator.generateId());
1215                    info.setDestination(ActiveMQMessageTransformation
1216                                    .transformDestination(topic));
1217                    info.setSelector(messageSelector);
1218                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1219             info.setClientId(clientID);
1220                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1221                                    maxMessages);
1222            }
1223    
1224            /**
1225             * Creates a <CODE>QueueSession</CODE> object.
1226             * 
1227             * @param transacted
1228             *            indicates whether the session is transacted
1229             * @param acknowledgeMode
1230             *            indicates whether the consumer or the client will acknowledge
1231             *            any messages it receives; ignored if the session is
1232             *            transacted. Legal values are
1233             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1234             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1235             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1236             * @return a newly created queue session
1237             * @throws JMSException
1238             *             if the <CODE>QueueConnection</CODE> object fails to create
1239             *             a session due to some internal error or lack of support for
1240             *             the specific transaction and acknowledgement mode.
1241             * @see Session#AUTO_ACKNOWLEDGE
1242             * @see Session#CLIENT_ACKNOWLEDGE
1243             * @see Session#DUPS_OK_ACKNOWLEDGE
1244             */
1245            public QueueSession createQueueSession(boolean transacted,
1246                            int acknowledgeMode) throws JMSException {
1247          checkClosed();
1248          sendConnectionInfoToBroker();
1249                    return new ActiveMQQueueSession((ActiveMQSession) createSession(
1250                                    transacted, acknowledgeMode));
1251            }
1252    
1253            /**
1254             * Creates a connection consumer for this connection (optional operation).
1255             * This is an expert facility not used by regular JMS clients.
1256             * 
1257             * @param queue
1258             *            the queue to access
1259             * @param messageSelector
1260             *            only messages with properties matching the message selector
1261             *            expression are delivered. A value of null or an empty string
1262             *            indicates that there is no message selector for the message
1263             *            consumer.
1264             * @param sessionPool
1265             *            the server session pool to associate with this connection
1266             *            consumer
1267             * @param maxMessages
1268             *            the maximum number of messages that can be assigned to a
1269             *            server session at one time
1270             * @return the connection consumer
1271             * @throws JMSException
1272             *             if the <CODE>QueueConnection</CODE> object fails to create
1273             *             a connection consumer due to some internal error or invalid
1274             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1275             * @throws InvalidDestinationException
1276             *             if an invalid queue is specified.
1277             * @throws InvalidSelectorException
1278             *             if the message selector is invalid.
1279             * @see javax.jms.ConnectionConsumer
1280             */
1281            public ConnectionConsumer createConnectionConsumer(Queue queue,
1282                            String messageSelector, ServerSessionPool sessionPool,
1283                            int maxMessages) throws JMSException {
1284                    checkClosed();
1285                    ensureClientIDInitialised();
1286                    ConsumerInfo info = new ConsumerInfo();
1287                    info.setConsumerId(this.handleIdGenerator.generateId());
1288                    info.setDestination(ActiveMQMessageTransformation
1289                                    .transformDestination(queue));
1290                    info.setSelector(messageSelector);
1291                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1292             info.setClientId(clientID);
1293                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1294                                    maxMessages);
1295            }
1296    
1297            /**
1298             * Ensures that the clientID was manually specified and not auto-generated.
1299             * If the clientID was not specified this method will throw an exception.
1300             * This method is used to ensure that the clientID + durableSubscriber name
1301             * are used correctly.
1302             * 
1303             * @throws JMSException
1304             */
1305            public void checkClientIDWasManuallySpecified() throws JMSException {
1306                    if (!userSpecifiedClientID) {
1307                            throw new JMSException(
1308                                            "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1309                    }
1310            }
1311    
1312            /**
1313             * handle disconnect/reconnect events
1314             * 
1315             * @param event
1316             */
1317            public void statusChanged(TransportStatusEvent event) {
1318                    log.info("channel status changed: " + event);
1319                    if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1320                            isTransportOK = true;
1321                            doReconnect();
1322    
1323                    } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1324                            isTransportOK = false;
1325                            clearMessagesInProgress();
1326                    }
1327            }
1328    
1329            /**
1330             * send a Packet through the Connection - for internal use only
1331             * 
1332             * @param packet
1333             * @throws JMSException
1334             */
1335            public void asyncSendPacket(Packet packet) throws JMSException {
1336                    asyncSendPacket(packet, true);
1337            }
1338    
1339            /**
1340             * send a Packet through the Connection - for internal use only
1341             * 
1342             * @param packet
1343             * @param doSendWhileReconnecting
1344             * @throws JMSException
1345             */
1346            public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting)
1347                            throws JMSException {
1348                    if (isTransportOK
1349                                    && !closed
1350                                    && (doSendWhileReconnecting || transportChannel
1351                                                    .isTransportConnected())) {
1352                            packet.setId(packetIdGenerator.getNextShortSequence());
1353                            packet.setReceiptRequired(false);
1354                            synchronized(flowControlMutex) {
1355                                if (packet.isJMSMessage() && flowControlSleepTime > 0) {
1356                                        try {
1357                                                Thread.sleep(flowControlSleepTime);
1358                                        } catch (InterruptedException e) {
1359                                        }
1360                                }
1361                            }
1362                            this.transportChannel.asyncSend(packet);
1363                    }
1364            }
1365    
1366            /**
1367             * send a Packet through a Connection - for internal use only
1368             * 
1369             * @param packet
1370             * @throws JMSException
1371             */
1372            public void syncSendPacket(Packet packet) throws JMSException {
1373                    syncSendPacket(packet, 0);
1374            }
1375    
1376            /**
1377             * Send a packet through a Connection - for internal use only
1378             * 
1379             * @param packet
1380             * @param timeout
1381             * @throws JMSException
1382             */
1383            public void syncSendPacket(Packet packet, int timeout) throws JMSException {
1384                    if (isTransportOK && !closed) {
1385                            Receipt receipt;
1386                            packet.setId(packetIdGenerator.getNextShortSequence());
1387                            packet.setReceiptRequired(true);
1388                            receipt = this.transportChannel.send(packet, timeout);
1389                            if (receipt != null) {
1390                                    if (receipt.isFailed()) {
1391                                            Throwable e = receipt.getException();
1392                                            if (e != null) {
1393                                                    throw JMSExceptionHelper.newJMSException(e);
1394                                            }
1395                                            throw new JMSException(
1396                                                            "syncSendPacket failed with unknown exception");
1397                                    }
1398                            }
1399                    } else {
1400                            if (closed) {
1401                                    throw new ConnectionClosedException();
1402                            } else {
1403                                    throw new JMSException(
1404                                                    "syncSendTimedOut: connection no longer OK");
1405                            }
1406                    }
1407            }
1408    
1409            public Receipt syncSendRequest(Packet packet) throws JMSException {
1410                    checkClosed();
1411                    if (isTransportOK && !closed) {
1412                            Receipt receipt;
1413                            packet.setReceiptRequired(true);
1414                            packet.setId(this.packetIdGenerator.getNextShortSequence());
1415    
1416                            receipt = this.transportChannel.send(packet);
1417                            if (receipt != null && receipt.isFailed()) {
1418                                    Throwable e = receipt.getException();
1419                                    if (e != null) {
1420                                            throw (JMSException) new JMSException(e.getMessage())
1421                                                            .initCause(e);
1422                                    }
1423                                    throw new JMSException(
1424                                                    "syncSendPacket failed with unknown exception");
1425                            }
1426                            return receipt;
1427                    } else {
1428                            if (closed) {
1429                                    throw new ConnectionClosedException();
1430                            } else {
1431                                    throw new JMSException(
1432                                                    "syncSendTimedOut: connection no longer OK");
1433                            }
1434                    }
1435            }
1436    
1437            // Properties
1438            // -------------------------------------------------------------------------
1439    
1440            /**
1441             * @return Returns the prefetchPolicy.
1442             */
1443            public ActiveMQPrefetchPolicy getPrefetchPolicy() {
1444                    return prefetchPolicy;
1445            }
1446    
1447            /**
1448             * @param prefetchPolicy
1449             *            The prefetchPolicy to set.
1450             */
1451            public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
1452                    this.prefetchPolicy = prefetchPolicy;
1453            }
1454    
1455            public int getSendCloseTimeout() {
1456                    return sendCloseTimeout;
1457            }
1458    
1459            public void setSendCloseTimeout(int sendCloseTimeout) {
1460                    this.sendCloseTimeout = sendCloseTimeout;
1461            }
1462    
1463            public int getSendConnectionInfoTimeout() {
1464                    return sendConnectionInfoTimeout;
1465            }
1466    
1467            public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
1468                    this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
1469            }
1470    
1471            public TransportChannel getTransportChannel() {
1472                    return transportChannel;
1473            }
1474    
1475            /**
1476             * Returns the clientID of the connection, forcing one to be generated if
1477             * one has not yet been configured
1478             */
1479            public String getInitializedClientID() throws JMSException {
1480                    ensureClientIDInitialised();
1481                    return this.clientID;
1482            }
1483    
1484            // Implementation methods
1485            // -------------------------------------------------------------------------
1486    
1487            /**
1488             * Used internally for adding Sessions to the Connection
1489             * 
1490             * @param session
1491             * @throws JMSException
1492             */
1493            protected void addSession(ActiveMQSession session) throws JMSException {
1494                    this.sessions.add(session);
1495                    addMessageDispatcher(session);
1496                    if (started.get()) {
1497                            session.start();
1498                    }
1499                    SessionInfo info = createSessionInfo(session);
1500                    info.setStarted(true);
1501                    asyncSendPacket(info);
1502            }
1503    
1504            /**
1505             * Used interanlly for removing Sessions from a Connection
1506             * 
1507             * @param session
1508             * @throws JMSException
1509             */
1510            protected void removeSession(ActiveMQSession session) throws JMSException {
1511                    this.sessions.remove(session);
1512                    removeMessageDispatcher(session);
1513                    SessionInfo info = createSessionInfo(session);
1514                    info.setStarted(false);
1515                    asyncSendPacket(info, false);
1516            }
1517    
1518            private SessionInfo createSessionInfo(ActiveMQSession session) {
1519                    SessionInfo info = new SessionInfo();
1520                    info.setClientId(clientID);
1521                    info.setSessionId(session.getSessionId());
1522                    info.setStartTime(session.getStartTime());
1523                    return info;
1524            }
1525    
1526            /**
1527             * Add a ConnectionConsumer
1528             * 
1529             * @param connectionConsumer
1530             * @throws JMSException
1531             */
1532            protected void addConnectionConsumer(
1533                            ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1534                    this.connectionConsumers.add(connectionConsumer);
1535                    addMessageDispatcher(connectionConsumer);
1536            }
1537    
1538            /**
1539             * Remove a ConnectionConsumer
1540             * 
1541             * @param connectionConsumer
1542             */
1543            protected void removeConnectionConsumer(
1544                            ActiveMQConnectionConsumer connectionConsumer) {
1545                    this.connectionConsumers.add(connectionConsumer);
1546                    removeMessageDispatcher(connectionConsumer);
1547            }
1548    
1549            /**
1550             * Add a Message dispatcher to receive messages from the Broker
1551             * 
1552             * @param messageDispatch
1553             * @throws JMSException
1554             *             if an internal error
1555             */
1556            protected void addMessageDispatcher(
1557                            ActiveMQMessageDispatcher messageDispatch) throws JMSException {
1558                    this.messageDispatchers.add(messageDispatch);
1559            }
1560    
1561            /**
1562             * Remove a Message dispatcher
1563             * 
1564             * @param messageDispatcher
1565             */
1566            protected void removeMessageDispatcher(
1567                            ActiveMQMessageDispatcher messageDispatcher) {
1568                    this.messageDispatchers.remove(messageDispatcher);
1569            }
1570    
1571            /**
1572             * Used for handling async exceptions
1573             * 
1574             * @param jmsEx
1575             */
1576            protected void handleAsyncException(JMSException jmsEx) {
1577                    if (!closed) {
1578                            if (this.exceptionListener != null) {
1579                                    this.exceptionListener.onException(jmsEx);
1580                            } else {
1581                                    log.warn(
1582                                                    "Async exception with no exception listener: " + jmsEx,
1583                                                    jmsEx);
1584                            }
1585                    }
1586            }
1587    
1588            protected void sendConnectionInfoToBroker() throws JMSException {
1589                    sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false);
1590            }
1591    
1592            /**
1593             * Send the ConnectionInfo to the Broker
1594             * 
1595             * @param timeout
1596             * @param isClosed
1597             * @throws JMSException
1598             */
1599            protected void sendConnectionInfoToBroker(int timeout, boolean forceResend,
1600                            boolean closing) throws JMSException {
1601                    // Can we skip sending the ConnectionInfo packet??
1602                    if (isConnectionInfoSentToBroker && !forceResend) {
1603                            return;
1604                    }
1605    
1606            fragmentation.setFragmentationLimit(getMessageFragmentationLimit());
1607    
1608                    this.isConnectionInfoSentToBroker = true;
1609                    ensureClientIDInitialised();
1610                    ConnectionInfo info = new ConnectionInfo();
1611                    info.setClientId(this.clientID);
1612                    info.setHostName(IdGenerator.getHostName());
1613                    info.setUserName(userName);
1614                    info.setPassword(password);
1615                    info.setStartTime(startTime);
1616                    info.setStarted(started.get());
1617                    info.setClosed(closed || closing);
1618                    info.setClientVersion(connectionMetaData.getProviderVersion());
1619                    info.setWireFormatVersion(transportChannel
1620                                    .getCurrentWireFormatVersion());
1621                    if (info.getProperties() != null) {
1622                            info.getProperties().setProperty(ConnectionInfo.NO_DELAY_PROPERTY,
1623                                            new Boolean(!useAsyncSend).toString());
1624                    }
1625                    if (quickClose && info.isClosed()) {
1626                            asyncSendPacket(info);
1627                    } else {
1628                            syncSendPacket(info, timeout);
1629                    }
1630            }
1631    
1632            /**
1633             * Set the maximum amount of memory this Connection should use for buffered
1634             * inbound messages
1635             * 
1636             * @param newMemoryLimit
1637             *            the new memory limit in bytes
1638             */
1639            public void setConnectionMemoryLimit(int newMemoryLimit) {
1640                    memoryManager.setValueLimit(newMemoryLimit);
1641            }
1642    
1643            /**
1644             * Get the current value for the maximum amount of memory this Connection
1645             * should use for buffered inbound messages
1646             * 
1647             * @return the current limit in bytes
1648             */
1649            public int getConnectionMemoryLimit() {
1650                    return (int) memoryManager.getValueLimit();
1651            }
1652    
1653            /**
1654             * CapacityMonitorEventListener implementation called when the capacity of a
1655             * CapacityService changes
1656             * 
1657             * @param event
1658             */
1659            public void capacityChanged(CapacityMonitorEvent event) {
1660                    // send the event to broker ...
1661                    CapacityInfo info = new CapacityInfo();
1662                    info.setResourceName(event.getMonitorName());
1663                    info.setCapacity(event.getCapacity());
1664                    // System.out.println("Cap changed: " + event);
1665                    try {
1666                            asyncSendPacket(info, false);
1667                    } catch (JMSException e) {
1668                            JMSException jmsEx = new JMSException(
1669                                            "failed to send change in capacity");
1670                            jmsEx.setLinkedException(e);
1671                            handleAsyncException(jmsEx);
1672                    }
1673            }
1674    
1675            /**
1676             * @return a number unique for this connection
1677             */
1678            protected int getNextConsumerNumber() {
1679                    return this.consumerNumberGenerator.increment();
1680            }
1681    
1682            protected short generateSessionId() {
1683                    return this.sessionIdGenerator.getNextShortSequence();
1684            }
1685    
1686            private synchronized void ensureClientIDInitialised() {
1687                    if (this.clientID == null || this.clientID.trim().equals("")) {
1688                            this.clientID = this.clientIdGenerator.generateId();
1689                    }
1690                    transportChannel.setClientID(clientID);
1691                    this.clientIDSet = true;
1692            }
1693    
1694            protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1695                    return boundedQueueManager.getMemoryBoundedQueue(name);
1696            }
1697    
1698            protected void doReconnect() {
1699                    try {
1700                            // send the Connection info again
1701                            sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
1702                            for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1703                                    ActiveMQSession session = (ActiveMQSession) iter.next();
1704                                    SessionInfo sessionInfo = createSessionInfo(session);
1705                                    sessionInfo.setStarted(true);
1706                                    asyncSendPacket(sessionInfo, false);
1707                                    // send consumers
1708                                    for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator
1709                                                    .hasNext();) {
1710                                            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator
1711                                                            .next();
1712                                            ConsumerInfo consumerInfo = session
1713                                                            .createConsumerInfo(consumer);
1714                                            consumerInfo.setStarted(true);
1715                                            asyncSendPacket(consumerInfo, false);
1716                                    }
1717                                    // send producers
1718                                    for (Iterator producersIterator = session.producers.iterator(); producersIterator
1719                                                    .hasNext();) {
1720                                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator
1721                                                            .next();
1722                                            ProducerInfo producerInfo = session
1723                                                            .createProducerInfo(producer);
1724                                            producerInfo.setStarted(true);
1725                                            asyncSendPacket(producerInfo, false);
1726                                    }
1727                                    // send the current capacity
1728                                    CapacityMonitorEvent event = memoryManager
1729                                                    .generateCapacityMonitorEvent();
1730                                    if (event != null) {
1731                                            capacityChanged(event);
1732                                    }
1733                            }
1734                    } catch (JMSException jmsEx) {
1735                            log.error("Failed to do reconnection");
1736                            handleAsyncException(jmsEx);
1737                            isTransportOK = false;
1738                    }
1739            }
1740    
1741            /**
1742             * @return Returns the useAsyncSend.
1743             */
1744            public boolean isUseAsyncSend() {
1745                    return useAsyncSend;
1746            }
1747    
1748            /**
1749             * @param useAsyncSend
1750             *            The useAsyncSend to set.
1751             */
1752            public void setUseAsyncSend(boolean useAsyncSend) {
1753                    this.useAsyncSend = useAsyncSend;
1754            }
1755    
1756            /**
1757             * @return Returns the cachingEnabled.
1758             */
1759            public boolean isCachingEnabled() {
1760                    return cachingEnabled;
1761            }
1762    
1763            /**
1764             * @param cachingEnabled
1765             *            The cachingEnabled to set.
1766             */
1767            public void setCachingEnabled(boolean cachingEnabled) {
1768                    this.cachingEnabled = cachingEnabled;
1769            }
1770    
1771            /**
1772             * @return Returns the j2EEcompliant.
1773             */
1774            public boolean isJ2EEcompliant() {
1775                    return J2EEcompliant;
1776            }
1777    
1778            /**
1779             * @param ecompliant
1780             *            The j2EEcompliant to set.
1781             */
1782            public void setJ2EEcompliant(boolean ecompliant) {
1783                    J2EEcompliant = ecompliant;
1784            }
1785    
1786            /**
1787             * @return Returns the internalConnection.
1788             */
1789            public boolean isInternalConnection() {
1790                    return internalConnection;
1791            }
1792    
1793            /**
1794             * @param internalConnection
1795             *            The internalConnection to set.
1796             */
1797            public void setInternalConnection(boolean internalConnection) {
1798                    this.internalConnection = internalConnection;
1799            }
1800    
1801            /**
1802             * @return Returns the doMessageCompression.
1803             */
1804            public boolean isDoMessageCompression() {
1805                    return doMessageCompression
1806                                    && transportChannel.doesSupportMessageCompression();
1807            }
1808    
1809            /**
1810             * @param doMessageCompression
1811             *            The doMessageCompression to set.
1812             */
1813            public void setDoMessageCompression(boolean doMessageCompression) {
1814                    this.doMessageCompression = doMessageCompression
1815                                    && transportChannel.doesSupportMessageCompression();
1816            }
1817    
1818            /**
1819             * @return Returns the doMessageFragmentation.
1820             */
1821            public boolean isDoMessageFragmentation() {
1822                    return doMessageFragmentation
1823                                    && transportChannel.doesSupportMessageFragmentation();
1824            }
1825    
1826            /**
1827             * @param doMessageFragmentation
1828             *            The doMessageFragmentation to set.
1829             */
1830            public void setDoMessageFragmentation(boolean doMessageFragmentation) {
1831                    this.doMessageFragmentation = doMessageFragmentation
1832                                    && transportChannel.doesSupportMessageFragmentation();
1833            }
1834    
1835            /**
1836             * @return Returns the messageCompressionLevel.
1837             */
1838            public int getMessageCompressionLevel() {
1839                    return messageCompressionLevel;
1840            }
1841    
1842            /**
1843             * @param messageCompressionLevel
1844             *            The messageCompressionLevel to set.
1845             */
1846            public void setMessageCompressionLevel(int messageCompressionLevel) {
1847                    this.messageCompressionLevel = messageCompressionLevel;
1848            }
1849    
1850            /**
1851             * @return Returns the messageCompressionLimit.
1852             */
1853            public int getMessageCompressionLimit() {
1854                    return messageCompressionLimit;
1855            }
1856    
1857            /**
1858             * @param messageCompressionLimit
1859             *            The messageCompressionLimit to set.
1860             */
1861            public void setMessageCompressionLimit(int messageCompressionLimit) {
1862                    this.messageCompressionLimit = messageCompressionLimit;
1863            }
1864    
1865            /**
1866             * @return Returns the messageCompressionStrategy.
1867             */
1868            public int getMessageCompressionStrategy() {
1869                    return messageCompressionStrategy;
1870            }
1871    
1872            /**
1873             * @param messageCompressionStrategy
1874             *            The messageCompressionStrategy to set.
1875             */
1876            public void setMessageCompressionStrategy(int messageCompressionStrategy) {
1877                    this.messageCompressionStrategy = messageCompressionStrategy;
1878            }
1879    
1880            /**
1881             * @return Returns the messageFragmentationLimit.
1882             */
1883            public int getMessageFragmentationLimit() {
1884                    return messageFragmentationLimit;
1885            }
1886    
1887            /**
1888             * @param messageFragmentationLimit
1889             *            The messageFragmentationLimit to set.
1890             */
1891            public void setMessageFragmentationLimit(int messageFragmentationLimit) {
1892                    this.messageFragmentationLimit = messageFragmentationLimit;
1893            }
1894    
1895            /**
1896             * @return Returns the disableTimeStampsByDefault.
1897             */
1898            public boolean isDisableTimeStampsByDefault() {
1899                    return disableTimeStampsByDefault;
1900            }
1901    
1902            /**
1903             * @param disableTimeStampsByDefault
1904             *            The disableTimeStampsByDefault to set.
1905             */
1906            public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
1907                    this.disableTimeStampsByDefault = disableTimeStampsByDefault;
1908            }
1909    
1910            /**
1911             * Causes pre-serialization of messages before send By default this is on
1912             * 
1913             * @return Returns the prePrepareMessageOnSend.
1914             */
1915            public boolean isPrepareMessageBodyOnSend() {
1916                    return prepareMessageBodyOnSend;
1917            }
1918    
1919            /**
1920             * Causes pre-serialization of messages before send By default this is on
1921             * 
1922             * @param prePrepareMessageOnSend
1923             *            The prePrepareMessageOnSend to set.
1924             */
1925            public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
1926                    this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
1927            }
1928    
1929            /**
1930             * @return Returns the copyMessageOnSend.
1931             */
1932            public boolean isCopyMessageOnSend() {
1933                    return copyMessageOnSend;
1934            }
1935    
1936            /**
1937             * @param copyMessageOnSend
1938             *            The copyMessageOnSend to set.
1939             */
1940            public void setCopyMessageOnSend(boolean copyMessageOnSend) {
1941                    this.copyMessageOnSend = copyMessageOnSend;
1942            }
1943    
1944            /**
1945             * @return Returns the quickClose.
1946             */
1947            public boolean isQuickClose() {
1948                    return quickClose;
1949            }
1950    
1951            /**
1952             * @param quickClose
1953             *            The quickClose to set.
1954             */
1955            public void setQuickClose(boolean quickClose) {
1956                    this.quickClose = quickClose;
1957            }
1958    
1959            /**
1960             * @return Returns the optimizedMessageDispatch.
1961             */
1962            public boolean isOptimizedMessageDispatch() {
1963                    return optimizedMessageDispatch;
1964            }
1965    
1966            /**
1967             * @param optimizedMessageDispatch
1968             *            The optimizedMessageDispatch to set.
1969             */
1970            public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
1971                    this.optimizedMessageDispatch = optimizedMessageDispatch;
1972            }
1973    
1974            protected void clearMessagesInProgress() {
1975                    for (Iterator i = sessions.iterator(); i.hasNext();) {
1976                            ActiveMQSession session = (ActiveMQSession) i.next();
1977                            session.clearMessagesInProgress();
1978                    }
1979            }
1980    
1981            /**
1982             * Tells the broker to destroy a destination.
1983             * 
1984             * @param destination
1985             */
1986            public void destroyDestination(ActiveMQDestination destination)
1987                            throws JMSException {
1988                    BrokerAdminCommand command = new BrokerAdminCommand();
1989                    command.setCommand(BrokerAdminCommand.DESTROY_DESTINATION);
1990                    command.setDestination(destination);
1991                    syncSendPacket(command);
1992            }
1993    
1994            /**
1995             * Cleans up this connection so that it's state is as if the connection was
1996             * just created. This allows the Resource Adapter to clean up a connection
1997             * so that it can be reused without having to close and recreate the
1998             * connection.
1999             * 
2000             * @param sessionId
2001             */
2002            public void cleanup() throws JMSException {
2003    
2004                    try {
2005                            for (Iterator i = this.sessions.iterator(); i.hasNext();) {
2006                                    ActiveMQSession s = (ActiveMQSession) i.next();
2007                                    s.close();
2008                            }
2009                            for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
2010                                    ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
2011                                                    .next();
2012                                    c.close();
2013                            }
2014                            this.connectionConsumers.clear();
2015                            this.messageDispatchers.clear();
2016                    } finally {
2017                            this.sessions.clear();
2018                            started.set(false);
2019                    }
2020    
2021                    setExceptionListener(null);
2022                    clientIDSet = false;
2023                    isConnectionInfoSentToBroker = false;
2024    
2025                    CleanupConnectionInfo cleanupInfo = new CleanupConnectionInfo();
2026                    cleanupInfo.setClientId(getClientID());
2027                    asyncSendPacket(cleanupInfo);
2028            }
2029    
2030            /**
2031             * Changes the associated username/password that is associated with this
2032             * connection. If the connection has been used, you must called cleanup()
2033             * before calling this method.
2034             * 
2035             * @throws IllegalStateException
2036             *             if the connection is in used.
2037             * @param sessionId
2038             */
2039            public void changeUserInfo(String theUserName, String thePassword)
2040                            throws JMSException {
2041                    if (isConnectionInfoSentToBroker)
2042                            throw new IllegalStateException(
2043                                            "changeUserInfo used Connection is not allowed");
2044    
2045                    this.userName = theUserName;
2046                    this.password = thePassword;
2047            }
2048    
2049            protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) {
2050                    transientConsumedRedeliverCache.add(message);
2051            }
2052    
2053            protected void replayTransientConsumedRedeliveredMessages(
2054                            ActiveMQSession session, ActiveMQMessageConsumer consumer)
2055                            throws JMSException {
2056                    if (consumer.getDestination().isTopic()
2057                                    && !transientConsumedRedeliverCache.isEmpty()) {
2058                            Filter filter = getFilterFactory().createFilter(
2059                                            consumer.getDestination(), consumer.getMessageSelector());
2060                            if (consumer.isNoLocal()) {
2061                                    filter = new AndFilter(filter, new NoLocalFilter(clientID));
2062                            }
2063                            for (Iterator i = transientConsumedRedeliverCache.iterator(); i
2064                                            .hasNext();) {
2065                                    ActiveMQMessage message = (ActiveMQMessage) i.next();
2066                                    if (filter.matches(message)) {
2067                                            transientConsumedRedeliverCache.remove(message);
2068                                            message.setMessageAcknowledge(session);
2069                                            message.setJMSRedelivered(true);
2070                                            message.setConsumerNos(new int[] { consumer
2071                                                            .getConsumerNumber() });
2072                                            consumer.processMessage(message);
2073                                    }
2074                            }
2075                    }
2076            }
2077    
2078            private FilterFactory getFilterFactory() {
2079                    if (filterFactory == null) {
2080                            filterFactory = new FilterFactoryImpl();
2081                    }
2082                    return filterFactory;
2083            }
2084    
2085            protected void startTemporaryDestination(ActiveMQDestination dest)
2086                            throws JMSException {
2087                    if (dest != null && dest.isTemporary()) {
2088                            TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2089                                            .get(dest);
2090                            if (event == null) {
2091                                    event = new TempDestinationAdvisoryEvent(dest, true);
2092                                    tempDestinationMap.put(dest, event);
2093                                    ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2094                                    msg.setObject(event);
2095                                    msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2096                                    msg.setJMSDestination(dest.getTopicForTempAdvisory());
2097                                    msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2098                                                    + " .started");
2099                                    this.syncSendPacket(msg);
2100                            }
2101                    }
2102            }
2103    
2104            protected void stopTemporaryDestination(ActiveMQDestination dest)
2105                            throws JMSException {
2106                    if (dest != null && dest.isTemporary()) {
2107                            TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2108                                            .remove(dest);
2109                            if (event != null) {
2110                                    event.setStarted(false);
2111                                    ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2112                                    msg.setObject(event);
2113                                    msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2114                                    msg.setJMSDestination(dest.getTopicForTempAdvisory());
2115                                    msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2116                                                    + " .stopped");
2117                                    this.syncSendPacket(msg);
2118                            }
2119                    }
2120            }
2121    
2122            protected void closeTemporaryDestinations() throws JMSException {
2123                    for (Iterator i = tempDestinationMap.keySet().iterator(); i.hasNext();) {
2124                            ActiveMQDestination dest = (ActiveMQDestination) i.next();
2125                            stopTemporaryDestination(dest);
2126                    }
2127            }
2128    
2129            protected void startAdvisoryForTempDestination(Destination d)
2130                            throws JMSException {
2131                    if (d != null) {
2132                            ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2133                                            .transformDestination(d);
2134                            if (dest.isTemporary()) {
2135                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2136                                                    .get(dest);
2137                                    if (test == null) {
2138                                            test = new TempDestinationAdvisor(this, dest);
2139                                            test.start();
2140                                            validDestinationsMap.put(dest, test);
2141                                    }
2142                            }
2143                    }
2144            }
2145    
2146            protected void stopAdvisoryForTempDestination(ActiveMQDestination d)
2147                            throws JMSException {
2148                    if (d != null) {
2149                            ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2150                                            .transformDestination(d);
2151                            if (dest.isTemporary()) {
2152                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2153                                                    .remove(dest);
2154                                    if (test != null) {
2155                                            test.stop();
2156                                    }
2157                            }
2158                    }
2159            }
2160    
2161            protected final void validateDestination(ActiveMQDestination dest)
2162                            throws JMSException {
2163                    if (dest != null) {
2164                            if (dest.isTemporary()) {
2165                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2166                                                    .get(dest);
2167                                    if (dest.isDeleted() || test == null || !test.isActive(dest)) {
2168                                            throw new JMSException(
2169                                                            "Cannot publish to a deleted Destination: " + dest);
2170                                    }
2171                            }
2172                    }
2173            }
2174    
2175            /**
2176             * @return Returns the resourceManagerId.
2177             * @throws JMSException
2178             */
2179            synchronized public String getResourceManagerId() throws JMSException {
2180                    if (resourceManagerId == null) {
2181                            resourceManagerId = determineResourceManagerId();
2182                    }
2183                    return resourceManagerId;
2184            }
2185    
2186            /**
2187             * Get's the resource manager id.
2188             */
2189            private String determineResourceManagerId() throws JMSException {
2190    
2191                    XATransactionInfo info = new XATransactionInfo();
2192                    info.setType(TransactionInfo.GET_RM_ID);
2193    
2194                    ResponseReceipt receipt = (ResponseReceipt) syncSendRequest(info);
2195                    String rmId = (String) receipt.getResult();
2196                    assert rmId != null;
2197                    return rmId;
2198            }
2199    
2200        public ByteArrayFragmentation getFragmentation() {
2201            return fragmentation;
2202        }
2203    
2204        public ConcurrentHashMap getAssemblies() {
2205            return assemblies;
2206        }
2207    
2208    
2209    }