001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.memory; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.util.HashSet; 022 import java.util.Iterator; 023 import java.util.Set; 024 import java.util.concurrent.ConcurrentHashMap; 025 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.ActiveMQQueue; 029 import org.apache.activemq.command.ActiveMQTopic; 030 import org.apache.activemq.store.MessageStore; 031 import org.apache.activemq.store.PersistenceAdapter; 032 import org.apache.activemq.store.ProxyMessageStore; 033 import org.apache.activemq.store.TopicMessageStore; 034 import org.apache.activemq.store.TransactionStore; 035 import org.apache.activemq.usage.SystemUsage; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * @org.apache.xbean.XBean 041 * @version $Revision: 1.4 $ 042 */ 043 public class MemoryPersistenceAdapter implements PersistenceAdapter { 044 private static final Log LOG = LogFactory.getLog(MemoryPersistenceAdapter.class); 045 046 MemoryTransactionStore transactionStore; 047 ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>(); 048 ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); 049 private boolean useExternalMessageReferences; 050 051 public Set<ActiveMQDestination> getDestinations() { 052 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size()); 053 for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) { 054 rc.add(iter.next()); 055 } 056 for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) { 057 rc.add(iter.next()); 058 } 059 return rc; 060 } 061 062 public static MemoryPersistenceAdapter newInstance(File file) { 063 return new MemoryPersistenceAdapter(); 064 } 065 066 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 067 MessageStore rc = queues.get(destination); 068 if (rc == null) { 069 rc = new MemoryMessageStore(destination); 070 if (transactionStore != null) { 071 rc = transactionStore.proxy(rc); 072 } 073 queues.put(destination, rc); 074 } 075 return rc; 076 } 077 078 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 079 TopicMessageStore rc = topics.get(destination); 080 if (rc == null) { 081 rc = new MemoryTopicMessageStore(destination); 082 if (transactionStore != null) { 083 rc = transactionStore.proxy(rc); 084 } 085 topics.put(destination, rc); 086 } 087 return rc; 088 } 089 090 /** 091 * Cleanup method to remove any state associated with the given destination 092 * 093 * @param destination Destination to forget 094 */ 095 public void removeQueueMessageStore(ActiveMQQueue destination) { 096 queues.remove(destination); 097 } 098 099 /** 100 * Cleanup method to remove any state associated with the given destination 101 * 102 * @param destination Destination to forget 103 */ 104 public void removeTopicMessageStore(ActiveMQTopic destination) { 105 topics.remove(destination); 106 } 107 108 public TransactionStore createTransactionStore() throws IOException { 109 if (transactionStore == null) { 110 transactionStore = new MemoryTransactionStore(this); 111 } 112 return transactionStore; 113 } 114 115 public void beginTransaction(ConnectionContext context) { 116 } 117 118 public void commitTransaction(ConnectionContext context) { 119 } 120 121 public void rollbackTransaction(ConnectionContext context) { 122 } 123 124 public void start() throws Exception { 125 } 126 127 public void stop() throws Exception { 128 } 129 130 public long getLastMessageBrokerSequenceId() throws IOException { 131 return 0; 132 } 133 134 public void deleteAllMessages() throws IOException { 135 for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) { 136 MemoryMessageStore store = asMemoryMessageStore(iter.next()); 137 if (store != null) { 138 store.delete(); 139 } 140 } 141 for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) { 142 MemoryMessageStore store = asMemoryMessageStore(iter.next()); 143 if (store != null) { 144 store.delete(); 145 } 146 } 147 148 if (transactionStore != null) { 149 transactionStore.delete(); 150 } 151 } 152 153 public boolean isUseExternalMessageReferences() { 154 return useExternalMessageReferences; 155 } 156 157 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 158 this.useExternalMessageReferences = useExternalMessageReferences; 159 } 160 161 protected MemoryMessageStore asMemoryMessageStore(Object value) { 162 if (value instanceof MemoryMessageStore) { 163 return (MemoryMessageStore)value; 164 } 165 if (value instanceof ProxyMessageStore) { 166 MessageStore delegate = ((ProxyMessageStore)value).getDelegate(); 167 if (delegate instanceof MemoryMessageStore) { 168 return (MemoryMessageStore) delegate; 169 } 170 } 171 LOG.warn("Expected an instance of MemoryMessageStore but was: " + value); 172 return null; 173 } 174 175 /** 176 * @param usageManager The UsageManager that is controlling the broker's 177 * memory usage. 178 */ 179 public void setUsageManager(SystemUsage usageManager) { 180 } 181 182 public String toString() { 183 return "MemoryPersistenceAdapter"; 184 } 185 186 public void setBrokerName(String brokerName) { 187 } 188 189 public void setDirectory(File dir) { 190 } 191 192 public void checkpoint(boolean sync) throws IOException { 193 } 194 195 public long size(){ 196 return 0; 197 } 198 199 public void setCreateTransactionStore(boolean create) throws IOException { 200 if (create) { 201 createTransactionStore(); 202 } 203 } 204 }