001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.xmpp; 018 019 import java.io.IOException; 020 import java.io.PrintWriter; 021 import java.io.StringWriter; 022 import java.util.HashMap; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import javax.jms.JMSException; 029 import org.w3c.dom.Element; 030 031 import ietf.params.xml.ns.xmpp_sasl.Auth; 032 import ietf.params.xml.ns.xmpp_sasl.Challenge; 033 import ietf.params.xml.ns.xmpp_sasl.Success; 034 import ietf.params.xml.ns.xmpp_tls.Proceed; 035 import ietf.params.xml.ns.xmpp_tls.Starttls; 036 import jabber.client.Body; 037 import jabber.client.Error; 038 import jabber.client.Iq; 039 import jabber.client.Message; 040 import jabber.client.Presence; 041 import jabber.iq.auth.Query; 042 043 import org.apache.activemq.advisory.AdvisorySupport; 044 import org.apache.activemq.command.ActiveMQDestination; 045 import org.apache.activemq.command.ActiveMQMessage; 046 import org.apache.activemq.command.ActiveMQTempQueue; 047 import org.apache.activemq.command.ActiveMQTextMessage; 048 import org.apache.activemq.command.ActiveMQTopic; 049 import org.apache.activemq.command.Command; 050 import org.apache.activemq.command.ConnectionId; 051 import org.apache.activemq.command.ConnectionInfo; 052 import org.apache.activemq.command.ConsumerId; 053 import org.apache.activemq.command.ConsumerInfo; 054 import org.apache.activemq.command.DestinationInfo; 055 import org.apache.activemq.command.ExceptionResponse; 056 import org.apache.activemq.command.MessageAck; 057 import org.apache.activemq.command.MessageDispatch; 058 import org.apache.activemq.command.MessageId; 059 import org.apache.activemq.command.ProducerId; 060 import org.apache.activemq.command.ProducerInfo; 061 import org.apache.activemq.command.Response; 062 import org.apache.activemq.command.SessionId; 063 import org.apache.activemq.command.SessionInfo; 064 import org.apache.activemq.transport.xmpp.command.Handler; 065 import org.apache.activemq.transport.xmpp.command.HandlerRegistry; 066 import org.apache.activemq.util.IdGenerator; 067 import org.apache.activemq.util.IntSequenceGenerator; 068 import org.apache.activemq.util.LongSequenceGenerator; 069 import org.apache.commons.logging.Log; 070 import org.apache.commons.logging.LogFactory; 071 import org.jabber.protocol.disco_info.Feature; 072 import org.jabber.protocol.disco_info.Identity; 073 import org.jabber.protocol.disco_items.Item; 074 import org.jabber.protocol.muc_user.X; 075 076 /** 077 * TODO lots of this code could be shared with Stomp 078 */ 079 public class ProtocolConverter { 080 private static final transient Log LOG = LogFactory.getLog(ProtocolConverter.class); 081 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 082 private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator("xmpp"); 083 084 private HandlerRegistry registry = new HandlerRegistry(); 085 private XmppTransport transport; 086 087 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 088 private final SessionId sessionId = new SessionId(connectionId, -1); 089 private final ProducerId producerId = new ProducerId(sessionId, 1); 090 091 private final ConnectionInfo connectionInfo = new ConnectionInfo(connectionId); 092 private final SessionInfo sessionInfo = new SessionInfo(sessionId); 093 private final ProducerInfo producerInfo = new ProducerInfo(producerId); 094 095 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 096 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 097 private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator(); 098 099 private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>(); 100 private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>(); 101 private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>(); 102 private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>(); 103 104 private final Object commnadIdMutex = new Object(); 105 private int lastCommandId; 106 private final AtomicBoolean connected = new AtomicBoolean(false); 107 private ActiveMQTempQueue inboxDestination; 108 109 public ProtocolConverter(XmppTransport transport) { 110 this.transport = transport; 111 initialiseRegistry(); 112 } 113 114 protected int generateCommandId() { 115 synchronized (commnadIdMutex) { 116 return lastCommandId++; 117 } 118 } 119 120 protected void initialiseRegistry() { 121 // this kinda wiring muck is soooo much cleaner in C# :( 122 registry.registerHandler(Message.class, new Handler<Message>() { 123 public void handle(Message event) throws Exception { 124 onMessage(event); 125 } 126 }); 127 registry.registerHandler(Auth.class, new Handler<Auth>() { 128 public void handle(Auth event) throws Exception { 129 onAuth(event); 130 } 131 }); 132 registry.registerHandler(Starttls.class, new Handler<Starttls>() { 133 public void handle(Starttls event) throws Exception { 134 onStarttls(event); 135 } 136 }); 137 registry.registerHandler(Iq.class, new Handler<Iq>() { 138 public void handle(Iq event) throws Exception { 139 onIq(event); 140 } 141 }); 142 registry.registerHandler(Presence.class, new Handler<Presence>() { 143 public void handle(Presence event) throws Exception { 144 onPresence(event); 145 } 146 }); 147 } 148 149 public void onXmppCommand(Object command) throws Exception { 150 // TODO we could do some nice code generation to boost performance 151 // by autogenerating the bytecode to statically lookup a handler from a 152 // registry maybe? 153 154 Handler handler = registry.getHandler(command.getClass()); 155 if (handler == null) { 156 unknownCommand(command); 157 } else { 158 handler.handle(command); 159 } 160 } 161 162 public void onActiveMQCommad(Command command) throws Exception { 163 if (command.isResponse()) { 164 Response response = (Response)command; 165 Handler<Response> handler = resposeHandlers.remove(new Integer(response.getCorrelationId())); 166 if (handler != null) { 167 handler.handle(response); 168 } else { 169 LOG.warn("No handler for response: " + response); 170 } 171 } else if (command.isMessageDispatch()) { 172 MessageDispatch md = (MessageDispatch)command; 173 Handler<MessageDispatch> handler = subscriptionsByConsumerId.get(md.getConsumerId()); 174 if (handler != null) { 175 handler.handle(md); 176 } else { 177 LOG.warn("No handler for message: " + md); 178 } 179 } 180 } 181 182 protected void unknownCommand(Object command) throws Exception { 183 LOG.warn("Unkown command: " + command + " of type: " + command.getClass().getName()); 184 } 185 186 protected void onIq(final Iq iq) throws Exception { 187 Object any = iq.getAny(); 188 189 if (any instanceof Query) { 190 onAuthQuery(any, iq); 191 192 } else if (any instanceof jabber.iq._private.Query) { 193 jabber.iq._private.Query query = (jabber.iq._private.Query)any; 194 195 if (LOG.isDebugEnabled()) { 196 LOG.debug("Iq Private " + debugString(iq) + " any: " + query.getAny()); 197 } 198 199 Iq result = createResult(iq); 200 jabber.iq._private.Query answer = new jabber.iq._private.Query(); 201 result.setAny(answer); 202 transport.marshall(result); 203 } else if (any instanceof jabber.iq.roster.Query) { 204 jabber.iq.roster.Query query = (jabber.iq.roster.Query)any; 205 206 if (LOG.isDebugEnabled()) { 207 LOG.debug("Iq Roster " + debugString(iq) + " item: " + query.getItem()); 208 } 209 210 Iq result = createResult(iq); 211 jabber.iq.roster.Query roster = new jabber.iq.roster.Query(); 212 result.setAny(roster); 213 transport.marshall(result); 214 } else if (any instanceof org.jabber.protocol.disco_items.Query) { 215 onDiscoItems(iq, (org.jabber.protocol.disco_items.Query)any); 216 } else if (any instanceof org.jabber.protocol.disco_info.Query) { 217 onDiscoInfo(iq, (org.jabber.protocol.disco_info.Query)any); 218 } else { 219 if (any instanceof Element) { 220 Element element = (Element)any; 221 LOG.warn("Iq Unknown " + debugString(iq) + " element namespace: " + element.getNamespaceURI() + " localName: " + element.getLocalName()); 222 } else { 223 LOG.warn("Iq Unknown " + debugString(iq) + " any: " + any + " of type: " + any.getClass().getName()); 224 } 225 Iq result = createResult(iq); 226 jabber.client.Error error = new Error(); 227 error.setUnexpectedRequest("Don't understand: " + any.toString()); 228 result.setAny(error); 229 transport.marshall(result); 230 } 231 } 232 233 protected void onAuthQuery(Object any, final Iq iq) throws IOException { 234 Query query = (Query)any; 235 if (LOG.isDebugEnabled()) { 236 LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername()); 237 } 238 if (query.getPassword() == null) { 239 Iq result = createResult(iq); 240 Query required = new Query(); 241 required.setPassword(""); 242 required.setUsername(""); 243 result.setAny(required); 244 transport.marshall(result); 245 return; 246 } 247 248 // connectionInfo.setClientId(query.getResource()); 249 connectionInfo.setUserName(query.getUsername()); 250 connectionInfo.setPassword(query.getPassword()); 251 252 // TODO support digest? 253 254 if (connectionInfo.getClientId() == null) { 255 connectionInfo.setClientId(CLIENT_ID_GENERATOR.generateId()); 256 } 257 258 sendToActiveMQ(connectionInfo, new Handler<Response>() { 259 public void handle(Response response) throws Exception { 260 261 Iq result = createResult(iq); 262 263 if (response instanceof ExceptionResponse) { 264 ExceptionResponse exceptionResponse = (ExceptionResponse)response; 265 Throwable exception = exceptionResponse.getException(); 266 267 LOG.warn("Failed to create connection: " + exception, exception); 268 269 Error error = new Error(); 270 result.setError(error); 271 272 StringWriter buffer = new StringWriter(); 273 exception.printStackTrace(new PrintWriter(buffer)); 274 error.setInternalServerError(buffer.toString()); 275 } else { 276 connected.set(true); 277 } 278 transport.marshall(result); 279 280 sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion")); 281 sendToActiveMQ(producerInfo, createErrorHandler("create producer")); 282 } 283 }); 284 } 285 286 protected String debugString(Iq iq) { 287 return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId(); 288 } 289 290 protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException { 291 String to = iq.getTo(); 292 293 if (LOG.isDebugEnabled()) { 294 LOG.debug("Iq Disco Items query " + debugString(iq) + " node: " + query.getNode() + " item: " + query.getItem()); 295 } 296 297 Iq result = createResult(iq); 298 org.jabber.protocol.disco_items.Query answer = new org.jabber.protocol.disco_items.Query(); 299 if (to == null || to.length() == 0) { 300 answer.getItem().add(createItem("queues", "Queues", "queues")); 301 answer.getItem().add(createItem("topics", "Topics", "topics")); 302 } else { 303 // lets not add anything? 304 } 305 306 result.setAny(answer); 307 transport.marshall(result); 308 } 309 310 protected void onDiscoInfo(Iq iq, org.jabber.protocol.disco_info.Query query) throws IOException { 311 String to = iq.getTo(); 312 313 // TODO lets create the topic 'to' 314 315 if (LOG.isDebugEnabled()) { 316 LOG.debug("Iq Disco Info query " + debugString(iq) + " node: " + query.getNode() + " features: " + query.getFeature() + " identity: " + query.getIdentity()); 317 } 318 319 Iq result = createResult(iq); 320 org.jabber.protocol.disco_info.Query answer = new org.jabber.protocol.disco_info.Query(); 321 answer.setNode(to); 322 answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#info")); 323 answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#items")); 324 if (to == null || to.length() == 0) { 325 answer.getIdentity().add(createIdentity("directory", "chatroom", "queues")); 326 answer.getIdentity().add(createIdentity("directory", "chatroom", "topics")); 327 /* 328 * answer.getIdentity().add(createIdentity("hierarchy", "queues", 329 * "branch")); answer.getIdentity().add(createIdentity("hierarchy", 330 * "topics", "branch")); 331 */ 332 } else { 333 // for queues/topics 334 if (to.equals("queues")) { 335 answer.getIdentity().add(createIdentity("conference", "queue.a", "text")); 336 answer.getIdentity().add(createIdentity("conference", "queue.b", "text")); 337 } else if (to.equals("topics")) { 338 answer.getIdentity().add(createIdentity("conference", "topic.x", "text")); 339 answer.getIdentity().add(createIdentity("conference", "topic.y", "text")); 340 answer.getIdentity().add(createIdentity("conference", "topic.z", "text")); 341 } else { 342 // lets reply to an actual room 343 answer.getIdentity().add(createIdentity("conference", to, "text")); 344 answer.getFeature().add(createFeature("http://jabber.org/protocol/muc")); 345 answer.getFeature().add(createFeature("muc-open")); 346 } 347 } 348 349 result.setAny(answer); 350 transport.marshall(result); 351 } 352 353 protected void onPresence(Presence presence) throws IOException, JMSException { 354 if (LOG.isDebugEnabled()) { 355 LOG.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType() + " showOrStatusOrPriority: " 356 + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny()); 357 } 358 org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item(); 359 item.setAffiliation("owner"); 360 item.setRole("moderator"); 361 item.setNick("broker"); 362 sendPresence(presence, item); 363 364 /* 365 * item = new org.jabber.protocol.muc_user.Item(); 366 * item.setAffiliation("admin"); item.setRole("moderator"); 367 * sendPresence(presence, item); 368 */ 369 370 // lets create a subscription 371 final String to = presence.getTo(); 372 373 ActiveMQDestination destination = createActiveMQDestination(to); 374 if (destination == null) { 375 LOG.debug("No 'to' attribute specified for presence so not creating a JMS subscription"); 376 return; 377 } 378 subscribe(to, destination, jidToConsumerMap); 379 380 // lets subscribe to a personal inbox for replies 381 382 // Check if Destination info is of temporary type. 383 if (inboxDestination == null) { 384 inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 385 386 DestinationInfo info = new DestinationInfo(); 387 info.setConnectionId(connectionInfo.getConnectionId()); 388 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 389 info.setDestination(inboxDestination); 390 sendToActiveMQ(info, null); 391 392 subscribe(to, inboxDestination, jidToInboxConsumerMap); 393 } 394 } 395 396 protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) { 397 boolean createConsumer = false; 398 ConsumerInfo consumerInfo = null; 399 synchronized (consumerMap) { 400 consumerInfo = consumerMap.get(to); 401 if (consumerInfo == null) { 402 consumerInfo = new ConsumerInfo(); 403 consumerMap.put(to, consumerInfo); 404 405 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 406 consumerInfo.setConsumerId(consumerId); 407 consumerInfo.setPrefetchSize(10); 408 consumerInfo.setNoLocal(true); 409 createConsumer = true; 410 } 411 } 412 if (!createConsumer) { 413 return; 414 } 415 416 consumerInfo.setDestination(destination); 417 418 subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() { 419 public void handle(MessageDispatch messageDispatch) throws Exception { 420 // processing the inbound message 421 if (LOG.isDebugEnabled()) { 422 LOG.debug("Receiving inbound: " + messageDispatch.getMessage()); 423 } 424 425 // lets send back an ACK 426 MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1); 427 sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId())); 428 429 Message message = createXmppMessage(to, messageDispatch); 430 if (message != null) { 431 if (LOG.isDebugEnabled()) { 432 LOG.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny()); 433 } 434 transport.marshall(message); 435 } 436 } 437 }); 438 sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination)); 439 } 440 441 protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException { 442 Message answer = new Message(); 443 answer.setType("groupchat"); 444 String from = to; 445 int idx = from.indexOf('/'); 446 if (idx > 0) { 447 from = from.substring(0, idx) + "/broker"; 448 } 449 answer.setFrom(from); 450 answer.setTo(to); 451 452 org.apache.activemq.command.Message message = messageDispatch.getMessage(); 453 // answer.setType(message.getType()); 454 if (message instanceof ActiveMQTextMessage) { 455 ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message; 456 Body body = new Body(); 457 String text = activeMQTextMessage.getText(); 458 LOG.info("Setting the body text to be: " + text); 459 body.setValue(text); 460 answer.getAny().add(body); 461 } else { 462 // TODO support other message types 463 LOG.warn("Could not convert the message to a complete Jabber message: " + message); 464 } 465 return answer; 466 } 467 468 protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException { 469 Presence answer = new Presence(); 470 answer.setFrom(presence.getTo()); 471 answer.setType(presence.getType()); 472 answer.setTo(presence.getFrom()); 473 X x = new X(); 474 x.getDeclineOrDestroyOrInvite().add(item); 475 answer.getShowOrStatusOrPriority().add(x); 476 transport.marshall(answer); 477 } 478 479 protected Item createItem(String jid, String name, String node) { 480 Item answer = new Item(); 481 answer.setJid(jid); 482 answer.setName(name); 483 answer.setNode(node); 484 return answer; 485 } 486 487 protected Identity createIdentity(String category, String type, String name) { 488 Identity answer = new Identity(); 489 answer.setCategory(category); 490 answer.setName(name); 491 answer.setType(type); 492 return answer; 493 } 494 495 protected Feature createFeature(String var) { 496 Feature feature = new Feature(); 497 feature.setVar(var); 498 return feature; 499 } 500 501 /** 502 * Creates a result command from the input 503 */ 504 protected Iq createResult(Iq iq) { 505 Iq result = new Iq(); 506 result.setId(iq.getId()); 507 result.setFrom(transport.getFrom()); 508 result.setTo(iq.getFrom()); 509 result.setLang(iq.getLang()); 510 result.setType("result"); 511 return result; 512 } 513 514 protected void sendToActiveMQ(Command command, Handler<Response> handler) { 515 command.setCommandId(generateCommandId()); 516 if (handler != null) { 517 command.setResponseRequired(true); 518 resposeHandlers.put(command.getCommandId(), handler); 519 } 520 transport.getTransportListener().onCommand(command); 521 } 522 523 protected void onStarttls(Starttls starttls) throws Exception { 524 LOG.debug("Starttls"); 525 transport.marshall(new Proceed()); 526 } 527 528 protected void onMessage(Message message) throws Exception { 529 if (LOG.isDebugEnabled()) { 530 LOG.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread()); 531 } 532 533 final ActiveMQMessage activeMQMessage = createActiveMQMessage(message); 534 535 ActiveMQDestination destination = createActiveMQDestination(message.getTo()); 536 537 activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId())); 538 activeMQMessage.setDestination(destination); 539 activeMQMessage.setProducerId(producerId); 540 activeMQMessage.setTimestamp(System.currentTimeMillis()); 541 addActiveMQMessageHeaders(activeMQMessage, message); 542 543 /* 544 * MessageDispatch dispatch = new MessageDispatch(); 545 * dispatch.setDestination(destination); 546 * dispatch.setMessage(activeMQMessage); 547 */ 548 549 if (LOG.isDebugEnabled()) { 550 LOG.debug("Sending ActiveMQ message: " + activeMQMessage); 551 } 552 sendToActiveMQ(activeMQMessage, createErrorHandler("send message")); 553 } 554 555 protected Handler<Response> createErrorHandler(final String text) { 556 return new Handler<Response>() { 557 public void handle(Response event) throws Exception { 558 if (event instanceof ExceptionResponse) { 559 ExceptionResponse exceptionResponse = (ExceptionResponse)event; 560 Throwable exception = exceptionResponse.getException(); 561 LOG.error("Failed to " + text + ". Reason: " + exception, exception); 562 } else if (LOG.isDebugEnabled()) { 563 LOG.debug("Completed " + text); 564 } 565 } 566 }; 567 } 568 569 /** 570 * Converts the Jabber destination name into a destination in ActiveMQ 571 */ 572 protected ActiveMQDestination createActiveMQDestination(String jabberDestination) throws JMSException { 573 if (jabberDestination == null) { 574 return null; 575 } 576 String name = jabberDestination; 577 int idx = jabberDestination.indexOf('@'); 578 if (idx > 0) { 579 name = name.substring(0, idx); 580 } 581 582 System.out.println("#### Creating ActiveMQ destination for: " + name); 583 584 // lets support lower-case versions of the agent topic 585 if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) { 586 name = AdvisorySupport.AGENT_TOPIC; 587 } 588 return new ActiveMQTopic(name); 589 } 590 591 protected ActiveMQMessage createActiveMQMessage(Message message) throws JMSException { 592 ActiveMQTextMessage answer = new ActiveMQTextMessage(); 593 String text = ""; 594 List<Object> list = message.getSubjectOrBodyOrThread(); 595 for (Object object : list) { 596 if (object instanceof Body) { 597 Body body = (Body)object; 598 text = body.getValue(); 599 break; 600 } 601 } 602 answer.setText(text); 603 return answer; 604 } 605 606 protected void addActiveMQMessageHeaders(ActiveMQMessage answer, Message message) throws JMSException { 607 answer.setStringProperty("XMPPFrom", message.getFrom()); 608 answer.setStringProperty("XMPPID", message.getId()); 609 answer.setStringProperty("XMPPLang", message.getLang()); 610 answer.setStringProperty("XMPPTo", message.getTo()); 611 answer.setJMSType(message.getType()); 612 ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom()); 613 if (replyTo == null) { 614 replyTo = inboxDestination; 615 } 616 System.out.println("Setting reply to destination to: " + replyTo); 617 answer.setJMSReplyTo(replyTo); 618 } 619 620 protected void onAuth(Auth auth) throws Exception { 621 if (LOG.isDebugEnabled()) { 622 LOG.debug("Auth mechanism: " + auth.getMechanism() + " value: " + auth.getValue()); 623 } 624 String value = createChallengeValue(auth); 625 if (value != null) { 626 Challenge challenge = new Challenge(); 627 challenge.setValue(value); 628 transport.marshall(challenge); 629 } else { 630 transport.marshall(new Success()); 631 } 632 } 633 634 protected String createChallengeValue(Auth auth) { 635 // TODO implement the challenge 636 return null; 637 } 638 639 }