001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.HashSet;
021    import java.util.Set;
022    import java.util.concurrent.locks.Lock;
023    import java.util.concurrent.locks.ReentrantLock;
024    
025    import org.apache.activemq.ActiveMQMessageAudit;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.kaha.MapContainer;
032    import org.apache.activemq.kaha.StoreEntry;
033    import org.apache.activemq.store.AbstractMessageStore;
034    import org.apache.activemq.store.MessageRecoveryListener;
035    import org.apache.activemq.store.ReferenceStore;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * @author rajdavies
041     *
042     */
043    public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
044    
045        private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
046        protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
047        protected KahaReferenceStoreAdapter adapter;
048        // keep track of dispatched messages so that duplicate sends that follow a successful
049        // dispatch can be suppressed.
050        protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
051        private StoreEntry batchEntry;
052        private String lastBatchId;
053        protected final Lock lock = new ReentrantLock();
054    
055        public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
056                                  ActiveMQDestination destination) throws IOException {
057            super(destination);
058            this.adapter = adapter;
059            this.messageContainer = container;
060        }
061        
062        public Lock getStoreLock() {
063            return lock;
064        }
065    
066        public void dispose(ConnectionContext context) {
067            super.dispose(context);
068            this.messageContainer.delete();
069            this.adapter.removeReferenceStore(this);
070        }
071    
072        protected MessageId getMessageId(Object object) {
073            return new MessageId(((ReferenceRecord)object).getMessageId());
074        }
075    
076        public void addMessage(ConnectionContext context, Message message) throws IOException {
077            throw new RuntimeException("Use addMessageReference instead");
078        }
079    
080        public Message getMessage(MessageId identity) throws IOException {
081            throw new RuntimeException("Use addMessageReference instead");
082        }
083    
084        protected final boolean recoverReference(MessageRecoveryListener listener,
085                ReferenceRecord record) throws Exception {
086            MessageId id = new MessageId(record.getMessageId());
087            if (listener.hasSpace()) {
088                return listener.recoverMessageReference(id);
089            }
090            return false;
091        }
092    
093        public void recover(MessageRecoveryListener listener) throws Exception {
094            lock.lock();
095            try {
096                for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
097                    .getNext(entry)) {
098                    ReferenceRecord record = messageContainer.getValue(entry);
099                    if (!recoverReference(listener, record)) {
100                        break;
101                    }
102                }
103            }finally {
104                lock.unlock();
105            }
106        }
107    
108        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
109            throws Exception {
110            lock.lock();
111            try {
112                StoreEntry entry = batchEntry;
113                if (entry == null) {
114                    entry = messageContainer.getFirst();
115                } else {
116                    entry = messageContainer.refresh(entry);
117                    if (entry != null) {
118                        entry = messageContainer.getNext(entry);
119                    }
120                }
121                if (entry != null) {      
122                    int count = 0;
123                    do {
124                        ReferenceRecord msg = messageContainer.getValue(entry);
125                        if (msg != null ) {
126                            if (recoverReference(listener, msg)) {
127                                count++;
128                                lastBatchId = msg.getMessageId();
129                            } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
130                                if (LOG.isDebugEnabled()) {
131                                    LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
132                                }
133                                // give usage limits a chance to reclaim
134                                break;
135                            } else {
136                                // skip duplicate and continue
137                                if (LOG.isDebugEnabled()) {
138                                    LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
139                                }
140                            }                        
141                        } else {
142                            lastBatchId = null;
143                        }
144                        batchEntry = entry;
145                        entry = messageContainer.getNext(entry);
146                    } while (entry != null && count < maxReturned && listener.hasSpace());
147                }
148            }finally {
149                lock.unlock();
150            }
151        }
152    
153        public boolean addMessageReference(ConnectionContext context, MessageId messageId,
154                                                     ReferenceData data) throws IOException {
155            
156            boolean uniqueueReferenceAdded = false;
157            lock.lock();
158            try {
159                if (!isDuplicate(messageId)) {
160                    ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
161                    messageContainer.put(messageId, record);
162                    uniqueueReferenceAdded = true;
163                    addInterest(record);
164                    if (LOG.isDebugEnabled()) {
165                        LOG.debug(destination.getPhysicalName() + " add: " + messageId);
166                    }
167                }
168            } finally {
169                lock.unlock();
170            }
171            return uniqueueReferenceAdded;
172        }
173    
174        protected boolean isDuplicate(final MessageId messageId) {
175            boolean duplicate = messageContainer.containsKey(messageId);
176            if (!duplicate) {
177                duplicate = dispatchAudit.isDuplicate(messageId);
178                if (duplicate) {
179                    if (LOG.isDebugEnabled()) {
180                        LOG.debug(destination.getPhysicalName()
181                            + " ignoring duplicated (add) message reference, already dispatched: "
182                            + messageId);
183                    }
184                }
185            } else if (LOG.isDebugEnabled()) {
186                LOG.debug(destination.getPhysicalName()
187                        + " ignoring duplicated (add) message reference, already in store: " + messageId);
188            }
189            return duplicate;
190        }
191        
192        public ReferenceData getMessageReference(MessageId identity) throws IOException {
193            lock.lock();
194            try {
195                ReferenceRecord result = messageContainer.get(identity);
196                if (result == null) {
197                    return null;
198                }
199                return result.getData();
200            }finally {
201                lock.unlock();
202            }
203        }
204    
205        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
206            removeMessage(ack.getLastMessageId());
207        }
208    
209        public void removeMessage(MessageId msgId) throws IOException {  
210            lock.lock();
211            try {
212                StoreEntry entry = messageContainer.getEntry(msgId);
213                if (entry != null) {
214                    ReferenceRecord rr = messageContainer.remove(msgId);
215                    if (rr != null) {
216                        removeInterest(rr);
217                        dispatchAudit.isDuplicate(msgId);
218                        if (LOG.isDebugEnabled()) {
219                            LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
220                        }
221                        if (messageContainer.isEmpty()
222                            || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
223                            || (batchEntry != null && batchEntry.equals(entry))) {
224                            resetBatching();
225                        }
226                    }
227                }
228            }finally {
229                lock.unlock();
230            }
231        }
232    
233        public void removeAllMessages(ConnectionContext context) throws IOException {
234            lock.lock();
235            try {
236                Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
237                for (MessageId id:tmpSet) {
238                    removeMessage(id);
239                }
240                resetBatching();
241                messageContainer.clear();
242            }finally {
243                lock.unlock();
244            }
245        }
246    
247        public void delete() {
248            lock.lock();
249            try {
250                messageContainer.clear();
251            }finally {
252                lock.unlock();
253            }
254        }
255    
256        public void resetBatching() {
257            lock.lock();
258            try {
259                batchEntry = null;
260                lastBatchId = null;
261            }finally {
262                lock.unlock();
263            }
264        }
265    
266        public int getMessageCount() {
267            return messageContainer.size();
268        }
269    
270        public boolean isSupportForCursors() {
271            return true;
272        }
273    
274        public boolean supportsExternalBatchControl() {
275            return true;
276        }
277    
278        void removeInterest(ReferenceRecord rr) {
279            adapter.removeInterestInRecordFile(rr.getData().getFileId());
280        }
281    
282        void addInterest(ReferenceRecord rr) {
283            adapter.addInterestInRecordFile(rr.getData().getFileId());
284        }
285    
286        /**
287         * @param startAfter
288         * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
289         */
290        public void setBatch(MessageId startAfter) {
291            lock.lock();
292            try {
293                batchEntry = messageContainer.getEntry(startAfter);
294                if (LOG.isDebugEnabled()) {
295                    LOG.debug("setBatch: " + startAfter);
296                }
297            } finally {
298                lock.unlock();
299            }
300        }
301    }