001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.util; 018 019 import org.apache.activemq.broker.BrokerPluginSupport; 020 import org.apache.activemq.broker.ProducerBrokerExchange; 021 import org.apache.activemq.command.Message; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 025 /** 026 * A Broker interceptor which updates a JMS Client's timestamp on the message 027 * with a broker timestamp. Useful when the clocks on client machines are known 028 * to not be correct and you can only trust the time set on the broker machines. 029 * 030 * Enabling this plugin will break JMS compliance since the timestamp that the 031 * producer sees on the messages after as send() will be different from the 032 * timestamp the consumer will observe when he receives the message. This plugin 033 * is not enabled in the default ActiveMQ configuration. 034 * 035 * 2 new attributes have been added which will allow the administrator some override control 036 * over the expiration time for incoming messages: 037 * 038 * Attribute 'zeroExpirationOverride' can be used to apply an expiration 039 * time to incoming messages with no expiration defined (messages that would never expire) 040 * 041 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time 042 * 043 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" 044 * 045 * @version $Revision$ 046 */ 047 public class TimeStampingBrokerPlugin extends BrokerPluginSupport { 048 049 /** 050 * variable which (when non-zero) is used to override 051 * the expiration date for messages that arrive with 052 * no expiration date set (in Milliseconds). 053 */ 054 long zeroExpirationOverride = 0; 055 056 /** 057 * variable which (when non-zero) is used to limit 058 * the expiration date (in Milliseconds). 059 */ 060 long ttlCeiling = 0; 061 062 /** 063 * If true, the plugin will not update timestamp to past values 064 * False by default 065 */ 066 boolean futureOnly = false; 067 068 /** 069 * setter method for zeroExpirationOverride 070 */ 071 public void setZeroExpirationOverride(long ttl) 072 { 073 this.zeroExpirationOverride = ttl; 074 } 075 076 /** 077 * setter method for ttlCeiling 078 */ 079 public void setTtlCeiling(long ttlCeiling) 080 { 081 this.ttlCeiling = ttlCeiling; 082 } 083 084 public void setFutureOnly(boolean futureOnly) { 085 this.futureOnly = futureOnly; 086 } 087 088 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 089 if (message.getTimestamp() > 0 090 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { 091 // timestamp not been disabled and has not passed through a network 092 long oldExpiration = message.getExpiration(); 093 long newTimeStamp = System.currentTimeMillis(); 094 long timeToLive = zeroExpirationOverride; 095 if (oldExpiration > 0) { 096 long oldTimestamp = message.getTimestamp(); 097 timeToLive = oldExpiration - oldTimestamp; 098 } 099 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { 100 timeToLive = ttlCeiling; 101 } 102 long expiration = timeToLive + newTimeStamp; 103 //In the scenario that the Broker is behind the clients we never want to set the Timestamp and Expiration in the past 104 if(!futureOnly || (expiration > oldExpiration)) { 105 if (timeToLive > 0 && expiration > 0) { 106 message.setExpiration(expiration); 107 } 108 message.setTimestamp(newTimeStamp); 109 } 110 } 111 super.send(producerExchange, message); 112 } 113 }