001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.peer; 020 import javax.jms.IllegalStateException; 021 import javax.jms.JMSException; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.activemq.broker.BrokerConnector; 025 import org.activemq.broker.BrokerContainer; 026 import org.activemq.broker.impl.BrokerConnectorImpl; 027 import org.activemq.broker.impl.BrokerContainerImpl; 028 import org.activemq.io.WireFormat; 029 import org.activemq.store.vm.VMPersistenceAdapter; 030 import org.activemq.transport.DiscoveryNetworkConnector; 031 import org.activemq.transport.NetworkConnector; 032 import org.activemq.transport.TransportChannel; 033 import org.activemq.transport.multicast.MulticastDiscoveryAgent; 034 import org.activemq.transport.vm.VmTransportChannel; 035 import org.activemq.util.IdGenerator; 036 import org.activemq.util.URIHelper; 037 038 /** 039 * A <CODE>PeerTransportChannel</CODE> creates an embedded broker and networks peers together to form a P-2-P network. 040 * <P> 041 * By default, <CODE>PeerTransportChannel</CODE> uses discovery to locate other peers, and uses a well known service 042 * name on the discovery 043 * <P> 044 * An example of the expected format is: <CODE>peer://development.net</CODE> where development.net is the service name 045 * used in discovery 046 * <P> 047 * 048 * @version $Revision: 1.1.1.1 $ 049 */ 050 public class PeerTransportChannel extends VmTransportChannel { 051 private static final Log log = LogFactory.getLog(PeerTransportChannel.class); 052 protected static final String DEFAULT_BROKER_CONNECTOR_URI = "tcp://localhost:0"; 053 protected WireFormat wireFormat; 054 protected TransportChannel channel; 055 protected String discoveryURI; 056 protected String remoteUserName; 057 protected String remotePassword; 058 protected String brokerName; 059 protected boolean doDiscovery; 060 protected String peerURIs; 061 protected String brokerConnectorURI; 062 protected String serviceName; 063 protected BrokerConnector brokerConnector; 064 protected boolean remote; 065 protected boolean persistent=false; 066 067 068 /** 069 * Construct a PeerTransportChannel 070 * 071 * @param wireFormat 072 * @param serviceName 073 * @throws JMSException 074 */ 075 protected PeerTransportChannel(WireFormat wireFormat, String serviceName) throws JMSException { 076 this.wireFormat = wireFormat; 077 this.serviceName = serviceName; 078 this.discoveryURI = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI; 079 IdGenerator idGen = new IdGenerator(); 080 this.brokerName = idGen.generateId(); 081 this.brokerConnectorURI = DEFAULT_BROKER_CONNECTOR_URI; 082 this.doDiscovery = true; 083 if (serviceName == null || serviceName.length() == 0) { 084 throw new IllegalStateException("No service name specified for peer:// protocol"); 085 } 086 } 087 088 /** 089 * @return true if the transport channel is active, this value will be false through reconnecting 090 */ 091 public boolean isTransportConnected() { 092 return true; 093 } 094 095 /** 096 * Some transports rely on an embedded broker (beer based protocols) 097 * 098 * @return true if an embedded broker required 099 */ 100 public boolean requiresEmbeddedBroker() { 101 return true; 102 } 103 104 /** 105 * Some transports that rely on an embedded broker need to create the connector used by the broker 106 * 107 * @return the BrokerConnector or null if not applicable 108 * @throws JMSException 109 */ 110 public BrokerConnector getEmbeddedBrokerConnector() throws JMSException { 111 try { 112 if (brokerConnector == null) { 113 BrokerContainer container = new BrokerContainerImpl(brokerName, serviceName); 114 if( !persistent ) { 115 container.setPersistenceAdapter(new VMPersistenceAdapter()); 116 } 117 NetworkConnector networkConnector = null; 118 if (doDiscovery) { 119 networkConnector = new DiscoveryNetworkConnector(container); 120 MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(serviceName); 121 container.setDiscoveryAgent(agent); 122 } 123 if (peerURIs != null && peerURIs.length() > 0) { 124 URIHelper peers = new URIHelper(peerURIs); 125 networkConnector = createNetworkConnector(container); 126 while (peers.hasNext()) { 127 String peerURL = peers.getNext(); 128 networkConnector.addNetworkChannel(peerURL); 129 } 130 } 131 container.addNetworkConnector(networkConnector); 132 URIHelper helper = new URIHelper(brokerConnectorURI); 133 brokerConnector = new BrokerConnectorImpl(container, helper.getNext(), wireFormat); 134 while (helper.hasNext()) { 135 new BrokerConnectorImpl(container, helper.getNext(), wireFormat); 136 } 137 container.start(); 138 } 139 return brokerConnector; 140 } 141 catch (Exception e) { 142 e.printStackTrace(); 143 String errorStr = "Failed to get embedded connector"; 144 log.error(errorStr, e); 145 JMSException jmsEx = new JMSException(errorStr); 146 jmsEx.setLinkedException(e); 147 throw jmsEx; 148 } 149 } 150 151 /** 152 * Create a NetworkConnector 153 * @param container 154 * @return the NetworkConnector 155 */ 156 protected NetworkConnector createNetworkConnector(BrokerContainer container){ 157 return new NetworkConnector(container); 158 } 159 160 /** 161 * @return Returns the brokerDiscoveryURI. 162 */ 163 public String getDiscoveryURI() { 164 return discoveryURI; 165 } 166 167 /** 168 * @param discoveryURI The brokerDiscoveryURI to set. 169 */ 170 public void setDiscoveryURI(String discoveryURI) { 171 this.discoveryURI = discoveryURI; 172 } 173 174 /** 175 * @return Returns the brokerName. 176 */ 177 public String getBrokerName() { 178 return brokerName; 179 } 180 181 /** 182 * @param brokerName The brokerName to set. 183 */ 184 public void setBrokerName(String brokerName) { 185 this.brokerName = brokerName; 186 } 187 188 /** 189 * @return Returns the doDiscovery. 190 */ 191 public boolean isDoDiscovery() { 192 return doDiscovery; 193 } 194 195 /** 196 * @param doDiscovery The doDiscovery to set. 197 */ 198 public void setDoDiscovery(boolean doDiscovery) { 199 this.doDiscovery = doDiscovery; 200 } 201 202 /** 203 * @return Returns the wireFormat. 204 */ 205 public WireFormat getWireFormat() { 206 return wireFormat; 207 } 208 209 /** 210 * @param wireFormat The wireFormat to set. 211 */ 212 public void setWireFormat(WireFormat wireFormat) { 213 this.wireFormat = wireFormat; 214 } 215 216 /** 217 * @return Returns the remotePassword. 218 */ 219 public String getRemotePassword() { 220 return remotePassword; 221 } 222 223 /** 224 * @param remotePassword The remotePassword to set. 225 */ 226 public void setRemotePassword(String remotePassword) { 227 this.remotePassword = remotePassword; 228 } 229 230 /** 231 * @return Returns the remoteUserName. 232 */ 233 public String getRemoteUserName() { 234 return remoteUserName; 235 } 236 237 /** 238 * @param remoteUserName The remoteUserName to set. 239 */ 240 public void setRemoteUserName(String remoteUserName) { 241 this.remoteUserName = remoteUserName; 242 } 243 244 /** 245 * @return Returns the brokerConnectorURI. 246 */ 247 public String getBrokerConnectorURI() { 248 return brokerConnectorURI; 249 } 250 251 /** 252 * @param brokerConnectorURI The brokerConnectorURI to set. 253 */ 254 public void setBrokerConnectorURI(String brokerConnectorURI) { 255 this.brokerConnectorURI = brokerConnectorURI; 256 } 257 258 /** 259 * @return Returns the peerURIs. 260 */ 261 public String getPeerURIs() { 262 return peerURIs; 263 } 264 265 /** 266 * @param peerURIs The peerURIs to set. 267 */ 268 public void setPeerURIs(String peerURIs) { 269 this.peerURIs = peerURIs; 270 } 271 272 /** 273 * @return Returns the serviceName. 274 */ 275 public String getServiceName() { 276 return serviceName; 277 } 278 279 /** 280 * @param serviceName The serviceName to set. 281 */ 282 public void setServiceName(String serviceName) { 283 this.serviceName = serviceName; 284 } 285 286 /** 287 * @return Returns the remote. 288 */ 289 public boolean isRemote() { 290 return remote; 291 } 292 /** 293 * @param remote The remote to set. 294 */ 295 public void setRemote(boolean remote) { 296 this.remote = remote; 297 } 298 299 /** 300 * @return Returns the persistent. 301 */ 302 public boolean isPersistent() { 303 return persistent; 304 } 305 /** 306 * @param persistent The persistent to set. 307 */ 308 public void setPersistent(boolean persistent) { 309 this.persistent = persistent; 310 } 311 }