001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.Map; 023 024 import javax.jms.JMSException; 025 026 import org.activemq.broker.BrokerClient; 027 import org.activemq.filter.Filter; 028 import org.activemq.message.ConsumerInfo; 029 import org.activemq.message.MessageAck; 030 import org.activemq.service.DeadLetterPolicy; 031 import org.activemq.service.Dispatcher; 032 import org.activemq.service.QueueListEntry; 033 import org.activemq.service.RedeliveryPolicy; 034 import org.activemq.service.TopicMessageContainer; 035 import org.activemq.service.TransactionManager; 036 import org.activemq.service.TransactionTask; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * Represents a durable topic subscription where the consumer has a unique 042 * clientID used to persist the messages across both Broker restarts and 043 * JMS client restarts 044 * 045 * @version $Revision: 1.1.1.1 $ 046 */ 047 public class DurableTopicSubscription extends SubscriptionImpl { 048 049 private static final Log log = LogFactory.getLog(DurableTopicSubscription.class); 050 051 private String persistentKey; 052 053 public DurableTopicSubscription(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) { 054 super(dispatcher, client, info, filter, redeliveryPolicy,deadLetterPolicy); 055 } 056 057 public synchronized void messageConsumed(MessageAck ack) throws JMSException { 058 if (ack.isExpired() || (!ack.isMessageRead() && !isBrowser())) { 059 super.messageConsumed(ack); 060 } 061 else { 062 final Map lastMessagePointersPerContainer = new HashMap(); 063 064 //remove up to this message 065 boolean found = false; 066 QueueListEntry queueEntry = messagePtrs.getFirstEntry(); 067 while (queueEntry != null) { 068 final MessagePointer pointer = (MessagePointer) queueEntry.getElement(); 069 070 if( !pointer.isDispatched() ) { 071 break; 072 } 073 074 messagePtrs.remove(queueEntry); 075 lastMessagePointersPerContainer.put(pointer.getContainer(), pointer); 076 unconsumedMessagesDispatched.decrement(); 077 078 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){ 079 public void execute() throws Throwable { 080 unconsumedMessagesDispatched.increment(); 081 MessagePointer p = new MessagePointer(pointer); 082 p.setRedelivered(true); 083 messagePtrs.add(p); 084 dispatch.wakeup(DurableTopicSubscription.this); 085 lastMessageIdentity = pointer.getMessageIdentity(); 086 } 087 }); 088 089 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 090 public void execute() throws Throwable { 091 // now lets tell each container to update its lastAcknowlegedMessageID 092 for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext();) { 093 Map.Entry entry = (Map.Entry) iter.next(); 094 TopicMessageContainer container = (TopicMessageContainer) entry.getKey(); 095 MessagePointer pointer = (MessagePointer) entry.getValue(); 096 container.setLastAcknowledgedMessageID(DurableTopicSubscription.this, pointer.getMessageIdentity()); 097 } 098 } 099 }); 100 101 if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) { 102 found = true; 103 break; 104 } 105 queueEntry = messagePtrs.getNextEntry(queueEntry); 106 } 107 if (!found) { 108 log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity()); 109 } 110 //System.out.println("Message consumed. Remaining: " + messagePtrs.size() + " unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get()); 111 dispatch.wakeup(this); 112 } 113 } 114 115 public String getPersistentKey() { 116 if (persistentKey == null) { 117 persistentKey = "[" + getClientId() + ":" + getSubscriberName() + "]"; 118 } 119 return persistentKey; 120 } 121 }