--- /dev/null
+/*
+ * Defaults.java
+ *
+ * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
+ * Clara, California 95054, U.S.A. All rights reserved.
+ *
+ * Sun Microsystems, Inc. has intellectual property rights relating to
+ * technology embodied in the product that is described in this
+ * document. In particular, and without limitation, these
+ * intellectual property rights may include one or more of the
+ * U.S. patents listed at http://www.sun.com/patents and one or more
+ * additional patents or pending patent applications in the U.S. and
+ * in other countries.
+ *
+ * U.S. Government Rights - Commercial software.
+ * Government users are subject to the Sun Microsystems, Inc. standard
+ * license agreement and applicable provisions of the FAR and its
+ * supplements. Use is subject to license terms. Sun, Sun
+ * Microsystems, the Sun logo and Java are trademarks or registered
+ * trademarks of Sun Microsystems, Inc. in the U.S. and other
+ * countries.
+ *
+ * This product is covered and controlled by U.S. Export Control laws
+ * and may be subject to the export or import laws in other countries.
+ * Nuclear, missile, chemical biological weapons or nuclear maritime
+ * end uses or end users, whether direct or indirect, are strictly
+ * prohibited. Export or reexport to countries subject to
+ * U.S. embargo or to entities identified on U.S. export exclusion
+ * lists, including, but not limited to, the denied persons and
+ * specially designated nationals lists is strictly prohibited.
+ */
+
+package dstm2;
+
+/**
+ *
+ * @author Maurice Herlihy
+ */
+public class Defaults {
+ /**
+ * how many threads
+ **/
+ public static final int THREADS = 1;
+ /**
+ * benchmark duration in milliseconds
+ **/
+ public static final int TIME = 10000;
+ /**
+ * uninterpreted arg passed to benchmark
+ **/
+ public static final int EXPERIMENT = 100;
+ /**
+ * fully-qualified contention manager name
+ **/
+ public static final String MANAGER = "dstm2.manager.BackoffManager";
+ /**
+ * fully-qualified factory name
+ **/
+ public static final String FACTORY = "dstm2.factory.shadow.Factory";
+ /**
+ * fully-qualified adapter name
+ **/
+ public static final String ADAPTER = "dstm2.factory.shadow.Adapter";;
+
+ public static final int FILEFRAGMENTSIZE= 1024;
+
+ public static final boolean READWRITECONFLIT = true;
+
+}
--- /dev/null
+/*
+ * Thread.java
+ *
+ * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
+ * Clara, California 95054, U.S.A. All rights reserved.
+ *
+ * Sun Microsystems, Inc. has intellectual property rights relating to
+ * technology embodied in the product that is described in this
+ * document. In particular, and without limitation, these
+ * intellectual property rights may include one or more of the
+ * U.S. patents listed at http://www.sun.com/patents and one or more
+ * additional patents or pending patent applications in the U.S. and
+ * in other countries.
+ *
+ * U.S. Government Rights - Commercial software.
+ * Government users are subject to the Sun Microsystems, Inc. standard
+ * license agreement and applicable provisions of the FAR and its
+ * supplements. Use is subject to license terms. Sun, Sun
+ * Microsystems, the Sun logo and Java are trademarks or registered
+ * trademarks of Sun Microsystems, Inc. in the U.S. and other
+ * countries.
+ *
+ * This product is covered and controlled by U.S. Export Control laws
+ * and may be subject to the export or import laws in other countries.
+ * Nuclear, missile, chemical biological weapons or nuclear maritime
+ * end uses or end users, whether direct or indirect, are strictly
+ * prohibited. Export or reexport to countries subject to
+ * U.S. embargo or to entities identified on U.S. export exclusion
+ * lists, including, but not limited to, the denied persons and
+ * specially designated nationals lists is strictly prohibited.
+ */
+
+package dstm2;
+
+import dstm2.exceptions.AbortedException;
+import dstm2.exceptions.GracefulException;
+import dstm2.exceptions.PanicException;
+import dstm2.exceptions.SnapshotException;
+import dstm2.factory.AtomicFactory;
+import dstm2.factory.Factory;
+import dstm2.file.factory.ExtendedTransaction;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import static dstm2.Defaults.*;
+/**
+ * The basic unit of computation for the transactional memory. This
+ * class extends <code>java.lang.Thread</code> by providing methods to
+ * begin, commit and abort transactions.
+ *
+ * Every <code>Thread</code> has a contention manager, created when
+ * the thread is created. Before creating any <code>Thread</code>s,
+ * you must call <code>Thread.setContentionManager</code> to set the
+ * class of the contention manager that will be created. The
+ * contention manager of a thread is notified (by invoking its
+ * notification methods) of the results of any methods involving the
+ * thread. It is also consulted on whether a transaction should be
+ * begun.
+ *
+ * @see dstm2.ContentionManager
+ */
+public class Thread extends java.lang.Thread {
+ /**
+ * Contention manager class.
+ */
+ protected static Class contentionManagerClass;
+
+ /**
+ * Adapter class.
+ */
+ protected static Class<dstm2.factory.Adapter> adapterClass;
+
+ /**
+ * Set to true when benchmark runs out of time.
+ **/
+ public static volatile boolean stop = false;
+ /**
+ * number of committed transactions for all threads
+ */
+ public static long totalCommitted = 0;
+ /**
+ * total number of transactions for all threads
+ */
+ public static long totalTotal = 0;
+ /**
+ * number of committed memory references for all threads
+ */
+ public static long totalCommittedMemRefs = 0;
+ /**
+ * total number of memory references for all threads
+ */
+ public static long totalTotalMemRefs = 0;
+
+ static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
+ protected synchronized ThreadState initialValue() {
+ return new ThreadState();
+ }
+ };
+ static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
+ protected synchronized Thread initialValue() {
+ return null;
+ }
+ };
+
+ private static int MAX_NESTING_DEPTH = 1;
+
+ private static Object lock = new Object();
+
+ // Memo-ize factories so we don't have to recreate them.
+ private static Map<Class,Factory> factoryTable
+ = Collections.synchronizedMap(new HashMap<Class,Factory>());
+
+ /**
+ * Create thread to run a method.
+ * @param target execute this object's <CODE>run()</CODE> method
+ */
+ public Thread(final Runnable target) {
+ super(new Runnable() {
+ public void run() {
+ ThreadState threadState = _threadState.get();
+ threadState.reset();
+ target.run();
+ // collect statistics
+ synchronized (lock){
+ totalCommitted += threadState.committed;
+ totalTotal += threadState.total;
+ totalCommittedMemRefs += threadState.committedMemRefs;
+ totalTotalMemRefs += threadState.totalMemRefs;
+ }
+ }
+ });
+ }
+ /**
+ * no-arg constructor
+ */
+ public Thread() {
+ super();
+ }
+
+ /**
+ * Establishes a contention manager. You must call this method
+ * before creating any <code>Thread</code>.
+ *
+ * @see dstm2.ContentionManager
+ * @param theClass class of desired contention manager.
+ */
+ public static void setContentionManagerClass(Class theClass) {
+ Class cm;
+ try {
+ cm = Class.forName("dstm2.ContentionManager");
+ } catch (ClassNotFoundException e) {
+ throw new PanicException(e);
+ }
+ try {
+ contentionManagerClass = theClass;
+ } catch (Exception e) {
+ throw new PanicException("The class " + theClass
+ + " does not implement dstm2.ContentionManager");
+ }
+ }
+
+ /**
+ * set Adapter class for this thread
+ * @param adapterClassName adapter class as string
+ */
+ public static void setAdapterClass(String adapterClassName) {
+ try {
+ adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
+ } catch (ClassNotFoundException ex) {
+ throw new PanicException("Adapter class not found: %s\n", adapterClassName);
+ }
+ }
+
+ /**
+ * Tests whether the current transaction can still commit. Does not
+ * actually end the transaction (either <code>commitTransaction</code> or
+ * <code>abortTransaction</code> must still be called). The contention
+ * manager of the invoking thread is notified if the onValidate fails
+ * because a <code>TMObject</code> opened for reading was invalidated.
+ *
+ * @return whether the current transaction may commit successfully.
+ */
+ static public boolean validate() {
+ ThreadState threadState = _threadState.get();
+ return threadState.validate();
+ }
+
+ /**
+ * Gets the current transaction, if any, of the invoking <code>Thread</code>.
+ *
+ * @return the current thread's current transaction; <code>null</code> if
+ * there is no current transaction.
+ */
+/* static public Transaction getTransaction() {
+ return _threadState.get().transaction;
+ }*/
+
+
+ static public ExtendedTransaction getTransaction() {
+ return _threadState.get().transaction;
+ }
+
+ /**
+ * Gets the contention manager of the invoking <code>Thread</code>.
+ *
+ * @return the invoking thread's contention manager
+ */
+ static public ContentionManager getContentionManager() {
+ return _threadState.get().manager;
+ }
+
+ /**
+ * Create a new factory instance.
+ * @param _class class to implement
+ * @return new factory
+ */
+ static public <T> Factory<T> makeFactory(Class<T> _class) {
+ try {
+ Factory<T> factory = (Factory<T>) factoryTable.get(_class);
+ if (factory == null) {
+ factory = new AtomicFactory<T>(_class, adapterClass);
+ factoryTable.put(_class, factory);
+ }
+ return factory;
+ } catch (Exception e) {
+ throw new PanicException(e);
+ }
+ }
+
+ /**
+ * Execute a transaction
+ * @param xaction execute this object's <CODE>call()</CODE> method.
+ * @return result of <CODE>call()</CODE> method
+ */
+ public static <T> T doIt(Callable<T> xaction) {
+ ThreadState threadState = _threadState.get();
+ ContentionManager manager = threadState.manager;
+ T result = null;
+ try {
+ while (!Thread.stop) {
+ threadState.beginTransaction();
+ try {
+ result = xaction.call();
+ } catch (AbortedException d) {
+ } catch (SnapshotException s) {
+ threadState.abortTransaction();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new PanicException("Unhandled exception " + e);
+ }
+ threadState.totalMemRefs += threadState.transaction.memRefs;
+ if (threadState.commitTransaction()) {
+ threadState.committedMemRefs += threadState.transaction.memRefs;
+ return result;
+ }
+ threadState.transaction.attempts++;
+ // transaction aborted
+ }
+ if (threadState.transaction != null) {
+ threadState.abortTransaction();
+ }
+ } finally {
+ threadState.transaction = null;
+ }
+ // collect statistics
+ synchronized (lock){
+ totalTotalMemRefs = threadState.totalMemRefs;
+ totalCommittedMemRefs = threadState.committedMemRefs;
+ totalCommitted += threadState.committed;
+ totalTotal += threadState.total;
+ threadState.reset(); // set up for next iteration
+ }
+ throw new GracefulException();
+ }
+ /**
+ * Execute transaction
+ * @param xaction call this object's <CODE>run()</CODE> method
+ */
+ public static void doIt(final Runnable xaction) {
+ doIt(new Callable<Boolean>() {
+ public Boolean call() {
+ xaction.run();
+ return false;
+ };
+ });
+ }
+
+ /**
+ * number of transactions committed by this thread
+ * @return number of transactions committed by this thread
+ */
+ public static long getCommitted() {
+ return totalCommitted;
+ }
+
+ /**
+ * umber of transactions aborted by this thread
+ * @return number of aborted transactions
+ */
+ public static long getAborted() {
+ return totalTotal - totalCommitted;
+ }
+
+ /**
+ * number of transactions executed by this thread
+ * @return number of transactions
+ */
+ public static long getTotal() {
+ return totalTotal;
+ }
+
+ /**
+ * Register a method to be called every time this thread validates any transaction.
+ * @param c abort if this object's <CODE>call()</CODE> method returns false
+ */
+ public static void onValidate(Callable<Boolean> c) {
+ _threadState.get().onValidate.add(c);
+ }
+ /**
+ * Register a method to be called every time the current transaction is validated.
+ * @param c abort if this object's <CODE>call()</CODE> method returns false
+ */
+ public static void onValidateOnce(Callable<Boolean> c) {
+ _threadState.get().onValidateOnce.add(c);
+ }
+ /**
+ * Register a method to be called every time this thread commits a transaction.
+ * @param r call this object's <CODE>run()</CODE> method
+ */
+ public static void onCommit(Runnable r) {
+ _threadState.get().onCommit.add(r);
+ }
+ /**
+ * Register a method to be called once if the current transaction commits.
+ * @param r call this object's <CODE>run()</CODE> method
+ */
+ public static void onCommitOnce(Runnable r) {
+ _threadState.get().onCommitOnce.add(r);
+ }
+ /**
+ * Register a method to be called every time this thread aborts a transaction.
+ * @param r call this objec't <CODE>run()</CODE> method
+ */
+ public static void onAbort(Runnable r) {
+ _threadState.get().onAbort.add(r);
+ }
+ /**
+ * Register a method to be called once if the current transaction aborts.
+ * @param r call this object's <CODE>run()</CODE> method
+ */
+ public static void onAbortOnce(Runnable r) {
+ _threadState.get().onAbortOnce.add(r);
+ }
+ /**
+ * get thread ID for debugging
+ * @return unique id
+ */
+ public static int getID() {
+ return _threadState.get().hashCode();
+ }
+
+ /**
+ * reset thread statistics
+ */
+ public static void clear() {
+ totalTotal = 0;
+ totalCommitted = 0;
+ totalCommittedMemRefs = 0;
+ totalTotalMemRefs = 0;
+ stop = false;
+ }
+
+ /**
+ * Class that holds thread's actual state
+ */
+ public static class ThreadState {
+
+ int depth = 0;
+ ContentionManager manager;
+
+ private long committed = 0; // number of committed transactions
+ private long total = 0; // total number of transactions
+ private long committedMemRefs = 0; // number of committed reads and writes
+ private long totalMemRefs = 0; // total number of reads and writes
+
+ Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
+ Set<Runnable> onCommit = new HashSet<Runnable>();
+ Set<Runnable> onAbort = new HashSet<Runnable>();
+ Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
+ Set<Runnable> onCommitOnce = new HashSet<Runnable>();
+ Set<Runnable> onAbortOnce = new HashSet<Runnable>();
+
+ //Transaction transaction = null;
+ ExtendedTransaction transaction = null;
+
+ /**
+ * Creates new ThreadState
+ */
+ public ThreadState() {
+ try {
+ manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
+ } catch (NullPointerException e) {
+ throw new PanicException("No default contention manager class set.");
+ } catch (Exception e) { // Some problem with instantiation
+ throw new PanicException(e);
+ }
+ }
+
+ /**
+ * Resets any metering information (commits/aborts, etc).
+ */
+ public void reset() {
+ committed = 0; // number of committed transactions
+ total = 0; // total number of transactions
+ committedMemRefs = 0; // number of committed reads and writes
+ totalMemRefs = 0; // total number of reads and writes
+ }
+
+ /**
+ * used for debugging
+ * @return string representation of thread state
+ */
+ public String toString() {
+ return
+ "Thread" + hashCode() + "["+
+ "committed: " + committed + "," +
+ "aborted: " + ( total - committed) +
+ "]";
+ }
+
+ /**
+ * Can this transaction still commit?
+ * This method may be called at any time, not just at transaction end,
+ * so we do not clear the onValidateOnce table.
+ * @return true iff transaction might still commit
+ */
+ public boolean validate() {
+ try {
+ // permanent
+ for (Callable<Boolean> v : onValidate) {
+ if (!v.call()) {
+ return false;
+ }
+ }
+ // temporary
+ for (Callable<Boolean> v : onValidateOnce) {
+ if (!v.call()) {
+ return false;
+ }
+ }
+ return transaction.validate();
+ } catch (Exception ex) {
+ return false;
+ }
+ }
+
+ /**
+ * Call methods registered to be called on commit.
+ */
+ public void runCommitHandlers() {
+ try {
+ // permanent
+ for (Runnable r: onCommit) {
+ r.run();
+ }
+ // temporary
+ for (Runnable r: onCommitOnce) {
+ r.run();
+ }
+ onCommitOnce.clear();
+ onValidateOnce.clear();
+ } catch (Exception ex) {
+ throw new PanicException(ex);
+ }
+ }
+
+ /**
+ * Starts a new transaction. Cannot nest transactions deeper than
+ * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
+ * invoking thread is notified when a transaction is begun.
+ */
+ public void beginTransaction() {
+ //transaction = new Transaction();
+ transaction = new ExtendedTransaction();
+ if (depth == 0) {
+ total++;
+ }
+ // first thing to fix if we allow nested transactions
+ if (depth >= 1) {
+ throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
+ }
+ depth++;
+ }
+
+ /**
+ * Attempts to commit the current transaction of the invoking
+ * <code>Thread</code>. Always succeeds for nested
+ * transactions. The contention manager of the invoking thread is
+ * notified of the result. If the transaction does not commit
+ * because a <code>TMObject</code> opened for reading was
+ * invalidated, the contention manager is also notified of the
+ * inonValidate.
+ *
+ *
+ * @return whether commit succeeded.
+ */
+ public boolean commitTransaction() {
+ depth--;
+ if (depth < 0) {
+ throw new PanicException("commitTransaction invoked when no transaction active.");
+ }
+ if (depth > 0) {
+ throw new PanicException("commitTransaction invoked on nested transaction.");
+ }
+ if (depth == 0) {
+ if (validate() && transaction.commit()) {
+ committed++;
+ runCommitHandlers();
+ return true;
+ }
+ abortTransaction();
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * Aborts the current transaction of the invoking <code>Thread</code>.
+ * Does not end transaction, but ensures it will never commit.
+ */
+ public void abortTransaction() {
+ runAbortHandlers();
+ transaction.abort();
+ }
+
+ /**
+ * Call methods registered to be called on commit.
+ */
+ public void runAbortHandlers() {
+ try {
+ // permanent
+ for (Runnable r: onAbort) {
+ r.run();
+ }
+ // temporary
+ for (Runnable r: onAbortOnce) {
+ r.run();
+ }
+ onAbortOnce.clear();
+ onValidateOnce.clear();
+ } catch (Exception ex) {
+ throw new PanicException(ex);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Transaction.java
+ *
+ * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
+ * Clara, California 95054, U.S.A. All rights reserved.
+ *
+ * Sun Microsystems, Inc. has intellectual property rights relating to
+ * technology embodied in the product that is described in this
+ * document. In particular, and without limitation, these
+ * intellectual property rights may include one or more of the
+ * U.S. patents listed at http://www.sun.com/patents and one or more
+ * additional patents or pending patent applications in the U.S. and
+ * in other countries.
+ *
+ * U.S. Government Rights - Commercial software.
+ * Government users are subject to the Sun Microsystems, Inc. standard
+ * license agreement and applicable provisions of the FAR and its
+ * supplements. Use is subject to license terms. Sun, Sun
+ * Microsystems, the Sun logo and Java are trademarks or registered
+ * trademarks of Sun Microsystems, Inc. in the U.S. and other
+ * countries.
+ *
+ * This product is covered and controlled by U.S. Export Control laws
+ * and may be subject to the export or import laws in other countries.
+ * Nuclear, missile, chemical biological weapons or nuclear maritime
+ * end uses or end users, whether direct or indirect, are strictly
+ * prohibited. Export or reexport to countries subject to
+ * U.S. embargo or to entities identified on U.S. export exclusion
+ * lists, including, but not limited to, the denied persons and
+ * specially designated nationals lists is strictly prohibited.
+ */
+
+package dstm2;
+
+import dstm2.exceptions.PanicException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Transaction.java
+ * Keeps a transaction's status and contention manager.
+ */
+
+public class Transaction implements NewInterface {
+
+ /**
+ * Possible transaction status
+ **/
+ //public enum Status {ABORTED, ACTIVE, COMMITTED};
+
+
+ public enum Status {ABORTED, ACTIVE, COMMITTED};
+
+ /**
+ * Predefined committed transaction
+ */
+ public static Transaction COMMITTED = new Transaction(Status.COMMITTED);
+ /**
+ * Predefined orted transaction
+ */
+ public static Transaction ABORTED = new Transaction(Status.ABORTED);
+
+ /**
+ * Is transaction waiting for another?
+ */
+ public boolean waiting = false;
+
+ /**
+ * Number of times this transaction tried
+ */
+ public int attempts = 0;
+
+ /**
+ * Number of unique memory references so far.
+ */
+ public int memRefs = 0;
+
+ /**
+ * Time in nanos when transaction started
+ */
+ public long startTime = 0;
+ /**
+ * Time in nanos when transaction committed or aborted
+ */
+ public long stopTime = 0;
+
+ // generate unique ids
+ private static AtomicInteger unique = new AtomicInteger(100);
+
+ /** Updater for status */
+ /*private static final
+ AtomicReferenceFieldUpdater<Transaction, Status>
+ statusUpdater = AtomicReferenceFieldUpdater.newUpdater
+ (Transaction.class, Status.class, "status");*/
+
+ protected static final
+ AtomicReferenceFieldUpdater<Transaction, Status>
+ statusUpdater = AtomicReferenceFieldUpdater.newUpdater
+ (Transaction.class, Status.class, "status");
+
+
+ private volatile Status status;
+
+ private long id;
+
+ private ContentionManager manager;
+
+ /**
+ * Creates a new, active transaction.
+ */
+ public Transaction() {
+ this.status = Status.ACTIVE;
+ this.id = this.startTime = System.nanoTime();
+ this.manager = Thread.getContentionManager();
+ }
+
+ /**
+ * Creates a new transaction with given status.
+ * @param myStatus active, committed, or aborted
+ */
+ private Transaction(Transaction.Status myStatus) {
+ this.status = myStatus;
+ this.startTime = 0;
+ }
+
+ /**
+ * Access the transaction's current status.
+ * @return current transaction status
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Tests whether transaction is active.
+ * @return whether transaction is active
+ */
+ public boolean isActive() {
+ return this.getStatus() == Status.ACTIVE;
+ }
+
+ /**
+ * Tests whether transaction is aborted.
+ * @return whether transaction is aborted
+ */
+ public boolean isAborted() {
+ return this.getStatus() == Status.ABORTED;
+ }
+
+ /**
+ * Tests whether transaction is committed.
+ * @return whether transaction is committed
+ */
+ public boolean isCommitted() {
+ return (this.getStatus() == Status.COMMITTED);
+ }
+
+ /**
+ * Tests whether transaction is committed or active.
+ * @return whether transaction is committed or active
+ */
+ public boolean validate() {
+ Status status = this.getStatus();
+ switch (status) {
+ case COMMITTED:
+ throw new PanicException("committed transaction still running");
+ case ACTIVE:
+ return true;
+ case ABORTED:
+ return false;
+ default:
+ throw new PanicException("unexpected transaction state: " + status);
+ }
+ }
+
+ /**
+ * Tries to commit transaction
+ * @return whether transaction was committed
+ */
+ public boolean commit() {
+ try {
+ while (this.getStatus() == Status.ACTIVE) {
+ if (statusUpdater.compareAndSet(this,
+ Status.ACTIVE,
+ Status.COMMITTED)) {
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ wakeUp();
+ }
+ }
+
+ /**
+ * Tries to abort transaction
+ * @return whether transaction was aborted (not necessarily by this call)
+ */
+ public boolean abort() {
+ try {
+ while (this.getStatus() == Status.ACTIVE) {
+ if (statusUpdater.compareAndSet(this, Status.ACTIVE, Status.ABORTED)) {
+ return true;
+ }
+ }
+ return this.getStatus() == Status.ABORTED;
+ } finally {
+ wakeUp();
+ }
+ }
+
+ /**
+ * Returns a string representation of this transaction
+ * @return the string representcodes[ation
+ */
+ public String toString() {
+ switch (this.status) {
+ case COMMITTED:
+ return "Transaction" + this.startTime + "[committed]";
+ case ABORTED:
+ return "Transaction" + this.startTime + "[aborted]";
+ case ACTIVE:
+ return "Transaction" + this.startTime + "[active]";
+ default:
+ return "Transaction" + this.startTime + "[???]";
+ }
+ }
+
+ /**
+ * Block caller while transaction is active.
+ */
+ public synchronized void waitWhileActive() {
+ while (this.getStatus() == Status.ACTIVE) {
+ try {
+ wait();
+ } catch (InterruptedException ex) {}
+ }
+ }
+ /**
+ * Block caller while transaction is active.
+ */
+ public synchronized void waitWhileActiveNotWaiting() {
+ while (getStatus() == Status.ACTIVE && !waiting) {
+ try {
+ wait();
+ } catch (InterruptedException ex) {}
+ }
+ }
+
+ /**
+ * Wake up any transactions waiting for this one to finish.
+ */
+ public synchronized void wakeUp() {
+ notifyAll();
+ }
+
+ /**
+ * This transaction's contention manager
+ * @return the manager
+ */
+ public ContentionManager getContentionManager() {
+ return manager;
+ }
+}
--- /dev/null
+/*
+ * Adapter.java
+ *
+ * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
+ * Clara, California 95054, U.S.A. All rights reserved.
+ *
+ * Sun Microsystems, Inc. has intellectual property rights relating to
+ * technology embodied in the product that is described in this
+ * document. In particular, and without limitation, these
+ * intellectual property rights may include one or more of the
+ * U.S. patents listed at http://www.sun.com/patents and one or more
+ * additional patents or pending patent applications in the U.S. and
+ * in other countries.
+ *
+ * U.S. Government Rights - Commercial software.
+ * Government users are subject to the Sun Microsystems, Inc. standard
+ * license agreement and applicable provisions of the FAR and its
+ * supplements. Use is subject to license terms. Sun, Sun
+ * Microsystems, the Sun logo and Java are trademarks or registered
+ * trademarks of Sun Microsystems, Inc. in the U.S. and other
+ * countries.
+ *
+ * This product is covered and controlled by U.S. Export Control laws
+ * and may be subject to the export or import laws in other countries.
+ * Nuclear, missile, chemical biological weapons or nuclear maritime
+ * end uses or end users, whether direct or indirect, are strictly
+ * prohibited. Export or reexport to countries subject to
+ * U.S. embargo or to entities identified on U.S. export exclusion
+ * lists, including, but not limited to, the denied persons and
+ * specially designated nationals lists is strictly prohibited.
+ */
+package dstm2.file.factory;
+
+import dstm2.ContentionManager;
+import dstm2.Transaction;
+import dstm2.exceptions.AbortedException;
+import dstm2.exceptions.PanicException;
+import dstm2.exceptions.SnapshotException;
+import dstm2.factory.Copyable;
+import dstm2.factory.Factory;
+import dstm2.Thread;
+import dstm2.factory.Releasable;
+import dstm2.factory.Snapable;
+
+import dstm2.factory.shadow.RecoverableFactory;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.lang.Class;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+/**
+ * Obstruction-free atomic object implementation. Visible reads.
+ * Support snapshots and early release.
+ * @author Navid Farri
+ */
+public class Adapter {
+
+ //public HashMap<Integer,BlockLock> lockmap;
+ public HashMap lockmap;
+ //public AtomicLong commitedoffset;
+ public AtomicLong commitedfilesize;
+ private Transaction writer;
+ // protected AtomicInteger version = new AtomicInteger(0);
+ //public ReentrantLock lock;
+
+ public Transaction getWriter() {
+ return writer;
+ }
+
+ public void setWriter(Transaction writer) {
+ this.writer = writer;
+ }
+
+
+ public Adapter() {
+ // version.set(0);
+ // lock = new ReentrantLock();
+ writer = null;
+ lockmap = new HashMap();
+// commitedoffset.set(0);
+ }
+
+ /* public Adapter(Adapter adapter) {
+ version.set(adapter.version.get());
+ lock = adapter.lock;
+ writer = adapter.writer;
+ lockmap = adapter.lockmap;
+ }*/
+ /*
+ * Creates a new instance of Adapter
+ */
+ //protected Factory<T> factory;
+ /*
+ protected AtomicReference<Locator> start;
+
+ public AtomicReference<Locator> getStart() {
+ return start;
+ }
+
+ public void setStart(AtomicReference<Locator> start) {
+ this.start = start;
+ }
+ */
+ /* public void onRead(){
+
+ try {
+ Thread.onCommitOnce( new Runnable() {
+ public void run() {
+ /// commit the changes to the actual file, meanwhile the memory blocks would be owned by this
+ /// transaction so no change could take place regarding those, this need not be done atomically
+ }
+ });
+
+ Thread.onAbortOnce( new Runnable() {
+ public void run() {
+ //// nothing no-op
+ }
+ });
+
+ Transaction me = Thread.getTransaction();
+ Locator oldLocator = start.get();
+ Copyable version = oldLocator.fastPath(me);
+
+ ContentionManager manager = Thread.getContentionManager();
+ Locator newLocator = new Locator(me, (Copyable) new CopyableFileFactory());
+ version = (Copyable) newLocator.newVersion;
+ while (true) {
+ oldLocator.writePath(me, manager, newLocator);
+ if (!me.isActive()) {
+ throw new AbortedException();
+ }
+
+ if (Adapter.this.start.compareAndSet(oldLocator, newLocator)) {
+ return;
+ }
+ oldLocator = Adapter.this.start.get();
+ }
+ } catch (IllegalAccessException e) {
+ throw new PanicException(e);
+ } catch (InvocationTargetException e) {
+ throw new PanicException(e);
+ }
+ }
+
+ public void onWrite(){
+ try {
+ T version = (T) start.get().newVersion;
+ final Method method = version.getClass().getMethod(methodName);
+ return new Adapter.Getter<V>() {
+ public V call() {
+ try {
+ Transaction me = Thread.getTransaction();
+ Locator oldLocator = Adapter.this.start.get();
+ T version = (T) oldLocator.fastPath(me);
+ if (version == null) {
+ ContentionManager manager = Thread.getContentionManager();
+ Locator newLocator = new Locator();
+ while (true) {
+ oldLocator.readPath(me, manager, newLocator);
+ if (Adapter.this.start.compareAndSet(oldLocator, newLocator)) {
+ version = (T) newLocator.newVersion;
+ break;
+ }
+ oldLocator = start.get();
+ }
+ if (!me.isActive()) {
+ throw new AbortedException();
+ }
+ }
+ return (V)method.invoke(version);
+ } catch (SecurityException e) {
+ throw new PanicException(e);
+ } catch (IllegalAccessException e) {
+ throw new PanicException(e);
+ } catch (InvocationTargetException e) {
+ throw new PanicException(e);
+ }
+ }};
+ } catch (NoSuchMethodException e) {
+ throw new PanicException(e);
+ }
+ }
+
+ public void release() {
+ Transaction me = Thread.getTransaction();
+ Locator oldLocator = this.start.get();
+ T version = (T) oldLocator.fastPath(me);
+ if (version == null) {
+ ContentionManager manager = Thread.getContentionManager();
+ Locator newLocator = new Locator();
+ version = (T) newLocator.newVersion;
+ while (true) {
+ oldLocator.releasePath(me, manager, newLocator);
+ if (this.start.compareAndSet(oldLocator, newLocator)) {
+ break;
+ }
+ oldLocator = this.start.get();
+ }
+ if (!me.isActive()) {
+ throw new AbortedException();
+ }
+ }
+ return;
+ }
+
+ public T snapshot() {
+ Transaction me = Thread.getTransaction();
+ Locator oldLocator = this.start.get();
+ T version = (T) oldLocator.fastPath(me);
+ if (version == null) {
+ ContentionManager manager = Thread.getContentionManager();
+ return (T)oldLocator.snapshot(me, manager);
+ } else {
+ return version;
+ }
+ }
+
+ public void validate(T snap) {
+ if (snap != snapshot()) {
+ throw new SnapshotException();
+ }
+ }
+
+ public void upgrade(T snap) {
+ Transaction me = Thread.getTransaction();
+ Locator oldLocator = this.start.get();
+ T version = (T) oldLocator.fastPath(me);
+ if (version != null) {
+ if (version != snap) {
+ throw new SnapshotException();
+ } else {
+ return;
+ }
+ }
+ ContentionManager manager = Thread.getContentionManager();
+ Locator newLocator = new Locator(me, (Copyable)factory.create());
+ while (true) {
+ oldLocator.writePath(me, manager, newLocator);
+ if (!me.isActive()) {
+ throw new AbortedException();
+ }
+ if (snap != newLocator.oldVersion) {
+ throw new SnapshotException();
+ }
+ if (this.start.compareAndSet(oldLocator, newLocator)) {
+ return;
+ }
+ oldLocator = this.start.get();
+ }
+ }*/
+}
+
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.factory;
+
+import dstm2.Transaction;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ *
+ * @author navid
+ */
+public class BlockLock {
+
+ //public ReentrantReadWriteLock lock;
+ public ReentrantLock lock;
+ public Transaction owner;
+ public INode inode;
+ public int blocknumber;
+ public AtomicInteger version;
+ int referncount;
+ public static enum MODE {READ, WRITE, READ_WRITE};
+ public MODE accessmode;
+
+ public BlockLock(INode inode, int blocknumber) {
+ version = new AtomicInteger(0);
+ //lock = new ReentrantReadWriteLock();
+ lock = new ReentrantLock();
+ this.inode = inode;
+ this.blocknumber = blocknumber;
+ referncount = 0;
+ }
+
+
+ public synchronized int getReferncount() {
+ return referncount;
+ }
+
+ public synchronized void setReferncount(int referncount) {
+ this.referncount = referncount;
+ }
+
+
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+import dstm2.Transaction;
+import dstm2.Thread;
+import dstm2.exceptions.AbortedException;
+import dstm2.file.interfaces.BlockAccessModesEnum;
+import dstm2.file.interfaces.FileAccessModesEum;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author navid
+ */
+public class ExtendedTransaction extends Transaction {
+
+ static ExtendedTransaction getTransaction() {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ // Vector<ReentrantLock> heldlocks;
+ private Vector heldlocks;
+ // HashMap<INode, TransactionLocalFileAttributes> FilesAccesses;
+ private HashMap FilesAccesses;
+ //HashMap<INode, TransactionLocalFileAttributes.MODE> FilesAccessMode;
+ private HashMap FilesAccessModes;
+
+ public HashMap getFilesAccessModes() {
+ return FilesAccessModes;
+ }
+
+ public HashMap getFilesAccesses() {
+ return FilesAccesses;
+ }
+
+ public void addtoFileAccessModeMap(INode inode, FileAccessModesEum mode) {
+ /* if (FilesAccessModes.containsKey(inode)) {
+ if (((FileAccessModesEum) (FilesAccessModes.get(inode))) == FileAccessModesEum.APPEND) {
+ FilesAccessModes.put(inode, mode);
+ } else if (((FileAccessModesEum) (FilesAccessModes.get(inode))) == FileAccessModesEum.READ) {
+ if (mode == FileAccessModesEum.READ_WRITE) {
+ FilesAccessModes.put(inode, mode);
+ }
+ }
+ } else {*/
+ FilesAccessModes.put(inode, mode);
+ //}
+ }
+
+ public ExtendedTransaction() {
+ super();
+ }
+
+ public Vector getHeldlocks() {
+ return heldlocks;
+ }
+
+ public Map getSortedFileAccessMap(HashMap hmap) {
+ Map sortedMap = new TreeMap(hmap);
+ return sortedMap;
+ }
+
+ public void addFile(TransactionalFile tf/*, TransactionLocalFileAttributes.MODE mode*/) {
+
+
+ if (tf.appendmode) {
+ this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.APPEND);
+ } else if (tf.writemode) {
+ this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ_WRITE);
+ } else {
+ this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ);
+ }
+ boolean flag = tf.to_be_created;
+ RandomAccessFile fd = tf.file;
+ ReentrantLock lock = tf.offsetlock;
+ Offset commitedoffset = tf.commitedoffset;
+
+ synchronized (tf.adapter) {
+ //Adapter ad = new Adapter(tf.adapter);
+ FilesAccesses.put(tf.getInode(), new TransactionLocalFileAttributes(/*ad*/tf.adapter, flag/*, mode*/, fd, lock, commitedoffset));
+ }
+
+ }
+
+ @Override
+ public boolean commit() { /// Locking offsets for File Descriptors
+
+ // Vector offsetlocks = new Vector();
+ Map hm = getSortedFileAccessMap(FilesAccesses);
+ //lock phase
+ Iterator iter = hm.keySet().iterator();
+ TransactionLocalFileAttributes value;
+ while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ INode key = (INode) iter.next();
+ value = (TransactionLocalFileAttributes) hm.get(key);
+
+ if (value.getAccesedblocks().isEmpty()) {
+ value.setValidatelocaloffset(false);
+ } else if ((value.getAccesedblocks().values().contains(BlockAccessModesEnum.READ)) || (value.getAccesedblocks().values().contains(BlockAccessModesEnum.READ_WRITE))) {
+ value.setValidatelocaloffset(true);
+ } else {
+ value.setValidatelocaloffset(false);
+ }
+ if (value.isValidatelocaloffset()) {
+ if (value.getCopylocaloffset() == value.currentcommitedoffset.getOffsetnumber()) {
+ value.offsetlock.lock();
+ //offsetlocks.add(value.offsetlock);
+ heldlocks.add(value.offsetlock);
+ if (!(value.getCopylocaloffset() == value.currentcommitedoffset.getOffsetnumber())) {
+ /* for (int i = 0; i < offsetlocks.size(); i++) {
+ ((ReentrantLock) offsetlocks.get(i)).unlock();
+ }*/
+ unlockAllLocks();
+ //throw new AbortedException();
+ return false;
+ }
+ } else {
+ /*for (int i = 0; i < offsetlocks.size(); i++) {
+ ((ReentrantLock) offsetlocks.get(i)).unlock();
+ }*/
+ unlockAllLocks();
+ return false;
+ // throw new AbortedException();
+ }
+ } else {
+ value.offsetlock.lock();
+ heldlocks.add(value.offsetlock);
+ }
+ }
+ return true;
+
+
+ // return ok;
+ /*
+ if (ok) {
+ if (!(super.commit())) {
+ unlockAllLocks();
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }*/
+ }
+
+ public boolean lock(BlockLock block, Adapter adapter, BlockAccessModesEnum mode, int expvalue/*INode inode, TransactionLocalFileAttributes tf*/) {
+
+ final ReentrantLock lock = block.lock;
+ while (this.getStatus() == Status.ACTIVE) {
+ if (lock.tryLock()) {
+ Thread.onAbortOnce(new Runnable() {
+
+ public void run() {
+ lock.unlock();
+ }
+ });
+
+ heldlocks.add(lock);
+ if (mode != BlockAccessModesEnum.WRITE) {
+ if (block.version.get() != expvalue) {
+ unlockAllLocks();
+ return false;
+ }
+ }
+ return true;
+ } else {
+ getContentionManager().resolveConflict(this, block.owner);
+ }
+ }
+ return false;
+ }
+
+ public void endTransaction() {
+
+ if (commit()) {
+ ///////////////////////////
+ Map hm = getSortedFileAccessMap(FilesAccesses);
+ Iterator iter = hm.keySet().iterator();
+ TransactionLocalFileAttributes value;
+ boolean ok = true;
+ while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && ok) {
+ int expvalue;
+ INode key = (INode) iter.next();
+
+ value = (TransactionLocalFileAttributes) hm.get(key);
+ if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.APPEND) {
+ Range tmp = (Range) (value.getWrittendata().firstKey());
+ int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(value.currentcommitedoffset.getOffsetnumber());
+ int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(value.currentcommitedoffset.getOffsetnumber(), (int) (tmp.getEnd() - tmp.getStart()));
+ for (int i = startblock; i <= targetblock; i++) {
+ value.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
+ }
+ }
+
+
+
+
+
+ Iterator it = value.getAccesedblocks().keySet().iterator();
+ while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ Integer blockno = (Integer) it.next();
+ BlockLock blockobj = (BlockLock) value.adapter.lockmap.get(blockno);
+ expvalue = ((Integer) value.getBlockversions().get(it)).intValue();
+ if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) {
+
+ if (blockobj.version.get() == expvalue) {
+ // ok = this.lock(key, value/*value.adapter*/);
+ ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
+ if (ok == false) {
+ // unlockAllLocks();
+ break;
+ }
+ } else {
+ ok = false;
+ // unlockAllLocks();
+ break;
+ // return false;
+ }
+ } else {
+
+ ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
+ if (ok == false) {
+ break;
+ }
+ }
+ }
+ }
+
+ if (!(ok)) {
+ unlockAllLocks();
+ throw new AbortedException();
+
+
+ }
+
+ if (!(super.commit())) {
+ unlockAllLocks();
+ throw new AbortedException();
+
+ }
+
+ iter = hm.keySet().iterator();
+ while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ INode key = (INode) iter.next();
+ value = (TransactionLocalFileAttributes) hm.get(key);
+ if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.APPEND) {
+ try {
+ Range range = (Range) value.getWrittendata().firstKey();
+
+
+ //synchronized(value.adapter){
+ //value.f.seek(value.adapter.commitedfilesize.get());
+ value.f.seek(value.currentcommitedoffset.getOffsetnumber());
+ //}
+
+ Byte[] data = new Byte[(int) (range.getEnd() - range.getStart())];
+ byte[] bytedata = new byte[(int) (range.getEnd() - range.getStart())];
+ data = (Byte[]) value.getWrittendata().get(range);
+
+ for (int i = 0; i < data.length; i++) {
+ bytedata[i] = data[i];
+ }
+ value.f.write(bytedata);
+
+ } catch (IOException ex) {
+ Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex);
+ }
+
+ } else if (((FileAccessModesEum) (this.FilesAccessModes.get(key))) == FileAccessModesEum.READ) {
+ continue;
+ } else {
+ int tobeaddedoffset = 0;
+
+ if (value.isValidatelocaloffset())
+ tobeaddedoffset = 0;
+ else
+ tobeaddedoffset = (int) (value.currentcommitedoffset.getOffsetnumber() - value.getCopylocaloffset());
+
+ Iterator it = value.getWrittendata().keySet().iterator();
+ while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ try {
+ Range range = (Range) it.next();
+ value.f.seek(range.getStart() + tobeaddedoffset);
+ Byte[] data = new Byte[(int) (range.getEnd() - range.getStart())];
+ byte[] bytedata = new byte[(int) (range.getEnd() - range.getStart())];
+ data = (Byte[]) value.getWrittendata().get(range);
+
+ for (int i = 0; i < data.length; i++) {
+ bytedata[i] = data[i];
+ }
+ value.f.write(bytedata);
+
+ } catch (IOException ex) {
+ Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+ }
+
+
+ iter = hm.keySet().iterator();
+ while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ INode key = (INode) iter.next();
+ value = (TransactionLocalFileAttributes) hm.get(key);
+ Iterator it = value.getAccesedblocks().keySet().iterator();
+
+ while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ Integer blockno = (Integer) it.next();
+ BlockLock blockobj = (BlockLock) value.adapter.lockmap.get(blockno);
+ blockobj.version.getAndIncrement();
+ value.currentcommitedoffset.setOffsetnumber(value.getLocaloffset());
+ synchronized (value.adapter) {
+ value.adapter.commitedfilesize.getAndSet(value.getFilelength());
+ }
+ }
+ }
+
+
+ // unlock phase
+ /*iter = hm.keySet().iterator();
+ while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
+ INode key = (INode) iter.next();
+ value = (TransactionLocalFileAttributes) hm.get(key);
+ value.offsetlock.unlock();
+ }*/
+ unlockAllLocks();
+ } else {
+ throw new AbortedException();
+ }
+ }
+
+ private void unlockAllLocks() {
+ Iterator it = heldlocks.iterator();
+ while (it.hasNext()) {
+ ReentrantLock lock = (ReentrantLock) it.next();
+ lock.unlock();
+ }
+ }
+}
+
+
+
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+import dstm2.Defaults;
+
+
+/**
+ *
+ * @author navid
+ */
+public class FileBlockManager {
+
+ public static long getFragmentIndexofTheFile(long filesize) {
+ return (filesize / Defaults.FILEFRAGMENTSIZE);
+ }
+
+ public static int getCurrentFragmentIndexofTheFile(long start) {
+ return (int) ((start / Defaults.FILEFRAGMENTSIZE));
+ }
+
+ public static int getTargetFragmentIndexofTheFile(long start, int offset) {
+ return (int) (((offset + start) / Defaults.FILEFRAGMENTSIZE));
+ }
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.factory;
+
+/**
+ *
+ * @author navid
+ */
+public class INode {
+
+ private long number;
+
+
+ public INode(long number) {
+ this.number = number;
+ }
+
+ public long getNumber() {
+ return number;
+ }
+
+ public void setNumber(long number) {
+ this.number = number;
+ }
+
+
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.factory;
+
+/**
+ *
+ * @author navid
+ */
+public class Offset {
+ private long offsetnumber;
+
+ public Offset(long offsetnumber) {
+ this.offsetnumber = offsetnumber;
+ }
+
+
+ public long getOffsetnumber() {
+ return offsetnumber;
+ }
+
+ public void setOffsetnumber(long offsetnumber) {
+ this.offsetnumber = offsetnumber;
+ }
+
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+/**
+ *
+ * @author navid
+ */
+public class Range implements Comparable {
+
+ private long start;
+ private long end;
+
+ public Range() {
+ }
+
+ public Range(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public Range intersection(Range secondrange) {
+ if ((secondrange.start <= this.start) && (this.start <= secondrange.end)) {
+ return new Range(this.start, Math.min(this.end, secondrange.end));
+ } else if ((secondrange.start <= this.end) && (this.end <= secondrange.end)) {
+ return new Range(Math.max(this.start, secondrange.start), this.end);
+ } else if ((this.start <= secondrange.start) && (secondrange.end <= this.end)) {
+ return new Range(secondrange.start, secondrange.end);
+ } else {
+ return null;
+ }
+ }
+
+ public boolean hasIntersection(Range secondrange) {
+ if ((secondrange.start <= this.start) && (this.start <= secondrange.end)) {
+ return true;
+ } else if ((secondrange.start <= this.end) && (this.end <= secondrange.end)) {
+ return true;
+ } else if ((this.start <= secondrange.start) && (secondrange.end <= this.end)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean includes(Range secondrange) {
+ if (this.start <= secondrange.start && secondrange.end <= this.end) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public Range[] minus(Range[] intersectedranges, int size) {
+ Range[] tmp = new Range[size + 1];
+
+ int counter = 0;
+ if (this.start < intersectedranges[0].start) {
+ tmp[counter] = new Range(this.start, intersectedranges[0].start);
+ counter++;
+ }
+ for (int i = 1; i < size; i++) {
+ tmp[counter] = new Range(intersectedranges[i - 1].end, intersectedranges[i].start);
+ counter++;
+ }
+ if (this.end > intersectedranges[size - 1].end) {
+ tmp[counter] = new Range(intersectedranges[size - 1].end, this.end);
+ counter++;
+ }
+ Range[] result = new Range[counter];
+ for (int i = 0; i < counter; i++) {
+ result[i] = tmp[i];
+ }
+ return result;
+ }
+
+ public int compareTo(Object arg0) {
+
+ Range tmp = (Range) arg0;
+ if (this.start < tmp.start) {
+ return -1;
+ } else if (this.start == tmp.start) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author navid
+ */
+public class TransactionLocalFileAttributes {
+
+ //public int recordedversion;
+ public Adapter adapter;
+
+ public boolean to_be_created = false;
+ RandomAccessFile f;
+ private boolean validatelocaloffset = true;
+ public boolean writetoabsoluteoffset = false;
+ public int writetoabsoluteoffsetfromhere = -1;
+ public ReentrantLock offsetlock;
+ //Vector<Integer> startoffset;
+ //Vector<Byte[]> data;
+ //Vector<Integer> occupiedblocks;
+ // private Vector readoccupiedblocks;
+ //private Vector writeoccupiedblocks;
+ // private Vector startoffset;
+ // private Vector data;
+ //private TreeMap<Range, Byte[]> writtendata;
+ private TreeMap writtendata;
+ //private TreeMap<Integer, MODE> accesesblocks;
+ private TreeMap accesedblocks;
+ //private TreeMap<Integer, Integer> blockversions;
+ private TreeMap blockversions;
+
+
+ public Offset currentcommitedoffset;
+ private final long copylocaloffset;
+ private final long copycurrentfilesize;
+ private long localoffset;
+ private long filelength;
+
+
+ public boolean isValidatelocaloffset() {
+ return validatelocaloffset;
+ }
+
+ public void setValidatelocaloffset(boolean validatelocaloffset) {
+ this.validatelocaloffset = validatelocaloffset;
+ }
+
+ public long getFilelength() {
+ return filelength;
+ }
+
+ public void setFilelength(long filelength) {
+ this.filelength = filelength;
+ }
+
+ public long getLocaloffset() {
+ return localoffset;
+ }
+
+ public void setLocaloffset(long localoffset) {
+ this.localoffset = localoffset;
+ }
+
+ //public MODE accessmode;
+
+ public TreeMap getAccesedblocks() {
+ return accesedblocks;
+ }
+
+ public TreeMap getBlockversions() {
+ return blockversions;
+ }
+
+ public long getCopycurrentfilesize() {
+ return copycurrentfilesize;
+ }
+
+ public long getCopylocaloffset() {
+ return copylocaloffset;
+ }
+
+
+ /* public Vector getData() {
+ return data;
+ }
+
+ public Vector getReadoccupiedblocks() {
+ return readoccupiedblocks;
+ }
+
+ public Vector getWriteoccupiedblocks() {
+ return writeoccupiedblocks;
+ }
+ */
+
+ /* public Vector getStartoffset() {
+ return startoffset;
+ }
+ */
+ public TransactionLocalFileAttributes(Adapter adapter, boolean to_be_created, /*TransactionLocalFileAttributes.MODE mode,*/ RandomAccessFile f, ReentrantLock offsetlock, Offset currentcommitedoffset) {
+
+ this.adapter = adapter;
+ this.localoffset = currentcommitedoffset.getOffsetnumber();
+ this.copylocaloffset = this.localoffset;
+ this.currentcommitedoffset = currentcommitedoffset;
+ this.filelength = adapter.commitedfilesize.get();
+ this.copycurrentfilesize = this.filelength;
+ this.to_be_created = to_be_created;
+ this.f = f;
+ //this.filelength = filelength;
+ writtendata = new TreeMap();
+ validatelocaloffset = false;
+ this.offsetlock = offsetlock;
+
+ //readoccupiedblocks = new Vector();
+ //writeoccupiedblocks = new Vector();
+ //accessmode = mode;
+ //recordedversion = adapter.version.get();
+ //startoffset = new Vector();
+ //data = new Vector();
+ //readoccupiedblocks = new Vector();
+ //writeoccupiedblocks = new Vector();
+ }
+
+ public TreeMap getWrittendata() {
+ return writtendata;
+ }
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+import dstm2.Thread;
+import dstm2.Transaction.Status;
+import dstm2.exceptions.AbortedException;
+import dstm2.exceptions.PanicException;
+import dstm2.file.interfaces.BlockAccessModesEnum;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author navid
+ */
+public class TransactionalFile {
+
+ public RandomAccessFile file;
+ private INode inode;
+ public Adapter adapter;
+ /* public AtomicLong commitedoffset;
+ public AtomicLong commitedfilesize;*/
+ public boolean to_be_created = false;
+ public boolean writemode = false;
+ public boolean appendmode = false;
+ public ReentrantLock offsetlock;
+ public Offset commitedoffset;
+
+ public TransactionalFile(String filename, String mode) {
+ synchronized (this) {
+ adapter = TransactionalFileWrapperFactory.createTransactionalFile(filename, mode);
+ inode = TransactionalFileWrapperFactory.getINodefromFileName(filename);
+ }
+
+ File f = new File(filename);
+ if ((!(f.exists()))) {
+ to_be_created = true;
+ file = null;
+ } else {
+ try {
+ offsetlock = new ReentrantLock();
+ file = new RandomAccessFile(f, mode);
+ } catch (FileNotFoundException ex) {
+
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ if (mode.equals("rw")) {
+ writemode = true;
+ } else if (mode.equals("a")) {
+ appendmode = true;
+ }
+ synchronized (adapter) {
+ try {
+ if (!(to_be_created)) {
+
+ adapter.commitedfilesize.set(file.length());
+
+ } else {
+ adapter.commitedfilesize.set(0);
+ }
+ if (!appendmode) {
+ commitedoffset.setOffsetnumber(0);
+ } else {
+ commitedoffset.setOffsetnumber(file.length());
+ }
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ /* public TransactionalFile(Adapter adapter, RandomAccessFile file) {
+
+ this.adapter = adapter;
+ this.file = file;
+ decriptors = new Vector();
+ }
+
+ public void copyTransactionalFile(TransactionalFile tf){
+ try {
+ int tmp = tf.commitedoffset.get();
+ boolean flag = tf.to_be_created;
+ FileDescriptor fd = tf.file.getFD();
+ Adapter ad = new Adapter(tf.adapter);
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }*/
+ public /*synchronized*/ BlockLock getBlockLock(int blocknumber) {
+ synchronized (adapter.lockmap) {
+ if (adapter.lockmap.containsKey(blocknumber)) {
+ return ((BlockLock) (adapter.lockmap.get(blocknumber)));
+ } else {
+ BlockLock tmp = new BlockLock(getInode(),blocknumber);
+ adapter.lockmap.put(blocknumber, tmp);
+ return tmp;
+ }
+ }
+
+ }
+
+ /* public boolean deleteBlockLock(int blocknumber){
+ synchronized(adapter.lockmap){
+ //adapter.lockmap.get(blocknumber)
+ if (adapter.lockmap.containsKey(blocknumber)){
+ if (((BlockLock)(adapter.lockmap.get(blocknumber))).referncount == 0){
+ adapter.lockmap.remove(adapter.lockmap.get(blocknumber));
+ return true;
+ }
+ else
+ return false;
+ }
+ else {
+ return false;
+ }
+ }
+ }*/
+ public void close() {
+ try {
+ file.close();
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+
+ public void seek(long offset) {
+
+ if (appendmode) {
+ throw new PanicException("Cannot seek into a file opened in append mode");
+ }
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+ TransactionLocalFileAttributes tmp;
+ if (!(me.getFilesAccesses().containsKey(this.inode))) {
+ me.addFile(this);
+ tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+ tmp.writetoabsoluteoffset = true;
+ tmp.writetoabsoluteoffsetfromhere = 0;
+ //tmp.setValidatelocaloffset(false);
+ } else {
+ tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+ //tmp.setValidatelocaloffset(true);
+ if (!(tmp.writetoabsoluteoffset))
+ {
+ tmp.writetoabsoluteoffset = true;
+ tmp.writetoabsoluteoffsetfromhere = tmp.getWrittendata().size();
+ }
+ }
+
+ tmp.setLocaloffset(offset);
+ }
+
+ public int read(byte[] b) {
+
+ if (appendmode) {
+ throw new PanicException("Cannot seek into a file opened in append mode");
+ }
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+ int size = b.length;
+ int result;
+
+ if (me == null) { // not a transaction
+
+ size = 10;
+ return size;
+ } else if (me.getFilesAccesses().containsKey(this.getInode())) {// in its read list files, read from the file
+
+ TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+ tmp.setValidatelocaloffset(true);
+
+
+ long loffset = tmp.getLocaloffset();
+
+
+ int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset);
+ int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size);
+ for (int i = startblock; i <= targetblock; i++) {
+ if (tmp.getAccesedblocks().containsKey(Integer.valueOf(i))) {
+ if (((BlockAccessModesEnum) (tmp.getAccesedblocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.WRITE) {
+ tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
+ }
+ } else {
+ tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ);
+
+
+ //adapter.lock.lock();
+ tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockLock(i).version.get()));
+ //adapter.lock.unlock();
+ /*synchronized(adapter.version){
+ tmp.getBlockversions().put(Integer.valueOf(i), tmp.adapter.version); // 1st alternative
+ }*/
+
+ //return readFromFile(b, tmp);
+ }
+ }
+
+
+ if (!(validateBlocksVersions(startblock, targetblock))) {
+ throw new AbortedException();
+ }
+
+
+
+ Range readrange = new Range(loffset, loffset + size);
+ Range writerange = null;
+ Range[] intersectedrange = new Range[tmp.getWrittendata().size()];
+ Range[] markedwriteranges = new Range[tmp.getWrittendata().size()];
+
+ int counter = 0;
+
+
+
+
+ boolean flag = false;
+ Iterator it = tmp.getWrittendata().keySet().iterator();
+ while (it.hasNext()) {
+ writerange = (Range) it.next();
+ if (writerange.includes(readrange)) {
+ flag = true;
+ break;
+ }
+
+ if (writerange.hasIntersection(readrange)) {
+ intersectedrange[counter] = readrange.intersection(writerange);
+ markedwriteranges[counter] = writerange;
+ counter++;
+ }
+ }
+
+ if (flag) {
+ result = readFromBuffer(b, tmp, writerange);
+ tmp.setLocaloffset(tmp.getLocaloffset() + result);
+ //return result;
+ } else if (counter == 0) {
+ result = readFromFile(b, tmp);
+ tmp.setLocaloffset(tmp.getLocaloffset() + result);
+ //return result;
+ } else {
+
+ for (int i = 0; i < counter; i++) {
+ Byte[] data = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i]));
+ byte[] copydata = new byte[data.length];
+
+ for (int j = 0; j < data.length; j++) {
+ copydata[j] = data[j].byteValue();
+ }
+ System.arraycopy(copydata, (int) (intersectedrange[i].getStart() - markedwriteranges[i].getStart()), b, (int) (intersectedrange[i].getStart() - readrange.getStart()), (int) (Math.min(intersectedrange[i].getEnd(), readrange.getEnd()) - intersectedrange[i].getStart()));
+ }
+
+ Range[] non_intersected_ranges = readrange.minus(intersectedrange, counter);
+ Vector occupiedblocks = new Vector();
+ Vector heldlocks = new Vector();
+ for (int i = 0; i < non_intersected_ranges.length; i++) {
+ int st = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getStart());
+ int en = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getEnd());
+ for (int j = st; j <= en; j++) {
+ if (!(occupiedblocks.contains(j))) {
+ occupiedblocks.add(j);
+ }
+ }
+ }
+
+
+ offsetlock.lock();
+
+ for (int k = 0; k < occupiedblocks.size(); k++) {
+ int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue();
+ while (me.getStatus() == Status.ACTIVE) {
+ BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(k)));
+ // if (block.version.get() == expvalue) {
+
+ if (block.lock.tryLock()) {
+ heldlocks.add(block.lock);
+ if (!(block.version.get() == expvalue)) {
+ me.abort();
+ } else {
+ break;
+ }
+ } else {
+ me.getContentionManager().resolveConflict(me, block.owner);
+ }
+ // } else {
+ // me.abort();
+ //}
+ }
+ if (me.getStatus() == Status.ABORTED) {
+ unlockLocks(heldlocks);
+ throw new AbortedException();
+ }
+ }
+
+ for (int i = 0; i < non_intersected_ranges.length; i++) {
+ try {
+ // offsetlock.lock();
+ file.seek(non_intersected_ranges[i].getStart());
+ file.read(b, (int) (non_intersected_ranges[i].getStart() - readrange.getStart()), (int) (non_intersected_ranges[i].getEnd() - non_intersected_ranges[i].getStart()));
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+
+ }
+
+ }
+
+ unlockLocks(heldlocks);
+ offsetlock.unlock();
+ tmp.setLocaloffset(tmp.getLocaloffset() + size);
+ result = size;
+ //return size;
+
+ }
+
+ return result;
+
+ } else { // add to the readers list
+ //me.addReadFile(this);
+
+ me.addFile(this/*, TransactionLocalFileAttributes.MODE.READ*/);
+ TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+ tmp.setValidatelocaloffset(true);
+ return read(b);
+ }
+
+ }
+
+ public void write(byte[] data) throws IOException {
+
+ if (!(writemode)) {
+ throw new IOException();
+ //return;
+ }
+
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+
+ int size = data.length;
+
+
+ if (me == null) // not a transaction
+ {
+ size = 10;
+ } else if (me.getFilesAccesses().containsKey(this.getInode())) // in its read list files, read from the file
+ {
+
+ TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+
+ Byte[] by = new Byte[size];
+ for (int i = 0; i < size; i++) {
+ by[i] = Byte.valueOf(data[i]);
+ }
+ TreeMap tm = tmp.getWrittendata();
+ long loffset = tmp.getLocaloffset();
+ Range newwriterange;
+ if (appendmode) {
+ newwriterange = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd()) + size);
+ Range range = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd()));
+ Byte[] appenddata = new Byte[(int) (newwriterange.getEnd() - newwriterange.getStart())];
+ Byte[] tempor = new Byte[(int) (range.getEnd() - range.getStart())];
+ System.arraycopy(tempor, 0, appenddata, 0, tempor.length);
+ System.arraycopy(by, 0, appenddata, tempor.length, by.length);
+ tm.remove(range);
+ tm.put(newwriterange, appenddata);
+ tmp.setLocaloffset(loffset + size);
+ tmp.setFilelength(tmp.getFilelength() + size);
+
+ return;
+ }
+
+ newwriterange = new Range(loffset, loffset + size);
+ Range oldwriterange = null;
+ Range intersect = null;
+ Range[] intersectedrange = new Range[tmp.getWrittendata().size()];
+ Range[] markedwriteranges = new Range[tmp.getWrittendata().size()];
+ by = new Byte[size];
+ int counter = 0;
+
+ /*
+
+ if (tmp.accessmode == TransactionLocalFileAttributes.MODE.READ)
+ tmp.accessmode = TransactionLocalFileAttributes.MODE.READ_WRITE;
+ else if (tmp.accessmode == TransactionLocalFileAttributes.MODE.WRITE)
+ simpleWritetoBuffer(by, newwriterange, tm);
+ */
+
+ int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset);
+ int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size);
+ for (int i = startblock; i <= targetblock; i++) {
+ if (tmp.getAccesedblocks().containsKey(Integer.valueOf(i))) {
+ if (((BlockAccessModesEnum) (tmp.getAccesedblocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.READ) {
+ tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
+ }
+ } else {
+ tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
+
+ tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockLock(i).version.get()));
+ }
+ }
+
+ // Vector offset = tmp.getStartoffset();
+ // Vector tmpdata = tmp.getData();
+
+
+
+ // offset.add(new Integer(tmp.localoffset));
+
+ boolean flag = false;
+ Iterator it = tmp.getWrittendata().keySet().iterator();
+ while (it.hasNext()) {
+ oldwriterange = (Range) it.next();
+ if (oldwriterange.includes(newwriterange)) {
+ flag = true;
+ intersect = newwriterange.intersection(oldwriterange);
+ break;
+ }
+
+ if (oldwriterange.hasIntersection(newwriterange)) {
+ intersectedrange[counter] = newwriterange.intersection(oldwriterange);
+ markedwriteranges[counter] = oldwriterange;
+ counter++;
+ }
+ }
+
+ if (flag) {
+ int datasize = (int) (oldwriterange.getEnd() - oldwriterange.getStart());
+ Byte[] original = (Byte[]) (tmp.getWrittendata().get(oldwriterange));
+ byte[] originaldata = new byte[datasize];
+
+ for (int i = 0; i < data.length; i++) {
+ originaldata[i] = original[i].byteValue();
+ }
+ System.arraycopy(data, 0, originaldata, (int) (newwriterange.getStart() - oldwriterange.getStart()), size);
+ Byte[] to_be_inserted = new Byte[datasize];
+ //System.arraycopy(b, 0, ab, a.length);
+
+ for (int i = 0; i < datasize; i++) {
+ to_be_inserted[i] = Byte.valueOf(originaldata[i]);
+ }
+ tm.put(oldwriterange, to_be_inserted);
+ tmp.setLocaloffset(loffset + size);
+ if (tmp.getLocaloffset() > tmp.getFilelength())
+ tmp.setFilelength(tmp.getLocaloffset());
+ return;
+
+ } else if (counter == 0) {
+ tm.put(newwriterange, data);
+ tmp.setLocaloffset(loffset + size);
+ if (tmp.getLocaloffset() > tmp.getFilelength())
+ tmp.setFilelength(tmp.getLocaloffset());
+ return;
+ } else {
+
+ int suffixstart = 0;
+ long start = 0;
+ long end = 0;
+ Byte[] prefixdata = null;
+ Byte[] suffixdata = null;
+ boolean prefix = false;
+ boolean suffix = false;
+
+ for (int i = 0; i < counter; i++) {
+
+ //if (newwriterange.includes(markedwriteranges[i]))
+ //tm.remove(markedwriteranges);
+
+ if (markedwriteranges[i].getStart() < newwriterange.getStart()) {
+
+ prefixdata = new Byte[(int) (newwriterange.getStart() - markedwriteranges[i].getStart())];
+ prefixdata = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i]));
+ start = markedwriteranges[i].getStart();
+ //newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()];
+ //System.arraycopy(by, 0, newdata, newwriterange.getStart() - markedwriteranges[i].getStart(), size);
+ //System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart());
+
+ //newwriterange.setStart(markedwriteranges[i].getStart());
+ prefix = true;
+
+
+ } else if (markedwriteranges[i].getEnd() > newwriterange.getEnd()) {
+
+ suffixdata = new Byte[(int) (markedwriteranges[i].getStart() - newwriterange.getStart())];
+ suffixdata = (Byte[]) (tmp.getWrittendata().get(markedwriteranges[i]));
+ end = markedwriteranges[i].getEnd();
+
+ /*Byte [] originaldata = (Byte [])(tmp.getWrittendata().get(markedwriteranges[i]));
+ newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()];
+ System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart());
+
+ newwriterange.setStart(markedwriteranges[i].getStart());*/
+ //newwriterange.setEnd(markedwriteranges[i].getEnd());
+ suffix = true;
+ suffixstart = (int) (intersectedrange[i].getEnd() - intersectedrange[i].getStart());
+ //tm.remove(markedwriteranges[i]);
+ }
+ tm.remove(markedwriteranges[i]);
+
+ }
+ Byte[] data_to_insert;
+
+ if ((prefix) && (suffix)) {
+ data_to_insert = new Byte[(int) (newwriterange.getStart() - start + size + newwriterange.getEnd() - end)];
+ System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (newwriterange.getStart() - start));
+ System.arraycopy(by, 0, data_to_insert, (int) (newwriterange.getStart() - start), size);
+ System.arraycopy(suffixdata, suffixstart, data_to_insert, (int) (size + newwriterange.getStart() - start), (int) (end - newwriterange.getEnd()));
+ newwriterange.setStart(start);
+ newwriterange.setEnd(end);
+ }
+ else if (prefix) {
+ data_to_insert = new Byte[(int) (newwriterange.getStart() - start + size)];
+ System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (newwriterange.getStart() - start));
+ System.arraycopy(by, 0, data_to_insert, (int) (newwriterange.getStart() - start), size);
+ newwriterange.setStart(start);
+ }
+ else if (suffix) {
+ data_to_insert = new Byte[(int) (newwriterange.getEnd() - end + size)];
+ System.arraycopy(by, 0, data_to_insert, 0, size);
+ System.arraycopy(suffixdata, suffixstart, data_to_insert, size, (int) (end - newwriterange.getEnd()));
+ newwriterange.setEnd(end);
+ }
+ else {
+ data_to_insert = new Byte[size];
+ System.arraycopy(data_to_insert, (int) (newwriterange.getStart() - start), by, 0, size);
+ }
+ tm.put(newwriterange, data_to_insert);
+ tmp.setLocaloffset(loffset + size);
+ if (tmp.getLocaloffset() > tmp.getFilelength())
+ tmp.setFilelength(tmp.getLocaloffset());
+ }
+
+
+ } else {
+ me.addFile(this/*, TransactionLocalFileAttributes.MODE.WRITE*/);
+ write(data);
+ }
+
+ }
+
+ private int readFromFile(byte[] readdata, TransactionLocalFileAttributes tmp) {
+
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+ int st = FileBlockManager.getCurrentFragmentIndexofTheFile(tmp.getLocaloffset());
+ int end = FileBlockManager.getTargetFragmentIndexofTheFile(tmp.getLocaloffset(), readdata.length);
+ Vector heldlocks = new Vector();
+ for (int k = st; k <= end; k++) {
+ int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue();
+ while (me.getStatus() == Status.ACTIVE) {
+ BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(k)));
+ // if (block.version.get() == expvalue) {
+
+ if (block.lock.tryLock()) {
+ heldlocks.add(block.lock);
+ if (!(block.version.get() == expvalue)) {
+ me.abort();
+ } else {
+ break;
+ }
+ } else {
+ me.getContentionManager().resolveConflict(me, block.owner);
+ }
+ //} else {
+ // me.abort();
+ //}
+ }
+ if (me.getStatus() == Status.ABORTED) {
+ //unlockLocks(heldlocks);
+ throw new AbortedException();
+ }
+
+ }
+ int size = -1;
+ try {
+
+ offsetlock.lock();
+ file.seek(tmp.getLocaloffset());
+ size = file.read(readdata);
+ offsetlock.unlock();
+
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }
+
+
+
+ unlockLocks(heldlocks);
+ tmp.setLocaloffset(tmp.getLocaloffset() + size);
+ return size;
+
+ /*try {
+ while (me.getStatus() == Status.ACTIVE) {
+ if (this.adapter.lock.tryLock()) {
+ file.seek(tmp.localoffset);
+ int result = file.read(readdata);
+ this.adapter.lock.unlock();
+ tmp.localoffset += result;
+ return result;
+ } else {
+ me.getContentionManager().resolveConflict(me, adapter.writer);
+ }
+ }
+
+ } catch (IOException ex) {
+ Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
+ }*/
+
+ }
+
+ private int readFromBuffer(byte[] readdata, TransactionLocalFileAttributes tmp, Range writerange) {
+
+
+ long loffset = tmp.getLocaloffset();
+
+ Byte[] data = (Byte[]) (tmp.getWrittendata().get(writerange));
+ byte[] copydata = null;
+
+ for (int i = 0; i < data.length; i++) {
+ copydata[i] = data[i].byteValue();
+ }
+ System.arraycopy(copydata, (int) (loffset - writerange.getStart()), readdata, 0, readdata.length);
+ return readdata.length;
+
+ }
+
+ public void simpleWritetoBuffer(Byte[] data, Range newwriterange, TreeMap tm) {
+ tm.put(newwriterange, data);
+ }
+
+ public void unlockLocks(Vector heldlocks) {
+ for (int i = 0; i < heldlocks.size(); i++) {
+ ((ReentrantLock) heldlocks.get(i)).unlock();
+ }
+ }
+
+ public boolean validateBlocksVersions(int startblock, int targetblock) {
+ boolean valid = true;
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+ TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode())));
+ for (int i = startblock; i <= targetblock; i++) {
+ int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(i))).intValue();
+ BlockLock block = ((BlockLock) tmp.adapter.lockmap.get(Integer.valueOf(i)));
+ if (expvalue != block.version.get()) {
+ valid = false;
+ break;
+ }
+ }
+
+ return valid;
+ }
+
+ public INode getInode() {
+ return inode;
+ }
+
+ public void setInode(INode inode) {
+ this.inode = inode;
+ }
+ /* public void check(){
+ ExtendedTransaction me = ExtendedTransaction.getTransaction();
+ for (Adapter reader : me.ReadOnly)
+ if (reader.version.get() == adapter.version.get())
+
+
+
+ }*/
+}
\ No newline at end of file
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package dstm2.file.factory;
+
+import java.io.File;
+import java.util.HashMap;
+
+/**
+ *
+ * @author navid
+ */
+public class TransactionalFileWrapperFactory {
+
+
+
+ private TransactionalFileWrapperFactory() {
+ }
+ // private static HashMap<INode, Adapter> filemappings;
+ private static HashMap filemappings;
+
+ private static native long getINodeNative(String filename);
+
+ static{
+ System.load("/home/navid/libnav.so");
+ }
+
+ static INode getINodefromFileName(String filename) {
+ return new INode(getINodeNative(filename));
+ }
+
+ public static void main(String args[]){
+ System.out.print("in java " + getINodeNative("/home/navid/myfile.txt") +"\n");
+ System.out.print("in java " + getINodeNative("/home/navid/HellWorld.java") +"\n");
+ }
+
+ public synchronized static Adapter createTransactionalFile(String filename, String mode) {
+
+
+ long inodenumber = getINodeNative(filename);
+ INode inode = new INode(inodenumber);
+
+
+
+ if (filemappings.containsKey(inode)) {
+ return (Adapter)filemappings.get(inode);
+
+ } else {
+ Adapter adapter = new Adapter();
+ filemappings.put(inode, adapter);
+ return adapter;
+ }
+
+ }
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.factory;
+
+/**
+ *
+ * @author navid
+ */
+public class TransactionalRandomAccessFile {
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.interfaces;
+
+/**
+ *
+ * @author navid
+ */
+public enum BlockAccessModesEnum {
+ READ_WRITE, WRITE, READ
+}
--- /dev/null
+/*
+ * ContentionManager.java
+ *
+ * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
+ * Clara, California 95054, U.S.A. All rights reserved.
+ *
+ * Sun Microsystems, Inc. has intellectual property rights relating to
+ * technology embodied in the product that is described in this
+ * document. In particular, and without limitation, these
+ * intellectual property rights may include one or more of the
+ * U.S. patents listed at http://www.sun.com/patents and one or more
+ * additional patents or pending patent applications in the U.S. and
+ * in other countries.
+ *
+ * U.S. Government Rights - Commercial software.
+ * Government users are subject to the Sun Microsystems, Inc. standard
+ * license agreement and applicable provisions of the FAR and its
+ * supplements. Use is subject to license terms. Sun, Sun
+ * Microsystems, the Sun logo and Java are trademarks or registered
+ * trademarks of Sun Microsystems, Inc. in the U.S. and other
+ * countries.
+ *
+ * This product is covered and controlled by U.S. Export Control laws
+ * and may be subject to the export or import laws in other countries.
+ * Nuclear, missile, chemical biological weapons or nuclear maritime
+ * end uses or end users, whether direct or indirect, are strictly
+ * prohibited. Export or reexport to countries subject to
+ * U.S. embargo or to entities identified on U.S. export exclusion
+ * lists, including, but not limited to, the denied persons and
+ * specially designated nationals lists is strictly prohibited.
+ */
+
+package dstm2.file.interfaces;
+
+
+import java.util.Collection;
+import dstm2.file.factory.TransactionalFile;
+/**
+ * Interface satisfied by all contention managers
+ */
+public interface ContentionManager {
+ /**
+ * Either give the writer a chance to finish it, abort it, or both.
+ * @param me Calling transaction.
+ * @param other Transaction that's in my way.
+ */
+ void resolveConflict(Transaction me, Transaction other, TransactionalFile tf);
+
+
+ /**
+ * Either give the writer a chance to finish it, abort it, or both.
+ * @param me Calling transaction.
+ * @param other set of transactions in my way
+ */
+ void resolveConflict(Transaction me, Collection<Transaction> other);
+
+ /**
+ * Assign a priority to caller. Not all managers assign meaningful priorities.
+ * @return Priority of conflicting transaction.
+ */
+ long getPriority();
+
+ /**
+ * Change this manager's priority.
+ * @param value new priority value
+ */
+ void setPriority(long value);
+
+ /**
+ * Notify manager that object was opened.
+ */
+ void openSucceeded();
+
+ /**
+ * Notify manager that transaction committed.
+ */
+ void committed();
+
+};
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.interfaces;
+
+import dstm2.Transaction.Status;
+import java.util.Collection;
+import dstm2.file.factory.TransactionalFile;
+
+/**
+ *
+ * @author navid
+ */
+public class CustomCM implements ContentionManager{
+
+ public void resolveConflict(Transaction me, Transaction other, TransactionalFile obj) {
+ if (other != null)
+ if (other.getStatus() == Status.ACTIVE || other.getStatus() == Status.COMMITTED)
+ other.waitWhileActiveNotWaiting();
+ }
+
+ public void resolveConflict(Transaction me, Collection<Transaction> other) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public long getPriority() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void setPriority(long value) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void openSucceeded() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void committed() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void resolveConflict(Transaction me, Transaction other) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.interfaces;
+
+import java.io.WriteAbortedException;
+
+/**
+ *
+ * @author navid
+ */
+public enum FileAccessModesEum {
+ READ_WRITE, APPEND, READ
+
+}
--- /dev/null
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package dstm2.file.interfaces;
+
+/**
+ *
+ * @author navid
+ */
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+
+import dstm2.Transaction.Status;
+
+/**
+ *
+ * @author navid
+ */
+public interface Transaction {
+
+ /**
+ * Tries to abort transaction
+ * @return whether transaction was aborted (not necessarily by this call)
+ */
+ boolean abort();
+
+ /**
+ * Tries to commit transaction
+ * @return whether transaction was committed
+ */
+ boolean commit();
+
+ /**
+ * This transaction's contention manager
+ * @return the manager
+ */
+ ContentionManager getContentionManager();
+
+ /**
+ * Access the transaction's current status.
+ * @return current transaction status
+ */
+ Status getStatus();
+
+ /**
+ * Tests whether transaction is aborted.
+ * @return whether transaction is aborted
+ */
+ boolean isAborted();
+
+ /**
+ * Tests whether transaction is active.
+ * @return whether transaction is active
+ */
+ boolean isActive();
+
+ /**
+ * Tests whether transaction is committed.
+ * @return whether transaction is committed
+ */
+ boolean isCommitted();
+
+ /**
+ * Returns a string representation of this transaction
+ * @return the string representcodes[ation
+ */
+ String toString();
+
+ /**
+ * Tests whether transaction is committed or active.
+ * @return whether transaction is committed or active
+ */
+ boolean validate();
+
+ /**
+ * Block caller while transaction is active.
+ */
+ void waitWhileActive();
+
+ /**
+ * Block caller while transaction is active.
+ */
+ void waitWhileActiveNotWaiting();
+
+ /**
+ * Wake up any transactions waiting for this one to finish.
+ */
+ void wakeUp();
+
+}
+