001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.ActiveMQQueue;
029    import org.apache.activemq.command.ActiveMQTopic;
030    import org.apache.activemq.store.MessageStore;
031    import org.apache.activemq.store.PersistenceAdapter;
032    import org.apache.activemq.store.ProxyMessageStore;
033    import org.apache.activemq.store.TopicMessageStore;
034    import org.apache.activemq.store.TransactionStore;
035    import org.apache.activemq.usage.SystemUsage;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * @org.apache.xbean.XBean
041     * @version $Revision: 1.4 $
042     */
043    public class MemoryPersistenceAdapter implements PersistenceAdapter {
044        private static final Log LOG = LogFactory.getLog(MemoryPersistenceAdapter.class);
045    
046        MemoryTransactionStore transactionStore;
047        ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
048        ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
049        private boolean useExternalMessageReferences;
050    
051        public Set<ActiveMQDestination> getDestinations() {
052            Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
053            for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
054                rc.add(iter.next());
055            }
056            for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
057                rc.add(iter.next());
058            }
059            return rc;
060        }
061    
062        public static MemoryPersistenceAdapter newInstance(File file) {
063            return new MemoryPersistenceAdapter();
064        }
065    
066        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
067            MessageStore rc = queues.get(destination);
068            if (rc == null) {
069                rc = new MemoryMessageStore(destination);
070                if (transactionStore != null) {
071                    rc = transactionStore.proxy(rc);
072                }
073                queues.put(destination, rc);
074            }
075            return rc;
076        }
077    
078        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
079            TopicMessageStore rc = topics.get(destination);
080            if (rc == null) {
081                rc = new MemoryTopicMessageStore(destination);
082                if (transactionStore != null) {
083                    rc = transactionStore.proxy(rc);
084                }
085                topics.put(destination, rc);
086            }
087            return rc;
088        }
089    
090        /**
091         * Cleanup method to remove any state associated with the given destination
092         *
093         * @param destination Destination to forget
094         */
095        public void removeQueueMessageStore(ActiveMQQueue destination) {
096            queues.remove(destination);
097        }
098    
099        /**
100         * Cleanup method to remove any state associated with the given destination
101         *
102         * @param destination Destination to forget
103         */
104        public void removeTopicMessageStore(ActiveMQTopic destination) {
105            topics.remove(destination);
106        }
107    
108        public TransactionStore createTransactionStore() throws IOException {
109            if (transactionStore == null) {
110                transactionStore = new MemoryTransactionStore(this);
111            }
112            return transactionStore;
113        }
114    
115        public void beginTransaction(ConnectionContext context) {
116        }
117    
118        public void commitTransaction(ConnectionContext context) {
119        }
120    
121        public void rollbackTransaction(ConnectionContext context) {
122        }
123    
124        public void start() throws Exception {
125        }
126    
127        public void stop() throws Exception {
128        }
129    
130        public long getLastMessageBrokerSequenceId() throws IOException {
131            return 0;
132        }
133    
134        public void deleteAllMessages() throws IOException {
135            for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
136                MemoryMessageStore store = asMemoryMessageStore(iter.next());
137                if (store != null) {
138                    store.delete();
139                }
140            }
141            for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
142                MemoryMessageStore store = asMemoryMessageStore(iter.next());
143                if (store != null) {
144                    store.delete();
145                }
146            }
147    
148            if (transactionStore != null) {
149                transactionStore.delete();
150            }
151        }
152    
153        public boolean isUseExternalMessageReferences() {
154            return useExternalMessageReferences;
155        }
156    
157        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
158            this.useExternalMessageReferences = useExternalMessageReferences;
159        }
160    
161        protected MemoryMessageStore asMemoryMessageStore(Object value) {
162            if (value instanceof MemoryMessageStore) {
163                return (MemoryMessageStore)value;
164            }
165            if (value instanceof ProxyMessageStore) {
166                MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
167                if (delegate instanceof MemoryMessageStore) {
168                    return (MemoryMessageStore) delegate;
169                }
170            }
171            LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
172            return null;
173        }
174    
175        /**
176         * @param usageManager The UsageManager that is controlling the broker's
177         *                memory usage.
178         */
179        public void setUsageManager(SystemUsage usageManager) {
180        }
181    
182        public String toString() {
183            return "MemoryPersistenceAdapter";
184        }
185    
186        public void setBrokerName(String brokerName) {
187        }
188    
189        public void setDirectory(File dir) {
190        }
191    
192        public void checkpoint(boolean sync) throws IOException {
193        }
194        
195        public long size(){
196            return 0;
197        }
198        
199        public void setCreateTransactionStore(boolean create) throws IOException {
200            if (create) {
201                createTransactionStore();
202            }
203        }
204    }