001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.OutputStream; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.jms.InvalidDestinationException; 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.command.ActiveMQBytesMessage; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQMessage; 031 import org.apache.activemq.command.MessageId; 032 import org.apache.activemq.command.ProducerId; 033 import org.apache.activemq.command.ProducerInfo; 034 import org.apache.activemq.util.IOExceptionSupport; 035 036 /** 037 * @version $Revision$ 038 */ 039 public class ActiveMQOutputStream extends OutputStream implements Disposable { 040 041 // Send down 64k messages. 042 protected int count; 043 044 final byte buffer[] = new byte[64 * 1024]; 045 046 private final ActiveMQConnection connection; 047 private final Map<String, Object> properties; 048 private final ProducerInfo info; 049 050 private long messageSequence; 051 private boolean closed; 052 private final int deliveryMode; 053 private final int priority; 054 private final long timeToLive; 055 056 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority, 057 long timeToLive) throws JMSException { 058 this.connection = connection; 059 this.deliveryMode = deliveryMode; 060 this.priority = priority; 061 this.timeToLive = timeToLive; 062 this.properties = properties == null ? null : new HashMap<String, Object>(properties); 063 064 if (destination == null) { 065 throw new InvalidDestinationException("Don't understand null destinations"); 066 } 067 068 this.info = new ProducerInfo(producerId); 069 this.info.setDestination(destination); 070 071 this.connection.addOutputStream(this); 072 this.connection.asyncSendPacket(info); 073 } 074 075 public void close() throws IOException { 076 if (!closed) { 077 flushBuffer(); 078 try { 079 // Send an EOS style empty message to signal EOS. 080 send(new ActiveMQMessage(), true); 081 dispose(); 082 this.connection.asyncSendPacket(info.createRemoveCommand()); 083 } catch (JMSException e) { 084 IOExceptionSupport.create(e); 085 } 086 } 087 } 088 089 public void dispose() { 090 if (!closed) { 091 this.connection.removeOutputStream(this); 092 closed = true; 093 } 094 } 095 096 public synchronized void write(int b) throws IOException { 097 buffer[count++] = (byte) b; 098 if (count == buffer.length) { 099 flushBuffer(); 100 } 101 } 102 103 public synchronized void write(byte b[], int off, int len) throws IOException { 104 while (len > 0) { 105 int max = Math.min(len, buffer.length - count); 106 System.arraycopy(b, off, buffer, count, max); 107 108 len -= max; 109 count += max; 110 off += max; 111 112 if (count == buffer.length) { 113 flushBuffer(); 114 } 115 } 116 } 117 118 public synchronized void flush() throws IOException { 119 flushBuffer(); 120 } 121 122 private void flushBuffer() throws IOException { 123 if (count != 0) { 124 try { 125 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 126 msg.writeBytes(buffer, 0, count); 127 send(msg, false); 128 } catch (JMSException e) { 129 throw IOExceptionSupport.create(e); 130 } 131 count = 0; 132 } 133 } 134 135 /** 136 * @param msg 137 * @throws JMSException 138 */ 139 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { 140 if (properties != null) { 141 for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) { 142 String key = (String) iter.next(); 143 Object value = properties.get(key); 144 msg.setObjectProperty(key, value); 145 } 146 } 147 msg.setType("org.apache.activemq.Stream"); 148 msg.setGroupID(info.getProducerId().toString()); 149 if (eosMessage) { 150 msg.setGroupSequence(-1); 151 } else { 152 msg.setGroupSequence((int) messageSequence); 153 } 154 MessageId id = new MessageId(info.getProducerId(), messageSequence++); 155 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage); 156 } 157 158 public String toString() { 159 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; 160 } 161 162 }