#include "thread.h"
#endif
#include "gCollect.h"
+#include "readstruct.h"
#ifdef RECOVERY
#include <unistd.h>
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
- int n, v;
+ int n, v;
#ifdef DEBUG
printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
/* Receive control messages from other machines */
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ //int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
dupeptr = NULL;
if (ret==0)
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
+ //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
return 0;
/* Read modified objects */
+ //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes);
if(fixed.nummod != 0) {
if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
ptr = (char *) modptr;
+ //printf("fixed.nummod= %d\n", fixed.nummod);
+ //fflush(stdout);
for(i = 0 ; i < fixed.nummod; i++) {
int tmpsize=0;
headaddr = (objheader_t *) ptr;
oid = OID(headaddr);
oidmod[i] = oid;
GETSIZE(tmpsize, headaddr);
+ //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid);
ptr += sizeof(objheader_t) + tmpsize;
}
#ifdef DEBUG
__thread objstr_t *t_cache;
__thread struct ___Object___ *revertlist;
+__thread struct timespec exponential_backoff;
+__thread int count_exponential_backoff;
+__thread const int max_exponential_backoff = 1000; // safety limit
#ifdef ABORTREADERS
__thread int t_abort;
__thread jmp_buf aborttrans;
return;
}
+/* This functions inserts exponential backoff delays in the order of msec
+ * Mostly used when transaction commits retry*/
+void exponentialdelay() {
+ exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2;
+ nanosleep(&exponential_backoff, NULL);
+ ++count_exponential_backoff;
+ if (count_exponential_backoff >= max_exponential_backoff) {
+ printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__);
+ exit(-1);
+ }
+ return;
+}
+
/* This function initializes things required in the transaction start*/
void transStart() {
t_cache = objstrCreate(1048576);
if((mid=lhashSearch(oid)) == 0) {
mid = myIpAddr;
}
-
- pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
if(numLiveHostsInSystem > 1) {
if(makedirty) {
STATUS(headeraddr) = DIRTY;
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
}
- pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
}
#else
// Get machine location for object id (and whether local or not)
}
#endif
+ int treplyretryCount = 0;
+ /* Initialize timeout for exponential delay */
+ exponential_backoff.tv_sec = 0;
+ exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+ count_exponential_backoff = 0;
do {
treplyretry = 0;
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
- //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
- send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ //send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
- //send_data(sd, tosend[sockindex].listmid, size);
- send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
+ send_data(sd, tosend[sockindex].listmid, size);
+ //send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
- //send_data(sd, tosend[sockindex].objread, size);
- send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
+ send_data(sd, tosend[sockindex].objread, size);
+ //send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
}
/* Send objects that are modified */
}
int offset = 0;
int i;
+ //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod);
for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
int size;
objheader_t *headeraddr;
return 1;
}
GETSIZE(size,headeraddr);
+ //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr));
size+=sizeof(objheader_t);
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- //send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
- forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+ //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes);
+ //fflush(stdout);
+ send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
#ifdef RECOVERY
/* send transaction id, number of machine involved, machine ids */
- //send_data(sd, &transID, sizeof(unsigned int));
- forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
+ send_data(sd, &transID, sizeof(unsigned int));
+ //forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
#endif
free(modptr);
} else { //handle request locally
pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- randomdelay();
+ treplyretryCount++;
+ //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+ // exponentialdelay();
+ //else
+ randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
#endif
}
-
-
} while (treplyretry && deadmid != -1);
if(finalResponse == TRANS_ABORT) {
//Add to remote requests
machinenum=lhashSearch(oid);
+#ifdef RECOVERY
+ static int flipBit = 0;// Used to distribute requests between primary and backup evenly
+ // either primary or backup machine
+ machinenum = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
+ flipBit ^= 1;
+#ifdef DEBUG
+ printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber));
+#endif
+#endif
insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
tuple:
;
#ifdef RECOVERY
unsigned int mid = lhashSearch(oid);
unsigned int machineID;
- static flipBit = 0;
+ static int flipBit = 0;
machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid));
int sd = getSock2(transReadSockPool, machineID);
#else