001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.concurrent.LinkedBlockingQueue;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import java.util.concurrent.atomic.AtomicLong;
025    
026    import org.apache.activemq.thread.Task;
027    import org.apache.activemq.thread.TaskRunner;
028    import org.apache.activemq.thread.TaskRunnerFactory;
029    import org.apache.activemq.thread.Valve;
030    import org.apache.activemq.transport.FutureResponse;
031    import org.apache.activemq.transport.ResponseCallback;
032    import org.apache.activemq.transport.Transport;
033    import org.apache.activemq.transport.TransportDisposedIOException;
034    import org.apache.activemq.transport.TransportListener;
035    import org.apache.activemq.util.IOExceptionSupport;
036    
037    
038    /**
039     * A Transport implementation that uses direct method invocations.
040     * 
041     * @version $Revision$
042     */
043    public class VMTransport implements Transport, Task {
044    
045        private static final Object DISCONNECT = new Object();
046        private static final AtomicLong NEXT_ID = new AtomicLong(0);
047        // still possible to configure dedicated task runner through system property but not programmatically
048        private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
049        protected VMTransport peer;
050        protected TransportListener transportListener;
051        protected boolean disposed;
052        protected boolean marshal;
053        protected boolean network;
054        protected boolean async = true;
055        protected int asyncQueueDepth = 2000;
056        protected LinkedBlockingQueue<Object> messageQueue;
057        protected boolean started;
058        protected final URI location;
059        protected final long id;
060        private TaskRunner taskRunner;
061        private final Object lazyInitMutext = new Object();
062        private final Valve enqueueValve = new Valve(true);
063        private final AtomicBoolean stopping = new AtomicBoolean();
064        private volatile int receiveCounter;
065        
066        public VMTransport(URI location) {
067            this.location = location;
068            this.id = NEXT_ID.getAndIncrement();
069        }
070    
071        public void setPeer(VMTransport peer) {
072            this.peer = peer;
073        }
074    
075        public void oneway(Object command) throws IOException {
076            if (disposed) {
077                throw new TransportDisposedIOException("Transport disposed.");
078            }
079            if (peer == null) {
080                throw new IOException("Peer not connected.");
081            }
082    
083            
084            TransportListener transportListener=null;
085            try {
086                // Disable the peer from changing his state while we try to enqueue onto him.
087                peer.enqueueValve.increment();
088            
089                if (peer.disposed || peer.stopping.get()) {
090                    throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
091                }
092                
093                if (peer.started) {
094                    if (peer.async) {
095                        peer.getMessageQueue().put(command);
096                        peer.wakeup();
097                    } else {
098                        transportListener = peer.transportListener;
099                    }
100                } else {
101                    peer.getMessageQueue().put(command);
102                }
103                
104            } catch (InterruptedException e) {
105                InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
106                iioe.initCause(e);
107                throw iioe;
108            } finally {
109                // Allow the peer to change state again...
110                peer.enqueueValve.decrement();
111            }
112    
113            if( transportListener!=null ) {
114                if( command == DISCONNECT ) {
115                    transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
116                } else {
117                    peer.receiveCounter++;
118                    transportListener.onCommand(command);
119                }
120            }
121        }
122    
123        public void start() throws Exception {
124            if (transportListener == null) {
125                throw new IOException("TransportListener not set.");
126            }
127            try {
128                enqueueValve.turnOff();
129                if (messageQueue != null && !async) {
130                    Object command;
131                    while ((command = messageQueue.poll()) != null && !stopping.get() ) {
132                        receiveCounter++;
133                        transportListener.onCommand(command);
134                    }
135                }
136                started = true;
137                wakeup();
138            } finally {
139                enqueueValve.turnOn();
140            }
141            // If we get stopped while starting up, then do the actual stop now 
142            // that the enqueueValve is back on.
143            if( stopping.get() ) {
144                stop();
145            }
146        }
147    
148        public void stop() throws Exception {
149            stopping.set(true);
150            
151            // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
152            if( enqueueValve.isOn() ) {
153    
154                TaskRunner tr = null;
155                try {
156                    enqueueValve.turnOff();
157                    if (!disposed) {
158                        started = false;
159                        disposed = true;
160                        if (taskRunner != null) {
161                            tr = taskRunner;
162                            taskRunner = null;
163                        }
164                    }
165                } finally {
166                    stopping.set(false);
167                    enqueueValve.turnOn();
168                }
169                if (tr != null) {
170                    tr.shutdown(1000);
171                }
172                // let the peer know that we are disconnecting..
173                try {
174                    oneway(DISCONNECT);
175                } catch (Exception ignore) {
176                }
177            }
178        }
179        
180        /**
181         * @see org.apache.activemq.thread.Task#iterate()
182         */
183        public boolean iterate() {
184            
185            final TransportListener tl;
186            try {
187                // Disable changing the state variables while we are running... 
188                enqueueValve.increment();
189                tl = transportListener;
190                if (!started || disposed || tl == null || stopping.get()) {
191                    if( stopping.get() ) {
192                        // drain the queue it since folks could be blocked putting on to
193                        // it and that would not allow the stop() method for finishing up.
194                        getMessageQueue().clear();  
195                    }
196                    return false;
197                }
198            } catch (InterruptedException e) {
199                return false;
200            } finally {
201                enqueueValve.decrement();
202            }
203    
204            LinkedBlockingQueue<Object> mq = getMessageQueue();
205            Object command = mq.poll();
206            if (command != null) {
207                if( command == DISCONNECT ) {
208                    tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
209                } else {
210                    tl.onCommand(command);
211                }
212                return !mq.isEmpty();
213            } else {
214                return false;
215            }
216            
217        }
218    
219        public void setTransportListener(TransportListener commandListener) {
220            try {
221                try {
222                    enqueueValve.turnOff();
223                    this.transportListener = commandListener;
224                    wakeup();
225                } finally {
226                    enqueueValve.turnOn();
227                }
228            } catch (InterruptedException e) {
229                throw new RuntimeException(e);
230            }
231        }
232    
233        private LinkedBlockingQueue<Object> getMessageQueue() {
234            synchronized (lazyInitMutext) {
235                if (messageQueue == null) {
236                    messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
237                }
238                return messageQueue;
239            }
240        }
241    
242        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
243            throw new AssertionError("Unsupported Method");
244        }
245    
246        public Object request(Object command) throws IOException {
247            throw new AssertionError("Unsupported Method");
248        }
249    
250        public Object request(Object command, int timeout) throws IOException {
251            throw new AssertionError("Unsupported Method");
252        }
253    
254        public TransportListener getTransportListener() {
255            return transportListener;
256        }
257    
258        public <T> T narrow(Class<T> target) {
259            if (target.isAssignableFrom(getClass())) {
260                return target.cast(this);
261            }
262            return null;
263        }
264    
265        public boolean isMarshal() {
266            return marshal;
267        }
268    
269        public void setMarshal(boolean marshal) {
270            this.marshal = marshal;
271        }
272    
273        public boolean isNetwork() {
274            return network;
275        }
276    
277        public void setNetwork(boolean network) {
278            this.network = network;
279        }
280    
281        public String toString() {
282            return location + "#" + id;
283        }
284    
285        public String getRemoteAddress() {
286            if (peer != null) {
287                return peer.toString();
288            }
289            return null;
290        }
291    
292        /**
293         * @return the async
294         */
295        public boolean isAsync() {
296            return async;
297        }
298    
299        /**
300         * @param async the async to set
301         */
302        public void setAsync(boolean async) {
303            this.async = async;
304        }
305    
306        /**
307         * @return the asyncQueueDepth
308         */
309        public int getAsyncQueueDepth() {
310            return asyncQueueDepth;
311        }
312    
313        /**
314         * @param asyncQueueDepth the asyncQueueDepth to set
315         */
316        public void setAsyncQueueDepth(int asyncQueueDepth) {
317            this.asyncQueueDepth = asyncQueueDepth;
318        }
319    
320        protected void wakeup() {
321            if (async) {
322                synchronized (lazyInitMutext) {
323                    if (taskRunner == null) {
324                        taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString());
325                    }
326                }
327                try {
328                    taskRunner.wakeup();
329                } catch (InterruptedException e) {
330                    Thread.currentThread().interrupt();
331                }
332            }
333        }
334    
335        public boolean isFaultTolerant() {
336            return false;
337        }
338    
339            public boolean isDisposed() {
340                    return disposed;
341            }
342            
343            public boolean isConnected() {
344                return started;
345            }
346    
347            public void reconnect(URI uri) throws IOException {
348                    throw new IOException("Not supported");
349            }
350    
351        public int getReceiveCounter() {
352            return receiveCounter;
353        }
354    }