int dstmInit(void)
{
- //todo:initialize main object store
- //do we want this to be a global variable, or provide
- //separate access funtions and hide the structure?
+ //Initialize main object store
mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
{
int numbytes,i, val;
unsigned int oid;
- char buffer[RECEIVE_BUFFER_SIZE], control;
+ char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
char *ptr;
void *srcObj;
objheader_t *h;
+ trans_commit_data_t transinfo;
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
recv((int)acceptfd, &control, sizeof(char), 0);
switch(control) {
case READ_REQUEST:
+ printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
if (h == NULL) {
- buffer[0] = OBJECT_NOT_FOUND;
+ ctrl = OBJECT_NOT_FOUND;
} else {
- buffer[0] = OBJECT_FOUND;
+ ctrl = OBJECT_FOUND;
size = sizeof(objheader_t) + sizeof(classsize[h->type]);
- memcpy(buffer+1, srcObj, size);
- }
- if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
- perror("");
+ if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+ perror("Error sending control msg to coordinator\n");
+ }
+ if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
+ perror("Error sending size of object to coordinator\n");
+ }
+ if(send((int)acceptfd, h, size, 0) < 0) {
+ perror("Error in sending object\n");
+ }
}
break;
break;
case TRANS_REQUEST:
- //printf("DEBUG -> TRANS_REQUEST\n");
- if((val = readClientReq((int)acceptfd)) == 1) {
+ printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
+ if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
printf("Error in readClientReq\n");
}
break;
default:
printf("Error receiving\n");
}
-
- //Read for new control message from Coordiator
- recv((int)acceptfd, &control, sizeof(char), 0);
- switch(control) {
- case TRANS_ABORT:
- printf("DEBUG -> TRANS_ABORT\n");
- write((int)acceptfd, &control, sizeof(char));
- break;
-
- case TRANS_COMMIT:
- printf("DEBUG -> TRANS_COMMIT\n");
- write((int)acceptfd, &control, sizeof(char));
- //TODO
- //change ptr address in mhash table
- //unlock objects
- //update object version
- //change reference count of older address??
- //free space in objstr ??
- //Update location lookup table
- break;
- }
-
if (close((int)acceptfd) == -1)
{
perror("close");
pthread_exit(NULL);
}
-int readClientReq(int acceptfd) {
- char *ptr, control;
+int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
+ char *ptr, control, prevctrl, sendctrl;
void *modptr;
objheader_t *h, tmp_header;
fixed_data_t fixed;
- int sum = 0, N, n;
+ int sum = 0, N, n, val;
+ //Reads to process the TRANS_REQUEST protocol further
// Read fixed_data
N = sizeof(fixed) - 1;
ptr = (char *)&fixed;;
// Read modified objects
if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
- // printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+ printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
return 1;
}
sum = 0;
//printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
sum += n;
} while (sum < fixed.sum_bytes && n != 0);
+
//Send control message as per all votes from the particpants
- handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
+ if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
+ printf("Handle req error\n");
+ }
+
+ //Read for new control message from Coordiator
+ recv((int)acceptfd, &control, sizeof(char), 0);
+ switch(control) {
+ case TRANS_ABORT:
+ printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
+ //send ack to coordinator
+ sendctrl = TRANS_SUCESSFUL;
+ if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+ perror("Error sending ACK to coordinator\n");
+ }
+ break;
+
+ case TRANS_COMMIT:
+ printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
+ if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
+ printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
+ }
+ break;
+ case TRANS_ABORT_BUT_RETRY_COMMIT:
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
+ //Process again after waiting for sometime and on prev control message sent
+ switch(prevctrl) {
+ case TRANS_AGREE:
+ sleep(2);
+ break;
+ case TRANS_AGREE_BUT_MISSING_OBJECTS:
+ break;
+ case TRANS_SOFT_ABORT:
+ break;
+ }
+ //Try sending either agree or disagree after sometime
+ //TODO
+ //Wait in a blocking thread or something
+ //Recv from client new listmid, mcount and pilecount
+ //call 2 new functions that are similar to readClientReq and handleRequest
+ break;
+ case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
+ break;
+ default:
+ printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
+ break;
+ }
-
-
return 0;
}
//This function runs a decision after all objects are weighed under one of the 4 possibilities
//and returns the appropriate control message to the Ccordinator
-int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
+char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
short version;
- char control, *ptr;
- int i;
- unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
- int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
+ char control = 0, ctrlmissoid, *ptr, *oidmodnotfound;
+ int i, j = 0;
+ unsigned int oid;
+ unsigned int *oidnotfound, *oidlocked, *oidmod;
+
+ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+ oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
+ oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char));
+
+ // Counters and arrays to formulate decision on control message to be sent
+ int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int objmodnotfound = 0, nummodfound = 0;
void *mobj;
objheader_t *headptr;
+ //TODO remove this deadcode from here
objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
//Process each object present in the pile
} else {//Obj is modified
headptr = (objheader_t *) ptr;
oid = headptr->oid;
+ oidmod[objmod] = oid;//Array containing modified oids
+ objmod++;
version = headptr->version;
ptr += sizeof(objheader_t) + classsize[headptr->type];
}
//Check if object is still present in the machine since the beginning of TRANS_REQUEST
if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
objinfo[i].poss_val = OBJECT_NOT_FOUND;
+ if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) {
+ oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine
+ objmodnotfound++;
+ }
//Save the oids not found for later use
oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
objnotfound++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
write(acceptfd, &control, sizeof(char));
- //TODO when TRANS_DISAGREE is sent
- //Free space allocated in main objstore
- //Unlock objects that was locked in the trans
- return 0;
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ return control;
}
} else {//Obj is not locked , so lock object
((objheader_t *)mobj)->status |= LOCK;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
write(acceptfd, &control, sizeof(char));
- return 0;
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ return control;
}
}
}
}
+ printf("No of objs locked = %d\n", objlocked);
+ printf("No of v_nomatch = %d\n", v_nomatch);
+ printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+ printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+ printf("No of objs not found = %d\n", objnotfound);
+ printf("No of objs modified but not found = %d\n", objmodnotfound);
+
//Decide what control message(s) to send
if(v_matchnolock == fixed->numread + fixed->nummod) {
//send TRANS_AGREE to Coordinator
control = TRANS_AGREE;
write(acceptfd, &control, sizeof(char));
+ printf("DEBUG -> Sending TRANS_AGREE\n");
}
if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
//send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
control = TRANS_AGREE_BUT_MISSING_OBJECTS;
write(acceptfd, &control, sizeof(char));
- //send missing oids and number of oids not found with it
+ printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n");
+ //send number of oids not found and the missing oids
write(acceptfd, &objnotfound, sizeof(int));
write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
}
//send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
write(acceptfd, &control, sizeof(char));
- //send missing oids and number of oids not found with it
+ printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+ //send number of oids not found and the missing oids
write(acceptfd, &objnotfound, sizeof(int));
- write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+ if(objnotfound != 0)
+ write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
}
- //TODO when TRANS_DISAGREE is sent
- //Free space allocated in main objstore
- //Unlock objects that was locked in the trans
+ //Do the following when TRANS_DISAGREE is sent
if(control == TRANS_DISAGREE) {
+ //Set the reference count of the object to 1 in mainstore for garbage collection
+ ptr = modptr;
+ for(i = 0; i< fixed->nummod; i++) {
+ headptr = (objheader_t *) ptr;
+ headptr->rcount = 1;
+ ptr += sizeof(objheader_t) + classsize[headptr->type];
+ }
+ //Unlock objects that was locked in the trans
for(i = 0; i< objlocked ; i++) {
mobj = mhashSearch(oidlocked[i]);// find the header address
((objheader_t *)mobj)->status &= ~(LOCK);
}
}
+
+ // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine
+ nummodfound = fixed->nummod - objmodnotfound;
+ unsigned int oidmodfound[nummodfound];
+ for(i = 0; i< fixed->nummod; i++) {
+ if(oidmodnotfound[i] == 0) {
+ oidmodfound[j] = oidmod[i];
+ j++;
+ }
+ }
+ //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
+ transinfo->objmod = oidmod;
+ transinfo->objlocked = oidlocked;
+ transinfo->modptr = modptr;
+ transinfo->nummod = fixed->nummod;
+ transinfo->numlocked = objlocked;
+
+ return control;
+}
+
+//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
+int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
+ objheader_t *header;
+ int i = 0, offset = 0;
+ char control;
+ //Process each modified object saved in the mainobject store
+ for(i=0; i<transinfo->nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ printf("mhashserach returns NULL\n");
+ }
+ //change reference count of older address and free space in objstr ??
+ header->rcount = 1; //Not sure what would be th val
+ //change ptr address in mhash table
+ mhashRemove(transinfo->objmod[i]);
+ mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ offset += sizeof(objheader_t) + classsize[header->type];
+ //update object version
+ header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header->version += 1;
+ }
+ for(i=0; i<transinfo->numlocked; i++) {
+ //unlock objects
+ header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ header->status &= ~(LOCK);
+ }
+
+ //TODO Update location lookup table
+
+ //send ack to coordinator
+ control = TRANS_SUCESSFUL;
+ if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
+ perror("Error sending ACK to coordinator\n");
+ }
+
return 0;
}
//Free transaction records
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
+ free(tdata->rec);
//send Abort
ctrl = TRANS_ABORT;
if (write(sd, &ctrl, sizeof(char)) < 0) {
printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
transsoftabort++;
//Read list of objects missing
- read(sd, &oidcount, sizeof(int));
- N = oidcount * sizeof(unsigned int);
- if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ if(read(sd, &oidcount, sizeof(int)) != 0) {
+ if (oidcount == 0) {
+ sleep(2);
+ break;
+ }
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ }
+ ptr = (char *) oidnotfound[i];
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ sum += n;
+ } while(sum < N && n !=0);
}
- ptr = (char *) oidnotfound[i];
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
-
break;
case TRANS_AGREE_BUT_MISSING_OBJECTS:
if(transagree == tdata->pilecount){
//Send Commit
ctrl = TRANS_COMMIT;
+ printf("Sending TRANS_COMMIT\n");
if (write(sd, &ctrl, sizeof(char)) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
if(transsoftabort > 0 && transdisagree == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
+ printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
if (write(sd, &ctrl, sizeof(char)) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
- //lookup objects and then retry commit
+ //TODO
+ //Relookup all missing objects
//set up a new connection readClientReq()
//rebuilt the pile and llookup table
//i.e. wait at the participant end and then resend either agree or disagree
}
if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) {
+ //Send abort but retry commit
+ ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+ printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
+ return 1;
+ }
+ //TODO
//Relookup all missing objects
+ //set up a new connection readClientReq()
+ //rebuilt the pile and llookup table
//send missing mising object/ objects
}
for(i=0 ; i< tdata->pilecount; i++) {
free(oidnotfound[i]);
}
+
return 0;
}
//Multiple writes for sending packets of data
//Send first few fixed bytes of the TRANS_REQUEST protocol
printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-// printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
+// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
perror("Error sending fixed bytes for thread");
return NULL;
}
//Send list of machines involved in the transaction
-// printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
+// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
perror("Error sending list of machines for thread");
return NULL;
}
//Send oids and version number tuples for objects that are read
-// printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
+// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
perror("Error sending tuples for thread");
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
//process the participant's request
- if (decideResponse(tdata, sd) == 1) {
+ if (decideResponse(tdata, sd) != 0) {
printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
return NULL;
}
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
free(listmid);
+ pDelete(pile);
return 0;
}
#ifdef DEBUG1
printf("DEBUG -> ready to rcv ...\n");
#endif
+ //Read response from the Participant
+ read(sd, &control, sizeof(char));
+ switch(control) {
+ case OBJECT_NOT_FOUND:
+ return NULL;
+ break;
+ case OBJECT_FOUND:
+ read(sd, &size, sizeof(int));
+ objcopy = objstrAlloc(record->cache, size);
+ read(sd, objcopy, size);
+ break;
+ default:
+ printf("Error in recv request from participant on a READ_REQUEST\n");
+ return NULL;
+ }
+
+#if 0
read(sd, buffer, sizeof(buffer));
close(sd);
if (buffer[0] == OBJECT_NOT_FOUND) {
}
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)buffer+1, size);
+
+#endif
//Insert into cache's lookup table
chashInsert(record->lookupTable, oid, objcopy);
return objcopy;