001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.xmpp;
018    
019    import java.io.IOException;
020    import java.io.PrintWriter;
021    import java.io.StringWriter;
022    import java.util.HashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import javax.jms.JMSException;
029    import org.w3c.dom.Element;
030    
031    import ietf.params.xml.ns.xmpp_sasl.Auth;
032    import ietf.params.xml.ns.xmpp_sasl.Challenge;
033    import ietf.params.xml.ns.xmpp_sasl.Success;
034    import ietf.params.xml.ns.xmpp_tls.Proceed;
035    import ietf.params.xml.ns.xmpp_tls.Starttls;
036    import jabber.client.Body;
037    import jabber.client.Error;
038    import jabber.client.Iq;
039    import jabber.client.Message;
040    import jabber.client.Presence;
041    import jabber.iq.auth.Query;
042    
043    import org.apache.activemq.advisory.AdvisorySupport;
044    import org.apache.activemq.command.ActiveMQDestination;
045    import org.apache.activemq.command.ActiveMQMessage;
046    import org.apache.activemq.command.ActiveMQTempQueue;
047    import org.apache.activemq.command.ActiveMQTextMessage;
048    import org.apache.activemq.command.ActiveMQTopic;
049    import org.apache.activemq.command.Command;
050    import org.apache.activemq.command.ConnectionId;
051    import org.apache.activemq.command.ConnectionInfo;
052    import org.apache.activemq.command.ConsumerId;
053    import org.apache.activemq.command.ConsumerInfo;
054    import org.apache.activemq.command.DestinationInfo;
055    import org.apache.activemq.command.ExceptionResponse;
056    import org.apache.activemq.command.MessageAck;
057    import org.apache.activemq.command.MessageDispatch;
058    import org.apache.activemq.command.MessageId;
059    import org.apache.activemq.command.ProducerId;
060    import org.apache.activemq.command.ProducerInfo;
061    import org.apache.activemq.command.Response;
062    import org.apache.activemq.command.SessionId;
063    import org.apache.activemq.command.SessionInfo;
064    import org.apache.activemq.transport.xmpp.command.Handler;
065    import org.apache.activemq.transport.xmpp.command.HandlerRegistry;
066    import org.apache.activemq.util.IdGenerator;
067    import org.apache.activemq.util.IntSequenceGenerator;
068    import org.apache.activemq.util.LongSequenceGenerator;
069    import org.apache.commons.logging.Log;
070    import org.apache.commons.logging.LogFactory;
071    import org.jabber.protocol.disco_info.Feature;
072    import org.jabber.protocol.disco_info.Identity;
073    import org.jabber.protocol.disco_items.Item;
074    import org.jabber.protocol.muc_user.X;
075    
076    /**
077     * TODO lots of this code could be shared with Stomp
078     */
079    public class ProtocolConverter {
    // Logger shared by all converter instances.
    private static final transient Log LOG = LogFactory.getLog(ProtocolConverter.class);
    // Supplies the value wrapped by each converter's ConnectionId.
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    // Supplies a client id at authentication time when the ConnectionInfo has none.
    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator("xmpp");

    // Handlers for incoming XMPP objects, looked up by the object's class.
    private HandlerRegistry registry = new HandlerRegistry();
    // Used to marshal replies to the XMPP client and to reach the transport listener.
    private XmppTransport transport;

    // Identifiers for the single connection / session / producer this converter uses.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    // Info commands built from the ids above; sent during authentication.
    private final ConnectionInfo connectionInfo = new ConnectionInfo(connectionId);
    private final SessionInfo sessionInfo = new SessionInfo(sessionId);
    private final ProducerInfo producerInfo = new ProducerInfo(producerId);

    // Sequence generators for consumer ids, outgoing message ids and temp destination ids.
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();

    // Pending response handlers keyed by command id; an entry is removed when its response arrives.
    private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
    // Dispatch handlers keyed by the consumer that receives the message.
    private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
    // Consumers keyed by the presence 'to' address; subscribe() synchronizes on the map it is given.
    private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
    // Same keying as above, but for the consumer on the personal inbox destination.
    private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>();

    // Guards lastCommandId.
    private final Object commnadIdMutex = new Object();
    // Next command id to hand out; starts at the int default of 0.
    private int lastCommandId;
    // Set to true once the broker answers the ConnectionInfo with a non-exception response.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Temporary queue used as a reply inbox; created on the first presence that names a destination.
    private ActiveMQTempQueue inboxDestination;
108    
109        public ProtocolConverter(XmppTransport transport) {
110            this.transport = transport;
111            initialiseRegistry();
112        }
113    
114        protected int generateCommandId() {
115            synchronized (commnadIdMutex) {
116                return lastCommandId++;
117            }
118        }
119    
120        protected void initialiseRegistry() {
121            // this kinda wiring muck is soooo much cleaner in C# :(
122            registry.registerHandler(Message.class, new Handler<Message>() {
123                public void handle(Message event) throws Exception {
124                    onMessage(event);
125                }
126            });
127            registry.registerHandler(Auth.class, new Handler<Auth>() {
128                public void handle(Auth event) throws Exception {
129                    onAuth(event);
130                }
131            });
132            registry.registerHandler(Starttls.class, new Handler<Starttls>() {
133                public void handle(Starttls event) throws Exception {
134                    onStarttls(event);
135                }
136            });
137            registry.registerHandler(Iq.class, new Handler<Iq>() {
138                public void handle(Iq event) throws Exception {
139                    onIq(event);
140                }
141            });
142            registry.registerHandler(Presence.class, new Handler<Presence>() {
143                public void handle(Presence event) throws Exception {
144                    onPresence(event);
145                }
146            });
147        }
148    
149        public void onXmppCommand(Object command) throws Exception {
150            // TODO we could do some nice code generation to boost performance
151            // by autogenerating the bytecode to statically lookup a handler from a
152            // registry maybe?
153    
154            Handler handler = registry.getHandler(command.getClass());
155            if (handler == null) {
156                unknownCommand(command);
157            } else {
158                handler.handle(command);
159            }
160        }
161    
162        public void onActiveMQCommad(Command command) throws Exception {
163            if (command.isResponse()) {
164                Response response = (Response)command;
165                Handler<Response> handler = resposeHandlers.remove(new Integer(response.getCorrelationId()));
166                if (handler != null) {
167                    handler.handle(response);
168                } else {
169                    LOG.warn("No handler for response: " + response);
170                }
171            } else if (command.isMessageDispatch()) {
172                MessageDispatch md = (MessageDispatch)command;
173                Handler<MessageDispatch> handler = subscriptionsByConsumerId.get(md.getConsumerId());
174                if (handler != null) {
175                    handler.handle(md);
176                } else {
177                    LOG.warn("No handler for message: " + md);
178                }
179            }
180        }
181    
182        protected void unknownCommand(Object command) throws Exception {
183            LOG.warn("Unkown command: " + command + " of type: " + command.getClass().getName());
184        }
185    
186        protected void onIq(final Iq iq) throws Exception {
187            Object any = iq.getAny();
188    
189            if (any instanceof Query) {
190                onAuthQuery(any, iq);
191    
192            } else if (any instanceof jabber.iq._private.Query) {
193                jabber.iq._private.Query query = (jabber.iq._private.Query)any;
194    
195                if (LOG.isDebugEnabled()) {
196                    LOG.debug("Iq Private " + debugString(iq) + " any: " + query.getAny());
197                }
198    
199                Iq result = createResult(iq);
200                jabber.iq._private.Query answer = new jabber.iq._private.Query();
201                result.setAny(answer);
202                transport.marshall(result);
203            } else if (any instanceof jabber.iq.roster.Query) {
204                jabber.iq.roster.Query query = (jabber.iq.roster.Query)any;
205    
206                if (LOG.isDebugEnabled()) {
207                    LOG.debug("Iq Roster " + debugString(iq) + " item: " + query.getItem());
208                }
209    
210                Iq result = createResult(iq);
211                jabber.iq.roster.Query roster = new jabber.iq.roster.Query();
212                result.setAny(roster);
213                transport.marshall(result);
214            } else if (any instanceof org.jabber.protocol.disco_items.Query) {
215                onDiscoItems(iq, (org.jabber.protocol.disco_items.Query)any);
216            } else if (any instanceof org.jabber.protocol.disco_info.Query) {
217                onDiscoInfo(iq, (org.jabber.protocol.disco_info.Query)any);
218            } else {
219                if (any instanceof Element) {
220                    Element element = (Element)any;
221                    LOG.warn("Iq Unknown " + debugString(iq) + " element namespace: " + element.getNamespaceURI() + " localName: " + element.getLocalName());
222                } else {
223                    LOG.warn("Iq Unknown " + debugString(iq) + " any: " + any + " of type: " + any.getClass().getName());
224                }
225                Iq result = createResult(iq);
226                jabber.client.Error error = new Error();
227                error.setUnexpectedRequest("Don't understand: " + any.toString());
228                result.setAny(error);
229                transport.marshall(result);
230            }
231        }
232    
233        protected void onAuthQuery(Object any, final Iq iq) throws IOException {
234            Query query = (Query)any;
235            if (LOG.isDebugEnabled()) {
236                LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
237            }
238            if (query.getPassword() == null) {
239                Iq result = createResult(iq);
240                Query required = new Query();
241                required.setPassword("");
242                required.setUsername("");
243                result.setAny(required);
244                transport.marshall(result);
245                return;
246            }
247    
248            // connectionInfo.setClientId(query.getResource());
249            connectionInfo.setUserName(query.getUsername());
250            connectionInfo.setPassword(query.getPassword());
251    
252            // TODO support digest?
253    
254            if (connectionInfo.getClientId() == null) {
255                connectionInfo.setClientId(CLIENT_ID_GENERATOR.generateId());
256            }
257    
258            sendToActiveMQ(connectionInfo, new Handler<Response>() {
259                public void handle(Response response) throws Exception {
260    
261                    Iq result = createResult(iq);
262    
263                    if (response instanceof ExceptionResponse) {
264                        ExceptionResponse exceptionResponse = (ExceptionResponse)response;
265                        Throwable exception = exceptionResponse.getException();
266    
267                        LOG.warn("Failed to create connection: " + exception, exception);
268    
269                        Error error = new Error();
270                        result.setError(error);
271    
272                        StringWriter buffer = new StringWriter();
273                        exception.printStackTrace(new PrintWriter(buffer));
274                        error.setInternalServerError(buffer.toString());
275                    } else {
276                        connected.set(true);
277                    }
278                    transport.marshall(result);
279    
280                    sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion"));
281                    sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
282                }
283            });
284        }
285    
286        protected String debugString(Iq iq) {
287            return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
288        }
289    
290        protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException {
291            String to = iq.getTo();
292    
293            if (LOG.isDebugEnabled()) {
294                LOG.debug("Iq Disco Items query " + debugString(iq) + " node: " + query.getNode() + " item: " + query.getItem());
295            }
296    
297            Iq result = createResult(iq);
298            org.jabber.protocol.disco_items.Query answer = new org.jabber.protocol.disco_items.Query();
299            if (to == null || to.length() == 0) {
300                answer.getItem().add(createItem("queues", "Queues", "queues"));
301                answer.getItem().add(createItem("topics", "Topics", "topics"));
302            } else {
303                // lets not add anything?
304            }
305    
306            result.setAny(answer);
307            transport.marshall(result);
308        }
309    
310        protected void onDiscoInfo(Iq iq, org.jabber.protocol.disco_info.Query query) throws IOException {
311            String to = iq.getTo();
312    
313            // TODO lets create the topic 'to'
314    
315            if (LOG.isDebugEnabled()) {
316                LOG.debug("Iq Disco Info query " + debugString(iq) + " node: " + query.getNode() + " features: " + query.getFeature() + " identity: " + query.getIdentity());
317            }
318    
319            Iq result = createResult(iq);
320            org.jabber.protocol.disco_info.Query answer = new org.jabber.protocol.disco_info.Query();
321            answer.setNode(to);
322            answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#info"));
323            answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#items"));
324            if (to == null || to.length() == 0) {
325                answer.getIdentity().add(createIdentity("directory", "chatroom", "queues"));
326                answer.getIdentity().add(createIdentity("directory", "chatroom", "topics"));
327                /*
328                 * answer.getIdentity().add(createIdentity("hierarchy", "queues",
329                 * "branch")); answer.getIdentity().add(createIdentity("hierarchy",
330                 * "topics", "branch"));
331                 */
332            } else {
333                // for queues/topics
334                if (to.equals("queues")) {
335                    answer.getIdentity().add(createIdentity("conference", "queue.a", "text"));
336                    answer.getIdentity().add(createIdentity("conference", "queue.b", "text"));
337                } else if (to.equals("topics")) {
338                    answer.getIdentity().add(createIdentity("conference", "topic.x", "text"));
339                    answer.getIdentity().add(createIdentity("conference", "topic.y", "text"));
340                    answer.getIdentity().add(createIdentity("conference", "topic.z", "text"));
341                } else {
342                    // lets reply to an actual room
343                    answer.getIdentity().add(createIdentity("conference", to, "text"));
344                    answer.getFeature().add(createFeature("http://jabber.org/protocol/muc"));
345                    answer.getFeature().add(createFeature("muc-open"));
346                }
347            }
348    
349            result.setAny(answer);
350            transport.marshall(result);
351        }
352    
353        protected void onPresence(Presence presence) throws IOException, JMSException {
354            if (LOG.isDebugEnabled()) {
355                LOG.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType() + " showOrStatusOrPriority: "
356                          + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
357            }
358            org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
359            item.setAffiliation("owner");
360            item.setRole("moderator");
361            item.setNick("broker");
362            sendPresence(presence, item);
363    
364            /*
365             * item = new org.jabber.protocol.muc_user.Item();
366             * item.setAffiliation("admin"); item.setRole("moderator");
367             * sendPresence(presence, item);
368             */
369    
370            // lets create a subscription
371            final String to = presence.getTo();
372    
373            ActiveMQDestination destination = createActiveMQDestination(to);
374            if (destination == null) {
375                LOG.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
376                return;
377            }
378            subscribe(to, destination, jidToConsumerMap);
379    
380            // lets subscribe to a personal inbox for replies
381    
382            // Check if Destination info is of temporary type.
383            if (inboxDestination == null) {
384                inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
385    
386                DestinationInfo info = new DestinationInfo();
387                info.setConnectionId(connectionInfo.getConnectionId());
388                info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
389                info.setDestination(inboxDestination);
390                sendToActiveMQ(info, null);
391    
392                subscribe(to, inboxDestination, jidToInboxConsumerMap);
393            }
394        }
395    
396        protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) {
397            boolean createConsumer = false;
398            ConsumerInfo consumerInfo = null;
399            synchronized (consumerMap) {
400                consumerInfo = consumerMap.get(to);
401                if (consumerInfo == null) {
402                    consumerInfo = new ConsumerInfo();
403                    consumerMap.put(to, consumerInfo);
404    
405                    ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
406                    consumerInfo.setConsumerId(consumerId);
407                    consumerInfo.setPrefetchSize(10);
408                    consumerInfo.setNoLocal(true);
409                    createConsumer = true;
410                }
411            }
412            if (!createConsumer) {
413                return;
414            }
415    
416            consumerInfo.setDestination(destination);
417    
418            subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
419                public void handle(MessageDispatch messageDispatch) throws Exception {
420                    // processing the inbound message
421                    if (LOG.isDebugEnabled()) {
422                        LOG.debug("Receiving inbound: " + messageDispatch.getMessage());
423                    }
424    
425                    // lets send back an ACK
426                    MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
427                    sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
428    
429                    Message message = createXmppMessage(to, messageDispatch);
430                    if (message != null) {
431                        if (LOG.isDebugEnabled()) {
432                            LOG.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
433                        }
434                        transport.marshall(message);
435                    }
436                }
437            });
438            sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
439        }
440    
441        protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException {
442            Message answer = new Message();
443            answer.setType("groupchat");
444            String from = to;
445            int idx = from.indexOf('/');
446            if (idx > 0) {
447                from = from.substring(0, idx) + "/broker";
448            }
449            answer.setFrom(from);
450            answer.setTo(to);
451    
452            org.apache.activemq.command.Message message = messageDispatch.getMessage();
453            // answer.setType(message.getType());
454            if (message instanceof ActiveMQTextMessage) {
455                ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message;
456                Body body = new Body();
457                String text = activeMQTextMessage.getText();
458                LOG.info("Setting the body text to be: " + text);
459                body.setValue(text);
460                answer.getAny().add(body);
461            } else {
462                // TODO support other message types
463                LOG.warn("Could not convert the message to a complete Jabber message: " + message);
464            }
465            return answer;
466        }
467    
468        protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
469            Presence answer = new Presence();
470            answer.setFrom(presence.getTo());
471            answer.setType(presence.getType());
472            answer.setTo(presence.getFrom());
473            X x = new X();
474            x.getDeclineOrDestroyOrInvite().add(item);
475            answer.getShowOrStatusOrPriority().add(x);
476            transport.marshall(answer);
477        }
478    
479        protected Item createItem(String jid, String name, String node) {
480            Item answer = new Item();
481            answer.setJid(jid);
482            answer.setName(name);
483            answer.setNode(node);
484            return answer;
485        }
486    
487        protected Identity createIdentity(String category, String type, String name) {
488            Identity answer = new Identity();
489            answer.setCategory(category);
490            answer.setName(name);
491            answer.setType(type);
492            return answer;
493        }
494    
495        protected Feature createFeature(String var) {
496            Feature feature = new Feature();
497            feature.setVar(var);
498            return feature;
499        }
500    
501        /**
502         * Creates a result command from the input
503         */
504        protected Iq createResult(Iq iq) {
505            Iq result = new Iq();
506            result.setId(iq.getId());
507            result.setFrom(transport.getFrom());
508            result.setTo(iq.getFrom());
509            result.setLang(iq.getLang());
510            result.setType("result");
511            return result;
512        }
513    
514        protected void sendToActiveMQ(Command command, Handler<Response> handler) {
515            command.setCommandId(generateCommandId());
516            if (handler != null) {
517                command.setResponseRequired(true);
518                resposeHandlers.put(command.getCommandId(), handler);
519            }
520            transport.getTransportListener().onCommand(command);
521        }
522    
523        protected void onStarttls(Starttls starttls) throws Exception {
524            LOG.debug("Starttls");
525            transport.marshall(new Proceed());
526        }
527    
528        protected void onMessage(Message message) throws Exception {
529            if (LOG.isDebugEnabled()) {
530                LOG.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
531            }
532    
533            final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
534    
535            ActiveMQDestination destination = createActiveMQDestination(message.getTo());
536    
537            activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
538            activeMQMessage.setDestination(destination);
539            activeMQMessage.setProducerId(producerId);
540            activeMQMessage.setTimestamp(System.currentTimeMillis());
541            addActiveMQMessageHeaders(activeMQMessage, message);
542    
543            /*
544             * MessageDispatch dispatch = new MessageDispatch();
545             * dispatch.setDestination(destination);
546             * dispatch.setMessage(activeMQMessage);
547             */
548    
549            if (LOG.isDebugEnabled()) {
550                LOG.debug("Sending ActiveMQ message: " + activeMQMessage);
551            }
552            sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
553        }
554    
555        protected Handler<Response> createErrorHandler(final String text) {
556            return new Handler<Response>() {
557                public void handle(Response event) throws Exception {
558                    if (event instanceof ExceptionResponse) {
559                        ExceptionResponse exceptionResponse = (ExceptionResponse)event;
560                        Throwable exception = exceptionResponse.getException();
561                        LOG.error("Failed to " + text + ". Reason: " + exception, exception);
562                    } else if (LOG.isDebugEnabled()) {
563                        LOG.debug("Completed " + text);
564                    }
565                }
566            };
567        }
568    
569        /**
570         * Converts the Jabber destination name into a destination in ActiveMQ
571         */
572        protected ActiveMQDestination createActiveMQDestination(String jabberDestination) throws JMSException {
573            if (jabberDestination == null) {
574                return null;
575            }
576            String name = jabberDestination;
577            int idx = jabberDestination.indexOf('@');
578            if (idx > 0) {
579                name = name.substring(0, idx);
580            }
581    
582            System.out.println("#### Creating ActiveMQ destination for: " + name);
583    
584            // lets support lower-case versions of the agent topic
585            if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
586                name = AdvisorySupport.AGENT_TOPIC;
587            }
588            return new ActiveMQTopic(name);
589        }
590    
591        protected ActiveMQMessage createActiveMQMessage(Message message) throws JMSException {
592            ActiveMQTextMessage answer = new ActiveMQTextMessage();
593            String text = "";
594            List<Object> list = message.getSubjectOrBodyOrThread();
595            for (Object object : list) {
596                if (object instanceof Body) {
597                    Body body = (Body)object;
598                    text = body.getValue();
599                    break;
600                }
601            }
602            answer.setText(text);
603            return answer;
604        }
605    
606        protected void addActiveMQMessageHeaders(ActiveMQMessage answer, Message message) throws JMSException {
607            answer.setStringProperty("XMPPFrom", message.getFrom());
608            answer.setStringProperty("XMPPID", message.getId());
609            answer.setStringProperty("XMPPLang", message.getLang());
610            answer.setStringProperty("XMPPTo", message.getTo());
611            answer.setJMSType(message.getType());
612            ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom());
613            if (replyTo == null) {
614                replyTo = inboxDestination;
615            }
616            System.out.println("Setting reply to destination to: " + replyTo);
617            answer.setJMSReplyTo(replyTo);
618        }
619    
620        protected void onAuth(Auth auth) throws Exception {
621            if (LOG.isDebugEnabled()) {
622                LOG.debug("Auth mechanism: " + auth.getMechanism() + " value: " + auth.getValue());
623            }
624            String value = createChallengeValue(auth);
625            if (value != null) {
626                Challenge challenge = new Challenge();
627                challenge.setValue(value);
628                transport.marshall(challenge);
629            } else {
630                transport.marshall(new Success());
631            }
632        }
633    
634        protected String createChallengeValue(Auth auth) {
635            // TODO implement the challenge
636            return null;
637        }
638    
639    }