001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.xmpp; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.io.OutputStream; 022 import java.net.Socket; 023 import java.net.URI; 024 025 import javax.net.SocketFactory; 026 import javax.xml.bind.JAXBContext; 027 import javax.xml.bind.JAXBException; 028 import javax.xml.bind.Marshaller; 029 import javax.xml.bind.Unmarshaller; 030 import javax.xml.namespace.QName; 031 import javax.xml.stream.Location; 032 import javax.xml.stream.XMLEventReader; 033 import javax.xml.stream.XMLInputFactory; 034 import javax.xml.stream.XMLOutputFactory; 035 import javax.xml.stream.XMLReporter; 036 import javax.xml.stream.XMLStreamException; 037 import javax.xml.stream.XMLStreamWriter; 038 import javax.xml.stream.events.Attribute; 039 import javax.xml.stream.events.StartElement; 040 import javax.xml.stream.events.XMLEvent; 041 042 import ietf.params.xml.ns.xmpp_sasl.Mechanisms; 043 044 import org.apache.activemq.command.BrokerInfo; 045 import org.apache.activemq.command.Command; 046 import org.apache.activemq.transport.tcp.TcpBufferedInputStream; 047 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; 048 import org.apache.activemq.transport.tcp.TcpTransport; 049 import org.apache.activemq.util.IOExceptionSupport; 050 import org.apache.activemq.util.ServiceStopper; 051 import org.apache.activemq.wireformat.WireFormat; 052 import org.apache.commons.logging.Log; 053 import org.apache.commons.logging.LogFactory; 054 import org.jabber.etherx.streams.Features; 055 056 /** 057 * @version $Revision: 565003 $ 058 */ 059 public class XmppTransport extends TcpTransport { 060 protected static final QName ATTRIBUTE_TO = new QName("to"); 061 062 private static final transient Log LOG = LogFactory.getLog(XmppTransport.class); 063 064 protected OutputStream outputStream; 065 protected InputStream inputStream; 066 067 private JAXBContext context; 068 private XMLEventReader xmlReader; 069 private Unmarshaller unmarshaller; 070 private Marshaller marshaller; 071 private XMLStreamWriter xmlWriter; 072 private String to = "client"; 073 private ProtocolConverter converter; 074 private String from = "localhost"; 075 private String brokerId = "broker-id-1"; 076 077 public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException { 078 super(wireFormat, socket); 079 init(); 080 } 081 082 public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri1) throws IOException { 083 super(wireFormat, socketFactory, uri, uri1); 084 init(); 085 } 086 087 private void init() { 088 converter = new ProtocolConverter(this); 089 } 090 091 @Override 092 public void oneway(Object object) throws IOException { 093 if (object instanceof Command) { 094 Command command = (Command)object; 095 096 if (command instanceof BrokerInfo) { 097 BrokerInfo brokerInfo = (BrokerInfo)command; 098 099 brokerId = brokerInfo.getBrokerId().toString(); 100 from = brokerInfo.getBrokerName(); 101 try { 102 writeOpenStream(brokerId, from); 103 } catch (XMLStreamException e) { 104 throw IOExceptionSupport.create(e); 105 } 106 } else { 107 try { 108 converter.onActiveMQCommad(command); 109 } catch (IOException e) { 110 throw e; 111 } catch (Exception e) { 112 throw IOExceptionSupport.create(e); 113 } 114 } 115 } else { 116 LOG.warn("Unkown command: " + object); 117 } 118 } 119 120 /** 121 * Marshalls the given POJO to the client 122 */ 123 public void marshall(Object command) throws IOException { 124 if (isStopped() || isStopping()) { 125 LOG.warn("Not marshalling command as shutting down: " + command); 126 return; 127 } 128 try { 129 marshaller.marshal(command, xmlWriter); 130 xmlWriter.flush(); 131 outputStream.flush(); 132 } catch (JAXBException e) { 133 throw IOExceptionSupport.create(e); 134 } catch (XMLStreamException e) { 135 throw IOExceptionSupport.create(e); 136 } 137 } 138 139 @Override 140 public void doRun() throws IOException { 141 LOG.debug("XMPP consumer thread starting"); 142 try { 143 XMLInputFactory xif = XMLInputFactory.newInstance(); 144 xif.setXMLReporter(new XMLReporter() { 145 public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException { 146 LOG.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation); 147 } 148 }); 149 150 xmlReader = xif.createXMLEventReader(inputStream); 151 152 XMLEvent docStart = xmlReader.nextEvent(); 153 154 XMLEvent rootElement = xmlReader.nextTag(); 155 156 if (rootElement instanceof StartElement) { 157 StartElement startElement = (StartElement)rootElement; 158 Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO); 159 if (toAttribute != null) { 160 to = toAttribute.getValue(); 161 } 162 } 163 while (true) { 164 if (isStopped()) { 165 break; 166 } 167 168 XMLEvent event = xmlReader.peek(); 169 if (event.isStartElement()) { 170 // unmarshal a new object 171 Object object = unmarshaller.unmarshal(xmlReader); 172 if (object != null) { 173 converter.onXmppCommand(object); 174 } 175 } else { 176 if (event.getEventType() == XMLEvent.END_ELEMENT) { 177 break; 178 } else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) { 179 break; 180 } else { 181 xmlReader.nextEvent(); 182 } 183 184 } 185 } 186 } catch (Exception e) { 187 throw IOExceptionSupport.create(e); 188 } 189 } 190 191 public String getFrom() { 192 return from; 193 } 194 195 @Override 196 protected void doStop(ServiceStopper stopper) throws Exception { 197 if (xmlWriter != null) { 198 try { 199 xmlWriter.writeEndElement(); 200 xmlWriter.writeEndDocument(); 201 xmlWriter.close(); 202 } catch (XMLStreamException e) { 203 // the client may have closed first so ignore this 204 LOG.info("Caught trying to close transport: " + e, e); 205 } 206 } 207 if (xmlReader != null) { 208 try { 209 xmlReader.close(); 210 } catch (XMLStreamException e) { 211 // the client may have closed first so ignore this 212 LOG.info("Caught trying to close transport: " + e, e); 213 } 214 } 215 super.doStop(stopper); 216 } 217 218 @Override 219 protected void initializeStreams() throws Exception { 220 // TODO it would be preferable to use class discovery here! 221 context = JAXBContext.newInstance("jabber.client" 222 /* 223 * + ":jabber.server" + ":jabber.iq.gateway" + ":jabber.iq.last" + 224 * ":jabber.iq.oob" + ":jabber.iq.pass" + ":jabber.iq.time" + 225 * ":jabber.iq.version" + ":org.jabber.protocol.activity" + 226 * ":org.jabber.protocol.address" + ":org.jabber.protocol.amp" + 227 * ":org.jabber.protocol.amp_errors" + ":org.jabber.protocol.muc_admin" + 228 * ":org.jabber.protocol.muc_unique" 229 */ 230 + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.roster" + ":org.jabber.etherx.streams" + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items" 231 + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_user" + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas" 232 + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"); 233 234 inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024); 235 outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024); 236 237 unmarshaller = context.createUnmarshaller(); 238 marshaller = context.createMarshaller(); 239 marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); 240 } 241 242 protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException { 243 LOG.debug("Sending initial stream element"); 244 XMLOutputFactory factory = XMLOutputFactory.newInstance(); 245 // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true); 246 xmlWriter = factory.createXMLStreamWriter(outputStream); 247 248 // write the dummy start tag 249 xmlWriter.writeStartDocument(); 250 xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); 251 xmlWriter.writeDefaultNamespace("jabber:client"); 252 xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams"); 253 xmlWriter.writeAttribute("version", "1.0"); 254 xmlWriter.writeAttribute("id", id); 255 if (to == null) { 256 to = "client"; 257 } 258 xmlWriter.writeAttribute("to", to); 259 xmlWriter.writeAttribute("from", from); 260 261 // now lets write the features 262 Features features = new Features(); 263 264 // TODO support TLS 265 // features.getAny().add(new Starttls()); 266 267 Mechanisms mechanisms = new Mechanisms(); 268 269 // TODO support SASL 270 // mechanisms.getMechanism().add("DIGEST-MD5"); 271 // mechanisms.getMechanism().add("PLAIN"); 272 features.getAny().add(mechanisms); 273 marshall(features); 274 275 LOG.debug("Initial stream element sent!"); 276 } 277 278 }