001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service;
020    import javax.jms.JMSException;
021    import javax.jms.DeliveryMode;
022    import org.apache.commons.logging.*;
023    import org.activemq.broker.BrokerContainer;
024    import org.activemq.broker.Broker;
025    import org.activemq.message.ActiveMQDestination;
026    import org.activemq.message.ActiveMQMessage;
027    import org.activemq.message.ActiveMQQueue;
028    import org.activemq.store.PersistenceAdapter;
029    import org.activemq.util.IdGenerator;
030    
031    /**
032     * Determines how messages are stored in a dead letter queue
033     * 
034     * @version $Revision: 1.1.1.1 $
035     */
036    public class DeadLetterPolicy {
037        /**
038         * Prefix used by dead letter queues
039         */
040        public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
041        private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";    
042        private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
043        private Broker broker;
044        private String deadLetterPrefix = DEAD_LETTER_PREFIX;
045        private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
046        private boolean deadLetterEnabled = true;
047        private boolean deadLetterPerDestinationName = true;
048        private boolean storeNonPersistentMessages = true;
049        private boolean noTopicConsumerEnabled = true;
050        private boolean allowDuplicates = false;
051        private boolean useDatabaseLocking = false;
052        private long deadLetterQueueTTL = 0L;
053        private long deadLetterTopicTTL = 0L;
054        private IdGenerator idGenerator = new IdGenerator();
055       
056        /**
057         * Construct a dead letter policy
058         * 
059         * @param broker
060         */
061        public DeadLetterPolicy(Broker broker) {
062            this.broker = broker;
063        }
064        
065        public DeadLetterPolicy(BrokerContainer brokerContainer) {
066            this(brokerContainer.getBroker());
067        }
068    
069        /**
070         * Default constructor
071         */
072        public DeadLetterPolicy() {
073        }
074    
075        /**
076         * @return Returns the broker.
077         */
078        public Broker getBroker() {
079            return broker;
080        }
081    
082        /**
083         * @param broker The broker to set.
084         */
085        public void setBroker(Broker broker) {
086            this.broker = broker;
087        }
088    
089        /**
090         * @return Returns the deadLetterEnabled.
091         */
092        public boolean isDeadLetterEnabled() {
093            return deadLetterEnabled;
094        }
095    
096        /**
097         * @param deadLetterEnabled The deadLetterEnabled to set.
098         */
099        public void setDeadLetterEnabled(boolean deadLetterEnabled) {
100            this.deadLetterEnabled = deadLetterEnabled;
101        }
102    
103        /**
104         * @return Returns the deadLetterPerDestinationName.
105         */
106        public boolean isDeadLetterPerDestinationName() {
107            return deadLetterPerDestinationName;
108        }
109    
110        /**
111         * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
112         */
113        public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
114            this.deadLetterPerDestinationName = deadLetterPerDestinationName;
115        }
116    
117        /**
118         * @return Returns the deadLetterName.
119         */
120        public String getDeadLetterName() {
121            return deadLetterName;
122        }
123    
124        /**
125         * @param deadLetterName The deadLetterName to set.
126         */
127        public void setDeadLetterName(String deadLetterName) {
128            this.deadLetterName = deadLetterName;
129        }
130    
131        /**
132         * @return Returns the deadLetterPrefix.
133         */
134        public String getDeadLetterPrefix() {
135            return deadLetterPrefix;
136        }
137    
138        /**
139         * @param deadLetterPrefix The deadLetterPrefix to set.
140         */
141        public void setDeadLetterPrefix(String deadLetterPrefix) {
142            this.deadLetterPrefix = deadLetterPrefix;
143        }
144    
145        /**
146         * @return Returns the storeNonPersistentMessages.
147         */
148        public boolean isStoreNonPersistentMessages() {
149            return storeNonPersistentMessages;
150        }
151    
152        /**
153         * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
154         */
155        public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
156            this.storeNonPersistentMessages = storeNonPersistentMessages;
157        }
158        
159        /**
160         * @return Returns the noTopicConsumerEnabled.
161         */
162        public boolean isNoTopicConsumerEnabled() {
163            return noTopicConsumerEnabled;
164        }
165        /**
166         * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
167         */
168        public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
169            this.noTopicConsumerEnabled = noTopicConsumerEnabled;
170        }
171        
172        /**
173             * @return Returns the allowDuplicates.
174             */
175            public boolean isAllowDuplicates() {
176                    return allowDuplicates;
177            }
178            /**
179             * @param allowDuplicates The allowDuplicates to set.
180             */
181            public void setAllowDuplicates(boolean allowDuplicates) {
182                    this.allowDuplicates = allowDuplicates;
183            }
184            /**
185             * @return Returns the useDatabaseLocking.
186             */
187            public boolean isUseDatabaseLocking() {
188                    return useDatabaseLocking;
189            }       
190            /**
191             * @param useDatabaseLocking The useDatabaseLocking to set.
192             */
193            public void setUseDatabaseLocking(boolean useDatabaseLocking) {
194                    this.useDatabaseLocking = useDatabaseLocking;
195            }
196            /**
197             * @param deadLetterQueueTTL The deadLetterQueueTTL to set.
198             */
199            public void setDeadLetterQueueTTL(long deadLetterQueueTTL) {
200                    this.deadLetterQueueTTL = deadLetterQueueTTL;
201            }
202            /**
203             * @param deadLetterTopicTTL The deadLetterTopicTTL to set.
204             */
205            public void setDeadLetterTopicTTL(long deadLetterTopicTTL) {
206                    this.deadLetterTopicTTL = deadLetterTopicTTL;
207            }
208            /**
209         * Get the name of the DLQ from the destination provided
210         * @param destination
211         * @return the name of the DLQ for this Destination
212         */
213        public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
214            String answer = this.deadLetterPrefix;
215            if (deadLetterPerDestinationName) {
216                answer += destination.getPhysicalName();
217            }
218            else {
219                answer += this.deadLetterName;
220            }
221            return answer;
222        }
223    
224        /**
225         * Send a message to a dead letter queue
226         * 
227         * @param message
228         * @throws JMSException
229         */
230        public void sendToDeadLetter(ActiveMQMessage message) {
231            if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages) && !message.isDispatchedFromDLQ()) {
232                if (broker != null) {
233                    // process duplicates
234                    if (!isAllowDuplicates()) {
235                            PersistenceAdapter persistenceAdapter = getBroker().getPersistenceAdapter();
236                            // make sure no previous dead letter was already sent
237                            if (persistenceAdapter!=null
238                                            && message.getJMSMessageIdentity()!=null
239                                                            && message.getJMSMessageIdentity().getSequenceNumber()!=null
240                                                            && persistenceAdapter.deadLetterAlreadySent(((Long)message.getJMSMessageIdentity().getSequenceNumber()).longValue(), isUseDatabaseLocking())) {
241                                    if (log.isDebugEnabled()) log.debug("Dead letter has been already sent for this message: " + message.getJMSMessageID());
242                                    return;
243                            }
244                    }
245    
246                    // send a dead letter message
247                    String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
248                    try {
249                            ActiveMQMessage deadMessage = createDeadLetterMessage(dlqName, message);
250                            broker.sendToDeadLetterQueue(dlqName, deadMessage);
251                            if (log.isDebugEnabled()) log.debug("Passed message: " + deadMessage + " to DLQ: " + dlqName);
252                    } catch (JMSException e) {
253                            log.warn("Failed to send message to dead letter due to: " + e, e);
254                    }
255                }
256                else {
257                    log.warn("Broker is not initialized - cannot add to DLQ: " + message);
258                }
259            }else if (log.isDebugEnabled()){
260                log.debug("DLQ not storing message: " + message);
261            }
262        }
263        
264        protected ActiveMQMessage createDeadLetterMessage(String dlqName, ActiveMQMessage message) throws JMSException {
265            // make a shallow copy of the orginal message
266            ActiveMQMessage deadMessage = message.shallowCopy();
267            
268            // generate a new producer and message ID
269            String id = idGenerator.generateId();
270            String producerKey = IdGenerator.getSeedFromId(id);
271            long seq = IdGenerator.getCountFromId(id);
272            deadMessage.setProducerKey(producerKey);
273            deadMessage.setJMSMessageID(id);
274            deadMessage.setSequenceNumber(seq);
275            deadMessage.getJMSMessageIdentity().setMessageID(id);
276            deadMessage.getJMSMessageIdentity().setSequenceNumber(new Long(seq));
277    
278            ActiveMQQueue destination = new ActiveMQQueue(dlqName);
279            deadMessage.setJMSDestination(destination);
280            deadMessage.setDispatchedFromDLQ(true);
281            
282            // set the expiration of the dead letter message
283            long expiration = 0L;
284            long timeStamp = System.currentTimeMillis();
285            if (message.getJMSActiveMQDestination().isTopic()) {
286                    if (deadLetterTopicTTL > 0) {
287                            expiration = deadLetterTopicTTL + timeStamp;
288                    }
289            } else {
290                    if (deadLetterQueueTTL > 0) {
291                            expiration = deadLetterQueueTTL + timeStamp;
292                    }
293            }
294            deadMessage.setJMSExpiration(expiration);
295            deadMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
296            
297            return deadMessage;
298        }
299    }