001    /*
002     * Copyright (c) 2005 Your Corporation. All Rights Reserved.
003     */
004    package org.activemq.transport.stomp;
005    
006    import EDU.oswego.cs.dl.util.concurrent.Channel;
007    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
008    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
009    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010    import org.activemq.io.WireFormat;
011    import org.activemq.message.ActiveMQDestination;
012    import org.activemq.message.ActiveMQTextMessage;
013    import org.activemq.message.ConnectionInfo;
014    import org.activemq.message.ConsumerInfo;
015    import org.activemq.message.Packet;
016    import org.activemq.message.Receipt;
017    import org.activemq.message.SessionInfo;
018    import org.activemq.message.ActiveMQBytesMessage;
019    import org.activemq.util.IdGenerator;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Session;
023    import java.io.BufferedReader;
024    import java.io.DataInput;
025    import java.io.DataInputStream;
026    import java.io.DataOutput;
027    import java.io.DataOutputStream;
028    import java.io.IOException;
029    import java.io.InputStreamReader;
030    import java.net.DatagramPacket;
031    import java.net.ProtocolException;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Properties;
035    
036    /**
037     * Implements the TTMP protocol.
038     */
039    public class StompWireFormat implements WireFormat
040    {
041    
042        static final IdGenerator PACKET_IDS = new IdGenerator();
043        static final IdGenerator clientIds = new IdGenerator();
044    
045        private CommandParser commandParser = new CommandParser(this);
046        private HeaderParser headerParser = new HeaderParser();
047    
048        private DataInputStream in;
049    
050        private String clientId;
051    
052        private Channel pendingReadPackets = new LinkedQueue();
053        private Channel pendingWriteFrames = new LinkedQueue();
054        private List receiptListeners = new CopyOnWriteArrayList();
055        private short sessionId;
056        private Map subscriptions = new ConcurrentHashMap();
057        private List ackListeners = new CopyOnWriteArrayList();
058        private final Map transactions = new ConcurrentHashMap();
059    
060    
061    
062        void addReceiptListener(ReceiptListener listener)
063        {
064            receiptListeners.add(listener);
065        }
066    
067    
068        public Packet readPacket(DataInput in) throws IOException
069        {
070            Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn()
071            {
072                public Object cycle() throws InterruptedException
073                {
074                    return pendingReadPackets.poll(0);
075                }
076            });
077            if (pending != null)
078            {
079                return pending;
080            }
081    
082            try
083            {
084                return commandParser.parse(in);
085            }
086            catch (ProtocolException e)
087            {
088                sendError(e.getMessage());
089                return FlushPacket.PACKET;
090            }
091        }
092    
093        public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException
094        {
095            flushPendingFrames(out);
096    
097            // It may have just been a flush request.
098            if( packet == null )
099                return null;
100    
101            if (packet.getPacketType() == Packet.RECEIPT_INFO)
102            {
103                assert(packet instanceof Receipt);
104                Receipt receipt = (Receipt) packet;
105                for (int i = 0; i < receiptListeners.size(); i++)
106                {
107                    ReceiptListener listener = (ReceiptListener) receiptListeners.get(i);
108                    if (listener.onReceipt(receipt, out))
109                    {
110                        receiptListeners.remove(listener);
111                        return null;
112                    }
113                }
114            }
115    
116            if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE)
117            {
118                assert(packet instanceof ActiveMQTextMessage);
119                ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
120                Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
121                sub.receive(msg, out);
122            }
123            else if (packet.getPacketType() == Packet.ACTIVEMQ_BYTES_MESSAGE)
124            {
125                assert(packet instanceof ActiveMQBytesMessage);
126                ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
127                Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
128                sub.receive(msg, out);
129            }
130            return null;
131        }
132    
133        private void flushPendingFrames(final DataOutput out) throws IOException
134        {
135            boolean interrupted = false;
136            do
137            {
138                try
139                {
140                    byte[] frame = (byte[]) pendingWriteFrames.poll(0);
141                    if (frame == null) return;
142                    out.write(frame);
143                }
144                catch (InterruptedException e)
145                {
146                    interrupted = true;
147                }
148            }
149            while (interrupted);
150        }
151    
152        private void sendError(final String message)
153        {
154    //        System.err.println("sending error [" + message + "]");
155            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
156            {
157                public void cycle() throws InterruptedException
158                {
159                    pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR)
160                            .addHeader(Stomp.Headers.Error.MESSAGE, message)
161                            .toFrame());
162                }
163            });
164        }
165    
166        /**
167         * some transports may register their streams (e.g. Tcp)
168         *
169         * @param dataOut
170         * @param dataIn
171         */
172        public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn)
173        {
174            this.in = dataIn;
175        }
176    
177        /**
178         * Some wire formats require a handshake at start-up
179         *
180         * @throws java.io.IOException
181         */
182        public void initiateServerSideProtocol() throws IOException
183        {
184            BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
185            String first_line = in.readLine();
186            if (!first_line.startsWith(Stomp.Commands.CONNECT))
187            {
188                throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
189            }
190    
191            Properties headers = headerParser.parse(in);
192            //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
193            //    System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing");
194            //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
195            //    System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing");
196    
197            // allow anyone to login for now
198    
199            String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
200            String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
201    
202            // skip to end of the packet
203            while (in.read() != 0) {}
204            final ConnectionInfo info = new ConnectionInfo();
205            final Short packet_id = new Short(PACKET_IDS.getNextShortSequence());
206            clientId = clientIds.generateId();
207            commandParser.setClientId(clientId);
208    
209            info.setClientId(clientId);
210            info.setReceiptRequired(true);
211            info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
212            info.setClosed(false);
213            info.setHostName("ttmp.fake.host.name");
214            info.setId(packet_id.shortValue());
215            info.setUserName(login);
216            info.setPassword(passcode);
217            info.setStartTime(System.currentTimeMillis());
218            info.setStarted(true);
219    
220            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
221            {
222                public void cycle() throws InterruptedException
223                {
224                    pendingReadPackets.put(info);
225                }
226            });
227    
228            addReceiptListener(new ReceiptListener()
229            {
230                public boolean onReceipt(Receipt receipt, DataOutput out)
231                {
232                    if (receipt.getCorrelationId() != packet_id.shortValue()) return false;
233                    final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence());
234                    sessionId = clientIds.getNextShortSequence();
235    
236                    final SessionInfo info = new SessionInfo();
237                    info.setStartTime(System.currentTimeMillis());
238                    info.setId(session_packet_id.shortValue());
239                    info.setClientId(clientId);
240                    info.setSessionId(sessionId);
241                    info.setStarted(true);
242                    info.setSessionMode(Session.AUTO_ACKNOWLEDGE);
243                    info.setReceiptRequired(true);
244    
245                    AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
246                    {
247                        public void cycle() throws InterruptedException
248                        {
249                            pendingReadPackets.put(info);
250                        }
251                    });
252    
253                    addReceiptListener(new ReceiptListener()
254                    {
255                        public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException
256                        {
257                            if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false;
258                            StringBuffer buffer = new StringBuffer();
259                            buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
260                            buffer.append(Stomp.Headers.Connected.SESSION)
261                                    .append(Stomp.Headers.SEPERATOR)
262                                    .append(clientId)
263                                    .append(Stomp.NEWLINE)
264                                    .append(Stomp.NEWLINE);
265                            buffer.append(Stomp.NULL);
266                            out.writeBytes(buffer.toString());
267                            return true;
268                        }
269                    });
270    
271                    return true;
272                }
273            });
274        }
275    
276        /**
277         * Creates a new copy of this wire format so it can be used in another thread/context
278         */
279        public WireFormat copy()
280        {
281            return new StompWireFormat();
282        }
283    
284        /* Stuff below here is leaky stuff we don't actually need */
285    
286        /**
287         * Some wire formats require a handshake at start-up
288         *
289         * @throws java.io.IOException
290         */
291        public void initiateClientSideProtocol() throws IOException
292        {
293            throw new UnsupportedOperationException("Not yet implemented!");
294        }
295    
296        /**
297         * Can this wireformat process packets of this version
298         *
299         * @param version the version number to test
300         * @return true if can accept the version
301         */
302        public boolean canProcessWireFormatVersion(int version)
303        {
304            return version == getCurrentWireFormatVersion();
305        }
306    
307        /**
308         * @return the current version of this wire format
309         */
310        public int getCurrentWireFormatVersion()
311        {
312            return 1;
313        }
314    
315        /**
316         * @return Returns the enableCaching.
317         */
318        public boolean isCachingEnabled()
319        {
320            return false;
321        }
322    
323        /**
324         * @param enableCaching The enableCaching to set.
325         */
326        public void setCachingEnabled(boolean enableCaching)
327        {
328            // never
329        }
330    
331        /**
332         * some wire formats will implement their own fragementation
333         *
334         * @return true unless a wire format supports it's own fragmentation
335         */
336        public boolean doesSupportMessageFragmentation()
337        {
338            return false;
339        }
340    
341        /**
342         * Some wire formats will not be able to understand compressed messages
343         *
344         * @return true unless a wire format cannot understand compression
345         */
346        public boolean doesSupportMessageCompression()
347        {
348            return false;
349        }
350    
351        /**
352         * Writes the given package to a new datagram
353         *
354         * @param channelID is the unique channel ID
355         * @param packet    is the packet to write
356         * @return
357         * @throws java.io.IOException
358         * @throws javax.jms.JMSException
359         */
360        public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException
361        {
362            throw new UnsupportedOperationException("Will not be implemented");
363        }
364    
365        /**
366         * Reads the packet from the given byte[]
367         *
368         * @param bytes
369         * @param offset
370         * @param length
371         * @return
372         * @throws java.io.IOException
373         */
374        public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException
375        {
376            throw new UnsupportedOperationException("Will not be implemented");
377        }
378    
379        /**
380         * Reads the packet from the given byte[]
381         *
382         * @param bytes
383         * @return
384         * @throws java.io.IOException
385         */
386        public Packet fromBytes(byte[] bytes) throws IOException
387        {
388            throw new UnsupportedOperationException("Will not be implemented");
389        }
390    
391        /**
392         * A helper method which converts a packet into a byte array
393         *
394         * @param packet
395         * @return a byte array representing the packet using some wire protocol
396         * @throws java.io.IOException
397         * @throws javax.jms.JMSException
398         */
399        public byte[] toBytes(Packet packet) throws IOException, JMSException
400        {
401            throw new UnsupportedOperationException("Will not be implemented");
402        }
403    
404        /**
405         * A helper method for working with sockets where the first byte is read
406         * first, then the rest of the message is read.
407         * <p/>
408         * Its common when dealing with sockets to have different timeout semantics
409         * until the first non-zero byte is read of a message, after which
410         * time a zero timeout is used.
411         *
412         * @param firstByte the first byte of the packet
413         * @param in        the rest of the packet
414         * @return
415         * @throws java.io.IOException
416         */
417        public Packet readPacket(int firstByte, DataInput in) throws IOException
418        {
419            throw new UnsupportedOperationException("Will not be implemented");
420        }
421    
422        /**
423         * Read a packet from a Datagram packet from the given channelID. If the
424         * packet is from the same channel ID as it was sent then we have a
425         * loop-back so discard the packet
426         *
427         * @param channelID is the unique channel ID
428         * @param dpacket
429         * @return the packet read from the datagram or null if it should be
430         *         discarded
431         * @throws java.io.IOException
432         */
433        public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException
434        {
435            throw new UnsupportedOperationException("Will not be implemented");
436        }
437    
438        void clearTransactionId(String user_tx_id)
439        {
440            this.transactions.remove(user_tx_id);
441        }
442    
443        String getClientId()
444        {
445            return this.clientId;
446        }
447    
448        public short getSessionId()
449        {
450            return sessionId;
451        }
452    
453        public void addSubscription(Subscription s)
454        {
455            if (subscriptions.containsKey(s.getDestination()))
456            {
457                Subscription old = (Subscription) subscriptions.get(s.getDestination());
458                ConsumerInfo p = old.close();
459                enqueuePacket(p);
460                subscriptions.put(s.getDestination(), s);
461            }
462            else
463            {
464                subscriptions.put(s.getDestination(), s);
465            }
466        }
467    
468        public void enqueuePacket(final Packet ack)
469        {
470            AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
471            {
472                public void cycle() throws InterruptedException
473                {
474                    pendingReadPackets.put(ack);
475                }
476            });
477        }
478    
479        public Subscription getSubscriptionFor(ActiveMQDestination destination)
480        {
481            return (Subscription) subscriptions.get(destination);
482        }
483    
484        public void addAckListener(AckListener listener)
485        {
486            this.ackListeners.add(listener);
487        }
488    
489        public List getAckListeners()
490        {
491            return ackListeners;
492        }
493    
494        public String getTransactionId(String key)
495        {
496            return (String) transactions.get(key);
497        }
498    
499        public void registerTransactionId(String user_tx_id, String tx_id)
500        {
501            transactions.put(user_tx_id, tx_id);
502        }
503    }