001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.streams;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.io.OutputStream;
023    import javax.jms.BytesMessage;
024    import javax.jms.JMSException;
025    import javax.jms.MessageProducer;
026    import org.activemq.message.ActiveMQBytesMessage;
027    
028    /**
029     * OutputStream that writes on to JMS via the supplied JMS MessageProducer
030     * 
031     * @version $Revision: 1.1.1.1 $
032     */
033    public class JMSOutputStream extends OutputStream {
034        private final static int BUFFER_SIZE = 16 * 1024;
035        private byte[] buf;
036        private int count;
037        private boolean closed;
038        private MessageProducer producer;
039    
040        
041        /**
042        * Creates a new output stream to write data using the supplied JMS MessageProducer
043        * @param producer
044        */
045       public JMSOutputStream(MessageProducer producer) {
046           this(producer,BUFFER_SIZE);
047       }
048    
049        /**
050         * Creates a new output stream to write data using the supplied JMS MessageProducer
051         * @param producer
052         * @param size the buffer size.
053         * @throws IllegalArgumentException if size <= 0.
054         */
055        public JMSOutputStream(MessageProducer producer, int size) {
056            if (size <= 0) {
057                throw new IllegalArgumentException("Buffer size <= 0");
058            }
059            buf = new byte[size];
060            this.producer = producer;
061        }
062    
063        /**
064         * write a byte on to the stream
065         *
066         * @param b - byte to write
067         * @throws IOException
068         */
069        public void write(int b) throws IOException {
070            checkClosed();
071            if (availableBufferToWrite() < 1) {
072                flush();
073            }
074            buf[count++] = (byte) b;
075        }
076    
077    
078        /**
079         * write a byte array to the stream
080         *
081         * @param b   the byte buffer
082         * @param off the offset into the buffer
083         * @param len the length of data to write
084         * @throws IOException
085         */
086        public void write(byte b[], int off, int len) throws IOException {
087            checkClosed();
088            if (availableBufferToWrite() < len) {
089                flush();
090            }
091            if (buf.length >= len) {
092                System.arraycopy(b, off, buf, count, len);
093                count += len;
094            }
095            else {
096                writeBuffer(b, off, len);
097            }
098        }
099    
100        /**
101         * flush the data to the output stream
102         * This doesn't call flush on the underlying outputstream, because
103         * Tcp is particularly efficent at doing this itself ....
104         *
105         * @throws IOException
106         */
107        public void flush() throws IOException {
108            checkClosed();
109            if (count > 0 ) {
110                writeBuffer(buf, 0, count);
111                count = 0;
112            }
113        }
114    
115        /**
116         * close this stream
117         *
118         * @throws IOException
119         */
120        public void close() throws IOException {
121            if (!closed) {
122                write(-1);
123                flush();
124                super.close();
125                closed = true;
126                try {
127                    producer.close();
128                }
129                catch (JMSException jmsEx) {
130                    IOException ioEx = new IOException(jmsEx.getMessage());
131                    throw ioEx;
132                }
133            }
134        }
135    
136    
137        /**
138         * Checks that the stream has not been closed
139         *
140         * @throws IOException
141         */
142        protected void checkClosed() throws IOException {
143            if (closed) {
144                throw new EOFException("Cannot write to the stream any more it has already been closed");
145            }
146        }
147    
148        /**
149         * @return the amount free space in the buffer
150         */
151        private int availableBufferToWrite() {
152            return buf.length - count;
153        }
154        
155        private void writeBuffer(byte[] buf,int offset, int length) throws IOException{
156            try {
157            BytesMessage message = new ActiveMQBytesMessage();
158            message.writeBytes(buf,offset,length);
159            producer.send(message);
160            }catch(JMSException jmsEx){
161                IOException ioEx = new IOException(jmsEx.getMessage());
162                throw ioEx;
163            }
164        }
165    }