001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.activeio; 020 import java.io.DataInputStream; 021 import java.io.DataOutputStream; 022 import java.io.EOFException; 023 import java.io.IOException; 024 import java.net.SocketException; 025 026 import javax.jms.JMSException; 027 028 import org.activeio.AsyncChannel; 029 import org.activeio.AsyncChannelListener; 030 import org.activeio.adapter.PacketByteArrayOutputStream; 031 import org.activeio.adapter.PacketInputStream; 032 import org.activeio.net.SocketMetadata; 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 import org.activemq.io.WireFormat; 036 import org.activemq.message.Packet; 037 import org.activemq.transport.TransportChannelSupport; 038 import org.activemq.transport.TransportStatusEvent; 039 import org.activemq.util.JMSExceptionHelper; 040 041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 042 043 /** 044 * A tcp implementation of a TransportChannel 045 * 046 * @version $Revision: 1.1.1.1 $ 047 */ 048 public class ActiveIOTransportChannel extends TransportChannelSupport implements AsyncChannelListener { 049 050 private static final Log log = LogFactory.getLog(ActiveIOTransportChannel.class); 051 private final Object writeLock = new Object(); 052 private final AsyncChannel asynchChannel; 053 private final SynchronizedBoolean closed = new SynchronizedBoolean(false); 054 private final PacketByteArrayOutputStream outputBuffer = new PacketByteArrayOutputStream(); 055 private final DataOutputStream dataOut = new DataOutputStream(outputBuffer); 056 057 private final PacketAggregator aggregator = new PacketAggregator() { 058 protected void packetAssembled(org.activeio.Packet packet) { 059 try { 060 Packet p = getWireFormat().readPacket(new DataInputStream(new PacketInputStream(packet))); 061 if( p!=null ) { 062 doConsumePacket(p); 063 } 064 } catch (IOException e) { 065 onPacketError(e); 066 } 067 } 068 }; 069 070 public ActiveIOTransportChannel(WireFormat wireFormat, AsyncChannel asynchChannel) { 071 super(wireFormat); 072 this.asynchChannel = asynchChannel; 073 asynchChannel.setAsyncChannelListener(this); 074 075 // Enable TcpNoDelay if possible 076 SocketMetadata socket = (SocketMetadata) asynchChannel.narrow(SocketMetadata.class); 077 if(socket!=null) { 078 try { 079 socket.setTcpNoDelay(true); 080 } catch (SocketException e) { 081 } 082 } 083 } 084 085 public void start() throws JMSException { 086 try { 087 asynchChannel.start(); 088 } catch (IOException e) { 089 throw JMSExceptionHelper.newJMSException(e.getMessage(),e); 090 } 091 } 092 093 public void stop() { 094 if (closed.commit(false, true)) { 095 super.stop(); 096 asynchChannel.dispose(); 097 } 098 } 099 100 public void forceDisconnect() { 101 log.debug("Forcing disconnect"); 102 asynchChannel.dispose(); 103 } 104 105 106 public void asyncSend(Packet packet) throws JMSException { 107 doAsyncSend(packet); 108 } 109 110 protected Packet doAsyncSend(Packet packet) throws JMSException { 111 Packet response = null; 112 try { 113 synchronized (writeLock) { 114 response = getWireFormat().writePacket(packet, dataOut); 115 dataOut.flush(); 116 asynchChannel.write( outputBuffer.getPacket() ); 117 asynchChannel.flush(); 118 outputBuffer.reset(); 119 } 120 } 121 catch (IOException e) { 122 if (closed.get()) { 123 log.trace("Caught exception while closed: " + e, e); 124 } 125 else { 126 throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e); 127 } 128 } 129 catch (JMSException e) { 130 if (closed.get()) { 131 log.trace("Caught exception while closed: " + e, e); 132 } 133 else { 134 throw e; 135 } 136 } 137 return response; 138 } 139 140 public void onPacket(org.activeio.Packet packet) { 141 try { 142 aggregator.addRawPacket(packet); 143 } catch (IOException e) { 144 onPacketError(e); 145 } 146 } 147 148 public void onPacketError(IOException ex) { 149 if (!closed.get()) { 150 if (!pendingStop){ 151 setPendingStop(true); 152 setTransportConnected(false); 153 if (ex instanceof EOFException && isServerSide() == false) { 154 log.warn("Peer closed connection", ex); 155 } 156 else { 157 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex)); 158 } 159 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 160 } 161 stop(); 162 } 163 } 164 165 public AsyncChannel getAsyncChannel() { 166 return asynchChannel; 167 } 168 169 /** 170 * @return the current version of this wire format 171 */ 172 public int getCurrentWireFormatVersion() { 173 return getWireFormat().getCurrentWireFormatVersion(); 174 } 175 176 }