001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.vm; 019 020 import java.util.Collections; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedHashMap; 024 import java.util.Map; 025 026 import javax.jms.JMSException; 027 028 import org.activemq.message.ActiveMQMessage; 029 import org.activemq.message.ConsumerInfo; 030 import org.activemq.service.MessageIdentity; 031 import org.activemq.service.SubscriberEntry; 032 import org.activemq.store.RecoveryListener; 033 import org.activemq.store.TopicMessageStore; 034 035 /** 036 * @version $Revision: 1.1.1.1 $ 037 */ 038 public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore { 039 private static final Integer ONE = new Integer(1); 040 041 private Map ackDatabase; 042 private Map messageCounts; 043 private Map subscriberDatabase; 044 045 public VMTopicMessageStore() { 046 this(new LinkedHashMap(), makeMap(), makeMap(), makeMap()); 047 } 048 049 public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) { 050 super(messageTable); 051 this.subscriberDatabase = subscriberDatabase; 052 this.ackDatabase = ackDatabase; 053 this.messageCounts = messageCounts; 054 } 055 056 public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException { 057 Integer number = (Integer) messageCounts.get(messageId.getMessageID()); 058 if (number == null) { 059 number = ONE; 060 } 061 else { 062 number = new Integer(number.intValue() + 1); 063 } 064 messageCounts.put(messageId.getMessageID(), number); 065 } 066 067 public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity msgId) throws JMSException { 068 Integer number = (Integer) messageCounts.get(msgId.getMessageID()); 069 if (number == null || number.intValue() <= 1) { 070 removeMessage(msgId); 071 if (number != null) { 072 messageCounts.remove(msgId.getMessageID()); 073 } 074 } 075 else { 076 messageCounts.put(msgId.getMessageID(), new Integer(number.intValue() - 1)); 077 number = ONE; 078 } 079 } 080 081 public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException { 082 ackDatabase.put(subscription, messageIdentity); 083 } 084 085 public synchronized void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException { 086 //iterate through the message table and populate the subscriber 087 Map map = new HashMap(messageTable); 088 boolean alreadyAcked = true; 089 MessageIdentity lastAcked = (MessageIdentity)ackDatabase.get(subscriptionId); 090 if( lastAcked==null ) 091 return; 092 093 for (Iterator i = map.values().iterator(); i.hasNext(); ){ 094 ActiveMQMessage msg = (ActiveMQMessage)i.next(); 095 if (!alreadyAcked){ 096 listener.recoverMessage(msg.getJMSMessageIdentity()); 097 } 098 if (lastAcked.getMessageID().equals(msg.getJMSMessageID())){ 099 alreadyAcked = false; 100 } 101 } 102 } 103 104 public MessageIdentity getLastestMessageIdentity() throws JMSException { 105 return super.lastMessageIdentity; 106 } 107 108 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException { 109 Object key = info.getConsumerKey(); 110 return (SubscriberEntry) subscriberDatabase.get(key); 111 } 112 113 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException { 114 subscriberDatabase.put(info.getConsumerKey(), subscriberEntry); 115 } 116 117 public void stop() throws JMSException { 118 } 119 120 protected static Map makeMap() { 121 return Collections.synchronizedMap(new HashMap()); 122 } 123 124 public void deleteSubscription(String sub) { 125 ackDatabase.remove(sub); 126 } 127 128 }