objstr_t *cache;
chashtable_t *lookupTable;
} transrecord_t;
-
+/*
typedef struct pile {
unsigned int mid;
unsigned int oid;
struct pile *next;
}pile_t;
-
+*/
// Structure that keeps track of responses from the participants
typedef struct thread_response {
char rcv_status;
thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv
pthread_cond_t *threshold; //threshhold for waking up a thread
pthread_mutex_t *lock; //lock the count variable
- int *count; //count variable
+ int *count; //variable to count responses of TRANS_REQUEST protocol from all participants
transrecord_t *rec; // To send modified objects
}thread_data_array_t;
typedef struct trans_commit_data{
unsigned int *objmod;
unsigned int *objlocked;
+ unsigned int *objnotfound;
void *modptr;
int nummod;
int numlocked;
+ int numnotfound;
}trans_commit_data_t;
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *record, unsigned int oid);
objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
-int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides what response to send to the participant
+int decideResponse(thread_data_array_t *tdata, int sd, int status);// Coordinator decides what response to send to the participant
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
int transCommit(transrecord_t *record); //return 0 if successful
void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
if (h == NULL) {
ctrl = OBJECT_NOT_FOUND;
+ if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+ perror("Error sending control msg to coordinator\n");
+ }
} else {
- char responsemessage[sizeof(char)+sizeof(int)];
- /* Type */
- responsemessage[0]=OBJECT_FOUND;
- /* Size of object */
- *((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
- if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
- perror("Error sending control msg to coordinator\n");
- }
- if(send((int)acceptfd, h, size, 0) < 0) {
- perror("Error in sending object\n");
- }
+ //char responsemessage[sizeof(char)+sizeof(int)];
+ /* Type */
+ ctrl = OBJECT_FOUND;
+ if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+ perror("Error sending control msg to coordinator\n");
+ }
+
+ //responsemessage[0]=OBJECT_FOUND;
+ /* Size of object */
+ //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
+ //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
+ // perror("Error sending control msg to coordinator\n");
+ //}
+
+ /* Size of object */
+ if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
+ perror("Error sending size of object to coordinator\n");
+ }
+ if(send((int)acceptfd, h, size, 0) < 0) {
+ perror("Error in sending object\n");
+ }
}
break;
else
printf("Closed connection: fd = %d\n", (int)acceptfd);
+ //Free memory
+ free(transinfo.objmod);
+ free(transinfo.objlocked);
+ free(transinfo.objnotfound);
pthread_exit(NULL);
}
int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
- char *ptr, control, prevctrl, sendctrl;
- void *modptr;
- objheader_t *h, tmp_header;
+ char *ptr, control, prevctrl, sendctrl, newctrl;
+ void *modptr, *header;
+ objheader_t *tmp_header;
fixed_data_t fixed;
- int sum = 0, N, n, val;
+ int sum = 0, i, N, n, val;
//Reads to process the TRANS_REQUEST protocol further
// Read fixed_data
sum += n;
} while (sum < fixed.sum_bytes && n != 0);
- //Send control message as per all votes from the particpants
+ //Send control message as per all votes from all oids in the machine
if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
printf("Handle req error\n");
}
if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
}
- break;
+ //Mark all ref counts as 1 and do garbage collection
+ ptr = modptr;
+ for(i = 0; i< fixed.nummod; i++) {
+ tmp_header = (objheader_t *)ptr;
+ tmp_header->rcount = 1;
+ ptr += sizeof(objheader_t) + classsize[tmp_header->type];
+ }
+ //Unlock objects that was locked in this machine due to this transaction
+ for(i = 0; i< transinfo->numlocked; i++) {
+ header = mhashSearch(transinfo->objlocked[i]);// find the header address
+ ((objheader_t *)header)->status &= ~(LOCK);
+ }
+ ptr = NULL;
+ return 0;
case TRANS_COMMIT:
printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
//Process again after waiting for sometime and on prev control message sent
switch(prevctrl) {
case TRANS_AGREE:
- sleep(2);
- break;
- case TRANS_AGREE_BUT_MISSING_OBJECTS:
+ sendctrl = TRANS_AGREE;
+ if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+ perror("Error sending ACK to coordinator\n");
+ }
+ sleep(5);
break;
case TRANS_SOFT_ABORT:
+ if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
+ printf("Handle req error\n");
+ }
+ if(newctrl == prevctrl){
+ //Send ABORT
+ newctrl = TRANS_DISAGREE;
+ if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+ perror("Error sending ACK to coordinator\n");
+ }
+ //Set the reference count of the object to 1 in mainstore for garbage collection
+ ptr = modptr;
+ for(i = 0; i< fixed.nummod; i++) {
+ tmp_header = (objheader_t *) ptr;
+ tmp_header->rcount = 1;
+ ptr += sizeof(objheader_t) + classsize[tmp_header->type];
+ }
+ //Unlock objects that was locked in this machine due to this transaction
+ for(i = 0; i< transinfo->numlocked; i++) {
+ ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
+ ((objheader_t *)ptr)->status &= ~(LOCK);
+ }
+ return 0;
+ } else {
+ //Send new control message
+ if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+ perror("Error sending ACK to coordinator\n");
+ }
+ }
+
break;
}
- //Try sending either agree or disagree after sometime
- //TODO
- //Wait in a blocking thread or something
- //Recv from client new listmid, mcount and pilecount
- //call 2 new functions that are similar to readClientReq and handleRequest
+
break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
+ //TODO expect another transrequest from client
printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
break;
default:
//and returns the appropriate control message to the Ccordinator
char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
short version;
- char control = 0, ctrlmissoid, *ptr, *oidmodnotfound;
+ char control = 0, ctrlmissoid, *ptr;
int i, j = 0;
unsigned int oid;
unsigned int *oidnotfound, *oidlocked, *oidmod;
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
- oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char));
// Counters and arrays to formulate decision on control message to be sent
int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
int objmodnotfound = 0, nummodfound = 0;
void *mobj;
objheader_t *headptr;
- //TODO remove this deadcode from here
- objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
//Process each object present in the pile
ptr = modptr;
}
//Check if object is still present in the machine since the beginning of TRANS_REQUEST
if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
- objinfo[i].poss_val = OBJECT_NOT_FOUND;
- if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) {
- oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine
- objmodnotfound++;
- }
//Save the oids not found for later use
oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
objnotfound++;
//Check if obj is locked
if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
if (version == ((objheader_t *)mobj)->version) { // If version match
- objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
v_matchlock++;
} else {//If versions don't match ..HARD ABORT
- objinfo[i].poss_val = VERSION_NO_MATCH;
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
- objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
v_matchnolock++;
} else { //If versions don't match
- objinfo[i].poss_val = VERSION_NO_MATCH;
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
write(acceptfd, &control, sizeof(char));
printf("DEBUG -> Sending TRANS_AGREE\n");
}
-
- if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
- //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
- control = TRANS_AGREE_BUT_MISSING_OBJECTS;
- write(acceptfd, &control, sizeof(char));
- printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n");
- //send number of oids not found and the missing oids
- write(acceptfd, &objnotfound, sizeof(int));
- write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
- }
-
- if(v_matchlock > 0 && v_nomatch == 0) {
+
+ if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
//send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
write(acceptfd, &control, sizeof(char));
}
}
- // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine
- nummodfound = fixed->nummod - objmodnotfound;
- unsigned int oidmodfound[nummodfound];
- for(i = 0; i< fixed->nummod; i++) {
- if(oidmodnotfound[i] == 0) {
- oidmodfound[j] = oidmod[i];
- j++;
- }
- }
//Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
transinfo->objmod = oidmod;
transinfo->objlocked = oidlocked;
+ transinfo->objnotfound = oidnotfound;
transinfo->modptr = modptr;
transinfo->nummod = fixed->nummod;
transinfo->numlocked = objlocked;
+ transinfo->numnotfound = objnotfound;
return control;
}
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
-//TODO Change the values inside write() to exact size of the message being sent
//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
-int decideResponse(thread_data_array_t *tdata, int sd) {
- int i, j, n, N, sum, oidcount = 0, transagree = 0, transdisagree = 0, transsoftabort = 0, transmiss = 0;
+int decideResponse(thread_data_array_t *tdata, int sd, int val) {
+ int i, n, N, sum, oidcount = 0;
+ int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
char ctrl, control, *ptr;
- unsigned int *oidnotfound[tdata->pilecount];
+ unsigned int *oidnotfound;
objheader_t *header;
//Check common data structure
perror("Error sending ctrl message for participant\n");
return 1;
}
- break;
-
+ return 0;
+
case TRANS_AGREE:
printf("DEBUG-> Inside TRANS_AGREE\n");
transagree++;
case TRANS_SOFT_ABORT:
printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
transsoftabort++;
- //Read list of objects missing
- if(read(sd, &oidcount, sizeof(int)) != 0) {
- if (oidcount == 0) {
- sleep(2);
- break;
+ /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
+ if ((i == tdata->thread_id) && (val == 0)) {
+ //Read list of objects missing
+ if(read(sd, &oidcount, sizeof(int)) != 0) {
+ //Break if only objs are locked at the Participant side
+ if (oidcount == 0) {
+ break;
+ }
+ transsoftabortmiss++;
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ }
+ ptr = (char *) oidnotfound;
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ sum += n;
+ } while(sum < N && n !=0);
}
- N = oidcount * sizeof(unsigned int);
- if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- }
- ptr = (char *) oidnotfound[i];
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
}
- break;
-
- case TRANS_AGREE_BUT_MISSING_OBJECTS:
- printf("DEBUG-> Inside TRANS_AGREE_BUT_MISSING_OBJECTS\n");
- transmiss++;
- //Read list of objects missing
- read(sd, &oidcount, sizeof(int));
- N = oidcount * sizeof(unsigned int);
- if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- }
- ptr = (char *) oidnotfound[i];
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
-
break;
default:
}
}
- //For Debug purposes
- for(i=0 ; i< tdata->pilecount; i++) {
- for(j=0 ; j < oidcount; j++) {
- printf("DEBUG-> Oid %d missing for pilecount: %d\n", oidnotfound[j], i+1);
- }
- }
-
//Decide what control message to send to Participant
if(transagree == tdata->pilecount){
//Send Commit
}
}
- if(transsoftabort > 0 && transdisagree == 0) {
+ if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
perror("Error sending ctrl message for participant\n");
return 1;
}
- //TODO
- //Relookup all missing objects
- //set up a new connection readClientReq()
- //rebuilt the pile and llookup table
- //i.e. wait at the participant end and then resend either agree or disagree
+ //Sleep
+ sleep(5);
+ //Read new control message from Participant
+ n = read(sd, &control, sizeof(char));
+
+ //Update common data structure and increment count
+ tdata->recvmsg[tdata->thread_id].rcv_status = control;
+ val = 1;
+ decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
}
- if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) {
- //Send abort but retry commit
- ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+
+ if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
+ //Send abort but retry commit after relloking up objects
+ //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+ ctrl = TRANS_ABORT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
if (write(sd, &ctrl, sizeof(char)) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
//TODO
- //Relookup all missing objects
- //set up a new connection readClientReq()
- //rebuilt the pile and llookup table
- //send missing mising object/ objects
+ //Relook up objects
+ //update location table
+ //Free pointers
+ free(oidnotfound);
}
- //Free pointers
- for(i=0 ; i< tdata->pilecount; i++) {
- free(oidnotfound[i]);
- }
-
-
return 0;
}
struct hostent *server;
thread_data_array_t *tdata;
objheader_t *headeraddr;
+ //unsigned int *oidnotfound;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
tdata = (thread_data_array_t *) threadarg;
n = read(sd, &control, sizeof(char));
recvcontrol = control;
printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
-// while(n != 0) {
-// n = read(sd, buffer, sizeof(buffer));
-// }
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
//process the participant's request
- if (decideResponse(tdata, sd) != 0) {
+ if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
return NULL;
}
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
}
pthread_mutex_unlock(tdata->lock);
+
close(sd);
pthread_exit(NULL);
}
objheader_t *headeraddr;
plistnode_t *tmp, *pile = NULL;
int i, rc;
- int pilecount = 0, offset, numthreads = 0, trecvcount = 0;
+ int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
static int newtid = 0;
pthread_attr_t attr;
pthread_cond_t tcond;
pthread_mutex_t tlock;
+ pthread_mutex_t tlshrd;
thread_data_array_t thread_data_array[pilecount];
thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
return 0;
}
-int transSoftAbort(transrecord_t *record){
-
-}
-
-int transAbort(transrecord_t *record){
-
-
-}
-
//mnun will be used to represent the machine IP address later
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
int sd, size;
struct sockaddr_in serv_addr;
struct hostent *server;
- char buffer[RECEIVE_BUFFER_SIZE],control;
+ char control;
objheader_t *h;
void *objcopy;
perror("Error in connect");
return NULL;
}
- bzero((char *)buffer,sizeof(buffer));
- control = READ_REQUEST;
- buffer[0] = control;
- memcpy(buffer+1, &oid, sizeof(int));
- if (write(sd, buffer, sizeof(int) + 1) < 0) {
+// bzero((char *)buffer,sizeof(buffer));
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ //buffer[0] = READ_REQUEST;
+ //memcpy(buffer+1, &oid, sizeof(int));
+ //if (write(sd, buffer, sizeof(int) + sizeof(char)) < 0) {
+ if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
perror("Error sending message");
return NULL;
}
printf("Error in recv request from participant on a READ_REQUEST\n");
return NULL;
}
-
-#if 0
- read(sd, buffer, sizeof(buffer));
- close(sd);
- if (buffer[0] == OBJECT_NOT_FOUND) {
- return NULL;
- } else {
-
- h = (objheader_t *) buffer+1;
- size = sizeof(objheader_t) + sizeof(classsize[h->type]);
-#ifdef DEBUG1
- printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
-#endif
- }
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)buffer+1, size);
-
-#endif
//Insert into cache's lookup table
chashInsert(record->lookupTable, oid, objcopy);
return objcopy;