From 98a591f7faf8ef3095a247a509f615e8f202d102 Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 14 Jun 2007 18:00:14 +0000 Subject: [PATCH] Added new testcases Edited the printf messages to get more information TODO: Fix the Trans_abort case --- Robust/src/Runtime/DSTM/interface/Makefile | 6 +- Robust/src/Runtime/DSTM/interface/dstm.h | 1 + .../src/Runtime/DSTM/interface/dstmserver.c | 65 +-- .../src/Runtime/DSTM/interface/testclient.c | 506 ++++++++++++++++-- .../src/Runtime/DSTM/interface/testserver.c | 220 +++++++- Robust/src/Runtime/DSTM/interface/trans.c | 27 +- 6 files changed, 712 insertions(+), 113 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index ee831a54..6ddba666 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,8 +1,8 @@ d-2: gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c -demksy: - gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c +demsky: + gcc -DDEBUG -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c d-1: gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c @@ -13,4 +13,4 @@ all: gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c clean: - rm d-2 d-1 demsky + rm -rf d-2 d-1 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 19dd0b9f..eb0f655c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -93,6 +93,7 @@ typedef struct trans_req_data { unsigned int *oidmod; }trans_req_data_t; +#define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id); //structure for passing multiple arguments to thread typedef struct thread_data_array { int thread_id; diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 56200e78..5e9aa599 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -93,11 +93,11 @@ void *dstmAccept(void *acceptfd) } switch(control) { case READ_REQUEST: - printf("DEBUG -> Recv READ_REQUEST from Coordinator\n"); if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { perror("Error receiving object from cooridnator\n"); return; } + printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid); srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; size = sizeof(objheader_t) + sizeof(classsize[h->type]); @@ -144,7 +144,7 @@ void *dstmAccept(void *acceptfd) break; case TRANS_REQUEST: - printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n"); + printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd); if((val = readClientReq((int)acceptfd, &transinfo)) != 0) { printf("Error in readClientReq\n"); } @@ -154,13 +154,10 @@ void *dstmAccept(void *acceptfd) printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); } if (close((int)acceptfd) == -1) - { perror("close"); - } - else + else printf("Closed connection: fd = %d\n", (int)acceptfd); - pthread_exit(NULL); printf("DEBUG -> Exiting dstmAccept\n"); } @@ -227,19 +224,22 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } //Send control message as per all votes from all oids in the machine - if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { - printf("Handle req error\n"); + if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) { + printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__); } //Read for new control message from Coordiator if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { - perror("Error in receiving control message"); + printf("DEBUG -> Error receiving control, received %d\n", control); return 1; } + printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control); + fflush(stdout); + switch(control) { case TRANS_ABORT: - printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n"); + printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ; //send ack to coordinator sendctrl = TRANS_SUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { @@ -260,29 +260,30 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } ptr = NULL; return 0; - case TRANS_COMMIT: - printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n"); + printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd); if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); } break; case TRANS_ABORT_BUT_RETRY_COMMIT: - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n"); + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd); //Process again after waiting for sometime and on prev control message sent + sleep(2); switch(prevctrl) { case TRANS_AGREE: sendctrl = TRANS_AGREE; if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { perror("Error sending ACK to coordinator\n"); } - sleep(5); + //sleep(5); break; case TRANS_SOFT_ABORT: if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { - printf("Handle req error\n"); + printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__); } - if(newctrl == prevctrl){ + //If no change in previous control message that was sent then ABORT transaction + if(newctrl == TRANS_SOFT_ABORT){ //Send ABORT newctrl = TRANS_DISAGREE; if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) { @@ -300,8 +301,9 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { ptr = mhashSearch(transinfo->objlocked[i]);// find the header address ((objheader_t *)ptr)->status &= ~(LOCK); } - return 0; - } else { + // return 0; + } else if(newctrl == TRANS_AGREE) { + newctrl = TRANS_AGREE; //Send new control message if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) { perror("Error sending ACK to coordinator\n"); @@ -314,13 +316,14 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { break; case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: //TODO expect another transrequest from client - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n"); + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd); break; default: - printf("No response to TRANS_AGREE OR DISAGREE protocol\n"); + printf("No response to TRANS_AGREE OR DISAGREE control\n"); //TODO Use fixed.trans_id TID since Client may have died break; } + //Free memory printf("DEBUG -> Freeing..."); fflush(stdout); @@ -360,8 +363,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran //Process each object present in the pile ptr = modptr; - //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread); - fflush(stdout); + //Process each oid in the machine pile/ group for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) {//Object is read @@ -396,14 +398,14 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE\n"); + printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd); return control; } } else {//Obj is not locked , so lock object ((objheader_t *)mobj)->status |= LOCK; //Save all object oids that are locked on this machine during this transaction request call oidlocked[objlocked] = ((objheader_t *)mobj)->oid; - printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid); + printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid); objlocked++; if (version == ((objheader_t *)mobj)->version) { //If versions match v_matchnolock++; @@ -415,7 +417,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE\n"); + printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd); return control; } } @@ -430,6 +432,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran printf("No of objs modified but not found = %d\n", objmodnotfound); //Decide what control message(s) to send + //Cond to send TRANS_AGREE if(v_matchnolock == fixed->numread + fixed->nummod) { //send TRANS_AGREE to Coordinator control = TRANS_AGREE; @@ -437,9 +440,9 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran perror("Error in sending control to Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_AGREE\n"); + printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd); } - + //Condition to send TRANS_SOFT_ABORT if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { //send TRANS_SOFT_ABORT to Coordinator control = TRANS_SOFT_ABORT; @@ -447,7 +450,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran perror("Error in sending control back to coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); + printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd); //send number of oids not found and the missing oids if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) { perror("Error in sending no of objects that are not found\n"); @@ -497,11 +500,12 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //Process each modified object saved in the mainobject store for(i=0; inummod; i++) { if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { - printf("mhashserach returns NULL\n"); + printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__); } //change reference count of older address and free space in objstr ?? header->rcount = 1; //Not sure what would be th val //change ptr address in mhash table + printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]); mhashRemove(transinfo->objmod[i]); mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); offset += sizeof(objheader_t) + classsize[header->type]; @@ -519,11 +523,12 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //send ack to coordinator control = TRANS_SUCESSFUL; + //FOR TESTING + printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd); if(send((int)acceptfd, &control, sizeof(char), 0) < 0) { perror("Error sending ACK to coordinator\n"); } - printf("DEBUG-> Completed the pending transaction\n"); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index b27456a9..0cf1cf0a 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -3,20 +3,13 @@ #include "dstm.h" #include "llookup.h" #include "ip.h" -//#include -//#include -//#include + #define LISTEN_PORT 2156 extern objstr_t *mainobjstore; -//extern lhashtable_t llookup; //Global Hash table -//extern mhashtable_t mlookup; //Global Hash table int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; -int test1(void); -int test2(void); - unsigned int createObjects(transrecord_t *record) { objheader_t *header, *tmp; unsigned int size, mid; @@ -80,8 +73,11 @@ int main() // test2(); // test3(); // test4(); - //test5(); - test6(); + test5(); +// test5a(); +// test2a(); +// test2b(); +// test7(); } @@ -144,7 +140,200 @@ int test2(void) { return 0; } -//Test Read objects when objects are not found in any participant + +//Read objects when objects are found in remote location +int test2a(void) { + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header, *h1, *h2; + pthread_t thread_Listen; + pthread_attr_t attr; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 22 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 22; + header->type = 3; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + lhashInsert(31, mid); + lhashInsert(32, mid); + + mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(1, mid); + lhashInsert(2, mid); + lhashInsert(3, mid); + lhashInsert(4, mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Check if machine demsky is up and running + checkServer(mid, "128.200.9.10"); + mid = iptoMid("128.200.9.26"); + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + + // Start Transaction + myTrans = transStart(); + + sleep(2); + //read object 1 + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); + } + //read object 2 + if((h2 = transRead(myTrans, 2)) == NULL) { + printf("Object not found\n"); + } + + pthread_join(thread_Listen, NULL); + return 0; +} + +//Read objects that are both remote and local and are available on machines +int test2b(void) { + + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header, *h1, *h2, *h3, *h4; + pthread_t thread_Listen; + pthread_attr_t attr; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 22 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 22; + header->type = 3; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + + mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(1, mid); + lhashInsert(2, mid); + lhashInsert(3, mid); + lhashInsert(4, mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Check if machine demsky is up and running + checkServer(mid, "128.200.9.10"); + mid = iptoMid("128.200.9.26"); + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + + // Start Transaction + myTrans = transStart(); + + //sleep(2); + //read object 1 (found on demksy) + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); + } + //read object 2 (found on demsky) + if((h2 = transRead(myTrans, 2)) == NULL) { + printf("Object not found\n"); + } + + //read object 21 (found on local) + if((h3 = transRead(myTrans, 21)) == NULL) { + printf("Object not found\n"); + } + + //read object 32 (found on d-1) + if((h4 = transRead(myTrans, 32)) == NULL) { + printf("Object not found\n"); + } + + pthread_join(thread_Listen, NULL); + return 0; + +} + + +//Read objects when objects are not found in any participant int test3(void){ transrecord_t *record; objheader_t *h1,*h2; @@ -165,7 +354,7 @@ int test3(void){ return 0; } -//Test Read objects when some objects are found and other objects not found in any participant +//Read objects when some objects are found and other objects not found in any participant int test4(void) { transrecord_t *record; objheader_t *h1,*h2, *h3, *h4; @@ -198,32 +387,20 @@ int test4(void) { return 0; } -//Test for transaction objects when the objs are part of the Coordinator machine starting the -//trans commit +//Commit for transaction objects when the objs are part of other +//transactions running simultaneously int test5(void) { - transrecord_t *record; - objheader_t *header; - unsigned int size, mid; + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header, *h1, *h2, *h3, *h4, *h5, *h6; pthread_t thread_Listen; pthread_attr_t attr; - objheader_t *h1,*h2, *h3, *h4; dstmInit(); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - record = transStart(); - printf("DEBUG -> Init done\n"); - mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu - lhashInsert(1,mid); - lhashInsert(2,mid); - lhashInsert(3,mid); - lhashInsert(4,mid); - lhashInsert(5,mid); - lhashInsert(6,mid); - pthread_create(&thread_Listen, &attr, dstmListen, NULL); - //Create and Insert Oid 20 size = sizeof(objheader_t) + classsize[2] ; header = (objheader_t *) objstrAlloc(mainobjstore, size); @@ -249,25 +426,179 @@ int test5(void) { mhashInsert(header->oid, header); mid = iptoMid("128.200.9.27"); lhashInsert(header->oid, mid); - //read object 1 - if((h1 = transRead(record, 1)) == NULL){ + + //Create and Insert Oid 22 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 22; + header->type = 3; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + + mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(1, mid); + lhashInsert(2, mid); + lhashInsert(3, mid); + lhashInsert(4, mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Check if machine demsky is up and running + checkServer(mid, "128.200.9.10"); + mid = iptoMid("128.200.9.26"); + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + + // Start Transaction + myTrans = transStart(); + + //read object 1 (found on demksy) + if((h1 = transRead(myTrans, 1)) == NULL){ printf("Object not found\n"); } - //read object 5 - if((h2 = transRead(record, 5)) == NULL) { + //read object 31 (found on d-1) + if((h2 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } - //read object 20(present in local machine) - if((h3 = transRead(record, 20)) == NULL) { + + //read object 22 (found locally) + if((h3 = transRead(myTrans, 22)) == NULL) { printf("Object not found\n"); } - //read object 21(present in local machine) - if((h4 = transRead(record, 21)) == NULL) { + + //read object 2 (found on demsky) + if((h4 = transRead(myTrans, 2)) == NULL) { printf("Object not found\n"); } - transCommit(record); + //read object 21 (found locally) + if((h5 = transRead(myTrans, 21)) == NULL) { + printf("Object not found\n"); + } + + //read object 32 (found on d-2) + if((h6 = transRead(myTrans, 32)) == NULL) { + printf("Object not found\n"); + } + + //Commit transaction + if((h1 != NULL) && (h2 != NULL) && (h3 != NULL) && (h4 !=NULL) && (h5 != NULL) && (h6 != NULL)) + transCommit(myTrans); + else + printf("Cannot complete this transaction \n"); + pthread_join(thread_Listen, NULL); + return 0; +} +int test5a(void) { + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header, *h1, *h2, *h3; + pthread_t thread_Listen; + pthread_attr_t attr; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 22 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 22; + header->type = 3; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + + mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(1, mid); + lhashInsert(2, mid); + lhashInsert(3, mid); + lhashInsert(4, mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Check if machine demsky is up and running + checkServer(mid, "128.200.9.10"); + mid = iptoMid("128.200.9.26"); + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + + // Start Transaction + myTrans = transStart(); + + //read object 1 (found on demksy) + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); + } + //read object 31 (found on d-1) + if((h2 = transRead(myTrans, 32)) == NULL) { + printf("Object not found\n"); + } + + //read object 22 (found locally) + if((h3 = transRead(myTrans, 22)) == NULL) { + printf("Object not found\n"); + } + + //Commit transaction + if((h1 != NULL) && (h2 != NULL) && (h3 != NULL)) + transCommit(myTrans); + else + printf("Cannot complete this transaction \n"); + + pthread_join(thread_Listen, NULL); + return 0; } int test6(void) { @@ -359,3 +690,100 @@ int test6(void) { pthread_join(thread_Listen, NULL); return 0; } +//Commit transactions on local and remote objects that are NOT a part of +//any other transaction +int test7(void) { + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header, *h1, *h2, *h3; + pthread_t thread_Listen; + pthread_attr_t attr; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 22 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 22; + header->type = 3; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + + mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(1, mid); + lhashInsert(2, mid); + lhashInsert(3, mid); + lhashInsert(4, mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Check if machine demsky is up and running + checkServer(mid, "128.200.9.10"); + mid = iptoMid("128.200.9.26"); + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + + // Start Transaction + myTrans = transStart(); + + //read object 3 (found on demksy) + if((h1 = transRead(myTrans, 3)) == NULL){ + printf("Object not found\n"); + } + //read object 32 (found on d-1) + if((h2 = transRead(myTrans, 32)) == NULL) { + printf("Object not found\n"); + } + + //read object 22 (found locally) + if((h3 = transRead(myTrans, 22)) == NULL) { + printf("Object not found\n"); + } + + //Commit transaction + transCommit(myTrans); + + pthread_join(thread_Listen, NULL); + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index ace2a94f..c8cd19c3 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -44,8 +44,9 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \ int main() { - //sleep(3); - test3(); + //test1(); + //test3(); + test4(); } int test1() @@ -135,23 +136,55 @@ int test2() { } pthread_join(thread_Listen, NULL); } - +//Commit transaction with all locally available objects int test3() { - unsigned int val, mid; transrecord_t *myTrans; unsigned int size; objheader_t *header; pthread_t thread_Listen; pthread_attr_t attr; - objheader_t *h1, *h2, *h3, *h4, *h5; + objheader_t *h1, *h2, *h3;//h1,h2,h3 from local dstmInit(); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + // Create and Insert Oid 1 + size = sizeof(objheader_t) + classsize[0] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 1, 0, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 2 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 2, 1, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + + // Create and Insert Oid 3 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 3, 2, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 4 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 4, 3, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + //Inserting into lhashtable + mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); @@ -162,43 +195,172 @@ int test3() { lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); -// pthread_create(&thread_Listen, NULL, dstmListen, NULL); + // pthread_create(&thread_Listen, NULL, dstmListen, NULL); - printf("DEBUG -> mid = %d\n", mid); + //Check if machine d-1 is up and running checkServer(mid, "128.200.9.26"); mid = iptoMid("128.200.9.27"); - printf("DEBUG -> mid = %d\n", mid); + //Check if machine d-2 is up and running checkServer(mid, "128.200.9.27"); // Start Transaction myTrans = transStart(); -/* - //Create Object1 - if((val = createObjects(myTrans, 0)) != 0) { - printf("Error transCreateObj1"); + + //read object 1(present in local machine) + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); } - //Create Object2 - if((val = createObjects(myTrans, 1)) != 0) { - printf("Error transCreateObj2"); + //read object 2present in local machine) + if((h2 = transRead(myTrans, 2)) == NULL) { + printf("Object not found\n"); } - //Create Object3 - if((val = createObjects(myTrans, 2)) != 0) { - printf("Error transCreateObj3"); + //read object 3(present in local machine) + if((h3 = transRead(myTrans, 3)) == NULL) { + printf("Object not found\n"); } - //Create Object4 - if((val = createObjects(myTrans, 3)) != 0) { - printf("Error transCreateObj4"); + + // Commit transaction + transCommit(myTrans); + + pthread_join(thread_Listen, NULL); + + return 0; +} + +//Commit transaction with few locally available objects and other objects from machine d-1 +// and d-2 +int test4() { + + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header; + pthread_t thread_Listen; + pthread_attr_t attr; + objheader_t *h1, *h2, *h3, *h4;//h1,h2 from local ; h3 from d-1 , h-4 from d-2 + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + // Create and Insert Oid 1 + size = sizeof(objheader_t) + classsize[0] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 1, 0, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 2 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 2, 1, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + + // Create and Insert Oid 3 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 3, 2, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 4 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 4, 3, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + //Inserting into lhashtable + mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + lhashInsert(20, mid); + lhashInsert(21, mid); + lhashInsert(22, mid); + + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + // pthread_create(&thread_Listen, NULL, dstmListen, NULL); + + //Check if machine d-1 is up and running + checkServer(mid, "128.200.9.26"); + mid = iptoMid("128.200.9.27"); + //Check if machine d-2 is up and running + checkServer(mid, "128.200.9.27"); + + // Start Transaction + myTrans = transStart(); + + //read object 1(present in local machine) + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); } - //Create Object5 - if((val = createObjects(myTrans, 0)) != 0) { - printf("Error transCreateObj5"); + //read object 2present in local machine) + if((h2 = transRead(myTrans, 2)) == NULL) { + printf("Object not found\n"); } - //Create Object6 - if((val = createObjects(myTrans, 1)) != 0) { - printf("Error transCreateObj6"); + //read object 31(present in d-1 machine) + if((h3 = transRead(myTrans, 31)) == NULL) { + printf("Object not found\n"); + } + + //read object 21(present in d-2 machine) + if((h4 = transRead(myTrans, 21)) == NULL) { + printf("Object not found\n"); } - */ + // Commit transaction + transCommit(myTrans); + + pthread_join(thread_Listen, NULL); + + return 0; +} +int test5() { + + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header; + pthread_t thread_Listen; + pthread_attr_t attr; + objheader_t *h1, *h2, *h3, *h4, *h5; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(20, mid); + lhashInsert(21, mid); + lhashInsert(22, mid); + + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); +// pthread_create(&thread_Listen, NULL, dstmListen, NULL); + + printf("DEBUG -> mid = %d\n", mid); + checkServer(mid, "128.200.9.26"); + mid = iptoMid("128.200.9.27"); + printf("DEBUG -> mid = %d\n", mid); + checkServer(mid, "128.200.9.27"); + + // Start Transaction + myTrans = transStart(); + // Create and Insert Oid 1 size = sizeof(objheader_t) + classsize[0] ; header = (objheader_t *) objstrAlloc(mainobjstore, size); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 24f03c63..2c47e2ac 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -26,7 +26,7 @@ transrecord_t *transStart() objheader_t *transRead(transrecord_t *record, unsigned int oid) { - printf("Enter TRANS_READ\n"); +// printf("Enter TRANS_READ\n"); unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; @@ -34,7 +34,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *buf; //check cache if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - printf("DEBUG -> transRead oid %d found local\n", oid); + printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found @@ -55,11 +55,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { //If object is not found in Remote location - printf("Object not found in Machine %d\n", machinenumber); + printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__); return NULL; } - else + else { + printf("Object oid = %d found in Machine %d\n", oid, machinenumber); return(objcopy); + } } } @@ -82,15 +84,15 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { char ctrl, control, *ptr; unsigned int *oidnotfound; objheader_t *header; - +// printf("DEBUG -> pilecount is %d\n", tdata->pilecount); //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { //Switch case control = tdata->recvmsg[i].rcv_status; switch(control) { case TRANS_DISAGREE: - printf("DEBUG-> Inside TRANS_DISAGREE\n"); +// printf("DEBUG-> Inside TRANS_DISAGREE\n"); transdisagree++; //Free transaction records objstrDelete(tdata->rec->cache); @@ -107,12 +109,13 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { return 0; case TRANS_AGREE: - printf("DEBUG-> Inside TRANS_AGREE\n"); + printf("Inside TRANS_AGREE\n"); + PRINT_TID(tdata); transagree++; break; case TRANS_SOFT_ABORT: - printf("DEBUG-> Inside TRANS_SOFT_ABORT\n"); + printf("Inside TRANS_SOFT_ABORT\n"); transsoftabort++; /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */ if ((i == tdata->thread_id) && (val == 0)) { @@ -134,7 +137,6 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { } while(sum < N && n !=0); } } - break; default: printf("Participant sent unknown message\n"); @@ -145,7 +147,7 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { if(transagree == tdata->pilecount){ //Send Commit ctrl = TRANS_COMMIT; - printf("Sending TRANS_COMMIT\n"); + printf("Sending TRANS_COMMIT accept_fd = %d\n", sd); if((retval = write(sd, &ctrl, sizeof(char))) < 0) { perror("Error sending ctrl message for participant\n"); return 1; @@ -156,13 +158,13 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) { //Send abort but retry commit ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; - printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n"); + printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd); if((retval = write(sd, &ctrl, sizeof(char))) <= 0) { perror("Error sending ctrl message for participant\n"); return 1; } //Sleep and the resend the request - sleep(5); + sleep(2); //Read new control message from Participant if((n = read(sd, &control, sizeof(char))) <= 0) { @@ -470,6 +472,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { } switch(control) { case OBJECT_NOT_FOUND: + printf("DEBUG -> Control OBJECT_NOT_FOUND received\n"); return NULL; case OBJECT_FOUND: if((val = read(sd, &size, sizeof(int))) <= 0) { -- 2.34.1