001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import javax.annotation.PostConstruct;
020    import javax.annotation.PreDestroy;
021    import javax.jms.Connection;
022    import javax.jms.ConnectionFactory;
023    import javax.jms.Destination;
024    import javax.jms.ExceptionListener;
025    import javax.jms.JMSException;
026    import javax.jms.MessageConsumer;
027    import javax.jms.Session;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.advisory.AdvisorySupport;
031    import org.apache.activemq.util.ServiceStopper;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * An agent which listens to commands on a JMS destination
037     * 
038     * @version $Revision: 916678 $
039     * @org.apache.xbean.XBean
040     */
041    public class CommandAgent implements Service, ExceptionListener {
042        private static final Log LOG = LogFactory.getLog(CommandAgent.class);
043    
044        private String brokerUrl = "vm://localhost";
045        private String username;
046        private String password;
047        private ConnectionFactory connectionFactory;
048        private Connection connection;
049        private Destination commandDestination;
050        private CommandMessageListener listener;
051        private Session session;
052        private MessageConsumer consumer;
053    
054        /**
055         *
056         * @throws Exception
057         * @org.apache.xbean.InitMethod
058         */
059        @PostConstruct
060        public void start() throws Exception {
061            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
062            listener = new CommandMessageListener(session);
063            Destination destination = getCommandDestination();
064            if (LOG.isDebugEnabled()) {
065                LOG.debug("Agent subscribing to control destination: " + destination);
066            }
067            consumer = session.createConsumer(destination);
068            consumer.setMessageListener(listener);
069        }
070    
071        /**
072         *
073         * @throws Exception
074         * @org.apache.xbean.DestroyMethod
075         */
076        @PreDestroy
077        public void stop() throws Exception {
078            ServiceStopper stopper = new ServiceStopper();
079            if (consumer != null) {
080                try {
081                    consumer.close();
082                    consumer = null;
083                } catch (JMSException e) {
084                    stopper.onException(this, e);
085                }
086            }
087            if (session != null) {
088                try {
089                    session.close();
090                    session = null;
091                } catch (JMSException e) {
092                    stopper.onException(this, e);
093                }
094            }
095            if (connection != null) {
096                try {
097                    connection.close();
098                    connection = null;
099                } catch (JMSException e) {
100                    stopper.onException(this, e);
101                }
102            }
103            stopper.throwFirstException();
104        }
105    
106        // Properties
107        // -------------------------------------------------------------------------
108        public String getBrokerUrl() {
109            return brokerUrl;
110        }
111    
112        public void setBrokerUrl(String brokerUrl) {
113            this.brokerUrl = brokerUrl;
114        }    
115    
116        public String getUsername() {
117                    return username;
118            }
119    
120            public void setUsername(String username) {
121                    this.username = username;
122            }
123    
124            public String getPassword() {
125                    return password;
126            }
127    
128            public void setPassword(String password) {
129                    this.password = password;
130            }
131    
132            public ConnectionFactory getConnectionFactory() {
133            if (connectionFactory == null) {
134                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
135            }
136            return connectionFactory;
137        }
138    
139        public void setConnectionFactory(ConnectionFactory connectionFactory) {
140            this.connectionFactory = connectionFactory;
141        }
142    
143        public Connection getConnection() throws JMSException {
144            if (connection == null) {
145                connection = createConnection();
146                connection.setExceptionListener(this);
147                connection.start();
148            }
149            return connection;
150        }
151    
152        public void setConnection(Connection connection) {
153            this.connection = connection;
154        }
155    
156        public Destination getCommandDestination() {
157            if (commandDestination == null) {
158                commandDestination = createCommandDestination();
159            }
160            return commandDestination;
161        }
162    
163        public void setCommandDestination(Destination commandDestination) {
164            this.commandDestination = commandDestination;
165        }
166    
167        protected Connection createConnection() throws JMSException {
168            return getConnectionFactory().createConnection(username, password);
169        }
170    
171        protected Destination createCommandDestination() {
172            return AdvisorySupport.getAgentDestination();
173        }
174    
175        public void onException(JMSException exception) {
176            try {
177                stop();
178            } catch (Exception e) {
179            }
180        }
181    }