001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.IOException; 020 import java.sql.SQLException; 021 import java.util.Map; 022 import java.util.concurrent.ConcurrentHashMap; 023 import java.util.concurrent.atomic.AtomicLong; 024 025 import org.apache.activemq.ActiveMQMessageAudit; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.ActiveMQTopic; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.command.MessageId; 030 import org.apache.activemq.command.SubscriptionInfo; 031 import org.apache.activemq.store.MessageRecoveryListener; 032 import org.apache.activemq.store.TopicMessageStore; 033 import org.apache.activemq.util.ByteSequence; 034 import org.apache.activemq.util.IOExceptionSupport; 035 import org.apache.activemq.wireformat.WireFormat; 036 037 /** 038 * @version $Revision: 1.6 $ 039 */ 040 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 041 042 private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>(); 043 044 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { 045 super(persistenceAdapter, adapter, wireFormat, topic, audit); 046 } 047 048 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { 049 // Get a connection and insert the message into the DB. 050 TransactionContext c = persistenceAdapter.getTransactionContext(context); 051 try { 052 long seq = adapter.getStoreSequenceId(c, messageId); 053 adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); 054 } catch (SQLException e) { 055 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 056 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); 057 } finally { 058 c.close(); 059 } 060 } 061 062 /** 063 * @throws Exception 064 */ 065 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 066 067 TransactionContext c = persistenceAdapter.getTransactionContext(); 068 try { 069 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { 070 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 071 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 072 msg.getMessageId().setBrokerSequenceId(sequenceId); 073 return listener.recoverMessage(msg); 074 } 075 076 public boolean recoverMessageReference(String reference) throws Exception { 077 return listener.recoverMessageReference(new MessageId(reference)); 078 } 079 080 }); 081 } catch (SQLException e) { 082 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 083 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 084 } finally { 085 c.close(); 086 } 087 } 088 089 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) 090 throws Exception { 091 TransactionContext c = persistenceAdapter.getTransactionContext(); 092 String subcriberId = getSubscriptionKey(clientId, subscriptionName); 093 AtomicLong last = subscriberLastMessageMap.get(subcriberId); 094 if (last == null) { 095 long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName); 096 last = new AtomicLong(lastAcked); 097 subscriberLastMessageMap.put(subcriberId, last); 098 } 099 final AtomicLong finalLast = last; 100 try { 101 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() { 102 103 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 104 if (listener.hasSpace()) { 105 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 106 msg.getMessageId().setBrokerSequenceId(sequenceId); 107 listener.recoverMessage(msg); 108 finalLast.set(sequenceId); 109 return true; 110 } 111 return false; 112 } 113 114 public boolean recoverMessageReference(String reference) throws Exception { 115 return listener.recoverMessageReference(new MessageId(reference)); 116 } 117 118 }); 119 } catch (SQLException e) { 120 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 121 } finally { 122 c.close(); 123 last.set(finalLast.get()); 124 } 125 } 126 127 public void resetBatching(String clientId, String subscriptionName) { 128 String subcriberId = getSubscriptionKey(clientId, subscriptionName); 129 subscriberLastMessageMap.remove(subcriberId); 130 } 131 132 /** 133 * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo, 134 * boolean) 135 */ 136 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 137 TransactionContext c = persistenceAdapter.getTransactionContext(); 138 try { 139 c = persistenceAdapter.getTransactionContext(); 140 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive); 141 } catch (SQLException e) { 142 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 143 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e); 144 } finally { 145 c.close(); 146 } 147 } 148 149 /** 150 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String, 151 * String) 152 */ 153 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 154 TransactionContext c = persistenceAdapter.getTransactionContext(); 155 try { 156 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 157 } catch (SQLException e) { 158 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 159 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 160 } finally { 161 c.close(); 162 } 163 } 164 165 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 166 TransactionContext c = persistenceAdapter.getTransactionContext(); 167 try { 168 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 169 } catch (SQLException e) { 170 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 171 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 172 } finally { 173 c.close(); 174 resetBatching(clientId, subscriptionName); 175 } 176 } 177 178 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 179 TransactionContext c = persistenceAdapter.getTransactionContext(); 180 try { 181 return adapter.doGetAllSubscriptions(c, destination); 182 } catch (SQLException e) { 183 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 184 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 185 } finally { 186 c.close(); 187 } 188 } 189 190 public int getMessageCount(String clientId, String subscriberName) throws IOException { 191 int result = 0; 192 TransactionContext c = persistenceAdapter.getTransactionContext(); 193 try { 194 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName); 195 196 } catch (SQLException e) { 197 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 198 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 199 } finally { 200 c.close(); 201 } 202 return result; 203 } 204 205 protected String getSubscriptionKey(String clientId, String subscriberName) { 206 String result = clientId + ":"; 207 result += subscriberName != null ? subscriberName : "NOT_SET"; 208 return result; 209 } 210 211 }