001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.net.Socket;
024    import java.net.URI;
025    import java.net.UnknownHostException;
026    import java.nio.ByteBuffer;
027    import java.nio.channels.SelectionKey;
028    import java.nio.channels.SocketChannel;
029    
030    import javax.net.SocketFactory;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.transport.Transport;
034    import org.apache.activemq.transport.tcp.TcpTransport;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.ServiceStopper;
037    import org.apache.activemq.wireformat.WireFormat;
038    
039    /**
040     * An implementation of the {@link Transport} interface using raw tcp/ip
041     * 
042     * @version $Revision$
043     */
044    public class NIOTransport extends TcpTransport {
045    
046        // private static final Log log = LogFactory.getLog(NIOTransport.class);
047        private SocketChannel channel;
048        private SelectorSelection selection;
049        private ByteBuffer inputBuffer;
050        private ByteBuffer currentBuffer;
051        private int nextFrameSize;
052    
053        public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
054            super(wireFormat, socketFactory, remoteLocation, localLocation);
055        }
056    
057        public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
058            super(wireFormat, socket);
059        }
060    
061        protected void initializeStreams() throws IOException {
062            channel = socket.getChannel();
063            channel.configureBlocking(false);
064    
065            // listen for events telling us when the socket is readable.
066            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
067                public void onSelect(SelectorSelection selection) {
068                    serviceRead();
069                }
070    
071                public void onError(SelectorSelection selection, Throwable error) {
072                    if (error instanceof IOException) {
073                        onException((IOException)error);
074                    } else {
075                        onException(IOExceptionSupport.create(error));
076                    }
077                }
078            });
079    
080            // Send the data via the channel
081            // inputBuffer = ByteBuffer.allocateDirect(8*1024);
082            inputBuffer = ByteBuffer.allocate(8 * 1024);
083            currentBuffer = inputBuffer;
084            nextFrameSize = -1;
085            currentBuffer.limit(4);
086            this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
087    
088        }
089    
090        private void serviceRead() {
091            try {
092                while (true) {
093    
094                    int readSize = channel.read(currentBuffer);
095                    if (readSize == -1) {
096                        onException(new EOFException());
097                        selection.close();
098                        break;
099                    }
100                    if (readSize == 0) {
101                        break;
102                    }
103    
104                    if (currentBuffer.hasRemaining()) {
105                        continue;
106                    }
107    
108                    // Are we trying to figure out the size of the next frame?
109                    if (nextFrameSize == -1) {
110                        assert inputBuffer == currentBuffer;
111    
112                        // If the frame is too big to fit in our direct byte buffer,
113                        // Then allocate a non direct byte buffer of the right size
114                        // for it.
115                        inputBuffer.flip();
116                        nextFrameSize = inputBuffer.getInt() + 4;
117                        if (nextFrameSize > inputBuffer.capacity()) {
118                            currentBuffer = ByteBuffer.allocate(nextFrameSize);
119                            currentBuffer.putInt(nextFrameSize);
120                        } else {
121                            inputBuffer.limit(nextFrameSize);
122                        }
123    
124                    } else {
125                        currentBuffer.flip();
126    
127                        Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
128                        doConsume((Command)command);
129    
130                        nextFrameSize = -1;
131                        inputBuffer.clear();
132                        inputBuffer.limit(4);
133                        currentBuffer = inputBuffer;
134                    }
135    
136                }
137    
138            } catch (IOException e) {
139                onException(e);
140            } catch (Throwable e) {
141                onException(IOExceptionSupport.create(e));
142            }
143        }
144    
145        protected void doStart() throws Exception {
146            connect();
147            selection.setInterestOps(SelectionKey.OP_READ);
148            selection.enable();
149        }
150    
151        protected void doStop(ServiceStopper stopper) throws Exception {
152            selection.close();
153            super.doStop(stopper);
154        }
155    }