From: navid Date: Tue, 9 Sep 2008 04:18:28 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: buildscript^6~33 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=f20762aeab1312fc04c49105cbae8dfa7170c551;p=IRC.git *** empty log message *** --- diff --git a/Robust/Transactions/Notes/descrip.odt b/Robust/Transactions/Notes/descrip.odt index 60cd3333..cdd35b7b 100644 Binary files a/Robust/Transactions/Notes/descrip.odt and b/Robust/Transactions/Notes/descrip.odt differ diff --git a/Robust/Transactions/src/Defaults.java b/Robust/Transactions/src/Defaults.java new file mode 100644 index 00000000..134e0233 --- /dev/null +++ b/Robust/Transactions/src/Defaults.java @@ -0,0 +1,69 @@ +/* + * Defaults.java + * + * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa + * Clara, California 95054, U.S.A. All rights reserved. + * + * Sun Microsystems, Inc. has intellectual property rights relating to + * technology embodied in the product that is described in this + * document. In particular, and without limitation, these + * intellectual property rights may include one or more of the + * U.S. patents listed at http://www.sun.com/patents and one or more + * additional patents or pending patent applications in the U.S. and + * in other countries. + * + * U.S. Government Rights - Commercial software. + * Government users are subject to the Sun Microsystems, Inc. standard + * license agreement and applicable provisions of the FAR and its + * supplements. Use is subject to license terms. Sun, Sun + * Microsystems, the Sun logo and Java are trademarks or registered + * trademarks of Sun Microsystems, Inc. in the U.S. and other + * countries. + * + * This product is covered and controlled by U.S. Export Control laws + * and may be subject to the export or import laws in other countries. + * Nuclear, missile, chemical biological weapons or nuclear maritime + * end uses or end users, whether direct or indirect, are strictly + * prohibited. Export or reexport to countries subject to + * U.S. embargo or to entities identified on U.S. export exclusion + * lists, including, but not limited to, the denied persons and + * specially designated nationals lists is strictly prohibited. + */ + +package dstm2; + +/** + * + * @author Maurice Herlihy + */ +public class Defaults { + /** + * how many threads + **/ + public static final int THREADS = 1; + /** + * benchmark duration in milliseconds + **/ + public static final int TIME = 10000; + /** + * uninterpreted arg passed to benchmark + **/ + public static final int EXPERIMENT = 100; + /** + * fully-qualified contention manager name + **/ + public static final String MANAGER = "dstm2.manager.BackoffManager"; + /** + * fully-qualified factory name + **/ + public static final String FACTORY = "dstm2.factory.shadow.Factory"; + /** + * fully-qualified adapter name + **/ + public static final String ADAPTER = "dstm2.factory.shadow.Adapter";; + + public static final int FILEFRAGMENTSIZE= 1024; + + public static final boolean READWRITECONFLIT = true; + +} diff --git a/Robust/Transactions/src/Thread.java b/Robust/Transactions/src/Thread.java new file mode 100644 index 00000000..fe5bfd04 --- /dev/null +++ b/Robust/Transactions/src/Thread.java @@ -0,0 +1,565 @@ +/* + * Thread.java + * + * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa + * Clara, California 95054, U.S.A. All rights reserved. + * + * Sun Microsystems, Inc. has intellectual property rights relating to + * technology embodied in the product that is described in this + * document. In particular, and without limitation, these + * intellectual property rights may include one or more of the + * U.S. patents listed at http://www.sun.com/patents and one or more + * additional patents or pending patent applications in the U.S. and + * in other countries. + * + * U.S. Government Rights - Commercial software. + * Government users are subject to the Sun Microsystems, Inc. standard + * license agreement and applicable provisions of the FAR and its + * supplements. Use is subject to license terms. Sun, Sun + * Microsystems, the Sun logo and Java are trademarks or registered + * trademarks of Sun Microsystems, Inc. in the U.S. and other + * countries. + * + * This product is covered and controlled by U.S. Export Control laws + * and may be subject to the export or import laws in other countries. + * Nuclear, missile, chemical biological weapons or nuclear maritime + * end uses or end users, whether direct or indirect, are strictly + * prohibited. Export or reexport to countries subject to + * U.S. embargo or to entities identified on U.S. export exclusion + * lists, including, but not limited to, the denied persons and + * specially designated nationals lists is strictly prohibited. + */ + +package dstm2; + +import dstm2.exceptions.AbortedException; +import dstm2.exceptions.GracefulException; +import dstm2.exceptions.PanicException; +import dstm2.exceptions.SnapshotException; +import dstm2.factory.AtomicFactory; +import dstm2.factory.Factory; +import dstm2.file.factory.ExtendedTransaction; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import static dstm2.Defaults.*; +/** + * The basic unit of computation for the transactional memory. This + * class extends java.lang.Thread by providing methods to + * begin, commit and abort transactions. + * + * Every Thread has a contention manager, created when + * the thread is created. Before creating any Threads, + * you must call Thread.setContentionManager to set the + * class of the contention manager that will be created. The + * contention manager of a thread is notified (by invoking its + * notification methods) of the results of any methods involving the + * thread. It is also consulted on whether a transaction should be + * begun. + * + * @see dstm2.ContentionManager + */ +public class Thread extends java.lang.Thread { + /** + * Contention manager class. + */ + protected static Class contentionManagerClass; + + /** + * Adapter class. + */ + protected static Class adapterClass; + + /** + * Set to true when benchmark runs out of time. + **/ + public static volatile boolean stop = false; + /** + * number of committed transactions for all threads + */ + public static long totalCommitted = 0; + /** + * total number of transactions for all threads + */ + public static long totalTotal = 0; + /** + * number of committed memory references for all threads + */ + public static long totalCommittedMemRefs = 0; + /** + * total number of memory references for all threads + */ + public static long totalTotalMemRefs = 0; + + static ThreadLocal _threadState = new ThreadLocal() { + protected synchronized ThreadState initialValue() { + return new ThreadState(); + } + }; + static ThreadLocal _thread = new ThreadLocal() { + protected synchronized Thread initialValue() { + return null; + } + }; + + private static int MAX_NESTING_DEPTH = 1; + + private static Object lock = new Object(); + + // Memo-ize factories so we don't have to recreate them. + private static Map factoryTable + = Collections.synchronizedMap(new HashMap()); + + /** + * Create thread to run a method. + * @param target execute this object's run() method + */ + public Thread(final Runnable target) { + super(new Runnable() { + public void run() { + ThreadState threadState = _threadState.get(); + threadState.reset(); + target.run(); + // collect statistics + synchronized (lock){ + totalCommitted += threadState.committed; + totalTotal += threadState.total; + totalCommittedMemRefs += threadState.committedMemRefs; + totalTotalMemRefs += threadState.totalMemRefs; + } + } + }); + } + /** + * no-arg constructor + */ + public Thread() { + super(); + } + + /** + * Establishes a contention manager. You must call this method + * before creating any Thread. + * + * @see dstm2.ContentionManager + * @param theClass class of desired contention manager. + */ + public static void setContentionManagerClass(Class theClass) { + Class cm; + try { + cm = Class.forName("dstm2.ContentionManager"); + } catch (ClassNotFoundException e) { + throw new PanicException(e); + } + try { + contentionManagerClass = theClass; + } catch (Exception e) { + throw new PanicException("The class " + theClass + + " does not implement dstm2.ContentionManager"); + } + } + + /** + * set Adapter class for this thread + * @param adapterClassName adapter class as string + */ + public static void setAdapterClass(String adapterClassName) { + try { + adapterClass = (Class)Class.forName(adapterClassName); + } catch (ClassNotFoundException ex) { + throw new PanicException("Adapter class not found: %s\n", adapterClassName); + } + } + + /** + * Tests whether the current transaction can still commit. Does not + * actually end the transaction (either commitTransaction or + * abortTransaction must still be called). The contention + * manager of the invoking thread is notified if the onValidate fails + * because a TMObject opened for reading was invalidated. + * + * @return whether the current transaction may commit successfully. + */ + static public boolean validate() { + ThreadState threadState = _threadState.get(); + return threadState.validate(); + } + + /** + * Gets the current transaction, if any, of the invoking Thread. + * + * @return the current thread's current transaction; null if + * there is no current transaction. + */ +/* static public Transaction getTransaction() { + return _threadState.get().transaction; + }*/ + + + static public ExtendedTransaction getTransaction() { + return _threadState.get().transaction; + } + + /** + * Gets the contention manager of the invoking Thread. + * + * @return the invoking thread's contention manager + */ + static public ContentionManager getContentionManager() { + return _threadState.get().manager; + } + + /** + * Create a new factory instance. + * @param _class class to implement + * @return new factory + */ + static public Factory makeFactory(Class _class) { + try { + Factory factory = (Factory) factoryTable.get(_class); + if (factory == null) { + factory = new AtomicFactory(_class, adapterClass); + factoryTable.put(_class, factory); + } + return factory; + } catch (Exception e) { + throw new PanicException(e); + } + } + + /** + * Execute a transaction + * @param xaction execute this object's call() method. + * @return result of call() method + */ + public static T doIt(Callable xaction) { + ThreadState threadState = _threadState.get(); + ContentionManager manager = threadState.manager; + T result = null; + try { + while (!Thread.stop) { + threadState.beginTransaction(); + try { + result = xaction.call(); + } catch (AbortedException d) { + } catch (SnapshotException s) { + threadState.abortTransaction(); + } catch (Exception e) { + e.printStackTrace(); + throw new PanicException("Unhandled exception " + e); + } + threadState.totalMemRefs += threadState.transaction.memRefs; + if (threadState.commitTransaction()) { + threadState.committedMemRefs += threadState.transaction.memRefs; + return result; + } + threadState.transaction.attempts++; + // transaction aborted + } + if (threadState.transaction != null) { + threadState.abortTransaction(); + } + } finally { + threadState.transaction = null; + } + // collect statistics + synchronized (lock){ + totalTotalMemRefs = threadState.totalMemRefs; + totalCommittedMemRefs = threadState.committedMemRefs; + totalCommitted += threadState.committed; + totalTotal += threadState.total; + threadState.reset(); // set up for next iteration + } + throw new GracefulException(); + } + /** + * Execute transaction + * @param xaction call this object's run() method + */ + public static void doIt(final Runnable xaction) { + doIt(new Callable() { + public Boolean call() { + xaction.run(); + return false; + }; + }); + } + + /** + * number of transactions committed by this thread + * @return number of transactions committed by this thread + */ + public static long getCommitted() { + return totalCommitted; + } + + /** + * umber of transactions aborted by this thread + * @return number of aborted transactions + */ + public static long getAborted() { + return totalTotal - totalCommitted; + } + + /** + * number of transactions executed by this thread + * @return number of transactions + */ + public static long getTotal() { + return totalTotal; + } + + /** + * Register a method to be called every time this thread validates any transaction. + * @param c abort if this object's call() method returns false + */ + public static void onValidate(Callable c) { + _threadState.get().onValidate.add(c); + } + /** + * Register a method to be called every time the current transaction is validated. + * @param c abort if this object's call() method returns false + */ + public static void onValidateOnce(Callable c) { + _threadState.get().onValidateOnce.add(c); + } + /** + * Register a method to be called every time this thread commits a transaction. + * @param r call this object's run() method + */ + public static void onCommit(Runnable r) { + _threadState.get().onCommit.add(r); + } + /** + * Register a method to be called once if the current transaction commits. + * @param r call this object's run() method + */ + public static void onCommitOnce(Runnable r) { + _threadState.get().onCommitOnce.add(r); + } + /** + * Register a method to be called every time this thread aborts a transaction. + * @param r call this objec't run() method + */ + public static void onAbort(Runnable r) { + _threadState.get().onAbort.add(r); + } + /** + * Register a method to be called once if the current transaction aborts. + * @param r call this object's run() method + */ + public static void onAbortOnce(Runnable r) { + _threadState.get().onAbortOnce.add(r); + } + /** + * get thread ID for debugging + * @return unique id + */ + public static int getID() { + return _threadState.get().hashCode(); + } + + /** + * reset thread statistics + */ + public static void clear() { + totalTotal = 0; + totalCommitted = 0; + totalCommittedMemRefs = 0; + totalTotalMemRefs = 0; + stop = false; + } + + /** + * Class that holds thread's actual state + */ + public static class ThreadState { + + int depth = 0; + ContentionManager manager; + + private long committed = 0; // number of committed transactions + private long total = 0; // total number of transactions + private long committedMemRefs = 0; // number of committed reads and writes + private long totalMemRefs = 0; // total number of reads and writes + + Set> onValidate = new HashSet>(); + Set onCommit = new HashSet(); + Set onAbort = new HashSet(); + Set> onValidateOnce = new HashSet>(); + Set onCommitOnce = new HashSet(); + Set onAbortOnce = new HashSet(); + + //Transaction transaction = null; + ExtendedTransaction transaction = null; + + /** + * Creates new ThreadState + */ + public ThreadState() { + try { + manager = (ContentionManager)Thread.contentionManagerClass.newInstance(); + } catch (NullPointerException e) { + throw new PanicException("No default contention manager class set."); + } catch (Exception e) { // Some problem with instantiation + throw new PanicException(e); + } + } + + /** + * Resets any metering information (commits/aborts, etc). + */ + public void reset() { + committed = 0; // number of committed transactions + total = 0; // total number of transactions + committedMemRefs = 0; // number of committed reads and writes + totalMemRefs = 0; // total number of reads and writes + } + + /** + * used for debugging + * @return string representation of thread state + */ + public String toString() { + return + "Thread" + hashCode() + "["+ + "committed: " + committed + "," + + "aborted: " + ( total - committed) + + "]"; + } + + /** + * Can this transaction still commit? + * This method may be called at any time, not just at transaction end, + * so we do not clear the onValidateOnce table. + * @return true iff transaction might still commit + */ + public boolean validate() { + try { + // permanent + for (Callable v : onValidate) { + if (!v.call()) { + return false; + } + } + // temporary + for (Callable v : onValidateOnce) { + if (!v.call()) { + return false; + } + } + return transaction.validate(); + } catch (Exception ex) { + return false; + } + } + + /** + * Call methods registered to be called on commit. + */ + public void runCommitHandlers() { + try { + // permanent + for (Runnable r: onCommit) { + r.run(); + } + // temporary + for (Runnable r: onCommitOnce) { + r.run(); + } + onCommitOnce.clear(); + onValidateOnce.clear(); + } catch (Exception ex) { + throw new PanicException(ex); + } + } + + /** + * Starts a new transaction. Cannot nest transactions deeper than + * Thread.MAX_NESTING_DEPTH. The contention manager of the + * invoking thread is notified when a transaction is begun. + */ + public void beginTransaction() { + //transaction = new Transaction(); + transaction = new ExtendedTransaction(); + if (depth == 0) { + total++; + } + // first thing to fix if we allow nested transactions + if (depth >= 1) { + throw new PanicException("beginTransaction: attempting to nest transactions too deeply."); + } + depth++; + } + + /** + * Attempts to commit the current transaction of the invoking + * Thread. Always succeeds for nested + * transactions. The contention manager of the invoking thread is + * notified of the result. If the transaction does not commit + * because a TMObject opened for reading was + * invalidated, the contention manager is also notified of the + * inonValidate. + * + * + * @return whether commit succeeded. + */ + public boolean commitTransaction() { + depth--; + if (depth < 0) { + throw new PanicException("commitTransaction invoked when no transaction active."); + } + if (depth > 0) { + throw new PanicException("commitTransaction invoked on nested transaction."); + } + if (depth == 0) { + if (validate() && transaction.commit()) { + committed++; + runCommitHandlers(); + return true; + } + abortTransaction(); + return false; + } else { + return true; + } + } + + /** + * Aborts the current transaction of the invoking Thread. + * Does not end transaction, but ensures it will never commit. + */ + public void abortTransaction() { + runAbortHandlers(); + transaction.abort(); + } + + /** + * Call methods registered to be called on commit. + */ + public void runAbortHandlers() { + try { + // permanent + for (Runnable r: onAbort) { + r.run(); + } + // temporary + for (Runnable r: onAbortOnce) { + r.run(); + } + onAbortOnce.clear(); + onValidateOnce.clear(); + } catch (Exception ex) { + throw new PanicException(ex); + } + } + } +} diff --git a/Robust/Transactions/src/Transaction.java b/Robust/Transactions/src/Transaction.java new file mode 100644 index 00000000..5606d5bd --- /dev/null +++ b/Robust/Transactions/src/Transaction.java @@ -0,0 +1,265 @@ +/* + * Transaction.java + * + * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa + * Clara, California 95054, U.S.A. All rights reserved. + * + * Sun Microsystems, Inc. has intellectual property rights relating to + * technology embodied in the product that is described in this + * document. In particular, and without limitation, these + * intellectual property rights may include one or more of the + * U.S. patents listed at http://www.sun.com/patents and one or more + * additional patents or pending patent applications in the U.S. and + * in other countries. + * + * U.S. Government Rights - Commercial software. + * Government users are subject to the Sun Microsystems, Inc. standard + * license agreement and applicable provisions of the FAR and its + * supplements. Use is subject to license terms. Sun, Sun + * Microsystems, the Sun logo and Java are trademarks or registered + * trademarks of Sun Microsystems, Inc. in the U.S. and other + * countries. + * + * This product is covered and controlled by U.S. Export Control laws + * and may be subject to the export or import laws in other countries. + * Nuclear, missile, chemical biological weapons or nuclear maritime + * end uses or end users, whether direct or indirect, are strictly + * prohibited. Export or reexport to countries subject to + * U.S. embargo or to entities identified on U.S. export exclusion + * lists, including, but not limited to, the denied persons and + * specially designated nationals lists is strictly prohibited. + */ + +package dstm2; + +import dstm2.exceptions.PanicException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Transaction.java + * Keeps a transaction's status and contention manager. + */ + +public class Transaction implements NewInterface { + + /** + * Possible transaction status + **/ + //public enum Status {ABORTED, ACTIVE, COMMITTED}; + + + public enum Status {ABORTED, ACTIVE, COMMITTED}; + + /** + * Predefined committed transaction + */ + public static Transaction COMMITTED = new Transaction(Status.COMMITTED); + /** + * Predefined orted transaction + */ + public static Transaction ABORTED = new Transaction(Status.ABORTED); + + /** + * Is transaction waiting for another? + */ + public boolean waiting = false; + + /** + * Number of times this transaction tried + */ + public int attempts = 0; + + /** + * Number of unique memory references so far. + */ + public int memRefs = 0; + + /** + * Time in nanos when transaction started + */ + public long startTime = 0; + /** + * Time in nanos when transaction committed or aborted + */ + public long stopTime = 0; + + // generate unique ids + private static AtomicInteger unique = new AtomicInteger(100); + + /** Updater for status */ + /*private static final + AtomicReferenceFieldUpdater + statusUpdater = AtomicReferenceFieldUpdater.newUpdater + (Transaction.class, Status.class, "status");*/ + + protected static final + AtomicReferenceFieldUpdater + statusUpdater = AtomicReferenceFieldUpdater.newUpdater + (Transaction.class, Status.class, "status"); + + + private volatile Status status; + + private long id; + + private ContentionManager manager; + + /** + * Creates a new, active transaction. + */ + public Transaction() { + this.status = Status.ACTIVE; + this.id = this.startTime = System.nanoTime(); + this.manager = Thread.getContentionManager(); + } + + /** + * Creates a new transaction with given status. + * @param myStatus active, committed, or aborted + */ + private Transaction(Transaction.Status myStatus) { + this.status = myStatus; + this.startTime = 0; + } + + /** + * Access the transaction's current status. + * @return current transaction status + */ + public Status getStatus() { + return status; + } + + /** + * Tests whether transaction is active. + * @return whether transaction is active + */ + public boolean isActive() { + return this.getStatus() == Status.ACTIVE; + } + + /** + * Tests whether transaction is aborted. + * @return whether transaction is aborted + */ + public boolean isAborted() { + return this.getStatus() == Status.ABORTED; + } + + /** + * Tests whether transaction is committed. + * @return whether transaction is committed + */ + public boolean isCommitted() { + return (this.getStatus() == Status.COMMITTED); + } + + /** + * Tests whether transaction is committed or active. + * @return whether transaction is committed or active + */ + public boolean validate() { + Status status = this.getStatus(); + switch (status) { + case COMMITTED: + throw new PanicException("committed transaction still running"); + case ACTIVE: + return true; + case ABORTED: + return false; + default: + throw new PanicException("unexpected transaction state: " + status); + } + } + + /** + * Tries to commit transaction + * @return whether transaction was committed + */ + public boolean commit() { + try { + while (this.getStatus() == Status.ACTIVE) { + if (statusUpdater.compareAndSet(this, + Status.ACTIVE, + Status.COMMITTED)) { + return true; + } + } + return false; + } finally { + wakeUp(); + } + } + + /** + * Tries to abort transaction + * @return whether transaction was aborted (not necessarily by this call) + */ + public boolean abort() { + try { + while (this.getStatus() == Status.ACTIVE) { + if (statusUpdater.compareAndSet(this, Status.ACTIVE, Status.ABORTED)) { + return true; + } + } + return this.getStatus() == Status.ABORTED; + } finally { + wakeUp(); + } + } + + /** + * Returns a string representation of this transaction + * @return the string representcodes[ation + */ + public String toString() { + switch (this.status) { + case COMMITTED: + return "Transaction" + this.startTime + "[committed]"; + case ABORTED: + return "Transaction" + this.startTime + "[aborted]"; + case ACTIVE: + return "Transaction" + this.startTime + "[active]"; + default: + return "Transaction" + this.startTime + "[???]"; + } + } + + /** + * Block caller while transaction is active. + */ + public synchronized void waitWhileActive() { + while (this.getStatus() == Status.ACTIVE) { + try { + wait(); + } catch (InterruptedException ex) {} + } + } + /** + * Block caller while transaction is active. + */ + public synchronized void waitWhileActiveNotWaiting() { + while (getStatus() == Status.ACTIVE && !waiting) { + try { + wait(); + } catch (InterruptedException ex) {} + } + } + + /** + * Wake up any transactions waiting for this one to finish. + */ + public synchronized void wakeUp() { + notifyAll(); + } + + /** + * This transaction's contention manager + * @return the manager + */ + public ContentionManager getContentionManager() { + return manager; + } +} diff --git a/Robust/Transactions/src/file/factory/Adapter.java b/Robust/Transactions/src/file/factory/Adapter.java new file mode 100644 index 00000000..e2f58933 --- /dev/null +++ b/Robust/Transactions/src/file/factory/Adapter.java @@ -0,0 +1,264 @@ +/* + * Adapter.java + * + * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa + * Clara, California 95054, U.S.A. All rights reserved. + * + * Sun Microsystems, Inc. has intellectual property rights relating to + * technology embodied in the product that is described in this + * document. In particular, and without limitation, these + * intellectual property rights may include one or more of the + * U.S. patents listed at http://www.sun.com/patents and one or more + * additional patents or pending patent applications in the U.S. and + * in other countries. + * + * U.S. Government Rights - Commercial software. + * Government users are subject to the Sun Microsystems, Inc. standard + * license agreement and applicable provisions of the FAR and its + * supplements. Use is subject to license terms. Sun, Sun + * Microsystems, the Sun logo and Java are trademarks or registered + * trademarks of Sun Microsystems, Inc. in the U.S. and other + * countries. + * + * This product is covered and controlled by U.S. Export Control laws + * and may be subject to the export or import laws in other countries. + * Nuclear, missile, chemical biological weapons or nuclear maritime + * end uses or end users, whether direct or indirect, are strictly + * prohibited. Export or reexport to countries subject to + * U.S. embargo or to entities identified on U.S. export exclusion + * lists, including, but not limited to, the denied persons and + * specially designated nationals lists is strictly prohibited. + */ +package dstm2.file.factory; + +import dstm2.ContentionManager; +import dstm2.Transaction; +import dstm2.exceptions.AbortedException; +import dstm2.exceptions.PanicException; +import dstm2.exceptions.SnapshotException; +import dstm2.factory.Copyable; +import dstm2.factory.Factory; +import dstm2.Thread; +import dstm2.factory.Releasable; +import dstm2.factory.Snapable; + +import dstm2.factory.shadow.RecoverableFactory; +import java.io.File; +import java.io.RandomAccessFile; +import java.lang.Class; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +/** + * Obstruction-free atomic object implementation. Visible reads. + * Support snapshots and early release. + * @author Navid Farri + */ +public class Adapter { + + //public HashMap lockmap; + public HashMap lockmap; + //public AtomicLong commitedoffset; + public AtomicLong commitedfilesize; + private Transaction writer; + // protected AtomicInteger version = new AtomicInteger(0); + //public ReentrantLock lock; + + public Transaction getWriter() { + return writer; + } + + public void setWriter(Transaction writer) { + this.writer = writer; + } + + + public Adapter() { + // version.set(0); + // lock = new ReentrantLock(); + writer = null; + lockmap = new HashMap(); +// commitedoffset.set(0); + } + + /* public Adapter(Adapter adapter) { + version.set(adapter.version.get()); + lock = adapter.lock; + writer = adapter.writer; + lockmap = adapter.lockmap; + }*/ + /* + * Creates a new instance of Adapter + */ + //protected Factory factory; + /* + protected AtomicReference start; + + public AtomicReference getStart() { + return start; + } + + public void setStart(AtomicReference start) { + this.start = start; + } + */ + /* public void onRead(){ + + try { + Thread.onCommitOnce( new Runnable() { + public void run() { + /// commit the changes to the actual file, meanwhile the memory blocks would be owned by this + /// transaction so no change could take place regarding those, this need not be done atomically + } + }); + + Thread.onAbortOnce( new Runnable() { + public void run() { + //// nothing no-op + } + }); + + Transaction me = Thread.getTransaction(); + Locator oldLocator = start.get(); + Copyable version = oldLocator.fastPath(me); + + ContentionManager manager = Thread.getContentionManager(); + Locator newLocator = new Locator(me, (Copyable) new CopyableFileFactory()); + version = (Copyable) newLocator.newVersion; + while (true) { + oldLocator.writePath(me, manager, newLocator); + if (!me.isActive()) { + throw new AbortedException(); + } + + if (Adapter.this.start.compareAndSet(oldLocator, newLocator)) { + return; + } + oldLocator = Adapter.this.start.get(); + } + } catch (IllegalAccessException e) { + throw new PanicException(e); + } catch (InvocationTargetException e) { + throw new PanicException(e); + } + } + + public void onWrite(){ + try { + T version = (T) start.get().newVersion; + final Method method = version.getClass().getMethod(methodName); + return new Adapter.Getter() { + public V call() { + try { + Transaction me = Thread.getTransaction(); + Locator oldLocator = Adapter.this.start.get(); + T version = (T) oldLocator.fastPath(me); + if (version == null) { + ContentionManager manager = Thread.getContentionManager(); + Locator newLocator = new Locator(); + while (true) { + oldLocator.readPath(me, manager, newLocator); + if (Adapter.this.start.compareAndSet(oldLocator, newLocator)) { + version = (T) newLocator.newVersion; + break; + } + oldLocator = start.get(); + } + if (!me.isActive()) { + throw new AbortedException(); + } + } + return (V)method.invoke(version); + } catch (SecurityException e) { + throw new PanicException(e); + } catch (IllegalAccessException e) { + throw new PanicException(e); + } catch (InvocationTargetException e) { + throw new PanicException(e); + } + }}; + } catch (NoSuchMethodException e) { + throw new PanicException(e); + } + } + + public void release() { + Transaction me = Thread.getTransaction(); + Locator oldLocator = this.start.get(); + T version = (T) oldLocator.fastPath(me); + if (version == null) { + ContentionManager manager = Thread.getContentionManager(); + Locator newLocator = new Locator(); + version = (T) newLocator.newVersion; + while (true) { + oldLocator.releasePath(me, manager, newLocator); + if (this.start.compareAndSet(oldLocator, newLocator)) { + break; + } + oldLocator = this.start.get(); + } + if (!me.isActive()) { + throw new AbortedException(); + } + } + return; + } + + public T snapshot() { + Transaction me = Thread.getTransaction(); + Locator oldLocator = this.start.get(); + T version = (T) oldLocator.fastPath(me); + if (version == null) { + ContentionManager manager = Thread.getContentionManager(); + return (T)oldLocator.snapshot(me, manager); + } else { + return version; + } + } + + public void validate(T snap) { + if (snap != snapshot()) { + throw new SnapshotException(); + } + } + + public void upgrade(T snap) { + Transaction me = Thread.getTransaction(); + Locator oldLocator = this.start.get(); + T version = (T) oldLocator.fastPath(me); + if (version != null) { + if (version != snap) { + throw new SnapshotException(); + } else { + return; + } + } + ContentionManager manager = Thread.getContentionManager(); + Locator newLocator = new Locator(me, (Copyable)factory.create()); + while (true) { + oldLocator.writePath(me, manager, newLocator); + if (!me.isActive()) { + throw new AbortedException(); + } + if (snap != newLocator.oldVersion) { + throw new SnapshotException(); + } + if (this.start.compareAndSet(oldLocator, newLocator)) { + return; + } + oldLocator = this.start.get(); + } + }*/ +} + diff --git a/Robust/Transactions/src/file/factory/BlockLock.java b/Robust/Transactions/src/file/factory/BlockLock.java new file mode 100644 index 00000000..6ed48d51 --- /dev/null +++ b/Robust/Transactions/src/file/factory/BlockLock.java @@ -0,0 +1,49 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.factory; + +import dstm2.Transaction; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * + * @author navid + */ +public class BlockLock { + + //public ReentrantReadWriteLock lock; + public ReentrantLock lock; + public Transaction owner; + public INode inode; + public int blocknumber; + public AtomicInteger version; + int referncount; + public static enum MODE {READ, WRITE, READ_WRITE}; + public MODE accessmode; + + public BlockLock(INode inode, int blocknumber) { + version = new AtomicInteger(0); + //lock = new ReentrantReadWriteLock(); + lock = new ReentrantLock(); + this.inode = inode; + this.blocknumber = blocknumber; + referncount = 0; + } + + + public synchronized int getReferncount() { + return referncount; + } + + public synchronized void setReferncount(int referncount) { + this.referncount = referncount; + } + + + +} diff --git a/Robust/Transactions/src/file/factory/ExtendedTransaction.java b/Robust/Transactions/src/file/factory/ExtendedTransaction.java new file mode 100644 index 00000000..e948264f --- /dev/null +++ b/Robust/Transactions/src/file/factory/ExtendedTransaction.java @@ -0,0 +1,357 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +import dstm2.Transaction; +import dstm2.Thread; +import dstm2.exceptions.AbortedException; +import dstm2.file.interfaces.BlockAccessModesEnum; +import dstm2.file.interfaces.FileAccessModesEum; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author navid + */ +public class ExtendedTransaction extends Transaction { + + static ExtendedTransaction getTransaction() { + throw new UnsupportedOperationException("Not yet implemented"); + } + + // Vector heldlocks; + private Vector heldlocks; + // HashMap FilesAccesses; + private HashMap FilesAccesses; + //HashMap FilesAccessMode; + private HashMap FilesAccessModes; + + public HashMap getFilesAccessModes() { + return FilesAccessModes; + } + + public HashMap getFilesAccesses() { + return FilesAccesses; + } + + public void addtoFileAccessModeMap(INode inode, FileAccessModesEum mode) { + /* if (FilesAccessModes.containsKey(inode)) { + if (((FileAccessModesEum) (FilesAccessModes.get(inode))) == FileAccessModesEum.APPEND) { + FilesAccessModes.put(inode, mode); + } else if (((FileAccessModesEum) (FilesAccessModes.get(inode))) == FileAccessModesEum.READ) { + if (mode == FileAccessModesEum.READ_WRITE) { + FilesAccessModes.put(inode, mode); + } + } + } else {*/ + FilesAccessModes.put(inode, mode); + //} + } + + public ExtendedTransaction() { + super(); + } + + public Vector getHeldlocks() { + return heldlocks; + } + + public Map getSortedFileAccessMap(HashMap hmap) { + Map sortedMap = new TreeMap(hmap); + return sortedMap; + } + + public void addFile(TransactionalFile tf/*, TransactionLocalFileAttributes.MODE mode*/) { + + + if (tf.appendmode) { + this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.APPEND); + } else if (tf.writemode) { + this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ_WRITE); + } else { + this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ); + } + boolean flag = tf.to_be_created; + RandomAccessFile fd = tf.file; + ReentrantLock lock = tf.offsetlock; + Offset commitedoffset = tf.commitedoffset; + + synchronized (tf.adapter) { + //Adapter ad = new Adapter(tf.adapter); + FilesAccesses.put(tf.getInode(), new TransactionLocalFileAttributes(/*ad*/tf.adapter, flag/*, mode*/, fd, lock, commitedoffset)); + } + + } + + @Override + public boolean commit() { /// Locking offsets for File Descriptors + + // Vector offsetlocks = new Vector(); + Map hm = getSortedFileAccessMap(FilesAccesses); + //lock phase + Iterator iter = hm.keySet().iterator(); + TransactionLocalFileAttributes value; + while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) { + INode key = (INode) iter.next(); + value = (TransactionLocalFileAttributes) hm.get(key); + + if (value.getAccesedblocks().isEmpty()) { + value.setValidatelocaloffset(false); + } else if ((value.getAccesedblocks().values().contains(BlockAccessModesEnum.READ)) || (value.getAccesedblocks().values().contains(BlockAccessModesEnum.READ_WRITE))) { + value.setValidatelocaloffset(true); + } else { + value.setValidatelocaloffset(false); + } + if (value.isValidatelocaloffset()) { + if (value.getCopylocaloffset() == value.currentcommitedoffset.getOffsetnumber()) { + value.offsetlock.lock(); + //offsetlocks.add(value.offsetlock); + heldlocks.add(value.offsetlock); + if (!(value.getCopylocaloffset() == value.currentcommitedoffset.getOffsetnumber())) { + /* for (int i = 0; i < offsetlocks.size(); i++) { + ((ReentrantLock) offsetlocks.get(i)).unlock(); + }*/ + unlockAllLocks(); + //throw new AbortedException(); + return false; + } + } else { + /*for (int i = 0; i < offsetlocks.size(); i++) { + ((ReentrantLock) offsetlocks.get(i)).unlock(); + }*/ + unlockAllLocks(); + return false; + // throw new AbortedException(); + } + } else { + value.offsetlock.lock(); + heldlocks.add(value.offsetlock); + } + } + return true; + + + // return ok; + /* + if (ok) { + if (!(super.commit())) { + unlockAllLocks(); + return false; + } else { + return true; + } + } else { + return false; + }*/ + } + + public boolean lock(BlockLock block, Adapter adapter, BlockAccessModesEnum mode, int expvalue/*INode inode, TransactionLocalFileAttributes tf*/) { + + final ReentrantLock lock = block.lock; + while (this.getStatus() == Status.ACTIVE) { + if (lock.tryLock()) { + Thread.onAbortOnce(new Runnable() { + + public void run() { + lock.unlock(); + } + }); + + heldlocks.add(lock); + if (mode != BlockAccessModesEnum.WRITE) { + if (block.version.get() != expvalue) { + unlockAllLocks(); + return false; + } + } + return true; + } else { + getContentionManager().resolveConflict(this, block.owner); + } + } + return false; + } + + public void endTransaction() { + + if (commit()) { + /////////////////////////// + Map hm = getSortedFileAccessMap(FilesAccesses); + Iterator iter = hm.keySet().iterator(); + TransactionLocalFileAttributes value; + boolean ok = true; + while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && ok) { + int expvalue; + INode key = (INode) iter.next(); + + value = (TransactionLocalFileAttributes) hm.get(key); + if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.APPEND) { + Range tmp = (Range) (value.getWrittendata().firstKey()); + int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(value.currentcommitedoffset.getOffsetnumber()); + int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(value.currentcommitedoffset.getOffsetnumber(), (int) (tmp.getEnd() - tmp.getStart())); + for (int i = startblock; i <= targetblock; i++) { + value.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE); + } + } + + + + + + Iterator it = value.getAccesedblocks().keySet().iterator(); + while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) { + Integer blockno = (Integer) it.next(); + BlockLock blockobj = (BlockLock) value.adapter.lockmap.get(blockno); + expvalue = ((Integer) value.getBlockversions().get(it)).intValue(); + if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) { + + if (blockobj.version.get() == expvalue) { + // ok = this.lock(key, value/*value.adapter*/); + ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue); + if (ok == false) { + // unlockAllLocks(); + break; + } + } else { + ok = false; + // unlockAllLocks(); + break; + // return false; + } + } else { + + ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue); + if (ok == false) { + break; + } + } + } + } + + if (!(ok)) { + unlockAllLocks(); + throw new AbortedException(); + + + } + + if (!(super.commit())) { + unlockAllLocks(); + throw new AbortedException(); + + } + + iter = hm.keySet().iterator(); + while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) { + INode key = (INode) iter.next(); + value = (TransactionLocalFileAttributes) hm.get(key); + if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.APPEND) { + try { + Range range = (Range) value.getWrittendata().firstKey(); + + + //synchronized(value.adapter){ + //value.f.seek(value.adapter.commitedfilesize.get()); + value.f.seek(value.currentcommitedoffset.getOffsetnumber()); + //} + + Byte[] data = new Byte[(int) (range.getEnd() - range.getStart())]; + byte[] bytedata = new byte[(int) (range.getEnd() - range.getStart())]; + data = (Byte[]) value.getWrittendata().get(range); + + for (int i = 0; i < data.length; i++) { + bytedata[i] = data[i]; + } + value.f.write(bytedata); + + } catch (IOException ex) { + Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex); + } + + } else if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.READ) { + continue; + } else { + int tobeaddedoffset = 0; + + if (value.isValidatelocaloffset()) + tobeaddedoffset = 0; + else + tobeaddedoffset = (int) (value.currentcommitedoffset.getOffsetnumber() - value.getCopylocaloffset()); + + Iterator it = value.getWrittendata().keySet().iterator(); + while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) { + try { + Range range = (Range) it.next(); + value.f.seek(range.getStart() + tobeaddedoffset); + Byte[] data = new Byte[(int) (range.getEnd() - range.getStart())]; + byte[] bytedata = new byte[(int) (range.getEnd() - range.getStart())]; + data = (Byte[]) value.getWrittendata().get(range); + + for (int i = 0; i < data.length; i++) { + bytedata[i] = data[i]; + } + value.f.write(bytedata); + + } catch (IOException ex) { + Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex); + } + } + } + } + + + iter = hm.keySet().iterator(); + while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) { + INode key = (INode) iter.next(); + value = (TransactionLocalFileAttributes) hm.get(key); + Iterator it = value.getAccesedblocks().keySet().iterator(); + + while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) { + Integer blockno = (Integer) it.next(); + BlockLock blockobj = (BlockLock) value.adapter.lockmap.get(blockno); + blockobj.version.getAndIncrement(); + value.currentcommitedoffset.setOffsetnumber(value.getLocaloffset()); + synchronized (value.adapter) { + value.adapter.commitedfilesize.getAndSet(value.getFilelength()); + } + } + } + + + // unlock phase + /*iter = hm.keySet().iterator(); + while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) { + INode key = (INode) iter.next(); + value = (TransactionLocalFileAttributes) hm.get(key); + value.offsetlock.unlock(); + }*/ + unlockAllLocks(); + } else { + throw new AbortedException(); + } + } + + private void unlockAllLocks() { + Iterator it = heldlocks.iterator(); + while (it.hasNext()) { + ReentrantLock lock = (ReentrantLock) it.next(); + lock.unlock(); + } + } +} + + + diff --git a/Robust/Transactions/src/file/factory/FileBlockManager.java b/Robust/Transactions/src/file/factory/FileBlockManager.java new file mode 100644 index 00000000..53623059 --- /dev/null +++ b/Robust/Transactions/src/file/factory/FileBlockManager.java @@ -0,0 +1,27 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +import dstm2.Defaults; + + +/** + * + * @author navid + */ +public class FileBlockManager { + + public static long getFragmentIndexofTheFile(long filesize) { + return (filesize / Defaults.FILEFRAGMENTSIZE); + } + + public static int getCurrentFragmentIndexofTheFile(long start) { + return (int) ((start / Defaults.FILEFRAGMENTSIZE)); + } + + public static int getTargetFragmentIndexofTheFile(long start, int offset) { + return (int) (((offset + start) / Defaults.FILEFRAGMENTSIZE)); + } +} diff --git a/Robust/Transactions/src/file/factory/INode.java b/Robust/Transactions/src/file/factory/INode.java new file mode 100644 index 00000000..9afbc94f --- /dev/null +++ b/Robust/Transactions/src/file/factory/INode.java @@ -0,0 +1,31 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.factory; + +/** + * + * @author navid + */ +public class INode { + + private long number; + + + public INode(long number) { + this.number = number; + } + + public long getNumber() { + return number; + } + + public void setNumber(long number) { + this.number = number; + } + + + +} diff --git a/Robust/Transactions/src/file/factory/Offset.java b/Robust/Transactions/src/file/factory/Offset.java new file mode 100644 index 00000000..cf4298b7 --- /dev/null +++ b/Robust/Transactions/src/file/factory/Offset.java @@ -0,0 +1,29 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.factory; + +/** + * + * @author navid + */ +public class Offset { + private long offsetnumber; + + public Offset(long offsetnumber) { + this.offsetnumber = offsetnumber; + } + + + public long getOffsetnumber() { + return offsetnumber; + } + + public void setOffsetnumber(long offsetnumber) { + this.offsetnumber = offsetnumber; + } + + +} diff --git a/Robust/Transactions/src/file/factory/Range.java b/Robust/Transactions/src/file/factory/Range.java new file mode 100644 index 00000000..1d80295a --- /dev/null +++ b/Robust/Transactions/src/file/factory/Range.java @@ -0,0 +1,106 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +/** + * + * @author navid + */ +public class Range implements Comparable { + + private long start; + private long end; + + public Range() { + } + + public Range(long start, long end) { + this.start = start; + this.end = end; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public Range intersection(Range secondrange) { + if ((secondrange.start <= this.start) && (this.start <= secondrange.end)) { + return new Range(this.start, Math.min(this.end, secondrange.end)); + } else if ((secondrange.start <= this.end) && (this.end <= secondrange.end)) { + return new Range(Math.max(this.start, secondrange.start), this.end); + } else if ((this.start <= secondrange.start) && (secondrange.end <= this.end)) { + return new Range(secondrange.start, secondrange.end); + } else { + return null; + } + } + + public boolean hasIntersection(Range secondrange) { + if ((secondrange.start <= this.start) && (this.start <= secondrange.end)) { + return true; + } else if ((secondrange.start <= this.end) && (this.end <= secondrange.end)) { + return true; + } else if ((this.start <= secondrange.start) && (secondrange.end <= this.end)) { + return true; + } else { + return false; + } + } + + public boolean includes(Range secondrange) { + if (this.start <= secondrange.start && secondrange.end <= this.end) { + return true; + } else { + return false; + } + } + + public Range[] minus(Range[] intersectedranges, int size) { + Range[] tmp = new Range[size + 1]; + + int counter = 0; + if (this.start < intersectedranges[0].start) { + tmp[counter] = new Range(this.start, intersectedranges[0].start); + counter++; + } + for (int i = 1; i < size; i++) { + tmp[counter] = new Range(intersectedranges[i - 1].end, intersectedranges[i].start); + counter++; + } + if (this.end > intersectedranges[size - 1].end) { + tmp[counter] = new Range(intersectedranges[size - 1].end, this.end); + counter++; + } + Range[] result = new Range[counter]; + for (int i = 0; i < counter; i++) { + result[i] = tmp[i]; + } + return result; + } + + public int compareTo(Object arg0) { + + Range tmp = (Range) arg0; + if (this.start < tmp.start) { + return -1; + } else if (this.start == tmp.start) { + return 0; + } else { + return -1; + } + } +} diff --git a/Robust/Transactions/src/file/factory/TransactionLocalFileAttributes.java b/Robust/Transactions/src/file/factory/TransactionLocalFileAttributes.java new file mode 100644 index 00000000..5798ef2f --- /dev/null +++ b/Robust/Transactions/src/file/factory/TransactionLocalFileAttributes.java @@ -0,0 +1,143 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author navid + */ +public class TransactionLocalFileAttributes { + + //public int recordedversion; + public Adapter adapter; + + public boolean to_be_created = false; + RandomAccessFile f; + private boolean validatelocaloffset = true; + public boolean writetoabsoluteoffset = false; + public int writetoabsoluteoffsetfromhere = -1; + public ReentrantLock offsetlock; + //Vector startoffset; + //Vector data; + //Vector occupiedblocks; + // private Vector readoccupiedblocks; + //private Vector writeoccupiedblocks; + // private Vector startoffset; + // private Vector data; + //private TreeMap writtendata; + private TreeMap writtendata; + //private TreeMap accesesblocks; + private TreeMap accesedblocks; + //private TreeMap blockversions; + private TreeMap blockversions; + + + public Offset currentcommitedoffset; + private final long copylocaloffset; + private final long copycurrentfilesize; + private long localoffset; + private long filelength; + + + public boolean isValidatelocaloffset() { + return validatelocaloffset; + } + + public void setValidatelocaloffset(boolean validatelocaloffset) { + this.validatelocaloffset = validatelocaloffset; + } + + public long getFilelength() { + return filelength; + } + + public void setFilelength(long filelength) { + this.filelength = filelength; + } + + public long getLocaloffset() { + return localoffset; + } + + public void setLocaloffset(long localoffset) { + this.localoffset = localoffset; + } + + //public MODE accessmode; + + public TreeMap getAccesedblocks() { + return accesedblocks; + } + + public TreeMap getBlockversions() { + return blockversions; + } + + public long getCopycurrentfilesize() { + return copycurrentfilesize; + } + + public long getCopylocaloffset() { + return copylocaloffset; + } + + + /* public Vector getData() { + return data; + } + + public Vector getReadoccupiedblocks() { + return readoccupiedblocks; + } + + public Vector getWriteoccupiedblocks() { + return writeoccupiedblocks; + } + */ + + /* public Vector getStartoffset() { + return startoffset; + } + */ + public TransactionLocalFileAttributes(Adapter adapter, boolean to_be_created, /*TransactionLocalFileAttributes.MODE mode,*/ RandomAccessFile f, ReentrantLock offsetlock, Offset currentcommitedoffset) { + + this.adapter = adapter; + this.localoffset = currentcommitedoffset.getOffsetnumber(); + this.copylocaloffset = this.localoffset; + this.currentcommitedoffset = currentcommitedoffset; + this.filelength = adapter.commitedfilesize.get(); + this.copycurrentfilesize = this.filelength; + this.to_be_created = to_be_created; + this.f = f; + //this.filelength = filelength; + writtendata = new TreeMap(); + validatelocaloffset = false; + this.offsetlock = offsetlock; + + //readoccupiedblocks = new Vector(); + //writeoccupiedblocks = new Vector(); + //accessmode = mode; + //recordedversion = adapter.version.get(); + //startoffset = new Vector(); + //data = new Vector(); + //readoccupiedblocks = new Vector(); + //writeoccupiedblocks = new Vector(); + } + + public TreeMap getWrittendata() { + return writtendata; + } +} diff --git a/Robust/Transactions/src/file/factory/TransactionalFile.java b/Robust/Transactions/src/file/factory/TransactionalFile.java new file mode 100644 index 00000000..2545a112 --- /dev/null +++ b/Robust/Transactions/src/file/factory/TransactionalFile.java @@ -0,0 +1,678 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +import dstm2.Thread; +import dstm2.Transaction.Status; +import dstm2.exceptions.AbortedException; +import dstm2.exceptions.PanicException; +import dstm2.file.interfaces.BlockAccessModesEnum; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Iterator; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author navid + */ +public class TransactionalFile { + + public RandomAccessFile file; + private INode inode; + public Adapter adapter; + /* public AtomicLong commitedoffset; + public AtomicLong commitedfilesize;*/ + public boolean to_be_created = false; + public boolean writemode = false; + public boolean appendmode = false; + public ReentrantLock offsetlock; + public Offset commitedoffset; + + public TransactionalFile(String filename, String mode) { + synchronized (this) { + adapter = TransactionalFileWrapperFactory.createTransactionalFile(filename, mode); + inode = TransactionalFileWrapperFactory.getINodefromFileName(filename); + } + + File f = new File(filename); + if ((!(f.exists()))) { + to_be_created = true; + file = null; + } else { + try { + offsetlock = new ReentrantLock(); + file = new RandomAccessFile(f, mode); + } catch (FileNotFoundException ex) { + + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } + } + if (mode.equals("rw")) { + writemode = true; + } else if (mode.equals("a")) { + appendmode = true; + } + synchronized (adapter) { + try { + if (!(to_be_created)) { + + adapter.commitedfilesize.set(file.length()); + + } else { + adapter.commitedfilesize.set(0); + } + if (!appendmode) { + commitedoffset.setOffsetnumber(0); + } else { + commitedoffset.setOffsetnumber(file.length()); + } + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } + } + } + + /* public TransactionalFile(Adapter adapter, RandomAccessFile file) { + + this.adapter = adapter; + this.file = file; + decriptors = new Vector(); + } + + public void copyTransactionalFile(TransactionalFile tf){ + try { + int tmp = tf.commitedoffset.get(); + boolean flag = tf.to_be_created; + FileDescriptor fd = tf.file.getFD(); + Adapter ad = new Adapter(tf.adapter); + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } + }*/ + public /*synchronized*/ BlockLock getBlockLock(int blocknumber) { + synchronized (adapter.lockmap) { + if (adapter.lockmap.containsKey(blocknumber)) { + return ((BlockLock) (adapter.lockmap.get(blocknumber))); + } else { + BlockLock tmp = new BlockLock(getInode(),blocknumber); + adapter.lockmap.put(blocknumber, tmp); + return tmp; + } + } + + } + + /* public boolean deleteBlockLock(int blocknumber){ + synchronized(adapter.lockmap){ + //adapter.lockmap.get(blocknumber) + if (adapter.lockmap.containsKey(blocknumber)){ + if (((BlockLock)(adapter.lockmap.get(blocknumber))).referncount == 0){ + adapter.lockmap.remove(adapter.lockmap.get(blocknumber)); + return true; + } + else + return false; + } + else { + return false; + } + } + }*/ + public void close() { + try { + file.close(); + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } + } + + public void seek(long offset) { + + if (appendmode) { + throw new PanicException("Cannot seek into a file opened in append mode"); + } + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + TransactionLocalFileAttributes tmp; + if (!(me.getFilesAccesses().containsKey(this.inode))) { + me.addFile(this); + tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + tmp.writetoabsoluteoffset = true; + tmp.writetoabsoluteoffsetfromhere = 0; + //tmp.setValidatelocaloffset(false); + } else { + tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + //tmp.setValidatelocaloffset(true); + if (!(tmp.writetoabsoluteoffset)) + { + tmp.writetoabsoluteoffset = true; + tmp.writetoabsoluteoffsetfromhere = tmp.getWrittendata().size(); + } + } + + tmp.setLocaloffset(offset); + } + + public int read(byte[] b) { + + if (appendmode) { + throw new PanicException("Cannot seek into a file opened in append mode"); + } + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + int size = b.length; + int result; + + if (me == null) { // not a transaction + + size = 10; + return size; + } else if (me.getFilesAccesses().containsKey(this.getInode())) {// in its read list files, read from the file + + TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + tmp.setValidatelocaloffset(true); + + + long loffset = tmp.getLocaloffset(); + + + int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); + int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); + for (int i = startblock; i <= targetblock; i++) { + if (tmp.getAccesedblocks().containsKey(Integer.valueOf(i))) { + if (((BlockAccessModesEnum) (tmp.getAccesedblocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.WRITE) { + tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE); + } + } else { + tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ); + + + //adapter.lock.lock(); + tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockLock(i).version.get())); + //adapter.lock.unlock(); + /*synchronized(adapter.version){ + tmp.getBlockversions().put(Integer.valueOf(i), tmp.adapter.version); // 1st alternative + }*/ + + //return readFromFile(b, tmp); + } + } + + + if (!(validateBlocksVersions(startblock, targetblock))) { + throw new AbortedException(); + } + + + + Range readrange = new Range(loffset, loffset + size); + Range writerange = null; + Range[] intersectedrange = new Range[tmp.getWrittendata().size()]; + Range[] markedwriteranges = new Range[tmp.getWrittendata().size()]; + + int counter = 0; + + + + + boolean flag = false; + Iterator it = tmp.getWrittendata().keySet().iterator(); + while (it.hasNext()) { + writerange = (Range) it.next(); + if (writerange.includes(readrange)) { + flag = true; + break; + } + + if (writerange.hasIntersection(readrange)) { + intersectedrange[counter] = readrange.intersection(writerange); + markedwriteranges[counter] = writerange; + counter++; + } + } + + if (flag) { + result = readFromBuffer(b, tmp, writerange); + tmp.setLocaloffset(tmp.getLocaloffset() + result); + //return result; + } else if (counter == 0) { + result = readFromFile(b, tmp); + tmp.setLocaloffset(tmp.getLocaloffset() + result); + //return result; + } else { + + for (int i = 0; i < counter; i++) { + Byte[] data = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i])); + byte[] copydata = new byte[data.length]; + + for (int j = 0; j < data.length; j++) { + copydata[j] = data[j].byteValue(); + } + System.arraycopy(copydata, (int) (intersectedrange[i].getStart() - markedwriteranges[i].getStart()), b, (int) (intersectedrange[i].getStart() - readrange.getStart()), (int) (Math.min(intersectedrange[i].getEnd(), readrange.getEnd()) - intersectedrange[i].getStart())); + } + + Range[] non_intersected_ranges = readrange.minus(intersectedrange, counter); + Vector occupiedblocks = new Vector(); + Vector heldlocks = new Vector(); + for (int i = 0; i < non_intersected_ranges.length; i++) { + int st = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getStart()); + int en = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getEnd()); + for (int j = st; j <= en; j++) { + if (!(occupiedblocks.contains(j))) { + occupiedblocks.add(j); + } + } + } + + + offsetlock.lock(); + + for (int k = 0; k < occupiedblocks.size(); k++) { + int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue(); + while (me.getStatus() == Status.ACTIVE) { + BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(k))); + // if (block.version.get() == expvalue) { + + if (block.lock.tryLock()) { + heldlocks.add(block.lock); + if (!(block.version.get() == expvalue)) { + me.abort(); + } else { + break; + } + } else { + me.getContentionManager().resolveConflict(me, block.owner); + } + // } else { + // me.abort(); + //} + } + if (me.getStatus() == Status.ABORTED) { + unlockLocks(heldlocks); + throw new AbortedException(); + } + } + + for (int i = 0; i < non_intersected_ranges.length; i++) { + try { + // offsetlock.lock(); + file.seek(non_intersected_ranges[i].getStart()); + file.read(b, (int) (non_intersected_ranges[i].getStart() - readrange.getStart()), (int) (non_intersected_ranges[i].getEnd() - non_intersected_ranges[i].getStart())); + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + + } + + } + + unlockLocks(heldlocks); + offsetlock.unlock(); + tmp.setLocaloffset(tmp.getLocaloffset() + size); + result = size; + //return size; + + } + + return result; + + } else { // add to the readers list + //me.addReadFile(this); + + me.addFile(this/*, TransactionLocalFileAttributes.MODE.READ*/); + TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + tmp.setValidatelocaloffset(true); + return read(b); + } + + } + + public void write(byte[] data) throws IOException { + + if (!(writemode)) { + throw new IOException(); + //return; + } + + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + + int size = data.length; + + + if (me == null) // not a transaction + { + size = 10; + } else if (me.getFilesAccesses().containsKey(this.getInode())) // in its read list files, read from the file + { + + TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + + Byte[] by = new Byte[size]; + for (int i = 0; i < size; i++) { + by[i] = Byte.valueOf(data[i]); + } + TreeMap tm = tmp.getWrittendata(); + long loffset = tmp.getLocaloffset(); + Range newwriterange; + if (appendmode) { + newwriterange = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd()) + size); + Range range = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd())); + Byte[] appenddata = new Byte[(int) (newwriterange.getEnd() - newwriterange.getStart())]; + Byte[] tempor = new Byte[(int) (range.getEnd() - range.getStart())]; + System.arraycopy(tempor, 0, appenddata, 0, tempor.length); + System.arraycopy(by, 0, appenddata, tempor.length, by.length); + tm.remove(range); + tm.put(newwriterange, appenddata); + tmp.setLocaloffset(loffset + size); + tmp.setFilelength(tmp.getFilelength() + size); + + return; + } + + newwriterange = new Range(loffset, loffset + size); + Range oldwriterange = null; + Range intersect = null; + Range[] intersectedrange = new Range[tmp.getWrittendata().size()]; + Range[] markedwriteranges = new Range[tmp.getWrittendata().size()]; + by = new Byte[size]; + int counter = 0; + + /* + + if (tmp.accessmode == TransactionLocalFileAttributes.MODE.READ) + tmp.accessmode = TransactionLocalFileAttributes.MODE.READ_WRITE; + else if (tmp.accessmode == TransactionLocalFileAttributes.MODE.WRITE) + simpleWritetoBuffer(by, newwriterange, tm); + */ + + int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); + int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); + for (int i = startblock; i <= targetblock; i++) { + if (tmp.getAccesedblocks().containsKey(Integer.valueOf(i))) { + if (((BlockAccessModesEnum) (tmp.getAccesedblocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.READ) { + tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE); + } + } else { + tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE); + + tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockLock(i).version.get())); + } + } + + // Vector offset = tmp.getStartoffset(); + // Vector tmpdata = tmp.getData(); + + + + // offset.add(new Integer(tmp.localoffset)); + + boolean flag = false; + Iterator it = tmp.getWrittendata().keySet().iterator(); + while (it.hasNext()) { + oldwriterange = (Range) it.next(); + if (oldwriterange.includes(newwriterange)) { + flag = true; + intersect = newwriterange.intersection(oldwriterange); + break; + } + + if (oldwriterange.hasIntersection(newwriterange)) { + intersectedrange[counter] = newwriterange.intersection(oldwriterange); + markedwriteranges[counter] = oldwriterange; + counter++; + } + } + + if (flag) { + int datasize = (int) (oldwriterange.getEnd() - oldwriterange.getStart()); + Byte[] original = (Byte[]) (tmp.getWrittendata().get(oldwriterange)); + byte[] originaldata = new byte[datasize]; + + for (int i = 0; i < data.length; i++) { + originaldata[i] = original[i].byteValue(); + } + System.arraycopy(data, 0, originaldata, (int) (newwriterange.getStart() - oldwriterange.getStart()), size); + Byte[] to_be_inserted = new Byte[datasize]; + //System.arraycopy(b, 0, ab, a.length); + + for (int i = 0; i < datasize; i++) { + to_be_inserted[i] = Byte.valueOf(originaldata[i]); + } + tm.put(oldwriterange, to_be_inserted); + tmp.setLocaloffset(loffset + size); + if (tmp.getLocaloffset() > tmp.getFilelength()) + tmp.setFilelength(tmp.getLocaloffset()); + return; + + } else if (counter == 0) { + tm.put(newwriterange, data); + tmp.setLocaloffset(loffset + size); + if (tmp.getLocaloffset() > tmp.getFilelength()) + tmp.setFilelength(tmp.getLocaloffset()); + return; + } else { + + int suffixstart = 0; + long start = 0; + long end = 0; + Byte[] prefixdata = null; + Byte[] suffixdata = null; + boolean prefix = false; + boolean suffix = false; + + for (int i = 0; i < counter; i++) { + + //if (newwriterange.includes(markedwriteranges[i])) + //tm.remove(markedwriteranges); + + if (markedwriteranges[i].getStart() < newwriterange.getStart()) { + + prefixdata = new Byte[(int) (newwriterange.getStart() - markedwriteranges[i].getStart())]; + prefixdata = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i])); + start = markedwriteranges[i].getStart(); + //newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()]; + //System.arraycopy(by, 0, newdata, newwriterange.getStart() - markedwriteranges[i].getStart(), size); + //System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart()); + + //newwriterange.setStart(markedwriteranges[i].getStart()); + prefix = true; + + + } else if (markedwriteranges[i].getEnd() > newwriterange.getEnd()) { + + suffixdata = new Byte[(int) (markedwriteranges[i].getStart() - newwriterange.getStart())]; + suffixdata = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i])); + end = markedwriteranges[i].getEnd(); + + /*Byte [] originaldata = (Byte [])(tmp.getWrittendata().get(markedwriteranges[i])); + newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()]; + System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart()); + + newwriterange.setStart(markedwriteranges[i].getStart());*/ + //newwriterange.setEnd(markedwriteranges[i].getEnd()); + suffix = true; + suffixstart = (int) (intersectedrange[i].getEnd() - intersectedrange[i].getStart()); + //tm.remove(markedwriteranges[i]); + } + tm.remove(markedwriteranges[i]); + + } + Byte[] data_to_insert; + + if ((prefix) && (suffix)) { + data_to_insert = new Byte[(int) (newwriterange.getStart() - start + size + newwriterange.getEnd() - end)]; + System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (newwriterange.getStart() - start)); + System.arraycopy(by, 0, data_to_insert, (int) (newwriterange.getStart() - start), size); + System.arraycopy(suffixdata, suffixstart, data_to_insert, (int) (size + newwriterange.getStart() - start), (int) (end - newwriterange.getEnd())); + newwriterange.setStart(start); + newwriterange.setEnd(end); + } + else if (prefix) { + data_to_insert = new Byte[(int) (newwriterange.getStart() - start + size)]; + System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (newwriterange.getStart() - start)); + System.arraycopy(by, 0, data_to_insert, (int) (newwriterange.getStart() - start), size); + newwriterange.setStart(start); + } + else if (suffix) { + data_to_insert = new Byte[(int) (newwriterange.getEnd() - end + size)]; + System.arraycopy(by, 0, data_to_insert, 0, size); + System.arraycopy(suffixdata, suffixstart, data_to_insert, size, (int) (end - newwriterange.getEnd())); + newwriterange.setEnd(end); + } + else { + data_to_insert = new Byte[size]; + System.arraycopy(data_to_insert, (int) (newwriterange.getStart() - start), by, 0, size); + } + tm.put(newwriterange, data_to_insert); + tmp.setLocaloffset(loffset + size); + if (tmp.getLocaloffset() > tmp.getFilelength()) + tmp.setFilelength(tmp.getLocaloffset()); + } + + + } else { + me.addFile(this/*, TransactionLocalFileAttributes.MODE.WRITE*/); + write(data); + } + + } + + private int readFromFile(byte[] readdata, TransactionLocalFileAttributes tmp) { + + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + int st = FileBlockManager.getCurrentFragmentIndexofTheFile(tmp.getLocaloffset()); + int end = FileBlockManager.getTargetFragmentIndexofTheFile(tmp.getLocaloffset(), readdata.length); + Vector heldlocks = new Vector(); + for (int k = st; k <= end; k++) { + int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue(); + while (me.getStatus() == Status.ACTIVE) { + BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(k))); + // if (block.version.get() == expvalue) { + + if (block.lock.tryLock()) { + heldlocks.add(block.lock); + if (!(block.version.get() == expvalue)) { + me.abort(); + } else { + break; + } + } else { + me.getContentionManager().resolveConflict(me, block.owner); + } + //} else { + // me.abort(); + //} + } + if (me.getStatus() == Status.ABORTED) { + //unlockLocks(heldlocks); + throw new AbortedException(); + } + + } + int size = -1; + try { + + offsetlock.lock(); + file.seek(tmp.getLocaloffset()); + size = file.read(readdata); + offsetlock.unlock(); + + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } + + + + unlockLocks(heldlocks); + tmp.setLocaloffset(tmp.getLocaloffset() + size); + return size; + + /*try { + while (me.getStatus() == Status.ACTIVE) { + if (this.adapter.lock.tryLock()) { + file.seek(tmp.localoffset); + int result = file.read(readdata); + this.adapter.lock.unlock(); + tmp.localoffset += result; + return result; + } else { + me.getContentionManager().resolveConflict(me, adapter.writer); + } + } + + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + }*/ + + } + + private int readFromBuffer(byte[] readdata, TransactionLocalFileAttributes tmp, Range writerange) { + + + long loffset = tmp.getLocaloffset(); + + Byte[] data = (Byte[]) (tmp.getWrittendata().get(writerange)); + byte[] copydata = null; + + for (int i = 0; i < data.length; i++) { + copydata[i] = data[i].byteValue(); + } + System.arraycopy(copydata, (int) (loffset - writerange.getStart()), readdata, 0, readdata.length); + return readdata.length; + + } + + public void simpleWritetoBuffer(Byte[] data, Range newwriterange, TreeMap tm) { + tm.put(newwriterange, data); + } + + public void unlockLocks(Vector heldlocks) { + for (int i = 0; i < heldlocks.size(); i++) { + ((ReentrantLock) heldlocks.get(i)).unlock(); + } + } + + public boolean validateBlocksVersions(int startblock, int targetblock) { + boolean valid = true; + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); + for (int i = startblock; i <= targetblock; i++) { + int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(i))).intValue(); + BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(i))); + if (expvalue != block.version.get()) { + valid = false; + break; + } + } + + return valid; + } + + public INode getInode() { + return inode; + } + + public void setInode(INode inode) { + this.inode = inode; + } + /* public void check(){ + ExtendedTransaction me = ExtendedTransaction.getTransaction(); + for (Adapter reader : me.ReadOnly) + if (reader.version.get() == adapter.version.get()) + + + + }*/ +} \ No newline at end of file diff --git a/Robust/Transactions/src/file/factory/TransactionalFileWrapperFactory.java b/Robust/Transactions/src/file/factory/TransactionalFileWrapperFactory.java new file mode 100644 index 00000000..a6db1429 --- /dev/null +++ b/Robust/Transactions/src/file/factory/TransactionalFileWrapperFactory.java @@ -0,0 +1,56 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package dstm2.file.factory; + +import java.io.File; +import java.util.HashMap; + +/** + * + * @author navid + */ +public class TransactionalFileWrapperFactory { + + + + private TransactionalFileWrapperFactory() { + } + // private static HashMap filemappings; + private static HashMap filemappings; + + private static native long getINodeNative(String filename); + + static{ + System.load("/home/navid/libnav.so"); + } + + static INode getINodefromFileName(String filename) { + return new INode(getINodeNative(filename)); + } + + public static void main(String args[]){ + System.out.print("in java " + getINodeNative("/home/navid/myfile.txt") +"\n"); + System.out.print("in java " + getINodeNative("/home/navid/HellWorld.java") +"\n"); + } + + public synchronized static Adapter createTransactionalFile(String filename, String mode) { + + + long inodenumber = getINodeNative(filename); + INode inode = new INode(inodenumber); + + + + if (filemappings.containsKey(inode)) { + return (Adapter)filemappings.get(inode); + + } else { + Adapter adapter = new Adapter(); + filemappings.put(inode, adapter); + return adapter; + } + + } +} diff --git a/Robust/Transactions/src/file/factory/TransactionalRandomAccessFile.java b/Robust/Transactions/src/file/factory/TransactionalRandomAccessFile.java new file mode 100644 index 00000000..49e5b121 --- /dev/null +++ b/Robust/Transactions/src/file/factory/TransactionalRandomAccessFile.java @@ -0,0 +1,14 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.factory; + +/** + * + * @author navid + */ +public class TransactionalRandomAccessFile { + +} diff --git a/Robust/Transactions/src/file/interfaces/BlockAccessModesEnum.java b/Robust/Transactions/src/file/interfaces/BlockAccessModesEnum.java new file mode 100644 index 00000000..5ba064fb --- /dev/null +++ b/Robust/Transactions/src/file/interfaces/BlockAccessModesEnum.java @@ -0,0 +1,14 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.interfaces; + +/** + * + * @author navid + */ +public enum BlockAccessModesEnum { + READ_WRITE, WRITE, READ +} diff --git a/Robust/Transactions/src/file/interfaces/ContentionManager.java b/Robust/Transactions/src/file/interfaces/ContentionManager.java new file mode 100644 index 00000000..6be9557c --- /dev/null +++ b/Robust/Transactions/src/file/interfaces/ContentionManager.java @@ -0,0 +1,79 @@ +/* + * ContentionManager.java + * + * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa + * Clara, California 95054, U.S.A. All rights reserved. + * + * Sun Microsystems, Inc. has intellectual property rights relating to + * technology embodied in the product that is described in this + * document. In particular, and without limitation, these + * intellectual property rights may include one or more of the + * U.S. patents listed at http://www.sun.com/patents and one or more + * additional patents or pending patent applications in the U.S. and + * in other countries. + * + * U.S. Government Rights - Commercial software. + * Government users are subject to the Sun Microsystems, Inc. standard + * license agreement and applicable provisions of the FAR and its + * supplements. Use is subject to license terms. Sun, Sun + * Microsystems, the Sun logo and Java are trademarks or registered + * trademarks of Sun Microsystems, Inc. in the U.S. and other + * countries. + * + * This product is covered and controlled by U.S. Export Control laws + * and may be subject to the export or import laws in other countries. + * Nuclear, missile, chemical biological weapons or nuclear maritime + * end uses or end users, whether direct or indirect, are strictly + * prohibited. Export or reexport to countries subject to + * U.S. embargo or to entities identified on U.S. export exclusion + * lists, including, but not limited to, the denied persons and + * specially designated nationals lists is strictly prohibited. + */ + +package dstm2.file.interfaces; + + +import java.util.Collection; +import dstm2.file.factory.TransactionalFile; +/** + * Interface satisfied by all contention managers + */ +public interface ContentionManager { + /** + * Either give the writer a chance to finish it, abort it, or both. + * @param me Calling transaction. + * @param other Transaction that's in my way. + */ + void resolveConflict(Transaction me, Transaction other, TransactionalFile tf); + + + /** + * Either give the writer a chance to finish it, abort it, or both. + * @param me Calling transaction. + * @param other set of transactions in my way + */ + void resolveConflict(Transaction me, Collection other); + + /** + * Assign a priority to caller. Not all managers assign meaningful priorities. + * @return Priority of conflicting transaction. + */ + long getPriority(); + + /** + * Change this manager's priority. + * @param value new priority value + */ + void setPriority(long value); + + /** + * Notify manager that object was opened. + */ + void openSucceeded(); + + /** + * Notify manager that transaction committed. + */ + void committed(); + +}; diff --git a/Robust/Transactions/src/file/interfaces/CustomCM.java b/Robust/Transactions/src/file/interfaces/CustomCM.java new file mode 100644 index 00000000..a3487892 --- /dev/null +++ b/Robust/Transactions/src/file/interfaces/CustomCM.java @@ -0,0 +1,48 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.interfaces; + +import dstm2.Transaction.Status; +import java.util.Collection; +import dstm2.file.factory.TransactionalFile; + +/** + * + * @author navid + */ +public class CustomCM implements ContentionManager{ + + public void resolveConflict(Transaction me, Transaction other, TransactionalFile obj) { + if (other != null) + if (other.getStatus() == Status.ACTIVE || other.getStatus() == Status.COMMITTED) + other.waitWhileActiveNotWaiting(); + } + + public void resolveConflict(Transaction me, Collection other) { + throw new UnsupportedOperationException("Not supported yet."); + } + + public long getPriority() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void setPriority(long value) { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void openSucceeded() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void committed() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void resolveConflict(Transaction me, Transaction other) { + throw new UnsupportedOperationException("Not supported yet."); + } + +} diff --git a/Robust/Transactions/src/file/interfaces/FileAccessModesEum.java b/Robust/Transactions/src/file/interfaces/FileAccessModesEum.java new file mode 100644 index 00000000..2549f027 --- /dev/null +++ b/Robust/Transactions/src/file/interfaces/FileAccessModesEum.java @@ -0,0 +1,17 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.interfaces; + +import java.io.WriteAbortedException; + +/** + * + * @author navid + */ +public enum FileAccessModesEum { + READ_WRITE, APPEND, READ + +} diff --git a/Robust/Transactions/src/file/interfaces/Transaction.java b/Robust/Transactions/src/file/interfaces/Transaction.java new file mode 100644 index 00000000..df411bec --- /dev/null +++ b/Robust/Transactions/src/file/interfaces/Transaction.java @@ -0,0 +1,96 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package dstm2.file.interfaces; + +/** + * + * @author navid + */ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + + +import dstm2.Transaction.Status; + +/** + * + * @author navid + */ +public interface Transaction { + + /** + * Tries to abort transaction + * @return whether transaction was aborted (not necessarily by this call) + */ + boolean abort(); + + /** + * Tries to commit transaction + * @return whether transaction was committed + */ + boolean commit(); + + /** + * This transaction's contention manager + * @return the manager + */ + ContentionManager getContentionManager(); + + /** + * Access the transaction's current status. + * @return current transaction status + */ + Status getStatus(); + + /** + * Tests whether transaction is aborted. + * @return whether transaction is aborted + */ + boolean isAborted(); + + /** + * Tests whether transaction is active. + * @return whether transaction is active + */ + boolean isActive(); + + /** + * Tests whether transaction is committed. + * @return whether transaction is committed + */ + boolean isCommitted(); + + /** + * Returns a string representation of this transaction + * @return the string representcodes[ation + */ + String toString(); + + /** + * Tests whether transaction is committed or active. + * @return whether transaction is committed or active + */ + boolean validate(); + + /** + * Block caller while transaction is active. + */ + void waitWhileActive(); + + /** + * Block caller while transaction is active. + */ + void waitWhileActiveNotWaiting(); + + /** + * Wake up any transactions waiting for this one to finish. + */ + void wakeUp(); + +} +