001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.xmpp;
018    
019    import java.io.DataInput;
020    import java.io.DataInputStream;
021    import java.io.DataOutput;
022    import java.io.DataOutputStream;
023    import java.io.IOException;
024    
025    import org.apache.activemq.util.ByteArrayInputStream;
026    import org.apache.activemq.util.ByteArrayOutputStream;
027    import org.apache.activemq.util.ByteSequence;
028    import org.apache.activemq.wireformat.WireFormat;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A wire format which uses XMPP format of messages
034     *
035     * @version $Revision: 742888 $
036     */
037    public class XmppWireFormat implements WireFormat {
038        
039        private int version = 1;
040    
041        public WireFormat copy() {
042            return new XmppWireFormat();
043        }
044    
045        /*
046        public Packet readPacket(DataInput in) throws IOException {
047            return null;
048        }
049    
050        public Packet readPacket(int firstByte, DataInput in) throws IOException {
051            return null;
052        }
053    
054        public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
055            switch (packet.getPacketType()) {
056                case Packet.ACTIVEMQ_MESSAGE:
057                    writeMessage((ActiveMQMessage) packet, "", out);
058                    break;
059    
060                case Packet.ACTIVEMQ_TEXT_MESSAGE:
061                    writeTextMessage((ActiveMQTextMessage) packet, out);
062                    break;
063    
064                case Packet.ACTIVEMQ_BYTES_MESSAGE:
065                    writeBytesMessage((ActiveMQBytesMessage) packet, out);
066                    break;
067    
068                case Packet.ACTIVEMQ_OBJECT_MESSAGE:
069                    writeObjectMessage((ActiveMQObjectMessage) packet, out);
070                    break;
071    
072                case Packet.ACTIVEMQ_MAP_MESSAGE:
073                case Packet.ACTIVEMQ_STREAM_MESSAGE:
074    
075    
076                case Packet.ACTIVEMQ_BROKER_INFO:
077                case Packet.ACTIVEMQ_CONNECTION_INFO:
078                case Packet.ACTIVEMQ_MSG_ACK:
079                case Packet.CONSUMER_INFO:
080                case Packet.DURABLE_UNSUBSCRIBE:
081                case Packet.INT_RESPONSE_RECEIPT_INFO:
082                case Packet.PRODUCER_INFO:
083                case Packet.RECEIPT_INFO:
084                case Packet.RESPONSE_RECEIPT_INFO:
085                case Packet.SESSION_INFO:
086                case Packet.TRANSACTION_INFO:
087                case Packet.XA_TRANSACTION_INFO:
088                default:
089                    log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
090            }
091            return null;
092        }
093    */
094    
095    //    /**
096    //     * Can this wireformat process packets of this version
097    //     * @param version the version number to test
098    //     * @return true if can accept the version
099    //     */
100    //    public boolean canProcessWireFormatVersion(int version){
101    //        return true;
102    //    }
103    //
104    //    /**
105    //     * @return the current version of this wire format
106    //     */
107    //    public int getCurrentWireFormatVersion(){
108    //        return 1;
109    //    }
110    //
111    //    // Implementation methods
112    //    //-------------------------------------------------------------------------
113    //    protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
114    //        Serializable object = message.getObject();
115    //        String text = (object != null) ? object.toString() : "";
116    //        writeMessage(message, text, out);
117    //    }
118    //
119    //    protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
120    //        writeMessage(message, message.getText(), out);
121    //    }
122    //
123    //    protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
124    //        ByteArray data = message.getBodyAsBytes();
125    //        String text = encodeBinary(data.getBuf(),data.getOffset(),data.getLength());
126    //        writeMessage(message, text, out);
127    //    }
128    //
129    //    protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
130    //        String type = getXmppType(message);
131    //
132    //        StringBuffer buffer = new StringBuffer("<");
133    //        buffer.append(type);
134    //        buffer.append(" to='");
135    //        buffer.append(message.getJMSDestination().toString());
136    //        buffer.append("' from='");
137    //        buffer.append(message.getJMSReplyTo().toString());
138    //        String messageID = message.getJMSMessageID();
139    //        if (messageID != null) {
140    //            buffer.append("' id='");
141    //            buffer.append(messageID);
142    //        }
143    //
144    //        HashMap properties = message.getProperties();
145    //        if (properties != null) {
146    //            for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
147    //                Map.Entry entry = (Map.Entry) iter.next();
148    //                Object key = entry.getKey();
149    //                Object value = entry.getValue();
150    //                if (value != null) {
151    //                    buffer.append("' ");
152    //                    buffer.append(key.toString());
153    //                    buffer.append("='");
154    //                    buffer.append(value.toString());
155    //                }
156    //            }
157    //        }
158    //
159    //        buffer.append("'>");
160    //
161    //        String id = message.getJMSCorrelationID();
162    //        if (id != null) {
163    //            buffer.append("<thread>");
164    //            buffer.append(id);
165    //            buffer.append("</thread>");
166    //        }
167    //        buffer.append(body);
168    //        buffer.append("</");
169    //        buffer.append(type);
170    //        buffer.append(">");
171    //
172    //        out.write(buffer.toString().getBytes());
173    //    }
174    //
175    //    protected String encodeBinary(byte[] data,int offset,int length) {
176    //        // TODO
177    //        throw new RuntimeException("Not implemented yet!");
178    //    }
179    //
180    //    protected String getXmppType(ActiveMQMessage message) {
181    //        String type = message.getJMSType();
182    //        if (type == null) {
183    //            type = "message";
184    //        }
185    //        return type;
186    //    }
187    
188    
189        public ByteSequence marshal(Object command) throws IOException {
190            ByteArrayOutputStream baos = new ByteArrayOutputStream();
191            DataOutputStream dos = new DataOutputStream(baos);
192            marshal(command, dos);
193            dos.close();
194            return baos.toByteSequence();
195        }
196    
197        public Object unmarshal(ByteSequence packet) throws IOException {
198            ByteArrayInputStream stream = new ByteArrayInputStream(packet);
199            DataInputStream dis = new DataInputStream(stream);
200            return unmarshal(dis);
201        }
202    
203        public void marshal(Object object, DataOutput dataOutput) throws IOException {
204            /** TODO */
205        }
206    
207        public Object unmarshal(DataInput dataInput) throws IOException {
208            return null;  /** TODO */
209        }
210    
211    
212        public int getVersion() {
213            return version;
214        }
215    
216        public void setVersion(int version) {
217            this.version = version;
218        }
219        
220            public boolean inReceive() {
221                    // TODO Implement for inactivity monitor
222                    return false;
223            }
224    }