001 /** 002 The contents of this file are subject to the Mozilla Public License Version 1.1 003 (the "License"); you may not use this file except in compliance with the License. 004 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005 Software distributed under the License is distributed on an "AS IS" basis, 006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007 specific language governing rights and limitations under the License. 008 009 The Original Code is "Initiator.java". Description: 010 "Performs the initiation role of a message exchange accorging to HL7's original 011 mode rules." 012 013 The Initial Developer of the Original Code is University Health Network. Copyright (C) 014 2002. All Rights Reserved. 015 016 Contributor(s): ______________________________________. 017 018 Alternatively, the contents of this file may be used under the terms of the 019 GNU General Public License (the ???GPL???), in which case the provisions of the GPL are 020 applicable instead of those above. If you wish to allow use of your version of this 021 file only under the terms of the GPL and not to allow others to use your version 022 of this file under the MPL, indicate your decision by deleting the provisions above 023 and replace them with the notice and other provisions required by the GPL License. 024 If you do not delete the provisions above, a recipient may use your version of 025 this file under either the MPL or the GPL. 026 027 */ 028 029 package ca.uhn.hl7v2.app; 030 031 import java.io.IOException; 032 import java.net.Socket; 033 034 import ca.uhn.hl7v2.HL7Exception; 035 import ca.uhn.hl7v2.llp.HL7Reader; 036 import ca.uhn.hl7v2.llp.HL7Writer; 037 import ca.uhn.hl7v2.llp.LLPException; 038 import ca.uhn.hl7v2.llp.LowerLayerProtocol; 039 import ca.uhn.hl7v2.model.Message; 040 import ca.uhn.hl7v2.parser.Parser; 041 import ca.uhn.hl7v2.parser.PipeParser; 042 import ca.uhn.hl7v2.util.MessageIDGenerator; 043 import ca.uhn.hl7v2.util.Terser; 044 import ca.uhn.log.HapiLog; 045 import ca.uhn.log.HapiLogFactory; 046 047 /** 048 * <p>Performs the initiation role of a message exchange (i.e sender of the first message; 049 * analagous to the client in a client-server interaction), according to HL7's original 050 * mode processing rules.</p> 051 * <p>The <code>sendAndReceive(...)</code> method blocks until either a response is received 052 * with the matching message ID, or until a timeout period has passed. The timeout defaults to 053 * 10000 ms (10 sec) but can be configured by setting the system property "ca.uhn.hl7v2.app.initiator.timeout" 054 * to an integer value representing the number of ms after which to time out.</p> 055 * <p>At the time of writing, enhanced mode, two-phase reply, continuation messages, and 056 * batch processing are unsupported. </p> 057 * @author Bryan Tripp 058 */ 059 public class Initiator { 060 061 private static final HapiLog log = HapiLogFactory.getHapiLog(Initiator.class); 062 063 //private Parser parser; 064 private HL7Reader in; 065 private HL7Writer out; 066 private Connection conn; 067 //private boolean keepListening; 068 private int timeoutMillis = 10000; 069 070 /** 071 * Creates a new instance of Initiator. 072 * @param conn the Connection associated with this Initiator. 073 */ 074 protected Initiator(Connection conn) throws LLPException { 075 this.conn = conn; 076 077 //see if timeout has been set 078 String timeout = System.getProperty("ca.uhn.hl7v2.app.initiator.timeout"); 079 if (timeout != null) { 080 try { 081 timeoutMillis = Integer.parseInt(timeout); 082 log.debug("Setting Initiator timeout to " + timeout + " ms"); 083 } 084 catch (NumberFormatException e) { 085 log.warn(timeout + " is not a valid integer - Initiator is using default timeout"); 086 } 087 } 088 } 089 090 /** 091 * Sends a message to a responder system, receives the reply, and 092 * returns the reply as a Message object. This method is thread-safe - multiple 093 * threads can share an Initiator and call this method. Responses are returned to 094 * the calling thread on the basis of message ID. 095 */ 096 public Message sendAndReceive(Message out) throws HL7Exception, LLPException, IOException { 097 HapiLog rawOutbound = HapiLogFactory.getHapiLog("ca.uhn.hl7v2.raw.outbound"); 098 HapiLog rawInbound = HapiLogFactory.getHapiLog("ca.uhn.hl7v2.raw.inbound"); 099 100 if (out == null) { 101 throw new HL7Exception("Can't encode null message", HL7Exception.REQUIRED_FIELD_MISSING); 102 } 103 104 //register message with response Receiver(s) (by message ID) 105 Terser t = new Terser(out); 106 String messID = t.get("/MSH-10"); 107 108 if (messID == null || messID.length() == 0) { 109 throw new HL7Exception("MSH segment missing required field Control ID (MSH-10)", HL7Exception.REQUIRED_FIELD_MISSING); 110 } 111 112 MessageReceipt mr = this.conn.reserveResponse(messID); 113 114 //log and send message 115 String outbound = conn.getParser().encode(out); 116 log.info("Initiator sending message: " + outbound); 117 rawOutbound.info(outbound); 118 119 try { 120 this.conn.getSendWriter().writeMessage(outbound); 121 } 122 catch (IOException e) { 123 conn.close(); 124 throw e; 125 } 126 127 //wait for response 128 boolean done = false; 129 Message response = null; 130 long startTime = System.currentTimeMillis(); 131 while (!done) { 132 synchronized (mr) { 133 try { 134 mr.wait(500); //if it comes, notifyAll() will be called 135 } 136 catch (InterruptedException e) { 137 } 138 139 if (mr.getMessage() != null) { 140 //get message from receipt 141 String inbound = mr.getMessage(); 142 143 //log that we got the message 144 log.info( "Initiator received message: " + inbound ); 145 rawInbound.info(inbound); 146 147 //parse message 148 response = conn.getParser().parse(inbound); 149 log.debug("response parsed"); 150 done = true; 151 } 152 153 if (System.currentTimeMillis() > startTime + timeoutMillis) 154 throw new HL7Exception("Timeout waiting for response to message with control ID '" + messID); 155 } 156 } 157 158 return response; 159 } 160 161 /** 162 * Sets the time (in milliseconds) that the initiator will wait for a response 163 * for a given message before timing out and throwing an exception (default 164 * is 10 seconds). 165 */ 166 public void setTimeoutMillis(int timeout) { 167 this.timeoutMillis = timeout; 168 } 169 170 /** 171 * Test harness 172 */ 173 public static void main(String args[]) { 174 if (args.length != 2) { 175 System.out.println("Usage: ca.uhn.hl7v2.app.Initiator host port"); 176 } 177 178 try { 179 180 //set up connection to server 181 String host = args[0]; 182 int port = Integer.parseInt(args[1]); 183 184 final Parser parser = new PipeParser(); 185 LowerLayerProtocol llp = LowerLayerProtocol.makeLLP(); 186 Connection connection = new Connection(parser, llp, new Socket(host, port)); 187 final Initiator initiator = connection.getInitiator(); 188 final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA"; 189 190 //get a bunch of threads to send messages 191 for (int i = 0; i < 1000; i++) { 192 Thread sender = new Thread(new Runnable() { 193 public void run() { 194 try { 195 //get message ID 196 String ID = MessageIDGenerator.getInstance().getNewID(); 197 Message out = parser.parse(outText); 198 Terser tOut = new Terser(out); 199 tOut.set("/MSH-10", ID); 200 201 //send, get response 202 Message in = initiator.sendAndReceive(out); 203 204 //get ACK ID 205 Terser tIn = new Terser(in); 206 String ackID = tIn.get("/MSA-2"); 207 if (ID.equals(ackID)) { 208 System.out.println("OK - ack ID matches"); 209 } 210 else { 211 throw new RuntimeException("Ack ID for message " + ID + " is " + ackID); 212 } 213 214 } 215 catch (Exception e) { 216 e.printStackTrace(); 217 } 218 } 219 }); 220 sender.start(); 221 } 222 223 } 224 catch (Exception e) { 225 e.printStackTrace(); 226 } 227 } 228 229 }