From: navid Date: Mon, 24 Nov 2008 20:46:53 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: buildscript^7~9 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=4f1d067f89a872866ac2505d5b2626a1b233a106;p=IRC.git *** empty log message *** --- diff --git a/Robust/Transactions/TransactionalIO/dist/TransactionalIO.jar b/Robust/Transactions/TransactionalIO/dist/TransactionalIO.jar index f34c2dbd..df7a3390 100644 Binary files a/Robust/Transactions/TransactionalIO/dist/TransactionalIO.jar and b/Robust/Transactions/TransactionalIO/dist/TransactionalIO.jar differ diff --git a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java index d04a8720..e7f11488 100644 --- a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java +++ b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java @@ -45,9 +45,9 @@ public class ExtendedTransaction implements TransactionStatu { private native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd); - // { - // System.load("/home/navid/libkooni.so"); - // } + { + System.load("/home/navid/libkooni.so"); + } private boolean flag = true; public TransactionStatu memorystate; @@ -253,31 +253,20 @@ public class ExtendedTransaction implements TransactionStatu { - public void addFile(TransactionalFile tf, long offstenumber) { + public void addFile(TransactionalFile tf, long offsetnumber/*, TransactionLocalFileAttributes tmp*/) { - - //tf.lockOffset(this); - - TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offstenumber/*, tf.getInodestate().commitedfilesize.get()*/); - - //tf.offsetlock.unlock(); - - - - Vector dummy; - - if (AccessedFiles.containsKey(tf.getInode())){ - dummy = (Vector) AccessedFiles.get(tf.getInode()); - } - else{ - dummy = new Vector(); - AccessedFiles.put(tf.getInode(), dummy); - } - - dummy.add(tf); - GlobaltoLocalMappings.put(tf, tmp); - merge_for_writes_done.put(tf.getInode(), Boolean.TRUE); + TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offsetnumber/*, tf.getInodestate().commitedfilesize.get()*/); + Vector dummy; + if (AccessedFiles.containsKey(tf.getInode())) { + dummy = (Vector) AccessedFiles.get(tf.getInode()); + } else { + dummy = new Vector(); + AccessedFiles.put(tf.getInode(), dummy); + } + dummy.add(tf); + GlobaltoLocalMappings.put(tf, tmp); + merge_for_writes_done.put(tf.getInode(), Boolean.TRUE); } @@ -394,16 +383,31 @@ public class ExtendedTransaction implements TransactionStatu { if (mode == BlockAccessModesEnum.READ){ - lock = block.getLock().readLock(); - } + lock = block.getLock().readLock(); + + + } else { + lock = block.getLock().writeLock(); + } while (this.getStatus() == Status.ACTIVE) { + //synchronized(block){ + + // if (lock.tryLock()) { lock.lock(); + // synchronized(benchmark.lock){ + // System.out.println(Thread.currentThread() + " Lock the block lock for " + lock +" number " + block.getBlocknumber()); + // } heldblocklocks.add(lock); + // block.setOwner(this); return true; + // } + + + //getContentionmanager().resolveConflict(this, block); } return false; @@ -510,11 +514,11 @@ public class ExtendedTransaction implements TransactionStatu { long end; //synchronized(value.getOwnertransactionalFile().getCommitedoffset()){ - start = value.getRange().getStart() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber(); - end = value.getRange().getEnd() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber(); - if (value.getTFA().isUnknown_inital_offset_for_write()){ - value.getTFA().setLocaloffset(value.getTFA().getLocaloffset() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber()); - value.getTFA().setUnknown_inital_offset_for_write(false); + start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber(); + end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber(); + if (value.getBelongingto().isUnknown_inital_offset_for_write()){ + value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber()); + value.getBelongingto().setUnknown_inital_offset_for_write(false); } //} @@ -670,7 +674,7 @@ public class ExtendedTransaction implements TransactionStatu { // writeop.getOwnertransactionalFile().file.seek(writeop.getRange().getStart()); // System.out.println(Thread.currentThread() + " range " + writeop.getRange().getStart()); // writeop.getOwnertransactionalFile().file.write(bytedata); - invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnerTF().file); + invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file); // System.out.println(Thread.currentThread() + " " + bytedata); // } catch (IOException ex) { diff --git a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java index 435fa3d9..8c4a31a7 100644 --- a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java +++ b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java @@ -13,6 +13,7 @@ import TransactionalIO.benchmarks.benchmark; import TransactionalIO.core.ExtendedTransaction.Status; import TransactionalIO.interfaces.BlockAccessModesEnum; import TransactionalIO.interfaces.OffsetDependency; +import com.sun.org.apache.bcel.internal.generic.IFEQ; import java.io.File; import java.io.FileDescriptor; import java.io.FileNotFoundException; @@ -39,11 +40,10 @@ import sun.misc.ConditionLock; public class TransactionalFile implements Comparable{ - private native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd); + private native int nativepread(byte buff[], long offset, int size, FileDescriptor fd); { - System.load("/home/navid/libkooni.so"); } @@ -52,7 +52,8 @@ public class TransactionalFile implements Comparable{ private INode inode; private int sequenceNum = 0; public static int currenSeqNumforInode = 0; - + /* public AtomicLong commitedoffset; + public AtomicLong commitedfilesize;*/ public boolean to_be_created = false; public boolean writemode = false; public boolean appendmode = false; @@ -88,7 +89,7 @@ public class TransactionalFile implements Comparable{ sequenceNum = inodestate.seqNum; inodestate.seqNum++; - + if (mode.equals("rw")) { writemode = true; } else if (mode.equals("a")) { @@ -97,16 +98,7 @@ public class TransactionalFile implements Comparable{ if (inodestate != null) { synchronized (inodestate) { - try { - if (!appendmode) { - committedoffset = new GlobalOffset(0); - } else { - committedoffset = new GlobalOffset(file.length()); - } - - } catch (IOException ex) { - Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - } + committedoffset = new GlobalOffset(0); } } @@ -123,19 +115,6 @@ public class TransactionalFile implements Comparable{ } } - - public int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) { - try { - return nativepwrite(buff, offset, buff.length, file.getFD()); - } catch (IOException ex) { - - Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - return -1; - } - - } - - public int getSequenceNum() { @@ -151,12 +130,10 @@ public class TransactionalFile implements Comparable{ return inodestate; } - public INode getInode() { return inode; } - - + public void close() { try { file.close(); @@ -165,7 +142,7 @@ public class TransactionalFile implements Comparable{ } } - public long getFilePointer(){ + public long getFilePointer(){ ExtendedTransaction me = Wrapper.getTransaction(); TransactionLocalFileAttributes tmp = null; @@ -175,7 +152,7 @@ public class TransactionalFile implements Comparable{ } if (!(me.getGlobaltoLocalMappings().containsKey(this))){ - me.addFile(this, 0); + me.addFile(this, 0); } tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(this); @@ -183,29 +160,33 @@ public class TransactionalFile implements Comparable{ tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); long target; - lockOffset(me); + if (!(this.committedoffset.getOffsetReaders().contains(me))){ this.committedoffset.getOffsetReaders().add(me); + } + tmp.setLocaloffset(tmp.getLocaloffset() + this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset()); target = this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset(); + + offsetlock.unlock(); - - Iterator it; if ((me.getWriteBuffer().get(inode)) != null) - { + { + it = ((Vector) (me.getWriteBuffer().get(inode))).iterator(); while (it.hasNext()){ WriteOperations wrp = (WriteOperations) it.next(); - if (wrp.getTFA() == tmp && wrp.isUnknownoffset()) + if (wrp.getBelongingto()== tmp && wrp.isUnknownoffset()) wrp.setUnknownoffset(false); wrp.getRange().setStart(target + wrp.getRange().getStart()); wrp.getRange().setEnd(target + wrp.getRange().getEnd()); } - } + } + } @@ -215,9 +196,6 @@ public class TransactionalFile implements Comparable{ public void seek(long offset) { - if (appendmode) { - throw new PanicException("Cannot seek into a file opened in append mode"); - } ExtendedTransaction me = Wrapper.getTransaction(); if (me == null) { @@ -240,73 +218,56 @@ public class TransactionalFile implements Comparable{ tmp.setUnknown_inital_offset_for_write(false); tmp.setLocaloffset(offset); - } } public int read(byte[] b) { - + + ExtendedTransaction me = Wrapper.getTransaction(); int size = b.length; int result = 0; - if (me == null) { // not a transaction, but any I/O operation even though within a non-transaction is considered a single opertion transactiion return non_Transactional_Read(b); } - - + if (me.getGlobaltoLocalMappings().containsKey(this)){ - + TransactionLocalFileAttributes tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(this); tmp.setUnknown_inital_offset_for_write(false); - //make this transaction read dependent on this descriptor if it is not so already - if ((tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (tmp.offsetdependency == OffsetDependency.NO_ACCESS) || (tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_2)){ - + OffsetDependency dep = tmp.getOffsetdependency(); + if ((dep == OffsetDependency.WRITE_DEPENDENCY_1) || + (dep == OffsetDependency.NO_ACCESS) || + (dep == OffsetDependency.WRITE_DEPENDENCY_2)) + { + tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); lockOffset(me); - makeDependentonOffet(me, tmp); + + if (dep != OffsetDependency.WRITE_DEPENDENCY_2){ + tmp.setLocaloffset(tmp.getLocaloffset() + this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset()); + } + + if (!(this.committedoffset.getOffsetReaders().contains(me))){ + this.committedoffset.getOffsetReaders().add(me); + + } + offsetlock.unlock(); } - - // make all write operations by any transaction to this offset absolute and those transaction read - //dependent on the offset - Iterator it; - if (me.getWriteBuffer().get(inode) != null) - { - it = ((Vector) (me.getWriteBuffer().get(inode))).iterator(); - while (it.hasNext()){ - - WriteOperations wrp = (WriteOperations) it.next(); - if (wrp.isUnknownoffset()){ - wrp.setUnknownoffset(false); - - wrp.getOwnerTF().lockOffset(me); - makeWriteAbsolute(me, wrp); - wrp.getOwnerTF().offsetlock.unlock(); - - markAccessedBlocks(me, (int)wrp.getRange().getStart(), (int)(wrp.getRange().getEnd() - wrp.getRange().getStart()), BlockAccessModesEnum.WRITE); - - } - } - } - /* if (!(me.isWritesmerged())){ - mergeWrittenData(); - }*/ - - // merge the write by this transation to this descriptor before start reading from it + makeWritestDependent(me); + + if ((Boolean)me.merge_for_writes_done.get(inode) == Boolean.FALSE){ mergeWrittenData(me); } - - - // find the intersections of the data o be read with the - // transactions local buffer if any at all long loffset = tmp.getLocaloffset(); markAccessedBlocks(me, loffset, size, BlockAccessModesEnum.READ); + Vector writebuffer; if ((me.getWriteBuffer().get(this.inode)) != null) writebuffer = (Vector) (me.getWriteBuffer().get(this.inode)); @@ -319,55 +280,44 @@ public class TransactionalFile implements Comparable{ Range[] intersectedrange = new Range[writebuffer.size()]; WriteOperations[] markedwriteop = new WriteOperations[writebuffer.size()]; - int number_of_intersections = 0; - boolean data_in_local_buffer = false; + int counter = 0; + boolean in_local_buffer = false; - it = writebuffer.iterator(); + Iterator it = writebuffer.iterator(); while (it.hasNext()) { WriteOperations wrp = (WriteOperations) it.next(); writerange = wrp.getRange(); if (writerange.includes(readrange)) { - markedwriteop[number_of_intersections] = wrp; - data_in_local_buffer = true; + markedwriteop[counter] = wrp; + in_local_buffer = true; break; } if (writerange.hasIntersection(readrange)) { - intersectedrange[number_of_intersections] = readrange.intersection(writerange); - markedwriteop[number_of_intersections] = wrp; + intersectedrange[counter] = readrange.intersection(writerange); + markedwriteop[counter] = wrp; - number_of_intersections++; + counter++; } } - - - if (data_in_local_buffer) { - // the to be read offset is written previously by the transaction itself - // so the read is done from localbuffer - result = readFromBuffer(b, tmp, markedwriteop[number_of_intersections],writerange); + if (in_local_buffer) { // the read one from local buffer + result = readFromBuffer(b, tmp, markedwriteop[counter],writerange); return result; } else{ - if (number_of_intersections == 0) { - // the whole range to be read should be donefrom the file itself, + if (counter == 0) { // all the read straight from file result = readFromFile(me, b, tmp); } - - else { - //some of the parts to read are in local buffer some should be done - //from the file - for (int i = 0; i < number_of_intersections; i++) { - - + else { // some parts from file others from buffer + for (int i = 0; i < counter; i++) { Byte[] data = markedwriteop[i].getData(); byte[] copydata = new byte[data.length]; - for (int j = 0; j < data.length; j++) { copydata[j] = data[j].byteValue(); } @@ -375,7 +325,7 @@ public class TransactionalFile implements Comparable{ result += Math.min(intersectedrange[i].getEnd(), readrange.getEnd()) - intersectedrange[i].getStart(); } - Range[] non_intersected_ranges = readrange.minus(intersectedrange, number_of_intersections); + Range[] non_intersected_ranges = readrange.minus(intersectedrange, counter); Vector occupiedblocks = new Vector(); for (int i = 0; i < non_intersected_ranges.length; i++) { int st = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getStart()); @@ -387,50 +337,71 @@ public class TransactionalFile implements Comparable{ } } - - lockOffset(me); me.getHeldoffsetlocks().add(offsetlock); - - + boolean locked = false; + BlockDataStructure block; for (int k = 0; k < occupiedblocks.size(); k++) { // locking the block locks - - while (me.getStatus() == Status.ACTIVE) { - - BlockDataStructure block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k)));//(BlockDataStructure) tmp.adapter.lockmap.get(Integer.valueOf(k))); + + while (me.getStatus() == Status.ACTIVE) { + block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k)));//(BlockDataStructure) tmp.adapter.lockmap.get(Integer.valueOf(k))); block.getLock().readLock().lock(); - if (!(block.getReaders().contains(me))){ - block.getReaders().add(me); - } - me.getHeldblocklocks().add(block.getLock().readLock()); - break; - } + if (!(block.getReaders().contains(me))){ + block.getReaders().add(me); + } + locked = true; + //me.getHeldblocklocks().add(block.getLock().readLock()); + break; + } if (me.getStatus() == Status.ABORTED) { - throw new AbortedException(); + int m; + if (locked) { + m = k + 1; + } else { + m = k; + } + for (int i = 0; i < m; i++) { + block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k))); + me.getHeldblocklocks().add(block.getLock().readLock()); + } + + locked = false; + throw new AbortedException(); } } - - - + for (int i = 0; i < non_intersected_ranges.length; i++) { try { - //invokeNativepread(b, non_intersected_ranges[i].getStart(), size); + file.seek(non_intersected_ranges[i].getStart()); int tmpsize = file.read(b, (int) (non_intersected_ranges[i].getStart() - readrange.getStart()), (int) (non_intersected_ranges[i].getEnd() - non_intersected_ranges[i].getStart())); result += tmpsize; } catch (IOException ex) { - Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); } } - me.unlockAllLocks(); + + if (me.getStatus() == Status.ABORTED) { + for (int k = 0; k < occupiedblocks.size(); k++) { + block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k))); + me.getHeldblocklocks().add(block.getLock().readLock()); + } + throw new AbortedException(); + } + for (int k = 0; k < occupiedblocks.size(); k++) { + block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k)));//(BlockDataStructure) tmp.adapter.lockmap.get(Integer.valueOf(k))); + block.getLock().readLock().unlock(); + } + // me.unlockAllLocks(); + offsetlock.unlock(); tmp.setLocaloffset(tmp.getLocaloffset() + result); } - + return result; } } else { // add to the readers list + me.addFile(this, 0); return read(b); } @@ -449,14 +420,16 @@ public class TransactionalFile implements Comparable{ if (me == null) // not a transaction - { + { + non_Transactional_Write(data); return; } - - if (me.getGlobaltoLocalMappings().containsKey(this)) - { + if (me.getGlobaltoLocalMappings().containsKey(this)) // + { + + Byte[] by = new Byte[size]; for (int i = 0; i < size; i++) { by[i] = Byte.valueOf(data[i]); @@ -469,7 +442,7 @@ public class TransactionalFile implements Comparable{ } else dummy = new Vector(); - + dummy.add(new WriteOperations(by, new Range(tmp.getLocaloffset(), tmp.getLocaloffset() + by.length), tmp.isUnknown_inital_offset_for_write(), this, tmp)); me.getWriteBuffer().put(this.inode, dummy); @@ -479,15 +452,16 @@ public class TransactionalFile implements Comparable{ tmp.setLocaloffset(tmp.getLocaloffset() + by.length); me.merge_for_writes_done.put(inode, Boolean.FALSE); + if (!(tmp.isUnknown_inital_offset_for_write())){ markAccessedBlocks(me, loffset, size, BlockAccessModesEnum.WRITE); + } - if (tmp.getOffsetdependency() == OffsetDependency.NO_ACCESS) tmp.offsetdependency = OffsetDependency.WRITE_DEPENDENCY_1; - + } else { - me.addFile(this, 0); + me.addFile(this, 0); write(data); } } @@ -515,44 +489,64 @@ public class TransactionalFile implements Comparable{ map.put(Integer.valueOf(i), mode); } } - } - - - // reads the data directly from file, + private int readFromFile(ExtendedTransaction me, byte[] readdata, TransactionLocalFileAttributes tmp) { + int st = FileBlockManager.getCurrentFragmentIndexofTheFile(tmp.getLocaloffset()); int end = FileBlockManager.getTargetFragmentIndexofTheFile(tmp.getLocaloffset(), readdata.length); - BlockDataStructure block = null; - boolean locked = false; + BlockDataStructure block = null; + boolean locked = false; for (int k = st; k <= end; k++) { - lockBlock(me, st, k); - } - if (me.getStatus() == Status.ABORTED) { - for (int i=st; i<=end; i++){ - block = this.inodestate.getBlockDataStructure(Integer.valueOf(i)); + while (me.getStatus() == Status.ACTIVE) { + + block = this.inodestate.getBlockDataStructure(Integer.valueOf(k)); + + block.getLock().readLock().lock(); + + if (!(block.getReaders().contains(me))){ + block.getReaders().add(me); + } + locked = true; + break; + } + if (me.getStatus() == Status.ABORTED) { + int m; + if (locked){ + m = k+1; + } + else + m = k; + for (int i=st; i