/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Message store for a single destination that delegates all database work to
 * a {@link JDBCAdapter}. Messages are marshalled to and from bytes with the
 * configured {@link WireFormat}; every operation obtains a
 * {@link TransactionContext} from the {@link JDBCPersistenceAdapter} and
 * closes it when done.
 *
 * @version $Revision: 1.10 $
 */
public class JDBCMessageStore extends AbstractMessageStore {

    private static final Log LOG = LogFactory.getLog(JDBCMessageStore.class);

    protected final WireFormat wireFormat;
    protected final JDBCAdapter adapter;
    protected final JDBCPersistenceAdapter persistenceAdapter;

    /**
     * Sequence id handed to the adapter as the starting point of the next
     * {@link #recoverNextMessages} call; -1 after {@link #resetBatching()}.
     */
    protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);

    /** Optional duplicate detector consulted by {@link #addMessage}; may be null. */
    protected ActiveMQMessageAudit audit;

    /**
     * @param persistenceAdapter source of transaction contexts and sequence ids
     * @param adapter executes the actual database statements
     * @param wireFormat used to marshal and unmarshal stored messages
     * @param destination the destination this store serves
     * @param audit duplicate detector, or null to disable the duplicate check
     */
    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
                            ActiveMQDestination destination, ActiveMQMessageAudit audit) {
        super(destination);
        this.persistenceAdapter = persistenceAdapter;
        this.adapter = adapter;
        this.wireFormat = wireFormat;
        this.audit = audit;
    }

    /**
     * Marshals the message and inserts it through the adapter. If an audit is
     * configured and reports the message as a duplicate, the message is
     * skipped and nothing is written.
     *
     * @param context connection context used to obtain the transaction context
     * @param message the message to store
     * @throws IOException if marshalling fails or the adapter raises a SQLException
     */
    public void addMessage(ConnectionContext context, Message message) throws IOException {
        MessageId messageId = message.getMessageId();
        if (audit != null && audit.isDuplicate(message)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(destination.getPhysicalName()
                    + " ignoring duplicated (add) message, already stored: "
                    + messageId);
            }
            return;
        }

        // The sequence id is taken before marshalling, as in the original ordering.
        long sequenceId = persistenceAdapter.getNextSequenceId();

        // Serialize the Message..
        byte[] data;
        try {
            ByteSequence packet = wireFormat.marshal(message);
            data = ByteSequenceData.toByteArray(packet);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        }

        // Get a connection and insert the message into the DB.
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration());
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Inserts a message reference through the adapter under a newly allocated
     * sequence id.
     *
     * @param context connection context used to obtain the transaction context
     * @param messageId id of the referenced message
     * @param expirationTime expiration value passed through to the adapter
     * @param messageRef the reference string to store
     * @throws IOException if the adapter raises a SQLException
     */
    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
                                    String messageRef) throws IOException {
        // Get a connection and insert the message into the DB.
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination,
                                          expirationTime, messageRef);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Loads and unmarshals the message with the given id.
     *
     * @param messageId id of the message to load
     * @return the message, or null when the adapter returns no data for the id
     * @throws IOException if unmarshalling fails or the adapter raises a SQLException
     */
    public Message getMessage(MessageId messageId) throws IOException {
        // Get a connection and pull the message out of the DB
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            byte[] data = adapter.doGetMessage(c, messageId);
            if (data == null) {
                return null;
            }
            return (Message) wireFormat.unmarshal(new ByteSequence(data));
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Looks up a stored message reference using the broker sequence id carried
     * by the given message id.
     *
     * @param messageId id whose broker sequence id is used for the lookup
     * @return whatever the adapter returns for that sequence id
     * @throws IOException if the lookup fails
     */
    public String getMessageReference(MessageId messageId) throws IOException {
        long id = messageId.getBrokerSequenceId();

        // Get a connection and pull the message out of the DB
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            return adapter.doGetMessageReference(c, id);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Removes the message identified by the ack's last message id. The store
     * sequence id is resolved first (in its own transaction context) and then
     * used for the delete.
     *
     * @param context connection context used to obtain the transaction context
     * @param ack acknowledgement whose last message id selects the message
     * @throws IOException if the sequence lookup or the delete fails
     */
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());

        // Get a connection and remove the message from the DB
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            adapter.doRemoveMessage(c, seq);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Replays the contents of this destination to the listener. Each stored
     * message is unmarshalled and tagged with its store sequence id before
     * being handed over; stored references are passed on as new
     * {@link MessageId}s.
     *
     * @param listener receives the recovered messages and references
     * @throws Exception if recovery fails; a SQLException is logged and
     *         rethrown wrapped in an IOException
     */
    public void recover(final MessageRecoveryListener listener) throws Exception {
        // Get all the Message ids out of the database.
        // Exactly one context is obtained here so that the one closed in the
        // finally block is the one that was used (previously a second context
        // was fetched inside the try and the first was never closed).
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                    msg.getMessageId().setBrokerSequenceId(sequenceId);
                    return listener.recoverMessage(msg);
                }

                public boolean recoverMessageReference(String reference) throws Exception {
                    return listener.recoverMessageReference(new MessageId(reference));
                }
            });
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
     */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        // Get a connection and remove the message from the DB
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            adapter.doRemoveAllMessages(c, destination);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * @return the message count the adapter reports for this destination
     * @throws IOException if the adapter raises a SQLException
     */
    public int getMessageCount() throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            return adapter.doGetMessageCount(c, destination);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
        } finally {
            c.close();
        }
    }

    /**
     * Recovers the next batch of messages, starting from the last store
     * sequence id remembered by this store. A message is delivered only while
     * the listener reports free space; each delivered message advances the
     * remembered sequence id.
     *
     * @param maxReturned batch limit passed through to the adapter
     * @param listener receives the recovered messages and references
     * @throws Exception if the listener or wire format fails
     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
     *      org.apache.activemq.store.MessageRecoveryListener)
     */
    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
        TransactionContext c = persistenceAdapter.getTransactionContext();

        try {
            adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {

                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                    if (listener.hasSpace()) {
                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                        msg.getMessageId().setBrokerSequenceId(sequenceId);
                        listener.recoverMessage(msg);
                        // Remember how far this batch got so the next call resumes after it.
                        lastStoreSequenceId.set(sequenceId);
                        return true;
                    }
                    return false;
                }

                public boolean recoverMessageReference(String reference) throws Exception {
                    if (listener.hasSpace()) {
                        listener.recoverMessageReference(new MessageId(reference));
                        return true;
                    }
                    return false;
                }

            });
        } catch (SQLException e) {
            // NOTE(review): unlike the other methods in this class, a SQLException
            // here is only logged and not rethrown - confirm this is intentional.
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
        } finally {
            c.close();
        }
    }

    /**
     * Resets the batch cursor so the next {@link #recoverNextMessages} call
     * passes -1 as its starting sequence id.
     *
     * @see org.apache.activemq.store.MessageStore#resetBatching()
     */
    public void resetBatching() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
        }
        lastStoreSequenceId.set(-1);
    }

    /**
     * Moves the batch cursor to the store sequence id of the given message.
     * If the lookup throws an IOException the cursor falls back to -1, which
     * has the same effect as {@link #resetBatching()}.
     *
     * @param messageId message whose store sequence id becomes the new cursor
     */
    @Override
    public void setBatch(MessageId messageId) {
        long storeSequenceId = -1;
        try {
            storeSequenceId = getStoreSequenceIdForMessageId(messageId);
        } catch (IOException ignoredAsAlreadyLogged) {
            // reset batch in effect with default -1 value
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
        }
        lastStoreSequenceId.set(storeSequenceId);
    }

    /**
     * Asks the adapter for the store sequence id of the given message id.
     *
     * @param messageId the message to look up
     * @return the sequence id returned by the adapter
     * @throws IOException if the adapter raises a SQLException
     */
    private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            return adapter.getStoreSequenceId(c, messageId);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
        } finally {
            c.close();
        }
    }
}