001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq;
019    
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Properties;
027    
028    import javax.jms.Connection;
029    import javax.jms.ConnectionFactory;
030    import javax.jms.JMSException;
031    import javax.jms.QueueConnection;
032    import javax.jms.QueueConnectionFactory;
033    import javax.jms.TopicConnection;
034    import javax.jms.TopicConnectionFactory;
035    import javax.naming.Context;
036    
037    import org.activemq.broker.Broker;
038    import org.activemq.broker.BrokerConnector;
039    import org.activemq.broker.BrokerContainer;
040    import org.activemq.broker.BrokerContainerFactory;
041    import org.activemq.broker.BrokerContext;
042    import org.activemq.broker.impl.BrokerClientImpl;
043    import org.activemq.broker.impl.BrokerConnectorImpl;
044    import org.activemq.broker.impl.BrokerContainerFactoryImpl;
045    import org.activemq.io.WireFormat;
046    import org.activemq.io.WireFormatLoader;
047    import org.activemq.io.impl.DefaultWireFormat;
048    import org.activemq.io.util.ByteArrayCompression;
049    import org.activemq.io.util.ByteArrayFragmentation;
050    import org.activemq.jndi.JNDIBaseStorable;
051    import org.activemq.management.JMSStatsImpl;
052    import org.activemq.management.StatsCapable;
053    import org.activemq.management.StatsImpl;
054    import org.activemq.message.ActiveMQQueue;
055    import org.activemq.message.ActiveMQTopic;
056    import org.activemq.message.ConnectionInfo;
057    import org.activemq.message.ConsumerInfo;
058    import org.activemq.service.Service;
059    import org.activemq.transport.TransportChannel;
060    import org.activemq.transport.TransportChannelFactory;
061    import org.activemq.transport.TransportChannelListener;
062    import org.activemq.transport.TransportChannelProvider;
063    import org.activemq.transport.vm.VmTransportChannel;
064    import org.activemq.util.BeanUtils;
065    import org.activemq.util.IdGenerator;
066    import org.activemq.util.URIHelper;
067    import org.apache.commons.logging.Log;
068    import org.apache.commons.logging.LogFactory;
069    
070    /**
 * A ConnectionFactory is an Administered object, and is used for creating
072     * Connections.
073     * <p/>
074     * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object.
075     * You can use this connection to create both QueueConnections and TopicConnections.
076     *
077     * @version $Revision: 1.2 $
078     * @see javax.jms.ConnectionFactory
079     */
080    public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable {
081    
082        private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
083    
084        private BrokerContext brokerContext = BrokerContext.getInstance();
085        private BrokerContainerFactory brokerContainerFactory;
086        protected BrokerContainer brokerContainer;
087    
088        protected String userName;
089        protected String password;
090        protected String brokerURL;
091        protected String clientID;
092        protected String brokerName;
093        private boolean useEmbeddedBroker;
094        /**
095         * Should we use an async send for persistent non transacted messages ?
096         */
097        protected boolean useAsyncSend = true;
098        protected boolean disableTimeStampsByDefault = false;
099        protected boolean J2EEcompliant = true;
100        
101        /* The list of emebeded brokers that this object started */
102        private List startedEmbeddedBrokers = new ArrayList();
103    
104        private JMSStatsImpl stats = new JMSStatsImpl();
105        private WireFormat wireFormat = new DefaultWireFormat();
106        private IdGenerator idGenerator = new IdGenerator();
107        private int connectionCount;
108        private String brokerXmlConfig;
109        
110        
111        //compression and fragmentation variables
112        
113        protected boolean doMessageCompression = true;
114        protected int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;//data size above which compression will be used
115        protected int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
116        protected int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;//default compression strategy
117        
118        protected boolean doMessageFragmentation = false;
119        protected int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
120        
121        protected boolean cachingEnabled = true;
122        
123        protected boolean prepareMessageBodyOnSend = true; //causes pre-serialization of messages
124        
125        protected boolean quickClose = false;
126        
127        protected boolean internalConnection = false;//connections are used internally - for networks etc.
128    
129        protected boolean optimizedMessageDispatch = false;//set to true for better consumption for transient topics
130        
131        protected boolean copyMessageOnSend = true;//set false for better throughput
132    
133        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
134    
135        /**
136         * Default Constructor for ActiveMQConnectionFactory
137         */
138        public ActiveMQConnectionFactory() {
139            this( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_URL);
140        }
141    
142    
143        public ActiveMQConnectionFactory(String brokerURL) {
144            this(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
145        }
146    
147        public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
148            this.userName = userName;
149            this.password = password;
150            this.brokerURL = brokerURL;
151            
152            if( brokerURL.indexOf("?")>= 0 ) {
153                String options = brokerURL.substring(brokerURL.indexOf("?")+1);
154                Map properties = URIHelper.parseQuery(options);
155                if (!properties.isEmpty()) {
156                    BeanUtils.populate(this, properties);
157                }   
158            }
159        }
160    
161        /**
162         * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
163         * ready for use in embedded mode.
164         *
165         * @param container
166         */
167        public ActiveMQConnectionFactory(BrokerContainer container) {
168            this(container, "vm://" + container.getBroker().getName());
169        }
170    
171        /**
172         * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
173         * ready for use in embedded mode and the brokerURL connection.
174         *
175         * @param container
176         */
177        public ActiveMQConnectionFactory(BrokerContainer container, String brokerURL) {
178            this.brokerContainer = container;
179            this.useEmbeddedBroker = false;
180            this.brokerURL = brokerURL;
181            
182            if( brokerURL.indexOf("?")>= 0 ) {
183                String options = brokerURL.substring(brokerURL.indexOf("?")+1);
184                Map properties = URIHelper.parseQuery(options);
185                if (!properties.isEmpty()) {
186                    BeanUtils.populate(this, properties);
187                }   
188            }
189        }
190    
191    
192        public StatsImpl getStats() {
193            return stats;
194        }
195    
196        public JMSStatsImpl getFactoryStats() {
197            return stats;
198        }
199    
200        /**
201         * @return Returns the brokerURL.
202         */
203        public String getBrokerURL() {
204            return brokerURL;
205        }
206    
207        /**
208         * @param brokerURL The brokerURL to set.
209         */
210        public void setBrokerURL(String brokerURL) {
211            this.brokerURL = brokerURL;
212        }
213    
214        /**
215         * @return Returns the clientID.
216         */
217        public String getClientID() {
218            return clientID;
219        }
220    
221        /**
222         * @param clientID The clientID to set.
223         */
224        public void setClientID(String clientID) {
225            this.clientID = clientID;
226        }
227    
228        /**
229         * @return Returns the password.
230         */
231        public String getPassword() {
232            return password;
233        }
234    
235        /**
236         * @param password The password to set.
237         */
238        public void setPassword(String password) {
239            this.password = password;
240        }
241    
242        /**
243         * @return Returns the userName.
244         */
245        public String getUserName() {
246            return userName;
247        }
248    
249        /**
250         * @param userName The userName to set.
251         */
252        public void setUserName(String userName) {
253            this.userName = userName;
254        }
255    
256        /**
257         * Is an embedded broker used by this connection factory
258         *
259         * @return true if an embedded broker will be used by this connection factory
260         */
261        public boolean isUseEmbeddedBroker() {
262            return useEmbeddedBroker;
263        }
264    
265        /**
266         * Allows embedded brokers to be associated with a connection factory
267         *
268         * @param useEmbeddedBroker
269         */
270        public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
271            this.useEmbeddedBroker = useEmbeddedBroker;
272        }
273    
274        /**
275         * The name of the broker to use if creating an embedded broker
276         *
277         * @return
278         */
279        public String getBrokerName() {
280            if (brokerName == null) {
281                // lets auto-create a broker name
282                brokerName = idGenerator.generateId();
283            }
284            return brokerName;
285        }
286        
287        /**
288         * The name of the broker to use if creating an embedded broker
289         *
290         * @return
291         */
292        public String getBrokerName(String url) {
293            if (brokerName == null) {
294                brokerName = url;
295            }
296            return brokerName;
297        }
298    
299        public void setBrokerName(String brokerName) {
300            this.brokerName = brokerName;
301        }
302    
303        /**
304         * @return Returns the useAsyncSend.
305         */
306        public boolean isUseAsyncSend() {
307            return useAsyncSend;
308        }
309    
310        /**
311         * @param useAsyncSend The useAsyncSend to set.
312         */
313        public void setUseAsyncSend(boolean useAsyncSend) {
314            this.useAsyncSend = useAsyncSend;
315        }
316    
317        public WireFormat getWireFormat() {
318            return wireFormat.copy();//need a separate instance - especially if wire format caching enabled
319        }
320    
321        /**
322         * Allows the prefetch policy to be configured
323         *
324         * @return
325         */
326        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
327            return prefetchPolicy;
328        }
329    
330        /**
331         * Sets the prefetch policy
332         *
333         * @param prefetchPolicy
334         */
335        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
336            this.prefetchPolicy = prefetchPolicy;
337        }
338    
339        /**
340         * Set this flag for fast throughput!
341         * <P>
342         * Enables asynchronous sending of messages and disables timestamps by default
343         * </P>
344         * @param value - the flag to set
345         */
346        public void setTurboBoost(boolean value){
347            
348            disableTimeStampsByDefault = value;
349            useAsyncSend = value;
350            cachingEnabled = value;
351            optimizedMessageDispatch = value;
352            prepareMessageBodyOnSend = !value;
353            copyMessageOnSend = !value;
354        }
355        
356        /**
357         * @return true if turboBoost enabled
358         */
359        public boolean isTurboBoost(){
360            return disableTimeStampsByDefault && useAsyncSend && cachingEnabled;
361        }
362        
363        /**
364         * @return Returns the optimizedMessageDispatch.
365         */
366        public boolean isOptimizedMessageDispatch() {
367            return optimizedMessageDispatch;
368        }
369        /**
370         * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
371         */
372        public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
373            this.optimizedMessageDispatch = optimizedMessageDispatch;
374        }
375        /**
376         * @return Returns the disableTimeStampsByDefault.
377         */
378        public boolean isDisableTimeStampsByDefault() {
379            return disableTimeStampsByDefault;
380        }
381        /**
382         * @param disableTimeStampsByDefault The disableTimeStampsByDefault to set.
383         */
384        public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
385            this.disableTimeStampsByDefault = disableTimeStampsByDefault;
386        }
387        /**
388         * @return Returns the j2EEcompliant.
389         */
390        public boolean isJ2EEcompliant() {
391            return J2EEcompliant;
392        }
393        /**
394         * @param ecompliant The j2EEcompliant to set.
395         */
396        public void setJ2EEcompliant(boolean ecompliant) {
397            J2EEcompliant = ecompliant;
398        }
399    
400        /**
401         * @return Returns the internalConnection.
402         */
403        public boolean isInternalConnection() {
404            return internalConnection;
405        }
406        /**
407         * @param internalConnection The internalConnection to set.
408         */
409        public void setInternalConnection(boolean internalConnection) {
410            this.internalConnection = internalConnection;
411        }
412        /**
413         * @return Returns the quickClose.
414         */
415        public boolean isQuickClose() {
416            return quickClose;
417        }
418        /**
419         * @param quickClose The quickClose to set.
420         */
421        public void setQuickClose(boolean quickClose) {
422            this.quickClose = quickClose;
423        }
424        /**
425         * @return Returns the doMessageCompression.
426         */
427        public boolean isDoMessageCompression() {
428            return doMessageCompression;
429        }
430        /**
431         * @param doMessageCompression The doMessageCompression to set.
432         */
433        public void setDoMessageCompression(boolean doMessageCompression) {
434            this.doMessageCompression = doMessageCompression;
435        }
436        /**
437         * @return Returns the doMessageFragmentation.
438         */
439        public boolean isDoMessageFragmentation() {
440            return doMessageFragmentation;
441        }
442        /**
443         * @param doMessageFragmentation The doMessageFragmentation to set.
444         */
445        public void setDoMessageFragmentation(boolean doMessageFragmentation) {
446            this.doMessageFragmentation = doMessageFragmentation;
447        }
448        /**
449         * @return Returns the messageCompressionLimit.
450         */
451        public int getMessageCompressionLimit() {
452            return messageCompressionLimit;
453        }
454        /**
455         * @param messageCompressionLimit The messageCompressionLimit to set.
456         */
457        public void setMessageCompressionLimit(int messageCompressionLimit) {
458            this.messageCompressionLimit = messageCompressionLimit;
459        }
460        /**
461         * @return Returns the messageCompressionStrategy.
462         */
463        public int getMessageCompressionStrategy() {
464            return messageCompressionStrategy;
465        }
466        /**
467         * @param messageCompressionStrategy The messageCompressionStrategy to set.
468         */
469        public void setMessageCompressionStrategy(int messageCompressionStrategy) {
470            this.messageCompressionStrategy = messageCompressionStrategy;
471        }
472        /**
473         * @return Returns the messageFragmentationLimit.
474         */
475        public int getMessageFragmentationLimit() {
476            return messageFragmentationLimit;
477        }
478        /**
479         * @param messageFragmentationLimit The messageFragmentationLimit to set.
480         */
481        public void setMessageFragmentationLimit(int messageFragmentationLimit) {
482            this.messageFragmentationLimit = messageFragmentationLimit;
483        }
484        
485        /**
486         * @return Returns the cachingEnabled.
487         */
488        public boolean isCachingEnabled() {
489            return cachingEnabled;
490        }
491        /**
492         * @param cachingEnabled The cachingEnabled to set.
493         */
494        public void setCachingEnabled(boolean cachingEnabled) {
495            this.cachingEnabled = cachingEnabled;
496        }
497        /**
498         * Causes pre-serialization of messages before send
499         * By default this is on
500         * @return Returns the prePrepareMessageOnSend.
501         */
502        public boolean isPrepareMessageBodyOnSend() {
503            return prepareMessageBodyOnSend;
504        }
505        /**
506         * Causes pre-serialization of messages before send
507         * By default this is on
508         * @param prePrepareMessageOnSend The prePrepareMessageOnSend to set.
509         */
510        public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
511            this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
512        }
513        /**
514         * @return Returns the copyMessageOnSend.
515         */
516        public boolean isCopyMessageOnSend() {
517            return copyMessageOnSend;
518        }
519        /**
520         * @param copyMessageOnSend The copyMessageOnSend to set.
521         */
522        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
523            this.copyMessageOnSend = copyMessageOnSend;
524        }
525        /**
526         * Allows a custom wire format to be used; otherwise the default Java wire format is used
527         * which is designed for minimum size and maximum speed on the Java platform
528         *
529         * @param wireFormat
530         */
531        public void setWireFormat(WireFormat wireFormat) {
532            this.wireFormat = wireFormat;
533        }
534        
535        /**
536         * set the WireFormat by name - e.g. 'default','amqpfast' etc.
537         * 
538         * @param format
539         * @throws JMSException
540         */
541     
542        public void setWireFormat(String format) throws JMSException{
543            this.wireFormat = WireFormatLoader.getWireFormat(format);
544        }
545    
546        public String getBrokerXmlConfig() {
547            return brokerXmlConfig;
548        }
549    
550        public BrokerContainer getBrokerContainer() {
551            return brokerContainer;
552        }
553    
554        /**
555         * Sets the <a href="http://activemq.org/Xml+Configuration">XML configuration file</a>
556         * used to configure the ActiveMQ broker via Spring if using embedded mode.
557         *
558         * @param brokerXmlConfig is the filename which is assumed to be on the classpath unless a URL
559         *                        is specified. So a value of <code>foo/bar.xml</code> would be assumed to be on the classpath
560         *                        whereas <code>file:dir/file.xml</code> would use the file system.
561         *                        Any valid URL string is supported.
562         * @see #setUseEmbeddedBroker(boolean)
563         */
564        public void setBrokerXmlConfig(String brokerXmlConfig) {
565            this.brokerXmlConfig = brokerXmlConfig;
566        }
567    
568        public BrokerContainerFactory getBrokerContainerFactory() throws JMSException {
569            if (brokerContainerFactory == null) {
570                brokerContainerFactory = createBrokerContainerFactory();
571            }
572            return brokerContainerFactory;
573        }
574    
575        public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
576            this.brokerContainerFactory = brokerContainerFactory;
577        }
578    
579        /**
580         * Returns the context used to store broker containers and connectors which defaults
581         * to using the singleton
582         */
583        public BrokerContext getBrokerContext() {
584            return brokerContext;
585        }
586    
587        public void setBrokerContext(BrokerContext brokerContext) {
588            this.brokerContext = brokerContext;
589        }
590    
591        /**
592         * Create a JMS Connection
593         *
594         * @return the JMS Connection
595         * @throws JMSException if an error occurs creating the Connection
596         */
597        public Connection createConnection() throws JMSException {
598            return this.createConnection(this.userName, this.password);
599        }
600    
601        /**
602         * @param userName
603         * @param password
604         * @return the Connection
605         * @throws JMSException if an error occurs creating the Connection
606         */
607        public Connection createConnection(String userName, String password) throws JMSException {
608            ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
609            connection.setCachingEnabled(isCachingEnabled());
610            connection.setUseAsyncSend(isUseAsyncSend());
611            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
612            connection.setJ2EEcompliant(isJ2EEcompliant());
613            connection.setDoMessageCompression(isDoMessageCompression());
614            connection.setMessageCompressionLevel(messageCompressionLevel);
615            connection.setMessageCompressionLimit(getMessageCompressionLimit());
616            connection.setMessageCompressionStrategy(getMessageCompressionStrategy());
617            connection.setDoMessageFragmentation(isDoMessageFragmentation());
618            connection.setMessageFragmentationLimit(getMessageFragmentationLimit());
619            connection.setPrepareMessageBodyOnSend(isPrepareMessageBodyOnSend());
620            connection.setInternalConnection(isInternalConnection());
621            connection.setQuickClose(isQuickClose());
622            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
623            connection.setCopyMessageOnSend(isCopyMessageOnSend());
624            connection.setPrefetchPolicy(getPrefetchPolicy());
625            if (this.clientID != null && this.clientID.length() > 0) {
626                connection.setClientID(this.clientID);
627            }
628            return connection;
629        }
630    
631        /**
632         * Create a JMS QueueConnection
633         *
634         * @return the JMS QueueConnection
635         * @throws JMSException if an error occurs creating the Connection
636         */
637        public QueueConnection createQueueConnection() throws JMSException {
638            return this.createQueueConnection(this.userName, this.password);
639        }
640    
641        /**
642         * @param userName
643         * @param password
644         * @return the QueueConnection
645         * @throws JMSException if an error occurs creating the Connection
646         */
647        public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
648            return (QueueConnection) createConnection(userName, password);
649        }
650    
651        /**
652         * Create a JMS TopicConnection
653         *
654         * @return the JMS TopicConnection
655         * @throws JMSException if an error occurs creating the Connection
656         */
657        public TopicConnection createTopicConnection() throws JMSException {
658            return this.createTopicConnection(this.userName, this.password);
659        }
660    
661        /**
662         * @param userName
663         * @param password
664         * @return the TopicConnection
665         * @throws JMSException if an error occurs creating the Connection
666         */
667        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
668            return (TopicConnection) createConnection(userName, password);
669        }
670    
671    
672        public void start() throws JMSException {
673        }
674    
675        /**
676         * A hook to allow any embedded JMS Broker's to be closed down
677         *
678         * @throws JMSException
679         */
680        public synchronized void stop() throws JMSException {
681            // Stop all embded brokers that we started.
682            for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) {
683                String uri = (String) iter.next();
684                brokerContext.deregisterConnector(uri);
685            }
686            if (brokerContainer != null) {
687                brokerContainer.stop();
688                brokerContainer = null;
689            }
690        }
691    
692    
693        public Broker getEmbeddedBroker() throws JMSException {
694            if (isUseEmbeddedBroker()) {
695                return getContainer(getBrokerName(), getBrokerName()).getBroker();
696            }
697            return null;
698        }
699    
700        public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
701            BrokerContext.getInstance().registerConnector(theURLString, brokerConnector);
702        }
703    
704        public static synchronized void unregisterBroker(String theURLString) {
705            BrokerContext.getInstance().deregisterConnector(theURLString);
706        }
707    
708    
709        // Implementation methods
710        //-------------------------------------------------------------------------
711    
712    
713        /**
714         * Set the properties that will represent the instance in JNDI
715         *
716         * @param props
717         */
718        protected void buildFromProperties(Properties props) {
719            this.userName = props.getProperty("userName", this.userName);
720            this.password = props.getProperty("password", this.password);
721            String temp = props.getProperty(Context.PROVIDER_URL);
722            if (temp == null || temp.length() == 0) {
723                 temp = props.getProperty("brokerURL");
724            }
725            if (temp != null && temp.length() > 0) {
726                this.brokerURL = temp;
727            }
728            this.brokerName = props.getProperty("brokerName", this.brokerName);
729            this.clientID = props.getProperty("clientID");
730            this.useAsyncSend = getBoolean(props, "useAsyncSend", true);
731            this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker");
732            this.brokerXmlConfig = props.getProperty("brokerXmlConfig", this.brokerXmlConfig);
733            this.J2EEcompliant = getBoolean(props,"J2EEcompliant",true);
734            if (props.containsKey("turboBoost")){
735                this.setTurboBoost(getBoolean(props, "turboBoost"));
736            }
737        }
738    
739        /**
740         * Initialize the instance from properties stored in JNDI
741         *
742         * @param props
743         */
744        protected void populateProperties(Properties props) {
745            props.put("userName", this.userName);
746            props.put("password", this.password);
747            props.put("brokerURL", this.brokerURL);
748            props.put(Context.PROVIDER_URL, this.brokerURL);
749            props.put("brokerName", this.brokerName);
750            if (this.clientID != null) {
751                props.put("clientID", this.clientID);
752            }
753            props.put("useAsyncSend", (useAsyncSend) ? "true" : "false");
754            props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false");
755            props.put("J2EEcompliant", (this.J2EEcompliant) ? "true" : "false");
756            props.put("turboBoost", (isTurboBoost()) ? "true" : "false");
757            if (this.brokerXmlConfig != null) {
758                props.put("brokerXmlConfig", this.brokerXmlConfig);
759            }
760        }
761    
762        /**
763         * Helper method to return the property value as a boolean flag
764         *
765         * @param props
766         * @param key
767         * @return
768         */
769        protected boolean getBoolean(Properties props, String key) {
770            return getBoolean(props, key, false);
771        }
772    
773        /**
774         * Helper method to return the property value as a boolean flag
775         *
776         * @param props
777         * @param key
778         * @param defaultValue
779         * @return
780         */
781        protected boolean getBoolean(Properties props, String key, boolean defaultValue) {
782            String value = props.getProperty(key);
783            return value != null ? value.equalsIgnoreCase("true") : defaultValue;
784        }
785    
786        protected BrokerContainerFactory createBrokerContainerFactory() throws JMSException {
787            if (brokerXmlConfig != null) {
788                return XmlConfigHelper.createBrokerContainerFactory(brokerXmlConfig);
789            }
790            return new BrokerContainerFactoryImpl();
791        }
792    
793        /**
794         * Factory method to create a TransportChannel from a URL
795         * @param theURLString
796         * @return the TransportChannel to use with the embedded broker
797         * @throws JMSException
798         */
    protected TransportChannel createTransportChannel(String theURLString) throws JMSException {
        URI uri = createURI(theURLString);
        TransportChannelFactory factory = TransportChannelProvider.getFactory(uri);
        BrokerConnector brokerConnector = null;
        // 'created' is handed on to ensureServerIsAvailable, where it decides whether
        // the multicast broker client gets set up on the channel
        boolean created = false;
        TransportChannel transportChannel = null;
        // an embedded broker is used if configured on this factory or if the
        // transport itself demands one
        boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker();
        if (embedServer) {
            // hold the factory lock so that lookup, creation and registration of the
            // connector for this URL happen as one step
            synchronized (this) {
                if (factory.requiresEmbeddedBroker()) {
                    // the channel is created up front here as it may carry its own
                    // embedded broker connector
                    transportChannel = factory.create(getWireFormat(), uri);
                    brokerConnector = transportChannel.getEmbeddedBrokerConnector();
                }
                if (brokerConnector == null) {
                    // reuse a connector already registered for this URL if there is one
                    brokerConnector = brokerContext.getConnectorByURL(theURLString);
                    if (brokerConnector == null) {
                        // nothing registered yet: start a new connector, register it and
                        // remember the URL in startedEmbeddedBrokers
                        brokerConnector = createBrokerConnector(theURLString);
                        brokerContext.registerConnector(theURLString, brokerConnector);
                        startedEmbeddedBrokers.add(theURLString);
                        created = true;
                    }
                }
                else {
                    // the connector came from the channel itself; it is treated as
                    // newly created but is not registered with the broker context
                    created = true;
                }
            }
        }
        // no channel was created above, so create it now
        if (transportChannel == null){
            transportChannel = factory.create(getWireFormat(), uri);
        }
       
        if (embedServer) {
            // connects the channel to the embedded connector and may return a
            // different channel for the client to use (see ensureServerIsAvailable)
            return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
        }
        return transportChannel;
    }
