001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.LinkedHashMap; 022 import java.util.LinkedList; 023 import java.util.Map; 024 import java.util.Map.Entry; 025 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQMessage; 030 import org.apache.activemq.command.ConsumerInfo; 031 import org.apache.activemq.command.MessageAck; 032 import org.apache.activemq.command.MessageDispatch; 033 import org.apache.activemq.command.MessageId; 034 import org.apache.activemq.command.TransactionId; 035 036 /** 037 * Keeps track of the STOMP subscription so that acking is correctly done. 038 * 039 * @author <a href="http://hiramchirino.com">chirino</a> 040 */ 041 public class StompSubscription { 042 043 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; 044 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; 045 public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL; 046 047 private final ProtocolConverter protocolConverter; 048 private final String subscriptionId; 049 private final ConsumerInfo consumerInfo; 050 051 private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>(); 052 private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>(); 053 054 private String ackMode = AUTO_ACK; 055 private ActiveMQDestination destination; 056 private String transformation; 057 058 059 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { 060 this.protocolConverter = stompTransport; 061 this.subscriptionId = subscriptionId; 062 this.consumerInfo = consumerInfo; 063 this.transformation = transformation; 064 } 065 066 void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { 067 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 068 if (ackMode == CLIENT_ACK) { 069 synchronized (this) { 070 dispatchedMessage.put(message.getMessageId(), md); 071 } 072 } else if (ackMode == INDIVIDUAL_ACK) { 073 synchronized (this) { 074 dispatchedMessage.put(message.getMessageId(), md); 075 } 076 } else if (ackMode == AUTO_ACK) { 077 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 078 protocolConverter.getTransportFilter().sendToActiveMQ(ack); 079 } 080 081 boolean ignoreTransformation = false; 082 083 if (transformation != null) { 084 message.setReadOnlyProperties(false); 085 message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); 086 } else { 087 if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) { 088 ignoreTransformation = true; 089 } 090 } 091 092 StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation); 093 094 command.setAction(Stomp.Responses.MESSAGE); 095 if (subscriptionId != null) { 096 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); 097 } 098 099 protocolConverter.getTransportFilter().sendToStomp(command); 100 } 101 102 synchronized void onStompAbort(TransactionId transactionId) { 103 unconsumedMessage.clear(); 104 } 105 106 synchronized void onStompCommit(TransactionId transactionId) { 107 for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { 108 Map.Entry entry = (Entry)iter.next(); 109 MessageId id = (MessageId)entry.getKey(); 110 MessageDispatch msg = (MessageDispatch)entry.getValue(); 111 if (unconsumedMessage.contains(msg)) { 112 iter.remove(); 113 } 114 } 115 unconsumedMessage.clear(); 116 } 117 118 synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { 119 120 MessageId msgId = new MessageId(messageId); 121 122 if (!dispatchedMessage.containsKey(msgId)) { 123 return null; 124 } 125 126 MessageAck ack = new MessageAck(); 127 ack.setDestination(consumerInfo.getDestination()); 128 ack.setConsumerId(consumerInfo.getConsumerId()); 129 130 if (ackMode == CLIENT_ACK) { 131 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 132 int count = 0; 133 for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { 134 135 Map.Entry entry = (Entry)iter.next(); 136 MessageId id = (MessageId)entry.getKey(); 137 MessageDispatch msg = (MessageDispatch)entry.getValue(); 138 139 if (ack.getFirstMessageId() == null) { 140 ack.setFirstMessageId(id); 141 } 142 143 if (transactionId != null) { 144 if (!unconsumedMessage.contains(msg)) { 145 unconsumedMessage.add(msg); 146 } 147 } else { 148 iter.remove(); 149 } 150 151 152 count++; 153 154 if (id.equals(msgId)) { 155 ack.setLastMessageId(id); 156 break; 157 } 158 159 } 160 ack.setMessageCount(count); 161 if (transactionId != null) { 162 ack.setTransactionId(transactionId); 163 } 164 } 165 else if (ackMode == INDIVIDUAL_ACK) { 166 ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); 167 ack.setMessageID(msgId); 168 if (transactionId != null) { 169 unconsumedMessage.add(dispatchedMessage.get(msgId)); 170 ack.setTransactionId(transactionId); 171 } 172 dispatchedMessage.remove(msgId); 173 } 174 return ack; 175 } 176 177 public String getAckMode() { 178 return ackMode; 179 } 180 181 public void setAckMode(String ackMode) { 182 this.ackMode = ackMode; 183 } 184 185 public String getSubscriptionId() { 186 return subscriptionId; 187 } 188 189 public void setDestination(ActiveMQDestination destination) { 190 this.destination = destination; 191 } 192 193 public ActiveMQDestination getDestination() { 194 return destination; 195 } 196 197 public ConsumerInfo getConsumerInfo() { 198 return consumerInfo; 199 } 200 201 }