/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Region for topic destinations. On top of {@link AbstractRegion} it keeps a
 * registry of durable subscriptions, keyed by client id and subscription
 * name, and restores durable subscriptions from a destination's message
 * store when subscriptions are attached to that destination.
 *
 * @version $Revision: 1.12 $
 */
public class TopicRegion extends AbstractRegion {
    private static final Log LOG = LogFactory.getLog(TopicRegion.class);

    /** Durable subscriptions known to this region, keyed by (clientId, subscriptionName). */
    protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();

    /** Source of ids for the placeholder session/consumer ids given to recovered durable subscriptions. */
    private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();

    /** Session id (under the "OFFLINE" connection id) that parents the consumer ids of recovered durable subscriptions. */
    private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());

    /** Passed to each new durable subscription and to {@code deactivate} when a durable consumer is removed. */
    private boolean keepDurableSubsActive;

    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                       DestinationFactory destinationFactory) {
        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }

    /**
     * Adds a consumer. Non-durable consumers are delegated to the superclass
     * unchanged. For a durable consumer the matching durable subscription is
     * created, re-created (when its selector or destination changed) or
     * re-keyed to the new consumer id, and then activated.
     *
     * @param context the connection on whose behalf the consumer is added
     * @param info    the consumer description
     * @return the subscription serving the consumer
     * @throws JMSException if the durable subscription is already active, or if
     *                      no durable subscription is registered under the key
     *                      after the consumer was added
     * @throws Exception    if the underlying region operations fail
     */
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (!info.isDurable()) {
            return super.addConsumer(context, info);
        }

        ActiveMQDestination destination = info.getDestination();
        if (!destination.isPattern()) {
            // Make sure the destination is created.
            lookup(context, destination);
        }
        String clientId = context.getClientId();
        String subscriptionName = info.getSubscriptionName();
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        DurableTopicSubscription sub = durableSubscriptions.get(key);

        if (sub == null) {
            super.addConsumer(context, info);
            sub = durableSubscriptions.get(key);
        } else {
            if (sub.isActive()) {
                throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
            }
            // Has the selector (or destination) changed?
            if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
                // Remove the old subscription first, then add the new one.
                durableSubscriptions.remove(key);
                deleteSubscriptionFromTopics(context, key);
                super.removeConsumer(context, sub.getConsumerInfo());
                super.addConsumer(context, info);
                sub = durableSubscriptions.get(key);
            } else {
                // Change the consumer id key of the durable sub.
                ConsumerId previousId = sub.getConsumerInfo().getConsumerId();
                if (previousId != null) {
                    subscriptions.remove(previousId);
                }
                subscriptions.put(info.getConsumerId(), sub);
            }
        }

        // Both paths that go through super.addConsumer() re-read the registry;
        // fail with a JMSException instead of a NullPointerException on
        // activate() when nothing was registered under the key.
        if (sub == null) {
            throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
                                   + " subscriberName: " + key.getSubscriptionName());
        }
        sub.activate(usageManager, context, info);
        return sub;
    }

    /**
     * Removes a consumer. A durable consumer is only deactivated (its
     * subscription stays registered); any other consumer is removed by the
     * superclass.
     *
     * @param context the connection on whose behalf the consumer is removed
     * @param info    the consumer description
     * @throws Exception if deactivation or the superclass removal fails
     */
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (!info.isDurable()) {
            super.removeConsumer(context, info);
            return;
        }

        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
        DurableTopicSubscription sub = durableSubscriptions.get(key);
        if (sub != null) {
            sub.deactivate(keepDurableSubsActive);
        }
    }

    /**
     * Deletes a durable subscription: unregisters it, deletes it from every
     * {@link Topic} in this region and removes its consumer.
     *
     * @param context the connection requesting the removal
     * @param info    identifies the subscription by client id and subscription name
     * @throws InvalidDestinationException if no such durable subscription exists
     * @throws JMSException                if the subscription is currently active
     * @throws Exception                   if deleting from a topic or removing the consumer fails
     */
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
        DurableTopicSubscription sub = durableSubscriptions.get(key);
        if (sub == null) {
            throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
        }
        if (sub.isActive()) {
            throw new JMSException("Durable consumer is in use");
        }

        durableSubscriptions.remove(key);
        deleteSubscriptionFromTopics(context, key);
        super.removeConsumer(context, sub.getConsumerInfo());
    }

    /**
     * Deletes the subscription identified by {@code key} from every
     * {@link Topic} currently held in {@code destinations}, while holding
     * {@code destinationsMutex}.
     */
    private void deleteSubscriptionFromTopics(ConnectionContext context, SubscriptionKey key) throws Exception {
        synchronized (destinationsMutex) {
            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
                Destination dest = iter.next();
                // Account for virtual destinations: only real Topics hold the subscription.
                if (dest instanceof Topic) {
                    ((Topic)dest).deleteSubscription(context, key);
                }
            }
        }
    }

    public String toString() {
        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
    }

    /**
     * Attaches subscriptions to a destination. After the superclass has added
     * its subscriptions, every durable subscription recorded in the
     * destination's message store is restored (created if not yet registered)
     * and added, followed by any other registered durable subscription that
     * matches the destination.
     *
     * @param context the connection context used when adding subscriptions
     * @param dest    the destination receiving the subscriptions
     * @return the subscriptions added to the destination
     * @throws Exception if the store or a subscription operation fails
     */
    @Override
    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {

        List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
        Set<Subscription> dupChecker = new HashSet<Subscription>(rc);

        TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
        if (store == null) {
            return rc;
        }

        // Eagerly recover the durable subscriptions
        SubscriptionInfo[] infos = store.getAllSubscriptions();
        for (SubscriptionInfo info : infos) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Restoring durable subscription: " + info);
            }
            SubscriptionKey key = new SubscriptionKey(info);

            // A single durable sub may be subscribing to multiple topics,
            // so it might exist already.
            DurableTopicSubscription sub = durableSubscriptions.get(key);
            ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
            if (sub == null) {
                ConnectionContext c = new ConnectionContext();
                c.setBroker(context.getBroker());
                c.setClientId(key.getClientId());
                c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
                sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
            }

            if (dupChecker.contains(sub)) {
                continue;
            }

            dupChecker.add(sub);
            rc.add(sub);
            dest.addSubscription(context, sub);
        }

        // There may be other durable subscriptions (via wild card)
        // that would match this destination.
        for (DurableTopicSubscription sub : durableSubscriptions.values()) {
            // Skip over subscriptions that we already added.
            if (dupChecker.contains(sub)) {
                continue;
            }

            if (sub.matches(dest.getActiveMQDestination())) {
                rc.add(sub);
                dest.addSubscription(context, sub);
            }
        }

        return rc;
    }

    /**
     * Builds the consumer description used for a durable subscription restored
     * from the store: selector, subscription name and destination are copied
     * from {@code info}, and a fresh placeholder consumer id is assigned.
     */
    private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
        ConsumerInfo rc = new ConsumerInfo();
        rc.setSelector(info.getSelector());
        rc.setSubscriptionName(info.getSubscriptionName());
        rc.setDestination(info.getSubscribedDestination());
        rc.setConsumerId(createConsumerId());
        return rc;
    }

    /** Creates the next consumer id under the recovered-durable-subscription session id. */
    private ConsumerId createConsumerId() {
        return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
    }

    /**
     * Applies the broker's destination policy entry for {@code destination},
     * if there is one, to {@code topic}.
     */
    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
        if (broker.getDestinationPolicy() != null) {
            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
            if (entry != null) {
                entry.configure(topic);
            }
        }
    }

    /**
     * Creates the subscription object for a consumer. Durable consumers get a
     * {@link DurableTopicSubscription} that is also registered in
     * {@link #durableSubscriptions}; all others get a {@link TopicSubscription}.
     * In both cases the destination policy entry, when present, configures the
     * new subscription.
     *
     * @param context the connection the subscription belongs to
     * @param info    the consumer description
     * @return the newly created subscription
     * @throws JMSException if a durable subscription is requested on an
     *                      advisory topic, if a durable subscription with the
     *                      same key is already registered, or if creating the
     *                      non-durable subscription fails
     */
    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
        ActiveMQDestination destination = info.getDestination();

        if (info.isDurable()) {
            if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
                throw new JMSException("Cannot create a durable subscription for an advisory Topic");
            }
            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
            if (durableSubscriptions.get(key) != null) {
                throw new JMSException("That durable subscription is already active.");
            }

            DurableTopicSubscription sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
            if (destination != null && broker.getDestinationPolicy() != null) {
                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                if (entry != null) {
                    entry.configure(broker, usageManager, sub);
                }
            }
            durableSubscriptions.put(key, sub);
            return sub;
        }

        try {
            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
            // Configure the subscription depending on the destination.
            if (destination != null && broker.getDestinationPolicy() != null) {
                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                if (entry != null) {
                    entry.configure(broker, usageManager, answer);
                }
            }
            answer.init();
            return answer;
        } catch (Exception e) {
            LOG.error("Failed to create TopicSubscription ", e);
            JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
            jmsEx.setLinkedException(e);
            throw jmsEx;
        }
    }

    /**
     * Tells whether two consumer descriptions differ in selector or
     * destination.
     *
     * @return {@code true} if exactly one selector is set, if the selectors
     *         differ, or if the destinations are not equal
     */
    private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
        if (info1.getSelector() != null ^ info2.getSelector() != null) {
            return true;
        }
        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
            return true;
        }
        return !info1.getDestination().equals(info2.getDestination());
    }

    /**
     * Returns the superclass's inactive destinations with every non-topic
     * entry removed. The filtering is done in place on the set returned by
     * the superclass.
     */
    protected Set<ActiveMQDestination> getInactiveDestinations() {
        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
            ActiveMQDestination dest = iter.next();
            if (!dest.isTopic()) {
                iter.remove();
            }
        }
        return inactiveDestinations;
    }

    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }

    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }

}