001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.jabber; 019 020 import org.activemq.io.AbstractWireFormat; 021 import org.activemq.io.WireFormat; 022 import org.activemq.io.util.ByteArray; 023 import org.activemq.message.ActiveMQBytesMessage; 024 import org.activemq.message.ActiveMQDestination; 025 import org.activemq.message.ActiveMQMessage; 026 import org.activemq.message.ActiveMQObjectMessage; 027 import org.activemq.message.ActiveMQTextMessage; 028 import org.activemq.message.ConnectionInfo; 029 import org.activemq.message.ConsumerInfo; 030 import org.activemq.message.Packet; 031 import org.activemq.util.IdGenerator; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 import javax.jms.Destination; 036 import javax.jms.JMSException; 037 import javax.xml.namespace.QName; 038 import javax.xml.stream.XMLStreamConstants; 039 import javax.xml.stream.XMLStreamException; 040 import javax.xml.stream.XMLStreamReader; 041 import java.io.DataInput; 042 import java.io.DataOutput; 043 import java.io.IOException; 044 import java.io.PrintWriter; 045 import java.io.Serializable; 046 import java.net.InetAddress; 047 import java.util.Iterator; 048 import java.util.List; 049 import java.util.Map; 050 051 /** 052 * A wire format which uses XMPP format of messages 053 * 054 * @version $Revision: 1.1 $ 055 */ 056 public class JabberWireFormat extends AbstractWireFormat { 057 private static final Log log = LogFactory.getLog(JabberWireFormat.class); 058 059 private static final String NAMESPACE = "http://etherx.jabber.org/streams"; 060 private static final String QUEUE_PREFIX = "queue:"; 061 private static final String TOPIC_PREFIX = "topic:"; 062 private static final String TEMP_QUEUE_PREFIX = "tempQueue:"; 063 private static final String TEMP_TOPIC_PREFIX = "tempTopic:"; 064 065 private static final QName STREAM_QNAME = new QName(NAMESPACE, "stream", "stream"); 066 private static final QName MESSAGE_QNAME = new QName("jabber:client","message","message"); 067 private static final QName AUTH_QNAME = new QName("jabber:iq:auth", "query", "query"); 068 069 private IdGenerator idGenerator = new IdGenerator(); 070 private String clientID = idGenerator.generateId(); 071 private ConnectionInfo connectionInfo; 072 private PrintWriter writer; 073 private String userName; 074 private boolean validStream = false; 075 076 077 public WireFormat copy() { 078 return new JabberWireFormat(); 079 } 080 081 public Packet readPacket(int firstByte, DataInput in) throws IOException { 082 return null; /** TODO */ 083 } 084 085 086 /** 087 * Reads a packet from the XML stream 088 * @param reader 089 * @param returnPackets 090 * @throws XMLStreamException 091 * @throws JMSException 092 */ 093 public void readPacket(XMLStreamReader reader, List returnPackets) throws XMLStreamException, JMSException { 094 String sessionId = getAttributeValue("id", reader); 095 096 if (reader.next() == XMLStreamConstants.START_ELEMENT) { 097 098 QName name = reader.getName(); 099 100 if (!validStream) { 101 if (name.equals(STREAM_QNAME)) { 102 validStream = true; 103 } 104 else { 105 String errStr = "Bad initial QName for stream. Received: " + name + " while expecting: " 106 + STREAM_QNAME; 107 log.warn(errStr); 108 throw new JMSException(errStr); 109 } 110 } 111 else { 112 QName test = new QName("jabber:iq:auth", "query", "query"); 113 114 if (name.equals(AUTH_QNAME)) { 115 if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){ 116 name = reader.getName(); 117 this.userName = reader.getElementText(); 118 119 //skip past the end 120 if (reader.hasNext()) { 121 reader.next(); 122 } 123 if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){ 124 if (sessionId != null){ 125 writer.println(" <iq id='" + sessionId +"' type='result'/>"); 126 writer.flush(); 127 } 128 }else { 129 //write back a request for the password 130 writer.println("<iq id='" + sessionId + "'"); 131 writer.println(" type = 'result'>"); 132 writer.println("<query xmlns='jabber:iq:auth'><username>" + this.userName + "</username><password/><digest/><resource/></query></iq>"); 133 writer.flush(); 134 returnPackets.add(createConnectionInfo()); 135 returnPackets.add(createConsumerPacket()); 136 } 137 } 138 }else if (name.equals(MESSAGE_QNAME)){ 139 Packet pack = readMessage(reader); 140 if (pack != null){ 141 returnPackets.add(pack); 142 } 143 }else { 144 //general catch all - just say ok .. 145 if (sessionId != null){ 146 writer.println(" <iq id='" + sessionId +"' type='result'/>"); 147 writer.flush(); 148 } 149 } 150 } 151 } 152 } 153 154 private String getAttributeValue(String attributeName, XMLStreamReader reader) { 155 String result = null; 156 for (int i = 0;i < reader.getAttributeCount();i++) { 157 if (reader.getAttributeName(i).toString().equals(attributeName)) { 158 result = reader.getAttributeValue(i); 159 break; 160 } 161 } 162 return result; 163 } 164 165 166 167 168 public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException { 169 switch (packet.getPacketType()) { 170 case Packet.ACTIVEMQ_MESSAGE: 171 writeMessage((ActiveMQMessage) packet, "", out); 172 break; 173 174 case Packet.ACTIVEMQ_TEXT_MESSAGE: 175 writeTextMessage((ActiveMQTextMessage) packet, out); 176 break; 177 178 case Packet.ACTIVEMQ_BYTES_MESSAGE: 179 writeBytesMessage((ActiveMQBytesMessage) packet, out); 180 break; 181 182 case Packet.ACTIVEMQ_OBJECT_MESSAGE: 183 writeObjectMessage((ActiveMQObjectMessage) packet, out); 184 break; 185 186 case Packet.ACTIVEMQ_CONNECTION_INFO: 187 188 case Packet.ACTIVEMQ_MAP_MESSAGE: 189 case Packet.ACTIVEMQ_STREAM_MESSAGE: 190 191 192 case Packet.ACTIVEMQ_BROKER_INFO: 193 case Packet.ACTIVEMQ_MSG_ACK: 194 case Packet.CONSUMER_INFO: 195 case Packet.DURABLE_UNSUBSCRIBE: 196 case Packet.INT_RESPONSE_RECEIPT_INFO: 197 case Packet.PRODUCER_INFO: 198 case Packet.RECEIPT_INFO: 199 case Packet.RESPONSE_RECEIPT_INFO: 200 case Packet.SESSION_INFO: 201 case Packet.TRANSACTION_INFO: 202 case Packet.XA_TRANSACTION_INFO: 203 default: 204 log.debug("Ignoring message type: " + packet.getPacketType() + " packet: " + packet); 205 } 206 writer.flush(); 207 return null; 208 } 209 210 /** 211 * Can this wireformat process packets of this version 212 * 213 * @param version the version number to test 214 * @return true if can accept the version 215 */ 216 public boolean canProcessWireFormatVersion(int version) { 217 return true; 218 } 219 220 /** 221 * @return the current version of this wire format 222 */ 223 public int getCurrentWireFormatVersion() { 224 return 1; 225 } 226 227 public PrintWriter getWriter() { 228 return writer; 229 } 230 231 public void setWriter(PrintWriter writer) { 232 this.writer = writer; 233 } 234 235 // Implementation methods 236 //------------------------------------------------------------------------- 237 protected Packet createConnectionInfo() { 238 connectionInfo = new ConnectionInfo(); 239 connectionInfo.setStarted(true); 240 connectionInfo.setClientId(this.clientID); 241 connectionInfo.setClientVersion("" + getCurrentWireFormatVersion()); 242 connectionInfo.setUserName(userName); 243 return connectionInfo; 244 } 245 246 protected Packet createConsumerPacket(){ 247 ConsumerInfo info = new ConsumerInfo(); 248 info.setClientId(this.clientID); 249 info.setConsumerNo(0); 250 info.setStarted(true); 251 info.setStartTime(System.currentTimeMillis()); 252 info.setDestination(createDestination("chat",this.userName)); 253 return info; 254 } 255 256 257 258 259 protected void initialize() throws IOException { 260 //start the stream - 261 String hostName = InetAddress.getLocalHost().toString(); 262 writer.println("<?xml version='1.0'?>"); 263 writer.println("<stream:stream"); 264 writer.println(" xmlns='jabber:client'"); 265 writer.println(" xml:lang='en'"); 266 writer.println(" xmlns:stream='http://etherx.jabber.org/streams'"); 267 writer.println(" from='" + hostName + "'"); 268 writer.println(" id='" + clientID + "'>"); 269 writer.flush(); 270 } 271 272 protected Packet readMessage(XMLStreamReader reader) throws XMLStreamException, JMSException { 273 ActiveMQTextMessage message = new ActiveMQTextMessage(); 274 message.setJMSMessageID(idGenerator.generateId()); 275 QName name = reader.getName(); 276 String to = getAttributeValue("to", reader); 277 String type = getAttributeValue("type",reader); 278 279 if (type != null){ 280 message.setJMSType(type); 281 } 282 283 if (to != null && to.length() > 0) { 284 message.setJMSDestination(createDestination(type,to)); 285 } 286 287 if (this.userName != null && this.userName .length() > 0) { 288 message.setJMSReplyTo(createDestination("chat",this.userName)); 289 } 290 291 while (reader.hasNext()) { 292 switch (reader.nextTag()) { 293 case XMLStreamConstants.START_ELEMENT: 294 if (!readElement(reader, message)) { 295 log.debug("Unknown element: " + reader.getName()); 296 } 297 break; 298 case XMLStreamConstants.END_ELEMENT: 299 case XMLStreamConstants.END_DOCUMENT: 300 return message; 301 } 302 } 303 return message; 304 } 305 306 protected boolean readElement(XMLStreamReader reader, ActiveMQTextMessage message) throws JMSException, XMLStreamException { 307 QName name = reader.getName(); 308 String localPart = name.getLocalPart(); 309 if (localPart.equals("body")) { 310 message.setText(reader.getElementText()); 311 return true; 312 } 313 else if (localPart.equals("thread")) { 314 message.setJMSCorrelationID(reader.getElementText()); 315 return true; 316 } 317 else { 318 return false; 319 } 320 } 321 322 protected String readXMLAsText(XMLStreamReader reader) throws XMLStreamException { 323 StringBuffer buffer = new StringBuffer(); 324 int elementCount = 0; 325 while (reader.hasNext()) { 326 switch (reader.nextTag()) { 327 case XMLStreamConstants.START_ELEMENT: 328 if (elementCount++ > 0) { 329 writeStartElement(reader); 330 } 331 break; 332 333 case XMLStreamConstants.CHARACTERS: 334 buffer.append(reader.getText()); 335 break; 336 337 case XMLStreamConstants.END_ELEMENT: 338 if (--elementCount <= 0) { 339 return buffer.toString(); 340 } 341 writeEndElement(reader); 342 break; 343 344 case XMLStreamConstants.END_DOCUMENT: 345 return buffer.toString(); 346 } 347 } 348 return buffer.toString(); 349 } 350 351 protected void writeStartElement(XMLStreamReader reader) { 352 writer.print("<"); 353 writeQName(reader.getName()); 354 for (int i = 0, size = reader.getNamespaceCount(); i < size; i++) { 355 writer.print("xmlns"); 356 String prefix = reader.getNamespacePrefix(i); 357 if (prefix != null && prefix.length() > 0) { 358 writer.print(":"); 359 writer.print(prefix); 360 } 361 writer.print("='"); 362 writer.print(reader.getNamespaceURI(i)); 363 writer.print("'"); 364 } 365 for (int i = 0, size = reader.getAttributeCount(); i < size; i++) { 366 writer.print("xmlns"); 367 writeQName(reader.getAttributeName(i)); 368 writer.print("='"); 369 writer.print(reader.getAttributeValue(i)); 370 writer.print("'"); 371 } 372 writer.println(">"); 373 } 374 375 protected void writeEndElement(XMLStreamReader reader) { 376 writer.print("</"); 377 writeQName(reader.getName()); 378 writer.println(">"); 379 } 380 381 protected void writeQName(QName name) { 382 String prefix = name.getPrefix(); 383 if (prefix != null && prefix.length() > 0) { 384 writer.print(prefix); 385 writer.print(":"); 386 } 387 writer.print(name.getLocalPart()); 388 } 389 390 protected ActiveMQDestination createDestination(String typeName,String text) { 391 int type = ActiveMQDestination.ACTIVEMQ_QUEUE; 392 if (text.startsWith(TOPIC_PREFIX)) { 393 type = ActiveMQDestination.ACTIVEMQ_TOPIC; 394 text = text.substring(TOPIC_PREFIX.length()); 395 } 396 else if (text.startsWith(QUEUE_PREFIX)) { 397 type = ActiveMQDestination.ACTIVEMQ_QUEUE; 398 text = text.substring(QUEUE_PREFIX.length()); 399 } 400 else if (text.startsWith(TEMP_QUEUE_PREFIX)) { 401 type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE; 402 text = text.substring(TEMP_QUEUE_PREFIX.length()); 403 } 404 else if (text.startsWith(TEMP_TOPIC_PREFIX)) { 405 type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC; 406 text = text.substring(TEMP_TOPIC_PREFIX.length()); 407 }else { 408 if (typeName != null){ 409 if (typeName.equals("groupchat")){ 410 type = ActiveMQDestination.ACTIVEMQ_TOPIC; 411 } 412 //else default is a queue - (assume default typeName is 'chat') 413 } 414 } 415 text = text.trim(); 416 if (text.length() == 0) { 417 return null; 418 } 419 return ActiveMQDestination.createDestination(type, text); 420 } 421 422 protected String toString(Destination destination) { 423 if (destination instanceof ActiveMQDestination) { 424 ActiveMQDestination activeDestination = (ActiveMQDestination) destination; 425 String physicalName = activeDestination.getPhysicalName(); 426 switch (activeDestination.getDestinationType()) { 427 case ActiveMQDestination.ACTIVEMQ_QUEUE: 428 return QUEUE_PREFIX + physicalName; 429 430 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE: 431 return TEMP_QUEUE_PREFIX + physicalName; 432 433 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC: 434 return TEMP_TOPIC_PREFIX + physicalName; 435 } 436 return physicalName; 437 } 438 return destination != null ? destination.toString() : ""; 439 } 440 441 protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException { 442 Serializable object = message.getObject(); 443 String text = (object != null) ? object.toString() : ""; 444 writeMessage(message, text, out); 445 } 446 447 protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException { 448 writeMessage(message, message.getText(), out); 449 } 450 451 protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException { 452 ByteArray data = message.getBodyAsBytes(); 453 String text = encodeBinary(data.getBuf(), data.getOffset(), data.getLength()); 454 writeMessage(message, text, out); 455 } 456 457 protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException { 458 String type = getXmppType(message); 459 460 writer.print("<"); 461 writer.print(type); 462 writer.print(" to='"); 463 writer.print(toString(message.getJMSDestination())); 464 writer.print("' from='"); 465 writer.print(toString(message.getJMSReplyTo())); 466 String messageID = message.getJMSMessageID(); 467 if (messageID != null) { 468 writer.print("' id='"); 469 writer.print(messageID); 470 } 471 472 Map properties = message.getProperties(); 473 if (properties != null) { 474 for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) { 475 Map.Entry entry = (Map.Entry) iter.next(); 476 Object key = entry.getKey(); 477 Object value = entry.getValue(); 478 if (value != null) { 479 writer.print("' "); 480 writer.print(key.toString()); 481 writer.print("='"); 482 writer.print(value.toString()); 483 } 484 } 485 } 486 487 writer.println("'>"); 488 489 String id = message.getJMSCorrelationID(); 490 if (id != null) { 491 writer.print("<thread>"); 492 writer.print(id); 493 writer.print("</thread>"); 494 } 495 writer.print("<body>"); 496 writer.print(body); 497 writer.println("</body>"); 498 writer.print("</"); 499 writer.print(type); 500 writer.println(">"); 501 } 502 503 protected String encodeBinary(byte[] data, int offset, int length) { 504 // TODO 505 throw new RuntimeException("Not implemented yet!"); 506 } 507 508 protected String getXmppType(ActiveMQMessage message) { 509 String type = message.getJMSType(); 510 if (type == null) { 511 type = "message"; 512 } 513 return type; 514 } 515 }