001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * Copyright 2004 Protique Ltd 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq.store.cache; 020 021 import javax.jms.JMSException; 022 023 import org.activemq.message.ActiveMQMessage; 024 import org.activemq.message.MessageAck; 025 import org.activemq.service.MessageIdentity; 026 import org.activemq.store.MessageStore; 027 import org.activemq.store.RecoveryListener; 028 029 /** 030 * A MessageStore that uses an in memory cache to speed up getMessage() method calls. 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public class CacheMessageStore implements MessageStore, CacheMessageStoreAware { 035 036 private final MessageStore longTermStore; 037 private final MessageCache cache; 038 039 public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) { 040 this.longTermStore = longTermStore; 041 this.cache = cache; 042 043 // Make any downstream CacheMessageStoreAware objects aware of us. 044 setCacheMessageStore(this); 045 } 046 047 /** 048 * Add the meessage to the long term store and cache it. 049 */ 050 public void addMessage(ActiveMQMessage message) throws JMSException { 051 longTermStore.addMessage(message); 052 cache.put(message.getJMSMessageID(), message); 053 } 054 055 /** 056 * Remove the meessage to the long term store and remove it from the cache. 057 */ 058 public void removeMessage(MessageAck ack) throws JMSException { 059 longTermStore.removeMessage( ack); 060 cache.remove(ack.getMessageID()); 061 } 062 063 /** 064 * Return the message from the cache or go to the longTermStore if it is not 065 * in there. 066 */ 067 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException { 068 ActiveMQMessage answer=null; 069 answer = cache.get(identity.getMessageID()); 070 if( answer!=null ) 071 return answer; 072 073 answer = longTermStore.getMessage(identity); 074 cache.put(identity.getMessageID(), answer); 075 return answer; 076 } 077 078 /** 079 * Replays the checkpointStore first as those messages are the oldest ones, 080 * then messages are replayed from the transaction log and then the cache is 081 * updated. 082 * 083 * @param listener 084 * @throws JMSException 085 */ 086 public synchronized void recover(final RecoveryListener listener) 087 throws JMSException { 088 longTermStore.recover(listener); 089 } 090 091 public void start() throws JMSException { 092 longTermStore.start(); 093 } 094 095 public void stop() throws JMSException { 096 longTermStore.stop(); 097 cache.close(); 098 } 099 100 /** 101 * @return Returns the longTermStore. 102 */ 103 public MessageStore getLongTermStore() { 104 return longTermStore; 105 } 106 107 /** 108 * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore) 109 */ 110 public void setCacheMessageStore(CacheMessageStore store) { 111 // Make any downstream CacheMessageStoreAware objects aware of us. 112 // This cache implementation could be cached by another cache. 113 if( longTermStore instanceof CacheMessageStoreAware ) { 114 ((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store); 115 } 116 } 117 118 /** 119 * @see org.activemq.store.MessageStore#removeAllMessages() 120 */ 121 public void removeAllMessages() throws JMSException { 122 longTermStore.removeAllMessages(); 123 } 124 125 126 }