001    /*
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "ProcessorImpl.java".  Description: 
010    "A default implementation of Processor." 
011    
012    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
013    2004.  All Rights Reserved. 
014    
015    Contributor(s): ______________________________________. 
016    
017    Alternatively, the contents of this file may be used under the terms of the 
018    GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019    applicable instead of those above.  If you wish to allow use of your version of this 
020    file only under the terms of the GPL and not to allow others to use your version 
021    of this file under the MPL, indicate your decision by deleting  the provisions above 
022    and replace  them with the notice and other provisions required by the GPL License.  
023    If you do not delete the provisions above, a recipient may use your version of 
024    this file under either the MPL or the GPL. 
025    */
026    
027    package ca.uhn.hl7v2.protocol.impl;
028    
029    import java.util.HashMap;
030    import java.util.Iterator;
031    import java.util.Map;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Executors;
034    
035    import ca.uhn.hl7v2.HL7Exception;
036    import ca.uhn.hl7v2.preparser.PreParser;
037    import ca.uhn.hl7v2.protocol.Processor;
038    import ca.uhn.hl7v2.protocol.ProcessorContext;
039    import ca.uhn.hl7v2.protocol.TransportException;
040    import ca.uhn.hl7v2.protocol.TransportLayer;
041    import ca.uhn.hl7v2.protocol.Transportable;
042    import ca.uhn.log.HapiLog;
043    import ca.uhn.log.HapiLogFactory;
044    
045    /**
046     * A default implementation of <code>Processor</code>.  
047     *  
048     * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
049     * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $
050     */
051    public class ProcessorImpl implements Processor {
052    
053        private static final HapiLog log = HapiLogFactory.getHapiLog(ProcessorImpl.class);
054    
055        private ProcessorContext myContext;
056        private final Map myAcceptAcks;
057        private final Map myReservations;
058        private final Map myAvailableMessages;
059        private boolean myThreaded; //true if separate threads are calling cycle()  
060        private Cycler ackCycler;
061        private Cycler nonAckCycler;
062        private ExecutorService myResponseExecutorService;
063        
064        /**
065         * @param theContext source of supporting services 
066         * @param isThreaded true if this class should create threads in which to call cycle(), and 
067         *  in which to send responses from Applications.  This is the preferred mode.  Use false 
068         *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
069         *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
070         *  makes potentially blocking calls, so these methods may not return until the next message 
071         *  is received from the remote server, regardless of timeout.  Probably the worst example of this
072         *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
073         *  required on error).  No response will be returned if the message is processed without error, 
074         *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
075         *  study this class carefully.
076         *   
077         * TODO: write a MLLPTransport with non-blocking IO  
078         * TODO: reconnect transport layers on error and retry 
079         */
080        public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
081            myContext = theContext;
082            myThreaded = isThreaded;
083            myAcceptAcks = new HashMap();
084            myReservations = new HashMap();
085            myAvailableMessages = new HashMap();
086            
087            if (isThreaded) {
088                myResponseExecutorService = Executors.newSingleThreadExecutor(); 
089    
090                    ackCycler = new Cycler(this, true);
091                Thread ackThd = new Thread(ackCycler);
092                ackThd.start();
093                nonAckCycler = new Cycler(this, false);
094                Thread nonAckThd = new Thread(nonAckCycler);
095                nonAckThd.start();            
096            }
097        }
098        
099        /**
100         * If self-threaded, stops threads that have been created.  
101         */
102        public void stop() {
103            if (myThreaded) {
104                ackCycler.stop();
105                nonAckCycler.stop();
106    
107                myResponseExecutorService.shutdownNow();
108            }
109        }
110    
111        /**
112         * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
113         */
114        public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
115            String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
116            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
117            String controlId = fields[0];
118            String needAcceptAck = fields[1];
119            String needAppAck = fields[2];
120            
121            checkValidAckNeededCode(needAcceptAck);
122            
123            trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
124            
125            boolean originalMode = (needAcceptAck == null && needAppAck == null); 
126            if (originalMode || !needAcceptAck.equals(NE)) {
127            
128                Transportable response = null;
129                int retries = 0;
130                do {
131                    long until = System.currentTimeMillis() + retryIntervalMillis;
132                    while (response == null && System.currentTimeMillis() < until) {
133                        synchronized (this) {
134                            ExpiringTransportable et = (ExpiringTransportable) myAcceptAcks.remove(controlId);
135                            if (et == null) {
136                                cycleIfNeeded(true);
137                            } else {
138                                response = et.transportable;
139                            }
140                        }
141                        sleepIfNeeded();
142                    }
143                    
144                    if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
145                            || (response != null && isReject(response))) {
146                        log.info("Resending message " + controlId);
147                        trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
148                        response = null;                    
149                    }
150                    
151                    if (response != null && isError(response)) {
152                        String[] errMsgPath = {"MSA-3"};
153                        String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
154                        throw new HL7Exception("Error message received: " + errMsg[0]);
155                    }
156                    
157                } while (response == null && ++retries <= maxRetries);
158            }
159        }
160        
161        private void checkValidAckNeededCode(String theCode) throws HL7Exception {
162            //must be one of the below ... 
163            if ( !(theCode == null || theCode.equals("") 
164                    ||theCode.equals(AL) || theCode.equals(ER) 
165                    || theCode.equals(NE) || theCode.equals(SU)) ) {
166                throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
167            }            
168        }
169        
170        /**
171         * Calls cycle() if we do not expect another thread to be doing so
172         * @param expectingAck as in cycle
173         */
174        private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
175            if (!myThreaded) {
176                cycle(expectingAck);
177            }        
178        }
179        
180        /**
181         * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
182         */
183        private void sleepIfNeeded() {
184            if (myThreaded) {
185                try {
186                    Thread.sleep(1);
187                } catch (InterruptedException e) { /* no problem */ }
188            }                
189        }
190        
191        /** Returns true if a CR or AR ACK */ 
192        private static boolean isReject(Transportable theMessage) throws HL7Exception {
193            boolean reject = false;
194            String[] fieldPaths = {"MSA-1"};
195            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
196            if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
197                reject = true;
198            }        
199            return reject;
200        }
201    
202        /** Returns true if a CE or AE ACK */ 
203        private static boolean isError(Transportable theMessage) throws HL7Exception {
204            boolean error = false;
205            String[] fieldPaths = {"MSA-1"};
206            String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
207            if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
208                error = true;
209            }
210            return error;
211        }
212    
213        /**
214         * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
215         */
216        public synchronized void reserve(String theAckId, long thePeriodMillis) {
217            Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
218            myReservations.put(theAckId, expiry);
219        }
220        
221        /**
222         * Tries to send the message, and if there is an error reconnects and tries again. 
223         */
224        private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
225            try {
226                theTransport.send(theTransportable);
227            } catch (TransportException e) {
228                theTransport.disconnect();
229                theTransport.connect();
230                theTransport.send(theTransportable);
231            }
232        }
233        
234        
235        /**
236         * Tries to receive a message, and if there is an error reconnects and tries again. 
237         */
238        private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
239            Transportable message = null;
240            try {
241                message = theTransport.receive();            
242            } catch (TransportException e) {
243                theTransport.disconnect();
244                theTransport.connect();
245                message = theTransport.receive();
246            }
247            return message;
248        }
249    
250        /** 
251         * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
252         */
253        public void cycle(boolean expectingAck) throws HL7Exception {
254            log.debug("In cycle()");
255            
256            cleanReservations();
257            cleanAcceptAcks();
258            cleanReservedMessages();
259    
260            Transportable in = null;
261            try {
262                if (expectingAck) {
263                    in = tryReceive(myContext.getLocallyDrivenTransportLayer());
264                } else {
265                    in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
266                }
267            } catch (TransportException e) {
268                try {
269                    Thread.sleep(1000);
270                } catch (InterruptedException e1) {}
271                throw e;
272            }
273            
274            // log
275            if (log.isDebugEnabled()) {
276                    if (in != null) {
277                    log.debug("Received message: " + in.getMessage());
278                    } else {
279                            log.debug("Received no message");
280                    }
281            }
282            
283            // If we have a message, handle it
284            if (in != null) { 
285                String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
286                String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
287                String acceptAckNeeded = fields[0];
288                String appAckNeeded = fields[1];
289                String ackCode = fields[2];
290                String ackId = fields[3];
291            
292                if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
293                    long expiryTime = System.currentTimeMillis() + 1000 * 60;
294                    myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
295                } else {
296                    AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
297                
298                    if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
299                        || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
300                        || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
301                        trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
302                    }
303      
304                    if (ack.isAcceptable()) {
305                        if (isReserved(ackId)) {
306                            
307                            if (log.isDebugEnabled()) {
308                                    log.debug("Received expected ACK message with ACK ID: " + ackId);
309                            }
310                            
311                            removeReservation(ackId);
312                            long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
313                            myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
314                            
315                        } else {
316    
317                            if (log.isDebugEnabled()) {
318                                    log.debug("Sending message to router");
319                            }
320    
321                            Transportable out = myContext.getRouter().processMessage(in);
322                            sendAppResponse(out);
323                            
324                        }
325                    } else {
326                            // TODO: should we do something more here? Might be nice to 
327                            // allow a configurable handler for this situation
328                            log.warn("Incoming message was not acceptable");
329                    }
330                    
331                }
332            } else {
333                String transport = expectingAck ? " Locally driven " : "Remotely driven";
334                log.debug(transport + " TransportLayer.receive() returned null.");
335            }
336            
337            sleepIfNeeded();
338    
339            log.debug("Exiting cycle()");
340        }
341        
342        /** Sends in a new thread if isThreaded, otherwise in current thread */
343        private void sendAppResponse(final Transportable theResponse) {
344            final ProcessorImpl processor = this;
345            Runnable sender = new Runnable() {
346                public void run() {
347                    try {
348                        
349                            if (log.isDebugEnabled()) {
350                                    log.debug("Sending response: " + theResponse);
351                            }
352                            
353                        //TODO: make configurable 
354                            processor.send(theResponse, 2, 3000);
355                            
356                    } catch (HL7Exception e) {
357                        log.error("Error trying to send response from Application", e);
358                    }
359                }
360            };
361            
362            if (myThreaded) {
363                myResponseExecutorService.execute(sender);
364            } else {
365                sender.run();
366            }
367        }
368        
369        /**
370         * Removes expired message reservations from the reservation list.  
371         */
372        private synchronized void cleanReservations() {
373            Iterator it = myReservations.keySet().iterator();
374            while (it.hasNext()) {
375                String ackId = (String) it.next();
376                Long expiry = (Long) myReservations.get(ackId);
377                if (System.currentTimeMillis() > expiry.longValue()) {
378                    it.remove();
379                }
380            }
381        }
382        
383        /**
384         * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
385         */
386        private synchronized void cleanAcceptAcks() {
387            Iterator it = myAcceptAcks.keySet().iterator();
388            while (it.hasNext()) {
389                String ackId = (String) it.next();
390                ExpiringTransportable et = (ExpiringTransportable) myAcceptAcks.get(ackId);
391                if (System.currentTimeMillis() > et.expiryTime) {
392                    it.remove();
393                }
394            }        
395        }
396        
397        private synchronized void cleanReservedMessages() throws HL7Exception {
398            Iterator it = myAvailableMessages.keySet().iterator();
399            while (it.hasNext()) {
400                String ackId = (String) it.next();            
401                ExpiringTransportable et = (ExpiringTransportable) myAvailableMessages.get(ackId);
402                if (System.currentTimeMillis() > et.expiryTime) {
403                    it.remove();
404                    
405                    //send to an Application 
406                    Transportable out = myContext.getRouter().processMessage(et.transportable);
407                    sendAppResponse(out);                
408                }
409            }  
410        }
411        
412        private synchronized boolean isReserved(String ackId) {
413            boolean reserved = false;
414            if (myReservations.containsKey(ackId)) {
415                reserved = true;
416            }
417            return reserved;
418        }
419        
420        private synchronized void removeReservation(String ackId) {
421            myReservations.remove(ackId);
422        }
423        
424    
425        /**
426         * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
427         */
428        public boolean isAvailable(String theAckId) {
429            boolean available = false;
430            if (myAvailableMessages.containsKey(theAckId)) {
431                available = true;
432            }
433            return available;
434        }
435    
436        /** 
437         * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
438         */
439        public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
440            if (!isReserved(theAckId)) {
441                reserve(theAckId, theTimeoutMillis);
442            }
443            
444            Transportable in = null;
445            long until = System.currentTimeMillis() + theTimeoutMillis;
446            do {
447                synchronized (this) {
448                    ExpiringTransportable et = (ExpiringTransportable) myAvailableMessages.get(theAckId);                
449                    if (et == null) {
450                        cycleIfNeeded(false);
451                    } else {
452                        in = et.transportable;
453                    }
454                }
455                sleepIfNeeded();
456            } while (in == null && System.currentTimeMillis() < until);
457            return in;
458        }
459    
460        /** 
461         * @see ca.uhn.hl7v2.protocol.Processor#getContext()
462         */
463        public ProcessorContext getContext() {
464            return myContext;
465        }
466        
467        /**
468         * A struct for Transportable collection entries that time out.  
469         *  
470         * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
471         * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $
472         */
473        class ExpiringTransportable {
474            public Transportable transportable;
475            public long expiryTime;
476            
477            public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
478                transportable = theTransportable;
479                expiryTime = theExpiryTime;
480            }
481        }
482        
483        /**
484         * A Runnable that repeatedly calls the cycle() method of this class.  
485         * 
486         * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
487         * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $
488         */
489        private static class Cycler implements Runnable {
490    
491            private Processor myProcessor;
492            private boolean myExpectingAck;
493            private boolean isRunning;
494            
495            /**
496             * @param theProcessor the processor on which to call cycle()
497             * @param isExpectingAck passed to cycle()
498             */
499            public Cycler(Processor theProcessor, boolean isExpectingAck) {
500                myProcessor = theProcessor;
501                myExpectingAck = isExpectingAck;
502                isRunning = true;
503            }
504            
505            /**
506             * Execution will stop at the end of the next cycle.  
507             */
508            public void stop() {
509                isRunning = false;
510            }
511            
512            /** 
513             * Calls cycle() repeatedly on the Processor given in the 
514             * constructor, until stop() is called.  
515             * 
516             * @see java.lang.Runnable#run()
517             */
518            public void run() {
519                while (isRunning) {
520                    try {
521                        myProcessor.cycle(myExpectingAck);
522                    } catch (HL7Exception e) {
523                        log.error("Error processing message", e);
524                    }
525                }
526            }        
527        }
528    
529    }