*** empty log message ***
[IRC.git] / Robust / Transactions / dstm2 / src / dstm2 / Thread.java
1 package dstm2;
2
3 /*
4  * Thread.java
5  *
6  * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7  * Clara, California 95054, U.S.A.  All rights reserved.  
8  * 
9  * Sun Microsystems, Inc. has intellectual property rights relating to
10  * technology embodied in the product that is described in this
11  * document.  In particular, and without limitation, these
12  * intellectual property rights may include one or more of the
13  * U.S. patents listed at http://www.sun.com/patents and one or more
14  * additional patents or pending patent applications in the U.S. and
15  * in other countries.
16  * 
17  * U.S. Government Rights - Commercial software.
18  * Government users are subject to the Sun Microsystems, Inc. standard
19  * license agreement and applicable provisions of the FAR and its
20  * supplements.  Use is subject to license terms.  Sun, Sun
21  * Microsystems, the Sun logo and Java are trademarks or registered
22  * trademarks of Sun Microsystems, Inc. in the U.S. and other
23  * countries.  
24  * 
25  * This product is covered and controlled by U.S. Export Control laws
26  * and may be subject to the export or import laws in other countries.
27  * Nuclear, missile, chemical biological weapons or nuclear maritime
28  * end uses or end users, whether direct or indirect, are strictly
29  * prohibited.  Export or reexport to countries subject to
30  * U.S. embargo or to entities identified on U.S. export exclusion
31  * lists, including, but not limited to, the denied persons and
32  * specially designated nationals lists is strictly prohibited.
33  */
34
35 //package dstm2;
36
37
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
41
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import dstm2.SpecialLock;
46 import TransactionalIO.core.TransactionalFile;
47 import TransactionalIO.core.Wrapper;
48 import java.lang.reflect.Constructor;
49 import java.lang.reflect.InvocationTargetException;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.HashSet;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Set;
58 import java.util.concurrent.Callable;
59 import static dstm2.Defaults.*;
60 /**
61  * The basic unit of computation for the transactional memory.  This
62  * class extends <code>java.lang.Thread</code> by providing methods to
63  * begin, commit and abort transactions.
64  *
65  * Every <code>Thread</code> has a contention manager, created when
66  * the thread is created.  Before creating any <code>Thread</code>s,
67  * you must call <code>Thread.setContentionManager</code> to set the
68  * class of the contention manager that will be created.  The
69  * contention manager of a thread is notified (by invoking its
70  * notification methods) of the results of any methods involving the
71  * thread.  It is also consulted on whether a transaction should be
72  * begun.
73  *
74  * @see dstm2.ContentionManager
75  */
76 public class Thread extends java.lang.Thread{
77   /**
78    * Contention manager class.
79    */
80   protected static Class contentionManagerClass;
81   
82   /**
83    * Adapter class.
84    */
85   protected static Class<dstm2.factory.Adapter> adapterClass;
86   
87   /**
88    * Set to true when benchmark runs out of time.
89    **/
90   public static volatile boolean stop = false;
91   /**
92    * number of committed transactions for all threads
93    */
94   public static long totalCommitted = 0;
95   /**
96    * total number of transactions for all threads
97    */
98   public static long totalTotal = 0;
99   /**
100    * number of committed memory references for all threads
101    */
102   public static long totalCommittedMemRefs = 0;
103   /**
104    * total number of memory references for all threads
105    */
106   public static long totalTotalMemRefs = 0;
107   
108   static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
109     protected synchronized ThreadState initialValue() {
110       return new ThreadState();
111     }
112   };
113   static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
114     protected synchronized Thread initialValue() {
115       return null;
116     }
117   };
118   
119   private static int MAX_NESTING_DEPTH = 1;
120   
121   private static Object lock = new Object();
122   
123   // Memo-ize factories so we don't have to recreate them.
124   private static Map<Class,Factory> factoryTable
125       = Collections.synchronizedMap(new HashMap<Class,Factory>());
126   
127   /**
128    * Create thread to run a method.
129    * @param target execute this object's <CODE>run()</CODE> method
130    */
131   public Thread(final Runnable target) {
132     super(new Runnable() {
133       public void run() {
134         ThreadState threadState = _threadState.get();
135         threadState.reset();
136         target.run();
137         // collect statistics
138         synchronized (lock){
139           totalCommitted += threadState.committed;
140           totalTotal += threadState.total;
141           totalCommittedMemRefs += threadState.committedMemRefs;
142           totalTotalMemRefs += threadState.totalMemRefs;
143         }
144       }
145     });
146   }
147   /**
148    * no-arg constructor
149    */
150   public Thread() {
151     super();
152   }
153   
154   /**
155    * Establishes a contention manager.  You must call this method
156    * before creating any <code>Thread</code>.
157    *
158    * @see dstm2.ContentionManager
159    * @param theClass class of desired contention manager.
160    */
161   public static void setContentionManagerClass(Class theClass) {
162     Class cm;
163     try {
164       cm = Class.forName("dstm2.ContentionManager");
165     } catch (ClassNotFoundException e) {
166       throw new PanicException(e);
167     }
168     try {
169       contentionManagerClass = theClass;
170     } catch (Exception e) {
171       throw new PanicException("The class " + theClass
172           + " does not implement dstm2.ContentionManager");
173     }
174   }
175   
176   /**
177    * set Adapter class for this thread
178    * @param adapterClassName adapter class as string
179    */
180   public static void setAdapterClass(String adapterClassName) {
181     try {
182       adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
183     } catch (ClassNotFoundException ex) {
184       throw new PanicException("Adapter class not found: %s\n", adapterClassName);
185     }
186   }
187   
188   /**
189    * Tests whether the current transaction can still commit.  Does not
190    * actually end the transaction (either <code>commitTransaction</code> or
191    * <code>abortTransaction</code> must still be called).  The contention
192    * manager of the invoking thread is notified if the onValidate fails
193    * because a <code>TMObject</code> opened for reading was invalidated.
194    *
195    * @return whether the current transaction may commit successfully.
196    */
197   static public boolean validate() {
198     ThreadState threadState = _threadState.get();
199     return threadState.validate();
200   }
201   
202   /**
203    * Gets the current transaction, if any, of the invoking <code>Thread</code>.
204    *
205    * @return the current thread's current transaction; <code>null</code> if
206    *         there is no current transaction.
207    */
208   static public Transaction getTransaction() {
209     return _threadState.get().transaction;
210   }
211   
212   /**
213    * Gets the contention manager of the invoking <code>Thread</code>.
214    *
215    * @return the invoking thread's contention manager
216    */
217   static public ContentionManager getContentionManager() {
218     return _threadState.get().manager;
219   }
220   
221   /**
222    * Create a new factory instance.
223    * @param _class class to implement
224    * @return new factory
225    */
226   static public <T> Factory<T> makeFactory(Class<T> _class) {
227     try {
228       Factory<T> factory = (Factory<T>) factoryTable.get(_class);
229      
230       if (factory == null) {
231         factory =  new AtomicFactory<T>(_class, adapterClass);
232         factoryTable.put(_class, factory);
233       }
234       return factory;
235     } catch (Exception e) {
236       throw new PanicException(e);
237     }
238   }
239   
240   /**
241    * Execute a transaction
242    * @param xaction execute this object's <CODE>call()</CODE> method.
243    * @return result of <CODE>call()</CODE> method
244    */
245   public static <T> T doIt(Callable<T> xaction) {
246     ThreadState threadState = _threadState.get();
247     ContentionManager manager = threadState.manager;
248     T result = null;
249    // System.out.println(Thread.currentThread() + " astarted the transaction");
250     boolean flag = false;
251     try {
252       while (true) {
253         threadState.beginTransaction();
254      //   System.out.println(Thread.currentThread() + " offically started the transaction");
255        /////For Integrating with IO////////// 
256         Wrapper.Initialize(Thread.getTransaction());
257       //  System.out.println(Thread.currentThread() + " even more offically started the transaction");
258        ////////////////////////////////////// 
259         try {
260           result = xaction.call();
261       //     System.out.println(Thread.currentThread() + " aborted in committing");
262       //  } catch (AbortedException d) {
263           /*  synchronized(benchmark.lock){
264                 System.out.println(Thread.currentThread() + " aborted in committing");
265             }*/
266
267        // } //catch (SnapshotException s) {
268           //threadState.abortTransaction();
269       //} 
270        // catch (Exception e) {
271       //    e.printStackTrace();
272        //   throw new PanicException("Unhandled exception " + e);
273        // }
274             threadState.totalMemRefs += threadState.transaction.memRefs;
275             threadState.transaction.attempts++;
276      
277             Wrapper.prepareIOCommit();
278
279         ///////////////////////////////
280         
281                 if (threadState.commitTransaction()) {
282                     threadState.committedMemRefs += threadState.transaction.memRefs;
283                     
284                     Wrapper.commitIO();
285                     flag = true;
286                }
287         }
288         catch(AbortedException ex){
289             threadState.depth--;
290            // System.out.println(Thread.currentThread() + " aborted");
291            // Wrapper.getTransaction().unlockAllLocks();
292         }
293         catch (Exception e) {
294           e.printStackTrace();
295           throw new PanicException("Unhandled exception " + e);
296         }
297         finally{
298             
299             
300             Wrapper.getTransaction().unlockAllLocks();
301             if (Thread.getTransaction() == SpecialLock.getSpecialLock().getOwnerTransaction())
302                 SpecialLock.getSpecialLock().unlock(Thread.getTransaction());
303             if  (flag == true)
304                 break;
305         }
306       
307         // transaction aborted
308       }
309       if (threadState.transaction != null) {
310         threadState.abortTransaction();
311       }
312     } finally {
313       threadState.transaction = null;
314       Wrapper.setTransaction(null);
315     }
316     // collect statistics
317     synchronized (lock){
318       totalTotalMemRefs = threadState.totalMemRefs;
319       totalCommittedMemRefs = threadState.committedMemRefs;
320       totalCommitted += threadState.committed;
321       totalTotal += threadState.total;
322       threadState.reset();  // set up for next iteration
323     }
324    /* if (result == null)
325         throw new GracefulException();
326     else return result;*/
327     return result;
328   }
329   /**
330    * Execute transaction
331    * @param xaction call this object's <CODE>run()</CODE> method
332    */
333   public static void doIt(final Runnable xaction) {
334     doIt(new Callable<Boolean>() {
335       public Boolean call() {
336         xaction.run();
337         return false;
338       };
339     });
340   }
341   
342   /**
343    * number of transactions committed by this thread
344    * @return number of transactions committed by this thread
345    */
346   public static long getCommitted() {
347     return totalCommitted;
348   }
349   
350   /**
351    * umber of transactions aborted by this thread
352    * @return number of aborted transactions
353    */
354   public static long getAborted() {
355     return totalTotal -  totalCommitted;
356   }
357   
358   /**
359    * number of transactions executed by this thread
360    * @return number of transactions
361    */
362   public static long getTotal() {
363     return totalTotal;
364   }
365   
366   /**
367    * Register a method to be called every time this thread validates any transaction.
368    * @param c abort if this object's <CODE>call()</CODE> method returns false
369    */
370   public static void onValidate(Callable<Boolean> c) {
371     _threadState.get().onValidate.add(c);
372   }
373   /**
374    * Register a method to be called every time the current transaction is validated.
375    * @param c abort if this object's <CODE>call()</CODE> method returns false
376    */
377   public static void onValidateOnce(Callable<Boolean> c) {
378     _threadState.get().onValidateOnce.add(c);
379   }
380   /**
381    * Register a method to be called every time this thread commits a transaction.
382    * @param r call this object's <CODE>run()</CODE> method
383    */
384   public static void onCommit(Runnable r) {
385     _threadState.get().onCommit.add(r);
386   }
387   /**
388    * Register a method to be called once if the current transaction commits.
389    * @param r call this object's <CODE>run()</CODE> method
390    */
391   public static void onCommitOnce(Runnable r) {
392     _threadState.get().onCommitOnce.add(r);
393   }
394   /**
395    * Register a method to be called every time this thread aborts a transaction.
396    * @param r call this objec't <CODE>run()</CODE> method
397    */
398   public static void onAbort(Runnable r) {
399     _threadState.get().onAbort.add(r);
400   }
401   /**
402    * Register a method to be called once if the current transaction aborts.
403    * @param r call this object's <CODE>run()</CODE> method
404    */
405   public static void onAbortOnce(Runnable r) {
406     _threadState.get().onAbortOnce.add(r);
407   }
408   /**
409    * get thread ID for debugging
410    * @return unique id
411    */
412   public static int getID() {
413     return _threadState.get().hashCode();
414   }
415   
416   /**
417    * reset thread statistics
418    */
419   public static void clear() {
420     totalTotal = 0;
421     totalCommitted = 0;
422     totalCommittedMemRefs = 0;
423     totalTotalMemRefs = 0;
424     stop = false;
425   }
426   
427   /**
428    * Class that holds thread's actual state
429    */
430   public static class ThreadState {
431     
432     int depth = 0;
433     ContentionManager manager;
434     
435     private long committed = 0;        // number of committed transactions
436     private long total = 0;            // total number of transactions
437     private long committedMemRefs = 0; // number of committed reads and writes
438     private long totalMemRefs = 0;     // total number of reads and writes
439     
440     Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
441     Set<Runnable>          onCommit   = new HashSet<Runnable>();
442     Set<Runnable>          onAbort    = new HashSet<Runnable>();
443     Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
444     Set<Runnable>          onCommitOnce   = new HashSet<Runnable>();
445     Set<Runnable>          onAbortOnce    = new HashSet<Runnable>();
446     
447     Transaction transaction = null;
448     
449     /**
450      * Creates new ThreadState
451      */
452     public ThreadState() {
453       try {
454         manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
455       } catch (NullPointerException e) {
456         throw new PanicException("No default contention manager class set.");
457       } catch (Exception e) {  // Some problem with instantiation
458         throw new PanicException(e);
459       }
460     }
461     
462     /**
463      * Resets any metering information (commits/aborts, etc).
464      */
465     public void reset() {
466       committed = 0;        // number of committed transactions
467       total = 0;            // total number of transactions
468       committedMemRefs = 0; // number of committed reads and writes
469       totalMemRefs = 0;     // total number of reads and writes
470     }
471     
472     /**
473      * used for debugging
474      * @return string representation of thread state
475      */
476     public String toString() {
477       return
478           "Thread" + hashCode() + "["+
479           "committed: " +  committed + "," +
480           "aborted: " + ( total -  committed) +
481           "]";
482     }
483     
484     /**
485      * Can this transaction still commit?
486      * This method may be called at any time, not just at transaction end,
487      * so we do not clear the onValidateOnce table.
488      * @return true iff transaction might still commit
489      */
490     public boolean validate() {
491       try {
492         // permanent
493         for (Callable<Boolean> v : onValidate) {
494           if (!v.call()) {
495             return false;
496           }
497         }
498         // temporary
499         for (Callable<Boolean> v : onValidateOnce) {
500           if (!v.call()) {
501             return false;
502           }
503         }
504         return transaction.validate();
505       } catch (Exception ex) {
506         return false;
507       }
508     }
509     
510     /**
511      * Call methods registered to be called on commit.
512      */
513     public void runCommitHandlers() {
514       try {
515         // permanent
516         for (Runnable r: onCommit) {
517           r.run();
518         }
519         // temporary
520         for (Runnable r: onCommitOnce) {
521           r.run();
522         }
523         onCommitOnce.clear();
524         onValidateOnce.clear();
525       } catch (Exception ex) {
526         throw new PanicException(ex);
527       }
528     }
529     
530     /**
531      * Starts a new transaction.  Cannot nest transactions deeper than
532      * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
533      * invoking thread is notified when a transaction is begun.
534      */
535     public void beginTransaction() {
536       transaction = new Transaction();
537       if (depth == 0) {
538         total++;
539       }
540       // first thing to fix if we allow nested transactions
541       if (depth >= 1) {
542         throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
543       }
544       depth++;
545     }
546     
547     /**
548      * Attempts to commit the current transaction of the invoking
549      * <code>Thread</code>.  Always succeeds for nested
550      * transactions.  The contention manager of the invoking thread is
551      * notified of the result.  If the transaction does not commit
552      * because a <code>TMObject</code> opened for reading was
553      * invalidated, the contention manager is also notified of the
554      * inonValidate.
555      *
556      *
557      * @return whether commit succeeded.
558      */
559     public boolean commitTransaction() {
560       depth--;
561       if (depth < 0) {
562         throw new PanicException("commitTransaction invoked when no transaction active.");
563       }
564       if (depth > 0) {
565         throw new PanicException("commitTransaction invoked on nested transaction.");
566       }
567       if (depth == 0) {
568         if (validate() && transaction.commit()) {
569           committed++;
570           runCommitHandlers();
571           return true;
572         }
573         abortTransaction();
574         return false;
575       } else {
576         return true;
577       }
578     }
579     
580     /**
581      * Aborts the current transaction of the invoking <code>Thread</code>.
582      * Does not end transaction, but ensures it will never commit.
583      */
584     public void abortTransaction() {
585       runAbortHandlers();
586       transaction.abort();
587     }
588     
589     /**
590      * Call methods registered to be called on commit.
591      */
592     public void runAbortHandlers() {
593       try {
594         // permanent
595         for (Runnable r: onAbort) {
596           r.run();
597         }
598         // temporary
599         for (Runnable r: onAbortOnce) {
600           r.run();
601         }
602         onAbortOnce.clear();
603         onValidateOnce.clear();
604       } catch (Exception ex) {
605         throw new PanicException(ex);
606       }
607     }
608   }
609 }