/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+ if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) {
+ printf("DEBUG: %s() mlookup objcopy= %x\n", __func__, objcopy);
+ exit(-1);
+ }
memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
STATUS(objcopy)=0;
/* Look up in prefetch cache */
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+ if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) {
+ printf("DEBUG: %s() prefetch cache objcopy= %x\n", __func__, objcopy);
+ exit(-1);
+ }
memcpy(objcopy, tmp, size);
/* Insert into cache's lookup table */
t_chashInsert(OID(tmp), objcopy);
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+ //printf("%s() DEBUG: type=%d\n",__func__, TYPE(headerObj));
//make an entry in prefetch lookup hashtable
prehashInsert(oid, headerObj);
LOGEVENT('B');
chashlistnode_t * ptr = c_table;
/* Represents number of bins in the chash table */
unsigned int size = c_size;
+#ifdef RECOVERY
+ int phostindex[numHostsInSystem];
+ int k;
+ for(k=0; k<numHostsInSystem; k++)
+ phostindex[k] = -1;
+ int hostIndex = 0;
+#endif
for(i = 0; i < size ; i++) {
chashlistnode_t * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
headeraddr=(objheader_t *) curr->val;
#ifdef RECOVERY
+
oid = OID(headeraddr);
int makedirty = 0;
mid = myIpAddr;
}
- //if(mid == myIpAddr) {
- pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
- //} else {
- // if(bit)
- // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
- // else
- // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
- //}
+ int selectMid=0;
+ if(mid == myIpAddr) {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ if(!checkIndex(findHost(myIpAddr), phostindex)) {
+ phostindex[hostIndex++] = findHost(myIpAddr);
+ }
+ } else {
+ int pindex = findHost(mid);//primary copy's index
+ int bindex = (findHost(mid) + 1) % numHostsInSystem;//backup copy's index
+ if(checkIndex(pindex, phostindex)) {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ selectMid = 1;
+ } else if(checkIndex(bindex, phostindex)) {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ } else {
+ //check if any indexes present in the phostindex arry is odd or even
+ int chktype;
+ if((chktype = typeIndex(phostindex)) != -1) {
+ if(chktype == 1) { //odd indexed machines
+ //pick up either backup or primary copy based on the type of previous indexes
+ if((pindex%2) == 0) {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ phostindex[hostIndex++] = bindex;
+ } else {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ selectMid = 1;
+ phostindex[hostIndex++] = pindex;
+ }
+ } else { //even indexed machines
+ if((pindex%2) == 0) {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ selectMid = 1;
+ phostindex[hostIndex++] = pindex;
+ } else {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ phostindex[hostIndex++] = bindex;
+ }
+ }
+ } else {
+ if(((myIpAddr%2 == 0) && ((mid%2) == 0)) || ((myIpAddr%2 == 0) && ((mid%2) == 0))) {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ selectMid = 1;
+ phostindex[hostIndex++] = pindex;
+ } else {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ phostindex[hostIndex++] = bindex;
+ }
+ }
+ }
+ }
if(numLiveHostsInSystem > 1) {
if(makedirty) {
STATUS(headeraddr) = DIRTY;
- //if(mid == myIpAddr) {
- // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
- //} else {
- // if(bit)
- pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
- // else
- // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
- // }
+ if(mid == myIpAddr) {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ } else {
+ if(selectMid) {
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ } else {
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ }
+ }
}
- //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
}
#else
// Get machine location for object id (and whether local or not)
}
#endif
+/**
+ * This function return 0 if indexes present are even
+ * returns 1 if indexes present are odd
+ * return -1 if indexes present are -1
+**/
+int typeIndex(int *phostindex) {
+ if(phostindex[0] == -1)
+ return -1;
+ if((phostindex[0]%2) == 0)
+ return 0;
+ else
+ return 1;
+}
+
+/**
+ * This function returns 1 is pindex is found
+ * in the phostindex array else
+ * returns 0
+ **/
+int checkIndex(int pindex, int *phostindex) {
+ int i;
+ for(i=0; i<numHostsInSystem; i++) {
+ if(phostindex[i] == pindex)
+ return 1;
+ }
+ return 0;
+}
+
/* This function initiates the transaction commit process
* Spawns threads for each of the new connections with Participants
* and creates new piles by calling the createPiles(),
int localReqsock = -1;
trans_req_data_t *tosend;
tosend = calloc(pilecount, sizeof(trans_req_data_t));
+
while(pile != NULL) {
#ifdef DEBUG
printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid));
tosend[sockindex].oidcreated = pile->oidcreated;
- int sd = 0;
+ int sd = 0;
if(pile->mid != myIpAddr) {
if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
printf("\ntransRequest(): socket create error\n");
offset+=size;
}
send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
- //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+ //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
free(modptr);
} else { //handle request locally
sockindex++;
pile = pile->next;
} //end of pile processing
-
+
/* Recv Ctrl msgs from all machines */
#ifdef DEBUG
printf("%s-> Finished sending transaction read/mod objects\n",__func__);
#endif
}
}
-
- /* Decide the final response */
+ /* Decide the final response */
if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
return 1;
}
-
+
#ifdef ABORTREADERS
removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
}
#endif
}
-
-
+
for(i = 0; i< pilecount; i++) {
if(socklist[i] > 0) {
freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- randomdelay();
+ //treplyretryCount++;
+ //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+ // exponentialdelay();
+ //else
+ randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
#endif
} while (treplyretry && deadmid != -1);
#ifdef RECOVERY
-
//=========== after transaction point
tlist_node_t* tNode = tlistSearch(transList,transID);
inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //A lock is acquired some place else
*getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //Has reached max number of readers or some other transaction
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
header->version += 1;
if(header->notifylist != NULL) {
#ifdef RECOVERY
- printf("%s -> to notifyAll\n",__func__);
+ //printf("%s -> to notifyAll\n",__func__);
if(header->isBackup == 0) { // if it is primary obj, notify
- printf("%s -> Called notifyAll\n",__func__);
+ //printf("%s -> Called notifyAll\n",__func__);
notifyAll(&header->notifylist, OID(header), header->version);
}
else // if not, just clear the notification list