835    
836        protected synchronized BrokerContainer getContainer(String url, String name) throws JMSException {
837            if (brokerContainer == null) {
838                brokerContainer = brokerContext.getBrokerContainerByName(url, name, getBrokerContainerFactory());
839            }
840            return brokerContainer;
841        }
842    
843        protected BrokerConnector createBrokerConnector(String url) throws JMSException {
844            BrokerConnector brokerConnector;
845            brokerConnector = new BrokerConnectorImpl(getContainer(url, getBrokerName()), url, getWireFormat());
846            brokerConnector.start();
847    
848            // lets wait a little for the server to startup
849            log.info("Embedded JMS Broker has started");
850            try {
851                Thread.sleep(1000);
852            }
853            catch (InterruptedException e) {
854                log.warn("caught exception sleeping",e);
855            }
856            return brokerConnector;
857        }
858    
859    
860        protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
861            ensureVmServerIsAvailable(channel, brokerConnector);
862            if (channel.isMulticast()) {
863                return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
864            }
865            return channel;
866        }
867    
868        private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
869            if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) {
870                VmTransportChannel answer = (VmTransportChannel) channel;
871                answer.connect(brokerConnector);
872            }
873        }
874    
875        protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
876            if (created) {
877                BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector;
878    
879                BrokerClientImpl client = new BrokerClientImpl();
880                client.initialize(brokerImpl, channel);
881                channel.start();
882                String brokerClientID = createMulticastClientID();
883                channel.setClientID(brokerClientID);
884    
885                // lets spoof a consumer for topics which will replicate messages
886                // over the multicast transport
887                ConnectionInfo info = new ConnectionInfo();
888                info.setHostName(IdGenerator.getHostName());
889                info.setClientId(brokerClientID);
890                info.setStarted(true);
891                client.consumeConnectionInfo(info);
892    
893                ConsumerInfo consumerInfo = new ConsumerInfo();
894                consumerInfo.setDestination(new ActiveMQTopic(">"));
895                consumerInfo.setNoLocal(true);
896                consumerInfo.setClientId(brokerClientID);
897                consumerInfo.setConsumerId(idGenerator.generateId());
898                consumerInfo.setStarted(true);
899                client.consumeConsumerInfo(consumerInfo);
900    
901                consumerInfo = new ConsumerInfo();
902                consumerInfo.setDestination(new ActiveMQQueue(">"));
903                consumerInfo.setNoLocal(true);
904                consumerInfo.setClientId(brokerClientID);
905                consumerInfo.setConsumerId(idGenerator.generateId());
906                consumerInfo.setStarted(true);
907                client.consumeConsumerInfo(consumerInfo);
908            }
909    
910            // now lets create a VM channel that the JMS client will use
911            // to connect to the embedded brokerConnector
912            URI localURI = createURI("vm", remoteLocation);
913            TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
914            ensureVmServerIsAvailable(localChannel, brokerConnector);
915            return localChannel;
916        }
917    
918        /**
919         * Creates the clientID for the multicast client (used to dispatch local
920         * messages over a multicast bus)
921         */
922        protected String createMulticastClientID() {
923            return idGenerator.generateId();
924        }
925    
926        protected URI createURI(String protocol, URI uri) throws JMSException {
927            try {
928                return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
929            }
930            catch (URISyntaxException e) {
931                JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
932                jmsEx.setLinkedException(e);
933                throw jmsEx;
934    
935            }
936        }
937    
938        protected URI createURI(String uri) throws JMSException {
939            try {
940                if (uri == null) {
941                    throw new JMSException("The connection URI must be specified!");
942                }
943                return new URI(uri);
944            }
945            catch (URISyntaxException e) {
946                e.printStackTrace();
947                JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
948                jmsEx.setLinkedException(e);
949                throw jmsEx;
950    
951            }
952        }
953    
954        /**
955         * Called when a connection is closed so that we can shut down any embedded brokers cleanly
956         *
957         * @param connection
958         */
959        synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException {
960            if (--connectionCount <= 0) {
961                // close any broker if we've got one
962                stop();
963            }
964    
965        }
966    
967        synchronized void onConnectionCreate(ActiveMQConnection connection) {
968            ++connectionCount;
969        }
970        
971    }