-d-2:
- gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-3:
+ gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
demsky:
- gcc -DDEBUG -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
-d-1:
- gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-4:
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
all:
- gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
- gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
- gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
mac:
- gcc -DMAC -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
- gcc -DMAC -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
- gcc -DMAC -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
clean:
- rm -rf d-2 d-1 demsky
+ rm -rf d-3 d-4 demsky
#define TRANS_REQUEST 5
#define TRANS_ABORT 6
#define TRANS_COMMIT 7
-#define TRANS_ABORT_BUT_RETRY_COMMIT 8
#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9
//Participant Messages
#define OBJ_LOCKED_BUT_VERSION_MATCH 14
#define OBJ_UNLOCK_BUT_VERSION_MATCH 15
#define VERSION_NO_MATCH 16
-//TODO REMOVE THIS
-#define NO_MISSING_OIDS 22
-#define MISSING_OIDS_PRESENT 23
#include <stdlib.h>
objstr_t *cache;
chashtable_t *lookupTable;
} transrecord_t;
-/*
-typedef struct pile {
- unsigned int mid;
- unsigned int oid;
- struct pile *next;
-}pile_t;
-*/
// Structure that keeps track of responses from the participants
typedef struct thread_response {
char rcv_status;
pthread_cond_t *threshold; //threshhold for waking up a thread
pthread_mutex_t *lock; //lock the count variable
int *count; //variable to count responses of TRANS_REQUEST protocol from all participants
+ char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp
+ char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case)
transrecord_t *rec; // To send modified objects
}thread_data_array_t;
+
+
+
// Structure to save information about an oid necesaary for the decideControl()
typedef struct objinfo {
unsigned int oid;
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *record, unsigned int oid);
objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
-int decideResponse(thread_data_array_t *tdata, int sd, int status);// Coordinator decides what response to send to the participant
+int decideResponse(thread_data_array_t *tdata);// Coordinator decides what response to send to the participant
+char sendResponse(thread_data_array_t *tdata, int sd); //Sends control message back to Participants
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
int transCommit(transrecord_t *record); //return 0 if successful
void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
printf("Recieved connection: fd = %d\n", (int)acceptfd);
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
+ if (retval == 0) {
+ return; // Testing connection
+ }
perror("Error in receiving control from coordinator\n");
return;
}
perror("Error receiving object from cooridnator\n");
return;
}
- printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
size = sizeof(objheader_t) + sizeof(classsize[h->type]);
break;
case TRANS_REQUEST:
- printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_REQUEST\n");
if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
printf("Error in readClientReq\n");
}
printf("Closed connection: fd = %d\n", (int)acceptfd);
pthread_exit(NULL);
- printf("DEBUG -> Exiting dstmAccept\n");
}
+// Reads transaction request per thread
int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
char *ptr, control, prevctrl, sendctrl, newctrl;
void *modptr, *header;
int numread = fixed.numread;
N = numread * (sizeof(unsigned int) + sizeof(short));
char objread[N];
- if(numread != 0) { // If pile contains objects to be read
+ if(numread != 0) { //If pile contains more than one object to be read,
+ // keep reading all objects
sum = 0;
do {
n = recv((int)acceptfd, (void *) objread, N, 0);
}
// Read modified objects
- if(fixed.nummod != 0) { // If pile contains modified objects
+ if(fixed.nummod != 0) { // If pile contains more than one modified object,
+ // allocate new object store and recv all modified objects
if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
- printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+ printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
sum = 0;
} while (sum < fixed.sum_bytes && n != 0);
}
+ // Process the information available in the TRANS_REQUEST control
//Send control message as per all votes from all oids in the machine
if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
- printf("Handle req error\n");
+ printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
+ return 1;
}
-
//Read for new control message from Coordiator
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
- perror("Error in receiving control message");
+ perror("Error in receiving control message\n");
return 1;
}
switch(control) {
case TRANS_ABORT:
- printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
- //send ack to coordinator
- sendctrl = TRANS_SUCESSFUL;
- if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
- return 1;
- }
//Mark all ref counts as 1 and do garbage collection
ptr = modptr;
for(i = 0; i< fixed.nummod; i++) {
}
//Unlock objects that was locked in this machine due to this transaction
for(i = 0; i< transinfo->numlocked; i++) {
+ printf("DEBUG-> Unlocking objects\n");
header = mhashSearch(transinfo->objlocked[i]);// find the header address
((objheader_t *)header)->status &= ~(LOCK);
}
+
+ //send ack to coordinator
+ printf("DEBUG -> Recv TRANS_ABORT\n");
+ sendctrl = TRANS_SUCESSFUL;
+ if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+ perror("Error sending ACK to coordinator\n");
+ return 1;
+ }
+
ptr = NULL;
return 0;
case TRANS_COMMIT:
- printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
+ printf("DEBUG -> Recv TRANS_COMMIT \n");
if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
}
break;
- case TRANS_ABORT_BUT_RETRY_COMMIT:
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
- //Process again after waiting for sometime and on prev control message sent
- sleep(2);
- switch(prevctrl) {
- case TRANS_AGREE:
- sendctrl = TRANS_AGREE;
- if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
- }
- break;
- case TRANS_SOFT_ABORT:
- if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
- printf("Handle req error\n");
- }
- //If no change in previous control message that was sent then ABORT transaction
- if(newctrl == TRANS_SOFT_ABORT){
- //Send ABORT
- newctrl = TRANS_DISAGREE;
- if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
- }
- //Set the reference count of the object to 1 in mainstore for garbage collection
- ptr = modptr;
- for(i = 0; i< fixed.nummod; i++) {
- tmp_header = (objheader_t *) ptr;
- tmp_header->rcount = 1;
- ptr += sizeof(objheader_t) + classsize[tmp_header->type];
- }
- //Unlock objects that was locked in this machine due to this transaction
- for(i = 0; i< transinfo->numlocked; i++) {
- ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
- ((objheader_t *)ptr)->status &= ~(LOCK);
- }
- } else if(newctrl == TRANS_AGREE) {
- newctrl = TRANS_AGREE;
- //Send new control message
- if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
- }
- }
-
- break;
- }
- break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
//TODO expect another transrequest from client
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
break;
default:
printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
break;
}
//Free memory
- printf("DEBUG -> Freeing...");
+ printf("DEBUG -> Freeing...\n");
fflush(stdout);
if (transinfo->objmod != NULL) {
free(transinfo->objmod);
return 0;
}
-//This function runs a decision after all objects are weighed under one of the 4 possibilities
-//and returns the appropriate control message to the Ccordinator
+//This function weighs every object involved in a TRANS_REQUEST and returns the
+//resulting control message (TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT) for the Coordinator
char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
int val;
short version;
oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
// Counters and arrays to formulate decision on control message to be sent
+ // version match or no match
int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
int objmodnotfound = 0, nummodfound = 0;
void *mobj;
//Process each object present in the pile
ptr = modptr;
- //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
- fflush(stdout);
- //Process each oid in the machine pile/ group
+
+ //Process each oid in the machine pile/ group per thread
+ //Should be a new function
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) {//Object is read
int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
//Check if obj is locked
if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
if (version == ((objheader_t *)mobj)->version) { // If version match
+ printf("DEBUG -> obj = %d locked\n", ((objheader_t *)mobj)->oid);
v_matchlock++;
} else {//If versions don't match ..HARD ABORT
v_nomatch++;
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
return control;
}
} else {//Obj is not locked , so lock object
((objheader_t *)mobj)->status |= LOCK;
- //FOR TESTING
+ // TESTING Add sleep to make transactions run for a long time such that
+ // we can test for soft abort case
sleep(1);
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
- printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
}
}
}
-
- printf("No of objs locked = %d\n", objlocked);
- printf("No of v_nomatch = %d\n", v_nomatch);
- printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
- printf("No of objs v_match but had locks before = %d\n", v_matchlock);
- printf("No of objs not found = %d\n", objnotfound);
- printf("No of objs modified but not found = %d\n", objmodnotfound);
-
+
//Decide what control message(s) to send
- //Cond to send TRANS_AGREE
+ // Should be a new function
if(v_matchnolock == fixed->numread + fixed->nummod) {
//send TRANS_AGREE to Coordinator
control = TRANS_AGREE;
}
//Condition to send TRANS_SOFT_ABORT
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
- //send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
- *((int*)&msg[1])=objnotfound;
+ *((int*)&msg[1])= objnotfound;
printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
- //send number of oids not found and the missing oids
+ //Sending control message
if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
perror("Error in sending no of objects that are not found\n");
return 0;
}
+ //send number of oids not found and the missing oids if objects are missing in the machine
if(objnotfound != 0) {
- int size=sizeof(unsigned int)*objnotfound;
+ int size = sizeof(unsigned int)*objnotfound;
if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
perror("Error in sending objects that are not found\n");
return 0;
return control;
}
-//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
+//Processes the oids of a TRANS_COMMIT request at the participant end and sends an ACK back to the Coordinator
int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
objheader_t *header;
int i = 0, offset = 0;
//Process each modified object saved in the mainobject store
for(i=0; i<transinfo->nummod; i++) {
if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
- printf("mhashserach returns NULL\n");
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
}
//change reference count of older address and free space in objstr ??
header->rcount = 1; //Not sure what would be th val
//send ack to coordinator
control = TRANS_SUCESSFUL;
- //FOR TESTING
- printf("DEBUG-> Transaction is SUCCESSFUL \n");
+ printf("DEBUG-> TRANS_SUCESSFUL\n");
if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
}
while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
sleep(1);
}
- printf("DEBUG -> Connection established with %s\n", machineip);
close(tmpsd);
return 0;
}
int main()
{
//test1();
- //test3();
+// test3(); -- disabled, like test1() above; only test4() below is run
test4();
}
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
- mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
//Inserting into lhashtable
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- // pthread_create(&thread_Listen, NULL, dstmListen, NULL);
- //Check if machine d-1 is up and running
- checkServer(mid, "128.200.9.26");
- mid = iptoMid("128.200.9.27");
- //Check if machine d-2 is up and running
- checkServer(mid, "128.200.9.27");
+ //Check if machine d-4 is up and running
+ checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.200.9.29");
+ //Check if machine d-3 is up and running
+ checkServer(mid, "128.200.9.29");
// Start Transaction
myTrans = transStart();
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
- mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
//Inserting into lhashtable
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
- pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- // pthread_create(&thread_Listen, NULL, dstmListen, NULL);
- //Check if machine d-1 is up and running
- checkServer(mid, "128.200.9.26");
- mid = iptoMid("128.200.9.27");
- //Check if machine d-2 is up and running
- checkServer(mid, "128.200.9.27");
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+ //Check if machine d-4 is up and running
+ checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.200.9.29");
+ //Check if machine d-3 is up and running
+ checkServer(mid, "128.200.9.29");
// Start Transaction
myTrans = transStart();
//read object 1(present in local machine)
- if((h1 = transRead(myTrans, 1)) == NULL){
+ if((h1 = transRead(myTrans, 2)) == NULL){
printf("Object not found\n");
}
+
//read object 2present in local machine)
- if((h2 = transRead(myTrans, 2)) == NULL) {
+ if((h2 = transRead(myTrans, 1)) == NULL) {
printf("Object not found\n");
}
- //read object 31(present in d-1 machine)
+ //read object 31(present in d-4 machine)
if((h3 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
-
- //read object 21(present in d-2 machine)
+ //read object 21(present in d-3 machine)
if((h4 = transRead(myTrans, 21)) == NULL) {
printf("Object not found\n");
}
-
+
// Commit transaction
transCommit(myTrans);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-// pthread_create(&thread_Listen, NULL, dstmListen, NULL);
printf("DEBUG -> mid = %d\n", mid);
checkServer(mid, "128.200.9.26");
#include<sys/socket.h>
#include<netdb.h>
#include<netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <time.h>
#define LISTEN_PORT 2156
#define MACHINE_IP "127.0.0.1"
extern int classsize[];
+/* Pauses the caller for roughly 1-11 msec. The length is derived from the
+ * current wall-clock second, so it is cheap jitter rather than a true random value. */
+void randomdelay(void)
+{
+ const long floor_ns = 1000000; /* 1 msec minimum */
+ time_t now = time(NULL);
+ struct timespec pause, leftover;
+
+ pause.tv_sec = 0;
+ pause.tv_nsec = (long)(floor_ns + (now % 10000000)); /* adds 0-10 msec on top of the floor */
+ nanosleep(&pause, &leftover);
+}
+
transrecord_t *transStart()
{
transrecord_t *tmp = malloc(sizeof(transrecord_t));
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
- printf("Enter TRANS_READ\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- printf("DEBUG -> transRead oid %d found local\n", oid);
+ //printf("DEBUG -> transRead oid %d found local\n", oid);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
+ //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
return NULL;
}
else {
- printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
+ //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
return(objcopy);
}
}
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
-//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
-int decideResponse(thread_data_array_t *tdata, int sd, int val) {
- int i, n, N, sum, retval, oidcount = 0;
- int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
- char ctrl, control, *ptr;
- unsigned int *oidnotfound;
- objheader_t *header;
- printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+//Decides the response that the machine which initiated the transaction request must send to
+//all other machines involved, from the rcv_status recorded for each machine pile in tdata->recvmsg.
+//Stores the decision in *(tdata->replyctrl); returns 0 once decided, -1 on an unknown status or an undecided vote
+int decideResponse(thread_data_array_t *tdata) {
+ char control;
+ int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
+
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
//Switch case
control = tdata->recvmsg[i].rcv_status;
switch(control) {
case TRANS_DISAGREE:
- printf("DEBUG-> Inside TRANS_DISAGREE\n");
+ printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
transdisagree++;
- //Free transaction records
- objstrDelete(tdata->rec->cache);
- chashDelete(tdata->rec->lookupTable);
- free(tdata->rec);
- //send Abort
- ctrl = TRANS_ABORT;
- for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
- if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- }
- return 0;
+ break;
case TRANS_AGREE:
- printf("DEBUG-> Inside TRANS_AGREE\n");
- PRINT_TID(tdata);
+ printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
transagree++;
break;
case TRANS_SOFT_ABORT:
- printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+ printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
transsoftabort++;
- /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
- if ((i == tdata->thread_id) && (val == 0)) {
- //Read list of objects missing
- if(read(sd, &oidcount, sizeof(int)) != 0) {
- //Break if only objs are locked at the Participant side
- if (oidcount == 0) {
- break;
- }
- transsoftabortmiss++;
- N = oidcount * sizeof(unsigned int);
- if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- }
- ptr = (char *) oidnotfound;
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
- }
- }
-
break;
default:
- printf("Participant sent unknown message\n");
+ printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ return -1; //no decision is stored in *(tdata->replyctrl) on this path
}
}
//Decide what control message to send to Participant
- if(transagree == tdata->pilecount){
+ if(transdisagree > 0) {
+ //At least one participant disagreed: record TRANS_ABORT as the reply and release the transaction record
+ *(tdata->replyctrl) = TRANS_ABORT;
+ printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+ objstrDelete(tdata->rec->cache);
+ chashDelete(tdata->rec->lookupTable);
+ free(tdata->rec);
+ } else if(transagree == tdata->pilecount){
//Send Commit
- ctrl = TRANS_COMMIT;
- printf("Sending TRANS_COMMIT\n");
- if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
+ *(tdata->replyctrl) = TRANS_COMMIT;
+ printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+ objstrDelete(tdata->rec->cache);
+ chashDelete(tdata->rec->lookupTable);
+ free(tdata->rec);
+ } else if(transsoftabort > 0 && transdisagree == 0) {
+ //Soft aborts but no disagreement: record TRANS_ABORT and set the retry flag; the record is left allocated here
+ *(tdata->replyctrl) = TRANS_ABORT;
+ *(tdata->replyretry) = 1;
+ //objstrDelete(tdata->rec->cache);
+ //chashDelete(tdata->rec->lookupTable);
+ //free(tdata->rec);
+ printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+ } else {
+ printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
+ return -1; //the counts matched none of the three cases above
}
+
+ return 0; //a decision was stored in *(tdata->replyctrl)
+}
+//Sends the decided control message, *(tdata->replyctrl), to the participant on socket sd.
+//If this thread's participant answered TRANS_SOFT_ABORT, the count and list of oids it
+//could not find are read off the socket first.
+//Returns TRANS_ABORT or TRANS_COMMIT when that is the decided reply, TRANS_SOFT_ABORT when
+//only the soft-abort status applied, and 0 when nothing applied or an error occurred.
+char sendResponse(thread_data_array_t *tdata, int sd) {
+ int n, N, sum = 0, oidcount = 0; //sum is the read offset into the oid list and must start at 0
+ char *ptr, retval = 0;
+ unsigned int *oidnotfound = NULL;
- if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
- //Send abort but retry commit
- ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
- if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- /*
- //Sleep and the resend the request
- sleep(2);
- //Read new control message from Participant
-
- if((n = read(sd, &control, sizeof(char))) <= 0) {
- perror("No bytes are read for participant\n");
- return 1;
+ //If the decided response is due to a soft abort and missing objects at the Participant's side
+ if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
+ //Read the number of missing objects; a failed read or a count of 0 means there is no list to fetch
+ if((read(sd, &oidcount, sizeof(int)) == (ssize_t) sizeof(int)) && (oidcount > 0)) {
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 0; //cannot buffer the list; report the error instead of reading through NULL
+ }
+ ptr = (char *) oidnotfound;
+ //Read the list of missing oids; stop on EOF or error rather than adding -1 to the offset
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ if(n > 0) {
+ sum += n;
+ }
+ } while(sum < N && n > 0);
+ //Nothing in this function uses the list after reading it, so release it here
+ free(oidnotfound);
}
-
- //Update common data structure and increment count
- tdata->recvmsg[tdata->thread_id].rcv_status = control;
- val = 1;
- decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
- */
+ retval = TRANS_SOFT_ABORT;
}
-
- if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
- //Send abort but retry commit after relooking up objects
- ctrl = TRANS_ABORT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
- if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- //TODO
- //Relook up objects
- //update location table
-
- //Free pointers
- free(oidnotfound);
+ //If the decided response is TRANS_ABORT
+ if(*(tdata->replyctrl) == TRANS_ABORT) {
+ retval = TRANS_ABORT;
}
-
- return 0;
+ //If the decided response is TRANS_COMMIT
+ if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ retval = TRANS_COMMIT;
+ }
+ // Send response to the Participant
+ //Compare as signed: against a size_t the -1 error return converts to a huge value and is missed
+ if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) != (ssize_t) sizeof(char)) {
+ perror("Error sending ctrl message for participant\n");
+ return 0; //the caller treats 0 as failure
+ }
+
+ return retval;
}
+//Thread routine started once per machine pile for a TRANS_REQUEST (threadarg is a thread_data_array_t *).
+//Connects to the participant given by tdata->mid, sends the request data, reads the participant's
+//control message, then waits until all tdata->pilecount threads have reported. The last thread to
+//report calls decideResponse(); every thread then sends the shared decision with sendResponse().
+//Returns NULL on error; otherwise ends with pthread_exit(NULL).
void *transRequest(void *threadarg) {
struct hostent *server;
thread_data_array_t *tdata;
objheader_t *headeraddr;
- //unsigned int *oidnotfound;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
- char machineip[16];
+ char machineip[16], retval;
tdata = (thread_data_array_t *) threadarg;
//Send Trans Request
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket for TRANS_REQUEST");
+ perror("Error in socket for TRANS_REQUEST\n");
return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
midtoIP(tdata->mid,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect for TRANS_REQUEST");
+ perror("Error in connect for TRANS_REQUEST\n");
return NULL;
}
-
- //Multiple writes for sending packets of data
- //Send first few fixed bytes of the TRANS_REQUEST protocol
- printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
-// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+
+ printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
+ //Send bytes of data with TRANS_REQUEST control message
- if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
+ //Compare as signed: against a size_t the -1 error return converts to a huge value and is missed
+ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < (ssize_t) sizeof(fixed_data_t)) {
- perror("Error sending fixed bytes for thread");
+ perror("Error sending fixed bytes for thread\n");
return NULL;
}
//Send list of machines involved in the transaction
-// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
{
int size=sizeof(unsigned int)*tdata->pilecount;
if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
- perror("Error sending list of machines for thread");
+ perror("Error sending list of machines for thread\n");
return NULL;
}
}
//Send oids and version number tuples for objects that are read
-// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
{
int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
- perror("Error sending tuples for thread");
+ perror("Error sending tuples for thread\n");
return NULL;
}
}
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
size=sizeof(objheader_t)+classsize[headeraddr->type];
if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
- perror("Error sending obj modified for thread");
+ perror("Error sending obj modified for thread\n");
return NULL;
}
}
-
+
//Read message control message from participant side
if((n = read(sd, &control, sizeof(char))) <= 0) {
perror("Error in reading control message from Participant\n");
//Lock and update count
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(tdata->lock);
- (*(tdata->count))++;
+
+ (*(tdata->count))++;
if(*(tdata->count) == tdata->pilecount) {
+ //NOTE(review): this error return does not broadcast, so threads already waiting below stay blocked - confirm intended handling
+ if (decideResponse(tdata) != 0) {
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ close(sd);
+ return NULL;
+ }
pthread_cond_broadcast(tdata->threshold);
} else {
- pthread_cond_wait(tdata->threshold, tdata->lock);
+ //Re-check the count after every wakeup: pthread_cond_wait may return spuriously
+ while (*(tdata->count) < tdata->pilecount) {
+ pthread_cond_wait(tdata->threshold, tdata->lock);
+ }
}
- //process the participant's request
- if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+
+ //The lock is already released here, so the error path below must not unlock it again
+ if (sendResponse(tdata, sd) == 0) {
+ printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
+ close(sd);
return NULL;
}
- pthread_mutex_unlock(tdata->lock);
-
close(sd);
pthread_exit(NULL);
}
char transid[TID_LEN];
static int newtid = 0;
trans_req_data_t *tosend;
+ char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
ptr = record->lookupTable->table;
size = record->lookupTable->size;
//Get machine location for object id
if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine\n");
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return 1;
}
- //TODO only for debug
- //machinenum = 1;
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
- printf("Error: No such oid\n");
+ printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
return 1;
}
//Make machine groups
pListMid(pile, listmid);
//Process each machine group
+ //Should be a new function for while loop
while(tmp != NULL) {
//Create transaction id
newtid++;
thread_data_array[numthreads].threshold = &tcond;
thread_data_array[numthreads].lock = &tlock;
thread_data_array[numthreads].count = &trecvcount;
+ thread_data_array[numthreads].replyctrl = &treplyctrl;
+ thread_data_array[numthreads].replyretry = &treplyretry;
thread_data_array[numthreads].rec = record;
rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
perror("Error in pthread create");
return 1;
}
-
numthreads++;
//TODO frees
- //free(tosend);
tmp = tmp->next;
}
//Free resources
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
-// for(i = 0 ;i< pilecount ;i++) {
- free(tosend);
-// }
+
+ free(tosend);
free(listmid);
pDelete(pile);
+ if(treplyretry == 1) {
+ //wait a random amount of time
+ randomdelay();
+ //sleep(1);
+ //Retry the commiting transaction again
+ transCommit(record);
+ }
+
return 0;
}
chashInsert(record->lookupTable, oid, objcopy);
break;
default:
- printf("Error in recv request from participant on a READ_REQUEST\n");
+ printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
return NULL;
}
close(sd);