001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transaction; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.Collections; 022 import java.util.Iterator; 023 024 import javax.transaction.xa.XAException; 025 026 import org.apache.activemq.command.TransactionId; 027 028 /** 029 * Keeps track of all the actions the need to be done when a transaction does a 030 * commit or rollback. 031 * 032 * @version $Revision: 1.5 $ 033 */ 034 public abstract class Transaction { 035 036 public static final byte START_STATE = 0; // can go to: 1,2,3 037 public static final byte IN_USE_STATE = 1; // can go to: 2,3 038 public static final byte PREPARED_STATE = 2; // can go to: 3 039 public static final byte FINISHED_STATE = 3; 040 041 private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>(); 042 private byte state = START_STATE; 043 044 public byte getState() { 045 return state; 046 } 047 048 public void setState(byte state) { 049 this.state = state; 050 } 051 052 public void addSynchronization(Synchronization r) { 053 synchronizations.add(r); 054 if (state == START_STATE) { 055 state = IN_USE_STATE; 056 } 057 } 058 059 public void removeSynchronization(Synchronization r) { 060 synchronizations.remove(r); 061 } 062 063 public void prePrepare() throws Exception { 064 065 // Is it ok to call prepare now given the state of the 066 // transaction? 067 switch (state) { 068 case START_STATE: 069 case IN_USE_STATE: 070 break; 071 default: 072 XAException xae = new XAException("Prepare cannot be called now."); 073 xae.errorCode = XAException.XAER_PROTO; 074 throw xae; 075 } 076 077 // // Run the prePrepareTasks 078 // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) { 079 // Callback r = (Callback) iter.next(); 080 // r.execute(); 081 // } 082 } 083 084 protected void fireAfterCommit() throws Exception { 085 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { 086 Synchronization s = iter.next(); 087 s.afterCommit(); 088 } 089 } 090 091 public void fireAfterRollback() throws Exception { 092 Collections.reverse(synchronizations); 093 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { 094 Synchronization s = iter.next(); 095 s.afterRollback(); 096 } 097 } 098 099 public String toString() { 100 return super.toString() + "[synchronizations=" + synchronizations + "]"; 101 } 102 103 public abstract void commit(boolean onePhase) throws XAException, IOException; 104 105 public abstract void rollback() throws XAException, IOException; 106 107 public abstract int prepare() throws XAException, IOException; 108 109 public abstract TransactionId getTransactionId(); 110 111 public boolean isPrepared() { 112 return getState() == PREPARED_STATE; 113 } 114 115 public int size() { 116 return synchronizations.size(); 117 } 118 }