001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq.io.impl;
020    import java.io.DataInput;
021    import java.io.DataInputStream;
022    import java.io.DataOutput;
023    import java.io.DataOutputStream;
024    import java.io.IOException;
025    import java.io.Serializable;
026    import java.util.ArrayList;
027    import java.util.List;
028    
029    import javax.jms.JMSException;
030    
031    import org.activemq.io.AbstractWireFormat;
032    import org.activemq.io.WireFormat;
033    import org.activemq.message.AbstractPacket;
034    import org.activemq.message.CachedValue;
035    import org.activemq.message.Packet;
036    import org.activemq.message.WireFormatInfo;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire
042     * format.
043     * 
044     * @version $Revision: 1.1.1.1 $
045     */
046    public abstract class AbstractDefaultWireFormat extends AbstractWireFormat implements Serializable {
047    
048        /**
049         * Current wire format version for this implementation
050         */
051        public static final int WIRE_FORMAT_VERSION = 3;
052        private static final Log log = LogFactory.getLog(AbstractDefaultWireFormat.class);
053        
054        protected transient  PacketReader messageReader;
055        protected transient  PacketReader textMessageReader;
056        protected transient  PacketReader objectMessageReader;
057        protected transient  PacketReader bytesMessageReader;
058        protected transient  PacketReader streamMessageReader;
059        protected transient  PacketReader mapMessageReader;
060        protected transient  PacketReader messageAckReader;
061        protected transient  PacketReader receiptReader;
062        protected transient  PacketReader consumerInfoReader;
063        protected transient  PacketReader producerInfoReader;
064        protected transient  PacketReader transactionInfoReader;
065        protected transient  PacketReader xaTransactionInfoReader;
066        protected transient  PacketReader brokerInfoReader;
067        protected transient  PacketReader connectionInfoReader;
068        protected transient  PacketReader sessionInfoReader;
069        protected transient  PacketReader durableUnsubscribeReader;
070        protected transient  PacketReader reponseReceiptReader;
071        protected transient  PacketReader intReponseReceiptReader;
072        protected transient  PacketReader capacityInfoReader;
073        protected transient  PacketReader capacityInfoRequestReader;
074        protected transient  PacketReader wireFormatInfoReader;
075        protected transient  PacketReader keepAliveReader;
076        protected transient  PacketReader brokerAdminCommandReader;
077        protected transient  PacketReader cachedValueReader;
078        protected transient  PacketReader cleanupConnectionAndSessionInfoReader;        
079        protected transient  PacketWriter messageWriter;
080        protected transient  PacketWriter textMessageWriter;
081        protected transient  PacketWriter objectMessageWriter;
082        protected transient  PacketWriter bytesMessageWriter;
083        protected transient  PacketWriter streamMessageWriter;
084        protected transient  PacketWriter mapMessageWriter;
085        protected transient  PacketWriter messageAckWriter;
086        protected transient  PacketWriter receiptWriter;
087        protected transient  PacketWriter consumerInfoWriter;
088        protected transient  PacketWriter producerInfoWriter;
089        protected transient  PacketWriter transactionInfoWriter;
090        protected transient  PacketWriter xaTransactionInfoWriter;
091        protected transient  PacketWriter brokerInfoWriter;
092        protected transient  PacketWriter connectionInfoWriter;
093        protected transient  PacketWriter sessionInfoWriter;
094        protected transient  PacketWriter durableUnsubscribeWriter;
095        protected transient  PacketWriter reponseReceiptWriter;
096        protected transient  PacketWriter intReponseReceiptWriter;
097        protected transient  PacketWriter capacityInfoWriter;
098        protected transient  PacketWriter capacityInfoRequestWriter;
099        protected transient  PacketWriter wireFormatInfoWriter;
100        protected transient  PacketWriter keepAliveWriter;
101        protected transient  PacketWriter brokerAdminCommandWriter;
102        protected transient  PacketWriter cachedValueWriter;
103        protected transient  PacketWriter cleanupConnectionAndSessionInfoWriter;
104    
105        private List readers = new ArrayList();
106        private List writers = new ArrayList();
107        
108        protected transient int currentWireFormatVersion;
109        
110        /**
111         * Default Constructor
112         */
113        public AbstractDefaultWireFormat() {
114            this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
115            initializeReaders();
116            initializeWriters();
117        }
118        
119        
120        abstract public byte[] toBytes(Packet packet) throws IOException;
121        abstract public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException;    
122        abstract protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException;
123        
124        abstract protected void handleCachedValue(CachedValue cv);
125        abstract public Object getValueFromReadCache(short key);
126        abstract short getWriteCachedKey(Object value) throws IOException;
127        
128    
129        /**
130         * Some wire formats require a handshake at start-up
131         * @param dataOut
132         * @param dataIn
133         * @throws JMSException
134         */
135        public void initiateClientSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{
136            WireFormatInfo info = new WireFormatInfo();
137            info.setVersion(getCurrentWireFormatVersion());
138            try {
139                writePacket(info, dataOut);
140                dataOut.flush();
141            }
142            catch (IOException e) {
143                throw new JMSException("Failed to intiate protocol");
144            }
145        }
146        
147        /**
148         * Some wire formats require a handshake at start-up
149         * @param dataOut
150         * @param dataIn
151         * @throws JMSException
152         */
153        public void initiateServerSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{
154        }
155            
156        /**
157         * @return new WireFormat
158         */
159        abstract public WireFormat copy();
160    
161        /**
162         * @param firstByte
163         * @param dataIn
164         * @return
165         * @throws IOException
166         * 
167         */
168        public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
169            switch (firstByte) {
170                case Packet.ACTIVEMQ_MESSAGE :
171                    return readPacket(dataIn, messageReader);
172                case Packet.ACTIVEMQ_TEXT_MESSAGE :
173                    return readPacket(dataIn, textMessageReader);
174                case Packet.ACTIVEMQ_OBJECT_MESSAGE :
175                    return readPacket(dataIn, objectMessageReader);
176                case Packet.ACTIVEMQ_BYTES_MESSAGE :
177                    return readPacket(dataIn, bytesMessageReader);
178                case Packet.ACTIVEMQ_STREAM_MESSAGE :
179                    return readPacket(dataIn, streamMessageReader);
180                case Packet.ACTIVEMQ_MAP_MESSAGE :
181                    return readPacket(dataIn, mapMessageReader);
182                case Packet.ACTIVEMQ_MSG_ACK :
183                    return readPacket(dataIn, messageAckReader);
184                case Packet.RECEIPT_INFO :
185                    return readPacket(dataIn, receiptReader);
186                case Packet.CONSUMER_INFO :
187                    return readPacket(dataIn, consumerInfoReader);
188                case Packet.PRODUCER_INFO :
189                    return readPacket(dataIn, producerInfoReader);
190                case Packet.TRANSACTION_INFO :
191                    return readPacket(dataIn, transactionInfoReader);
192                case Packet.XA_TRANSACTION_INFO :
193                    return readPacket(dataIn, xaTransactionInfoReader);
194                case Packet.ACTIVEMQ_BROKER_INFO :
195                    return readPacket(dataIn, brokerInfoReader);
196                case Packet.ACTIVEMQ_CONNECTION_INFO :
197                    return readPacket(dataIn, connectionInfoReader);
198                case Packet.SESSION_INFO :
199                    return readPacket(dataIn, sessionInfoReader);
200                case Packet.DURABLE_UNSUBSCRIBE :
201                    return readPacket(dataIn, durableUnsubscribeReader);
202                case Packet.RESPONSE_RECEIPT_INFO :
203                    return readPacket(dataIn, reponseReceiptReader);
204                case Packet.INT_RESPONSE_RECEIPT_INFO :
205                    return readPacket(dataIn, intReponseReceiptReader);
206                case Packet.CAPACITY_INFO :
207                    return readPacket(dataIn, capacityInfoReader);
208                case Packet.CAPACITY_INFO_REQUEST :
209                    return readPacket(dataIn, capacityInfoRequestReader);
210                case Packet.WIRE_FORMAT_INFO :
211                    WireFormatInfo info =  (WireFormatInfo)readPacket(dataIn, wireFormatInfoReader);
212                    if (info != null){
213                        if (info.getVersion() < 3){
214                            throw new IOException("Cannot support wire format version: " + info.getVersion());
215                        }
216                    }
217                    return info;
218                
219                case Packet.KEEP_ALIVE :
220                    return readPacket(dataIn, keepAliveReader);
221                case Packet.BROKER_ADMIN_COMMAND :
222                    return readPacket(dataIn, brokerAdminCommandReader);
223                case Packet.CACHED_VALUE_COMMAND :
224                    CachedValue cv =  (CachedValue)readPacket(dataIn,cachedValueReader);
225                    handleCachedValue(cv);
226                    return null;
227                case Packet.CLEANUP_CONNECTION_INFO :
228                    return readPacket(dataIn, cleanupConnectionAndSessionInfoReader);
229                default :
230                    log.error("Could not find PacketReader for packet type: "
231                            + AbstractPacket.getPacketTypeAsString(firstByte));
232                    return null;
233            }
234        }
235        
236        protected PacketWriter getWriter(Packet packet) throws IOException {
237            PacketWriter answer = null;
238            switch (packet.getPacketType()) {
239                case Packet.ACTIVEMQ_MESSAGE :
240                    answer = messageWriter;
241                    break;
242                case Packet.ACTIVEMQ_TEXT_MESSAGE :
243                    answer = textMessageWriter;
244                    break;
245                case Packet.ACTIVEMQ_OBJECT_MESSAGE :
246                    answer = objectMessageWriter;
247                    break;
248                case Packet.ACTIVEMQ_BYTES_MESSAGE :
249                    answer = bytesMessageWriter;
250                    break;
251                case Packet.ACTIVEMQ_STREAM_MESSAGE :
252                    answer = streamMessageWriter;
253                    break;
254                case Packet.ACTIVEMQ_MAP_MESSAGE :
255                    answer = mapMessageWriter;
256                    break;
257                case Packet.ACTIVEMQ_MSG_ACK :
258                    answer = messageAckWriter;
259                    break;
260                case Packet.RECEIPT_INFO :
261                    answer = receiptWriter;
262                    break;
263                case Packet.CONSUMER_INFO :
264                    answer = consumerInfoWriter;
265                    break;
266                case Packet.PRODUCER_INFO :
267                    answer = producerInfoWriter;
268                    break;
269                case Packet.TRANSACTION_INFO :
270                    answer = transactionInfoWriter;
271                    break;
272                case Packet.XA_TRANSACTION_INFO :
273                    answer = xaTransactionInfoWriter;
274                    break;
275                case Packet.ACTIVEMQ_BROKER_INFO :
276                    answer = brokerInfoWriter;
277                    break;
278                case Packet.ACTIVEMQ_CONNECTION_INFO :
279                    answer = connectionInfoWriter;
280                    break;
281                case Packet.SESSION_INFO :
282                    answer = sessionInfoWriter;
283                    break;
284                case Packet.DURABLE_UNSUBSCRIBE :
285                    answer = durableUnsubscribeWriter;
286                    break;
287                case Packet.RESPONSE_RECEIPT_INFO :
288                    answer = reponseReceiptWriter;
289                    break;
290                case Packet.INT_RESPONSE_RECEIPT_INFO :
291                    answer = intReponseReceiptWriter;
292                    break;
293                case Packet.CAPACITY_INFO :
294                    answer = capacityInfoWriter;
295                    break;
296                case Packet.CAPACITY_INFO_REQUEST :
297                    answer = capacityInfoRequestWriter;
298                    break;
299                case Packet.WIRE_FORMAT_INFO :
300                    answer = wireFormatInfoWriter;
301                    break;
302                case Packet.KEEP_ALIVE :
303                    answer = keepAliveWriter;
304                    break;
305                case Packet.BROKER_ADMIN_COMMAND :
306                    answer = brokerAdminCommandWriter;
307                    break;
308                case Packet.CACHED_VALUE_COMMAND:
309                    answer = cachedValueWriter;
310                    break;
311                case Packet.CLEANUP_CONNECTION_INFO:
312                    answer = cleanupConnectionAndSessionInfoWriter;
313                    break;
314                default :
315                    log.error("no PacketWriter for packet: " + packet);
316            }
317            return answer;
318        }
319        
320        /**
321         * Can this wireformat process packets of this version
322         * @param version the version number to test
323         * @return true if can accept the version
324         */
325        public boolean canProcessWireFormatVersion(int version){
326            return version <= WIRE_FORMAT_VERSION;
327        }
328        
329        /**
330         * @return the current version of this wire format
331         */
332        public int getCurrentWireFormatVersion(){
333            return currentWireFormatVersion;
334        }
335        
336        /**
337         * set the current version
338         * @param version
339         */
340        public void setCurrentWireFormatVersion(int version){
341            this.currentWireFormatVersion = version;
342            for (int i =0; i < readers.size(); i++){
343                PacketReader reader = (PacketReader)readers.get(i);
344                reader.setWireFormatVersion(version);
345            }
346            for (int i =0; i < writers.size(); i++){
347                PacketWriter writer = (PacketWriter)writers.get(i);
348                writer.setWireFormatVersion(version);
349            }
350        }
351    
352        private void initializeReaders() {
353            messageReader = new ActiveMQMessageReader(this);
354            readers.add(messageReader);
355            textMessageReader = new ActiveMQTextMessageReader(this);
356            readers.add(textMessageReader);
357            objectMessageReader = new ActiveMQObjectMessageReader(this);
358            readers.add(objectMessageReader);
359            bytesMessageReader = new ActiveMQBytesMessageReader(this);
360            readers.add(bytesMessageReader);
361            streamMessageReader = new ActiveMQStreamMessageReader(this);
362            readers.add(streamMessageReader);
363            mapMessageReader = new ActiveMQMapMessageReader(this);
364            readers.add(mapMessageReader);
365            messageAckReader = new MessageAckReader(this);
366            readers.add(messageAckReader);
367            receiptReader = new ReceiptReader();
368            readers.add(receiptReader);
369            consumerInfoReader = new ConsumerInfoReader();
370            readers.add(consumerInfoReader);
371            producerInfoReader = new ProducerInfoReader();
372            readers.add(producerInfoReader);
373            transactionInfoReader = new TransactionInfoReader();
374            readers.add(transactionInfoReader);
375            xaTransactionInfoReader = new XATransactionInfoReader();
376            readers.add(xaTransactionInfoReader);
377            brokerInfoReader = new BrokerInfoReader();
378            readers.add(brokerInfoReader);
379            connectionInfoReader = new ConnectionInfoReader();
380            readers.add(connectionInfoReader);
381            sessionInfoReader = new SessionInfoReader();
382            readers.add(sessionInfoReader);
383            durableUnsubscribeReader = new DurableUnsubscribeReader();
384            readers.add(durableUnsubscribeReader);
385            reponseReceiptReader = new ResponseReceiptReader();
386            readers.add(reponseReceiptReader);
387            intReponseReceiptReader = new IntResponseReceiptReader();
388            readers.add(intReponseReceiptReader);
389            capacityInfoReader = new CapacityInfoReader();
390            readers.add(capacityInfoReader);
391            capacityInfoRequestReader = new CapacityInfoRequestReader();
392            readers.add(capacityInfoReader);
393            wireFormatInfoReader = new WireFormatInfoReader(this);
394            readers.add(wireFormatInfoReader);
395            keepAliveReader = new KeepAliveReader();
396            readers.add(keepAliveReader);
397            brokerAdminCommandReader = new BrokerAdminCommandReader();
398            readers.add(brokerAdminCommandReader);
399            cachedValueReader = new CachedValueReader();
400            readers.add(cachedValueReader);
401            cleanupConnectionAndSessionInfoReader = new CleanupConnectionInfoReader();
402            readers.add(cleanupConnectionAndSessionInfoReader);
403        }
404        
405        private void initializeWriters(){
406            messageWriter = new ActiveMQMessageWriter(this);
407            writers.add(messageWriter);
408            textMessageWriter = new ActiveMQTextMessageWriter(this);
409            writers.add(textMessageWriter);
410            objectMessageWriter = new ActiveMQObjectMessageWriter(this);
411            writers.add(objectMessageWriter);
412            bytesMessageWriter = new ActiveMQBytesMessageWriter(this);
413            writers.add(bytesMessageWriter);
414            streamMessageWriter = new ActiveMQStreamMessageWriter(this);
415            writers.add(streamMessageWriter);
416            mapMessageWriter = new ActiveMQMapMessageWriter(this);
417            writers.add(mapMessageWriter);
418            messageAckWriter = new MessageAckWriter(this);
419            writers.add(messageAckWriter);
420            receiptWriter = new ReceiptWriter();
421            writers.add(receiptWriter);
422            consumerInfoWriter = new ConsumerInfoWriter();
423            writers.add(consumerInfoWriter);
424            producerInfoWriter = new ProducerInfoWriter();
425            writers.add(producerInfoWriter);
426            transactionInfoWriter = new TransactionInfoWriter();
427            writers.add(transactionInfoWriter);
428            xaTransactionInfoWriter = new XATransactionInfoWriter();
429            writers.add(xaTransactionInfoWriter);
430            brokerInfoWriter = new BrokerInfoWriter();
431            writers.add(brokerInfoWriter);
432            connectionInfoWriter = new ConnectionInfoWriter();
433            writers.add(connectionInfoWriter);
434            sessionInfoWriter = new SessionInfoWriter();
435            writers.add(sessionInfoWriter);
436            durableUnsubscribeWriter = new DurableUnsubscribeWriter();
437            writers.add(durableUnsubscribeWriter);
438            reponseReceiptWriter = new ResponseReceiptWriter();
439            writers.add(reponseReceiptWriter);
440            intReponseReceiptWriter = new IntResponseReceiptWriter();
441            writers.add(intReponseReceiptWriter);
442            capacityInfoWriter = new CapacityInfoWriter();
443            writers.add(capacityInfoWriter);
444            capacityInfoRequestWriter = new CapacityInfoRequestWriter();
445            writers.add(capacityInfoWriter);
446            wireFormatInfoWriter = new WireFormatInfoWriter();
447            writers.add(wireFormatInfoWriter);
448            keepAliveWriter = new KeepAliveWriter();
449            writers.add(keepAliveWriter);
450            brokerAdminCommandWriter = new BrokerAdminCommandWriter();
451            writers.add(brokerAdminCommandWriter);
452            cachedValueWriter = new CachedValueWriter();
453            writers.add(cachedValueWriter);
454            cleanupConnectionAndSessionInfoWriter = new CleanupConnectionInfoWriter();
455            writers.add(cleanupConnectionAndSessionInfoWriter);
456        }
457        
458    }