001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.broker.region.MessageReference;
023    import org.apache.activemq.broker.region.SubscriptionRecovery;
024    import org.apache.activemq.broker.region.Topic;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.memory.list.DestinationBasedMessageList;
028    import org.apache.activemq.memory.list.MessageList;
029    import org.apache.activemq.memory.list.SimpleMessageList;
030    
031    /**
032     * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
033     * amount of memory available in RAM for message history which is evicted in
034     * time order.
035     * 
036     * @org.apache.xbean.XBean
037     * @version $Revision$
038     */
039    public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
040    
041        private MessageList buffer;
042        private int maximumSize =  64 * 1024;
043        private boolean useSharedBuffer = true;
044    
045        public FixedSizedSubscriptionRecoveryPolicy() {
046            
047        }
048        
049        public SubscriptionRecoveryPolicy copy() {
050            FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
051            rc.setMaximumSize(maximumSize);
052            rc.setUseSharedBuffer(useSharedBuffer);
053            return rc;
054        }
055    
056        public boolean add(ConnectionContext context, MessageReference message) throws Exception {
057            buffer.add(message);
058            return true;
059        }
060    
061        public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
062            // Re-dispatch the messages from the buffer.
063            List copy = buffer.getMessages(sub.getActiveMQDestination());
064            if (!copy.isEmpty()) {
065                for (Iterator iter = copy.iterator(); iter.hasNext();) {
066                    MessageReference node = (MessageReference)iter.next();
067                    sub.addRecoveredMessage(context, node);
068                }
069            }
070        }
071    
072        public void start() throws Exception {
073            buffer = createMessageList();
074        }
075    
076        public void stop() throws Exception {
077            buffer.clear();
078        }
079    
080        // Properties
081        // -------------------------------------------------------------------------
082        public MessageList getBuffer() {
083            return buffer;
084        }
085    
086        public void setBuffer(MessageList buffer) {
087            this.buffer = buffer;
088        }
089    
090        public int getMaximumSize() {
091            return maximumSize;
092        }
093    
094        /**
095         * Sets the maximum amount of RAM in bytes that this buffer can hold in RAM
096         */
097        public void setMaximumSize(int maximumSize) {
098            this.maximumSize = maximumSize;
099        }
100    
101        public boolean isUseSharedBuffer() {
102            return useSharedBuffer;
103        }
104    
105        public void setUseSharedBuffer(boolean useSharedBuffer) {
106            this.useSharedBuffer = useSharedBuffer;
107        }
108    
109        public Message[] browse(ActiveMQDestination destination) throws Exception {
110            return buffer.browse(destination);
111        }
112    
113        // Implementation methods
114    
115        // -------------------------------------------------------------------------
116        protected MessageList createMessageList() {
117            if (useSharedBuffer) {
118                return new SimpleMessageList(maximumSize);
119            } else {
120                return new DestinationBasedMessageList(maximumSize);
121            }
122        }
123    
124    }