Random random;
String hname;
- barr = new Barrier("128.195.175.79");
+ barr = new Barrier("128.195.175.84");
atomic {
iteration = numIter;
degree = numDegree;
long start0 = System.currentTimeMillis();
int numThreads = em.numThreads;
int[] mid = new int[4];
- mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8
- mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9
- mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7
- mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5
+ mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-8
+ mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-9
+ mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-7
+ mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-5
System.printString("DEBUG -> numThreads = " + numThreads+"\n");
BarrierServer mybarr;
}
int[] mid = new int[4];
- mid[0] = (128<<24)|(195<<16)|(175<<8)|79; //dw-8
- mid[1] = (128<<24)|(195<<16)|(175<<8)|80; //dw-9
- mid[2] = (128<<24)|(195<<16)|(175<<8)|78; //dw-7
- mid[3] = (128<<24)|(195<<16)|(175<<8)|73; //dw-5
+ mid[0] = (128<<24)|(195<<16)|(175<<8)|84; //dw-10
+ mid[1] = (128<<24)|(195<<16)|(175<<8)|85; //dw-11
+ mid[2] = (128<<24)|(195<<16)|(175<<8)|86; //dw-12
+ mid[3] = (128<<24)|(195<<16)|(175<<8)|87; //dw-13
int p, q, r;
MatrixMultiply[] mm;
MatrixMultiply tmp;
Barrier.enterBarrier(barr);
- /*
//All machines reading data from array
int val;
for(int i=0; i<10000; i++) {
}
}
}
- */
}
public static void main(String[] args) {
}
Barrier.enterBarrier(barr);
-/*
//Write into array elements
Integer val;
for(int j=0; j<10000; j++) {
}
}
}
- */
}
public static void main(String[] args) {
BarrierServer mybarr;
int[] mid = new int[4];
- mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8
- mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9
- mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7
- mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5
+ mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-10
+ mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-11
+ mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-12
+ mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-13
double[][] G;
int num_iterations;
public int JGFvalidate(){
double refval[];
- refval = new double[3];
+ refval = new double[4];
refval[0] = 0.498574406322512;
refval[1] = 1.1234778980135105;
refval[2] = 1.9954895063582696;
class SORRunner extends Thread {
- int id,num_iterations;
+ int id, num_iterations;
double G[][],omega;
int nthreads;
double omega_over_four, one_minus_omega;
int numiterations;
Barrier barr;
- barr = new Barrier("128.195.175.79");
+ barr = new Barrier("128.195.175.84");
int ilow, iupper, slice, tslice, ttslice, Mm1, Nm1;
atomic {
outmethod.println("printf(\"nmhashSearch= %d\\n\", nmhashSearch);");
outmethod.println("printf(\"nprehashSearch= %d\\n\", nprehashSearch);");
outmethod.println("printf(\"nRemoteReadSend= %d\\n\", nRemoteSend);");
+ outmethod.println("printf(\"nSoftAbort= %d\\n\", nSoftAbort);");
outmethod.println("#endif\n");
outmethod.println("}");
outclassdefs.print("extern int nmhashSearch;\n");
outclassdefs.print("extern int nprehashSearch;\n");
outclassdefs.print("extern int nRemoteSend;\n");
+ outclassdefs.print("extern int nSoftAbort;\n");
outclassdefs.print("extern void handle();\n");
outclassdefs.print("#endif\n");
outclassdefs.print("int numprefetchsites = " + pa.prefetchsiteid + ";\n");
char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
+void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
+ int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short);
+void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
+ int *, int *, char *, unsigned int, unsigned short);
/* end server portion */
/* Prototypes for transactions */
unsigned short getObjType(unsigned int oid);
int startRemoteThread(unsigned int oid, unsigned int mid);
plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
+void commitCountForObjRead(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
+void commitCountForObjMod(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
if (fixed->nummod > 0)
free(modptr);
/* Unlock objects that was locked due to this transaction */
+ int useWriteUnlock = 0;
for(i = 0; i< transinfo->numlocked; i++) {
+ if(transinfo->objlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
return 1;
}
- UnLock(STATUSPTR(header));
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(header));
+ } else {
+ read_unlock(STATUSPTR(header));
+ }
}
/* Send ack to Coordinator */
char control = 0, *ptr;
unsigned int oid;
unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
- void *mobj;
objheader_t *headptr;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
- if (i < fixed->numread) { //Objs only read and not modified
- int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ if (i < fixed->numread) { //Objs only read and not modified
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
incr *= i;
oid = *((unsigned int *)(objread + incr));
incr += sizeof(unsigned int);
version = *((unsigned short *)(objread + incr));
- } else { //Objs modified
+ getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+ &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
+ } else { //Objs modified
+ if(i == fixed->numread) {
+ oidlocked[objlocked] = -1;
+ objlocked++;
+ }
int tmpsize;
headptr = (objheader_t *) ptr;
oid = OID(headptr);
version = headptr->version;
GETSIZE(tmpsize, headptr);
ptr += sizeof(objheader_t) + tmpsize;
- }
-
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
- if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if (test_and_set(STATUSPTR(mobj))) {
- //don't have lock
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- oidvernotmatch[objvernotmatch] = oid;
- objvernotmatch++;
- int size;
- GETSIZE(size, mobj);
- size += sizeof(objheader_t);
- numBytes += size;
- /* Send TRANS_DISAGREE to Coordinator */
- control = TRANS_DISAGREE;
- }
- } else { /* If Obj is not locked then lock object */
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- oidvernotmatch[objvernotmatch] = oid;
- objvernotmatch++;
- int size;
- GETSIZE(size, mobj);
- size += sizeof(objheader_t);
- numBytes += size;
- control = TRANS_DISAGREE;
- }
- }
+ getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+ &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
+ &numBytes, &control, oid, version);
}
}
}
#endif
if (objlocked > 0) {
+ int useWriteUnlock = 0;
for(j = 0; j < objlocked; j++) {
+ if(oidlocked[j] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((headptr = mhashSearch(oidlocked[j])) == NULL) {
printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 0;
}
- UnLock(STATUSPTR(headptr));
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(headptr));
+ } else {
+ read_unlock(STATUSPTR(headptr));
+ }
}
free(oidlocked);
}
return control;
}
+
+/* Update Commit info for objects that are read */
+void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+ unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
+ int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
+ char *control, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*objnotfound] = oid;
+ (*objnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+ if (version == ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_matchnolock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *control = TRANS_DISAGREE;
+ }
+ //Keep track of oid locked
+ oidlocked[*objlocked] = OID(((objheader_t *)mobj));
+ (*objlocked)++;
+ } else { //we are locked
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
+ *control = TRANS_DISAGREE;
+ }
+ }
+ }
+}
+
+/* Update Commit info for objects that are read */
+void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+ int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
+ int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*objnotfound] = oid;
+ (*objnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
+ if (version == ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_matchnolock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *control = TRANS_DISAGREE;
+ }
+ //Keep track of oid locked
+ oidlocked[*objlocked] = OID(((objheader_t *)mobj));
+ (*objlocked)++;
+ } else { /* Some other transaction has aquired a write lock on this object */
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
+ *control = TRANS_DISAGREE;
+ }
+ }
+ }
+}
+
/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
* to send to Coordinator based on the votes of oids involved in the transaction */
char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
free(modptr);
/* Unlock locked objects */
+ int useWriteUnlock = 0;
for(i = 0; i < numlocked; i++) {
+ if(oidlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(header));
+ } else {
+ read_unlock(STATUSPTR(header));
+ }
}
//TODO Update location lookup table
} else {
/* Check to see if versions are same */
checkversion:
- if (test_and_set(STATUSPTR(header))==0) {
- //have lock
+ if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
newversion = header->version;
if(newversion == *(versionarry + i)) {
//Add to the notify list
printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
return;
}
- UnLock(STATUSPTR(header));
+ write_unlock(STATUSPTR(header));
} else {
- UnLock(STATUSPTR(header));
+ write_unlock(STATUSPTR(header));
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("processReqNotify():socket()");
return;
extern int nmhashSearch;
extern int nprehashSearch;
extern int nRemoteSend;
+extern int nSoftAbort;
extern int numprefetchsites;
void handle();
extern pfcstats_t *evalPrefetch;
printf("nmhashSearch = %d\n", nmhashSearch);
printf("nprehashSearch = %d\n", nprehashSearch);
printf("nRemoteReadSend = %d\n", nRemoteSend);
+ printf("nSoftAbort = %d\n", nSoftAbort);
//TODO Remove later
/*
int i;
#include "addUdpEnhance.h"
#include "addPrefetchEnhance.h"
#include "gCollect.h"
+#include "dsmlock.h"
#ifdef COMPILER
#include "thread.h"
#endif
int nmhashSearch = 0;
int nprehashSearch = 0;
int nRemoteSend = 0;
+int nSoftAbort = 0;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
t = time(NULL);
req.tv_sec = 0;
- req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+ req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
nanosleep(&req, NULL);
return;
}
free(thread_data_array);
free(ltdata);
randomdelay();
+#ifdef TRANSSTATS
+ nSoftAbort++;
+#endif
}
/* Retry trans commit procedure during soft_abort case */
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
pthread_exit(NULL);
}
pthread_mutex_unlock(&prefetchcache_mutex);
int numread, i;
unsigned int oid;
unsigned short version;
- void *mobj;
- objheader_t *headptr;
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+ //setting a divider of read locks
+ //and write locks
numread = localtdata->tdata->buffer->f.numread;
/* Process each oid in the machine pile/ group per thread */
incr *= i;
oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+ commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
} else { // Objects Modified
+ if(i == localtdata->tdata->buffer->f.numread) {
+ oidlocked[numoidlocked] = -1;
+ numoidlocked++;
+ }
int tmpsize;
+ objheader_t *headptr;
headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
if (headptr == NULL) {
printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
}
oid = OID(headptr);
version = headptr->version;
+ commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
}
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ }
- /* Save the oids not found and number of oids not found for later use */
- if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[numoidnotfound] = oid;
- numoidnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if (test_and_set(STATUSPTR(mobj))) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- break;
- }
- } else {
- //we're locked
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
- numoidlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- break;
- }
- }
- }
- } // End for
- /* Condition to send TRANS_AGREE */
+ /* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
}
localtdata->transinfo->modptr = NULL;
localtdata->transinfo->numlocked = numoidlocked;
localtdata->transinfo->numnotfound = numoidnotfound;
+
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
}
pthread_mutex_unlock(localtdata->tdata->lock);
+
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
free(localtdata->transinfo->objnotfound);
}
pthread_exit(NULL);
+
+}
+
+/* Commit info for objects modified */
+void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+ if (version == ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ return;
+ }
+ } else { //A lock is acquired some place else
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ return;
+ }
+ }
+ }
+}
+
+/* Commit info for objects modified */
+void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ return;
+ }
+ } else { //Has reached max number of readers or some other transaction
+ //has acquired a lock on this object
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ return;
+ }
+ }
+ }
}
/* This function completes the ABORT process if the transaction is aborting */
numlocked = localtdata->transinfo->numlocked;
objlocked = localtdata->transinfo->objlocked;
+ int useWriteUnlock = 0;
for (i = 0; i < numlocked; i++) {
+ if(objlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = mhashSearch(objlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
return 0;
return 1;
}
pthread_mutex_unlock(&mainobjstore_mutex);
+ /* Initialize read and write locks */
+ initdsmlocks(STATUSPTR(header));
memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
/* Unlock locked objects */
+ int useWriteUnlock = 0;
for(i = 0; i < numlocked; i++) {
+ if(oidlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
-
return 0;
}
/* Clear Flags */
STATUS(headeraddr) =0;
+
return pile;
}
then
EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME"
fi
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c $DSMRUNTIME/dsmlock.c"
fi
if $RECOVERFLAG