/**
 *
 * Copyright 2004 Protique Ltd
 * Copyright 2005 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.service.boundedvm;
import java.util.List;

import javax.jms.JMSException;

import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;

/**
 * A holder for Durable Queue consumer info and message routing.
 * <p>
 * Messages handed to {@link #doDispatch(DurableMessagePointer)} are tracked in a
 * "dispatched" queue until {@link #acknowledgeMessage(String)} removes them. A second
 * queue holds messages that were acked in a transaction.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class DurableQueueSubscription extends DurableSubscription {

    /** Messages that have been dispatched to the consumer and not yet removed by an acknowledgement. */
    private final MemoryBoundedQueue dispatchedQueue;

    /** Where messages go that are acked in a transaction. */
    private final MemoryBoundedQueue ackedQueue;

    /**
     * Construct the DurableQueueSubscription
     *
     * @param client the broker client that messages are dispatched to
     * @param dispatchedQueue queue used to track messages that have been dispatched
     * @param ackQueue queue used to hold messages acked in a transaction
     * @param filter the filter used by {@link #isTarget(ActiveMQMessage)} to match messages
     * @param info the consumer info supplying the prefetch number, consumer number and browser flag
     */
    public DurableQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, MemoryBoundedQueue ackQueue, Filter filter,
            ConsumerInfo info) {
        super(filter, info, client);
        this.dispatchedQueue = dispatchedQueue;
        this.ackedQueue = ackQueue;
    }

    /**
     * Determines if the Subscription is interested in the message
     *
     * @param message the message to test; a <code>null</code> message is never a target
     * @return true if this Subscription will accept the message
     * @throws JMSException if thrown while evaluating the message
     */
    public boolean isTarget(ActiveMQMessage message) throws JMSException {
        boolean result = false;
        if (message != null) {
            // make sure we don't loop messages around the cluster: the filter is only
            // consulted when one of the three conditions below holds
            if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
                    || message.isEntryBroker(brokerName)) {
                result = filter.matches(message);
            }
        }
        return result;
    }

    /**
     * @return true if the consumer has capacity for more messages, i.e. the number of
     *         dispatched messages does not exceed the consumer's prefetch number
     */
    public boolean canAcceptMessages() {
        // NOTE(review): the comparison is inclusive, so this still returns true when the
        // dispatched count equals the prefetch number - confirm that is intended.
        return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
    }

    /**
     * Dispatch a message to the Consumer
     * <p>
     * The pointer is recorded in the dispatched queue first; a shallow copy of its message,
     * addressed to this subscription's consumer number, is then sent via the client.
     *
     * @param message pointer to the message to dispatch
     * @throws JMSException if thrown while copying or dispatching the message
     */
    public void doDispatch(DurableMessagePointer message) throws JMSException {
        dispatchedQueue.enqueue(message);
        // dispatch a copy so the consumer number is not set on the original message
        ActiveMQMessage msg = message.getMessage().shallowCopy();
        msg.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
        client.dispatch(msg);
    }


    /**
     * Acknowledge the receipt of a message by a consumer
     *
     * @param id the id of the message being acknowledged
     * @return the DurableMessagePointer removed from the dispatched queue for the given id
     */
    public DurableMessagePointer acknowledgeMessage(String id) {
        return (DurableMessagePointer) dispatchedQueue.remove(id);
    }

    /**
     * @return all the unacknowledged messages, i.e. the contents of the dispatched queue
     */
    public List getUndeliveredMessages() {
        return dispatchedQueue.getContents();
    }

    /**
     * Close the subscription, then close both the dispatched queue and the acked queue
     */
    public void close() {
        super.close();
        dispatchedQueue.close();
        ackedQueue.close();
    }

    /**
     * @return true if at least one acked message is being held
     */
    public boolean hasAckedMessage() {
        return !ackedQueue.isEmpty();
    }

    /**
     * Add an acked message.
     *
     * @param message the acked message pointer to hold
     */
    public void addAckedMessage(DurableMessagePointer message) {
        ackedQueue.enqueueNoBlock(message);
    }

    /**
     * @return a list of all the acked messages
     */
    public List listAckedMessages() {
        return ackedQueue.getContents();
    }

    /**
     * Remove all the acked messages.
     */
    public void removeAllAckedMessages() {
        ackedQueue.clear();
    }

    /**
     * @return true if the consumer info marks this consumer as a browser
     */
    public boolean isBrowser() {
        return consumerInfo.isBrowser();
    }
}