001 /** 002 * The contents of this file are subject to the Mozilla Public License Version 1.1 003 * (the "License"); you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005 * Software distributed under the License is distributed on an "AS IS" basis, 006 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007 * specific language governing rights and limitations under the License. 008 * 009 * The Original Code is "Receiver.java". Description: 010 * "Listens for incoming messages on a certain input stream, and 011 * sends them to the appropriate location." 012 * 013 * The Initial Developer of the Original Code is University Health Network. Copyright (C) 014 * 2002. All Rights Reserved. 015 * 016 * Contributor(s): _____________. 017 * 018 * Alternatively, the contents of this file may be used under the terms of the 019 * GNU General Public License (the ?GPL?), in which case the provisions of the GPL are 020 * applicable instead of those above. If you wish to allow use of your version of this 021 * file only under the terms of the GPL and not to allow others to use your version 022 * of this file under the MPL, indicate your decision by deleting the provisions above 023 * and replace them with the notice and other provisions required by the GPL License. 024 * If you do not delete the provisions above, a recipient may use your version of 025 * this file under either the MPL or the GPL. 026 */ 027 028 package ca.uhn.hl7v2.app; 029 030 import java.io.IOException; 031 032 import ca.uhn.hl7v2.llp.HL7Reader; 033 import ca.uhn.log.HapiLog; 034 import ca.uhn.log.HapiLogFactory; 035 036 /** 037 * Listens for incoming messages on a certain input stream, and 038 * sends them to the appropriate location. 039 * @author Bryan Tripp 040 */ 041 public class Receiver implements Runnable { 042 043 private static final HapiLog log = HapiLogFactory.getHapiLog(Receiver.class); 044 045 private Connection conn; 046 private HL7Reader in; 047 private boolean running; 048 049 /** Creates a new instance of Receiver, associated with the given Connection */ 050 public Receiver(Connection c, HL7Reader in) { 051 this.conn = c; 052 this.in = in; 053 } 054 055 /** 056 * Loops continuously, reading messages and calling processMessage() until 057 * stop() is called. 058 */ 059 public void run() { 060 while (running) { 061 Thread.yield(); 062 try { 063 String message = in.getMessage(); 064 if (message == null) { 065 log.info("Closing connection (no more messages available)."); 066 conn.close(); 067 } else { 068 processMessage(message); 069 } 070 } 071 catch (IOException e) { 072 conn.close(); 073 log.error("IOException: closing Connection, will no longer read messages with this Receiver. ", e); 074 } 075 catch (Exception e) { 076 log.error("Error while closing connection: ", e); 077 } 078 } 079 } 080 081 /** 082 * Processes a single incoming message by sending it to the appropriate 083 * internal location. If an incoming message contains 084 * an MSA-2 field, it is assumed that this message is meant as a reply to a message that has been sent 085 * earlier. In this case an attempt is give the message to the object 086 * that sent the corresponding outbound message. If the message contains an MSA-2 but there are no objects that 087 * appear to be waiting for it, it is discarded and an exception is logged. If the message does not 088 * contain an MSA-2 field, it is concluded that the message has arrived unsolicited. In this case 089 * it is sent to the Responder (in a new Thread). 090 */ 091 protected void processMessage(String message) { 092 String ackID = conn.getParser().getAckID(message); 093 if (ackID == null) { 094 Grunt g = new Grunt(conn, message); 095 g.start(); 096 } 097 else { 098 MessageReceipt mr = conn.findRecipient(ackID); 099 if (mr == null) { 100 log.info( "Unexpected Message Received: " + message ); 101 } 102 else { 103 mr.setMessage(message); 104 } 105 } 106 } 107 108 /** Independent thread for processing a single message */ 109 private class Grunt extends Thread { 110 111 private Connection conn; 112 private String m; 113 114 public Grunt(Connection conn, String message) { 115 this.conn = conn; 116 this.m = message; 117 } 118 119 public void run() { 120 try { 121 String response = conn.getResponder().processMessage(m); 122 conn.getAckWriter().writeMessage(response); 123 } 124 catch (Exception e) { 125 log.error("Error while processing message: ", e ); 126 } 127 } 128 } 129 130 /** Starts the Receiver in a new thread */ 131 public void start() { 132 running = true; 133 Thread thd = new Thread(this); 134 thd.start(); 135 } 136 137 /** Stops the Receiver after the next message is read. */ 138 public void stop() { 139 running = false; 140 } 141 142 }