001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.proxy; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.Iterator; 023 import java.util.concurrent.CopyOnWriteArrayList; 024 025 import org.apache.activemq.Service; 026 import org.apache.activemq.transport.CompositeTransport; 027 import org.apache.activemq.transport.Transport; 028 import org.apache.activemq.transport.TransportAcceptListener; 029 import org.apache.activemq.transport.TransportFactory; 030 import org.apache.activemq.transport.TransportFilter; 031 import org.apache.activemq.transport.TransportServer; 032 import org.apache.activemq.util.ServiceStopper; 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 036 /** 037 * @org.apache.xbean.XBean 038 * 039 * @version $Revision$ 040 */ 041 public class ProxyConnector implements Service { 042 043 private static final Log LOG = LogFactory.getLog(ProxyConnector.class); 044 private TransportServer server; 045 private URI bind; 046 private URI remote; 047 private URI localUri; 048 private String name; 049 private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>(); 050 051 public void start() throws Exception { 052 053 this.getServer().setAcceptListener(new TransportAcceptListener() { 054 public void onAccept(Transport localTransport) { 055 try { 056 Transport remoteTransport = createRemoteTransport(); 057 ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport); 058 connections.add(connection); 059 connection.start(); 060 } catch (Exception e) { 061 onAcceptError(e); 062 } 063 } 064 065 public void onAcceptError(Exception error) { 066 LOG.error("Could not accept connection: " + error, error); 067 } 068 }); 069 getServer().start(); 070 LOG.info("Proxy Connector " + getName() + " Started"); 071 072 } 073 074 public void stop() throws Exception { 075 ServiceStopper ss = new ServiceStopper(); 076 if (this.server != null) { 077 ss.stop(this.server); 078 } 079 for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) { 080 LOG.info("Connector stopped: Stopping proxy."); 081 ss.stop(iter.next()); 082 } 083 ss.throwFirstException(); 084 LOG.info("Proxy Connector " + getName() + " Stopped"); 085 } 086 087 // Properties 088 // ------------------------------------------------------------------------- 089 090 public URI getLocalUri() { 091 return localUri; 092 } 093 094 public void setLocalUri(URI localURI) { 095 this.localUri = localURI; 096 } 097 098 public URI getBind() { 099 return bind; 100 } 101 102 public void setBind(URI bind) { 103 this.bind = bind; 104 } 105 106 public URI getRemote() { 107 return remote; 108 } 109 110 public void setRemote(URI remote) { 111 this.remote = remote; 112 } 113 114 public TransportServer getServer() throws IOException, URISyntaxException { 115 if (server == null) { 116 server = createServer(); 117 } 118 return server; 119 } 120 121 public void setServer(TransportServer server) { 122 this.server = server; 123 } 124 125 protected TransportServer createServer() throws IOException, URISyntaxException { 126 if (bind == null) { 127 throw new IllegalArgumentException("You must specify either a server or the bind property"); 128 } 129 return TransportFactory.bind(bind); 130 } 131 132 private Transport createRemoteTransport() throws Exception { 133 Transport transport = TransportFactory.compositeConnect(remote); 134 CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class); 135 if (ct != null && localUri != null) { 136 ct.add(new URI[] {localUri}); 137 } 138 139 // Add a transport filter so that can track the transport life cycle 140 transport = new TransportFilter(transport) { 141 public void stop() throws Exception { 142 LOG.info("Stopping proxy."); 143 super.stop(); 144 connections.remove(this); 145 } 146 }; 147 return transport; 148 } 149 150 public String getName() { 151 if (name == null) { 152 if (server != null) { 153 name = server.getConnectURI().toString(); 154 } else { 155 name = "proxy"; 156 } 157 } 158 return name; 159 } 160 161 public void setName(String name) { 162 this.name = name; 163 } 164 165 }