001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.util; 018 019 import javax.annotation.PostConstruct; 020 import javax.annotation.PreDestroy; 021 import javax.jms.Connection; 022 import javax.jms.ConnectionFactory; 023 import javax.jms.Destination; 024 import javax.jms.ExceptionListener; 025 import javax.jms.JMSException; 026 import javax.jms.MessageConsumer; 027 import javax.jms.Session; 028 import org.apache.activemq.ActiveMQConnectionFactory; 029 import org.apache.activemq.Service; 030 import org.apache.activemq.advisory.AdvisorySupport; 031 import org.apache.activemq.util.ServiceStopper; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * An agent which listens to commands on a JMS destination 037 * 038 * @version $Revision: 916678 $ 039 * @org.apache.xbean.XBean 040 */ 041 public class CommandAgent implements Service, ExceptionListener { 042 private static final Log LOG = LogFactory.getLog(CommandAgent.class); 043 044 private String brokerUrl = "vm://localhost"; 045 private String username; 046 private String password; 047 private ConnectionFactory connectionFactory; 048 private Connection connection; 049 private Destination commandDestination; 050 private CommandMessageListener listener; 051 private Session session; 052 private MessageConsumer consumer; 053 054 /** 055 * 056 * @throws Exception 057 * @org.apache.xbean.InitMethod 058 */ 059 @PostConstruct 060 public void start() throws Exception { 061 session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 062 listener = new CommandMessageListener(session); 063 Destination destination = getCommandDestination(); 064 if (LOG.isDebugEnabled()) { 065 LOG.debug("Agent subscribing to control destination: " + destination); 066 } 067 consumer = session.createConsumer(destination); 068 consumer.setMessageListener(listener); 069 } 070 071 /** 072 * 073 * @throws Exception 074 * @org.apache.xbean.DestroyMethod 075 */ 076 @PreDestroy 077 public void stop() throws Exception { 078 ServiceStopper stopper = new ServiceStopper(); 079 if (consumer != null) { 080 try { 081 consumer.close(); 082 consumer = null; 083 } catch (JMSException e) { 084 stopper.onException(this, e); 085 } 086 } 087 if (session != null) { 088 try { 089 session.close(); 090 session = null; 091 } catch (JMSException e) { 092 stopper.onException(this, e); 093 } 094 } 095 if (connection != null) { 096 try { 097 connection.close(); 098 connection = null; 099 } catch (JMSException e) { 100 stopper.onException(this, e); 101 } 102 } 103 stopper.throwFirstException(); 104 } 105 106 // Properties 107 // ------------------------------------------------------------------------- 108 public String getBrokerUrl() { 109 return brokerUrl; 110 } 111 112 public void setBrokerUrl(String brokerUrl) { 113 this.brokerUrl = brokerUrl; 114 } 115 116 public String getUsername() { 117 return username; 118 } 119 120 public void setUsername(String username) { 121 this.username = username; 122 } 123 124 public String getPassword() { 125 return password; 126 } 127 128 public void setPassword(String password) { 129 this.password = password; 130 } 131 132 public ConnectionFactory getConnectionFactory() { 133 if (connectionFactory == null) { 134 connectionFactory = new ActiveMQConnectionFactory(brokerUrl); 135 } 136 return connectionFactory; 137 } 138 139 public void setConnectionFactory(ConnectionFactory connectionFactory) { 140 this.connectionFactory = connectionFactory; 141 } 142 143 public Connection getConnection() throws JMSException { 144 if (connection == null) { 145 connection = createConnection(); 146 connection.setExceptionListener(this); 147 connection.start(); 148 } 149 return connection; 150 } 151 152 public void setConnection(Connection connection) { 153 this.connection = connection; 154 } 155 156 public Destination getCommandDestination() { 157 if (commandDestination == null) { 158 commandDestination = createCommandDestination(); 159 } 160 return commandDestination; 161 } 162 163 public void setCommandDestination(Destination commandDestination) { 164 this.commandDestination = commandDestination; 165 } 166 167 protected Connection createConnection() throws JMSException { 168 return getConnectionFactory().createConnection(username, password); 169 } 170 171 protected Destination createCommandDestination() { 172 return AdvisorySupport.getAgentDestination(); 173 } 174 175 public void onException(JMSException exception) { 176 try { 177 stop(); 178 } catch (Exception e) { 179 } 180 } 181 }