001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 021 import org.activemq.broker.BrokerClient; 022 import org.activemq.filter.DestinationMap; 023 import org.activemq.filter.Filter; 024 import org.activemq.message.ActiveMQDestination; 025 import org.activemq.message.ConsumerInfo; 026 import org.activemq.service.DeadLetterPolicy; 027 import org.activemq.service.Dispatcher; 028 import org.activemq.service.Subscription; 029 import org.activemq.service.SubscriptionContainer; 030 import org.activemq.service.RedeliveryPolicy; 031 032 import java.util.HashSet; 033 import java.util.Iterator; 034 import java.util.Map; 035 import java.util.Set; 036 037 /** 038 * A default RAM only implementation of the {@link SubscriptionContainer} 039 * 040 * @version $Revision: 1.1.1.1 $ 041 */ 042 public class SubscriptionContainerImpl implements SubscriptionContainer { 043 private Map subscriptions; 044 private DestinationMap destinationIndex = new DestinationMap(); 045 private RedeliveryPolicy redeliveryPolicy; 046 private DeadLetterPolicy deadLetterPolicy; 047 048 public SubscriptionContainerImpl(RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) { 049 this(new ConcurrentHashMap(), redeliveryPolicy,deadLetterPolicy); 050 } 051 052 public SubscriptionContainerImpl(Map subscriptions, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) { 053 this.subscriptions = subscriptions; 054 this.redeliveryPolicy = redeliveryPolicy; 055 this.deadLetterPolicy = deadLetterPolicy; 056 } 057 058 public String toString() { 059 return super.toString() + "[size:" + subscriptions.size() + "]"; 060 } 061 062 public RedeliveryPolicy getRedeliveryPolicy() { 063 return redeliveryPolicy; 064 } 065 066 public DeadLetterPolicy getDeadLetterPolicy(){ 067 return deadLetterPolicy; 068 } 069 public Subscription getSubscription(String consumerId) { 070 return (Subscription) subscriptions.get(consumerId); 071 } 072 073 public Subscription removeSubscription(String consumerId) { 074 Subscription subscription = (Subscription) subscriptions.remove(consumerId); 075 if (subscription != null) { 076 destinationIndex.remove(subscription.getDestination(), subscription); 077 } 078 return subscription; 079 } 080 081 public Set getSubscriptions(ActiveMQDestination destination) { 082 Object answer = destinationIndex.get(destination); 083 if (answer instanceof Set) { 084 return (Set) answer; 085 } 086 else { 087 Set set = new HashSet(1); 088 set.add(answer); 089 return set; 090 } 091 } 092 093 public Iterator subscriptionIterator() { 094 return subscriptions.values().iterator(); 095 } 096 097 public Subscription makeSubscription(Dispatcher dispatcher,BrokerClient client, ConsumerInfo info, Filter filter) { 098 Subscription subscription = createSubscription(dispatcher, client,info, filter); 099 subscriptions.put(info.getConsumerId(), subscription); 100 destinationIndex.put(subscription.getDestination(), subscription); 101 return subscription; 102 } 103 104 protected Subscription createSubscription(Dispatcher dispatcher, BrokerClient client,ConsumerInfo info, Filter filter) { 105 return new SubscriptionImpl(dispatcher, client,info, filter, getRedeliveryPolicy(),getDeadLetterPolicy()); 106 } 107 }