6 * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7 * Clara, California 95054, U.S.A. All rights reserved.
9 * Sun Microsystems, Inc. has intellectual property rights relating to
10 * technology embodied in the product that is described in this
11 * document. In particular, and without limitation, these
12 * intellectual property rights may include one or more of the
13 * U.S. patents listed at http://www.sun.com/patents and one or more
14 * additional patents or pending patent applications in the U.S. and
17 * U.S. Government Rights - Commercial software.
18 * Government users are subject to the Sun Microsystems, Inc. standard
19 * license agreement and applicable provisions of the FAR and its
20 * supplements. Use is subject to license terms. Sun, Sun
21 * Microsystems, the Sun logo and Java are trademarks or registered
22 * trademarks of Sun Microsystems, Inc. in the U.S. and other
25 * This product is covered and controlled by U.S. Export Control laws
26 * and may be subject to the export or import laws in other countries.
27 * Nuclear, missile, chemical biological weapons or nuclear maritime
28 * end uses or end users, whether direct or indirect, are strictly
29 * prohibited. Export or reexport to countries subject to
30 * U.S. embargo or to entities identified on U.S. export exclusion
31 * lists, including, but not limited to, the denied persons and
32 * specially designated nationals lists is strictly prohibited.
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import dstm2.SpecialLock;
46 import TransactionalIO.core.TransactionalFile;
47 import TransactionalIO.core.Wrapper;
48 import java.lang.reflect.Constructor;
49 import java.lang.reflect.InvocationTargetException;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.HashSet;
54 import java.util.LinkedList;
55 import java.util.List;
58 import java.util.concurrent.Callable;
59 import static dstm2.Defaults.*;
61 * The basic unit of computation for the transactional memory. This
62 * class extends <code>java.lang.Thread</code> by providing methods to
63 * begin, commit and abort transactions.
65 * Every <code>Thread</code> has a contention manager, created when
66 * the thread is created. Before creating any <code>Thread</code>s,
67 * you must call <code>Thread.setContentionManagerClass</code> to set the
68 * class of the contention manager that will be created. The
69 * contention manager of a thread is notified (by invoking its
70 * notification methods) of the results of any methods involving the
71 * thread. It is also consulted on whether a transaction should be
74 * @see dstm2.ContentionManager
76 public class Thread extends java.lang.Thread{
78 * Contention manager class.
// Raw Class reference; ThreadState's constructor instantiates it via newInstance() and casts to ContentionManager.
80 protected static Class contentionManagerClass;
// Assigned by setAdapterClass(String) and handed to AtomicFactory in makeFactory.
85 protected static Class<dstm2.factory.Adapter> adapterClass;
88 * Set to true when benchmark runs out of time.
90 public static volatile boolean stop = false;
92 * number of committed transactions for all threads
// The four totals below are plain (non-volatile) static longs that are updated with compound assignment.
// NOTE(review): any locking around those updates is not visible in this listing — confirm.
94 public static long totalCommitted = 0;
96 * total number of transactions for all threads
98 public static long totalTotal = 0;
100 * number of committed memory references for all threads
102 public static long totalCommittedMemRefs = 0;
104 * total number of memory references for all threads
106 public static long totalTotalMemRefs = 0;
// Per-thread holder of ThreadState; initialValue() supplies a freshly constructed ThreadState.
108 static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
109 protected synchronized ThreadState initialValue() {
110 return new ThreadState();
// Per-thread holder of a Thread reference; the body of its initialValue() is not visible in this listing.
113 static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
114 protected synchronized Thread initialValue() {
// Nesting limit named in the beginTransaction documentation; set to 1 here.
119 private static int MAX_NESTING_DEPTH = 1;
// Shared monitor object. NOTE(review): the statements that synchronize on it are not visible in this listing — confirm.
121 private static Object lock = new Object();
123 // Memo-ize factories so we don't have to recreate them.
// Read and written by makeFactory, keyed by the class being implemented; wrapped with
// Collections.synchronizedMap, which synchronizes individual calls only.
124 private static Map<Class,Factory> factoryTable
125 = Collections.synchronizedMap(new HashMap<Class,Factory>());
128 * Create thread to run a method.
129 * @param target execute this object's <CODE>run()</CODE> method
// Passes java.lang.Thread a wrapper Runnable that obtains the running thread's ThreadState
// and adds its counters to the static totals.
// NOTE(review): the call into target and any surrounding synchronization are not visible in this listing.
131 public Thread(final Runnable target) {
132 super(new Runnable() {
134 ThreadState threadState = _threadState.get();
137 // collect statistics
// Each per-thread counter is accumulated (+=) into its all-threads total.
139 totalCommitted += threadState.committed;
140 totalTotal += threadState.total;
141 totalCommittedMemRefs += threadState.committedMemRefs;
142 totalTotalMemRefs += threadState.totalMemRefs;
155 * Establishes a contention manager. You must call this method
156 * before creating any <code>Thread</code>.
158 * @see dstm2.ContentionManager
159 * @param theClass class of desired contention manager.
// Resolves dstm2.ContentionManager reflectively and stores theClass in the static
// contentionManagerClass field; both failure paths throw PanicException.
161 public static void setContentionManagerClass(Class theClass) {
// If dstm2.ContentionManager itself cannot be loaded, the ClassNotFoundException is wrapped as the cause.
164 cm = Class.forName("dstm2.ContentionManager");
165 } catch (ClassNotFoundException e) {
166 throw new PanicException(e);
169 contentionManagerClass = theClass;
// Any exception here is reported as "does not implement"; the caught exception is not attached as a cause.
// NOTE(review): the compatibility check that can throw is on a line not visible in this listing.
170 } catch (Exception e) {
171 throw new PanicException("The class " + theClass
172 + " does not implement dstm2.ContentionManager");
177 * Sets the adapter class (a static setting, shared by all threads)
178 * @param adapterClassName adapter class as string
// Loads the named class with Class.forName and stores it in the static adapterClass field.
180 public static void setAdapterClass(String adapterClassName) {
// Unchecked cast: nothing visible here verifies that the loaded class is a dstm2.factory.Adapter.
182 adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
183 } catch (ClassNotFoundException ex) {
// NOTE(review): passes a "%s" format string plus an argument — presumably PanicException has a
// format-style constructor; confirm against its declaration.
184 throw new PanicException("Adapter class not found: %s\n", adapterClassName);
189 * Tests whether the current transaction can still commit. Does not
190 * actually end the transaction (either <code>commitTransaction</code> or
191 * <code>abortTransaction</code> must still be called). The contention
192 * manager of the invoking thread is notified if the onValidate fails
193 * because a <code>TMObject</code> opened for reading was invalidated.
195 * @return whether the current transaction may commit successfully.
// Static convenience wrapper: fetches the calling thread's ThreadState and returns its validate() result.
197 static public boolean validate() {
198 ThreadState threadState = _threadState.get();
199 return threadState.validate();
203 * Gets the current transaction, if any, of the invoking <code>Thread</code>.
205 * @return the current thread's current transaction; <code>null</code> if
206 * there is no current transaction.
// Reads the transaction field of the calling thread's ThreadState; that field is null until
// beginTransaction assigns it.
208 static public Transaction getTransaction() {
209 return _threadState.get().transaction;
213 * Gets the contention manager of the invoking <code>Thread</code>.
215 * @return the invoking thread's contention manager
// Reads the manager field of the calling thread's ThreadState (assigned in the ThreadState constructor).
217 static public ContentionManager getContentionManager() {
218 return _threadState.get().manager;
222 * Create a new factory instance.
223 * @param _class class to implement
224 * @return new factory
// Looks _class up in factoryTable; on a miss, builds an AtomicFactory with the current adapterClass
// and stores it in the table. Any exception is wrapped in PanicException.
226 static public <T> Factory<T> makeFactory(Class<T> _class) {
// The table is declared with raw types, hence the unchecked cast.
228 Factory<T> factory = (Factory<T>) factoryTable.get(_class);
// NOTE(review): get and put are separate map calls, so two threads could both miss and each build a
// factory — confirm whether the lines not visible here add synchronization.
230 if (factory == null) {
231 factory = new AtomicFactory<T>(_class, adapterClass);
232 factoryTable.put(_class, factory);
235 } catch (Exception e) {
236 throw new PanicException(e);
241 * Execute a transaction
242 * @param xaction execute this object's <CODE>call()</CODE> method.
243 * @return result of <CODE>call()</CODE> method
// Runs xaction as a transaction on the calling thread's ThreadState: begins the transaction,
// initializes the IO Wrapper with it, invokes xaction.call(), prepares the IO commit and attempts
// to commit. Afterwards the per-call counters are folded into the static all-threads totals and
// the ThreadState is reset.
// NOTE(review): the retry loop, try/finally structure and return statements are on lines not
// visible in this listing; only the statements shown below are described.
245 public static <T> T doIt(Callable<T> xaction) {
246 ThreadState threadState = _threadState.get();
247 ContentionManager manager = threadState.manager;
249 // System.out.println(Thread.currentThread() + " astarted the transaction");
250 boolean flag = false;
253 threadState.beginTransaction();
254 // System.out.println(Thread.currentThread() + " offically started the transaction");
255 /////For Integrating with IO//////////
256 Wrapper.Initialize(Thread.getTransaction());
257 // System.out.println(Thread.currentThread() + " even more offically started the transaction");
258 //////////////////////////////////////
260 result = xaction.call();
261 // System.out.println(Thread.currentThread() + " aborted in committing");
262 // } catch (AbortedException d) {
263 /* synchronized(benchmark.lock){
264 System.out.println(Thread.currentThread() + " aborted in committing");
267 // } //catch (SnapshotException s) {
268 //threadState.abortTransaction();
270 // catch (Exception e) {
271 // e.printStackTrace();
272 // throw new PanicException("Unhandled exception " + e);
// Memory references and the attempt counter are recorded before the commit is tried.
274 threadState.totalMemRefs += threadState.transaction.memRefs;
275 threadState.transaction.attempts++;
277 Wrapper.prepareIOCommit();
279 ///////////////////////////////
281 if (threadState.commitTransaction()) {
282 threadState.committedMemRefs += threadState.transaction.memRefs;
288 catch(AbortedException ex){
290 // System.out.println(Thread.currentThread() + " aborted");
291 // Wrapper.getTransaction().unlockAllLocks();
293 catch (Exception e) {
295 throw new PanicException("Unhandled exception " + e);
// Releases the IO transaction's locks, and the special lock when this transaction is its owner.
300 Wrapper.getTransaction().unlockAllLocks();
301 if (Thread.getTransaction() == SpecialLock.getSpecialLock().getOwnerTransaction())
302 SpecialLock.getSpecialLock().unlock(Thread.getTransaction());
307 // transaction aborted
309 if (threadState.transaction != null) {
310 threadState.abortTransaction();
313 threadState.transaction = null;
314 Wrapper.setTransaction(null);
316 // collect statistics
// Accumulate (+=) instead of overwriting: threadState.reset() below zeroes the per-call counters,
// so plain assignment would discard the memory references counted by every earlier call and by
// other threads. This matches the two transaction counters that follow.
318 totalTotalMemRefs += threadState.totalMemRefs;
319 totalCommittedMemRefs += threadState.committedMemRefs;
320 totalCommitted += threadState.committed;
321 totalTotal += threadState.total;
322 threadState.reset(); // set up for next iteration
324 /* if (result == null)
325 throw new GracefulException();
326 else return result;*/
330 * Execute transaction
331 * @param xaction call this object's <CODE>run()</CODE> method
// Runnable overload: wraps the work in an anonymous Callable<Boolean> and hands it to doIt(Callable).
// NOTE(review): the body of call() is not visible in this listing.
333 public static void doIt(final Runnable xaction) {
334 doIt(new Callable<Boolean>() {
335 public Boolean call() {
343 * number of committed transactions, summed over all threads
344 * @return the static total of committed transactions
// Returns the static totalCommitted counter, which is shared by all threads rather than per-thread.
346 public static long getCommitted() {
347 return totalCommitted;
351 * number of aborted transactions, summed over all threads
352 * @return number of aborted transactions
// Aborted count is derived, not stored: static total of transactions minus static total of commits.
354 public static long getAborted() {
355 return totalTotal - totalCommitted;
359 * number of transactions executed by this thread
360 * @return number of transactions
362 public static long getTotal() {
367 * Register a method to be called every time this thread validates any transaction.
368 * @param c abort if this object's <CODE>call()</CODE> method returns false
// Adds c to the calling thread's onValidate set; as a HashSet, it ignores a duplicate registration.
370 public static void onValidate(Callable<Boolean> c) {
371 _threadState.get().onValidate.add(c);
374 * Register a method to be called every time the current transaction is validated.
375 * @param c abort if this object's <CODE>call()</CODE> method returns false
// Adds c to the calling thread's onValidateOnce set, which runCommitHandlers and runAbortHandlers clear.
377 public static void onValidateOnce(Callable<Boolean> c) {
378 _threadState.get().onValidateOnce.add(c);
381 * Register a method to be called every time this thread commits a transaction.
382 * @param r call this object's <CODE>run()</CODE> method
// Adds r to the calling thread's onCommit set, which runCommitHandlers iterates.
384 public static void onCommit(Runnable r) {
385 _threadState.get().onCommit.add(r);
388 * Register a method to be called once if the current transaction commits.
389 * @param r call this object's <CODE>run()</CODE> method
// Adds r to the calling thread's onCommitOnce set, which runCommitHandlers iterates and then clears.
391 public static void onCommitOnce(Runnable r) {
392 _threadState.get().onCommitOnce.add(r);
395 * Register a method to be called every time this thread aborts a transaction.
396 * @param r call this object's <CODE>run()</CODE> method
// Adds r to the calling thread's onAbort set, which runAbortHandlers iterates.
398 public static void onAbort(Runnable r) {
399 _threadState.get().onAbort.add(r);
402 * Register a method to be called once if the current transaction aborts.
403 * @param r call this object's <CODE>run()</CODE> method
// Adds r to the calling thread's onAbortOnce set, which runAbortHandlers iterates.
405 public static void onAbortOnce(Runnable r) {
406 _threadState.get().onAbortOnce.add(r);
409 * get thread ID for debugging
// The "ID" is the hashCode() of the calling thread's ThreadState object, not java.lang.Thread.getId().
412 public static int getID() {
413 return _threadState.get().hashCode();
417 * reset thread statistics
// Zeroes the static memory-reference totals.
// NOTE(review): resets of totalCommitted / totalTotal are presumably on lines not visible in this listing — confirm.
419 public static void clear() {
422 totalCommittedMemRefs = 0;
423 totalTotalMemRefs = 0;
428 * Class that holds thread's actual state
// Per-thread bookkeeping: contention manager, metering counters, handler sets, and the current transaction.
430 public static class ThreadState {
// Assigned in the constructor from Thread.contentionManagerClass.
433 ContentionManager manager;
// Metering counters; zeroed by reset() and folded into Thread's static totals by the enclosing class.
435 private long committed = 0; // number of committed transactions
436 private long total = 0; // total number of transactions
437 private long committedMemRefs = 0; // number of committed reads and writes
438 private long totalMemRefs = 0; // total number of reads and writes
// Handler registries filled by Thread.onValidate/onCommit/onAbort and their *Once variants.
// They are HashSets, so iteration order is unspecified and duplicate registrations collapse.
440 Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
441 Set<Runnable> onCommit = new HashSet<Runnable>();
442 Set<Runnable> onAbort = new HashSet<Runnable>();
443 Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
444 Set<Runnable> onCommitOnce = new HashSet<Runnable>();
445 Set<Runnable> onAbortOnce = new HashSet<Runnable>();
// Current transaction; null until beginTransaction() assigns one.
447 Transaction transaction = null;
450 * Creates new ThreadState
// Instantiates the configured contention manager class reflectively via its no-argument constructor.
452 public ThreadState() {
454 manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
// A null contentionManagerClass shows up as a NullPointerException from the call above and is
// reported as "no class set".
455 } catch (NullPointerException e) {
456 throw new PanicException("No default contention manager class set.");
457 } catch (Exception e) { // Some problem with instantiation
458 throw new PanicException(e);
463 * Resets any metering information (commits/aborts, etc).
// Zeroes the four metering counters; the handler sets and the current transaction are not touched here.
465 public void reset() {
466 committed = 0; // number of committed transactions
467 total = 0; // total number of transactions
468 committedMemRefs = 0; // number of committed reads and writes
469 totalMemRefs = 0; // total number of reads and writes
474 * @return string representation of thread state
// Builds a debugging label from this object's hashCode(), the committed count, and the aborted
// count computed as total - committed.
476 public String toString() {
478 "Thread" + hashCode() + "["+
479 "committed: " + committed + "," +
480 "aborted: " + ( total - committed) +
485 * Can this transaction still commit?
486 * This method may be called at any time, not just at transaction end,
487 * so we do not clear the onValidateOnce table.
488 * @return true iff transaction might still commit
// Walks the onValidate and onValidateOnce handlers, then defers to transaction.validate().
// NOTE(review): the loop bodies and the catch handler are not visible in this listing, so how a
// failing or throwing handler is treated cannot be confirmed here.
490 public boolean validate() {
493 for (Callable<Boolean> v : onValidate) {
499 for (Callable<Boolean> v : onValidateOnce) {
504 return transaction.validate();
505 } catch (Exception ex) {
511 * Call methods registered to be called on commit.
// Iterates the onCommit and onCommitOnce handlers, then empties the one-shot commit and validate
// sets; any exception is wrapped in PanicException.
// NOTE(review): the loop bodies are not visible in this listing.
513 public void runCommitHandlers() {
516 for (Runnable r: onCommit) {
520 for (Runnable r: onCommitOnce) {
// One-shot registrations do not carry over to the next transaction.
523 onCommitOnce.clear();
524 onValidateOnce.clear();
525 } catch (Exception ex) {
526 throw new PanicException(ex);
531 * Starts a new transaction. Cannot nest transactions deeper than
532 * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
533 * invoking thread is notified when a transaction is begun.
// Replaces the current transaction with a new Transaction.
// NOTE(review): the condition guarding the nesting PanicException is on a line not visible in this listing.
535 public void beginTransaction() {
536 transaction = new Transaction();
540 // first thing to fix if we allow nested transactions
542 throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
548 * Attempts to commit the current transaction of the invoking
549 * <code>Thread</code>. Always succeeds for nested
550 * transactions. The contention manager of the invoking thread is
551 * notified of the result. If the transaction does not commit
552 * because a <code>TMObject</code> opened for reading was
553 * invalidated, the contention manager is also notified of the
557 * @return whether commit succeeded.
// Throws PanicException for the "no transaction active" and "nested transaction" cases (their
// guard conditions are not visible in this listing), then attempts the commit.
559 public boolean commitTransaction() {
562 throw new PanicException("commitTransaction invoked when no transaction active.");
565 throw new PanicException("commitTransaction invoked on nested transaction.");
// Short-circuit: transaction.commit() is only attempted when validate() returns true.
568 if (validate() && transaction.commit()) {
581 * Aborts the current transaction of the invoking <code>Thread</code>.
582 * Does not end transaction, but ensures it will never commit.
584 public void abortTransaction() {
590 * Call methods registered to be called on abort.
// Iterates the onAbort and onAbortOnce handlers, then empties the one-shot validate set; any
// exception is wrapped in PanicException.
// NOTE(review): the loop bodies are not visible in this listing, nor is any clearing of onAbortOnce.
592 public void runAbortHandlers() {
595 for (Runnable r: onAbort) {
599 for (Runnable r: onAbortOnce) {
603 onValidateOnce.clear();
604 } catch (Exception ex) {
605 throw new PanicException(ex);