001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.commons.transaction.file;
018    
019    import java.io.BufferedReader;
020    import java.io.BufferedWriter;
021    import java.io.File;
022    import java.io.FileInputStream;
023    import java.io.FileNotFoundException;
024    import java.io.FileOutputStream;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.InputStreamReader;
028    import java.io.OutputStream;
029    import java.io.OutputStreamWriter;
030    import java.util.ArrayList;
031    import java.util.Collection;
032    import java.util.HashMap;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.Iterator;
036    import java.util.Collections;
037    
038    import org.apache.commons.transaction.locking.GenericLock;
039    import org.apache.commons.transaction.locking.GenericLockManager;
040    import org.apache.commons.transaction.locking.LockException;
041    import org.apache.commons.transaction.locking.LockManager2;
042    import org.apache.commons.transaction.util.FileHelper;
043    import org.apache.commons.transaction.util.LoggerFacade;
044    
045    /**
046     * A resource manager for streamable objects stored in a file system.
047     * 
048     * It is intended for developer and "out of the box" use. 
049     * It is <em>not</em> intended to be a real alternative for
050     * a full blown DMBS (of course it can not be compared to a RDBMS at all).
051     * 
052     * Major features:<br>
053     * <ul>
054     * <li>Transactions performed with this class more or less comform to the widely accepted ACID properties
055     * <li>Reading should be as fast as from the ordinary file system (at the cost of a bit slower commits) 
056     * </ul>
057     * 
058     * Compared to a "real" DBMS major limitations are (in order of assumed severity):<br>
059     * <ul>
060     * <li>Number of simultaneously open resources is limited to the number of available file descriptors
061     * <li>It does not scale a bit
062     * <li>Pessimistic transaction and locking scheme
063     * <li>Isolation level currently is restricted to <em>read committed</em> and <em>repeated read</em> (which is not that bad)
064     * </ul>
065     * 
066     * <em>Important</em>: If possible you should have the work and store directory located in the 
067     * same file system. If not, you might get additional problems, as there are:
068     * <ul>
069     * <li>On commit it might be necessay to copy files instead of rename/relink them. This may lead to time consuming, 
070     * overly blocking commit phases and higher risk of corrupted files
071     * <li>Prepare phase might be too permissive, no check for sufficient memory on store file system is possible
072     * </ul> 
073     * 
074     * General limitations include:<br>
075     * <ul>
076     * <li>Due to lack of synchronization on the transaction context level, every transaction may only be
077     * accessed by a <em>single thread</em> throughout its full life. 
078     * This means it is forbidden for a thread that has not started a transaction 
079     * to perform any operations inside this transaction. However, threads associated
080     * with different transactions can safely access these methods concurrently.
081     * Reasons for the lack of synchronization are improved performance and simplicity (of the code of this class).
082     * <li>There is no dedicated class for a transaction. Having such a class would be better practice and 
083     * make certain actions more intuitive.
084     * <li>Resource identifiers need a reasonsable string representation obtainable by <code>toString</code>.
085     * More specifically, they will have to resolve to a <em>valid</em> file path that does note denote a directory. 
086     * If it does, you might be able to create it, but not to read or write anything 
087     * from resp. to it. Valid string representations of a resource idenfier are 
088     * for example "file" "/root" or "hjfhdfhuhuhsdufhdsufhdsufhdfuhdfduhduhduhdu". 
089     * Invalid are for example "/" or "/root/". Invalid on some file systems are for example "c:" or "file://huhu".
090     * <li>As there are no active processes inside this RM and it shares its threads with the application,
091     * control over transactions is limited to points where the application calls the RM. 
092     * In particular, this disables <em>active</em> termination of transactions upon timeout.
093     * <li>There is no notion of a connection to this file manager. This means you can not connect from hosts other than
094     * local and you will get problems when plugging this store into a J2EE store using connectors. 
095     * <li>Methods should throw more specific exceptions
096     * </ul>
097     * 
098     * <p><em>Caution</em>:<br>
099     * The <code>txId</code> passed to many methods as an identifier for the
100     * transaction concerned will function as a key in a <code>HashMap</code>.
101     * Thus assure that <code>equals</code> and <code>hashCode</code> are both
102     * properly implemented and match each other.</p>
103     *  
104     * <p><em>Caution</em>:<br>
105     * You will have to guarantee that no other process will access neither
106     * the store or the working dir concurrently to this <code>FileResourceManager</code>.</p>
107     * 
108     * <p><em>Special Caution</em>:<br>
109     * Be very careful not to have two instances of <code>FileResourceManager</code>
110     * working in the same store and/or working dir.
111     *   
112     * @version $Id: FileResourceManager.java 573315 2007-09-06 16:28:42Z ozeigermann $
113     */
114    public class FileResourceManager implements ResourceManager, ResourceManagerErrorCodes {
115    
116        // reflects the natural isolation level of this store
117        protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ;
118        protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL;
119    
120        protected static final int NO_LOCK = 0;
121        protected static final int LOCK_ACCESS = NO_LOCK + 1;
122        protected static final int LOCK_SHARED = NO_LOCK + 2;
123        protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3;
124        protected static final int LOCK_COMMIT = NO_LOCK + 4;
125    
126        protected static final int OPERATION_MODE_STOPPED = 0;
127        protected static final int OPERATION_MODE_STOPPING = 1;
128        protected static final int OPERATION_MODE_STARTED = 2;
129        protected static final int OPERATION_MODE_STARTING = 3;
130        protected static final int OPERATION_MODE_RECOVERING = 4;
131    
132        protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15";
133    
134        protected static final int DEFAULT_TIMEOUT_MSECS = 5000;
135        protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2;
136    
137        protected static final String WORK_CHANGE_DIR = "change";
138        protected static final String WORK_DELETE_DIR = "delete";
139    
140        protected static final String CONTEXT_FILE = "transaction.log";
141    
142        /*
143         * --- Static helper methods ---
144         *
145         *  
146         */
147    
148        protected static void applyDeletes(File removeDir, File targetDir, File rootDir)
149                throws IOException {
150            if (removeDir.isDirectory() && targetDir.isDirectory()) {
151                File[] files = removeDir.listFiles();
152                for (int i = 0; i < files.length; i++) {
153                    File removeFile = files[i];
154                    File targetFile = new File(targetDir, removeFile.getName());
155                    if (removeFile.isFile()) {
156                        if (targetFile.exists()) {
157                            if (!targetFile.delete()) {
158                                throw new IOException("Could not delete file " + removeFile.getName()
159                                        + " in directory targetDir");
160                            }
161                        }
162                        // indicate, this has been done
163                        removeFile.delete();
164                    } else {
165                        applyDeletes(removeFile, targetFile, rootDir);
166                    }
167                }
168                // delete empty target directories, except root dir
169                if (!targetDir.equals(rootDir) && targetDir.list().length == 0) {
170                    targetDir.delete();
171                }
172            }
173        }
174        
175        /*
176         * --- object members ---
177         * 
178         *  
179         */
180    
181        protected String workDir;
182        protected String storeDir;
183        protected boolean cleanUp = true;
184        protected boolean dirty = false;
185        protected int operationMode = OPERATION_MODE_STOPPED;
186        protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS;
187        protected boolean debug;
188    
189        protected LoggerFacade logger;
190    
191        protected Map globalTransactions;
192        protected List globalOpenResources;
193        protected LockManager2 lockManager;
194    
195        protected ResourceIdToPathMapper idMapper = null;
196        protected TransactionIdToPathMapper txIdMapper = null;
197    
198        protected int idCnt = 0;
199    
200        /*
201         * --- ctor and general getter / setter methods ---
202         *
203         *  
204         */
205    
206        /**
207         * Creates a new resource manager operation on the specified directories.
208         * 
209         * @param storeDir directory where main data should go after commit
210         * @param workDir directory where transactions store temporary data
211         * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
212         * @param logger the logger to be used by this store  
213         */
214        public FileResourceManager(String storeDir, String workDir, boolean urlEncodePath, LoggerFacade logger) {
215            this(storeDir, workDir, urlEncodePath, logger, false);
216        }
217    
218        /**
219         * Creates a new resource manager operation on the specified directories.
220         * 
221         * @param storeDir directory where main data should go after commit
222         * @param workDir directory where transactions store temporary data 
223         * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
224         * @param logger the logger to be used by this store
225         * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 
226         */
227        public FileResourceManager(
228            String storeDir,
229            String workDir,
230            boolean urlEncodePath,
231            LoggerFacade logger,
232            boolean debug) {
233            this(storeDir, workDir, urlEncodePath ? new URLEncodeIdMapper() : null, new NoOpTransactionIdToPathMapper(), logger, debug);
234        }
235    
236        /**
237         * Creates a new resource manager operation on the specified directories.
238         * This constructor is reintroduced for backwards API compatibility and is used by jakarta-slide.
239         *
240         * @param storeDir directory where main data should go after commit
241         * @param workDir directory where transactions store temporary data
242         * @param idMapper mapper for resourceId to path
243         * @param logger the logger to be used by this store
244         * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
245         */
246        public FileResourceManager(
247            String storeDir,
248            String workDir,
249            ResourceIdToPathMapper idMapper,
250            LoggerFacade logger,
251            boolean debug) {
252            this(storeDir, workDir, idMapper, new NoOpTransactionIdToPathMapper(), logger, debug);
253        }
254        /**
255         * Creates a new resource manager operation on the specified directories.
256         * 
257         * @param storeDir directory where main data should go after commit
258         * @param workDir directory where transactions store temporary data 
259         * @param idMapper mapper for resourceId to path
260         * @param txIdMapper mapper for transaction id to path
261         * @param logger the logger to be used by this store
262         * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 
263         */
264        public FileResourceManager(
265            String storeDir,
266            String workDir,
267            ResourceIdToPathMapper idMapper,
268            TransactionIdToPathMapper txIdMapper,
269            LoggerFacade logger,
270            boolean debug) {
271            this.workDir = workDir;
272            this.storeDir = storeDir;
273            this.idMapper = idMapper;
274            this.txIdMapper = txIdMapper;
275            this.logger = logger;
276            this.debug = debug;
277        }
278    
279        /**
280         * Gets the store directory.
281         * 
282         * @return the store directory
283         * @see #FileResourceManager(String, String, boolean, LoggerFacade)
284         * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
285         */
286        public String getStoreDir() {
287            return storeDir;
288        }
289    
290        /**
291         * Gets the working directory.
292         * 
293         * @return the work directory
294         * @see #FileResourceManager(String, String, boolean, LoggerFacade)
295         * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
296         */
297        public String getWorkDir() {
298            return workDir;
299        }
300    
301        /**
302         * Gets the logger used by this resource manager. 
303         * 
304         * @return used logger 
305         */
306        public LoggerFacade getLogger() {
307            return logger;
308        }
309    
310        /*
311         * --- public methods of interface ResourceManager ---
312         *
313         *  
314         */
315    
316        public boolean lockResource(Object resourceId, Object txId) throws ResourceManagerException {
317            lockResource(resourceId, txId, false);
318            // XXX will never return false as it will either throw or return true
319            return true;
320        }
321    
322        public boolean lockResource(Object resourceId, Object txId, boolean shared) throws ResourceManagerException {
323            lockResource(resourceId, txId, shared, true, Long.MAX_VALUE, true);
324            // XXX will never return false as it will either throw or return true
325            return true;
326        }
327    
328        public boolean lockResource(
329            Object resourceId,
330            Object txId,
331            boolean shared,
332            boolean wait,
333            long timeoutMSecs,
334            boolean reentrant)
335            throws ResourceManagerException {
336    
337            TransactionContext context = (shared ? txInitialSaneCheck(txId) : txInitialSaneCheckForWriting(txId));
338            assureNotMarkedForRollback(context);
339            fileInitialSaneCheck(txId, resourceId);
340    
341            // XXX allows locking of non existent resources (e.g. to prepare a create)
342            int level = (shared ? getSharedLockLevel(context) : LOCK_EXCLUSIVE);
343            try {
344                lockManager.lock(txId, resourceId, level, reentrant, Math.min(timeoutMSecs,
345                        context.timeoutMSecs));
346                // XXX will never return false as it will either throw or return true
347                return true;
348            } catch (LockException e) {
349                switch (e.getCode()) {
350                case LockException.CODE_INTERRUPTED:
351                    throw new ResourceManagerException("Could not get lock for resource at '"
352                            + resourceId + "'", ERR_NO_LOCK, txId);
353                case LockException.CODE_TIMED_OUT:
354                    throw new ResourceManagerException("Lock timed out for resource at '" + resourceId
355                            + "'", ERR_NO_LOCK, txId);
356                case LockException.CODE_DEADLOCK_VICTIM:
357                    throw new ResourceManagerException("Deadlock victim resource at '" + resourceId
358                            + "'", ERR_DEAD_LOCK, txId);
359                default :
360                    throw new ResourceManagerException("Locking exception for resource at '" + resourceId
361                            + "'", ERR_DEAD_LOCK, txId);
362                }
363            }
364        }
365    
366        public int getDefaultIsolationLevel() {
367            return DEFAULT_ISOLATION_LEVEL;
368        }
369    
370        public int[] getSupportedIsolationLevels() throws ResourceManagerException {
371            return new int[] { ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ };
372        }
373    
374        public boolean isIsolationLevelSupported(int level) throws ResourceManagerException {
375            return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ);
376        }
377    
378        /**
379         * Gets the default transaction timeout in <em>milliseconds</em>.
380         */
381        public long getDefaultTransactionTimeout() {
382            return defaultTimeout;
383        }
384    
385        /**
386         * Sets the default transaction timeout.
387         * 
388         * @param timeout timeout in <em>milliseconds</em>
389         */
390        public void setDefaultTransactionTimeout(long timeout) {
391            defaultTimeout = timeout;
392        }
393    
394        public long getTransactionTimeout(Object txId) throws ResourceManagerException {
395            assureRMReady();
396            long msecs = 0;
397            TransactionContext context = getContext(txId);
398            if (context == null) {
399                msecs = getDefaultTransactionTimeout();
400            } else {
401                msecs = context.timeoutMSecs;
402            }
403            return msecs;
404        }
405    
406        public void setTransactionTimeout(Object txId, long mSecs) throws ResourceManagerException {
407            assureRMReady();
408            TransactionContext context = getContext(txId);
409            if (context != null) {
410                context.timeoutMSecs = mSecs;
411            } else {
412                throw new ResourceManagerException(ERR_NO_TX, txId);
413            }
414        }
415    
416        public int getIsolationLevel(Object txId) throws ResourceManagerException {
417            assureRMReady();
418            TransactionContext context = getContext(txId);
419            if (context == null) {
420                return DEFAULT_ISOLATION_LEVEL;
421            } else {
422                return context.isolationLevel;
423            }
424        }
425    
426        public void setIsolationLevel(Object txId, int level) throws ResourceManagerException {
427            assureRMReady();
428            TransactionContext context = getContext(txId);
429            if (context != null) {
430                if (level != ISOLATION_LEVEL_READ_COMMITTED || level != ISOLATION_LEVEL_REPEATABLE_READ) {
431                    context.isolationLevel = level;
432                } else {
433                    throw new ResourceManagerException(ERR_ISOLATION_LEVEL_UNSUPPORTED, txId);
434                }
435            } else {
436                throw new ResourceManagerException(ERR_NO_TX, txId);
437            }
438        }
439    
440        public synchronized void start() throws ResourceManagerSystemException {
441    
442            logger.logInfo("Starting RM at '" + storeDir + "' / '" + workDir + "'");
443    
444            operationMode = OPERATION_MODE_STARTING;
445    
446            globalTransactions = Collections.synchronizedMap(new HashMap());
447            lockManager = new GenericLockManager(LOCK_COMMIT, logger);
448            globalOpenResources = Collections.synchronizedList(new ArrayList());
449    
450            recover();
451            sync();
452    
453            operationMode = OPERATION_MODE_STARTED;
454    
455            if (dirty) {
456                logger.logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)");
457            } else {
458                logger.logInfo("Started RM");
459            }
460    
461        }
462    
463        public synchronized boolean stop(int mode) throws ResourceManagerSystemException {
464            return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR);
465        }
466    
467        public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException {
468    
469            logger.logInfo("Stopping RM at '" + storeDir + "' / '" + workDir + "'");
470    
471            operationMode = OPERATION_MODE_STOPPING;
472    
473            sync();
474            boolean success = shutdown(mode, timeOut);
475    
476            releaseGlobalOpenResources();
477    
478            if (success) {
479                operationMode = OPERATION_MODE_STOPPED;
480                logger.logInfo("Stopped RM");
481            } else {
482                logger.logWarning("Failed to stop RM");
483            }
484    
485            return success;
486        }
487    
488        public synchronized boolean recover() throws ResourceManagerSystemException {
489            if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STARTING) {
490                throw new ResourceManagerSystemException(
491                    ERR_SYSTEM,
492                    "Recovery is possible in started or starting resource manager only");
493            }
494            int oldMode = operationMode;
495            operationMode = OPERATION_MODE_RECOVERING;
496    
497            recoverContexts();
498            if (globalTransactions.size() > 0) {
499                logger.logInfo("Recovering pending transactions");
500            }
501    
502            dirty = !rollBackOrForward();
503    
504            operationMode = oldMode;
505            return dirty;
506        }
507    
508        public int getTransactionState(Object txId) throws ResourceManagerException {
509            TransactionContext context = getContext(txId);
510    
511            if (context == null) {
512                return STATUS_NO_TRANSACTION;
513            } else {
514                return context.status;
515            }
516    
517        }
518    
519        public void startTransaction(Object txId) throws ResourceManagerException {
520    
521            if (logger.isFineEnabled()) logger.logFine("Starting Tx " + txId);
522    
523            assureStarted(); // can only start a new transaction when not already stopping
524            if (txId == null || txIdMapper.getPathForId(txId).length() == 0) {
525                throw new ResourceManagerException(ERR_TXID_INVALID, txId);
526            }
527    
528            // be sure we are the only ones who create this tx 
529            synchronized (globalTransactions) {
530                TransactionContext context = getContext(txId);
531    
532                if (context != null) {
533                    throw new ResourceManagerException(ERR_DUP_TX, txId);
534                }
535    
536                context = new TransactionContext(txId);
537                context.init();
538                globalTransactions.put(txId, context);
539    
540            }
541        }
542    
543        public void markTransactionForRollback(Object txId) throws ResourceManagerException {
544            assureRMReady();
545            TransactionContext context = txInitialSaneCheckForWriting(txId);
546            try {
547                context.status = STATUS_MARKED_ROLLBACK;
548                context.saveState();
549            } finally {
550                // be very sure to free locks and resources, as application might crash or otherwise forget to roll this tx back
551                context.finalCleanUp();
552            }
553        }
554    
555        public int prepareTransaction(Object txId) throws ResourceManagerException {
556            assureRMReady();
557            // do not allow any further writing or commit or rollback when db is corrupt
558            if (dirty) {
559                throw new ResourceManagerSystemException(
560                    "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
561                    ERR_SYSTEM,
562                    txId);
563            }
564    
565            if (txId == null) {
566                throw new ResourceManagerException(ERR_TXID_INVALID, txId);
567            }
568    
569            TransactionContext context = getContext(txId);
570    
571            if (context == null) {
572                return PREPARE_FAILURE;
573            }
574    
575            synchronized (context) {
576    
577                sync();
578    
579                if (context.status != STATUS_ACTIVE) {
580                    context.status = STATUS_MARKED_ROLLBACK;
581                    context.saveState();
582                    return PREPARE_FAILURE;
583                }
584    
585                if (logger.isFineEnabled()) logger.logFine("Preparing Tx " + txId);
586    
587                int prepareStatus = PREPARE_FAILURE;
588    
589                context.status = STATUS_PREPARING;
590                context.saveState();
591                // do all checks as early as possible
592                context.closeResources();
593                if (context.readOnly) {
594                    prepareStatus = PREPARE_SUCCESS_READONLY;
595                } else {
596                    // do all checks as early as possible
597                    try {
598                        context.upgradeLockToCommit();
599                    } catch (ResourceManagerException rme) {
600                        // if this did not work, mark it for roll back as early as possible
601                        markTransactionForRollback(txId);
602                        throw rme;
603                    }
604                    prepareStatus = PREPARE_SUCCESS;
605                }
606                context.status = STATUS_PREPARED;
607                context.saveState();
608                if (logger.isFineEnabled()) logger.logFine("Prepared Tx " + txId);
609    
610                return prepareStatus;
611            }
612        }
613    
614        public void rollbackTransaction(Object txId) throws ResourceManagerException {
615            assureRMReady();
616            TransactionContext context = txInitialSaneCheckForWriting(txId);
617            // needing synchronization in order not to interfer with shutdown thread
618            synchronized (context) {
619                try {
620    
621                    if (logger.isFineEnabled()) logger.logFine("Rolling back Tx " + txId);
622    
623                    context.status = STATUS_ROLLING_BACK;
624                    context.saveState();
625                    context.rollback();
626                    if (logger.isFineEnabled()) logger.logFine("All resources successfully removed for tx" + txId);
627                    context.status = STATUS_ROLLEDBACK;
628                    context.saveState();
629                    globalTransactions.remove(txId);
630                    context.cleanUp();
631    
632                    if (logger.isFineEnabled()) logger.logFine("Rolled back Tx " + txId);
633    
634                    // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
635                } catch (Error e) {
636                    setDirty(txId, e);
637                    throw e;
638                } catch (RuntimeException e) {
639                    setDirty(txId, e);
640                    throw e;
641                } catch (ResourceManagerSystemException e) {
642                    setDirty(txId, e);
643                    throw e;
644                } finally {
645                    context.finalCleanUp();
646                    // tell shutdown thread this tx is finished
647                    context.notifyFinish();
648                }
649            }
650        }
651    
652        public void commitTransaction(Object txId) throws ResourceManagerException {
653            assureRMReady();
654            TransactionContext context = txInitialSaneCheckForWriting(txId);
655            assureNotMarkedForRollback(context);
656    
657            // needing synchronization in order not to interfer with shutdown thread
658            synchronized (context) {
659                try {
660    
661                    if (logger.isFineEnabled()) logger.logFine("Committing Tx " + txId);
662    
663                    context.status = STATUS_COMMITTING;
664                    context.saveState();
665                    context.commit();
666                    if (logger.isFineEnabled()) logger.logFine("All resources successfully moved for tx" + txId);
667                    context.status = STATUS_COMMITTED;
668                    context.saveState();
669                    globalTransactions.remove(txId);
670                    context.cleanUp();
671    
672                    if (logger.isFineEnabled()) logger.logFine("Committed Tx " + txId);
673    
674                    // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
675                } catch (Error e) {
676                    setDirty(txId, e);
677                    throw e;
678                } catch (RuntimeException e) {
679                    setDirty(txId, e);
680                    throw e;
681                } catch (ResourceManagerSystemException e) {
682                    setDirty(txId, e);
683                    throw e;
684                    // like "could not upgrade lock"
685                } catch (ResourceManagerException e) {
686                    logger.logWarning("Could not commit tx " + txId + ", rolling back instead", e);
687                    rollbackTransaction(txId);
688                } finally {
689                    context.finalCleanUp();
690                    // tell shutdown thread this tx is finished
691                    context.notifyFinish();
692                }
693            }
694        }
695    
696        public boolean resourceExists(Object resourceId) throws ResourceManagerException {
697            // create temporary light weight tx
698            Object txId;
699            TransactionContext context;
700            synchronized (globalTransactions) {
701                txId = generatedUniqueTxId();
702                if (logger.isFinerEnabled())
703                    logger.logFiner("Creating temporary light weight tx " + txId + " to check for exists");
704                context = new TransactionContext(txId);
705                context.isLightWeight = true;
706                // XXX higher isolation might be needed to make sure upgrade to commit lock always works
707                context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
708                // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
709                globalTransactions.put(txId, context);
710            }
711    
712            boolean exists = resourceExists(txId, resourceId);
713    
714            context.freeLocks();
715            globalTransactions.remove(txId);
716            if (logger.isFinerEnabled())
717                logger.logFiner("Removing temporary light weight tx " + txId);
718    
719            return exists;
720        }
721    
722        public boolean resourceExists(Object txId, Object resourceId) throws ResourceManagerException {
723            lockResource(resourceId, txId, true);
724            return (getPathForRead(txId, resourceId) != null);
725        }
726    
727        public void deleteResource(Object txId, Object resourceId) throws ResourceManagerException {
728            deleteResource(txId, resourceId, true);
729        }
730    
731        public void deleteResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
732    
733            if (logger.isFineEnabled()) logger.logFine(txId + " deleting " + resourceId);
734    
735            lockResource(resourceId, txId, false);
736    
737            if (getPathForRead(txId, resourceId) == null) {
738                if (assureOnly) {
739                    return;
740                }
741                throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
742            }
743            String txDeletePath = getDeletePath(txId, resourceId);
744            String mainPath = getMainPath(resourceId);
745            try {
746                getContext(txId).readOnly = false;
747    
748                // first undo change / create when there was one
749                undoScheduledChangeOrCreate(txId, resourceId);
750    
751                // if there still is a file in main store, we need to schedule
752                // a delete additionally
753                if (FileHelper.fileExists(mainPath)) {
754                    FileHelper.createFile(txDeletePath);
755                }
756            } catch (IOException e) {
757                throw new ResourceManagerSystemException(
758                    "Can not delete resource at '" + resourceId + "'",
759                    ERR_SYSTEM,
760                    txId,
761                    e);
762            }
763        }
764    
765        public void createResource(Object txId, Object resourceId) throws ResourceManagerException {
766            createResource(txId, resourceId, true);
767        }
768    
769        public void createResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
770    
771            if (logger.isFineEnabled()) logger.logFine(txId + " creating " + resourceId);
772    
773            lockResource(resourceId, txId, false);
774    
775            if (getPathForRead(txId, resourceId) != null) {
776                if (assureOnly) {
777                    return;
778                }
779                throw new ResourceManagerException(
780                    "Resource at '" + resourceId + "', already exists",
781                    ERR_RESOURCE_EXISTS,
782                    txId);
783            }
784    
785            String txChangePath = getChangePath(txId, resourceId);
786            try {
787                getContext(txId).readOnly = false;
788                
789                // creation means either undoing a delete or actually scheduling a create
790                if (!undoScheduledDelete(txId, resourceId)) {
791                    FileHelper.createFile(txChangePath);
792                }
793    
794            } catch (IOException e) {
795                throw new ResourceManagerSystemException(
796                    "Can not create resource at '" + resourceId + "'",
797                    ERR_SYSTEM,
798                    txId,
799                    e);
800            }
801        }
802    
803        public void copyResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
804            if (logger.isFineEnabled()) logger.logFine(txId + " copying " + fromResourceId + " to " + toResourceId);
805    
806            lockResource(fromResourceId, txId, true);
807            lockResource(toResourceId, txId, false);
808    
809            if (resourceExists(txId, toResourceId) && !overwrite) {
810                throw new ResourceManagerException(
811                    "Resource at '" + toResourceId + "' already exists",
812                    ERR_RESOURCE_EXISTS,
813                    txId);
814            }
815    
816            InputStream fromResourceStream = null;
817            OutputStream toResourceStream = null;
818            try {
819                fromResourceStream = readResource(txId, fromResourceId);
820                toResourceStream = writeResource(txId, toResourceId);
821                FileHelper.copy(fromResourceStream, toResourceStream);
822            } catch (IOException e) {
823                throw new ResourceManagerException(ERR_SYSTEM, txId, e);
824            } finally {
825                closeOpenResource(fromResourceStream);
826                closeOpenResource(toResourceStream);
827            }
828        }
829    
830        public void moveResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
831            if (logger.isFineEnabled()) logger.logFine(txId + " moving " + fromResourceId + " to " + toResourceId);
832    
833            lockResource(fromResourceId, txId, false);
834            lockResource(toResourceId, txId, false);
835    
836            copyResource(txId, fromResourceId, toResourceId, overwrite);
837    
838            deleteResource(txId, fromResourceId, false);
839        }
840    
841        public InputStream readResource(Object resourceId) throws ResourceManagerException {
842            // create temporary light weight tx
843            Object txId;
844            synchronized (globalTransactions) {
845                txId = generatedUniqueTxId();
846                if (logger.isFinerEnabled())
847                    logger.logFiner("Creating temporary light weight tx " + txId + " for reading");
848                TransactionContext context = new TransactionContext(txId);
849                context.isLightWeight = true;
850                // XXX higher isolation might be needed to make sure upgrade to commit lock always works
851                context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
852                // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
853                globalTransactions.put(txId, context);
854            }
855    
856            InputStream is = readResource(txId, resourceId);
857            return is;
858        }
859    
860        public InputStream readResource(Object txId, Object resourceId) throws ResourceManagerException {
861    
862            if (logger.isFineEnabled()) logger.logFine(txId + " reading " + resourceId);
863    
864            lockResource(resourceId, txId, true);
865    
866            String resourcePath = getPathForRead(txId, resourceId);
867            if (resourcePath == null) {
868                throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
869            }
870    
871            File file = new File(resourcePath);
872            try {
873                FileInputStream stream = new FileInputStream(file);
874                getContext(txId).registerResource(stream);
875                return new InputStreamWrapper(stream, txId, resourceId);
876            } catch (FileNotFoundException e) {
877                throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
878            }
879        }
880    
881        public OutputStream writeResource(Object txId, Object resourceId) throws ResourceManagerException {
882            return writeResource(txId, resourceId, false);
883        }
884    
885        public OutputStream writeResource(Object txId, Object resourceId, boolean append) throws ResourceManagerException {
886    
887            if (logger.isFineEnabled()) logger.logFine(txId + " writing " + resourceId);
888    
889            lockResource(resourceId, txId, false);
890    
891            if (append) {
892                String mainPath = getMainPath(resourceId);
893                String txChangePath = getChangePath(txId, resourceId);
894                String txDeletePath = getDeletePath(txId, resourceId);
895    
896                boolean changeExists = FileHelper.fileExists(txChangePath);
897                boolean deleteExists = FileHelper.fileExists(txDeletePath);
898                boolean mainExists = FileHelper.fileExists(mainPath);
899    
900                if (mainExists && !changeExists && !deleteExists) {
901                    // the read and the write path for resourceId will be different!
902                    copyResource(txId, resourceId, resourceId, true);
903                }
904            }
905    
906            String resourcePath = getPathForWrite(txId, resourceId);
907    
908            try {
909                FileOutputStream stream = new FileOutputStream(resourcePath, append);
910                TransactionContext context = getContext(txId);
911                context.registerResource(stream);
912                context.readOnly = false;
913                return stream;
914            } catch (FileNotFoundException e) {
915                throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
916            }
917        }
918    
919        /*
920         * --- additional public methods complementing implementation of interfaces ---
921         *
922         *  
923         */
924    
925        /**
926         * Resets the store by deleting work <em>and</em> store directory.
927         */
928        public synchronized void reset() {
929            FileHelper.removeRec(new File(storeDir));
930            FileHelper.removeRec(new File(workDir));
931            new File(storeDir).mkdirs();
932            new File(workDir).mkdirs();
933        }
934    
935        /**
936         * Synchronizes persistent data with caches. Is implemented with an empty
937         * body, but called by other methods relying on synchronization. Subclasses
938         * that utilize caching must implement this method reasonably.
939         * 
940         * @throws ResourceManagerSystemException if anything fatal hapened during synchonization
941         */
942        public synchronized void sync() throws ResourceManagerSystemException {
943        }
944    
945        /**
946         * Generates a transaction identifier unique to this resource manager. To do so
947         * it requires this resource manager to be started.
948         * 
949         * @return generated transaction identifier
950         * @throws ResourceManagerSystemException if this resource manager has not been started, yet
951         */
952        public String generatedUniqueTxId() throws ResourceManagerSystemException {
953            assureRMReady();
954            String txId;
955            synchronized (globalTransactions) {
956                do {
957                    txId = Long.toHexString(System.currentTimeMillis()) + "-"
958                            + Integer.toHexString(idCnt++);
959                    // XXX busy loop
960                } while (getContext(txId) != null);
961            }
962            return txId;
963        }
964    
965        /*
966         * --- sane checks ---
967         *
968         *  
969         */
970    
971        protected void fileInitialSaneCheck(Object txId, Object path) throws ResourceManagerException {
972            if (path == null || path.toString().length() == 0) {
973                throw new ResourceManagerException(ERR_RESOURCEID_INVALID, txId);
974            }
975        }
976    
977        protected void assureStarted() throws ResourceManagerSystemException {
978            if (operationMode != OPERATION_MODE_STARTED) {
979                throw new ResourceManagerSystemException("Resource Manager Service not started", ERR_SYSTEM, null);
980            }
981        }
982    
983        protected void assureRMReady() throws ResourceManagerSystemException {
984            if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) {
985                throw new ResourceManagerSystemException("Resource Manager Service not ready", ERR_SYSTEM, null);
986            }
987        }
988    
989        protected void assureNotMarkedForRollback(TransactionContext context) throws ResourceManagerException {
990            if (context.status == STATUS_MARKED_ROLLBACK) {
991                throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK, context.txId);
992            }
993        }
994    
995        protected TransactionContext txInitialSaneCheckForWriting(Object txId) throws ResourceManagerException {
996            assureRMReady();
997            // do not allow any further writing or commit or rollback when db is corrupt
998            if (dirty) {
999                throw new ResourceManagerSystemException(
1000                    "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
1001                    ERR_SYSTEM,
1002                    txId);
1003            }
1004            return txInitialSaneCheck(txId);
1005        }
1006    
1007        protected TransactionContext txInitialSaneCheck(Object txId) throws ResourceManagerException {
1008            assureRMReady();
1009            if (txId == null) {
1010                throw new ResourceManagerException(ERR_TXID_INVALID, txId);
1011            }
1012    
1013            TransactionContext context = getContext(txId);
1014    
1015            if (context == null) {
1016                throw new ResourceManagerException(ERR_NO_TX, txId);
1017            }
1018    
1019            return context;
1020        }
1021    
1022        /*
1023         * --- General Helpers ---
1024         *
1025         *  
1026         */
1027    
1028        protected TransactionContext getContext(Object txId) {
1029            return (TransactionContext) globalTransactions.get(txId);
1030        }
1031    
1032        protected String assureLeadingSlash(Object pathObject) {
1033            String path = "";
1034            if (pathObject != null) {
1035                if (idMapper != null) {
1036                    path = idMapper.getPathForId(pathObject);
1037                } else {
1038                    path = pathObject.toString();
1039                }
1040                if (path.length() > 0 && path.charAt(0) != '/' && path.charAt(0) != '\\') {
1041                    path = "/" + path;
1042                }
1043            }
1044            return path;
1045        }
1046    
1047        protected String getMainPath(Object path) {
1048            StringBuffer buf = new StringBuffer(storeDir.length() + path.toString().length() + 5);
1049            buf.append(storeDir).append(assureLeadingSlash(path));
1050            return buf.toString();
1051        }
1052    
1053        protected String getTransactionBaseDir(Object txId) {
1054            return workDir + '/' + txIdMapper.getPathForId(txId);
1055        }
1056    
1057        protected String getChangePath(Object txId, Object path) {
1058            String txBaseDir = getTransactionBaseDir(txId);
1059            StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1060                    + WORK_CHANGE_DIR.length() + 5);
1061            buf.append(txBaseDir).append('/').append(WORK_CHANGE_DIR).append(assureLeadingSlash(path));
1062            return buf.toString();
1063        }
1064    
1065        protected String getDeletePath(Object txId, Object path) {
1066            String txBaseDir = getTransactionBaseDir(txId);
1067            StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1068                    + WORK_DELETE_DIR.length() + 5);
1069            buf.append(txBaseDir).append('/').append(WORK_DELETE_DIR).append(assureLeadingSlash(path));
1070            return buf.toString();
1071        }
1072    
1073        protected boolean undoScheduledDelete(Object txId, Object resourceId) throws ResourceManagerException {
1074            String txDeletePath = getDeletePath(txId, resourceId);
1075            File deleteFile = new File(txDeletePath);
1076            if (deleteFile.exists()) {
1077                if (!deleteFile.delete()) {
1078                    throw new ResourceManagerSystemException(
1079                        "Failed to undo delete of '" + resourceId + "'",
1080                        ERR_SYSTEM,
1081                        txId);
1082                }
1083                return true;
1084            }
1085            return false;
1086        }
1087    
1088        protected boolean undoScheduledChangeOrCreate(Object txId, Object resourceId) throws ResourceManagerException {
1089            String txChangePath = getChangePath(txId, resourceId);
1090            File changeFile = new File(txChangePath);
1091            if (changeFile.exists()) {
1092                if (!changeFile.delete()) {
1093                    throw new ResourceManagerSystemException(
1094                        "Failed to undo change / create of '" + resourceId + "'",
1095                        ERR_SYSTEM,
1096                        txId);
1097                }
1098                return true;
1099            }
1100            return false;
1101        }
1102    
1103        protected String getPathForWrite(Object txId, Object resourceId) throws ResourceManagerException {
1104            try {
1105                // when we want to write, be sure to write to a local copy
1106                String txChangePath = getChangePath(txId, resourceId);
1107                if (!FileHelper.fileExists(txChangePath)) {
1108                    FileHelper.createFile(txChangePath);
1109                }
1110                return txChangePath;
1111            } catch (IOException e) {
1112                throw new ResourceManagerSystemException(
1113                    "Can not write to resource at '" + resourceId + "'",
1114                    ERR_SYSTEM,
1115                    txId,
1116                    e);
1117            }
1118        }
1119    
1120        protected String getPathForRead(Object txId, Object resourceId) throws ResourceManagerException {
1121    
1122            String mainPath = getMainPath(resourceId);
1123            String txChangePath = getChangePath(txId, resourceId);
1124            String txDeletePath = getDeletePath(txId, resourceId);
1125    
1126            // now, this gets a bit complicated:
1127    
1128            boolean changeExists = FileHelper.fileExists(txChangePath);
1129            boolean deleteExists = FileHelper.fileExists(txDeletePath);
1130            boolean mainExists = FileHelper.fileExists(mainPath);
1131            boolean resourceIsDir =
1132                ((mainExists && new File(mainPath).isDirectory())
1133                    || (changeExists && new File(txChangePath).isDirectory()));
1134            if (resourceIsDir) {
1135                logger.logWarning("Resource at '" + resourceId + "' maps to directory");
1136            }
1137    
1138            // first do some sane checks
1139    
1140            // this may never be, two cases are possible, both disallowing to have a delete together with a change
1141            // 1. first there was a change, than a delete -> at least delete file exists (when there is a file in main store)
1142            // 2. first there was a delete, than a change -> only change file exists
1143            if (!resourceIsDir && changeExists && deleteExists) {
1144                throw new ResourceManagerSystemException(
1145                    "Inconsistent delete and change combination for resource at '" + resourceId + "'",
1146                    ERR_TX_INCONSISTENT,
1147                    txId);
1148            }
1149    
1150            // you should not have been allowed to delete a file that does not exist at all
1151            if (deleteExists && !mainExists) {
1152                throw new ResourceManagerSystemException(
1153                    "Inconsistent delete for resource at '" + resourceId + "'",
1154                    ERR_TX_INCONSISTENT,
1155                    txId);
1156            }
1157    
1158            if (changeExists) {
1159                return txChangePath;
1160            } else if (mainExists && !deleteExists) {
1161                return mainPath;
1162            } else {
1163                return null;
1164            }
1165        }
1166    
1167        /*
1168         * --- Locking Helpers ---
1169         *
1170         *  
1171         */
1172    
1173        protected int getSharedLockLevel(TransactionContext context) throws ResourceManagerException {
1174            if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED
1175                || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1176                return LOCK_ACCESS;
1177            } else if (
1178                context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ
1179                    || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) {
1180                return LOCK_SHARED;
1181            } else {
1182                return LOCK_ACCESS;
1183            }
1184        }
1185    
1186        /*
1187         * --- Resource Management ---
1188         *
1189         *  
1190         */
1191    
1192        protected void registerOpenResource(Object openResource) {
1193            if (logger.isFinerEnabled())
1194                logger.logFiner("Registering open resource " + openResource);
1195            globalOpenResources.add(openResource);
1196        }
1197    
1198        protected void releaseGlobalOpenResources() {
1199            ArrayList copy;
1200            synchronized (globalOpenResources) {
1201                // XXX need to copy in order to allow removal in releaseOpenResource  
1202                copy = new ArrayList(globalOpenResources);
1203                for (Iterator it = copy.iterator(); it.hasNext();) {
1204                    Object stream = it.next();
1205                    closeOpenResource(stream);
1206                }
1207            }
1208        }
1209    
1210        protected void closeOpenResource(Object openResource) {
1211            if (logger.isFinerEnabled()) logger.logFiner("Releasing resource " + openResource);
1212            globalOpenResources.remove(openResource);
1213            if (openResource instanceof InputStream) {
1214                InputStream is = (InputStream) openResource;
1215                try {
1216                    is.close();
1217                } catch (IOException e) {
1218                    // do not care, as it might have been closed somewhere else, before 
1219                }
1220            } else if (openResource instanceof OutputStream) {
1221                OutputStream os = (OutputStream) openResource;
1222                try {
1223                    os.close();
1224                } catch (IOException e) {
1225                    // do not care, as it might have been closed somewhere else, before 
1226                }
1227            }
1228        }
1229    
1230        /*
1231         * --- Recovery / Shutdown Support ---
1232         *
1233         *  
1234         */
1235    
1236        protected boolean rollBackOrForward() {
1237            boolean allCool = true;
1238    
1239            synchronized (globalTransactions) {
1240                ArrayList contexts = new ArrayList(globalTransactions.values());
1241                for (Iterator it = contexts.iterator(); it.hasNext();) {
1242                    TransactionContext context = (TransactionContext) it.next();
1243                    if (context.status == STATUS_COMMITTING) {
1244                        // roll forward
1245                        logger.logInfo("Rolling forward " + context.txId);
1246    
1247                        try {
1248                            context.commit();
1249                            context.status = STATUS_COMMITTED;
1250                            context.saveState();
1251                            globalTransactions.remove(context.txId);
1252                            context.cleanUp();
1253                        } catch (ResourceManagerException e) {
1254                            // this is not good, but what can we do now?
1255                            allCool = false;
1256                            logger.logSevere("Rolling forward of " + context.txId + " failed", e);
1257                        }
1258                    } else if (context.status == STATUS_COMMITTED) {
1259                        logger.logInfo("Cleaning already commited " + context.txId);
1260                        globalTransactions.remove(context.txId);
1261                        try {
1262                            context.cleanUp();
1263                        } catch (ResourceManagerException e) {
1264                            // this is not good, but what can we do now?
1265                            allCool = false;
1266                            logger.logWarning("Cleaning of " + context.txId + " failed", e);
1267                        }
1268                    } else {
1269                        // in all other cases roll back and warn when not rollback was explicitely selected for tx
1270                        if (context.status != STATUS_ROLLING_BACK
1271                            && context.status != STATUS_ROLLEDBACK
1272                            && context.status != STATUS_MARKED_ROLLBACK) {
1273                            logger.logWarning("Irregularly rolling back " + context.txId);
1274                        } else {
1275                            logger.logInfo("Rolling back " + context.txId);
1276                        }
1277                        try {
1278                            context.rollback();
1279                            context.status = STATUS_ROLLEDBACK;
1280                            context.saveState();
1281                            globalTransactions.remove(context.txId);
1282                            context.cleanUp();
1283                        } catch (ResourceManagerException e) {
1284                            logger.logWarning("Rolling back of " + context.txId + " failed", e);
1285                        }
1286                    }
1287                }
1288    
1289            }
1290            return allCool;
1291        }
1292    
1293        protected void recoverContexts() {
1294            File dir = new File(workDir);
1295            File[] files = dir.listFiles();
1296            if (files == null)
1297                return;
1298            for (int i = 0; i < files.length; i++) {
1299                File file = files[i];
1300                Object txId = txIdMapper.getIdForPath(file.getName());
1301                // recover all transactions we do not already know
1302                if (!globalTransactions.containsKey(txId)) {
1303    
1304                    logger.logInfo("Recovering " + txId);
1305                    TransactionContext context;
1306                    try {
1307                        context = new TransactionContext(txId);
1308                        context.recoverState();
1309                        globalTransactions.put(txId, context);
1310                    } catch (ResourceManagerException e) {
1311                        // this is not good, but the best we get, just log as warning
1312                        logger.logWarning("Recovering of " + txId + " failed");
1313                    }
1314                }
1315            }
1316        }
1317    
1318        protected boolean waitForAllTxToStop(long timeoutMSecs) {
1319            long startTime = System.currentTimeMillis();
1320    
1321            // be sure not to lock globalTransactions for too long, as we need to give
1322            // txs the chance to complete (otherwise deadlocks are very likely to occur)
1323            // instead iterate over a copy as we can be sure no new txs will be registered
1324            // after operation level has been set to stopping
1325    
1326            Collection transactionsToStop;
1327            synchronized (globalTransactions) {
1328                transactionsToStop = new ArrayList(globalTransactions.values());
1329            }
1330            for (Iterator it = transactionsToStop.iterator(); it.hasNext();) {
1331                long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1332    
1333                if (remainingTimeout <= 0) {
1334                    return false;
1335                }
1336    
1337                TransactionContext context = (TransactionContext) it.next();
1338                synchronized (context) {
1339                    if (!context.finished) {
1340                        logger.logInfo(
1341                            "Waiting for tx " + context.txId + " to finish for " + remainingTimeout + " milli seconds");
1342                    }
1343                    while (!context.finished && remainingTimeout > 0) {
1344                        try {
1345                            context.wait(remainingTimeout);
1346                        } catch (InterruptedException e) {
1347                            return false;
1348                        }
1349                        remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1350                    }
1351                    if (context.finished) {
1352                        logger.logInfo("Tx " + context.txId + " finished");
1353                    } else {
1354                        logger.logWarning("Tx " + context.txId + " failed to finish in given time");
1355                    }
1356                }
1357            }
1358    
1359            return (globalTransactions.size() == 0);
1360        }
1361    
1362        protected boolean shutdown(int mode, long timeoutMSecs) {
1363            switch (mode) {
1364                case SHUTDOWN_MODE_NORMAL :
1365                    return waitForAllTxToStop(timeoutMSecs);
1366                case SHUTDOWN_MODE_ROLLBACK :
1367                    return rollBackOrForward();
1368                case SHUTDOWN_MODE_KILL :
1369                    return true;
1370                default :
1371                    return false;
1372            }
1373        }
1374    
1375        protected void setDirty(Object txId, Throwable t) {
1376            logger.logSevere(
1377                "Fatal error during critical commit/rollback of transaction " + txId + ", setting database to dirty.",
1378                t);
1379            dirty = true;
1380        }
1381    
1382        /**
1383         * Inner class to hold the complete context, i.e. all information needed, for a transaction.
1384         * 
1385         */
1386        protected class TransactionContext {
1387    
1388            protected Object txId;
1389            protected int status = STATUS_ACTIVE;
1390            protected int isolationLevel = DEFAULT_ISOLATION_LEVEL;
1391            protected long timeoutMSecs = getDefaultTransactionTimeout();
1392            protected long startTime;
1393            protected long commitTime = -1L;
1394            protected boolean isLightWeight = false;
1395            protected boolean readOnly = true;
1396            protected boolean finished = false;
1397    
1398            // list of streams participating in this tx
1399            private List openResources = new ArrayList();
1400    
1401            public TransactionContext(Object txId) throws ResourceManagerException {
1402                this.txId = txId;
1403                startTime = System.currentTimeMillis();
1404            }
1405    
1406            public long getRemainingTimeout() {
1407                long now = System.currentTimeMillis();
1408                return (startTime - now + timeoutMSecs);
1409            }
1410    
1411            public synchronized void init() throws ResourceManagerException {
1412                String baseDir = getTransactionBaseDir(txId);
1413                String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1414                String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1415    
1416                new File(changeDir).mkdirs();
1417                new File(deleteDir).mkdirs();
1418    
1419                saveState();
1420            }
1421    
1422            public synchronized void rollback() throws ResourceManagerException {
1423                closeResources();
1424                freeLocks();
1425            }
1426    
1427            public synchronized void commit() throws ResourceManagerException {
1428                String baseDir = getTransactionBaseDir(txId);
1429                String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1430                String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1431    
1432                closeResources();
1433                upgradeLockToCommit();
1434                try {
1435                    applyDeletes(new File(deleteDir), new File(storeDir), new File(storeDir));
1436                    FileHelper.moveRec(new File(changeDir), new File(storeDir));
1437                } catch (IOException e) {
1438                    throw new ResourceManagerSystemException("Commit failed", ERR_SYSTEM, txId, e);
1439                }
1440                freeLocks();
1441                commitTime = System.currentTimeMillis();
1442            }
1443    
1444            public synchronized void notifyFinish() {
1445                finished = true;
1446                notifyAll();
1447            }
1448    
1449            public synchronized void cleanUp() throws ResourceManagerException {
1450                if (!cleanUp)
1451                    return; // XXX for debugging only
1452                boolean clean = true;
1453                Exception cleanException = null;
1454                String baseDir = getTransactionBaseDir(txId);
1455                FileHelper.removeRec(new File(baseDir));
1456                if (!clean) {
1457                    throw new ResourceManagerSystemException(
1458                        "Clean up failed due to unreleasable lock",
1459                        ERR_SYSTEM,
1460                        txId,
1461                        cleanException);
1462                }
1463            }
1464    
1465            public synchronized void finalCleanUp() throws ResourceManagerException {
1466                closeResources();
1467                freeLocks();
1468            }
1469    
1470            public synchronized void upgradeLockToCommit() throws ResourceManagerException {
1471                for (Iterator it =  lockManager.getAll(txId).iterator(); it.hasNext();) {
1472                    GenericLock lock = (GenericLock) it.next();
1473                    // only upgrade if we had write access
1474                    if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) {
1475                        try {
1476                            // in case of deadlocks, make failure of non-committing tx more likely
1477                            if (!lock
1478                                .acquire(
1479                                    txId,
1480                                    LOCK_COMMIT,
1481                                    true,
1482                                    true,
1483                                    getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR)) {
1484                                throw new ResourceManagerException(
1485                                    "Could not upgrade to commit lock for resource at '"
1486                                        + lock.getResourceId().toString()
1487                                        + "'",
1488                                    ERR_NO_LOCK,
1489                                    txId);
1490                            }
1491                        } catch (InterruptedException e) {
1492                            throw new ResourceManagerSystemException(ERR_SYSTEM, txId, e);
1493                        }
1494                    }
1495    
1496                }
1497            }
1498    
1499            public synchronized void freeLocks() {
1500                lockManager.releaseAll(txId);
1501            }
1502    
1503            public synchronized void closeResources() {
1504                synchronized (globalOpenResources) {
1505                    for (Iterator it = openResources.iterator(); it.hasNext();) {
1506                        Object stream = it.next();
1507                        closeOpenResource(stream);
1508                    }
1509                }
1510            }
1511    
1512            public synchronized void registerResource(Object openResource) {
1513                synchronized (globalOpenResources) {
1514                    registerOpenResource(openResource);
1515                    openResources.add(openResource);
1516                }
1517            }
1518    
1519            public synchronized void saveState() throws ResourceManagerException {
1520                String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1521                File file = new File(statePath);
1522                BufferedWriter writer = null;
1523                try {
1524                    OutputStream os = new FileOutputStream(file);
1525                    writer = new BufferedWriter(new OutputStreamWriter(os, DEFAULT_PARAMETER_ENCODING));
1526                    writer.write(toString());
1527                } catch (FileNotFoundException e) {
1528                    String msg = "Saving status information to '" + statePath + "' failed! Could not create file";
1529                    logger.logSevere(msg, e);
1530                    throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1531                } catch (IOException e) {
1532                    String msg = "Saving status information to '" + statePath + "' failed";
1533                    logger.logSevere(msg, e);
1534                    throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1535                } finally {
1536                    if (writer != null) {
1537                        try {
1538                            writer.close();
1539                        } catch (IOException e) {
1540                        }
1541    
1542                    }
1543                }
1544            }
1545    
1546            public synchronized void recoverState() throws ResourceManagerException {
1547                String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1548                File file = new File(statePath);
1549                BufferedReader reader = null;
1550                try {
1551                    InputStream is = new FileInputStream(file);
1552    
1553                    reader = new BufferedReader(new InputStreamReader(is, DEFAULT_PARAMETER_ENCODING));
1554                    txId = reader.readLine();
1555                    status = Integer.parseInt(reader.readLine());
1556                    isolationLevel = Integer.parseInt(reader.readLine());
1557                    timeoutMSecs = Long.parseLong(reader.readLine());
1558                    startTime = Long.parseLong(reader.readLine());
1559                } catch (FileNotFoundException e) {
1560                    String msg = "Recovering status information from '" + statePath + "' failed! Could not find file";
1561                    logger.logSevere(msg, e);
1562                    throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId);
1563                } catch (IOException e) {
1564                    String msg = "Recovering status information from '" + statePath + "' failed";
1565                    logger.logSevere(msg, e);
1566                    throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1567                } catch (Throwable t) {
1568                    String msg = "Recovering status information from '" + statePath + "' failed";
1569                    logger.logSevere(msg, t);
1570                    throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, t);
1571                } finally {
1572                    if (reader != null) {
1573                        try {
1574                            reader.close();
1575                        } catch (IOException e) {
1576                        }
1577    
1578                    }
1579                }
1580            }
1581    
1582            public synchronized String toString() {
1583                StringBuffer buf = new StringBuffer();
1584                buf.append(txId).append('\n');
1585                buf.append(Integer.toString(status)).append('\n');
1586                buf.append(Integer.toString(isolationLevel)).append('\n');
1587                buf.append(Long.toString(timeoutMSecs)).append('\n');
1588                buf.append(Long.toString(startTime)).append('\n');
1589                if (debug) {
1590                    buf.append("----- Lock Debug Info -----\n");
1591    
1592                    for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) {
1593                        GenericLock lock = (GenericLock) it.next();
1594                        buf.append(lock.toString()+"\n");
1595                    }
1596    
1597                }
1598                return buf.toString();
1599            }
1600    
1601        }
1602    
1603        private class InputStreamWrapper extends InputStream {
1604            private InputStream is;
1605            private Object txId;
1606            private Object resourceId;
1607    
1608            public InputStreamWrapper(InputStream is, Object txId, Object resourceId) {
1609                this.is = is;
1610                this.txId = txId;
1611                this.resourceId = resourceId;
1612            }
1613    
1614            public int read() throws IOException {
1615                return is.read();
1616            }
1617    
1618            public int read(byte b[]) throws IOException {
1619                return is.read(b);
1620            }
1621    
1622            public int read(byte b[], int off, int len) throws IOException {
1623                return is.read(b, off, len);
1624            }
1625    
1626            public int available() throws IOException {
1627                return is.available();
1628            }
1629    
1630            public void close() throws IOException {
1631                try {
1632                    is.close();
1633                } finally {
1634                    TransactionContext context;
1635                    synchronized (globalTransactions) {
1636                        context = getContext(txId);
1637                        if (context == null) {
1638                            return;
1639                        }
1640                    }
1641                    synchronized (context) {
1642                        if (context.isLightWeight) {
1643                            if (logger.isFinerEnabled())
1644                                logger.logFiner("Upon close of resource removing temporary light weight tx " + txId);
1645                            context.freeLocks();
1646                            globalTransactions.remove(txId);
1647                        } else {
1648                            // release access lock in order to allow other transactions to commit
1649                            if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) {
1650                                if (logger.isFinerEnabled()) {
1651                                    logger.logFiner("Upon close of resource releasing access lock for tx " + txId + " on resource at " + resourceId);
1652                                }
1653                                lockManager.release(txId, resourceId);
1654                            }
1655                        }
1656                    }
1657                }
1658            }
1659    
1660            public void mark(int readlimit) {
1661                is.mark(readlimit);
1662            }
1663    
1664            public void reset() throws IOException {
1665                is.reset();
1666            }
1667    
1668            public boolean markSupported() {
1669                return is.markSupported();
1670    
1671            }
1672    
1673        }
1674    
1675    }