#include "dstm.h"
#include "ip.h"
-#include "clookup.h"
#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
+#include "gCollect.h"
#ifdef COMPILER
#include "thread.h"
#endif
/* Global Variables */
extern int classsize[];
+pfcstats_t *evalPrefetch;
objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
pthread_mutex_t notifymutex;
pthread_mutex_t atomicObjLock;
/***********************************
* Global Variables for statistics
**********************************/
-extern int numTransCommit;
-extern int numTransAbort;
+int numTransCommit = 0;
+int numTransAbort = 0;
+int nchashSearch = 0;
+int nmhashSearch = 0;
+int nprehashSearch = 0;
+int nRemoteSend = 0;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
if (numbytes == -1) {
perror("send");
- exit(-1);
+ return;
}
buffer += numbytes;
size -= numbytes;
numbytes = recv(fd, buffer, size, 0);
if (numbytes == -1) {
perror("recv");
- exit(-1);
+ return;
}
buffer += numbytes;
size -= numbytes;
if (numbytes==0)
return 0;
if (numbytes == -1) {
+ perror("recv");
return -1;
}
buffer += numbytes;
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
-void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
+void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
- int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
- char * node= getmemory(qnodesize);
- /* Set queue node values */
+ int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
+ char * node= getmemory(qnodesize);
int top=endoffsets[ntuples-1];
+
+ if (node==NULL)
+ return;
+ /* Set queue node values */
- *((int *)(node))=ntuples;
- len = sizeof(int);
+ /* TODO: Remove this after testing */
+ evalPrefetch[siteid].callcount++;
+
+ *((int *)(node))=siteid;
+ *((int *)(node + sizeof(int))) = ntuples;
+ len = 2*sizeof(int);
memcpy(node+len, oids, ntuples*sizeof(unsigned int));
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
//Initialize socket pool
transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+ transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
dstmInit();
transInit();
fd=startlistening();
udpfd = udpInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
int retval;
//Create and initialize prefetch cache structure
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+ initializePCache();
+ if((evalPrefetch = initPrefetchStats()) == NULL) {
+ printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
+ exit(0);
+ }
/* Initialize attributes for mutex */
pthread_mutexattr_init(&prefetchcache_mutex_attr);
return NULL;
}
tmp->cache = objstrCreate(1048576);
- tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+ tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
#ifdef COMPILER
tmp->revertlist=NULL;
#endif
unsigned int machinenumber;
objheader_t *tmp, *objheader;
objheader_t *objcopy;
- int size, found = 0;
+ int size;
void *buf;
if(oid == 0) {
return NULL;
}
- if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+ if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
+#ifdef TRANSSTATS
+ nchashSearch++;
+#endif
/* Search local transaction cache */
#ifdef COMPILER
return &objheader[1];
return objheader;
#endif
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+ nmhashSearch++;
+#endif
/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
return objcopy;
#endif
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+ nprehashSearch++;
+#endif
+#ifdef CHECKTA
+ printf("Prefetch cache read, oid = %x, oidtype =%d\n", oid, TYPE(tmp));
+ fflush(stdout);
+#endif
/* Look up in prefetch cache */
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
+#ifdef TRANSSTATS
+ nRemoteSend++;
+#endif
+#ifdef CHECKTA
+ printf("Remote read, oid = %x, oidtype =%d\n", oid, TYPE(objcopy));
+ fflush(stdout);
+#endif
STATUS(objcopy)=0;
#ifdef COMPILER
return &objcopy[1];
if(curr->key == 0)
break;
- if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+ if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
return NULL;
}
pthread_mutex_t tlshrd;
thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-
ltdata = calloc(1, sizeof(local_thread_data_array_t));
thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
if(treplyctrl == TRANS_ABORT) {
#ifdef TRANSSTATS
- ++numTransAbort;
+ numTransAbort++;
+#endif
+#ifdef CHECKTA
+ char a[] = "Aborting";
+ TABORT1(a);
#endif
/* Free Resources */
objstrDelete(record->cache);
return TRANS_ABORT;
} else if(treplyctrl == TRANS_COMMIT) {
#ifdef TRANSSTATS
- ++numTransCommit;
+ numTransCommit++;
+#endif
+#ifdef CHECKTA
+ char a[] = "Commiting";
+ TABORT1(a);
#endif
/* Free Resources */
objstrDelete(record->cache);
objheader_t *headeraddr;
char control, recvcontrol;
char machineip[16], retval;
-
+
tdata = (thread_data_array_t *) threadarg;
-
- /* Send Trans Request */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket for TRANS_REQUEST\n");
- pthread_exit(NULL);
- }
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
- /* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- perror("Error in connect for TRANS_REQUEST\n");
- close(sd);
+
+ if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
+ printf("transRequest(): socket create error\n");
pthread_exit(NULL);
}
-
+
/* Send bytes of data with TRANS_REQUEST control message */
send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-
+
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
/* Send objects that are modified */
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
int size;
- headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
+ printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
send_data(sd, headeraddr, size);
recv_data(sd, &length, sizeof(int));
void *newAddr;
pthread_mutex_lock(&prefetchcache_mutex);
- if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+ if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- close(sd);
pthread_exit(NULL);
}
pthread_mutex_unlock(&prefetchcache_mutex);
objheader_t * header;
header = (objheader_t *) (((char *)newAddr) + offset);
oidToPrefetch = OID(header);
+#ifdef CHECKTA
+ printf("Trans disagree for oid = %x: ", OID(header));
+ char a[] = "object type";
+ TABORT8(__func__, a, TYPE(header));
+#endif
int size = 0;
GETSIZE(size, header);
size += sizeof(objheader_t);
offset += size;
}
}
+
recvcontrol = control;
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
pthread_cond_wait(tdata->threshold, tdata->lock);
}
pthread_mutex_unlock(tdata->lock);
+
+ /* clear objects from prefetch cache */
+ /*
+ if(*(tdata->replyctrl) == TRANS_ABORT) {
+ int i;
+ for(i=0; i<tdata->buffer->f.nummod; i++) {
+ unsigned int oid = tdata->buffer->oidmod[i];
+ objheader_t *header;
+ if((header = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+ }
+ for(i=0; i<tdata->buffer->f.numread; i++) {
+ char *objread = tdata->buffer->objread;
+ unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
+ sizeof(unsigned short))*i));
+ objheader_t *header;
+ if((header = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+ }
+ }
+ */
+
+ if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(tdata)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+
+ /* Invalidate objects in other machine cache */
+ if(tdata->buffer->f.nummod > 0) {
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ }
+ }
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- close(sd);
pthread_exit(NULL);
}
} else {
//printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
}
-
- /* Close connection */
- close(sd);
pthread_exit(NULL);
}
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
/* clear objects from prefetch cache */
- for (i = 0; i < tdata->buffer->f.numread; i++) {
- prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
- }
- for (i = 0; i < tdata->buffer->f.nummod; i++) {
- prehashRemove(tdata->buffer->oidmod[i]);
- }
+ cleanPCache(tdata);
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
- /* update prefetch cache */
- /* For objects read */
- char oidType;
- int retval;
- oidType = 'R';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- oidType = 'M';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- /* Invalidate objects in other machine cache */
- if(tdata->buffer->f.nummod > 0) {
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
return;
}
-/* This function updates the prefetch cache when commiting objects
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
- int i;
- for (i = 0; i < numoid; i++) {
- //find address object
- objheader_t *header, *newAddr;
- int size;
- unsigned int oid;
- if(oidType == 'R') {
- oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
- } else {
- oid = tdata->buffer->oidmod[i];
- }
- pthread_mutex_lock(&prefetchcache_mutex);
- header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
- //copy object into prefetch cache
- GETSIZE(size, header);
- if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
- printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(newAddr, header, (size + sizeof(objheader_t)));
- //make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, newAddr);
- } else {
- prehashInsert(oid, newAddr);
- }
- }
- return 0;
-}
-
-
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function
unsigned short version;
void *mobj;
objheader_t *headptr;
-
+
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
int tmpsize;
headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
- return NULL;
+ printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+ return NULL;
}
oid = OID(headptr);
version = headptr->version;
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+ printf("Trans disagree for oid = %x: ", OID(mobj));
+ char a[] = "object type";
+ TABORT8(__func__, a, TYPE(mobj));
+#endif
+
+#ifdef CHECKTA
+ //char a[] = "mid";
+ //char b[] = "version mismatch";
+ //char c[] = "object type";
+ //char d[] = "oid";
+ //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
+#endif
+ break;
}
} else {
//we're locked
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+ printf("Trans disagree for oid = %x: ", OID(mobj));
+ char a[] = "object type";
+ TABORT8(__func__, a, TYPE(mobj));
+#endif
+#ifdef CHECKTA
+ //char a[] = "mid";
+ //char b[] = "version mismatch";
+ //char c[] = "object type";
+ //char d[] = "oid";
+ //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
+#endif
+ break;
}
}
}
}
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+#ifdef CHECKTA
+ //char a[] = "mid";
+ //char b[] = "version mismatch";
+ //char c[] = "object type";
+ //TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
+ printf("%s() Soft abort\n", __func__);
+#endif
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
}
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
} else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+ /* Invalidate objects in other machine cache */
+ if(localtdata->tdata->buffer->f.nummod > 0) {
+ int retval;
+ if((retval = invalidateObj(localtdata->tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ }
if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
}
if (localtdata->transinfo->objnotfound != NULL) {
free(localtdata->transinfo->objnotfound);
}
-
pthread_exit(NULL);
}
return 0;
}
-/*This function completes the COMMIT process is the transaction is commiting*/
+/*This function completes the COMMIT process if the transaction is commiting*/
int transComProcess(local_thread_data_array_t *localtdata) {
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
oidcreated = localtdata->tdata->buffer->oidcreated;
numlocked = localtdata->transinfo->numlocked;
oidlocked = localtdata->transinfo->objlocked;
-
+
for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
}
prefetchpile_t *foundLocal(char *ptr) {
+ int siteid = *(GET_SITEID(ptr));
int ntuples = *(GET_NTUPLES(ptr));
unsigned int * oidarray = GET_PTR_OID(ptr);
unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
prefetchpile_t * head=NULL;
+ int numLocal = 0;
int i;
for(i=0;i<ntuples; i++) {
goto tuple;
}
//Entire prefetch is local
- if (newbase==endindex&&checkoid(oid))
+ if (newbase==endindex&&checkoid(oid)){
+ numLocal++;
goto tuple;
+ }
//Add to remote requests
machinenum=lhashSearch(oid);
insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
tuple:
;
}
+
+ /* handle dynamic prefetching */
+ handleDynPrefetching(numLocal, ntuples, siteid);
return head;
}
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
- //printf("oid %d found\n",oid);
size = size - (sizeof(char) + sizeof(unsigned int));
pthread_mutex_lock(&prefetchcache_mutex);
- if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+ if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&prefetchcache_mutex);
return -1;
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
// exit(-1);
} else {
printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
objheader_t *objheader;
unsigned short numoffset[] ={0};
short fieldoffset[] ={};
-
+
if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
- if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- prefetch(1, &oid, numoffset, fieldoffset);
- pthread_mutex_lock(&pflookup.lock);
- while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- pthread_cond_wait(&pflookup.cond, &pflookup.lock);
- }
- pthread_mutex_unlock(&pflookup.lock);
+ if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+ unsigned int mid = lhashSearch(oid);
+ int sd = getSock2(transReadSockPool, mid);
+ char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+ remotereadrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&remotereadrequest[1])) = oid;
+ send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+ /* Read response from the Participant */
+ char control;
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+ fflush(stdout);
+ exit(-1);
+ } else {
+ /* Read object if found into local cache */
+ int size;
+ recv_data(sd, &size, sizeof(int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, objheader, size);
+ prehashInsert(oid, objheader);
}
+ }
}
-
return TYPE(objheader);
}
return;
} else {
if(version <= ndata->versionarry[index]){
- printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
+ printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
return;
} else {
/* Clear from prefetch cache and free thread related data structure */
if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("notifyAll():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
fflush(stdout);
+ status = -1;
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
chashDelete(trans->lookupTable);
free(trans);
}
+
+/* This function inserts necessary information into
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+ plistnode_t *ptr, *tmp;
+ int found = 0, offset = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is already present in the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ }else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+ tmp->numread ++;
+ }
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ offset = sizeof(unsigned int);
+ *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+ ptr->numread ++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ /* Clear Flags */
+ STATUS(headeraddr) =0;
+
+ return pile;
+}