001    /**
002     * 
003     * Copyright 2005 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.jabber;
020    import java.io.BufferedInputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.IOException;
024    import java.io.PrintWriter;
025    import java.net.Socket;
026    import java.net.URI;
027    import java.util.ArrayList;
028    import java.util.List;
029    import javax.jms.JMSException;
030    import javax.xml.stream.XMLInputFactory;
031    import javax.xml.stream.XMLStreamException;
032    import javax.xml.stream.XMLStreamReader;
033    import org.activemq.message.Packet;
034    import org.activemq.transport.TransportStatusEvent;
035    import org.activemq.transport.tcp.TcpBufferedOutputStream;
036    import org.activemq.transport.tcp.TcpTransportChannel;
037    import org.activemq.transport.tcp.TcpTransportServerChannel;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    import EDU.oswego.cs.dl.util.concurrent.Executor;
041    
042    /**
043     * A transport for using Jabber (XMPP) to talk to ActiveMQ
044     * 
045     * @version $Revision: 1.1 $
046     */
047    public class JabberTransportChannel extends TcpTransportChannel {
048        private static final Log log = LogFactory.getLog(JabberTransportChannel.class);
049        private XMLInputFactory inputFactory;
050        private BufferedInputStream in;
051    
052        public JabberTransportChannel() {
053            super(new JabberWireFormat());
054        }
055    
056        public JabberTransportChannel(URI remoteLocation) throws JMSException {
057            super(new JabberWireFormat(), remoteLocation);
058        }
059    
060        public JabberTransportChannel(URI remoteLocation, URI localLocation) throws JMSException {
061            super(new JabberWireFormat(), remoteLocation, localLocation);
062        }
063    
064        public JabberTransportChannel(TcpTransportServerChannel serverChannel, Socket socket, Executor executor)
065                throws JMSException {
066            super(serverChannel, new JabberWireFormat(), socket, executor);
067        }
068    
069        public JabberTransportChannel(Socket socket, Executor executor) throws JMSException {
070            super(new JabberWireFormat(), socket, executor);
071        }
072    
073        public void run() {
074            System.out.println("Jabber consumer thread starting");
075            log.trace("Jabber consumer thread starting");
076            int count = 0;
077            try {
078                if (inputFactory == null) {
079                    inputFactory = XMLInputFactory.newInstance();
080                }
081                XMLStreamReader reader = inputFactory.createXMLStreamReader(in, "UTF-8");
082                //initialize dialog
083                getJabberWireFormat().initialize();
084                List list = new ArrayList();
085                while (!isClosed()) {
086                    list.clear();
087                    if (isServerSide() && ++count > 500) {
088                        count = 0;
089                        Thread.yield();
090                    }
091                    if (!reader.hasNext()) {
092                        stop();
093                        break;
094                    }
095                    getJabberWireFormat().readPacket(reader, list);
096                    for (int i = 0;i < list.size();i++) {
097                        Packet packet = (Packet) list.get(i);
098                        if (packet != null) {
099                            doConsumePacket(packet);
100                        }
101                    }
102                }
103                stop();
104            }
105            catch (XMLStreamException e) {
106                doClose(e);
107            }
108            catch (JMSException e) {
109                doClose(e);
110            }
111            catch (IOException e) {
112                doClose(e);
113            }
114        }
115    
116        public JabberWireFormat getJabberWireFormat() {
117            return (JabberWireFormat) getWireFormat();
118        }
119    
120        protected void initializeStreams() throws IOException {
121            System.out.println("Creating input stream");
122            this.in = new BufferedInputStream(socket.getInputStream(), 8192);
123            this.dataIn = new DataInputStream(in);
124            System.out.println("creating output stream");
125            TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192);
126            this.dataOut = new DataOutputStream(buffOut);
127            System.out.println("Creating print writer...");
128            PrintWriter writer = new PrintWriter(socket.getOutputStream());
129            getJabberWireFormat().setWriter(writer);
130            System.out.println("Firing event");
131            fireStatusEvent(new TransportStatusEvent(this, TransportStatusEvent.CONNECTED));
132        }
133    }