numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
if (numbytes == -1) {
perror("send");
- exit(-1);
+ return;
}
buffer += numbytes;
size -= numbytes;
numbytes = recv(fd, buffer, size, 0);
if (numbytes == -1) {
perror("recv");
- exit(-1);
+ return;
}
buffer += numbytes;
size -= numbytes;
void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
- char * node= getmemory(qnodesize);
- /* Set queue node values */
int len;
+ char * node= getmemory(qnodesize);
int top=endoffsets[ntuples-1];
+
+ if (node==NULL)
+ return;
+ /* Set queue node values */
/* TODO: Remove this after testing */
evalPrefetch[siteid].callcount++;
fd=startlistening();
udpfd = udpInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
thread_data_array[threadnum].replyctrl = &treplyctrl;
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
- thread_data_array[threadnum].pilehead = pile_ptr;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
do {
/* Send objects that are modified */
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
int size;
- headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
+ printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
send_data(sd, headeraddr, size);
}
pthread_mutex_unlock(tdata->lock);
- /* Invalidate objects in other machine cache */
if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(tdata)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+
+ /* Invalidate objects in other machine cache */
if(tdata->buffer->f.nummod > 0) {
if((retval = invalidateObj(tdata)) != 0) {
printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
}
}
-
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
} else {
//printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
}
-
pthread_exit(NULL);
}
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(tdata)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
unsigned short version;
void *mobj;
objheader_t *headptr;
-
+
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
int tmpsize;
headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
- return NULL;
+ printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+ return NULL;
}
oid = OID(headptr);
version = headptr->version;
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
} else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
}
if (localtdata->transinfo->objnotfound != NULL) {
free(localtdata->transinfo->objnotfound);
}
-
pthread_exit(NULL);
}
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
// exit(-1);
} else {
printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
objheader_t *objheader;
unsigned short numoffset[] ={0};
short fieldoffset[] ={};
-
+
if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
- if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- prefetch(0, 1, &oid, numoffset, fieldoffset);
- pthread_mutex_lock(&pflookup.lock);
- while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- pthread_cond_wait(&pflookup.cond, &pflookup.lock);
- }
- pthread_mutex_unlock(&pflookup.lock);
+ if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+ unsigned int mid = lhashSearch(oid);
+ int sd = getSock2(transReadSockPool, mid);
+ char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+ remotereadrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&remotereadrequest[1])) = oid;
+ send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+ /* Read response from the Participant */
+ char control;
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+ fflush(stdout);
+ exit(-1);
+ } else {
+ /* Read object if found into local cache */
+ int size;
+ recv_data(sd, &size, sizeof(int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, objheader, size);
+ prehashInsert(oid, objheader);
}
+ }
}
-
return TYPE(objheader);
}
#else
myIpAddr = getMyIpAddr("eth0");
#endif
-
-#ifdef CHECKTA
- printf("My ip address = %x", myIpAddr);
-#endif
myIndexInHostArray = findHost(myIpAddr);
if (myIndexInHostArray == -1)
{
return;
} else {
if(version <= ndata->versionarry[index]){
- printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
+ printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
return;
} else {
/* Clear from prefetch cache and free thread related data structure */
if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("notifyAll():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
fflush(stdout);
+ status = -1;
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;