d-2:
gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
-demksy:
- gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+demsky:
+ gcc -DDEBUG -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
d-1:
gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
clean:
- rm d-2 d-1 demsky
+ rm -rf d-2 d-1 demsky
unsigned int *oidmod;
}trans_req_data_t;
+#define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
//structure for passing multiple arguments to thread
typedef struct thread_data_array {
int thread_id;
}
switch(control) {
case READ_REQUEST:
- printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
perror("Error receiving object from cooridnator\n");
return;
}
+ printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
size = sizeof(objheader_t) + sizeof(classsize[h->type]);
break;
case TRANS_REQUEST:
- printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd);
if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
printf("Error in readClientReq\n");
}
printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
}
if (close((int)acceptfd) == -1)
- {
perror("close");
- }
- else
+ else
printf("Closed connection: fd = %d\n", (int)acceptfd);
-
pthread_exit(NULL);
printf("DEBUG -> Exiting dstmAccept\n");
}
}
//Send control message as per all votes from all oids in the machine
- if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
- printf("Handle req error\n");
+ if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) {
+ printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__);
}
//Read for new control message from Coordiator
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
- perror("Error in receiving control message");
+ printf("DEBUG -> Error receiving control, received %d\n", control);
return 1;
}
+ printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control);
+ fflush(stdout);
+
switch(control) {
case TRANS_ABORT:
- printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ;
//send ack to coordinator
sendctrl = TRANS_SUCESSFUL;
if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
}
ptr = NULL;
return 0;
-
case TRANS_COMMIT:
- printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
}
break;
case TRANS_ABORT_BUT_RETRY_COMMIT:
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd);
//Process again after waiting for sometime and on prev control message sent
+ sleep(2);
switch(prevctrl) {
case TRANS_AGREE:
sendctrl = TRANS_AGREE;
if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
}
- sleep(5);
+ //sleep(5);
break;
case TRANS_SOFT_ABORT:
if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
- printf("Handle req error\n");
+ printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__);
}
- if(newctrl == prevctrl){
+ //If no change in previous control message that was sent then ABORT transaction
+ if(newctrl == TRANS_SOFT_ABORT){
//Send ABORT
newctrl = TRANS_DISAGREE;
if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
((objheader_t *)ptr)->status &= ~(LOCK);
}
- return 0;
- } else {
+ // return 0;
+ } else if(newctrl == TRANS_AGREE) {
+ newctrl = TRANS_AGREE;
//Send new control message
if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
//TODO expect another transrequest from client
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd);
break;
default:
- printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
+ printf("No response to TRANS_AGREE OR DISAGREE control\n");
//TODO Use fixed.trans_id TID since Client may have died
break;
}
+
//Free memory
printf("DEBUG -> Freeing...");
fflush(stdout);
//Process each object present in the pile
ptr = modptr;
- //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
- fflush(stdout);
+
//Process each oid in the machine pile/ group
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) {//Object is read
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd);
return control;
}
} else {//Obj is not locked , so lock object
((objheader_t *)mobj)->status |= LOCK;
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
- printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
+ printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
return control;
}
}
printf("No of objs modified but not found = %d\n", objmodnotfound);
//Decide what control message(s) to send
+ //Cond to send TRANS_AGREE
if(v_matchnolock == fixed->numread + fixed->nummod) {
//send TRANS_AGREE to Coordinator
control = TRANS_AGREE;
perror("Error in sending control to Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_AGREE\n");
+ printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd);
}
-
+ //Condition to send TRANS_SOFT_ABORT
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
//send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
perror("Error in sending control back to coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+ printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd);
//send number of oids not found and the missing oids
if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
perror("Error in sending no of objects that are not found\n");
//Process each modified object saved in the mainobject store
for(i=0; i<transinfo->nummod; i++) {
if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
- printf("mhashserach returns NULL\n");
+ printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__);
}
//change reference count of older address and free space in objstr ??
header->rcount = 1; //Not sure what would be th val
//change ptr address in mhash table
+ printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]);
mhashRemove(transinfo->objmod[i]);
mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
offset += sizeof(objheader_t) + classsize[header->type];
//send ack to coordinator
control = TRANS_SUCESSFUL;
+ //FOR TESTING
+ printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd);
if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
}
- printf("DEBUG-> Completed the pending transaction\n");
return 0;
}
#include "dstm.h"
#include "llookup.h"
#include "ip.h"
-//#include <sys/socket.h>
-//#include <netinet/in.h>
-//#include <arpa/inet.h>
+
#define LISTEN_PORT 2156
extern objstr_t *mainobjstore;
-//extern lhashtable_t llookup; //Global Hash table
-//extern mhashtable_t mlookup; //Global Hash table
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
-int test1(void);
-int test2(void);
-
unsigned int createObjects(transrecord_t *record) {
objheader_t *header, *tmp;
unsigned int size, mid;
// test2();
// test3();
// test4();
- //test5();
- test6();
+ test5();
+// test5a();
+// test2a();
+// test2b();
+// test7();
}
return 0;
}
-//Test Read objects when objects are not found in any participant
+
+//Read objects when objects are found in remote location
+int test2a(void) {
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header, *h1, *h2;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 22
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 22;
+ header->type = 3;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+
+ mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(1, mid);
+ lhashInsert(2, mid);
+ lhashInsert(3, mid);
+ lhashInsert(4, mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Check if machine demsky is up and running
+ checkServer(mid, "128.200.9.10");
+ mid = iptoMid("128.200.9.26");
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ sleep(2);
+ //read object 1
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 2
+ if((h2 = transRead(myTrans, 2)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ pthread_join(thread_Listen, NULL);
+ return 0;
+}
+
+//Read objects that are both remote and local and are available on machines
+int test2b(void) {
+
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header, *h1, *h2, *h3, *h4;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 22
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 22;
+ header->type = 3;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+
+ mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(1, mid);
+ lhashInsert(2, mid);
+ lhashInsert(3, mid);
+ lhashInsert(4, mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Check if machine demsky is up and running
+ checkServer(mid, "128.200.9.10");
+ mid = iptoMid("128.200.9.26");
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ //sleep(2);
+ //read object 1 (found on demksy)
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 2 (found on demsky)
+ if((h2 = transRead(myTrans, 2)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 21 (found on local)
+ if((h3 = transRead(myTrans, 21)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 32 (found on d-1)
+ if((h4 = transRead(myTrans, 32)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ pthread_join(thread_Listen, NULL);
+ return 0;
+
+}
+
+
+//Read objects when objects are not found in any participant
int test3(void){
transrecord_t *record;
objheader_t *h1,*h2;
return 0;
}
-//Test Read objects when some objects are found and other objects not found in any participant
+//Read objects when some objects are found and other objects not found in any participant
int test4(void) {
transrecord_t *record;
objheader_t *h1,*h2, *h3, *h4;
return 0;
}
-//Test for transaction objects when the objs are part of the Coordinator machine starting the
-//trans commit
+//Commit for transaction objects when the objs are part of other
+//transactions running simultaneously
int test5(void) {
- transrecord_t *record;
- objheader_t *header;
- unsigned int size, mid;
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header, *h1, *h2, *h3, *h4, *h5, *h6;
pthread_t thread_Listen;
pthread_attr_t attr;
- objheader_t *h1,*h2, *h3, *h4;
dstmInit();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- record = transStart();
- printf("DEBUG -> Init done\n");
- mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu
- lhashInsert(1,mid);
- lhashInsert(2,mid);
- lhashInsert(3,mid);
- lhashInsert(4,mid);
- lhashInsert(5,mid);
- lhashInsert(6,mid);
- pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-
//Create and Insert Oid 20
size = sizeof(objheader_t) + classsize[2] ;
header = (objheader_t *) objstrAlloc(mainobjstore, size);
mhashInsert(header->oid, header);
mid = iptoMid("128.200.9.27");
lhashInsert(header->oid, mid);
- //read object 1
- if((h1 = transRead(record, 1)) == NULL){
+
+ //Create and Insert Oid 22
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 22;
+ header->type = 3;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+
+ mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(1, mid);
+ lhashInsert(2, mid);
+ lhashInsert(3, mid);
+ lhashInsert(4, mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Check if machine demsky is up and running
+ checkServer(mid, "128.200.9.10");
+ mid = iptoMid("128.200.9.26");
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ //read object 1 (found on demksy)
+ if((h1 = transRead(myTrans, 1)) == NULL){
printf("Object not found\n");
}
- //read object 5
- if((h2 = transRead(record, 5)) == NULL) {
+ //read object 31 (found on d-1)
+ if((h2 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
- //read object 20(present in local machine)
- if((h3 = transRead(record, 20)) == NULL) {
+
+ //read object 22 (found locally)
+ if((h3 = transRead(myTrans, 22)) == NULL) {
printf("Object not found\n");
}
- //read object 21(present in local machine)
- if((h4 = transRead(record, 21)) == NULL) {
+
+ //read object 2 (found on demsky)
+ if((h4 = transRead(myTrans, 2)) == NULL) {
printf("Object not found\n");
}
- transCommit(record);
+ //read object 21 (found locally)
+ if((h5 = transRead(myTrans, 21)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 32 (found on d-2)
+ if((h6 = transRead(myTrans, 32)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //Commit transaction
+ if((h1 != NULL) && (h2 != NULL) && (h3 != NULL) && (h4 !=NULL) && (h5 != NULL) && (h6 != NULL))
+ transCommit(myTrans);
+ else
+ printf("Cannot complete this transaction \n");
+
pthread_join(thread_Listen, NULL);
+ return 0;
+}
+int test5a(void) {
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header, *h1, *h2, *h3;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 22
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 22;
+ header->type = 3;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+
+ mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(1, mid);
+ lhashInsert(2, mid);
+ lhashInsert(3, mid);
+ lhashInsert(4, mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Check if machine demsky is up and running
+ checkServer(mid, "128.200.9.10");
+ mid = iptoMid("128.200.9.26");
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ //read object 1 (found on demksy)
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 31 (found on d-1)
+ if((h2 = transRead(myTrans, 32)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 22 (found locally)
+ if((h3 = transRead(myTrans, 22)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //Commit transaction
+ if((h1 != NULL) && (h2 != NULL) && (h3 != NULL))
+ transCommit(myTrans);
+ else
+ printf("Cannot complete this transaction \n");
+
+ pthread_join(thread_Listen, NULL);
+ return 0;
}
int test6(void) {
pthread_join(thread_Listen, NULL);
return 0;
}
+//Commit transactions on local and remote objects that are NOT a part of
+//any other transaction
+int test7(void) {
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header, *h1, *h2, *h3;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 22
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 22;
+ header->type = 3;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+
+ mid = iptoMid("128.200.9.10"); //demsky.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(1, mid);
+ lhashInsert(2, mid);
+ lhashInsert(3, mid);
+ lhashInsert(4, mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Check if machine demsky is up and running
+ checkServer(mid, "128.200.9.10");
+ mid = iptoMid("128.200.9.26");
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ //read object 3 (found on demksy)
+ if((h1 = transRead(myTrans, 3)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 32 (found on d-1)
+ if((h2 = transRead(myTrans, 32)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 22 (found locally)
+ if((h3 = transRead(myTrans, 22)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //Commit transaction
+ transCommit(myTrans);
+
+ pthread_join(thread_Listen, NULL);
+ return 0;
+}
int main()
{
- //sleep(3);
- test3();
+ //test1();
+ //test3();
+ test4();
}
int test1()
}
pthread_join(thread_Listen, NULL);
}
-
+//Commit transaction with all locally available objects
int test3() {
-
unsigned int val, mid;
transrecord_t *myTrans;
unsigned int size;
objheader_t *header;
pthread_t thread_Listen;
pthread_attr_t attr;
- objheader_t *h1, *h2, *h3, *h4, *h5;
+ objheader_t *h1, *h2, *h3;//h1,h2,h3 from local
dstmInit();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ // Create and Insert Oid 1
+ size = sizeof(objheader_t) + classsize[0] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 1, 0, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 2
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 2, 1, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+
+ // Create and Insert Oid 3
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 3, 2, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 4
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 4, 3, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
//Inserting into lhashtable
+ mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-// pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+ // pthread_create(&thread_Listen, NULL, dstmListen, NULL);
- printf("DEBUG -> mid = %d\n", mid);
+ //Check if machine d-1 is up and running
checkServer(mid, "128.200.9.26");
mid = iptoMid("128.200.9.27");
- printf("DEBUG -> mid = %d\n", mid);
+ //Check if machine d-2 is up and running
checkServer(mid, "128.200.9.27");
// Start Transaction
myTrans = transStart();
-/*
- //Create Object1
- if((val = createObjects(myTrans, 0)) != 0) {
- printf("Error transCreateObj1");
+
+ //read object 1(present in local machine)
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
}
- //Create Object2
- if((val = createObjects(myTrans, 1)) != 0) {
- printf("Error transCreateObj2");
+ //read object 2present in local machine)
+ if((h2 = transRead(myTrans, 2)) == NULL) {
+ printf("Object not found\n");
}
- //Create Object3
- if((val = createObjects(myTrans, 2)) != 0) {
- printf("Error transCreateObj3");
+ //read object 3(present in local machine)
+ if((h3 = transRead(myTrans, 3)) == NULL) {
+ printf("Object not found\n");
}
- //Create Object4
- if((val = createObjects(myTrans, 3)) != 0) {
- printf("Error transCreateObj4");
+
+ // Commit transaction
+ transCommit(myTrans);
+
+ pthread_join(thread_Listen, NULL);
+
+ return 0;
+}
+
+//Commit transaction with few locally available objects and other objects from machine d-1
+// and d-2
+int test4() {
+
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ objheader_t *h1, *h2, *h3, *h4;//h1,h2 from local ; h3 from d-1 , h-4 from d-2
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ // Create and Insert Oid 1
+ size = sizeof(objheader_t) + classsize[0] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 1, 0, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 2
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 2, 1, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+
+ // Create and Insert Oid 3
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 3, 2, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 4
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 4, 3, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ //Inserting into lhashtable
+ mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ lhashInsert(20, mid);
+ lhashInsert(21, mid);
+ lhashInsert(22, mid);
+
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+ // pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+
+ //Check if machine d-1 is up and running
+ checkServer(mid, "128.200.9.26");
+ mid = iptoMid("128.200.9.27");
+ //Check if machine d-2 is up and running
+ checkServer(mid, "128.200.9.27");
+
+ // Start Transaction
+ myTrans = transStart();
+
+ //read object 1(present in local machine)
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
}
- //Create Object5
- if((val = createObjects(myTrans, 0)) != 0) {
- printf("Error transCreateObj5");
+ //read object 2present in local machine)
+ if((h2 = transRead(myTrans, 2)) == NULL) {
+ printf("Object not found\n");
}
- //Create Object6
- if((val = createObjects(myTrans, 1)) != 0) {
- printf("Error transCreateObj6");
+ //read object 31(present in d-1 machine)
+ if((h3 = transRead(myTrans, 31)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ //read object 21(present in d-2 machine)
+ if((h4 = transRead(myTrans, 21)) == NULL) {
+ printf("Object not found\n");
}
- */
+ // Commit transaction
+ transCommit(myTrans);
+
+ pthread_join(thread_Listen, NULL);
+
+ return 0;
+}
+int test5() {
+
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ objheader_t *h1, *h2, *h3, *h4, *h5;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(20, mid);
+ lhashInsert(21, mid);
+ lhashInsert(22, mid);
+
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+// pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+
+ printf("DEBUG -> mid = %d\n", mid);
+ checkServer(mid, "128.200.9.26");
+ mid = iptoMid("128.200.9.27");
+ printf("DEBUG -> mid = %d\n", mid);
+ checkServer(mid, "128.200.9.27");
+
+ // Start Transaction
+ myTrans = transStart();
+
// Create and Insert Oid 1
size = sizeof(objheader_t) + classsize[0] ;
header = (objheader_t *) objstrAlloc(mainobjstore, size);
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
- printf("Enter TRANS_READ\n");
+// printf("Enter TRANS_READ\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- printf("DEBUG -> transRead oid %d found local\n", oid);
+ printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- printf("Object not found in Machine %d\n", machinenumber);
+ printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__);
return NULL;
}
- else
+ else {
+ printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
return(objcopy);
+ }
}
}
char ctrl, control, *ptr;
unsigned int *oidnotfound;
objheader_t *header;
-
+// printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
//Switch case
control = tdata->recvmsg[i].rcv_status;
switch(control) {
case TRANS_DISAGREE:
- printf("DEBUG-> Inside TRANS_DISAGREE\n");
+// printf("DEBUG-> Inside TRANS_DISAGREE\n");
transdisagree++;
//Free transaction records
objstrDelete(tdata->rec->cache);
return 0;
case TRANS_AGREE:
- printf("DEBUG-> Inside TRANS_AGREE\n");
+ printf("Inside TRANS_AGREE\n");
+ PRINT_TID(tdata);
transagree++;
break;
case TRANS_SOFT_ABORT:
- printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+ printf("Inside TRANS_SOFT_ABORT\n");
transsoftabort++;
/* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
if ((i == tdata->thread_id) && (val == 0)) {
} while(sum < N && n !=0);
}
}
-
break;
default:
printf("Participant sent unknown message\n");
if(transagree == tdata->pilecount){
//Send Commit
ctrl = TRANS_COMMIT;
- printf("Sending TRANS_COMMIT\n");
+ printf("Sending TRANS_COMMIT accept_fd = %d\n", sd);
if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
+ printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd);
if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
//Sleep and the resend the request
- sleep(5);
+ sleep(2);
//Read new control message from Participant
if((n = read(sd, &control, sizeof(char))) <= 0) {
}
switch(control) {
case OBJECT_NOT_FOUND:
+ printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
return NULL;
case OBJECT_FOUND:
if((val = read(sd, &size, sizeof(int))) <= 0) {