From 221182228ec238b23d8ff64afdc1984a9c9b2754 Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 31 Mar 2007 12:13:29 +0000 Subject: [PATCH] server portions done except a few things added more testcases for debugging fixed several bugs TODO : Handle events at server side when it sees TRANS_COMMIT and send ACK to Coordinator Handle special cases such as leader election --- Robust/src/Runtime/DSTM/interface/dstm.h | 32 +- .../src/Runtime/DSTM/interface/dstmserver.c | 443 +++++++++--------- Robust/src/Runtime/DSTM/interface/mlookup.c | 80 ---- Robust/src/Runtime/DSTM/interface/plookup.c | 7 +- .../src/Runtime/DSTM/interface/testclient.c | 16 +- .../src/Runtime/DSTM/interface/testserver.c | 3 + Robust/src/Runtime/DSTM/interface/trans.c | 178 +++++-- 7 files changed, 390 insertions(+), 369 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 047626fa..cb3a9bd0 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -13,15 +13,18 @@ #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9 //Participant Messages -#define OBJECT_FOUND 10 -#define OBJECT_NOT_FOUND 11 -#define OBJECTS_FOUND 12 -#define OBJECTS_NOT_FOUND 13 -#define TRANS_AGREE 14 -#define TRANS_DISAGREE 15 -#define TRANS_AGREE_BUT_MISSING_OBJECTS 16 -#define TRANS_SOFT_ABORT 17 -#define TRANS_SUCESSFUL 18 +#define OBJECT_FOUND 10 +#define OBJECT_NOT_FOUND 11 +#define OBJECTS_FOUND 12 +#define OBJECTS_NOT_FOUND 13 +#define OBJ_LOCKED_BUT_VERSION_MATCH 14 +#define OBJ_UNLOCK_BUT_VERSION_MATCH 15 +#define VERSION_NO_MATCH 16 +#define TRANS_AGREE 17 +#define TRANS_DISAGREE 18 +#define TRANS_AGREE_BUT_MISSING_OBJECTS 19 +#define TRANS_SOFT_ABORT 20 +#define TRANS_SUCESSFUL 21 #include #include @@ -97,6 +100,12 @@ typedef struct thread_data_array { transrecord_t *rec; // To send modified objects }thread_data_array_t; +// Structure to save information about an oid necesaary for the decideControl() +typedef struct objinfo { + unsigned int oid; + int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) +}objinfo_t; + /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -114,15 +123,18 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes /* Prototypes for server portion */ void *dstmListen(); void *dstmAccept(void *); +int readClientReq(int); +int handleTransReq(int, fixed_data_t *, unsigned int *, char *, void *); /* end server portion */ /* Prototypes for transactions */ transrecord_t *transStart(); objheader_t *transRead(transrecord_t *record, unsigned int oid); objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid +int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides what response to send to the participant void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins int transCommit(transrecord_t *record); //return 0 if successful -int decideResponse(thread_data_array_t *tdata, char *buffer, int sd);// Coordinator decides what response to send to the participant +void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); /* end transactions */ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index efa0efb7..b1a89d8e 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -78,254 +78,253 @@ void *dstmListen() void *dstmAccept(void *acceptfd) { - int numbytes,i,choice, oid; + int numbytes,i, val; + unsigned int oid; char buffer[RECEIVE_BUFFER_SIZE], control; + char *ptr; void *srcObj; objheader_t *h; + int fd_flags = fcntl((int)acceptfd, F_GETFD), size; printf("Recieved connection: fd = %d\n", (int)acceptfd); - while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0) - { - printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes); - control = buffer[0]; - switch(control) { - case READ_REQUEST: - printf("DEBUG -> READ_REQUEST\n"); - oid = *((int *)(buffer+1)); -#ifdef DEBUG1 - printf("DEBUG -> Received oid is %d\n", oid); -#endif - srcObj = mhashSearch(oid); - h = (objheader_t *) srcObj; - if (h == NULL) { - buffer[0] = OBJECT_NOT_FOUND; - } else { - buffer[0] = OBJECT_FOUND; - size = sizeof(objheader_t) + sizeof(classsize[h->type]); - memcpy(buffer+1, srcObj, size); - } -#ifdef DEBUG1 - printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type); -#endif + recv((int)acceptfd, &control, sizeof(char), 0); + switch(control) { + case READ_REQUEST: + recv((int)acceptfd, &oid, sizeof(unsigned int), 0); + srcObj = mhashSearch(oid); + h = (objheader_t *) srcObj; + if (h == NULL) { + buffer[0] = OBJECT_NOT_FOUND; + } else { + buffer[0] = OBJECT_FOUND; + size = sizeof(objheader_t) + sizeof(classsize[h->type]); + memcpy(buffer+1, srcObj, size); + } + if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) { + perror(""); + } + break; + + case READ_MULT_REQUEST: + printf("DEBUG-> READ_MULT_REQUEST\n"); + break; + + case MOVE_REQUEST: + printf("DEBUG -> MOVE_REQUEST\n"); + break; + + case MOVE_MULT_REQUEST: + printf("DEBUG -> MOVE_MULT_REQUEST\n"); + break; + + case TRANS_REQUEST: + //printf("DEBUG -> TRANS_REQUEST\n"); + if((val = readClientReq((int)acceptfd)) == 1) { + printf("Error in readClientReq\n"); + } + break; + + default: + printf("Error receiving\n"); + } + + //Read for new control message from Coordiator + recv((int)acceptfd, &control, sizeof(char), 0); + switch(control) { + case TRANS_ABORT: + printf("DEBUG -> TRANS_ABORT\n"); + write((int)acceptfd, &control, sizeof(char)); + break; + + case TRANS_COMMIT: + printf("DEBUG -> TRANS_COMMIT\n"); + write((int)acceptfd, &control, sizeof(char)); + //TODO + //change ptr address in mhash table + //unlock objects + //update object version + //change reference count of older address?? + //free space in objstr ?? + //Update location lookup table + break; + } - if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) { - perror(""); - } - break; - case READ_MULT_REQUEST: - printf("DEBUG-> READ_MULT_REQUEST\n"); - break; - case MOVE_REQUEST: - printf("DEBUG -> MOVE_REQUEST\n"); - break; - case MOVE_MULT_REQUEST: - printf("DEBUG -> MOVE_MULT_REQUEST\n"); - break; - case TRANS_REQUEST: - printf("DEBUG -> TRANS_REQUEST\n"); - printf("Client sent %d\n",buffer[0]); - // handleTransReq(acceptfd, buffer); - break; - case TRANS_ABORT: - printf("DEBUG -> TRANS_ABORT\n"); - break; - case TRANS_COMMIT: - printf("DEBUG -> TRANS_COMMIT\n"); - printf("Client sent %d\n",buffer[0]); - //TODO copy the objects into the machine - /*copy the object into the object store from its old - location in the objstore(pointer to its header is already stored before)*/ - break; - default: - printf("Error receiving"); - } - //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); - //printf("%s", buffer); - } if (close((int)acceptfd) == -1) { perror("close"); } else printf("Closed connection: fd = %d\n", (int)acceptfd); + pthread_exit(NULL); } -//TOOD put __FILE__ __LINE__ for all error conditions -#if 0 -int handleTransReq(int acceptfd, char *buf) { - short numread = 0, nummod = 0; - char control; - int offset = 0, size,i; - int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0; - objheader_t *headptr = NULL; - objstr_t *tmpholder; - void *top, *mobj; +int readClientReq(int acceptfd) { + char *ptr, control; + void *modptr; + objheader_t *h, tmp_header; + fixed_data_t fixed; + int sum = 0, N, n; + + // Read fixed_data + N = sizeof(fixed) - 1; + ptr = (char *)&fixed;; + fixed.control = TRANS_REQUEST; + do { + n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0); + // printf("DEBUG -> 1. Reading %d bytes \n", n); + sum += n; + } while(sum < N && n != 0); + + //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes); + // Read list of mids + int mcount = fixed.mcount; + N = mcount * sizeof(unsigned int); + unsigned int listmid[mcount]; + ptr = (char *) listmid; + sum = 0; + do { + n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0); + // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N); + sum += n; + } while(sum < N && n != 0); + + // Read oid and version tuples + int numread = fixed.numread; + N = numread * (sizeof(unsigned int) + sizeof(short)); + char objread[N]; + sum = 0; + do { + n = recv((int)acceptfd, (void *) objread, N, 0); + // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N); + sum += n; + } while(sum < N && n != 0); + //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18)); + + // Read modified objects + if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { + // printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); + return 1; + } + sum = 0; + do { + n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); + //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr)); + sum += n; + } while (sum < fixed.sum_bytes && n != 0); + //Send control message as per all votes from the particpants + handleTransReq(acceptfd, &fixed, listmid, objread, modptr); + - char sendbuf[RECEIVE_BUFFER_SIZE]; + + return 0; +} - control = buf[0]; - offset = sizeof(fixed_data_t); - list = *((short *)(buf+offset)); - offset += sizeof(short); - nummod = *((short *)(buf+offset)); - offset += sizeof(short); - if (numread) { - //Make an array to store the object headers for all objects that are only read - if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) { - perror("handleTransReq: Calloc error"); - return 1; +//This function runs a decision after all objects are weighed under one of the 4 possibilities +//and returns the appropriate control message to the Ccordinator +int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) { + short version; + char control, *ptr; + int i; + unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread]; + int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent + void *mobj; + objheader_t *headptr; + objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc) + + //Process each object present in the pile + ptr = modptr; + //Process each oid in the machine pile/ group + for (i = 0; i < fixed->numread + fixed->nummod; i++) { + if (i < fixed->numread) {//Object is read + int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(objread + incr)); + incr += sizeof(unsigned int); + version = *((short *)(objread + incr)); + } else {//Obj is modified + headptr = (objheader_t *) ptr; + oid = headptr->oid; + version = headptr->version; + ptr += sizeof(objheader_t) + classsize[headptr->type]; } - //Process each object id that is only read - for (i = 0; i < numread; i++) { - objheader_t *tmp; - tmp = (objheader_t *) (buf + offset); - //find if object is still present in the same machine since TRANS_REQUEST - if ((mobj = mhashSearch(tmp->oid)) == NULL) { - objnotfound++; - /* - sendbuf[0] = OBJECT_NOT_FOUND; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); + //Check if object is still present in the machine since the beginning of TRANS_REQUEST + if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found + objinfo[i].poss_val = OBJECT_NOT_FOUND; + //Save the oids not found for later use + oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; + objnotfound++; + } else { // If obj found in machine (i.e. has not moved) + //Check if obj is locked + if ((((objheader_t *)mobj)->status & LOCK) == LOCK) { + if (version == ((objheader_t *)mobj)->version) { // If version match + objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH; + v_matchlock++; + } else {//If versions don't match ..HARD ABORT + objinfo[i].poss_val = VERSION_NO_MATCH; + v_nomatch++; + //send TRANS_DISAGREE to Coordinator + control = TRANS_DISAGREE; + write(acceptfd, &control, sizeof(char)); + //TODO when TRANS_DISAGREE is sent + //Free space allocated in main objstore + //Unlock objects that was locked in the trans + return 0; } - */ - } else { // If obj found in machine (i.e. has not moved) - //Check if obj is locked - if ((((objheader_t *)mobj)->status >> 3) == 1) { - //Check version of the object - if (tmp->version == ((objheader_t *)mobj)->version) {//If version match - transdis++; - /* - sendbuf[0] = TRANS_DISAGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } else {//If versions don't match ..HARD ABORT - transabort++; - /* - sendbuf[0] = TRANS_DISAGREE_ABORT; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } - } else {// If object not locked then lock it - ((objheader_t *)mobj)->status |= LOCK; - if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match - transagree++; - /* - sendbuf[0] = TRANS_AGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } else {//If versions don't match - transabort++; - /* - sendbuf[0] = TRANS_DISAGREE_ABORT; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } + } else {//Obj is not locked , so lock object + ((objheader_t *)mobj)->status |= LOCK; + //Save all object oids that are locked on this machine during this transaction request call + oidlocked[objlocked] = ((objheader_t *)mobj)->oid; + objlocked++; + if (version == ((objheader_t *)mobj)->version) { //If versions match + objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH; + v_matchnolock++; + } else { //If versions don't match + objinfo[i].poss_val = VERSION_NO_MATCH; + v_nomatch++; + //send TRANS_DISAGREE to Coordinator + control = TRANS_DISAGREE; + write(acceptfd, &control, sizeof(char)); + return 0; } - } - memcpy(headptr, buf+offset, sizeof(objheader_t)); - offset += sizeof(objheader_t); + } } } - if (nummod) { - if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) { - perror("handleTransReq: Calloc error"); - return 1; - } - - //Process each object id that is only modified - for(i = 0; i < nummod; i++) { - objheader_t *tmp; - tmp = (objheader_t *)(buf + offset); - //find if object is still present in the same machine since TRANS_REQUEST - if ((mobj = mhashSearch(tmp->oid)) == NULL) { - objnotfound++; - /* - sendbuf[0] = OBJECT_NOT_FOUND; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } else { // If obj found in machine (i.e. has not moved) - //Check if obj is locked - if ((((objheader_t *)mobj)->status >> 3) == 1) { - //Check version of the object - if (tmp->version == ((objheader_t *)mobj)->version) {//If version match - transdis++; - /* - sendbuf[0] = TRANS_DISAGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } else {//If versions don't match ..HARD ABORT - transabort++; - /* - sendbuf[0] = TRANS_DISAGREE_ABORT; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } - } else {// If object not locked then lock it - ((objheader_t *)mobj)->status |= LOCK; - if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match - transagree++; - /* - sendbuf[0] = TRANS_AGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } else {//If versions don't match - transabort++; - /* - sendbuf[0] = TRANS_DISAGREE_ABORT; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - */ - } - } - } - size = sizeof(objheader_t) + classsize[tmp->type]; - if ((top = objstrAlloc(tmpholder, size)) == NULL) { - perror("handleTransReq: Calloc error"); - return 1; - } - memcpy(top, buf+offset, size); - offset += size; - } + //Decide what control message(s) to send + if(v_matchnolock == fixed->numread + fixed->nummod) { + //send TRANS_AGREE to Coordinator + control = TRANS_AGREE; + write(acceptfd, &control, sizeof(char)); } - /* - if(transabort > 0) { - sendbuf[0] = TRANS_DISAGREE_ABORT; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - } else if(transagree == numread+nummod) { - sendbuf[0] = TRANS_AGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } - } else { - sendbuf[0] = TRANS_DISAGREE; - if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { - perror(""); - } + if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) { + //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator + control = TRANS_AGREE_BUT_MISSING_OBJECTS; + write(acceptfd, &control, sizeof(char)); + //send missing oids and number of oids not found with it + write(acceptfd, &objnotfound, sizeof(int)); + write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); } - */ + + if(v_matchlock > 0 && v_nomatch == 0) { + //send TRANS_SOFT_ABORT to Coordinator + control = TRANS_SOFT_ABORT; + write(acceptfd, &control, sizeof(char)); + //send missing oids and number of oids not found with it + write(acceptfd, &objnotfound, sizeof(int)); + write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); + } + + //TODO when TRANS_DISAGREE is sent + //Free space allocated in main objstore + //Unlock objects that was locked in the trans + if(control == TRANS_DISAGREE) { + for(i = 0; i< objlocked ; i++) { + mobj = mhashSearch(oidlocked[i]);// find the header address + ((objheader_t *)mobj)->status &= ~(LOCK); + } + } return 0; } -#endif diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index bcee6650..8bc046dc 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -187,83 +187,3 @@ unsigned int mhashResize(unsigned int newsize) { return 0; } -#if 0 -// Hash Resize -vkey resize(obj_addr_table_t * table){ - int newCapacity = 2*(table->size) + 1; - obj_listnode_t **old; - //if ((table->hash = (obj_listnode_t **) malloc(sizeof(obj_listnode_t *)*size)) == NULL) { -} - -// Hashing for the Key -int hashKey(unsigned int key, obj_addr_table_t *table) { - // hash32shiftmult - int c2=0x27d4eb2d; // a prime or an odd constant - key = (key ^ 61) ^ (key >> 16); - key = key + (key << 3); - key = key ^ (key >> 4); - key = key * c2; - key = key ^ (key >> 15); - printf("The bucket number is %d\n", key % (table->size)); - return (key % (table->size)); -} - -//Add key and its address to the new ob_listnode_t -vkey addKey(unsigned int key, objheader_t *ptr, obj_addr_table_t *table) { - int index; - obj_listnode_t *node; - - table->numelements++; - if(table->numelements > (table->loadfactor * table->size)){ - //TODO : check if table is nearly full and then resize - } - - index = hashKey(key,table); - if ((node = (obj_listnode_t *) malloc(sizeof(obj_listnode_t))) == NULL) { - printf("Malloc error %s %d\n", __FILE__, __LINE__); - exit(-1); - } - node->key = key; - node->object = ptr; - node->next = table->hash[index]; - table->hash[index] = node; - return; -} -// Get the address of the object header for a given key -objheader_t *findKey(unsigned int key, obj_addr_table_t *table) { - int index; - obj_listnode_t *ptr; - - index = hashKey(key,table); - ptr = table->hash[index]; - while(ptr != NULL) { - if (ptr->key == key) { - return ptr->object; - } - ptr = ptr->next; - } - return NULL; -} -// Remove the pointer to the object header from a linked list of obj_listnode_t given an key -int removeKey(unsigned int key, obj_addr_table_t *table) { - int index; - obj_listnode_t *curr, *prev; // prev points to previous node and curr points to the node to be deleted - - index = hashKey(key,table); - prev = curr = table->hash[index]; - for (; curr != NULL; curr = curr->next) { - if (curr->key == key) { // Find a match in the hash table - table->numelements--; - prev->next = curr->next; - if (table->hash[index] == curr) { // Special case when there is one element pointed by the hash table - table->hash[index] = NULL; - } - free(curr); - return 0; - } - prev = curr; - } - return -1; -} - -#endif diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 47c3e012..97190542 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -37,14 +37,14 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi //Add oid into a machine that is a part of the pile linked list structure while(tmp != NULL) { if (tmp->mid == mid) { - if ((headeraddr->status >> 1) == 1) { + if ((headeraddr->status & DIRTY) == 1) { tmp->oidmod[tmp->nummod] = headeraddr->oid; tmp->nummod = tmp->nummod + 1; tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type]; } else { tmp->oidread[tmp->numread] = headeraddr->oid; offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; - memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int)); + memcpy(tmp->objread + offset, &headeraddr->oid, sizeof(unsigned int)); offset += sizeof(unsigned int); memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short)); tmp->numread = tmp->numread + 1; @@ -60,13 +60,14 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi return NULL; } ptr->mid = mid; - if ((headeraddr->status >> 1) == 1) { + if ((headeraddr->status & DIRTY) == 1) { ptr->oidmod[ptr->nummod] = headeraddr->oid; ptr->nummod = ptr->nummod + 1; ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type]; } else { ptr->oidread[ptr->numread] = headeraddr->oid; memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int)); + //printf("DEBUG -> objread oid is %d\n", *(ptr->objread)); memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); ptr->numread = ptr->numread + 1; } diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index 6d1c34e1..0df85e25 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -1,5 +1,6 @@ #include #include "dstm.h" +#include "llookup.h" int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; @@ -45,19 +46,22 @@ int test2(void) { record = transStart(); printf("DEBUG -> Init done\n"); h1 = transRead(record, 1); -// printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]); + lhashInsert(h1->oid, 1); h2 = transRead(record, 2); -// printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]); + lhashInsert(h2->oid, 1); h3 = transRead(record, 3); -// printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); + lhashInsert(h3->oid, 1); h4 = transRead(record, 4); -// printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); + lhashInsert(h4->oid, 1); h4->status |= DIRTY; h5 = transRead(record, 5); -// printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]); + lhashInsert(h5->oid, 1); h6 = transRead(record, 6); -// printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]); + lhashInsert(h6->oid, 1); h6->status |= DIRTY; + + + transCommit(record); } diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 22672f46..6e378e97 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -16,6 +16,9 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) { memcpy(tmp, header, size); mhashInsert(tmp->oid, tmp); lhashInsert(tmp->oid, 1); + //Lock oid 3 object +// if(tmp->oid == 3) +// tmp->status |= LOCK; return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 19333829..96bbdf9f 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -64,50 +64,122 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) tmp->type = type; tmp->version = 1; tmp->rcount = 0; //? not sure how to handle this yet + tmp->status = 0; tmp->status |= NEW; chashInsert(record->lookupTable, tmp->oid, tmp); return tmp; } - -int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) { - int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0; +//TODO Change the values inside write() to exact size of the message being sent +//int decideResponse(thread_data_array_t *tdata, char *control, int sd) { +int decideResponse(thread_data_array_t *tdata, int sd) { + int i, j, n, N, sum, oidcount = 0, transagree = 0, transdisagree = 0, transsoftabort = 0, transmiss = 0; + char ctrl, control, *ptr; + unsigned int *oidnotfound[tdata->pilecount]; + objheader_t *header; //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { - //Check in any DISAGREE has come - if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) { - //Send abort - transabort++; - buffer[0] = TRANS_ABORT; - if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { - perror("Error sending message for thread"); - return 1; - } - } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) { - transagree++; - } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) { - transmiss++; - } else - transsoftabort++; + //Switch case + control = tdata->recvmsg[i].rcv_status; + switch(control) { + case TRANS_DISAGREE: + printf("DEBUG-> Inside TRANS_DISAGREE\n"); + transdisagree++; + //Free transaction records + objstrDelete(tdata->rec->cache); + chashDelete(tdata->rec->lookupTable); + //send Abort + ctrl = TRANS_ABORT; + if (write(sd, &ctrl, sizeof(char)) < 0) { + perror("Error sending ctrl message for participant\n"); + return 1; + } + break; + + case TRANS_AGREE: + printf("DEBUG-> Inside TRANS_AGREE\n"); + transagree++; + break; + + case TRANS_SOFT_ABORT: + printf("DEBUG-> Inside TRANS_SOFT_ABORT\n"); + transsoftabort++; + //Read list of objects missing + read(sd, &oidcount, sizeof(int)); + N = oidcount * sizeof(unsigned int); + if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + } + ptr = (char *) oidnotfound[i]; + do { + n = read(sd, ptr+sum, N-sum); + sum += n; + } while(sum < N && n !=0); + + break; + + case TRANS_AGREE_BUT_MISSING_OBJECTS: + printf("DEBUG-> Inside TRANS_AGREE_BUT_MISSING_OBJECTS\n"); + transmiss++; + //Read list of objects missing + read(sd, &oidcount, sizeof(int)); + N = oidcount * sizeof(unsigned int); + if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + } + ptr = (char *) oidnotfound[i]; + do { + n = read(sd, ptr+sum, N-sum); + sum += n; + } while(sum < N && n !=0); + + + break; + default: + printf("Participant sent unknown message\n"); + } + } + + //For Debug purposes + for(i=0 ; i< tdata->pilecount; i++) { + for(j=0 ; j < oidcount; j++) { + printf("DEBUG-> Oid %d missing for pilecount: %d\n", oidnotfound[j], i+1); + } } + + //Decide what control message to send to Participant if(transagree == tdata->pilecount){ //Send Commit - buffer[0] = TRANS_COMMIT; - if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { - perror("Error sending message for thread"); + ctrl = TRANS_COMMIT; + if (write(sd, &ctrl, sizeof(char)) < 0) { + perror("Error sending ctrl message for participant\n"); return 1; } } - if(transsoftabort > 0 && transabort == 0) { + + if(transsoftabort > 0 && transdisagree == 0) { //Send abort but retry commit + ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; + if (write(sd, &ctrl, sizeof(char)) < 0) { + perror("Error sending ctrl message for participant\n"); + return 1; + } + //lookup objects and then retry commit + //set up a new connection readClientReq() + //rebuilt the pile and llookup table //i.e. wait at the participant end and then resend either agree or disagree - // - } - if(transmiss > 0 && transsoftabort == 0 && transabort == 0) { + if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) { //Relookup all missing objects //send missing mising object/ objects } + + //Free pointers + for(i=0 ; i< tdata->pilecount; i++) { + free(oidnotfound[i]); + } + + return 0; } void *transRequest(void *threadarg) { @@ -138,40 +210,44 @@ void *transRequest(void *threadarg) { //Multiple writes for sending packets of data //Send first few fixed bytes of the TRANS_REQUEST protocol - printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control); - printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t)); - if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) { + printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control); +// printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t)); +// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes); + if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) { perror("Error sending fixed bytes for thread"); return NULL; } //Send list of machines involved in the transaction - printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); +// printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) { perror("Error sending list of machines for thread"); return NULL; } //Send oids and version number tuples for objects that are read - printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount); - if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) { +// printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); +// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); + if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) { perror("Error sending tuples for thread"); return NULL; } //Send objects that are modified - for( i = 0; i < tdata->buffer->f.nummod ; i++) { + for(i = 0; i < tdata->buffer->f.nummod ; i++) { headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]); - if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { +// printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]); + if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { perror("Error sending obj modified for thread"); return NULL; } } - //Read message from participant side - while(n != 0) { - n = read(sd, buffer, sizeof(buffer)); - } - //process the participant's request - recvcontrol = buffer[0]; + //Read message control message from participant side + n = read(sd, &control, sizeof(char)); + recvcontrol = control; + printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol); +// while(n != 0) { +// n = read(sd, buffer, sizeof(buffer)); +// } + //Update common data structure and increment count tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; //Lock and update count @@ -181,8 +257,9 @@ void *transRequest(void *threadarg) { if(*(tdata->count) == tdata->pilecount) { pthread_cond_broadcast(tdata->threshold); - if (decideResponse(tdata, buffer, sd) == 1) { - printf("decideResponse returned error\n"); + //process the participant's request + if (decideResponse(tdata, sd) == 1) { + printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__); return NULL; } } else { @@ -196,7 +273,7 @@ void *transRequest(void *threadarg) { int transCommit(transrecord_t *record) { chashlistnode_t *curr, *ptr, *next; unsigned int size;//Represents number of bins in the chash table - unsigned int machinenum, tot_bytes_mod; + unsigned int machinenum, tot_bytes_mod, *listmid; objheader_t *headeraddr; plistnode_t *tmp, *pile = NULL; int i, rc; @@ -232,7 +309,7 @@ int transCommit(transrecord_t *record) { } //Make machine groups if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { - perror("pInsert calloc error"); + printf("pInsert error %s, %d\n", __FILE__, __LINE__); return 1; } curr = next; @@ -259,7 +336,11 @@ int transCommit(transrecord_t *record) { pthread_cond_init(&tcond, NULL); //Keep track of list of machine ids per transaction - unsigned int *listmid = calloc(pilecount, sizeof(unsigned int)); + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); //Process each machine group while(tmp != NULL) { @@ -268,7 +349,7 @@ int transCommit(transrecord_t *record) { newtid++; trans_req_data_t *tosend; if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - perror(""); + printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 1; } tosend->f.control = TRANS_REQUEST; @@ -296,7 +377,8 @@ int transCommit(transrecord_t *record) { return 1; } numthreads++; - //TODO frees ? + //TODO frees + free(tosend); tmp = tmp->next; } @@ -353,7 +435,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { control = READ_REQUEST; buffer[0] = control; memcpy(buffer+1, &oid, sizeof(int)); - if (write(sd, buffer, sizeof(buffer)) < 0) { + if (write(sd, buffer, sizeof(int) + 1) < 0) { perror("Error sending message"); return NULL; } -- 2.34.1