001    /**
002    The contents of this file are subject to the Mozilla Public License Version 1.1 
003    (the "License"); you may not use this file except in compliance with the License. 
004    You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005    Software distributed under the License is distributed on an "AS IS" basis, 
006    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007    specific language governing rights and limitations under the License. 
008    
009    The Original Code is "Initiator.java".  Description: 
010    "Performs the initiation role of a message exchange accorging to HL7's original 
011     mode rules." 
012    
013    The Initial Developer of the Original Code is University Health Network. Copyright (C) 
014    2002.  All Rights Reserved. 
015    
016    Contributor(s): ______________________________________. 
017    
018    Alternatively, the contents of this file may be used under the terms of the 
019    GNU General Public License (the  ???GPL???), in which case the provisions of the GPL are 
020    applicable instead of those above.  If you wish to allow use of your version of this 
021    file only under the terms of the GPL and not to allow others to use your version 
022    of this file under the MPL, indicate your decision by deleting  the provisions above 
023    and replace  them with the notice and other provisions required by the GPL License.  
024    If you do not delete the provisions above, a recipient may use your version of 
025    this file under either the MPL or the GPL. 
026    
027    */
028    
029    package ca.uhn.hl7v2.app;
030    
031    import java.io.IOException;
032    import java.net.Socket;
033    
034    import ca.uhn.hl7v2.HL7Exception;
035    import ca.uhn.hl7v2.llp.HL7Reader;
036    import ca.uhn.hl7v2.llp.HL7Writer;
037    import ca.uhn.hl7v2.llp.LLPException;
038    import ca.uhn.hl7v2.llp.LowerLayerProtocol;
039    import ca.uhn.hl7v2.model.Message;
040    import ca.uhn.hl7v2.parser.Parser;
041    import ca.uhn.hl7v2.parser.PipeParser;
042    import ca.uhn.hl7v2.util.MessageIDGenerator;
043    import ca.uhn.hl7v2.util.Terser;
044    import ca.uhn.log.HapiLog;
045    import ca.uhn.log.HapiLogFactory;
046    
047    /**
048     * <p>Performs the initiation role of a message exchange (i.e sender of the first message; 
049     * analagous to the client in a client-server interaction), according to HL7's original 
050     * mode processing rules.</p>
051     * <p>The <code>sendAndReceive(...)</code> method blocks until either a response is received 
052     * with the matching message ID, or until a timeout period has passed.  The timeout defaults to 
053     * 10000 ms (10 sec) but can be configured by setting the system property "ca.uhn.hl7v2.app.initiator.timeout"
054     * to an integer value representing the number of ms after which to time out.</p>
055     * <p>At the time of writing, enhanced mode, two-phase reply, continuation messages, and 
056     * batch processing are unsupported. </p>
057     * @author  Bryan Tripp
058     */
059    public class Initiator {
060        
061        private static final HapiLog log = HapiLogFactory.getHapiLog(Initiator.class); 
062    
063        //private Parser parser;
064        private HL7Reader in;
065        private HL7Writer out;
066        private Connection conn;
067        //private boolean keepListening;    
068        private int timeoutMillis = 10000;
069    
070        /** 
071         * Creates a new instance of Initiator.  
072         * @param conn the Connection associated with this Initiator.   
073         */
074        protected Initiator(Connection conn) throws LLPException {
075            this.conn = conn;
076    
077            //see if timeout has been set
078            String timeout = System.getProperty("ca.uhn.hl7v2.app.initiator.timeout");
079            if (timeout != null) {
080                try {
081                    timeoutMillis = Integer.parseInt(timeout);
082                    log.debug("Setting Initiator timeout to " + timeout + " ms");
083                }
084                catch (NumberFormatException e) {
085                    log.warn(timeout + " is not a valid integer - Initiator is using default timeout");
086                }
087            }
088        }
089    
090        /**
091         * Sends a message to a responder system, receives the reply, and 
092         * returns the reply as a Message object.  This method is thread-safe - multiple  
093         * threads can share an Initiator and call this method.  Responses are returned to 
094         * the calling thread on the basis of message ID.  
095         */
096        public Message sendAndReceive(Message out) throws HL7Exception, LLPException, IOException {
097            HapiLog rawOutbound = HapiLogFactory.getHapiLog("ca.uhn.hl7v2.raw.outbound");
098            HapiLog rawInbound = HapiLogFactory.getHapiLog("ca.uhn.hl7v2.raw.inbound");
099            
100            if (out == null) {
101                throw new HL7Exception("Can't encode null message", HL7Exception.REQUIRED_FIELD_MISSING);
102                    }
103    
104            //register message with response Receiver(s) (by message ID)
105            Terser t = new Terser(out);
106            String messID = t.get("/MSH-10");
107    
108                    if (messID == null || messID.length() == 0) {
109                            throw new HL7Exception("MSH segment missing required field Control ID (MSH-10)", HL7Exception.REQUIRED_FIELD_MISSING);
110                    }
111    
112            MessageReceipt mr = this.conn.reserveResponse(messID);
113    
114            //log and send message 
115            String outbound = conn.getParser().encode(out);
116            log.info("Initiator sending message: " + outbound);
117            rawOutbound.info(outbound);
118            
119            try {
120                this.conn.getSendWriter().writeMessage(outbound);
121            }
122            catch (IOException e) {
123                conn.close();
124                throw e;
125            }
126    
127            //wait for response
128            boolean done = false;
129            Message response = null;
130            long startTime = System.currentTimeMillis();
131            while (!done) {
132                synchronized (mr) {
133                    try {
134                        mr.wait(500); //if it comes, notifyAll() will be called
135                    }
136                    catch (InterruptedException e) {
137                    }
138    
139                    if (mr.getMessage() != null) {
140                        //get message from receipt 
141                        String inbound = mr.getMessage();
142                        
143                        //log that we got the message
144                        log.info( "Initiator received message: " + inbound );
145                        rawInbound.info(inbound);
146                        
147                        //parse message
148                        response = conn.getParser().parse(inbound);
149                        log.debug("response parsed");
150                        done = true;
151                    }
152    
153                    if (System.currentTimeMillis() > startTime + timeoutMillis)
154                        throw new HL7Exception("Timeout waiting for response to message with control ID '" + messID);
155                }
156            }
157    
158            return response;
159        }
160    
161        /**
162         * Sets the time (in milliseconds) that the initiator will wait for a response 
163         * for a given message before timing out and throwing an exception (default 
164         * is 10 seconds). 
165         */
166        public void setTimeoutMillis(int timeout) {
167            this.timeoutMillis = timeout;
168        }
169    
170        /**
171         * Test harness
172         */
173        public static void main(String args[]) {
174            if (args.length != 2) {
175                System.out.println("Usage: ca.uhn.hl7v2.app.Initiator host port");
176            }
177    
178            try {
179    
180                //set up connection to server
181                String host = args[0];
182                int port = Integer.parseInt(args[1]);
183    
184                final Parser parser = new PipeParser();
185                LowerLayerProtocol llp = LowerLayerProtocol.makeLLP();
186                Connection connection = new Connection(parser, llp, new Socket(host, port));
187                final Initiator initiator = connection.getInitiator();
188                final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
189    
190                //get a bunch of threads to send messages
191                for (int i = 0; i < 1000; i++) {
192                    Thread sender = new Thread(new Runnable() {
193                        public void run() {
194                            try {
195                                    //get message ID
196        String ID = MessageIDGenerator.getInstance().getNewID();
197                                Message out = parser.parse(outText);
198                                Terser tOut = new Terser(out);
199                                tOut.set("/MSH-10", ID);
200    
201                                //send, get response
202                                Message in = initiator.sendAndReceive(out);
203    
204                                //get ACK ID
205                                Terser tIn = new Terser(in);
206                                String ackID = tIn.get("/MSA-2");
207                                if (ID.equals(ackID)) {
208                                    System.out.println("OK - ack ID matches");
209                                }
210                                else {
211                                    throw new RuntimeException("Ack ID for message " + ID + " is " + ackID);
212                                }
213    
214                            }
215                            catch (Exception e) {
216                                e.printStackTrace();
217                            }
218                        }
219                    });
220                    sender.start();
221                }
222    
223            }
224            catch (Exception e) {
225                e.printStackTrace();
226            }
227        }
228    
229    }