/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/


package org.activemq.io.impl;
import java.io.DataOutput;
import java.io.IOException;

import org.activemq.message.AbstractPacket;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.util.BitArray;

/**
 * Writes a MessageAck object to a Stream
 */

public class MessageAckWriter extends AbstractPacketWriter {
    /**
     * Wire format consulted for the caching setting and for cached keys;
     * <code>null</code> when the writer was built with the no-arg constructor,
     * in which case caching is treated as disabled.
     */
    private final AbstractDefaultWireFormat wireFormat;

    /**
     * Create a writer bound to a wire format
     *
     * @param wf the wire format to consult for cached keys (may be null)
     */
    MessageAckWriter(AbstractDefaultWireFormat wf) {
        this.wireFormat = wf;
    }

    /**
     * Create a writer with no wire format; values are always written in full
     */
    MessageAckWriter() {
        this(null);
    }

    /**
     * Return the type of Packet
     *
     * @return integer representation of the type of Packet
     */

    public int getPacketType() {
        return Packet.ACTIVEMQ_MSG_ACK;
    }

    /**
     * Write a Packet instance to data output stream
     * <p>
     * The fields are emitted in this order: the flag bit array, the packet id
     * (only if a receipt is required), the message identity (either the
     * external message id, or the producer key followed by the sequence
     * number), the visited broker names (a count followed by that many
     * strings), the transaction id (only if the ack is part of a transaction),
     * and finally the consumer id and the destination.
     *
     * @param packet the instance to be serialized; must be a MessageAck
     * @param dataOut the output stream
     * @throws IOException thrown if an error occurs
     */

    public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
        MessageAck ack = (MessageAck) packet;

        boolean cachingEnabled = wireFormat != null && wireFormat.isCachingEnabled();

        // The short (int) encoding is only lossless inside the int range, so
        // both bounds are checked before the narrowing cast further down.
        long sequenceNumber = ack.getSequenceNumber();
        boolean longSequence = sequenceNumber > Integer.MAX_VALUE
                || sequenceNumber < Integer.MIN_VALUE;

        // Resolve the names up front so that the flag and the count written
        // below always agree with the number of strings that follow.
        String[] brokerNames = collectBrokerNames(ack.getBrokersVisited());
        boolean writeVisited = brokerNames.length > 0;

        BitArray ba = ack.getBitArray();
        ba.reset();
        ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, ack.isReceiptRequired());
        ba.set(AbstractPacket.BROKERS_VISITED_INDEX, writeVisited);
        ba.set(MessageAck.MESSAGE_READ_INDEX, ack.isMessageRead());
        ba.set(MessageAck.TRANSACTION_ID_INDEX, ack.isPartOfTransaction());
        ba.set(MessageAck.XA_TRANS_INDEX, ack.isXaTransacted());
        ba.set(MessageAck.PERSISTENT_INDEX, ack.isPersistent());
        ba.set(MessageAck.EXPIRED_INDEX, ack.isExpired());
        ba.set(MessageAck.EXTERNAL_MESSAGE_ID_INDEX, ack.isExternalMessageId());
        ba.set(MessageAck.CACHED_VALUES_INDEX, cachingEnabled);
        ba.set(MessageAck.LONG_SEQUENCE_INDEX, longSequence);
        ba.writeToStream(dataOut);

        if (ack.isReceiptRequired()) {
            dataOut.writeShort(ack.getId());
        }

        if (ack.isExternalMessageId()) {
            writeUTF(ack.getMessageID(), dataOut);
        } else {
            if (cachingEnabled) {
                dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getProducerKey()));
            } else {
                writeUTF(ack.getProducerKey(), dataOut);
            }
            if (longSequence) {
                dataOut.writeLong(sequenceNumber);
            } else {
                dataOut.writeInt((int) sequenceNumber);
            }
        }

        if (writeVisited) {
            dataOut.writeShort(brokerNames.length);
            for (int i = 0; i < brokerNames.length; i++) {
                dataOut.writeUTF(brokerNames[i]);
            }
        }

        if (ack.isPartOfTransaction()) {
            if (cachingEnabled) {
                dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getTransactionId()));
            } else if (ack.isXaTransacted()) {
                ActiveMQXid xid = (ActiveMQXid) ack.getTransactionId();
                xid.write(dataOut);
            } else {
                super.writeUTF((String) ack.getTransactionId(), dataOut);
            }
        }

        if (cachingEnabled) {
            dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getConsumerId()));
            dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getDestination()));
        } else {
            super.writeUTF(ack.getConsumerId(), dataOut);
            ActiveMQDestination.writeToStream((ActiveMQDestination) ack.getDestination(), dataOut);
        }
    }

    /**
     * Convert the visited-broker entries to the names that will be written
     * <p>
     * Null entries, and entries whose <code>toString()</code> returns null,
     * are left out so that the caller can write a count that matches the
     * number of names exactly.
     *
     * @param visited the entries reported by the packet (may be null or empty)
     * @return the non-null names in their original order; never null
     */
    private static String[] collectBrokerNames(Object[] visited) {
        if (visited == null || visited.length == 0) {
            return new String[0];
        }
        String[] names = new String[visited.length];
        int count = 0;
        for (int i = 0; i < visited.length; i++) {
            String name = visited[i] != null ? visited[i].toString() : null;
            if (name != null) {
                names[count++] = name;
            }
        }
        if (count == names.length) {
            return names;
        }
        String[] trimmed = new String[count];
        System.arraycopy(names, 0, trimmed, 0, count);
        return trimmed;
    }


}