001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.HashSet; 021 import java.util.Set; 022 import java.util.concurrent.locks.Lock; 023 import java.util.concurrent.locks.ReentrantLock; 024 025 import org.apache.activemq.ActiveMQMessageAudit; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.command.MessageAck; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.kaha.MapContainer; 032 import org.apache.activemq.kaha.StoreEntry; 033 import org.apache.activemq.store.AbstractMessageStore; 034 import org.apache.activemq.store.MessageRecoveryListener; 035 import org.apache.activemq.store.ReferenceStore; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * @author rajdavies 041 * 042 */ 043 public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore { 044 045 private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class); 046 protected final MapContainer<MessageId, ReferenceRecord> messageContainer; 047 protected KahaReferenceStoreAdapter adapter; 048 // keep track of dispatched messages so that duplicate sends that follow a successful 049 // dispatch can be suppressed. 050 protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit(); 051 private StoreEntry batchEntry; 052 private String lastBatchId; 053 protected final Lock lock = new ReentrantLock(); 054 055 public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container, 056 ActiveMQDestination destination) throws IOException { 057 super(destination); 058 this.adapter = adapter; 059 this.messageContainer = container; 060 } 061 062 public Lock getStoreLock() { 063 return lock; 064 } 065 066 public void dispose(ConnectionContext context) { 067 super.dispose(context); 068 this.messageContainer.delete(); 069 this.adapter.removeReferenceStore(this); 070 } 071 072 protected MessageId getMessageId(Object object) { 073 return new MessageId(((ReferenceRecord)object).getMessageId()); 074 } 075 076 public void addMessage(ConnectionContext context, Message message) throws IOException { 077 throw new RuntimeException("Use addMessageReference instead"); 078 } 079 080 public Message getMessage(MessageId identity) throws IOException { 081 throw new RuntimeException("Use addMessageReference instead"); 082 } 083 084 protected final boolean recoverReference(MessageRecoveryListener listener, 085 ReferenceRecord record) throws Exception { 086 MessageId id = new MessageId(record.getMessageId()); 087 if (listener.hasSpace()) { 088 return listener.recoverMessageReference(id); 089 } 090 return false; 091 } 092 093 public void recover(MessageRecoveryListener listener) throws Exception { 094 lock.lock(); 095 try { 096 for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer 097 .getNext(entry)) { 098 ReferenceRecord record = messageContainer.getValue(entry); 099 if (!recoverReference(listener, record)) { 100 break; 101 } 102 } 103 }finally { 104 lock.unlock(); 105 } 106 } 107 108 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) 109 throws Exception { 110 lock.lock(); 111 try { 112 StoreEntry entry = batchEntry; 113 if (entry == null) { 114 entry = messageContainer.getFirst(); 115 } else { 116 entry = messageContainer.refresh(entry); 117 if (entry != null) { 118 entry = messageContainer.getNext(entry); 119 } 120 } 121 if (entry != null) { 122 int count = 0; 123 do { 124 ReferenceRecord msg = messageContainer.getValue(entry); 125 if (msg != null ) { 126 if (recoverReference(listener, msg)) { 127 count++; 128 lastBatchId = msg.getMessageId(); 129 } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) { 130 if (LOG.isDebugEnabled()) { 131 LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId()); 132 } 133 // give usage limits a chance to reclaim 134 break; 135 } else { 136 // skip duplicate and continue 137 if (LOG.isDebugEnabled()) { 138 LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId()); 139 } 140 } 141 } else { 142 lastBatchId = null; 143 } 144 batchEntry = entry; 145 entry = messageContainer.getNext(entry); 146 } while (entry != null && count < maxReturned && listener.hasSpace()); 147 } 148 }finally { 149 lock.unlock(); 150 } 151 } 152 153 public boolean addMessageReference(ConnectionContext context, MessageId messageId, 154 ReferenceData data) throws IOException { 155 156 boolean uniqueueReferenceAdded = false; 157 lock.lock(); 158 try { 159 if (!isDuplicate(messageId)) { 160 ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); 161 messageContainer.put(messageId, record); 162 uniqueueReferenceAdded = true; 163 addInterest(record); 164 if (LOG.isDebugEnabled()) { 165 LOG.debug(destination.getPhysicalName() + " add: " + messageId); 166 } 167 } 168 } finally { 169 lock.unlock(); 170 } 171 return uniqueueReferenceAdded; 172 } 173 174 protected boolean isDuplicate(final MessageId messageId) { 175 boolean duplicate = messageContainer.containsKey(messageId); 176 if (!duplicate) { 177 duplicate = dispatchAudit.isDuplicate(messageId); 178 if (duplicate) { 179 if (LOG.isDebugEnabled()) { 180 LOG.debug(destination.getPhysicalName() 181 + " ignoring duplicated (add) message reference, already dispatched: " 182 + messageId); 183 } 184 } 185 } else if (LOG.isDebugEnabled()) { 186 LOG.debug(destination.getPhysicalName() 187 + " ignoring duplicated (add) message reference, already in store: " + messageId); 188 } 189 return duplicate; 190 } 191 192 public ReferenceData getMessageReference(MessageId identity) throws IOException { 193 lock.lock(); 194 try { 195 ReferenceRecord result = messageContainer.get(identity); 196 if (result == null) { 197 return null; 198 } 199 return result.getData(); 200 }finally { 201 lock.unlock(); 202 } 203 } 204 205 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 206 removeMessage(ack.getLastMessageId()); 207 } 208 209 public void removeMessage(MessageId msgId) throws IOException { 210 lock.lock(); 211 try { 212 StoreEntry entry = messageContainer.getEntry(msgId); 213 if (entry != null) { 214 ReferenceRecord rr = messageContainer.remove(msgId); 215 if (rr != null) { 216 removeInterest(rr); 217 dispatchAudit.isDuplicate(msgId); 218 if (LOG.isDebugEnabled()) { 219 LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId); 220 } 221 if (messageContainer.isEmpty() 222 || (lastBatchId != null && lastBatchId.equals(msgId.toString())) 223 || (batchEntry != null && batchEntry.equals(entry))) { 224 resetBatching(); 225 } 226 } 227 } 228 }finally { 229 lock.unlock(); 230 } 231 } 232 233 public void removeAllMessages(ConnectionContext context) throws IOException { 234 lock.lock(); 235 try { 236 Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet()); 237 for (MessageId id:tmpSet) { 238 removeMessage(id); 239 } 240 resetBatching(); 241 messageContainer.clear(); 242 }finally { 243 lock.unlock(); 244 } 245 } 246 247 public void delete() { 248 lock.lock(); 249 try { 250 messageContainer.clear(); 251 }finally { 252 lock.unlock(); 253 } 254 } 255 256 public void resetBatching() { 257 lock.lock(); 258 try { 259 batchEntry = null; 260 lastBatchId = null; 261 }finally { 262 lock.unlock(); 263 } 264 } 265 266 public int getMessageCount() { 267 return messageContainer.size(); 268 } 269 270 public boolean isSupportForCursors() { 271 return true; 272 } 273 274 public boolean supportsExternalBatchControl() { 275 return true; 276 } 277 278 void removeInterest(ReferenceRecord rr) { 279 adapter.removeInterestInRecordFile(rr.getData().getFileId()); 280 } 281 282 void addInterest(ReferenceRecord rr) { 283 adapter.addInterestInRecordFile(rr.getData().getFileId()); 284 } 285 286 /** 287 * @param startAfter 288 * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId) 289 */ 290 public void setBatch(MessageId startAfter) { 291 lock.lock(); 292 try { 293 batchEntry = messageContainer.getEntry(startAfter); 294 if (LOG.isDebugEnabled()) { 295 LOG.debug("setBatch: " + startAfter); 296 } 297 } finally { 298 lock.unlock(); 299 } 300 } 301 }