001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.plugin; 018 019 import java.io.IOException; 020 import java.util.regex.Pattern; 021 022 import org.apache.activemq.broker.Broker; 023 import org.apache.activemq.broker.BrokerFilter; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.region.MessageReference; 026 import org.apache.activemq.command.ActiveMQDestination; 027 import org.apache.activemq.command.Message; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * @author Filip Hanik 033 * @version 1.0 034 */ 035 public class DiscardingDLQBroker extends BrokerFilter { 036 public static Log log = LogFactory.getLog(DiscardingDLQBroker.class); 037 private boolean dropTemporaryTopics = true; 038 private boolean dropTemporaryQueues = true; 039 private boolean dropAll = true; 040 private Pattern[] destFilter; 041 private int reportInterval = 1000; 042 private long dropCount = 0; 043 044 public DiscardingDLQBroker(Broker next) { 045 super(next); 046 } 047 048 @Override 049 public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) { 050 if (log.isTraceEnabled()) { 051 try { 052 log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); 053 } catch (IOException x) { 054 log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x); 055 } 056 } 057 boolean dropped = true; 058 Message msg = null; 059 ActiveMQDestination dest = null; 060 String destName = null; 061 try { 062 msg = msgRef.getMessage(); 063 dest = msg.getDestination(); 064 destName = dest.getPhysicalName(); 065 }catch (IOException x) { 066 if (log.isDebugEnabled()) { 067 log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x); 068 } 069 } 070 071 if (dest == null || destName == null ) { 072 //do nothing, no need to forward it 073 skipMessage("NULL DESTINATION",msgRef); 074 } else if (dropAll) { 075 //do nothing 076 skipMessage("dropAll",msgRef); 077 } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) { 078 //do nothing 079 skipMessage("dropTemporaryTopics",msgRef); 080 } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) { 081 //do nothing 082 skipMessage("dropTemporaryQueues",msgRef); 083 } else if (destFilter!=null && matches(destName)) { 084 //do nothing 085 skipMessage("dropOnly",msgRef); 086 } else { 087 dropped = false; 088 next.sendToDeadLetterQueue(ctx, msgRef); 089 } 090 if (dropped && getReportInterval()>0) { 091 if ((++dropCount)%getReportInterval() == 0 ) { 092 log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue"); 093 } 094 } 095 } 096 097 public boolean matches(String destName) { 098 for (int i=0; destFilter!=null && i<destFilter.length; i++) { 099 if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) { 100 return true; 101 } 102 } 103 return false; 104 } 105 106 private void skipMessage(String prefix, MessageReference msgRef) { 107 if (log.isDebugEnabled()) { 108 try { 109 String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null); 110 log.debug(lmsg); 111 }catch (IOException x) { 112 log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x); 113 } 114 } 115 } 116 117 public void setDropTemporaryTopics(boolean dropTemporaryTopics) { 118 this.dropTemporaryTopics = dropTemporaryTopics; 119 } 120 121 public void setDropTemporaryQueues(boolean dropTemporaryQueues) { 122 this.dropTemporaryQueues = dropTemporaryQueues; 123 } 124 125 public void setDropAll(boolean dropAll) { 126 this.dropAll = dropAll; 127 } 128 129 public void setDestFilter(Pattern[] destFilter) { 130 this.destFilter = destFilter; 131 } 132 133 public void setReportInterval(int reportInterval) { 134 this.reportInterval = reportInterval; 135 } 136 137 public boolean isDropTemporaryTopics() { 138 return dropTemporaryTopics; 139 } 140 141 public boolean isDropTemporaryQueues() { 142 return dropTemporaryQueues; 143 } 144 145 public boolean isDropAll() { 146 return dropAll; 147 } 148 149 public Pattern[] getDestFilter() { 150 return destFilter; 151 } 152 153 public int getReportInterval() { 154 return reportInterval; 155 } 156 157 }