001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.atomic.AtomicLong;
029    
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQQueue;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.SubscriptionInfo;
036    import org.apache.activemq.command.TransactionId;
037    import org.apache.activemq.kaha.CommandMarshaller;
038    import org.apache.activemq.kaha.ListContainer;
039    import org.apache.activemq.kaha.MapContainer;
040    import org.apache.activemq.kaha.MessageIdMarshaller;
041    import org.apache.activemq.kaha.Store;
042    import org.apache.activemq.kaha.StoreFactory;
043    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
044    import org.apache.activemq.store.MessageStore;
045    import org.apache.activemq.store.ReferenceStore;
046    import org.apache.activemq.store.ReferenceStoreAdapter;
047    import org.apache.activemq.store.TopicMessageStore;
048    import org.apache.activemq.store.TopicReferenceStore;
049    import org.apache.activemq.store.amq.AMQTx;
050    import org.apache.activemq.util.IOHelper;
051    import org.apache.commons.logging.Log;
052    import org.apache.commons.logging.LogFactory;
053    
054    public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
055    
056        
057    
058        private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
059        private static final String STORE_STATE = "store-state";
060        private static final String QUEUE_DATA = "queue-data";
061        private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
062        private static final Integer INDEX_VERSION = new Integer(7);
063        private static final String RECORD_REFERENCES = "record-references";
064        private static final String TRANSACTIONS = "transactions-state";
065        private MapContainer stateMap;
066        private MapContainer<TransactionId, AMQTx> preparedTransactions;
067        private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
068        private ListContainer<SubscriptionInfo> durableSubscribers;
069        private boolean storeValid;
070        private Store stateStore;
071        private boolean persistentIndex = true;
072        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
073        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
074        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
075        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
076        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
077       
078    
079        public KahaReferenceStoreAdapter(AtomicLong size){
080            super(size);
081        }
082        
083        public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
084            throw new RuntimeException("Use createQueueReferenceStore instead");
085        }
086    
087        public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
088            throws IOException {
089            throw new RuntimeException("Use createTopicReferenceStore instead");
090        }
091    
092        @Override
093        public synchronized void start() throws Exception {
094            super.start();
095            Store store = getStateStore();        
096            boolean empty = store.getMapContainerIds().isEmpty();
097            stateMap = store.getMapContainer("state", STORE_STATE);
098            stateMap.load();
099            storeValid=true;
100            if (!empty) {
101                AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
102                if (status != null) {
103                    storeValid = status.get();
104                }
105               
106                if (storeValid) {
107                    //check what version the indexes are at
108                    Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
109                    if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
110                        storeValid = false;
111                        LOG.warn("Indexes at an older version - need to regenerate");
112                    }
113                }
114                if (storeValid) {
115                    if (stateMap.containsKey(RECORD_REFERENCES)) {
116                        recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
117                    }
118                }
119            }
120            stateMap.put(STORE_STATE, new AtomicBoolean());
121            stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
122            durableSubscribers = store.getListContainer("durableSubscribers");
123            durableSubscribers.setMarshaller(new CommandMarshaller());
124            preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
125            // need to set the Marshallers here
126            preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER);
127            preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
128        }
129    
130        @Override
131        public synchronized void stop() throws Exception {
132            stateMap.put(RECORD_REFERENCES, recordReferences);
133            stateMap.put(STORE_STATE, new AtomicBoolean(true));
134            stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
135            if (this.stateStore != null) {
136                this.stateStore.close();
137                this.stateStore = null;
138                this.stateMap = null;
139            }
140            super.stop();
141        }
142        
143        public void commitTransaction(ConnectionContext context) throws IOException {
144            //we don;t need to force on a commit - as the reference store
145            //is rebuilt on a non clean shutdown
146        }
147    
148        public boolean isStoreValid() {
149            return storeValid;
150        }
151    
152        public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
153            ReferenceStore rc = (ReferenceStore)queues.get(destination);
154            if (rc == null) {
155                rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA),
156                                            destination);
157                messageStores.put(destination, rc);
158                // if(transactionStore!=null){
159                // rc=transactionStore.proxy(rc);
160                // }
161                queues.put(destination, rc);
162            }
163            return rc;
164        }
165    
166        public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
167            TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination);
168            if (rc == null) {
169                Store store = getStore();
170                MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data");
171                MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob");
172                ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks");
173                ackContainer.setMarshaller(new TopicSubAckMarshaller());
174                rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
175                                                 destination);
176                messageStores.put(destination, rc);
177                // if(transactionStore!=null){
178                // rc=transactionStore.proxy(rc);
179                // }
180                topics.put(destination, rc);
181            }
182            return rc;
183        }
184    
185        public void removeReferenceStore(KahaReferenceStore referenceStore) {
186            ActiveMQDestination destination = referenceStore.getDestination();
187            if (destination.isQueue()) {
188                queues.remove(destination);
189                try {
190                    getStore().deleteMapContainer(destination, QUEUE_DATA);
191                } catch (IOException e) {
192                    LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e);
193                }
194            } else {
195                topics.remove(destination);
196            }
197            messageStores.remove(destination);
198        }
199    /*
200        public void buildReferenceFileIdsInUse() throws IOException {
201            recordReferences = new HashMap<Integer, AtomicInteger>();
202            Set<ActiveMQDestination> destinations = getDestinations();
203            for (ActiveMQDestination destination : destinations) {
204                if (destination.isQueue()) {
205                    KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
206                    store.addReferenceFileIdsInUse();
207                } else {
208                    KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
209                    store.addReferenceFileIdsInUse();
210                }
211            }
212        }
213        */
214    
215        protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
216                                                                                    String containerName)
217            throws IOException {
218            Store store = getStore();
219            MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
220            container.setIndexBinSize(getIndexBinSize());
221            container.setIndexKeySize(getIndexKeySize());
222            container.setIndexPageSize(getIndexPageSize());
223            container.setIndexMaxBinSize(getIndexMaxBinSize());
224            container.setIndexLoadFactor(getIndexLoadFactor());
225            container.setKeyMarshaller(new MessageIdMarshaller());
226            container.setValueMarshaller(new ReferenceRecordMarshaller());
227            container.load();
228            return container;
229        }
230    
231        synchronized void addInterestInRecordFile(int recordNumber) {
232            Integer key = Integer.valueOf(recordNumber);
233            AtomicInteger rr = recordReferences.get(key);
234            if (rr == null) {
235                rr = new AtomicInteger();
236                recordReferences.put(key, rr);
237            }
238            rr.incrementAndGet();
239        }
240    
241        synchronized void removeInterestInRecordFile(int recordNumber) {
242            Integer key = Integer.valueOf(recordNumber);
243            AtomicInteger rr = recordReferences.get(key);
244            if (rr != null && rr.decrementAndGet() <= 0) {
245                recordReferences.remove(key);
246            }
247        }
248    
249        /**
250         * @return
251         * @throws IOException
252         * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
253         */
254        public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
255            return new HashSet<Integer>(recordReferences.keySet());
256        }
257    
258        /**
259         *
260         * @throws IOException
261         * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
262         */
263        public void clearMessages() throws IOException {
264            //don't delete messages as it will clear state - call base
265            //class method to clear out the data instead
266            super.deleteAllMessages();
267        }
268    
269        /**
270         *
271         * @throws IOException
272         * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
273         */
274    
275        public void recoverState() throws IOException {
276            Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
277            for (SubscriptionInfo info:set) {
278                LOG.info("Recovering subscriber state for durable subscriber: " + info);
279                TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
280                ts.addSubsciption(info, false);
281            }
282        }
283        
284        public void recoverSubscription(SubscriptionInfo info) throws IOException {
285            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
286            LOG.info("Recovering subscriber state for durable subscriber: " + info);
287            ts.addSubsciption(info, false);
288        }
289        
290    
291        public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
292            Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
293            preparedTransactions.load();
294            for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) {
295                TransactionId key = i.next();
296                AMQTx value = preparedTransactions.get(key);
297                result.put(key, value);
298            }
299            return result;
300        }
301    
302        public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException {
303            preparedTransactions.clear();
304            for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) {
305                Map.Entry<TransactionId, AMQTx> entry = iter.next();
306                preparedTransactions.put(entry.getKey(), entry.getValue());
307            }
308        }
309    
310        @Override
311        public synchronized void setDirectory(File directory) {
312            File file = new File(directory, "data");
313            super.setDirectory(file);
314            this.stateStore = createStateStore(directory);
315        }
316    
317        protected synchronized Store getStateStore() throws IOException {
318            if (this.stateStore == null) {
319                File stateDirectory = new File(getDirectory(), "kr-state");
320                IOHelper.mkdirs(stateDirectory);
321                this.stateStore = createStateStore(getDirectory());
322            }
323            return this.stateStore;
324        }
325    
326        public void deleteAllMessages() throws IOException {
327            super.deleteAllMessages();
328            if (stateStore != null) {
329                if (stateStore.isInitialized()) {
330                    stateStore.clear();
331                } else {
332                    stateStore.delete();
333                }
334            } else {
335                File stateDirectory = new File(getDirectory(), "kr-state");
336                StoreFactory.delete(stateDirectory);
337            }
338        }
339    
340        public boolean isPersistentIndex() {
341                    return persistentIndex;
342            }
343    
344            public void setPersistentIndex(boolean persistentIndex) {
345                    this.persistentIndex = persistentIndex;
346            }
347    
348        private Store createStateStore(File directory) {
349            File stateDirectory = new File(directory, "state");
350            try {
351                IOHelper.mkdirs(stateDirectory);
352                return StoreFactory.open(stateDirectory, "rw");
353            } catch (IOException e) {
354                LOG.error("Failed to create the state store", e);
355            }
356            return null;
357        }
358    
359        protected void addSubscriberState(SubscriptionInfo info) throws IOException {
360            durableSubscribers.add(info);
361        }
362    
363        protected void removeSubscriberState(SubscriptionInfo info) {
364            durableSubscribers.remove(info);
365        }
366    
367        public int getIndexBinSize() {
368            return indexBinSize;
369        }
370    
371        public void setIndexBinSize(int indexBinSize) {
372            this.indexBinSize = indexBinSize;
373        }
374    
375        public int getIndexKeySize() {
376            return indexKeySize;
377        }
378    
379        public void setIndexKeySize(int indexKeySize) {
380            this.indexKeySize = indexKeySize;
381        }
382    
383        public int getIndexPageSize() {
384            return indexPageSize;
385        }
386    
387        public void setIndexPageSize(int indexPageSize) {
388            this.indexPageSize = indexPageSize;
389        }
390    
391        public int getIndexMaxBinSize() {
392            return indexMaxBinSize;
393        }
394    
395        public void setIndexMaxBinSize(int maxBinSize) {
396            this.indexMaxBinSize = maxBinSize;
397        }
398    
399        /**
400         * @return the loadFactor
401         */
402        public int getIndexLoadFactor() {
403            return indexLoadFactor;
404        }
405    
406        /**
407         * @param loadFactor the loadFactor to set
408         */
409        public void setIndexLoadFactor(int loadFactor) {
410            this.indexLoadFactor = loadFactor;
411        }
412    }