fd=startlistening();
udpfd = udpInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
thread_data_array[threadnum].replyctrl = &treplyctrl;
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
- thread_data_array[threadnum].pilehead = pile_ptr;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
do {
}
pthread_mutex_unlock(tdata->lock);
- /* Invalidate objects in other machine cache */
if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(tdata)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+
+ /* Invalidate objects in other machine cache */
if(tdata->buffer->f.nummod > 0) {
if((retval = invalidateObj(tdata)) != 0) {
printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
}
}
-
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
} else {
//printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
}
-
pthread_exit(NULL);
}
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(tdata)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
unsigned short version;
void *mobj;
objheader_t *headptr;
-
+
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
} else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
}
if (localtdata->transinfo->objnotfound != NULL) {
free(localtdata->transinfo->objnotfound);
}
-
pthread_exit(NULL);
}
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
// exit(-1);
} else {
printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("notifyAll():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
fflush(stdout);
+ status = -1;
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;