001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.policy; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Iterator; 022 import java.util.LinkedList; 023 import java.util.List; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.region.MessageReference; 026 import org.apache.activemq.broker.region.SubscriptionRecovery; 027 import org.apache.activemq.broker.region.Topic; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.Message; 030 import org.apache.activemq.filter.DestinationFilter; 031 import org.apache.activemq.filter.MessageEvaluationContext; 032 import org.apache.activemq.thread.Scheduler; 033 034 /** 035 * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed 036 * buffer of messages around in memory and use that to recover new 037 * subscriptions. 038 * 039 * @org.apache.xbean.XBean 040 * @version $Revision$ 041 */ 042 public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 043 044 private static final int GC_INTERVAL = 1000; 045 protected static final Scheduler scheduler = Scheduler.getInstance(); 046 047 // TODO: need to get a better synchronized linked list that has little 048 // contention between enqueuing and dequeuing 049 private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>()); 050 private volatile long lastGCRun = System.currentTimeMillis(); 051 052 private long recoverDuration = 60 * 1000; // Buffer for 1 min. 053 054 static class TimestampWrapper { 055 public MessageReference message; 056 public long timestamp; 057 058 public TimestampWrapper(MessageReference message, long timestamp) { 059 this.message = message; 060 this.timestamp = timestamp; 061 } 062 } 063 064 private final Runnable gcTask = new Runnable() { 065 public void run() { 066 gc(); 067 } 068 }; 069 070 public SubscriptionRecoveryPolicy copy() { 071 TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy(); 072 rc.setRecoverDuration(recoverDuration); 073 return rc; 074 } 075 076 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 077 buffer.add(new TimestampWrapper(message, lastGCRun)); 078 return true; 079 } 080 081 public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { 082 // Re-dispatch the messages from the buffer. 083 ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer); 084 if (!copy.isEmpty()) { 085 for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) { 086 TimestampWrapper timestampWrapper = iter.next(); 087 MessageReference message = timestampWrapper.message; 088 sub.addRecoveredMessage(context, message); 089 } 090 } 091 } 092 093 public void start() throws Exception { 094 scheduler.executePeriodically(gcTask, GC_INTERVAL); 095 } 096 097 public void stop() throws Exception { 098 scheduler.cancel(gcTask); 099 } 100 101 public void gc() { 102 lastGCRun = System.currentTimeMillis(); 103 while (buffer.size() > 0) { 104 TimestampWrapper timestampWrapper = buffer.get(0); 105 if (lastGCRun > timestampWrapper.timestamp + recoverDuration) { 106 // GC it. 107 buffer.remove(0); 108 } else { 109 break; 110 } 111 } 112 } 113 114 public long getRecoverDuration() { 115 return recoverDuration; 116 } 117 118 public void setRecoverDuration(long recoverDuration) { 119 this.recoverDuration = recoverDuration; 120 } 121 122 public Message[] browse(ActiveMQDestination destination) throws Exception { 123 List<Message> result = new ArrayList<Message>(); 124 ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer); 125 DestinationFilter filter = DestinationFilter.parseFilter(destination); 126 for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) { 127 TimestampWrapper timestampWrapper = iter.next(); 128 MessageReference ref = timestampWrapper.message; 129 Message message = ref.getMessage(); 130 if (filter.matches(message.getDestination())) { 131 result.add(message); 132 } 133 } 134 return result.toArray(new Message[result.size()]); 135 } 136 137 }