001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.activemq.broker.BrokerService;
028    import org.apache.activemq.broker.BrokerServiceAware;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQQueue;
032    import org.apache.activemq.command.ActiveMQTopic;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.kaha.CommandMarshaller;
036    import org.apache.activemq.kaha.ContainerId;
037    import org.apache.activemq.kaha.ListContainer;
038    import org.apache.activemq.kaha.MapContainer;
039    import org.apache.activemq.kaha.Marshaller;
040    import org.apache.activemq.kaha.MessageIdMarshaller;
041    import org.apache.activemq.kaha.MessageMarshaller;
042    import org.apache.activemq.kaha.Store;
043    import org.apache.activemq.kaha.StoreFactory;
044    import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
045    import org.apache.activemq.openwire.OpenWireFormat;
046    import org.apache.activemq.store.MessageStore;
047    import org.apache.activemq.store.PersistenceAdapter;
048    import org.apache.activemq.store.TopicMessageStore;
049    import org.apache.activemq.store.TransactionStore;
050    import org.apache.activemq.usage.SystemUsage;
051    import org.apache.activemq.util.IOHelper;
052    import org.apache.commons.logging.Log;
053    import org.apache.commons.logging.LogFactory;
054    
055    /**
056     * @org.apache.xbean.XBean
057     * @version $Revision: 1.4 $
058     */
059    public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
060    
061        private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
062        private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
063        private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
064    
065        protected OpenWireFormat wireFormat = new OpenWireFormat();
066        protected KahaTransactionStore transactionStore;
067        protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
068        protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
069        protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
070    
071        private long maxDataFileLength = 32 * 1024 * 1024;
072        private File directory;
073        private String brokerName;
074        private Store theStore;
075        private boolean initialized;
076        private final AtomicLong storeSize;
077        private boolean persistentIndex = true;
078        private BrokerService brokerService;
079    
080        
081        public KahaPersistenceAdapter(AtomicLong size) {
082            this.storeSize=size;
083        }
084        
085        public KahaPersistenceAdapter() {
086            this(new AtomicLong());
087        }
088        
089        public Set<ActiveMQDestination> getDestinations() {
090            Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
091            try {
092                Store store = getStore();
093                for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
094                    ContainerId id = (ContainerId)i.next();
095                    Object obj = id.getKey();
096                    if (obj instanceof ActiveMQDestination) {
097                        rc.add((ActiveMQDestination)obj);
098                    }
099                }
100            } catch (IOException e) {
101                LOG.error("Failed to get destinations ", e);
102            }
103            return rc;
104        }
105    
106        public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
107            MessageStore rc = queues.get(destination);
108            if (rc == null) {
109                rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
110                messageStores.put(destination, rc);
111                if (transactionStore != null) {
112                    rc = transactionStore.proxy(rc);
113                }
114                queues.put(destination, rc);
115            }
116            return rc;
117        }
118    
119        public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
120            throws IOException {
121            TopicMessageStore rc = topics.get(destination);
122            if (rc == null) {
123                Store store = getStore();
124                MapContainer messageContainer = getMapContainer(destination, "topic-data");
125                MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
126                                                                 "topic-subs");
127                ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
128                                                                                 "topic-acks");
129                ackContainer.setMarshaller(new TopicSubAckMarshaller());
130                rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
131                messageStores.put(destination, rc);
132                if (transactionStore != null) {
133                    rc = transactionStore.proxy(rc);
134                }
135                topics.put(destination, rc);
136            }
137            return rc;
138        }
139    
140        /**
141         * Cleanup method to remove any state associated with the given destination
142         *
143         * @param destination Destination to forget
144         */
145        public void removeQueueMessageStore(ActiveMQQueue destination) {
146            queues.remove(destination);
147            try{
148                    if(theStore!=null){
149                            theStore.deleteMapContainer(destination,"queue-data");
150                    }
151            }catch(IOException e ){
152                    LOG.error("Failed to remove store map container for queue:"+destination, e);
153            }
154        }
155    
156        /**
157         * Cleanup method to remove any state associated with the given destination
158         *
159         * @param destination Destination to forget
160         */
161        public void removeTopicMessageStore(ActiveMQTopic destination) {
162            topics.remove(destination);
163        }
164    
165        protected MessageStore retrieveMessageStore(Object id) {
166            MessageStore result = messageStores.get(id);
167            return result;
168        }
169    
170        public TransactionStore createTransactionStore() throws IOException {
171            if (transactionStore == null) {
172                while (true) {
173                    try {
174                        Store store = getStore();
175                        MapContainer container = store
176                            .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
177                        container.setKeyMarshaller(new CommandMarshaller(wireFormat));
178                        container.setValueMarshaller(new TransactionMarshaller(wireFormat));
179                        container.load();
180                        transactionStore = new KahaTransactionStore(this, container);
181                        transactionStore.setBrokerService(brokerService);
182                        break;
183                    } catch (StoreLockedExcpetion e) {
184                        LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
185                                 + " seconds for the Store to be unlocked.");
186                        try {
187                            Thread.sleep(STORE_LOCKED_WAIT_DELAY);
188                        } catch (InterruptedException e1) {
189                        }
190                    }
191                }
192            }
193            return transactionStore;
194        }
195    
196        public void beginTransaction(ConnectionContext context) {
197        }
198    
199        public void commitTransaction(ConnectionContext context) throws IOException {
200            if (theStore != null) {
201                theStore.force();
202            }
203        }
204    
205        public void rollbackTransaction(ConnectionContext context) {
206        }
207    
208        public void start() throws Exception {
209            initialize();
210        }
211    
212        public void stop() throws Exception {
213            if (theStore != null) {
214                theStore.close();
215            }
216        }
217    
218        public long getLastMessageBrokerSequenceId() throws IOException {
219            return 0;
220        }
221    
222        public void deleteAllMessages() throws IOException {
223            if (theStore != null) {
224                if (theStore.isInitialized()) {
225                    theStore.clear();
226                } else {
227                    theStore.delete();
228                }
229            } else {
230                StoreFactory.delete(getStoreDirectory());
231            }
232        }
233    
234        protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
235            throws IOException {
236            Store store = getStore();
237            MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
238            container.setKeyMarshaller(new MessageIdMarshaller());
239            container.setValueMarshaller(new MessageMarshaller(wireFormat));
240            container.load();
241            return container;
242        }
243    
244        protected MapContainer getSubsMapContainer(Object id, String containerName)
245            throws IOException {
246            Store store = getStore();
247            MapContainer container = store.getMapContainer(id, containerName);
248            container.setKeyMarshaller(Store.STRING_MARSHALLER);
249            container.setValueMarshaller(createMessageMarshaller());
250            container.load();
251            return container;
252        }
253    
254        protected Marshaller<Object> createMessageMarshaller() {
255            return new CommandMarshaller(wireFormat);
256        }
257    
258        protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
259            Store store = getStore();
260            ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
261            container.setMarshaller(createMessageMarshaller());
262            container.load();
263            return container;
264        }
265    
266        /**
267         * @param usageManager The UsageManager that is controlling the broker's
268         *                memory usage.
269         */
270        public void setUsageManager(SystemUsage usageManager) {
271        }
272    
273        /**
274         * @return the maxDataFileLength
275         */
276        public long getMaxDataFileLength() {
277            return maxDataFileLength;
278        }
279        
280        public boolean isPersistentIndex() {
281                    return persistentIndex;
282            }
283    
284            public void setPersistentIndex(boolean persistentIndex) {
285                    this.persistentIndex = persistentIndex;
286            }
287    
288        /**
289         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
290         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
291         */
292        public void setMaxDataFileLength(long maxDataFileLength) {
293            this.maxDataFileLength = maxDataFileLength;
294        }
295    
296        protected final synchronized Store getStore() throws IOException {
297            if (theStore == null) {
298                theStore = createStore();
299            }
300            return theStore;
301        }
302        
303        protected final Store createStore() throws IOException {
304            Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
305            result.setMaxDataFileLength(maxDataFileLength);
306            result.setPersistentIndex(isPersistentIndex());
307            result.setDefaultContainerName("container-roots");
308            return result;
309        }
310    
311        private String getStoreName() {
312            initialize();
313            return directory.getAbsolutePath();
314        }
315    
316        private File getStoreDirectory() {
317            initialize();
318            return directory;
319        }
320    
321        public String toString() {
322            return "KahaPersistenceAdapter(" + getStoreName() + ")";
323        }
324    
325        public void setBrokerName(String brokerName) {
326            this.brokerName = brokerName;
327        }
328    
329        public String getBrokerName() {
330            return brokerName;
331        }
332    
333        public File getDirectory() {
334            return this.directory;
335        }
336    
337        public void setDirectory(File directory) {
338            this.directory = directory;
339        }
340    
341        public void checkpoint(boolean sync) throws IOException {
342            if (sync) {
343                getStore().force();
344            }
345        }
346       
347        public long size(){
348           return storeSize.get();
349        }
350    
351        private void initialize() {
352            if (!initialized) {
353                initialized = true;
354                if (this.directory == null) {
355                    File file = new File(IOHelper.getDefaultDataDirectory());
356                    file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore");
357                    setDirectory(file);
358                }
359                try {
360                    IOHelper.mkdirs(this.directory);
361                } catch (IOException e) {
362                    throw new RuntimeException(e);
363                }
364                wireFormat.setCacheEnabled(false);
365                wireFormat.setTightEncodingEnabled(true);
366            }
367        }
368    
369            public void setBrokerService(BrokerService brokerService) {
370                    this.brokerService = brokerService;
371            }
372      
373    
374    }