001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import org.apache.activemq.state.CommandVisitor; 020 021 /** 022 * @openwire:marshaller code="22" 023 * @version $Revision: 1.11 $ 024 */ 025 public class MessageAck extends BaseCommand { 026 027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; 028 029 /** 030 * Used to let the broker know that the message has been delivered to the 031 * client. Message will still be retained until an standard ack is received. 032 * This is used get the broker to send more messages past prefetch limits 033 * when an standard ack has not been sent. 034 */ 035 public static final byte DELIVERED_ACK_TYPE = 0; 036 037 /** 038 * The standard ack case where a client wants the message to be discarded. 039 */ 040 public static final byte STANDARD_ACK_TYPE = 2; 041 042 /** 043 * In case the client want's to explicitly let the broker know that a 044 * message was not processed and the message was considered a poison 045 * message. 046 */ 047 public static final byte POSION_ACK_TYPE = 1; 048 049 /** 050 * In case the client want's to explicitly let the broker know that a 051 * message was not processed and it was re-delivered to the consumer 052 * but it was not yet considered to be a poison message. The messageCount 053 * field will hold the number of times the message was re-delivered. 054 */ 055 public static final byte REDELIVERED_ACK_TYPE = 3; 056 057 /** 058 * The ack case where a client wants only an individual message to be discarded. 059 */ 060 public static final byte INDIVIDUAL_ACK_TYPE = 4; 061 062 protected byte ackType; 063 protected ConsumerId consumerId; 064 protected MessageId firstMessageId; 065 protected MessageId lastMessageId; 066 protected ActiveMQDestination destination; 067 protected TransactionId transactionId; 068 protected int messageCount; 069 070 protected transient String consumerKey; 071 072 public MessageAck() { 073 } 074 075 public MessageAck(MessageDispatch md, byte ackType, int messageCount) { 076 this.ackType = ackType; 077 this.consumerId = md.getConsumerId(); 078 this.destination = md.getDestination(); 079 this.lastMessageId = md.getMessage().getMessageId(); 080 this.messageCount = messageCount; 081 } 082 083 public void copy(MessageAck copy) { 084 super.copy(copy); 085 copy.firstMessageId = firstMessageId; 086 copy.lastMessageId = lastMessageId; 087 copy.destination = destination; 088 copy.transactionId = transactionId; 089 copy.ackType = ackType; 090 copy.consumerId = consumerId; 091 } 092 093 public byte getDataStructureType() { 094 return DATA_STRUCTURE_TYPE; 095 } 096 097 public boolean isMessageAck() { 098 return true; 099 } 100 101 public boolean isPoisonAck() { 102 return ackType == POSION_ACK_TYPE; 103 } 104 105 public boolean isStandardAck() { 106 return ackType == STANDARD_ACK_TYPE; 107 } 108 109 public boolean isDeliveredAck() { 110 return ackType == DELIVERED_ACK_TYPE; 111 } 112 113 public boolean isRedeliveredAck() { 114 return ackType == REDELIVERED_ACK_TYPE; 115 } 116 117 public boolean isIndividualAck() { 118 return ackType == INDIVIDUAL_ACK_TYPE; 119 } 120 121 /** 122 * @openwire:property version=1 cache=true 123 */ 124 public ActiveMQDestination getDestination() { 125 return destination; 126 } 127 128 public void setDestination(ActiveMQDestination destination) { 129 this.destination = destination; 130 } 131 132 /** 133 * @openwire:property version=1 cache=true 134 */ 135 public TransactionId getTransactionId() { 136 return transactionId; 137 } 138 139 public void setTransactionId(TransactionId transactionId) { 140 this.transactionId = transactionId; 141 } 142 143 public boolean isInTransaction() { 144 return transactionId != null; 145 } 146 147 /** 148 * @openwire:property version=1 cache=true 149 */ 150 public ConsumerId getConsumerId() { 151 return consumerId; 152 } 153 154 public void setConsumerId(ConsumerId consumerId) { 155 this.consumerId = consumerId; 156 } 157 158 /** 159 * @openwire:property version=1 160 */ 161 public byte getAckType() { 162 return ackType; 163 } 164 165 public void setAckType(byte ackType) { 166 this.ackType = ackType; 167 } 168 169 /** 170 * @openwire:property version=1 171 */ 172 public MessageId getFirstMessageId() { 173 return firstMessageId; 174 } 175 176 public void setFirstMessageId(MessageId firstMessageId) { 177 this.firstMessageId = firstMessageId; 178 } 179 180 /** 181 * @openwire:property version=1 182 */ 183 public MessageId getLastMessageId() { 184 return lastMessageId; 185 } 186 187 public void setLastMessageId(MessageId lastMessageId) { 188 this.lastMessageId = lastMessageId; 189 } 190 191 /** 192 * The number of messages being acknowledged in the range. 193 * 194 * @openwire:property version=1 195 */ 196 public int getMessageCount() { 197 return messageCount; 198 } 199 200 public void setMessageCount(int messageCount) { 201 this.messageCount = messageCount; 202 } 203 204 public Response visit(CommandVisitor visitor) throws Exception { 205 return visitor.processMessageAck(this); 206 } 207 208 /** 209 * A helper method to allow a single message ID to be acknowledged 210 */ 211 public void setMessageID(MessageId messageID) { 212 setFirstMessageId(messageID); 213 setLastMessageId(messageID); 214 setMessageCount(1); 215 } 216 217 }