001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.commons.transaction.memory;
018    
019    import java.io.PrintWriter;
020    import java.util.HashSet;
021    import java.util.Iterator;
022    import java.util.Map;
023    import java.util.Set;
024    import java.util.Collections;
025    
026    import org.apache.commons.transaction.locking.ReadWriteLock;
027    import org.apache.commons.transaction.util.LoggerFacade;
028    import org.apache.commons.transaction.util.PrintWriterLogger;
029    
030    /**
031     * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
032     * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
033     * may also fail to commit. 
034     *  
035     * <br>
036     * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
037     * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
038     * undo them.
039     * <br>
040     * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
041     * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
042     * <br>
043     * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
044     * <br>
045     * <em>Caution:</em> This implementation might be slow when large amounts of data are changed in a transaction, as many references will need to be copied around.
046     * 
047     * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
048     * @see TransactionalMapWrapper
049     * @see PessimisticMapWrapper
050     */
051    public class OptimisticMapWrapper extends TransactionalMapWrapper {
052    
053        protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute
054        protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds
055    
056        protected Set activeTransactions;
057    
058        protected LoggerFacade logger;
059    
060        protected ReadWriteLock commitLock;
061    
062        /**
063         * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
064         * data will be instances of {@link java.util.HashMap} and {@link java.util.HashSet}. 
065         * 
066         * @param wrapped map to be wrapped
067         */
068        public OptimisticMapWrapper(Map wrapped) {
069            this(wrapped, new HashMapFactory(), new HashSetFactory());
070        }
071    
072        /**
073         * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
074         * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
075         * 
076         * @param wrapped map to be wrapped
077         * @param mapFactory factory for temporary maps
078         * @param setFactory factory for temporary sets
079         */
080        public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
081            this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
082                    OptimisticMapWrapper.class.getName(), false));
083        }
084    
085        /**
086         * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
087         * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
088         * 
089         * @param wrapped map to be wrapped
090         * @param mapFactory factory for temporary maps
091         * @param setFactory factory for temporary sets
092         * @param logger
093         *            generic logger used for all kinds of logging
094         */
095        public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
096            super(wrapped, mapFactory, setFactory);
097            activeTransactions = Collections.synchronizedSet(new HashSet());
098            this.logger = logger;
099            commitLock = new ReadWriteLock("COMMIT", logger);
100        }
101    
102        public void startTransaction() {
103            if (getActiveTx() != null) {
104                throw new IllegalStateException(
105                    "Active thread " + Thread.currentThread() + " already associated with a transaction!");
106            }
107            CopyingTxContext context = new CopyingTxContext();
108            activeTransactions.add(context);
109            setActiveTx(context);
110        }
111    
112        public void rollbackTransaction() {
113            TxContext txContext = getActiveTx();
114            super.rollbackTransaction();
115            activeTransactions.remove(txContext);
116        }
117    
118        public void commitTransaction() throws ConflictException {
119            commitTransaction(false);
120        }
121    
122        public void commitTransaction(boolean force) throws ConflictException {
123            TxContext txContext = getActiveTx();
124            
125            if (txContext == null) {
126                throw new IllegalStateException(
127                    "Active thread " + Thread.currentThread() + " not associated with a transaction!");
128            }
129    
130            if (txContext.status == STATUS_MARKED_ROLLBACK) {
131                throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
132            }
133            
134            try {
135                // in this final commit phase we need to be the only one access the map
136                // to make sure no one adds an entry after we checked for conflicts
137                commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
138    
139                if (!force) {
140                    Object conflictKey = checkForConflicts();
141                    if (conflictKey != null) {
142                        throw new ConflictException(conflictKey);
143                    }
144                }
145        
146                activeTransactions.remove(txContext);
147                copyChangesToConcurrentTransactions();
148                super.commitTransaction();
149                
150            } catch (InterruptedException e) {
151                // XXX a bit dirty ;)
152                throw new ConflictException(e);
153            } finally {
154                commitLock.release(txContext);
155            }
156        }
157    
158        // TODO: Shouldn't we return a collection rather than a single key here?
159        public Object checkForConflicts() {
160            CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
161    
162            Set keys = txContext.changedKeys();
163            Set externalKeys = txContext.externalChangedKeys();
164    
165            for (Iterator it2 = keys.iterator(); it2.hasNext();) {
166                Object key = it2.next();
167                if (externalKeys.contains(key)) {
168                    return key;
169                }
170            }
171            return null;
172        }
173    
174        protected void copyChangesToConcurrentTransactions() {
175            CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();
176    
177            synchronized (activeTransactions) {
178                for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
179                    CopyingTxContext otherTxContext = (CopyingTxContext) it.next();
180    
181                    // no need to copy data if the other transaction does not access global map anyway
182                    if (otherTxContext.cleared)
183                        continue;
184    
185                    if (thisTxContext.cleared) {
186                        // we will clear everything, so we have to copy everything before
187                        otherTxContext.externalChanges.putAll(wrapped);
188                    } else // no need to check if we have already copied everthing
189                    {
190                        for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
191                            Map.Entry entry = (Map.Entry) it2.next();
192                            Object value = wrapped.get(entry.getKey());
193                            if (value != null) {
194                                // undo change
195                                otherTxContext.externalChanges.put(entry.getKey(), value);
196                            } else {
197                                // undo add
198                                otherTxContext.externalDeletes.add(entry.getKey());
199                            }
200                        }
201    
202                        for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
203                            // undo delete
204                            Object key = it2.next();
205                            Object value = wrapped.get(key);
206                            otherTxContext.externalChanges.put(key, value);
207                        }
208                    }
209                }
210            }
211        }
212    
213        public class CopyingTxContext extends TxContext {
214            protected Map externalChanges;
215            protected Map externalAdds;
216            protected Set externalDeletes;
217    
218            protected CopyingTxContext() {
219                super();
220                externalChanges = mapFactory.createMap();
221                externalDeletes = setFactory.createSet();
222                externalAdds = mapFactory.createMap();
223            }
224    
225            protected Set externalChangedKeys() {
226                Set keySet = new HashSet();
227                keySet.addAll(externalDeletes);
228                keySet.addAll(externalChanges.keySet());
229                keySet.addAll(externalAdds.keySet());
230                return keySet;
231            }
232    
233            protected Set changedKeys() {
234                Set keySet = new HashSet();
235                keySet.addAll(deletes);
236                keySet.addAll(changes.keySet());
237                keySet.addAll(adds.keySet());
238                return keySet;
239            }
240    
241            protected Set keys() {
242                try {
243                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
244                    Set keySet = super.keys();
245                    keySet.removeAll(externalDeletes);
246                    keySet.addAll(externalAdds.keySet());
247                    return keySet;
248                } catch (InterruptedException e) {
249                    return null;
250                } finally {
251                    commitLock.release(this);
252                }
253            }
254    
255            protected Object get(Object key) {
256                try {
257                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
258    
259                    if (deletes.contains(key)) {
260                        // reflects that entry has been deleted in this tx 
261                        return null;
262                    }
263        
264                    Object changed = changes.get(key);
265                    if (changed != null) {
266                        return changed;
267                    }
268        
269                    Object added = adds.get(key);
270                    if (added != null) {
271                        return added;
272                    }
273        
274                    if (cleared) {
275                        return null;
276                    } else {
277                        if (externalDeletes.contains(key)) {
278                            // reflects that entry has been deleted in this tx 
279                            return null;
280                        }
281        
282                        changed = externalChanges.get(key);
283                        if (changed != null) {
284                            return changed;
285                        }
286        
287                        added = externalAdds.get(key);
288                        if (added != null) {
289                            return added;
290                        }
291        
292                        // not modified in this tx
293                        return wrapped.get(key);
294                    }
295                } catch (InterruptedException e) {
296                    return null;
297                } finally {
298                    commitLock.release(this);
299                }
300            }
301    
302            protected void put(Object key, Object value) {
303                try {
304                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
305                    super.put(key, value);
306                } catch (InterruptedException e) {
307                } finally {
308                    commitLock.release(this);
309                }
310            }
311    
312            protected void remove(Object key) {
313                try {
314                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
315                    super.remove(key);
316                } catch (InterruptedException e) {
317                } finally {
318                    commitLock.release(this);
319                }
320            }
321    
322            protected int size() {
323                try {
324                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
325                    int size = super.size();
326        
327                    size -= externalDeletes.size();
328                    size += externalAdds.size();
329        
330                    return size;
331                } catch (InterruptedException e) {
332                    return -1;
333                } finally {
334                    commitLock.release(this);
335                }
336            }
337    
338            protected void clear() {
339                try {
340                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
341                    super.clear();
342                    externalDeletes.clear();
343                    externalChanges.clear();
344                    externalAdds.clear();
345                } catch (InterruptedException e) {
346                } finally {
347                    commitLock.release(this);
348                }
349            }
350    
351            protected void merge() {
352                try {
353                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
354                    super.merge();
355                } catch (InterruptedException e) {
356                } finally {
357                    commitLock.release(this);
358                }
359            }
360    
361            protected void dispose() {
362                try {
363                    commitLock.acquireRead(this, ACCESS_TIMEOUT);
364                    super.dispose();
365                    setFactory.disposeSet(externalDeletes);
366                    externalDeletes = null;
367                    mapFactory.disposeMap(externalChanges);
368                    externalChanges = null;
369                    mapFactory.disposeMap(externalAdds);
370                    externalAdds = null;
371                } catch (InterruptedException e) {
372                } finally {
373                    commitLock.release(this);
374                }
375            }
376    
377            protected void finalize() throws Throwable {
378                activeTransactions.remove(this);
379                super.finalize();
380            }
381        }
382    }