001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.memory; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 import java.util.concurrent.ConcurrentHashMap; 023 024 import javax.transaction.xa.XAException; 025 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.Message; 028 import org.apache.activemq.command.MessageAck; 029 import org.apache.activemq.command.TransactionId; 030 import org.apache.activemq.command.XATransactionId; 031 import org.apache.activemq.store.MessageStore; 032 import org.apache.activemq.store.PersistenceAdapter; 033 import org.apache.activemq.store.ProxyMessageStore; 034 import org.apache.activemq.store.ProxyTopicMessageStore; 035 import org.apache.activemq.store.TopicMessageStore; 036 import org.apache.activemq.store.TransactionRecoveryListener; 037 import org.apache.activemq.store.TransactionStore; 038 039 /** 040 * Provides a TransactionStore implementation that can create transaction aware 041 * MessageStore objects from non transaction aware MessageStore objects. 042 * 043 * @version $Revision: 1.4 $ 044 */ 045 public class MemoryTransactionStore implements TransactionStore { 046 047 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 048 ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 049 final PersistenceAdapter persistenceAdapter; 050 051 private boolean doingRecover; 052 053 public class Tx { 054 private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 055 056 private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 057 058 public void add(AddMessageCommand msg) { 059 messages.add(msg); 060 } 061 062 public void add(RemoveMessageCommand ack) { 063 acks.add(ack); 064 } 065 066 public Message[] getMessages() { 067 Message rc[] = new Message[messages.size()]; 068 int count = 0; 069 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 070 AddMessageCommand cmd = iter.next(); 071 rc[count++] = cmd.getMessage(); 072 } 073 return rc; 074 } 075 076 public MessageAck[] getAcks() { 077 MessageAck rc[] = new MessageAck[acks.size()]; 078 int count = 0; 079 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 080 RemoveMessageCommand cmd = iter.next(); 081 rc[count++] = cmd.getMessageAck(); 082 } 083 return rc; 084 } 085 086 /** 087 * @throws IOException 088 */ 089 public void commit() throws IOException { 090 ConnectionContext ctx = new ConnectionContext(); 091 persistenceAdapter.beginTransaction(ctx); 092 try { 093 094 // Do all the message adds. 095 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 096 AddMessageCommand cmd = iter.next(); 097 cmd.run(ctx); 098 } 099 // And removes.. 100 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 101 RemoveMessageCommand cmd = iter.next(); 102 cmd.run(ctx); 103 } 104 105 } catch ( IOException e ) { 106 persistenceAdapter.rollbackTransaction(ctx); 107 throw e; 108 } 109 persistenceAdapter.commitTransaction(ctx); 110 } 111 } 112 113 public interface AddMessageCommand { 114 Message getMessage(); 115 116 void run(ConnectionContext context) throws IOException; 117 } 118 119 public interface RemoveMessageCommand { 120 MessageAck getMessageAck(); 121 122 void run(ConnectionContext context) throws IOException; 123 } 124 125 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 126 this.persistenceAdapter=persistenceAdapter; 127 } 128 129 public MessageStore proxy(MessageStore messageStore) { 130 return new ProxyMessageStore(messageStore) { 131 public void addMessage(ConnectionContext context, final Message send) throws IOException { 132 MemoryTransactionStore.this.addMessage(getDelegate(), send); 133 } 134 135 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 136 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 137 } 138 }; 139 } 140 141 public TopicMessageStore proxy(TopicMessageStore messageStore) { 142 return new ProxyTopicMessageStore(messageStore) { 143 public void addMessage(ConnectionContext context, final Message send) throws IOException { 144 MemoryTransactionStore.this.addMessage(getDelegate(), send); 145 } 146 147 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 148 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 149 } 150 }; 151 } 152 153 /** 154 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 155 */ 156 public void prepare(TransactionId txid) { 157 Tx tx = inflightTransactions.remove(txid); 158 if (tx == null) { 159 return; 160 } 161 preparedTransactions.put(txid, tx); 162 } 163 164 public Tx getTx(Object txid) { 165 Tx tx = inflightTransactions.get(txid); 166 if (tx == null) { 167 tx = new Tx(); 168 inflightTransactions.put(txid, tx); 169 } 170 return tx; 171 } 172 173 /** 174 * @throws XAException 175 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 176 */ 177 public void commit(TransactionId txid, boolean wasPrepared) throws IOException { 178 179 Tx tx; 180 if (wasPrepared) { 181 tx = preparedTransactions.remove(txid); 182 } else { 183 tx = inflightTransactions.remove(txid); 184 } 185 186 if (tx == null) { 187 return; 188 } 189 tx.commit(); 190 191 } 192 193 /** 194 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 195 */ 196 public void rollback(TransactionId txid) { 197 preparedTransactions.remove(txid); 198 inflightTransactions.remove(txid); 199 } 200 201 public void start() throws Exception { 202 } 203 204 public void stop() throws Exception { 205 } 206 207 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 208 // All the inflight transactions get rolled back.. 209 inflightTransactions.clear(); 210 this.doingRecover = true; 211 try { 212 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 213 Object txid = iter.next(); 214 Tx tx = preparedTransactions.get(txid); 215 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 216 } 217 } finally { 218 this.doingRecover = false; 219 } 220 } 221 222 /** 223 * @param message 224 * @throws IOException 225 */ 226 void addMessage(final MessageStore destination, final Message message) throws IOException { 227 228 if (doingRecover) { 229 return; 230 } 231 232 if (message.getTransactionId() != null) { 233 Tx tx = getTx(message.getTransactionId()); 234 tx.add(new AddMessageCommand() { 235 public Message getMessage() { 236 return message; 237 } 238 239 public void run(ConnectionContext ctx) throws IOException { 240 destination.addMessage(ctx, message); 241 } 242 243 }); 244 } else { 245 destination.addMessage(null, message); 246 } 247 } 248 249 /** 250 * @param ack 251 * @throws IOException 252 */ 253 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 254 if (doingRecover) { 255 return; 256 } 257 258 if (ack.isInTransaction()) { 259 Tx tx = getTx(ack.getTransactionId()); 260 tx.add(new RemoveMessageCommand() { 261 public MessageAck getMessageAck() { 262 return ack; 263 } 264 265 public void run(ConnectionContext ctx) throws IOException { 266 destination.removeMessage(ctx, ack); 267 } 268 }); 269 } else { 270 destination.removeMessage(null, ack); 271 } 272 } 273 274 public void delete() { 275 inflightTransactions.clear(); 276 preparedTransactions.clear(); 277 doingRecover = false; 278 } 279 280 }