001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.Map; 022 import java.util.Map.Entry; 023 import java.util.concurrent.ConcurrentHashMap; 024 025 import javax.transaction.xa.XAException; 026 027 import org.apache.activemq.broker.BrokerService; 028 import org.apache.activemq.broker.BrokerServiceAware; 029 import org.apache.activemq.broker.ConnectionContext; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.command.MessageAck; 032 import org.apache.activemq.command.TransactionId; 033 import org.apache.activemq.command.XATransactionId; 034 import org.apache.activemq.kaha.RuntimeStoreException; 035 import org.apache.activemq.store.MessageStore; 036 import org.apache.activemq.store.ProxyMessageStore; 037 import org.apache.activemq.store.ProxyTopicMessageStore; 038 import org.apache.activemq.store.TopicMessageStore; 039 import org.apache.activemq.store.TransactionRecoveryListener; 040 import org.apache.activemq.store.TransactionStore; 041 import org.apache.activemq.store.journal.JournalPersistenceAdapter; 042 import org.apache.commons.logging.Log; 043 import org.apache.commons.logging.LogFactory; 044 045 /** 046 * Provides a TransactionStore implementation that can create transaction aware 047 * MessageStore objects from non transaction aware MessageStore objects. 048 * 049 * @version $Revision: 1.4 $ 050 */ 051 public class KahaTransactionStore implements TransactionStore, BrokerServiceAware { 052 private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class); 053 054 private Map transactions = new ConcurrentHashMap(); 055 private Map prepared; 056 private KahaPersistenceAdapter adaptor; 057 058 private BrokerService brokerService; 059 060 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) { 061 this.adaptor = adaptor; 062 this.prepared = preparedMap; 063 } 064 065 public MessageStore proxy(MessageStore messageStore) { 066 return new ProxyMessageStore(messageStore) { 067 public void addMessage(ConnectionContext context, final Message send) throws IOException { 068 KahaTransactionStore.this.addMessage(getDelegate(), send); 069 } 070 071 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 072 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 073 } 074 }; 075 } 076 077 public TopicMessageStore proxy(TopicMessageStore messageStore) { 078 return new ProxyTopicMessageStore(messageStore) { 079 public void addMessage(ConnectionContext context, final Message send) throws IOException { 080 KahaTransactionStore.this.addMessage(getDelegate(), send); 081 } 082 083 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 084 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 085 } 086 }; 087 } 088 089 /** 090 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 091 */ 092 public void prepare(TransactionId txid) { 093 KahaTransaction tx = getTx(txid); 094 if (tx != null) { 095 tx.prepare(); 096 prepared.put(txid, tx); 097 } 098 } 099 100 /** 101 * @throws XAException 102 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 103 */ 104 public void commit(TransactionId txid, boolean wasPrepared) throws IOException { 105 KahaTransaction tx = getTx(txid); 106 if (tx != null) { 107 tx.commit(this); 108 removeTx(txid); 109 } 110 } 111 112 /** 113 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 114 */ 115 public void rollback(TransactionId txid) { 116 KahaTransaction tx = getTx(txid); 117 if (tx != null) { 118 tx.rollback(); 119 removeTx(txid); 120 } 121 } 122 123 public void start() throws Exception { 124 } 125 126 public void stop() throws Exception { 127 } 128 129 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 130 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) { 131 Map.Entry entry = (Entry)i.next(); 132 XATransactionId xid = (XATransactionId)entry.getKey(); 133 KahaTransaction kt = (KahaTransaction)entry.getValue(); 134 listener.recover(xid, kt.getMessages(), kt.getAcks()); 135 } 136 } 137 138 /** 139 * @param message 140 * @throws IOException 141 */ 142 void addMessage(final MessageStore destination, final Message message) throws IOException { 143 try { 144 if (message.isInTransaction()) { 145 KahaTransaction tx = getOrCreateTx(message.getTransactionId()); 146 tx.add((KahaMessageStore)destination, message); 147 } else { 148 destination.addMessage(null, message); 149 } 150 } catch (RuntimeStoreException rse) { 151 if (rse.getCause() instanceof IOException) { 152 brokerService.handleIOException((IOException)rse.getCause()); 153 } 154 throw rse; 155 } 156 } 157 158 /** 159 * @param ack 160 * @throws IOException 161 */ 162 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 163 try { 164 if (ack.isInTransaction()) { 165 KahaTransaction tx = getOrCreateTx(ack.getTransactionId()); 166 tx.add((KahaMessageStore)destination, ack); 167 } else { 168 destination.removeMessage(null, ack); 169 } 170 } catch (RuntimeStoreException rse) { 171 if (rse.getCause() instanceof IOException) { 172 brokerService.handleIOException((IOException)rse.getCause()); 173 } 174 throw rse; 175 } 176 } 177 178 protected synchronized KahaTransaction getTx(TransactionId key) { 179 KahaTransaction result = (KahaTransaction)transactions.get(key); 180 if (result == null) { 181 result = (KahaTransaction)prepared.get(key); 182 } 183 return result; 184 } 185 186 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) { 187 KahaTransaction result = (KahaTransaction)transactions.get(key); 188 if (result == null) { 189 result = new KahaTransaction(); 190 transactions.put(key, result); 191 } 192 return result; 193 } 194 195 protected synchronized void removeTx(TransactionId key) { 196 transactions.remove(key); 197 prepared.remove(key); 198 } 199 200 public void delete() { 201 transactions.clear(); 202 prepared.clear(); 203 } 204 205 protected MessageStore getStoreById(Object id) { 206 return adaptor.retrieveMessageStore(id); 207 } 208 209 public void setBrokerService(BrokerService brokerService) { 210 this.brokerService = brokerService; 211 } 212 }