001 /* 002 The contents of this file are subject to the Mozilla Public License Version 1.1 003 (the "License"); you may not use this file except in compliance with the License. 004 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005 Software distributed under the License is distributed on an "AS IS" basis, 006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007 specific language governing rights and limitations under the License. 008 009 The Original Code is "ProcessorImpl.java". Description: 010 "A default implementation of Processor." 011 012 The Initial Developer of the Original Code is University Health Network. Copyright (C) 013 2004. All Rights Reserved. 014 015 Contributor(s): ______________________________________. 016 017 Alternatively, the contents of this file may be used under the terms of the 018 GNU General Public License (the ???GPL???), in which case the provisions of the GPL are 019 applicable instead of those above. If you wish to allow use of your version of this 020 file only under the terms of the GPL and not to allow others to use your version 021 of this file under the MPL, indicate your decision by deleting the provisions above 022 and replace them with the notice and other provisions required by the GPL License. 023 If you do not delete the provisions above, a recipient may use your version of 024 this file under either the MPL or the GPL. 025 */ 026 027 package ca.uhn.hl7v2.protocol.impl; 028 029 import java.util.HashMap; 030 import java.util.Iterator; 031 import java.util.Map; 032 import java.util.concurrent.ExecutorService; 033 import java.util.concurrent.Executors; 034 035 import ca.uhn.hl7v2.HL7Exception; 036 import ca.uhn.hl7v2.preparser.PreParser; 037 import ca.uhn.hl7v2.protocol.Processor; 038 import ca.uhn.hl7v2.protocol.ProcessorContext; 039 import ca.uhn.hl7v2.protocol.TransportException; 040 import ca.uhn.hl7v2.protocol.TransportLayer; 041 import ca.uhn.hl7v2.protocol.Transportable; 042 import ca.uhn.log.HapiLog; 043 import ca.uhn.log.HapiLogFactory; 044 045 /** 046 * A default implementation of <code>Processor</code>. 047 * 048 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 049 * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $ 050 */ 051 public class ProcessorImpl implements Processor { 052 053 private static final HapiLog log = HapiLogFactory.getHapiLog(ProcessorImpl.class); 054 055 private ProcessorContext myContext; 056 private final Map myAcceptAcks; 057 private final Map myReservations; 058 private final Map myAvailableMessages; 059 private boolean myThreaded; //true if separate threads are calling cycle() 060 private Cycler ackCycler; 061 private Cycler nonAckCycler; 062 private ExecutorService myResponseExecutorService; 063 064 /** 065 * @param theContext source of supporting services 066 * @param isThreaded true if this class should create threads in which to call cycle(), and 067 * in which to send responses from Applications. This is the preferred mode. Use false 068 * if threading is not allowed, eg you are running the code in an EJB container. In this case, 069 * the send() and receive() methods will call cycle() themselves as needed. However, cycle() 070 * makes potentially blocking calls, so these methods may not return until the next message 071 * is received from the remote server, regardless of timeout. Probably the worst example of this 072 * would be if receive() was called to wait for an application ACK that was specified as "RE" (ie 073 * required on error). No response will be returned if the message is processed without error, 074 * and in a non-threaded environment, receive() will block forever. Use true if you can, otherwise 075 * study this class carefully. 076 * 077 * TODO: write a MLLPTransport with non-blocking IO 078 * TODO: reconnect transport layers on error and retry 079 */ 080 public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) { 081 myContext = theContext; 082 myThreaded = isThreaded; 083 myAcceptAcks = new HashMap(); 084 myReservations = new HashMap(); 085 myAvailableMessages = new HashMap(); 086 087 if (isThreaded) { 088 myResponseExecutorService = Executors.newSingleThreadExecutor(); 089 090 ackCycler = new Cycler(this, true); 091 Thread ackThd = new Thread(ackCycler); 092 ackThd.start(); 093 nonAckCycler = new Cycler(this, false); 094 Thread nonAckThd = new Thread(nonAckCycler); 095 nonAckThd.start(); 096 } 097 } 098 099 /** 100 * If self-threaded, stops threads that have been created. 101 */ 102 public void stop() { 103 if (myThreaded) { 104 ackCycler.stop(); 105 nonAckCycler.stop(); 106 107 myResponseExecutorService.shutdownNow(); 108 } 109 } 110 111 /** 112 * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long) 113 */ 114 public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception { 115 String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"}; 116 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 117 String controlId = fields[0]; 118 String needAcceptAck = fields[1]; 119 String needAppAck = fields[2]; 120 121 checkValidAckNeededCode(needAcceptAck); 122 123 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage); 124 125 boolean originalMode = (needAcceptAck == null && needAppAck == null); 126 if (originalMode || !needAcceptAck.equals(NE)) { 127 128 Transportable response = null; 129 int retries = 0; 130 do { 131 long until = System.currentTimeMillis() + retryIntervalMillis; 132 while (response == null && System.currentTimeMillis() < until) { 133 synchronized (this) { 134 ExpiringTransportable et = (ExpiringTransportable) myAcceptAcks.remove(controlId); 135 if (et == null) { 136 cycleIfNeeded(true); 137 } else { 138 response = et.transportable; 139 } 140 } 141 sleepIfNeeded(); 142 } 143 144 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL)) 145 || (response != null && isReject(response))) { 146 log.info("Resending message " + controlId); 147 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage); 148 response = null; 149 } 150 151 if (response != null && isError(response)) { 152 String[] errMsgPath = {"MSA-3"}; 153 String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath); 154 throw new HL7Exception("Error message received: " + errMsg[0]); 155 } 156 157 } while (response == null && ++retries <= maxRetries); 158 } 159 } 160 161 private void checkValidAckNeededCode(String theCode) throws HL7Exception { 162 //must be one of the below ... 163 if ( !(theCode == null || theCode.equals("") 164 ||theCode.equals(AL) || theCode.equals(ER) 165 || theCode.equals(NE) || theCode.equals(SU)) ) { 166 throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message"); 167 } 168 } 169 170 /** 171 * Calls cycle() if we do not expect another thread to be doing so 172 * @param expectingAck as in cycle 173 */ 174 private void cycleIfNeeded(boolean expectingAck) throws HL7Exception { 175 if (!myThreaded) { 176 cycle(expectingAck); 177 } 178 } 179 180 /** 181 * Sleeps for 1 ms if externally threaded (this is to let the CPU idle). 182 */ 183 private void sleepIfNeeded() { 184 if (myThreaded) { 185 try { 186 Thread.sleep(1); 187 } catch (InterruptedException e) { /* no problem */ } 188 } 189 } 190 191 /** Returns true if a CR or AR ACK */ 192 private static boolean isReject(Transportable theMessage) throws HL7Exception { 193 boolean reject = false; 194 String[] fieldPaths = {"MSA-1"}; 195 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 196 if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) { 197 reject = true; 198 } 199 return reject; 200 } 201 202 /** Returns true if a CE or AE ACK */ 203 private static boolean isError(Transportable theMessage) throws HL7Exception { 204 boolean error = false; 205 String[] fieldPaths = {"MSA-1"}; 206 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 207 if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) { 208 error = true; 209 } 210 return error; 211 } 212 213 /** 214 * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long) 215 */ 216 public synchronized void reserve(String theAckId, long thePeriodMillis) { 217 Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis); 218 myReservations.put(theAckId, expiry); 219 } 220 221 /** 222 * Tries to send the message, and if there is an error reconnects and tries again. 223 */ 224 private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException { 225 try { 226 theTransport.send(theTransportable); 227 } catch (TransportException e) { 228 theTransport.disconnect(); 229 theTransport.connect(); 230 theTransport.send(theTransportable); 231 } 232 } 233 234 235 /** 236 * Tries to receive a message, and if there is an error reconnects and tries again. 237 */ 238 private Transportable tryReceive(TransportLayer theTransport) throws TransportException { 239 Transportable message = null; 240 try { 241 message = theTransport.receive(); 242 } catch (TransportException e) { 243 theTransport.disconnect(); 244 theTransport.connect(); 245 message = theTransport.receive(); 246 } 247 return message; 248 } 249 250 /** 251 * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean) 252 */ 253 public void cycle(boolean expectingAck) throws HL7Exception { 254 log.debug("In cycle()"); 255 256 cleanReservations(); 257 cleanAcceptAcks(); 258 cleanReservedMessages(); 259 260 Transportable in = null; 261 try { 262 if (expectingAck) { 263 in = tryReceive(myContext.getLocallyDrivenTransportLayer()); 264 } else { 265 in = tryReceive(myContext.getRemotelyDrivenTransportLayer()); 266 } 267 } catch (TransportException e) { 268 try { 269 Thread.sleep(1000); 270 } catch (InterruptedException e1) {} 271 throw e; 272 } 273 274 // log 275 if (log.isDebugEnabled()) { 276 if (in != null) { 277 log.debug("Received message: " + in.getMessage()); 278 } else { 279 log.debug("Received no message"); 280 } 281 } 282 283 // If we have a message, handle it 284 if (in != null) { 285 String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"}; 286 String[] fields = PreParser.getFields(in.getMessage(), fieldPaths); 287 String acceptAckNeeded = fields[0]; 288 String appAckNeeded = fields[1]; 289 String ackCode = fields[2]; 290 String ackId = fields[3]; 291 292 if (ackId != null && ackCode != null && ackCode.startsWith("C")) { 293 long expiryTime = System.currentTimeMillis() + 1000 * 60; 294 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime)); 295 } else { 296 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in); 297 298 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 299 || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 300 || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) { 301 trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage()); 302 } 303 304 if (ack.isAcceptable()) { 305 if (isReserved(ackId)) { 306 307 if (log.isDebugEnabled()) { 308 log.debug("Received expected ACK message with ACK ID: " + ackId); 309 } 310 311 removeReservation(ackId); 312 long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5; 313 myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime)); 314 315 } else { 316 317 if (log.isDebugEnabled()) { 318 log.debug("Sending message to router"); 319 } 320 321 Transportable out = myContext.getRouter().processMessage(in); 322 sendAppResponse(out); 323 324 } 325 } else { 326 // TODO: should we do something more here? Might be nice to 327 // allow a configurable handler for this situation 328 log.warn("Incoming message was not acceptable"); 329 } 330 331 } 332 } else { 333 String transport = expectingAck ? " Locally driven " : "Remotely driven"; 334 log.debug(transport + " TransportLayer.receive() returned null."); 335 } 336 337 sleepIfNeeded(); 338 339 log.debug("Exiting cycle()"); 340 } 341 342 /** Sends in a new thread if isThreaded, otherwise in current thread */ 343 private void sendAppResponse(final Transportable theResponse) { 344 final ProcessorImpl processor = this; 345 Runnable sender = new Runnable() { 346 public void run() { 347 try { 348 349 if (log.isDebugEnabled()) { 350 log.debug("Sending response: " + theResponse); 351 } 352 353 //TODO: make configurable 354 processor.send(theResponse, 2, 3000); 355 356 } catch (HL7Exception e) { 357 log.error("Error trying to send response from Application", e); 358 } 359 } 360 }; 361 362 if (myThreaded) { 363 myResponseExecutorService.execute(sender); 364 } else { 365 sender.run(); 366 } 367 } 368 369 /** 370 * Removes expired message reservations from the reservation list. 371 */ 372 private synchronized void cleanReservations() { 373 Iterator it = myReservations.keySet().iterator(); 374 while (it.hasNext()) { 375 String ackId = (String) it.next(); 376 Long expiry = (Long) myReservations.get(ackId); 377 if (System.currentTimeMillis() > expiry.longValue()) { 378 it.remove(); 379 } 380 } 381 } 382 383 /** 384 * Discards expired accept acknowledgements (these are used in retry protocol; see send()). 385 */ 386 private synchronized void cleanAcceptAcks() { 387 Iterator it = myAcceptAcks.keySet().iterator(); 388 while (it.hasNext()) { 389 String ackId = (String) it.next(); 390 ExpiringTransportable et = (ExpiringTransportable) myAcceptAcks.get(ackId); 391 if (System.currentTimeMillis() > et.expiryTime) { 392 it.remove(); 393 } 394 } 395 } 396 397 private synchronized void cleanReservedMessages() throws HL7Exception { 398 Iterator it = myAvailableMessages.keySet().iterator(); 399 while (it.hasNext()) { 400 String ackId = (String) it.next(); 401 ExpiringTransportable et = (ExpiringTransportable) myAvailableMessages.get(ackId); 402 if (System.currentTimeMillis() > et.expiryTime) { 403 it.remove(); 404 405 //send to an Application 406 Transportable out = myContext.getRouter().processMessage(et.transportable); 407 sendAppResponse(out); 408 } 409 } 410 } 411 412 private synchronized boolean isReserved(String ackId) { 413 boolean reserved = false; 414 if (myReservations.containsKey(ackId)) { 415 reserved = true; 416 } 417 return reserved; 418 } 419 420 private synchronized void removeReservation(String ackId) { 421 myReservations.remove(ackId); 422 } 423 424 425 /** 426 * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String) 427 */ 428 public boolean isAvailable(String theAckId) { 429 boolean available = false; 430 if (myAvailableMessages.containsKey(theAckId)) { 431 available = true; 432 } 433 return available; 434 } 435 436 /** 437 * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long) 438 */ 439 public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception { 440 if (!isReserved(theAckId)) { 441 reserve(theAckId, theTimeoutMillis); 442 } 443 444 Transportable in = null; 445 long until = System.currentTimeMillis() + theTimeoutMillis; 446 do { 447 synchronized (this) { 448 ExpiringTransportable et = (ExpiringTransportable) myAvailableMessages.get(theAckId); 449 if (et == null) { 450 cycleIfNeeded(false); 451 } else { 452 in = et.transportable; 453 } 454 } 455 sleepIfNeeded(); 456 } while (in == null && System.currentTimeMillis() < until); 457 return in; 458 } 459 460 /** 461 * @see ca.uhn.hl7v2.protocol.Processor#getContext() 462 */ 463 public ProcessorContext getContext() { 464 return myContext; 465 } 466 467 /** 468 * A struct for Transportable collection entries that time out. 469 * 470 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 471 * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $ 472 */ 473 class ExpiringTransportable { 474 public Transportable transportable; 475 public long expiryTime; 476 477 public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) { 478 transportable = theTransportable; 479 expiryTime = theExpiryTime; 480 } 481 } 482 483 /** 484 * A Runnable that repeatedly calls the cycle() method of this class. 485 * 486 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 487 * @version $Revision: 1.4 $ updated on $Date: 2009/12/16 19:36:34 $ by $Author: jamesagnew $ 488 */ 489 private static class Cycler implements Runnable { 490 491 private Processor myProcessor; 492 private boolean myExpectingAck; 493 private boolean isRunning; 494 495 /** 496 * @param theProcessor the processor on which to call cycle() 497 * @param isExpectingAck passed to cycle() 498 */ 499 public Cycler(Processor theProcessor, boolean isExpectingAck) { 500 myProcessor = theProcessor; 501 myExpectingAck = isExpectingAck; 502 isRunning = true; 503 } 504 505 /** 506 * Execution will stop at the end of the next cycle. 507 */ 508 public void stop() { 509 isRunning = false; 510 } 511 512 /** 513 * Calls cycle() repeatedly on the Processor given in the 514 * constructor, until stop() is called. 515 * 516 * @see java.lang.Runnable#run() 517 */ 518 public void run() { 519 while (isRunning) { 520 try { 521 myProcessor.cycle(myExpectingAck); 522 } catch (HL7Exception e) { 523 log.error("Error processing message", e); 524 } 525 } 526 } 527 } 528 529 }