/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link TopicReferenceStore} that keeps message references in Kaha containers.
 * <p>
 * Three pieces of state cooperate:
 * <ul>
 * <li>the inherited message container, mapping a {@link MessageId} to its {@link ReferenceRecord};</li>
 * <li>{@link #ackContainer}, holding one {@link TopicSubAck} per stored message with the number of
 * subscriptions that still have to acknowledge it;</li>
 * <li>{@link #subscriberMessages}, holding one {@link TopicSubContainer} per subscription with the
 * {@link ConsumerMessageRef}s that subscription has not acknowledged yet.</li>
 * </ul>
 * Most mutating operations run under the {@code lock} inherited from {@link KahaReferenceStore}.
 */
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {

    private static final Log LOG = LogFactory.getLog(KahaTopicReferenceStore.class);

    /** One entry per stored message, counting the subscriptions that have not acknowledged it. */
    protected ListContainer<TopicSubAck> ackContainer;

    /** Outstanding references per subscription, keyed by {@link #getSubscriptionKey(String, String)}. */
    protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();

    /** Subscription metadata, keyed by {@link #getSubscriptionKey(String, String)}. */
    private MapContainer<String, SubscriptionInfo> subscriberContainer;

    private Store store;

    /** Prefix of the per-subscription container names. */
    private static final String TOPIC_SUB_NAME = "tsn";

    /**
     * Creates the store and opens a reference container for every subscription already present in
     * {@code subsContainer}.
     *
     * @param store            store used to obtain and delete the per-subscription containers
     * @param adapter          owning adapter, passed on to the superclass
     * @param messageContainer message id to reference record mapping, passed on to the superclass
     * @param ackContainer     container of outstanding acknowledgement counters
     * @param subsContainer    container of the known subscriptions
     * @param destination      destination this store serves
     * @throws IOException if a per-subscription container cannot be opened
     */
    public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
                                   MapContainer<MessageId, ReferenceRecord> messageContainer,
                                   ListContainer<TopicSubAck> ackContainer,
                                   MapContainer<String, SubscriptionInfo> subsContainer,
                                   ActiveMQDestination destination) throws IOException {
        super(adapter, messageContainer, destination);
        this.store = store;
        this.ackContainer = ackContainer;
        subscriberContainer = subsContainer;
        // load all the Ack containers
        for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) {
            SubscriptionInfo info = i.next();
            addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
        }
    }

    /**
     * Disposes the superclass state and then deletes the subscription container.
     *
     * @param context connection context, passed on to the superclass
     */
    public void dispose(ConnectionContext context) {
        super.dispose(context);
        subscriberContainer.delete();
    }

    /**
     * Builds a {@link MessageId} from the id stored in a {@link ReferenceRecord}.
     *
     * @param object a {@link ReferenceRecord}
     * @return a new message id created from the record's id string
     */
    protected MessageId getMessageId(Object object) {
        return new MessageId(((ReferenceRecord)object).getMessageId());
    }

    /**
     * Not supported: this store only holds references.
     *
     * @throws RuntimeException always
     */
    public void addMessage(ConnectionContext context, Message message) throws IOException {
        throw new RuntimeException("Use addMessageReference instead");
    }

    /**
     * Not supported: this store only holds references.
     *
     * @throws RuntimeException always
     */
    public Message getMessage(MessageId identity) throws IOException {
        // The read-side counterpart is getMessageReference, not addMessageReference.
        throw new RuntimeException("Use getMessageReference instead");
    }

    /**
     * Stores a reference to a message for every current subscription.
     * <p>
     * Nothing is stored when there are no subscriptions or when the inherited {@code isDuplicate}
     * check reports the id as a duplicate.
     *
     * @param context   connection context (not used here)
     * @param messageId id of the referenced message
     * @param data      location data of the referenced message
     * @return {@code true} if a new reference was stored, {@code false} otherwise
     */
    public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
                                       final ReferenceData data) {
        boolean uniqueReferenceAdded = false;
        lock.lock();
        try {
            final ReferenceRecord referenceRecord = new ReferenceRecord(messageId.toString(), data);
            final int subscriberCount = subscriberMessages.size();
            if (subscriberCount > 0 && !isDuplicate(messageId)) {
                final StoreEntry messageEntry = messageContainer.place(messageId, referenceRecord);
                addInterest(referenceRecord);
                uniqueReferenceAdded = true;

                // One shared counter entry: it starts at the number of subscriptions and is
                // decremented as each of them acknowledges.
                final TopicSubAck tsa = new TopicSubAck();
                tsa.setCount(subscriberCount);
                tsa.setMessageEntry(messageEntry);
                final StoreEntry ackEntry = ackContainer.placeLast(tsa);

                for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
                    final TopicSubContainer container = i.next();
                    final ConsumerMessageRef ref = new ConsumerMessageRef();
                    ref.setAckEntry(ackEntry);
                    ref.setMessageEntry(messageEntry);
                    ref.setMessageId(messageId);
                    container.add(ref);
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
                }
            }
        } finally {
            lock.unlock();
        }
        return uniqueReferenceAdded;
    }

    /**
     * Looks up the reference data stored for a message.
     *
     * @param identity id of the message
     * @return the stored reference data, or {@code null} if no record exists for the id
     * @throws IOException declared by the interface
     */
    public ReferenceData getMessageReference(final MessageId identity) throws IOException {
        final ReferenceRecord result = messageContainer.get(identity);
        if (result == null) {
            return null;
        }
        return result.getData();
    }

    /**
     * Walks the acknowledgement container and calls the inherited {@code addInterest} for the
     * record of every message that still has a positive outstanding count.
     */
    public void addReferenceFileIdsInUse() {
        for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
            TopicSubAck subAck = ackContainer.get(entry);
            if (subAck.getCount() > 0) {
                ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
                addInterest(rr);
            }
        }
    }

    /**
     * Opens the reference container of a subscription and registers it in
     * {@link #subscriberMessages}, replacing any container previously registered under the same key.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @return the underlying map container
     * @throws IOException if the container cannot be obtained from the store
     */
    protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
        String subscriptionKey = getSubscriptionKey(clientId, subscriptionName);
        String containerName = getSubscriptionContainerName(subscriptionKey);
        MapContainer container = store.getMapContainer(containerName, containerName);
        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
        Marshaller marshaller = new ConsumerMessageRefMarshaller();
        container.setValueMarshaller(marshaller);
        TopicSubContainer tsc = new TopicSubContainer(container);
        subscriberMessages.put(subscriptionKey, tsc);
        return container;
    }

    /**
     * Acknowledges a message on behalf of one subscription.
     * <p>
     * If the subscription holds a reference, the shared counter is decremented; when it reaches zero
     * the counter entry and the message record are removed. If the subscription holds no reference
     * (for example a duplicate ack), the message is reported as removable only when no other
     * subscription can still reference it.
     *
     * @param context          connection context (not used here)
     * @param clientId         client id of the acknowledging subscription
     * @param subscriptionName name of the acknowledging subscription
     * @param messageId        id of the acknowledged message
     * @return {@code true} if the caller may remove the message, {@code false} otherwise (including
     *         when the subscription is unknown)
     * @throws IOException declared by the interface
     */
    public boolean acknowledgeReference(ConnectionContext context,
                                        String clientId, String subscriptionName, MessageId messageId)
        throws IOException {
        boolean removeMessage = false;
        lock.lock();
        try {
            String key = getSubscriptionKey(clientId, subscriptionName);

            TopicSubContainer container = subscriberMessages.get(key);
            if (container != null) {
                ConsumerMessageRef ref = container.remove(messageId);
                if (ref != null) {
                    StoreEntry entry = ref.getAckEntry();
                    //ensure we get up to-date pointers
                    entry = ackContainer.refresh(entry);
                    TopicSubAck tsa = ackContainer.get(entry);
                    if (tsa != null) {
                        if (tsa.decrementCount() <= 0) {
                            // last outstanding subscription: drop the counter and the message record
                            ackContainer.remove(entry);
                            ReferenceRecord rr = messageContainer.get(messageId);
                            if (rr != null) {
                                entry = tsa.getMessageEntry();
                                entry = messageContainer.refresh(entry);
                                messageContainer.remove(entry);
                                removeInterest(rr);
                                removeMessage = true;
                                // NOTE(review): return value is ignored; presumably called for its
                                // side effect on the inherited dispatch audit - confirm.
                                dispatchAudit.isDuplicate(messageId);
                            }
                        } else {
                            ackContainer.update(entry, tsa);
                        }
                    }
                    if (LOG.isTraceEnabled()) {
                        LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
                    }
                } else {
                    if (ackContainer.isEmpty() || subscriberMessages.size() == 1
                        || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
                        // no message reference held
                        removeMessage = true;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (dup ack): " + messageId);
                        }
                    }
                }
            }
        } finally {
            lock.unlock();
        }
        return removeMessage;
    }

    /**
     * Verifies that no subscriber other than {@code key} has a reference to this message.
     * <p>
     * In the case where the subscribers' references are persisted but more than the persisted
     * consumers get the message, the ack from the non persisted consumer would remove the message
     * in error.
     * <p>
     * see: https://issues.apache.org/activemq/browse/AMQ-2123
     *
     * @param key                  subscription key to exclude from the check
     * @param subscriberContainers all subscription containers, by subscription key
     * @param messageId            id of the message to look for
     * @return {@code false} if any other subscription still references the message, {@code true}
     *         otherwise
     */
    private boolean isUnreferencedBySubscribers(
            String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
        for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
            TopicSubContainer container = entry.getValue();
            if (key.equals(entry.getKey()) || container.isEmpty()) {
                continue;
            }
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
                if (messageId.equals(ref.getMessageId())) {
                    // one hit is enough; no need to scan the remaining subscriptions
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Acknowledges a message for a subscription; delegates to
     * {@link #acknowledgeReference(ConnectionContext, String, String, MessageId)} and discards its
     * result.
     *
     * @throws IOException if the delegate fails
     */
    public void acknowledge(ConnectionContext context,
                            String clientId, String subscriptionName, MessageId messageId) throws IOException {
        acknowledgeReference(context, clientId, subscriptionName, messageId);
    }

    /**
     * Registers a subscription and opens its reference container.
     *
     * @param info        the subscription to add
     * @param retroactive currently ignored: references stored before the subscription was added
     *                    are not copied into its container
     * @throws IOException if the subscription's container cannot be opened
     */
    public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
        String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
        lock.lock();
        try {
            // if already exists - won't add it again as it causes data files
            // to hang around
            if (!subscriberContainer.containsKey(key)) {
                subscriberContainer.put(key, info);
                adapter.addSubscriberState(info);
            }
            // add the subscriber
            addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a subscription, its state in the adapter (when the subscription is known) and its
     * reference container.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     * @throws IOException if the container cannot be removed
     */
    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        lock.lock();
        try {
            SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            if (info != null) {
                adapter.removeSubscriberState(info);
            }
            removeSubscriberMessageContainer(clientId, subscriptionName);
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return all subscriptions currently held in the subscription container
     * @throws IOException declared by the interface
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        SubscriptionInfo[] result = subscriberContainer.values()
            .toArray(new SubscriptionInfo[subscriberContainer.size()]);
        return result;
    }

    /**
     * @param clientId       client id of the subscription
     * @param subscriberName name of the subscription
     * @return the number of references held for the subscription, or 0 if it is unknown
     * @throws IOException declared by the interface
     */
    public int getMessageCount(String clientId, String subscriberName) throws IOException {
        String key = getSubscriptionKey(clientId, subscriberName);
        TopicSubContainer container = subscriberMessages.get(key);
        return container != null ? container.size() : 0;
    }

    /**
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     * @return the stored subscription info, or {@code null} if there is none
     * @throws IOException declared by the interface
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Recovers the next batch of references for a subscription, continuing after the batch entry
     * remembered by the subscription's container (or from its first entry when none is set).
     * <p>
     * Recovery stops when the container is exhausted, {@code maxReturned} references have been
     * recovered, the listener reports no more space, or the inherited {@code recoverReference}
     * returns {@code false}.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     * @param maxReturned      maximum number of references to recover in this call
     * @param listener         receiver of the recovered references
     * @throws Exception if recovery fails
     */
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                    MessageRecoveryListener listener) throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        lock.lock();
        try {
            TopicSubContainer container = subscriberMessages.get(key);
            if (container != null) {
                int count = 0;
                StoreEntry entry = container.getBatchEntry();
                if (entry == null) {
                    entry = container.getEntry();
                } else {
                    entry = container.refreshEntry(entry);
                    if (entry != null) {
                        entry = container.getNextEntry(entry);
                    }
                }

                if (entry != null) {
                    do {
                        ConsumerMessageRef consumerRef = container.get(entry);
                        ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
                        if (msg != null) {
                            if (recoverReference(listener, msg)) {
                                count++;
                                container.setBatchEntry(msg.getMessageId(), entry);
                            } else {
                                break;
                            }
                        } else {
                            // the referenced record is gone: drop the remembered batch position
                            container.reset();
                        }

                        entry = container.getNextEntry(entry);
                    } while (entry != null && count < maxReturned && listener.hasSpace());
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Recovers every reference held for a subscription, stopping early if the inherited
     * {@code recoverReference} returns {@code false}.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     * @param listener         receiver of the recovered references
     * @throws Exception if recovery fails
     */
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
        throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = subscriberMessages.get(key);
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
                // Resolve by store entry (getValue), not by map key: the message container is keyed
                // by MessageId, so get() with a StoreEntry would never find the record.
                ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
                if (msg != null) {
                    if (!recoverReference(listener, msg)) {
                        break;
                    }
                }
            }
        }
    }

    /**
     * Resets the batch position of a subscription's container, if the subscription is known.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     */
    public void resetBatching(String clientId, String subscriptionName) {
        lock.lock();
        try {
            String key = getSubscriptionKey(clientId, subscriptionName);
            TopicSubContainer topicSubContainer = subscriberMessages.get(key);
            if (topicSubContainer != null) {
                topicSubContainer.reset();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Clears the reference containers of all known subscriptions and the acknowledgement
     * container, then lets the superclass remove its messages.
     *
     * @param context connection context, passed on to the superclass
     * @throws IOException if the superclass fails
     */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        lock.lock();
        try {
            // iterate over a copy of the key set rather than the live container view
            Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
            for (String key : tmpSet) {
                TopicSubContainer container = subscriberMessages.get(key);
                if (container != null) {
                    container.clear();
                }
            }
            ackContainer.clear();
        } finally {
            lock.unlock();
        }
        super.removeAllMessages(context);
    }

    /**
     * Unregisters a subscription and deletes its reference container.
     * <p>
     * Every reference the subscription still held is released: the shared counter is decremented
     * and, when it reaches zero, the counter entry and the message entry are removed.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription
     * @throws IOException if the container cannot be deleted
     */
    protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
        String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
        String containerName = getSubscriptionContainerName(subscriberKey);
        subscriberContainer.remove(subscriberKey);
        TopicSubContainer container = subscriberMessages.remove(subscriberKey);
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
                if (ref != null) {
                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                    if (tsa != null) {
                        if (tsa.decrementCount() <= 0) {
                            ackContainer.remove(ref.getAckEntry());
                            messageContainer.remove(tsa.getMessageEntry());
                        } else {
                            ackContainer.update(ref.getAckEntry(), tsa);
                        }
                    }
                }
            }
        }
        store.deleteMapContainer(containerName, containerName);
    }

    /**
     * Builds the key identifying a subscription: {@code clientId + ":" + subscriberName}, with
     * {@code "NOT_SET"} standing in for a {@code null} subscriber name.
     *
     * @param clientId       client id of the subscription
     * @param subscriberName name of the subscription, may be {@code null}
     * @return the subscription key
     */
    protected String getSubscriptionKey(String clientId, String subscriberName) {
        String name = subscriberName != null ? subscriberName : "NOT_SET";
        return clientId + ":" + name;
    }

    /**
     * Builds the name of a subscription's reference container from the fixed prefix, the
     * destination's qualified name and the subscription key.
     *
     * @param subscriptionKey key as produced by {@link #getSubscriptionKey(String, String)}
     * @return the container name
     */
    private String getSubscriptionContainerName(String subscriptionKey) {
        return TOPIC_SUB_NAME + destination.getQualifiedName() + subscriptionKey;
    }
}