pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch;
extern objstr_t *mainobjstore;
+unsigned int myIpAddr;
plistnode_t *createPiles(transrecord_t *);
inline int arrayLength(int *array) {
pthread_mutex_unlock(&pqueue.qlock);
}
-/* This function allocates an object on the local machine */
-//FIXME
-
-void * dstmalloc(transrecord_t *trans, int size) {
- objheader_t * newobj=(objheader_t *)objstrAlloc(trans->cache, size+sizeof(objheader_t));
- //Need to assign OID
-
- return newobj;
-}
-
/* This function starts up the transaction runtime. */
int dstmStartup(const char * option) {
pthread_t thread_Listen;
pthread_attr_t attr;
int master=strcmp(option, "master")==0;
+ myIpAddr = getMyIpAddr("eth0");
+
dstmInit();
transInit();
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
objheader_t *transRead(transrecord_t *record, unsigned int oid) {
- printf("Inside transaction read call\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- printf("Inside transaction cache \n");
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
- printf("Inside mainobject store \n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[TYPE(tmp)];
objcopy = objstrAlloc(record->cache, size);
chashInsert(record->lookupTable, OID(objheader), objcopy);
return(objcopy);
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
- printf("Inside prefetch cache \n");
found = 1;
size = sizeof(objheader_t)+classsize[TYPE(tmp)];
objcopy = objstrAlloc(record->cache, size);
OID(tmp) = getNewOID();
TYPE(tmp) = type;
tmp->version = 1;
- tmp->rcount = 0; //? not sure how to handle this yet
+ tmp->rcount = 1;
STATUS(tmp) = NEW;
chashInsert(record->lookupTable, OID(tmp), tmp);
return tmp;
unsigned int machinenum;
void *localmachinenum;
objheader_t *headeraddr;
-
+
ptr = record->lookupTable->table;
size = record->lookupTable->size;
break;
}
next = curr->next;
- //Get machine location for object id
- //TODO Check is the object is newly created ...if not then lookup the location table
- if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+ if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+ printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
return NULL;
}
- if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
- printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
+ //Get machine location for object id (and whether local or not)
+ if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) {
+ machinenum = myIpAddr;
+ } else if ((machinenum = lhashSearch(curr->key)) == 0) {
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return NULL;
}
+
//Make machine groups
if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
printf("pInsert error %s, %d\n", __FILE__, __LINE__);
return NULL;
}
- /* Check if local or not */
- if((localmachinenum = mhashSearch(curr->key)) != NULL) {
- /* Set the pile->local flag*/
- pile->local = 1; //True i.e. local
- }
-
curr = next;
}
}
char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
char localstat = 0;
+ do {
- /* Look through all the objects in the transaction record and make piles
- * for each machine involved in the transaction*/
- pile_ptr = pile = createPiles(record);
-
- /* Create the packet to be sent in TRANS_REQUEST */
+ /* Look through all the objects in the transaction record and make piles
+ * for each machine involved in the transaction*/
+ pile_ptr = pile = createPiles(record);
- /* Count the number of participants */
- pilecount = pCount(pile);
+ /* Create the packet to be sent in TRANS_REQUEST */
- /* Create a list of machine ids(Participants) involved in transaction */
- if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- pListMid(pile, listmid);
+ /* Count the number of participants */
+ pilecount = pCount(pile);
-
- /* Initialize thread variables,
- * Spawn a thread for each Participant involved in a transaction */
- pthread_t thread[pilecount];
- pthread_attr_t attr;
- pthread_cond_t tcond;
- pthread_mutex_t tlock;
- pthread_mutex_t tlshrd;
-
- thread_data_array_t *thread_data_array;
- thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
- local_thread_data_array_t *ltdata;
- if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
-
- thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
-
- /* Initialize and set thread detach attribute */
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_mutex_init(&tlock, NULL);
- pthread_cond_init(&tcond, NULL);
-
- /* Process each machine pile */
- while(pile != NULL) {
- //Create transaction id
- newtid++;
- if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+ /* Create a list of machine ids(Participants) involved in transaction */
+ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ pListMid(pile, listmid);
+
+
+ /* Initialize thread variables,
+ * Spawn a thread for each Participant involved in a transaction */
+ pthread_t thread[pilecount];
+ pthread_attr_t attr;
+ pthread_cond_t tcond;
+ pthread_mutex_t tlock;
+ pthread_mutex_t tlshrd;
+
+ thread_data_array_t *thread_data_array;
+ thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+ local_thread_data_array_t *ltdata;
+ if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
}
- tosend->f.control = TRANS_REQUEST;
- sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
- tosend->f.mcount = pilecount;
- tosend->f.numread = pile->numread;
- tosend->f.nummod = pile->nummod;
- tosend->f.sum_bytes = pile->sum_bytes;
- tosend->listmid = listmid;
- tosend->objread = pile->objread;
- tosend->oidmod = pile->oidmod;
- thread_data_array[threadnum].thread_id = threadnum;
- thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].pilecount = pilecount;
- thread_data_array[threadnum].buffer = tosend;
- thread_data_array[threadnum].recvmsg = rcvd_control_msg;
- thread_data_array[threadnum].threshold = &tcond;
- thread_data_array[threadnum].lock = &tlock;
- thread_data_array[threadnum].count = &trecvcount;
- thread_data_array[threadnum].replyctrl = &treplyctrl;
- thread_data_array[threadnum].replyretry = &treplyretry;
- thread_data_array[threadnum].rec = record;
- /* If local do not create any extra connection */
- if(pile->local != 1) { /* Not local */
- rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
- if (rc) {
- perror("Error in pthread create\n");
+
+ thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
+
+ /* Initialize and set thread detach attribute */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_mutex_init(&tlock, NULL);
+ pthread_cond_init(&tcond, NULL);
+
+ /* Process each machine pile */
+ while(pile != NULL) {
+ //Create transaction id
+ newtid++;
+ if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
}
- } else { /*Local*/
- /*Unset the pile->local flag*/
- pile->local = 0;
- /*Set flag to identify that Local machine is involved*/
- ltdata->tdata = &thread_data_array[threadnum];
- ltdata->transinfo = &transinfo;
- val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
- if (val) {
- perror("Error in pthread create\n");
- return 1;
+ tosend->f.control = TRANS_REQUEST;
+ sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+ tosend->f.mcount = pilecount;
+ tosend->f.numread = pile->numread;
+ tosend->f.nummod = pile->nummod;
+ tosend->f.numcreated = pile->numcreated;
+ tosend->f.sum_bytes = pile->sum_bytes;
+ tosend->listmid = listmid;
+ tosend->objread = pile->objread;
+ tosend->oidmod = pile->oidmod;
+ tosend->oidcreated = pile->oidcreated;
+ thread_data_array[threadnum].thread_id = threadnum;
+ thread_data_array[threadnum].mid = pile->mid;
+ thread_data_array[threadnum].pilecount = pilecount;
+ thread_data_array[threadnum].buffer = tosend;
+ thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+ thread_data_array[threadnum].threshold = &tcond;
+ thread_data_array[threadnum].lock = &tlock;
+ thread_data_array[threadnum].count = &trecvcount;
+ thread_data_array[threadnum].replyctrl = &treplyctrl;
+ thread_data_array[threadnum].replyretry = &treplyretry;
+ thread_data_array[threadnum].rec = record;
+ /* If local do not create any extra connection */
+ if(pile->mid != myIpAddr) { /* Not local */
+ rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
+ if (rc) {
+ perror("Error in pthread create\n");
+ return 1;
+ }
+ } else { /*Local*/
+ ltdata->tdata = &thread_data_array[threadnum];
+ ltdata->transinfo = &transinfo;
+ val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
+ if (val) {
+ perror("Error in pthread create\n");
+ return 1;
+ }
}
+ threadnum++;
+ pile = pile->next;
}
- threadnum++;
- pile = pile->next;
- }
- /* Free attribute and wait for the other threads */
- pthread_attr_destroy(&attr);
- for (i = 0 ;i < pilecount ; i++) {
- rc = pthread_join(thread[i], NULL);
- if (rc)
- {
- printf("ERROR return code from pthread_join() is %d\n", rc);
- return 1;
+ /* Free attribute and wait for the other threads */
+ pthread_attr_destroy(&attr);
+ for (i = 0 ;i < pilecount ; i++) {
+ rc = pthread_join(thread[i], NULL);
+ if (rc)
+ {
+ printf("ERROR return code from pthread_join() is %d\n", rc);
+ return 1;
+ }
+ free(thread_data_array[i].buffer);
}
- free(thread_data_array[i].buffer);
- }
- /* Free resources */
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- free(listmid);
- pDelete(pile_ptr);
- free(thread_data_array);
- free(ltdata);
+ /* Free resources */
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ free(listmid);
+ pDelete(pile_ptr);
+ free(thread_data_array);
+ free(ltdata);
- /* Retry trans commit procedure if not sucessful in the first try */
- if(treplyretry == 1) {
/* wait a random amount of time */
- randomdelay();
- /* Retry the commiting transaction again */
- transCommit(record);
- }
+ if (treplyretry == 1)
+ randomdelay();
+
+ /* Retry trans commit procedure if not sucessful in the first try */
+ } while (treplyretry == 1);
return 0;
}
written onto the shared array */
switch(control) {
case TRANS_DISAGREE:
- printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
transdisagree++;
break;
case TRANS_AGREE:
- printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
transagree++;
break;
case TRANS_SOFT_ABORT:
- printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
transsoftabort++;
break;
default:
/* Send Abort */
if(transdisagree > 0) {
*(tdata->replyctrl) = TRANS_ABORT;
- printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
/* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
} else if(transagree == tdata->pilecount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
- printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
/* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 1;
- printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
} else {
- printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
return -1;
}
}
switch(control) {
case OBJECT_NOT_FOUND:
- printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
return NULL;
case OBJECT_FOUND:
/* Read object if found into local cache */
short version;
char control = 0, *ptr;
unsigned int oid;
- unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
+ unsigned int *oidnotfound = NULL, *oidlocked = NULL;
void *mobj, *modptr;
objheader_t *headptr, *headeraddr;
local_thread_data_array_t *localtdata;
memcpy(modptr+offset, headeraddr, size);
offset += size;
}
+ /* Write new objects into the mainobject store */
+ for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
+ headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
+ size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+ memcpy(modptr+offset, headeraddr, size);
+ offset += size;
+ }
ptr = modptr;
offset = 0; //Reset
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
}
} else {/* If Obj is not locked then lock object */
STATUS(((objheader_t *)mobj)) |= LOCK;
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
}
}
}
/* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
- printf("DEBUG -> Sending TRANS_AGREE\n");
}
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
//TODO currently the only soft abort case that is supported is when object locked by previous
//transaction => v_matchlock > 0
//The other case for SOFT ABORT i.e. when object is not found but versions match is not supported
return NULL;
}
}else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
- if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
+ if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
}
/* Free memory */
- printf("DEBUG -> Freeing...\n");
- fflush(stdout);
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
objheader_t *tmp_header;
void *header;
- printf("DEBUG -> Recv TRANS_ABORT\n");
/* Set all ref counts as 1 and do garbage collection */
ptr = (char *)modptr;
for(i = 0; i< nummod; i++) {
tmp_header = (objheader_t *)ptr;
- tmp_header->rcount = 1;
+ tmp_header->rcount = 0;
ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
}
/* Unlock objects that was locked due to this transaction */
}
/* Send ack to Coordinator */
- printf("DEBUG-> TRANS_SUCCESSFUL\n");
/*Free the pointer */
ptr = NULL;
/*This function completes the COMMIT process is the transaction is commiting
*/
-int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
objheader_t *header;
int i = 0, offset = 0;
char control;
return 1;
}
/* Change reference count of older address and free space in objstr ?? */
- header->rcount = 1; //TODO Not sure what would be the val
+ header->rcount = 0;
/* Change ptr address in mhash table */
- mhashRemove(oidmod[i]);
+ mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
mhashInsert(oidmod[i], (((char *)modptr) + offset));
offset += sizeof(objheader_t) + classsize[TYPE(header)];
header->version += 1;
}
+ for (i = 0; i < numcreated; i++)
+ {
+ header = (objheader_t *)(((char *)modptr) + offset);
+ mhashInsert(oidcreated[i], (((char *)modptr) + offset));
+ offset += sizeof(objheader_t) + classsize[TYPE(header)];
+
+ lhashInsert(oidcreated[i], myIpAddr);
+ }
+
/* Unlock locked objects */
for(i = 0; i < numlocked; i++) {
if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
//TODO Update location lookup table
/* Send ack to Coordinator */
- printf("DEBUG-> TRANS_SUCESSFUL\n");
return 0;
}