001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.jrms;
019    
020    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021    import com.sun.multicast.reliable.RMException;
022    import com.sun.multicast.reliable.transport.RMPacketSocket;
023    import com.sun.multicast.reliable.transport.SessionDoneException;
024    import com.sun.multicast.reliable.transport.TransportProfile;
025    import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.activemq.io.WireFormat;
029    import org.activemq.message.Packet;
030    import org.activemq.transport.TransportChannelSupport;
031    import org.activemq.util.IdGenerator;
032    
033    import javax.jms.JMSException;
034    import java.io.IOException;
035    import java.net.DatagramPacket;
036    import java.net.InetAddress;
037    import java.net.URI;
038    
039    /**
040     * A JRMS implementation of a TransportChannel
041     *
042     * @version $Revision$
043     */
044    public class JRMSTransportChannel extends TransportChannelSupport implements Runnable {
045    
046        private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
047        private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
048    
049        private WireFormat wireFormat;
050        private SynchronizedBoolean closed;
051        private SynchronizedBoolean started;
052        private Thread thread; //need to change this - and use a thread pool
053        // need to see our own messages
054        private RMPacketSocket socket;
055        private IdGenerator idGenerator;
056        private String channelId;
057        private int port;
058        private InetAddress inetAddress;
059        private Object lock;
060    
061        /**
062         * Construct basic helpers
063         */
064        protected JRMSTransportChannel(WireFormat wireFormat) {
065            this.wireFormat = wireFormat;
066            idGenerator = new IdGenerator();
067            channelId = idGenerator.generateId();
068            closed = new SynchronizedBoolean(false);
069            started = new SynchronizedBoolean(false);
070            lock = new Object();
071        }
072    
073        /**
074         * Connect to a remote Node - e.g. a Broker
075         *
076         * @param remoteLocation
077         * @throws JMSException
078         */
079        public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
080            this(wireFormat);
081            try {
082                this.port = remoteLocation.getPort();
083                this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
084                LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port);
085                profile.setTTL((byte) 1);
086                profile.setOrdered(true);
087                this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE);
088            }
089            catch (Exception ioe) {
090                ioe.printStackTrace();
091                JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
092                jmsEx.setLinkedException(ioe);
093                throw jmsEx;
094            }
095        }
096    
097        /**
098         * close the channel
099         */
100        public void stop() {
101            if (closed.commit(false, true)) {
102                super.stop();
103                try {
104                    socket.close();
105                }
106                catch (Exception e) {
107                    log.trace(toString() + " now closed");
108                }
109            }
110        }
111    
112        /**
113         * start listeneing for events
114         *
115         * @throws JMSException if an error occurs
116         */
117        public void start() throws JMSException {
118            if (started.commit(false, true)) {
119                thread = new Thread(this, toString());
120                if (isServerSide()) {
121                    thread.setDaemon(true);
122                }
123                thread.start();
124            }
125        }
126    
127        /**
128         * Asynchronously send a Packet
129         *
130         * @param packet
131         * @throws JMSException
132         */
133        public void asyncSend(Packet packet) throws JMSException {
134            try {
135                DatagramPacket dpacket = createDatagramPacket(packet);
136    
137                // lets sync to avoid concurrent writes
138                //synchronized (lock) {
139                socket.send(dpacket);
140                //}
141            }
142            catch (RMException rme) {
143                JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
144                jmsEx.setLinkedException(rme);
145                throw jmsEx;
146            }
147            catch (IOException e) {
148                JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
149                jmsEx.setLinkedException(e);
150                throw jmsEx;
151            }
152        }
153    
154    
155        public boolean isMulticast() {
156            return true;
157        }
158    
159        /**
160         * reads packets from a Socket
161         */
162        public void run() {
163            try {
164                while (!closed.get()) {
165                    DatagramPacket dpacket = socket.receive();
166                    Packet packet = wireFormat.readPacket(channelId, dpacket);
167                    if (packet != null) {
168                        doConsumePacket(packet);
169                    }
170                }
171                log.trace("The socket peer is now closed");
172                //doClose(new IOException("Socket peer is now closed"));
173                stop();
174            }
175            catch (SessionDoneException e) {
176                // this isn't really an exception, it just indicates
177                // that the socket has closed normally
178                log.trace("Session completed", e);
179                stop();
180            }
181            catch (RMException ste) {
182                doClose(ste);
183            }
184            catch (IOException e) {
185                doClose(e);
186            }
187        }
188        
189        /**
190         * Can this wireformat process packets of this version
191         * @param version the version number to test
192         * @return true if can accept the version
193         */
194        public boolean canProcessWireFormatVersion(int version){
195            return wireFormat.canProcessWireFormatVersion(version);
196        }
197        
198        /**
199         * @return the current version of this wire format
200         */
201        public int getCurrentWireFormatVersion(){
202            return wireFormat.getCurrentWireFormatVersion();
203        }
204    
205        protected DatagramPacket createDatagramPacket() {
206            DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
207            answer.setPort(port);
208            answer.setAddress(inetAddress);
209            return answer;
210        }
211    
212        protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
213            DatagramPacket answer = wireFormat.writePacket(channelId, packet);
214            answer.setPort(port);
215            answer.setAddress(inetAddress);
216            return answer;
217        }
218    
219        private void doClose(Exception ex) {
220            if (!closed.get()) {
221                JMSException jmsEx = new JMSException("Error reading socket: " + ex);
222                jmsEx.setLinkedException(ex);
223                onAsyncException(jmsEx);
224                stop();
225            }
226        }
227    
228        /**
229         * pretty print for object
230         *
231         * @return String representation of this object
232         */
233        public String toString() {
234            return "JRMSTransportChannel: " + socket;
235        }
236    
237        public void forceDisconnect() {
238                // TODO: implement me.
239                    throw new RuntimeException("Not yet Implemented.");
240            }
241    }