001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.net.Socket;
024    import java.net.URI;
025    import java.net.UnknownHostException;
026    import java.nio.ByteBuffer;
027    import java.nio.channels.SelectionKey;
028    import java.nio.channels.SocketChannel;
029    
030    import javax.net.SocketFactory;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.transport.Transport;
034    import org.apache.activemq.transport.nio.NIOOutputStream;
035    import org.apache.activemq.transport.nio.SelectorManager;
036    import org.apache.activemq.transport.nio.SelectorSelection;
037    import org.apache.activemq.transport.tcp.TcpTransport;
038    import org.apache.activemq.util.ByteArrayOutputStream;
039    import org.apache.activemq.util.ByteSequence;
040    import org.apache.activemq.util.IOExceptionSupport;
041    import org.apache.activemq.util.ServiceStopper;
042    import org.apache.activemq.wireformat.WireFormat;
043    
044    /**
045     * An implementation of the {@link Transport} interface for using Stomp over NIO
046     * 
047     * @version $Revision$
048     */
049    public class StompNIOTransport extends TcpTransport {
050    
051        private SocketChannel channel;
052        private SelectorSelection selection;
053        
054        private ByteBuffer inputBuffer;
055        ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
056        int previousByte = -1;
057    
058        public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
059            super(wireFormat, socketFactory, remoteLocation, localLocation);
060        }
061    
062        public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
063            super(wireFormat, socket);
064        }
065    
066        protected void initializeStreams() throws IOException {
067            channel = socket.getChannel();
068            channel.configureBlocking(false);
069    
070            // listen for events telling us when the socket is readable.
071            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
072                public void onSelect(SelectorSelection selection) {
073                    serviceRead();
074                }
075    
076                public void onError(SelectorSelection selection, Throwable error) {
077                    if (error instanceof IOException) {
078                        onException((IOException)error);
079                    } else {
080                        onException(IOExceptionSupport.create(error));
081                    }
082                }
083            });
084    
085            inputBuffer = ByteBuffer.allocate(8 * 1024);
086            this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
087        }
088        
089        private void serviceRead() {
090            try {
091                
092               while (true) {
093                   // read channel
094                   int readSize = channel.read(inputBuffer);
095                   // channel is closed, cleanup
096                   if (readSize == -1) {
097                       onException(new EOFException());
098                       selection.close();
099                       break;
100                   }
101                   // nothing more to read, break
102                   if (readSize == 0) {
103                       break;
104                   }
105                   
106                   inputBuffer.flip();
107                   
108                   int b;
109                   ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
110                   
111                   int i = 0;
112                   while(i++ < readSize) {
113                       b = input.read();
114                       // skip repeating nulls
115                       if (previousByte == 0 && b == 0) {
116                           continue;
117                       }
118                       currentCommand.write(b);
119                       // end of command reached, unmarshal
120                       if (b == 0) {
121                           Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
122                           doConsume((Command)command);
123                           currentCommand.reset();
124                       }
125                       previousByte = b;
126                   }
127                   // clear the buffer
128                   inputBuffer.clear();
129                   
130               }
131            } catch (IOException e) {
132                onException(e);  
133            } catch (Throwable e) {
134                onException(IOExceptionSupport.create(e));
135            }
136        }
137    
138        protected void doStart() throws Exception {
139            connect();
140            selection.setInterestOps(SelectionKey.OP_READ);
141            selection.enable();
142        }
143    
144        protected void doStop(ServiceStopper stopper) throws Exception {
145            try {
146                selection.close();
147            } catch (Exception e) {
148                    e.printStackTrace();
149            }
150            super.doStop(stopper);
151        }
152    }