001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.policy; 018 019 import java.util.concurrent.atomic.AtomicLong; 020 021 import javax.jms.JMSException; 022 import javax.jms.Message; 023 import javax.jms.MessageListener; 024 025 import org.apache.activemq.ActiveMQMessageTransformation; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.region.Destination; 028 import org.apache.activemq.broker.region.MessageReference; 029 import org.apache.activemq.broker.region.SubscriptionRecovery; 030 import org.apache.activemq.broker.region.Topic; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ActiveMQMessage; 033 import org.apache.activemq.command.ConnectionId; 034 import org.apache.activemq.command.MessageId; 035 import org.apache.activemq.command.ProducerId; 036 import org.apache.activemq.command.SessionId; 037 import org.apache.activemq.util.IdGenerator; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 /** 042 * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user 043 * specific query mechanism to load any messages they may have missed. 044 * 045 * @org.apache.xbean.XBean 046 * @version $Revision: 564271 $ 047 */ 048 public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 049 050 private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); 051 052 private MessageQuery query; 053 private AtomicLong messageSequence = new AtomicLong(0); 054 private IdGenerator idGenerator = new IdGenerator(); 055 private ProducerId producerId = createProducerId(); 056 057 public SubscriptionRecoveryPolicy copy() { 058 QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); 059 rc.setQuery(query); 060 return rc; 061 } 062 063 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 064 return query.validateUpdate(message.getMessage()); 065 } 066 067 public void recover(final ConnectionContext context, final Topic topic, final SubscriptionRecovery sub) throws Exception { 068 if (query != null) { 069 ActiveMQDestination destination = sub.getActiveMQDestination(); 070 query.execute(destination, new MessageListener() { 071 072 public void onMessage(Message message) { 073 dispatchInitialMessage(message, topic, context, sub); 074 } 075 }); 076 } 077 } 078 079 public void start() throws Exception { 080 if (query == null) { 081 throw new IllegalArgumentException("No query property configured"); 082 } 083 } 084 085 public void stop() throws Exception { 086 } 087 088 public MessageQuery getQuery() { 089 return query; 090 } 091 092 /** 093 * Sets the query strategy to load initial messages 094 */ 095 public void setQuery(MessageQuery query) { 096 this.query = query; 097 } 098 099 public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception { 100 return new org.apache.activemq.command.Message[0]; 101 } 102 103 protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { 104 try { 105 ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); 106 ActiveMQDestination destination = activeMessage.getDestination(); 107 if (destination == null) { 108 destination = sub.getActiveMQDestination(); 109 activeMessage.setDestination(destination); 110 } 111 activeMessage.setRegionDestination(regionDestination); 112 configure(activeMessage); 113 sub.addRecoveredMessage(context, activeMessage); 114 } catch (Throwable e) { 115 LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e); 116 } 117 } 118 119 protected void configure(ActiveMQMessage msg) throws JMSException { 120 long sequenceNumber = messageSequence.incrementAndGet(); 121 msg.setMessageId(new MessageId(producerId, sequenceNumber)); 122 msg.onSend(); 123 msg.setProducerId(producerId); 124 } 125 126 protected ProducerId createProducerId() { 127 String id = idGenerator.generateId(); 128 ConnectionId connectionId = new ConnectionId(id); 129 SessionId sessionId = new SessionId(connectionId, 1); 130 return new ProducerId(sessionId, 1); 131 } 132 }