6 * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7 * Clara, California 95054, U.S.A. All rights reserved.
9 * Sun Microsystems, Inc. has intellectual property rights relating to
10 * technology embodied in the product that is described in this
11 * document. In particular, and without limitation, these
12 * intellectual property rights may include one or more of the
13 * U.S. patents listed at http://www.sun.com/patents and one or more
14 * additional patents or pending patent applications in the U.S. and
17 * U.S. Government Rights - Commercial software.
18 * Government users are subject to the Sun Microsystems, Inc. standard
19 * license agreement and applicable provisions of the FAR and its
20 * supplements. Use is subject to license terms. Sun, Sun
21 * Microsystems, the Sun logo and Java are trademarks or registered
22 * trademarks of Sun Microsystems, Inc. in the U.S. and other
25 * This product is covered and controlled by U.S. Export Control laws
26 * and may be subject to the export or import laws in other countries.
27 * Nuclear, missile, chemical biological weapons or nuclear maritime
28 * end uses or end users, whether direct or indirect, are strictly
29 * prohibited. Export or reexport to countries subject to
30 * U.S. embargo or to entities identified on U.S. export exclusion
31 * lists, including, but not limited to, the denied persons and
32 * specially designated nationals lists is strictly prohibited.
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import TransactionalIO.core.TransactionalFile;
46 import TransactionalIO.core.Wrapper;
47 import java.lang.reflect.Constructor;
48 import java.lang.reflect.InvocationTargetException;
49 import java.util.ArrayList;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.HashSet;
53 import java.util.LinkedList;
54 import java.util.List;
57 import java.util.concurrent.Callable;
58 import static dstm2.Defaults.*;
60 * The basic unit of computation for the transactional memory. This
61 * class extends <code>java.lang.Thread</code> by providing methods to
62 * begin, commit and abort transactions.
64 * Every <code>Thread</code> has a contention manager, created when
65 * the thread is created. Before creating any <code>Thread</code>s,
66 * you must call <code>Thread.setContentionManager</code> to set the
67 * class of the contention manager that will be created. The
68 * contention manager of a thread is notified (by invoking its
69 * notification methods) of the results of any methods involving the
70 * thread. It is also consulted on whether a transaction should be
73 * @see dstm2.ContentionManager
75 public class Thread extends java.lang.Thread {
77 * Contention manager class.
79 protected static Class contentionManagerClass;
84 protected static Class<dstm2.factory.Adapter> adapterClass;
87 * Set to true when benchmark runs out of time.
89 public static volatile boolean stop = false;
91 * number of committed transactions for all threads
93 public static long totalCommitted = 0;
95 * total number of transactions for all threads
97 public static long totalTotal = 0;
99 * number of committed memory references for all threads
101 public static long totalCommittedMemRefs = 0;
103 * total number of memory references for all threads
105 public static long totalTotalMemRefs = 0;
107 static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
108 protected synchronized ThreadState initialValue() {
109 return new ThreadState();
112 static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
113 protected synchronized Thread initialValue() {
118 private static int MAX_NESTING_DEPTH = 1;
120 private static Object lock = new Object();
122 // Memo-ize factories so we don't have to recreate them.
123 private static Map<Class,Factory> factoryTable
124 = Collections.synchronizedMap(new HashMap<Class,Factory>());
127 * Create thread to run a method.
128 * @param target execute this object's <CODE>run()</CODE> method
130 public Thread(final Runnable target) {
131 super(new Runnable() {
133 ThreadState threadState = _threadState.get();
136 // collect statistics
138 totalCommitted += threadState.committed;
139 totalTotal += threadState.total;
140 totalCommittedMemRefs += threadState.committedMemRefs;
141 totalTotalMemRefs += threadState.totalMemRefs;
154 * Establishes a contention manager. You must call this method
155 * before creating any <code>Thread</code>.
157 * @see dstm2.ContentionManager
158 * @param theClass class of desired contention manager.
160 public static void setContentionManagerClass(Class theClass) {
163 cm = Class.forName("dstm2.ContentionManager");
164 } catch (ClassNotFoundException e) {
165 throw new PanicException(e);
168 contentionManagerClass = theClass;
169 } catch (Exception e) {
170 throw new PanicException("The class " + theClass
171 + " does not implement dstm2.ContentionManager");
176 * set Adapter class for this thread
177 * @param adapterClassName adapter class as string
179 public static void setAdapterClass(String adapterClassName) {
181 adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
182 } catch (ClassNotFoundException ex) {
183 throw new PanicException("Adapter class not found: %s\n", adapterClassName);
188 * Tests whether the current transaction can still commit. Does not
189 * actually end the transaction (either <code>commitTransaction</code> or
190 * <code>abortTransaction</code> must still be called). The contention
191 * manager of the invoking thread is notified if validation fails
192 * because a <code>TMObject</code> opened for reading was invalidated.
194 * @return whether the current transaction may commit successfully.
196 static public boolean validate() {
197 ThreadState threadState = _threadState.get();
198 return threadState.validate();
202 * Gets the current transaction, if any, of the invoking <code>Thread</code>.
204 * @return the current thread's current transaction; <code>null</code> if
205 * there is no current transaction.
207 static public Transaction getTransaction() {
208 return _threadState.get().transaction;
212 * Gets the contention manager of the invoking <code>Thread</code>.
214 * @return the invoking thread's contention manager
216 static public ContentionManager getContentionManager() {
217 return _threadState.get().manager;
221 * Create a new factory instance.
222 * @param _class class to implement
223 * @return new factory
225 static public <T> Factory<T> makeFactory(Class<T> _class) {
// Looks up a memoized factory for _class in factoryTable; on a miss, builds an
// AtomicFactory from _class and the configured adapterClass and stores it in the table.
227 Factory<T> factory = (Factory<T>) factoryTable.get(_class);
229 if (factory == null) {
230 factory = new AtomicFactory<T>(_class, adapterClass);
// NOTE(review): the get above and the put below are two separate calls on the
// synchronized map, so unless a lock is held around them two threads could each
// construct a factory for the same class - confirm this is acceptable.
231 factoryTable.put(_class, factory);
// Any exception raised while obtaining the factory is rethrown wrapped in a PanicException.
234 } catch (Exception e) {
235 throw new PanicException(e);
240 * Execute a transaction
241 * @param xaction execute this object's <CODE>call()</CODE> method.
242 * @return result of <CODE>call()</CODE> method
244 public static <T> T doIt(Callable<T> xaction) {
// Steps in order of appearance: fetch the calling thread's ThreadState, begin a
// transaction on it, pass the current transaction to Wrapper.Initialize, invoke
// xaction.call(), record memory-reference counts, call Wrapper.prepareIOCommit(),
// and attempt threadState.commitTransaction().
245 ThreadState threadState = _threadState.get();
246 ContentionManager manager = threadState.manager;
248 boolean flag = false;
251 threadState.beginTransaction();
253 /////For Integrating with IO//////////
254 Wrapper.Initialize(Thread.getTransaction());
255 //////////////////////////////////////
257 result = xaction.call();
259 // } catch (AbortedException d) {
260 /* synchronized(benchmark.lock){
261 System.out.println(Thread.currentThread() + " aborted in committing");
264 // } //catch (SnapshotException s) {
265 //threadState.abortTransaction();
267 // catch (Exception e) {
268 // e.printStackTrace();
269 // throw new PanicException("Unhandled exception " + e);
271 threadState.totalMemRefs += threadState.transaction.memRefs;
272 threadState.transaction.attempts++;
273 /*synchronized(benchmark.lock){
274 System.out.println(Thread.currentThread() + " ghabl az try");
280 Wrapper.prepareIOCommit();
281 /* synchronized(benchmark.lock){
282 System.out.println(Thread.currentThread() + " to try");
284 ///////////////////////////////
286 if (threadState.commitTransaction()) {
287 threadState.committedMemRefs += threadState.transaction.memRefs;
293 catch(AbortedException ex){
295 /// synchronized(benchmark.lock){
296 // System.out.println(Thread.currentThread() + " aborted in committing");
300 catch (Exception e) {
302 throw new PanicException("Unhandled exception " + e);
306 Wrapper.getTransaction().unlockAllLocks();
311 // transaction aborted
313 if (threadState.transaction != null) {
314 threadState.abortTransaction();
317 threadState.transaction = null;
318 Wrapper.setTransaction(null);
320 // collect statistics
// NOTE(review): the two MemRefs totals below are overwritten with = while
// totalCommitted and totalTotal are accumulated with +=; the statistics block under
// the Thread(Runnable) constructor uses += for all four counters. As written, each
// pass through here discards MemRefs counts contributed earlier - confirm whether
// += was intended. These static totals are also updated without any visible lock.
322 totalTotalMemRefs = threadState.totalMemRefs;
323 totalCommittedMemRefs = threadState.committedMemRefs;
324 totalCommitted += threadState.committed;
325 totalTotal += threadState.total;
326 threadState.reset(); // set up for next iteration
// Reached only when no earlier statement returned or threw.
328 throw new GracefulException();
331 * Execute transaction
332 * @param xaction call this object's <CODE>run()</CODE> method
334 public static void doIt(final Runnable xaction) {
335 doIt(new Callable<Boolean>() {
336 public Boolean call() {
344 * number of transactions committed by this thread
345 * @return number of transactions committed by this thread
347 public static long getCommitted() {
348 return totalCommitted;
352 * number of transactions aborted by this thread
353 * @return number of aborted transactions
355 public static long getAborted() {
356 return totalTotal - totalCommitted;
360 * number of transactions executed by this thread
361 * @return number of transactions
363 public static long getTotal() {
368 * Register a method to be called every time this thread validates any transaction.
369 * @param c abort if this object's <CODE>call()</CODE> method returns false
371 public static void onValidate(Callable<Boolean> c) {
372 _threadState.get().onValidate.add(c);
375 * Register a method to be called every time the current transaction is validated.
376 * @param c abort if this object's <CODE>call()</CODE> method returns false
378 public static void onValidateOnce(Callable<Boolean> c) {
379 _threadState.get().onValidateOnce.add(c);
382 * Register a method to be called every time this thread commits a transaction.
383 * @param r call this object's <CODE>run()</CODE> method
385 public static void onCommit(Runnable r) {
386 _threadState.get().onCommit.add(r);
389 * Register a method to be called once if the current transaction commits.
390 * @param r call this object's <CODE>run()</CODE> method
392 public static void onCommitOnce(Runnable r) {
393 _threadState.get().onCommitOnce.add(r);
396 * Register a method to be called every time this thread aborts a transaction.
397 * @param r call this object's <CODE>run()</CODE> method
399 public static void onAbort(Runnable r) {
400 _threadState.get().onAbort.add(r);
403 * Register a method to be called once if the current transaction aborts.
404 * @param r call this object's <CODE>run()</CODE> method
406 public static void onAbortOnce(Runnable r) {
407 _threadState.get().onAbortOnce.add(r);
410 * get thread ID for debugging
413 public static int getID() {
414 return _threadState.get().hashCode();
418 * reset thread statistics
420 public static void clear() {
423 totalCommittedMemRefs = 0;
424 totalTotalMemRefs = 0;
429 * Class that holds thread's actual state
431 public static class ThreadState {
434 ContentionManager manager;
436 private long committed = 0; // number of committed transactions
437 private long total = 0; // total number of transactions
438 private long committedMemRefs = 0; // number of committed reads and writes
439 private long totalMemRefs = 0; // total number of reads and writes
441 Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
442 Set<Runnable> onCommit = new HashSet<Runnable>();
443 Set<Runnable> onAbort = new HashSet<Runnable>();
444 Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
445 Set<Runnable> onCommitOnce = new HashSet<Runnable>();
446 Set<Runnable> onAbortOnce = new HashSet<Runnable>();
448 Transaction transaction = null;
451 * Creates new ThreadState
453 public ThreadState() {
455 manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
456 } catch (NullPointerException e) {
457 throw new PanicException("No default contention manager class set.");
458 } catch (Exception e) { // Some problem with instantiation
459 throw new PanicException(e);
464 * Resets any metering information (commits/aborts, etc).
466 public void reset() {
467 committed = 0; // number of committed transactions
468 total = 0; // total number of transactions
469 committedMemRefs = 0; // number of committed reads and writes
470 totalMemRefs = 0; // total number of reads and writes
475 * @return string representation of thread state
477 public String toString() {
479 "Thread" + hashCode() + "["+
480 "committed: " + committed + "," +
481 "aborted: " + ( total - committed) +
486 * Can this transaction still commit?
487 * This method may be called at any time, not just at transaction end,
488 * so we do not clear the onValidateOnce table.
489 * @return true iff transaction might still commit
491 public boolean validate() {
494 for (Callable<Boolean> v : onValidate) {
500 for (Callable<Boolean> v : onValidateOnce) {
505 return transaction.validate();
506 } catch (Exception ex) {
512 * Call methods registered to be called on commit.
514 public void runCommitHandlers() {
517 for (Runnable r: onCommit) {
521 for (Runnable r: onCommitOnce) {
524 onCommitOnce.clear();
525 onValidateOnce.clear();
526 } catch (Exception ex) {
527 throw new PanicException(ex);
532 * Starts a new transaction. Cannot nest transactions deeper than
533 * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
534 * invoking thread is notified when a transaction is begun.
536 public void beginTransaction() {
537 transaction = new Transaction();
541 // first thing to fix if we allow nested transactions
543 throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
549 * Attempts to commit the current transaction of the invoking
550 * <code>Thread</code>. Always succeeds for nested
551 * transactions. The contention manager of the invoking thread is
552 * notified of the result. If the transaction does not commit
553 * because a <code>TMObject</code> opened for reading was
554 * invalidated, the contention manager is also notified of the
558 * @return whether commit succeeded.
560 public boolean commitTransaction() {
563 throw new PanicException("commitTransaction invoked when no transaction active.");
566 throw new PanicException("commitTransaction invoked on nested transaction.");
569 if (validate() && transaction.commit()) {
582 * Aborts the current transaction of the invoking <code>Thread</code>.
583 * Does not end transaction, but ensures it will never commit.
585 public void abortTransaction() {
591 * Call methods registered to be called on abort.
593 public void runAbortHandlers() {
596 for (Runnable r: onAbort) {
600 for (Runnable r: onAbortOnce) {
604 onValidateOnce.clear();
605 } catch (Exception ex) {
606 throw new PanicException(ex);