001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017     package org.apache.activemq.plugin;
018    
019    import java.io.IOException;
020    import java.util.regex.Pattern;
021    
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.BrokerFilter;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.region.MessageReference;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * @author Filip Hanik
033     * @version 1.0
034     */
035    public class DiscardingDLQBroker extends BrokerFilter {
036        public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
037        private boolean dropTemporaryTopics = true;
038        private boolean dropTemporaryQueues = true;
039        private boolean dropAll = true;
040        private Pattern[] destFilter;
041        private int reportInterval = 1000;
042        private long dropCount = 0;
043    
044        public DiscardingDLQBroker(Broker next) {
045            super(next);
046        }
047    
048        @Override
049        public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
050            if (log.isTraceEnabled()) {
051                try {
052                    log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
053                } catch (IOException x) {
054                    log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x);
055                }
056            }
057            boolean dropped = true;
058            Message msg = null;
059            ActiveMQDestination dest = null;
060            String destName = null;
061            try {
062                msg = msgRef.getMessage();
063                dest = msg.getDestination();
064                destName = dest.getPhysicalName();
065            }catch (IOException x) {
066                if (log.isDebugEnabled()) {
067                    log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x);
068                }
069            }
070    
071            if (dest == null || destName == null ) {
072                //do nothing, no need to forward it
073                skipMessage("NULL DESTINATION",msgRef);
074            } else if (dropAll) {
075                //do nothing
076                skipMessage("dropAll",msgRef);
077            } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
078                //do nothing
079                skipMessage("dropTemporaryTopics",msgRef);
080            } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
081                //do nothing
082                skipMessage("dropTemporaryQueues",msgRef);
083            } else if (destFilter!=null && matches(destName)) {
084                //do nothing
085                skipMessage("dropOnly",msgRef);
086            } else {
087                dropped = false;
088                next.sendToDeadLetterQueue(ctx, msgRef);
089            }
090            if (dropped && getReportInterval()>0) {
091                if ((++dropCount)%getReportInterval() == 0 ) {
092                    log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
093                }
094            }
095        }
096    
097        public boolean matches(String destName) {
098            for (int i=0; destFilter!=null && i<destFilter.length; i++) {
099                if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
100                    return true;
101                }
102            }
103            return false;
104        }
105    
106        private void skipMessage(String prefix, MessageReference msgRef) {
107            if (log.isDebugEnabled()) {
108                try {
109                    String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
110                    log.debug(lmsg);
111                }catch (IOException x) {
112                    log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x);
113                }
114            }
115        }
116    
117        public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
118            this.dropTemporaryTopics = dropTemporaryTopics;
119        }
120    
121        public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
122            this.dropTemporaryQueues = dropTemporaryQueues;
123        }
124    
125        public void setDropAll(boolean dropAll) {
126            this.dropAll = dropAll;
127        }
128    
129        public void setDestFilter(Pattern[] destFilter) {
130            this.destFilter = destFilter;
131        }
132    
133        public void setReportInterval(int reportInterval) {
134            this.reportInterval = reportInterval;
135        }
136    
137        public boolean isDropTemporaryTopics() {
138            return dropTemporaryTopics;
139        }
140    
141        public boolean isDropTemporaryQueues() {
142            return dropTemporaryQueues;
143        }
144    
145        public boolean isDropAll() {
146            return dropAll;
147        }
148    
149        public Pattern[] getDestFilter() {
150            return destFilter;
151        }
152    
153        public int getReportInterval() {
154            return reportInterval;
155        }
156    
157    }