001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.util.HashMap;
022    import java.util.Map;
023    
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.CommandTypes;
032    import org.apache.activemq.command.ConsumerId;
033    import org.apache.activemq.command.ConsumerInfo;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.ProducerId;
037    import org.apache.activemq.selector.SelectorParser;
038    import org.apache.activemq.util.IOExceptionSupport;
039    import org.apache.activemq.util.IntrospectionSupport;
040    import org.apache.activemq.util.JMSExceptionSupport;
041    
042    /**
043     * @version $Revision$
044     */
045    public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
046    
047        private final ActiveMQConnection connection;
048        private final ConsumerInfo info;
049        // These are the messages waiting to be delivered to the client
050        private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
051    
052        private int deliveredCounter;
053        private MessageDispatch lastDelivered;
054        private boolean eosReached;
055        private byte buffer[];
056        private int pos;
057    
058        private ProducerId producerId;
059        private long nextSequenceId;
060    
061        public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch)
062            throws JMSException {
063            this.connection = connection;
064    
065            if (dest == null) {
066                throw new InvalidDestinationException("Don't understand null destinations");
067            } else if (dest.isTemporary()) {
068                String physicalName = dest.getPhysicalName();
069    
070                if (physicalName == null) {
071                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
072                }
073    
074                String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
075    
076                if (physicalName.indexOf(connectionID) < 0) {
077                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
078                }
079    
080                if (connection.isDeleted(dest)) {
081                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
082                }
083            }
084    
085            this.info = new ConsumerInfo(consumerId);
086            this.info.setSubscriptionName(name);
087    
088            if (selector != null && selector.trim().length() != 0) {
089                selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
090            } else {
091                selector = "JMSType='org.apache.activemq.Stream'";
092            }
093    
094            SelectorParser.parse(selector);
095            this.info.setSelector(selector);
096    
097            this.info.setPrefetchSize(prefetch);
098            this.info.setNoLocal(noLocal);
099            this.info.setBrowser(false);
100            this.info.setDispatchAsync(false);
101    
102            // Allows the options on the destination to configure the consumerInfo
103            if (dest.getOptions() != null) {
104                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
105                IntrospectionSupport.setProperties(this.info, options, "consumer.");
106            }
107    
108            this.info.setDestination(dest);
109    
110            this.connection.addInputStream(this);
111            this.connection.addDispatcher(info.getConsumerId(), this);
112            this.connection.syncSendPacket(info);
113            unconsumedMessages.start();
114        }
115    
116        public void close() throws IOException {
117            if (!unconsumedMessages.isClosed()) {
118                try {
119                    if (lastDelivered != null) {
120                        MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
121                        connection.asyncSendPacket(ack);
122                    }
123                    dispose();
124                    this.connection.syncSendPacket(info.createRemoveCommand());
125                } catch (JMSException e) {
126                    throw IOExceptionSupport.create(e);
127                }
128            }
129        }
130    
131        public void dispose() {
132            if (!unconsumedMessages.isClosed()) {
133                unconsumedMessages.close();
134                this.connection.removeDispatcher(info.getConsumerId());
135                this.connection.removeInputStream(this);
136            }
137        }
138    
139        public ActiveMQMessage receive() throws JMSException {
140            checkClosed();
141            MessageDispatch md;
142            try {
143                md = unconsumedMessages.dequeue(-1);
144            } catch (InterruptedException e) {
145                Thread.currentThread().interrupt();
146                throw JMSExceptionSupport.create(e);
147            }
148    
149            if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
150                return null;
151            }
152    
153            deliveredCounter++;
154            if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
155                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
156                connection.asyncSendPacket(ack);
157                deliveredCounter = 0;
158                lastDelivered = null;
159            } else {
160                lastDelivered = md;
161            }
162    
163            return (ActiveMQMessage)md.getMessage();
164        }
165    
166        /**
167         * @throws IllegalStateException
168         */
169        protected void checkClosed() throws IllegalStateException {
170            if (unconsumedMessages.isClosed()) {
171                throw new IllegalStateException("The Consumer is closed");
172            }
173        }
174    
175        public int read() throws IOException {
176            fillBuffer();
177            if (eosReached || buffer.length == 0) {
178                return -1;
179            }
180    
181            return buffer[pos++] & 0xff;
182        }
183    
184        public int read(byte[] b, int off, int len) throws IOException {
185            fillBuffer();
186            if (eosReached || buffer.length == 0) {
187                return -1;
188            }
189    
190            int max = Math.min(len, buffer.length - pos);
191            System.arraycopy(buffer, pos, b, off, max);
192    
193            pos += max;
194            return max;
195        }
196    
197        private void fillBuffer() throws IOException {
198            if (eosReached || (buffer != null && buffer.length > pos)) {
199                return;
200            }
201            try {
202                while (true) {
203                    ActiveMQMessage m = receive();
204                    if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
205                        // First message.
206                        long producerSequenceId = m.getMessageId().getProducerSequenceId();
207                        if (producerId == null) {
208                            // We have to start a stream at sequence id = 0
209                            if (producerSequenceId != 0) {
210                                continue;
211                            }
212                            nextSequenceId++;
213                            producerId = m.getMessageId().getProducerId();
214                        } else {
215                            // Verify it's the next message of the sequence.
216                            if (!m.getMessageId().getProducerId().equals(producerId)) {
217                                throw new IOException("Received an unexpected message: invalid producer: " + m);
218                            }
219                            if (producerSequenceId != nextSequenceId++) {
220                                throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
221                            }
222                        }
223    
224                        // Read the buffer in.
225                        ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
226                        buffer = new byte[(int)bm.getBodyLength()];
227                        bm.readBytes(buffer);
228                        pos = 0;
229                    } else {
230                        eosReached = true;
231                    }
232                    return;
233                }
234            } catch (JMSException e) {
235                eosReached = true;
236                throw IOExceptionSupport.create(e);
237            }
238        }
239    
240        public void dispatch(MessageDispatch md) {
241            unconsumedMessages.enqueue(md);
242        }
243    
244        public String toString() {
245            return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
246        }
247    
248    }