2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
5 package TransactionalIO.core;
7 import TransactionalIO.exceptions.AbortedException;
8 import TransactionalIO.benchmarks.benchmark;
9 import TransactionalIO.benchmarks.customhandler;
10 import TransactionalIO.benchmarks.customhandler;
11 import TransactionalIO.interfaces.BlockAccessModesEnum;
12 import TransactionalIO.interfaces.ContentionManager;
13 import TransactionalIO.interfaces.TransactionStatu;
14 //import dstm2.file.managers.BaseManager;
15 import java.awt.event.ActionListener;
16 import java.beans.EventHandler;
17 import java.beans.PropertyChangeEvent;
18 import java.beans.PropertyChangeListener;
19 import java.beans.PropertyChangeSupport;
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
27 import java.util.TreeMap;
28 import java.util.Vector;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
39 public class ExtendedTransaction implements TransactionStatu {
41 private native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd);
45 System.load("/home/navid/libkooni.so");
47 private boolean flag = true;
48 public TransactionStatu memorystate;
49 private PropertyChangeSupport changes = new PropertyChangeSupport(this);
52 public TreeMap msg = new TreeMap();
53 public int numberofwrites;
54 public int numberofreads;
58 ABORTED, ACTIVE, COMMITTED
60 private boolean writesmerged = true;
61 //private Vector<ReentrantLock> heldoffsetlocks;
62 private Vector heldoffsetlocks;
63 //private Vector<ReentrantLock> heldblocklocks;
64 private Vector heldblocklocks;
65 //private HashMap<INode, Vector<TransactionalFile>> AccessedFiles;
66 private HashMap AccessedFiles;
67 //private HashMap<INode, HashMap<Integer, BlockAccessModesEnum> > accessedBlocks;
68 private HashMap accessedBlocks;
69 //private HashMap<TransactionalFile, TransactionLocalFileAttributes> LocaltoGlobalMappings;
70 private HashMap GlobaltoLocalMappings;
71 public HashMap merge_for_writes_done;
72 private HashMap writeBuffer;
73 private ContentionManager contentionmanager;
74 private /*volatile*/ Status status;
77 public ExtendedTransaction() {
79 // id = Integer.valueOf(Thread.currentThread().getName().substring(7));
80 heldblocklocks = new Vector();
81 heldoffsetlocks = new Vector();
82 AccessedFiles = new HashMap();
83 GlobaltoLocalMappings = new HashMap/*<TransactionalFile, TransactionLocalFileAttributes >*/();
84 writeBuffer = new HashMap();
85 status = Status.ACTIVE;
86 accessedBlocks = new HashMap();
87 merge_for_writes_done = new HashMap();
89 // setContentionmanager(new BaseManager());
90 // beginTransaction();
94 public ExtendedTransaction(TransactionStatu memorystate) {
97 this.memorystate = memorystate;
100 private int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) {
102 return nativepwrite(buff, offset, buff.length, file.getFD());
103 } catch (IOException ex) {
105 Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
111 public void beginTransaction() {
112 this.addPropertyChangeListener(new customhandler(Status.ABORTED));
115 public void abort() {
116 synchronized (this) {
118 this.status = Status.ABORTED;
119 if (this.memorystate != null && !(this.memorystate).isAborted()) {
120 this.memorystate.abortThisSystem();
127 public Status getStatus() {
131 public boolean isActive() {
132 return this.getStatus() == Status.ACTIVE;
135 public boolean isAborted() {
136 return this.getStatus() == Status.ABORTED;
139 public ContentionManager getContentionmanager() {
140 return contentionmanager;
143 public void setContentionmanager(ContentionManager contentionmanager) {
144 this.contentionmanager = contentionmanager;
147 public HashMap getWriteBuffer() {
151 public HashMap getAccessedFiles() {
152 return AccessedFiles;
155 public boolean isWritesmerged() {
159 public void setWritesmerged(boolean writesmerged) {
160 this.writesmerged = writesmerged;
163 public HashMap getGlobaltoLocalMappings() {
164 return GlobaltoLocalMappings;
167 public HashMap getAccessedBlocks() {
168 return accessedBlocks;
171 public ContentionManager getBlockContentionManager() {
172 return ManagerRepository.getBlockcm();
175 public ContentionManager getOffsetContentionManager() {
176 return ManagerRepository.getOffsetcm();
179 public TreeMap getSortedFileAccessMap(HashMap hmap) {
180 /*TreeMap sortedMap = new TreeMap(hmap);
182 return new TreeMap(hmap);
185 public void setStatus(Status st) {
186 Status oldst = getStatus();
188 this.changes.firePropertyChange("status", oldst, st);
191 public void addFile(TransactionalFile tf, long offsetnumber/*, TransactionLocalFileAttributes tmp*/) {
193 TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offsetnumber/*, tf.getInodestate().commitedfilesize.get()*/);
196 if (AccessedFiles.containsKey(tf.getInode())) {
197 dummy = (Vector) AccessedFiles.get(tf.getInode());
199 dummy = new Vector();
200 AccessedFiles.put(tf.getInode(), dummy);
203 GlobaltoLocalMappings.put(tf, tmp);
204 merge_for_writes_done.put(tf.getInode(), Boolean.TRUE);
207 public boolean lockOffsets() { /// Locking offsets for File Descriptors
210 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
211 Iterator iter = hm.keySet().iterator();
213 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
214 INode key = (INode) iter.next();
216 Vector vec = (Vector) AccessedFiles.get(key);
217 Collections.sort(vec);
218 Iterator it = vec.iterator();
219 while (it.hasNext()) {
220 TransactionalFile value = (TransactionalFile) it.next();
221 while (this.getStatus() == Status.ACTIVE) {
222 value.offsetlock.lock();
224 heldoffsetlocks.add(value.offsetlock);
228 if (this.getStatus() != Status.ACTIVE) {
236 if (this.getStatus() != Status.ACTIVE) {
244 public boolean lockBlock(BlockDataStructure block, BlockAccessModesEnum mode/*, GlobalINodeState adapter, BlockAccessModesEnum mode, int expvalue, INode inode, TransactionLocalFileAttributes tf*/) {
247 if (mode == BlockAccessModesEnum.READ) {
248 lock = block.getLock().readLock();
251 lock = block.getLock().writeLock();
254 while (this.getStatus() == Status.ACTIVE) {
256 heldblocklocks.add(lock);
262 public void prepareCommit() {
263 if (this.status != Status.ACTIVE) {
264 throw new AbortedException();
267 if (!lockOffsets()) {
268 throw new AbortedException();
272 ///////////////////////////
275 Map hm = getWriteBuffer();
277 Iterator iter = hm.keySet().iterator();
278 WriteOperations value;
279 Vector vec = new Vector();
280 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && ok) {
281 INode key = (INode) iter.next();
282 vec = (Vector) hm.get(key);
283 Collections.sort(vec);
284 Iterator it = vec.iterator();
285 while (it.hasNext()) {
287 value = (WriteOperations) it.next();
288 if (value.isUnknownoffset()) {
292 start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
293 end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
294 if (value.getBelongingto().isUnknown_inital_offset_for_write()) {
295 value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber());
296 value.getBelongingto().setUnknown_inital_offset_for_write(false);
299 int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(start);
300 int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(start, value.getRange().getEnd() - value.getRange().getStart());
303 if (this.getAccessedBlocks().get(key) != null) {
304 sset = (TreeMap) this.getAccessedBlocks().get(key);
306 sset = new TreeMap();
307 this.getAccessedBlocks().put(key, sset);
311 for (int i = startblock; i <= targetblock; i++) {
312 if (sset.containsKey(Integer.valueOf(i))) {
313 if (sset.get(Integer.valueOf(i)) != BlockAccessModesEnum.WRITE) {
314 sset.put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
317 sset.put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
321 value.getRange().setStart(start);
322 value.getRange().setEnd(end);
328 Iterator it = this.getAccessedBlocks().keySet().iterator();
329 while (it.hasNext() && (this.getStatus() == Status.ACTIVE)) {
330 INode inode = (INode) it.next();
331 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(inode);
332 TreeMap vec2 = (TreeMap) this.getAccessedBlocks().get(inode);
333 Iterator iter2 = vec2.keySet().iterator();
334 while (iter2.hasNext()) {
335 Integer num = (Integer) iter2.next();
337 BlockDataStructure blockobj = inodestate.getBlockDataStructure(num);
339 ok = this.lockBlock(blockobj, (BlockAccessModesEnum) vec2.get(num));
346 if (this.getStatus() != Status.ACTIVE) {
348 throw new AbortedException();
354 public void commitChanges() {
356 Map hm = getWriteBuffer();
357 Iterator iter = hm.keySet().iterator();
359 WriteOperations writeop;
361 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
362 INode key = (INode) iter.next();
364 vec = (Vector) hm.get(key);
365 Collections.sort(vec);
367 while (it.hasNext()) {
369 writeop = (WriteOperations) it.next();
370 Byte[] data = new Byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
371 byte[] bytedata = new byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
372 data = (Byte[]) writeop.getData();
374 for (int i = 0; i < data.length; i++) {
375 bytedata[i] = data[i];
377 invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file);
381 Iterator k = GlobaltoLocalMappings.keySet().iterator();
382 while (k.hasNext()) {
383 TransactionalFile trf = (TransactionalFile) (k.next());
384 trf.getCommitedoffset().setOffsetnumber(((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocaloffset());
388 public void unlockAllLocks() {
389 Iterator it = heldblocklocks.iterator();
391 while (it.hasNext()) {
393 Lock lock = (Lock) it.next();
396 heldblocklocks.clear();
398 it = heldoffsetlocks.iterator();
399 while (it.hasNext()) {
400 ReentrantLock lock = (ReentrantLock) it.next();
403 heldoffsetlocks.clear();
406 public void abortAllReaders() {
407 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
409 Iterator iter = hm.keySet().iterator();
410 TransactionalFile value;
411 while (iter.hasNext()) {
412 INode key = (INode) iter.next();
413 Vector vec = (Vector) AccessedFiles.get(key);
414 Iterator it = vec.iterator();
415 while (it.hasNext()) {
417 value = (TransactionalFile) it.next();
418 Iterator it2 = value.getCommitedoffset().getOffsetReaders().iterator(); // for visible readers strategy
420 while (it2.hasNext()) {
421 ExtendedTransaction tr = (ExtendedTransaction) it2.next();
426 value.getCommitedoffset().getOffsetReaders().clear();
430 if (accessedBlocks.get(key) != null) {
431 vec2 = (TreeMap) accessedBlocks.get(key);
433 vec2 = new TreeMap();
436 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(key);
437 Iterator it2 = vec2.keySet().iterator();
439 while (it2.hasNext()) {
441 Integer num = (Integer) it2.next();
442 if (vec2.get(num) != BlockAccessModesEnum.READ) {
443 BlockDataStructure blockobj = (BlockDataStructure) inodestate.getBlockDataStructure(num);//lockmap.get(num);
445 Iterator it4 = blockobj.getReaders().iterator(); // from here for visible readers strategy
447 while (it4.hasNext()) {
449 ExtendedTransaction tr = (ExtendedTransaction) it4.next();
454 blockobj.getReaders().clear();
463 public void addPropertyChangeListener(PropertyChangeListener listener) {
464 this.changes.addPropertyChangeListener("status", listener);
467 public void removePropertyChangeListener(PropertyChangeListener listener) {
468 this.changes.removePropertyChangeListener("status", listener);
471 public TransactionStatu getOtherSystem() {
475 public void setOtherSystem(TransactionStatu othersystem) {
476 memorystate = othersystem;
479 public Vector getHeldblocklocks() {
480 return heldblocklocks;
483 public void setHeldblocklocks(Vector heldblocklocks) {
484 this.heldblocklocks = heldblocklocks;
487 public Vector getHeldoffsetlocks() {
488 return heldoffsetlocks;
491 public void setHeldoffsetlocks(Vector heldoffsetlocks) {
492 this.heldoffsetlocks = heldoffsetlocks;
495 public void abortThisSystem() {
499 public boolean isCommitted() {
500 if (this.status == Status.COMMITTED) {
508 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // from here for visible readers strategy
509 while (this.getStatus() == Status.ACTIVE) {
510 if (lock.tryLock()) {
511 Thread.onAbortOnce(new Runnable() {
518 heldblocklocks.add(lock);
520 synchronized (adapter) {
521 block.setOwner(this);
522 // Iterator it = block.getReaders().iterator();
523 // while (it.hasNext())
525 // ExtendedTransaction tr = (ExtendedTransaction) it.next();
532 getBlockContentionManager().resolveConflict(this, block.getOwner());
537 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // versioning strat
538 while (this.getStatus() == Status.ACTIVE) {
539 if (lock.tryLock()) {
540 Thread.onAbortOnce(new Runnable() {
547 heldblocklocks.add(lock);
548 if (mode != BlockAccessModesEnum.WRITE) { egy
549 if (block.getVersion().get() != expvalue) {
554 synchronized (adapter) {
555 block.setOwner(this);
560 getContentionManager().resolveConflict(this, block.getOwner());
566 //expvalue = ((Integer) value.getBlockversions().get(it)).intValue(); //for versioning strategy
567 /*if (!(value.isValidatelocaloffset())) {
568 if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) { //versioning strategy
570 /if (blockobj.getVersion().get() == expvalue) {
572 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
583 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
593 throw new AbortedException();