/* Global Variables */
extern int classsize[];
pfcstats_t *evalPrefetch;
+extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
-extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */
transInit();
fd=startlistening();
- udpfd = udpInit();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+#ifdef CACHE
+ udpfd = udpInit();
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
+#endif
if (master) {
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
* prefetch requests */
void transInit() {
- int t, rc;
- int retval;
//Create and initialize prefetch cache structure
+#ifdef CACHE
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
initializePCache();
if((evalPrefetch = initPrefetchStats()) == NULL) {
printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
exit(0);
}
+#endif
/* Initialize attributes for mutex */
pthread_mutexattr_init(&prefetchcache_mutex_attr);
pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
-
pthread_mutex_init(¬ifymutex, NULL);
pthread_mutex_init(&atomicObjLock, NULL);
+#ifdef CACHE
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
printf("ERROR\n");
mcpileqInit();
//Create the primary prefetch thread
+ int retval;
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
pthread_detach(tPrefetch);
+#endif
}
/* This function stops the threads spawned */
void transExit() {
+#ifdef CACHE
int t;
pthread_cancel(tPrefetch);
for(t = 0; t < NUM_THREADS; t++)
pthread_cancel(wthreads[t]);
+#endif
return;
}
#else
return objcopy;
#endif
- } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ } else {
+#ifdef CACHE
+ if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
#ifdef TRANSSTATS
- nprehashSearch++;
+ nprehashSearch++;
#endif
-#ifdef CHECKTA
- printf("Prefetch cache read, oid = %x, oidtype =%d\n", oid, TYPE(tmp));
- fflush(stdout);
-#endif
- /* Look up in prefetch cache */
- GETSIZE(size, tmp);
- size+=sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(record->cache, size);
- memcpy(objcopy, tmp, size);
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, OID(tmp), objcopy);
+ /* Look up in prefetch cache */
+ GETSIZE(size, tmp);
+ size+=sizeof(objheader_t);
+ objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ memcpy(objcopy, tmp, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, OID(tmp), objcopy);
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
+#endif
+ }
#endif
- } else {
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
return NULL;
}
objcopy = getRemoteObj(record, machinenumber, oid);
-
+
if(objcopy == NULL) {
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
#ifdef TRANSSTATS
- nRemoteSend++;
-#endif
-#ifdef CHECKTA
- printf("Remote read, oid = %x, oidtype =%d\n", oid, TYPE(objcopy));
- fflush(stdout);
+ nRemoteSend++;
#endif
STATUS(objcopy)=0;
#ifdef COMPILER
if(treplyctrl == TRANS_ABORT) {
#ifdef TRANSSTATS
numTransAbort++;
-#endif
-#ifdef CHECKTA
- char a[] = "Aborting";
- TABORT1(a);
#endif
/* Free Resources */
objstrDelete(record->cache);
} else if(treplyctrl == TRANS_COMMIT) {
#ifdef TRANSSTATS
numTransCommit++;
-#endif
-#ifdef CHECKTA
- char a[] = "Commiting";
- TABORT1(a);
#endif
/* Free Resources */
objstrDelete(record->cache);
/* Read control message from Participant */
recv_data(sd, &control, sizeof(char));
/* Recv Objects if participant sends TRANS_DISAGREE */
+#ifdef CACHE
if(control == TRANS_DISAGREE) {
int length;
recv_data(sd, &length, sizeof(int));
objheader_t * header;
header = (objheader_t *) (((char *)newAddr) + offset);
oidToPrefetch = OID(header);
-#ifdef CHECKTA
- printf("Trans disagree for oid = %x: ", OID(header));
- char a[] = "object type";
- TABORT8(__func__, a, TYPE(header));
-#endif
int size = 0;
GETSIZE(size, header);
size += sizeof(objheader_t);
offset += size;
}
}
+#endif
recvcontrol = control;
/* Update common data structure and increment count */
}
*/
+#ifdef CACHE
if(*(tdata->replyctrl) == TRANS_COMMIT) {
int retval;
/* Update prefetch cache */
}
}
}
+#endif
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
/* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
+#ifdef CACHE
/* clear objects from prefetch cache */
cleanPCache(tdata);
+#endif
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-#ifdef CHECKTA
- printf("Trans disagree for oid = %x: ", OID(mobj));
- char a[] = "object type";
- TABORT8(__func__, a, TYPE(mobj));
-#endif
-
-#ifdef CHECKTA
- //char a[] = "mid";
- //char b[] = "version mismatch";
- //char c[] = "object type";
- //char d[] = "oid";
- //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
-#endif
break;
}
} else {
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-#ifdef CHECKTA
- printf("Trans disagree for oid = %x: ", OID(mobj));
- char a[] = "object type";
- TABORT8(__func__, a, TYPE(mobj));
-#endif
-#ifdef CHECKTA
- //char a[] = "mid";
- //char b[] = "version mismatch";
- //char c[] = "object type";
- //char d[] = "oid";
- //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
-#endif
break;
}
}
}
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
-#ifdef CHECKTA
- //char a[] = "mid";
- //char b[] = "version mismatch";
- //char c[] = "object type";
- //TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
- printf("%s() Soft abort\n", __func__);
-#endif
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
}
pthread_exit(NULL);
}
} else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+#ifdef CACHE
/* Invalidate objects in other machine cache */
if(localtdata->tdata->buffer->f.nummod > 0) {
int retval;
return;
}
}
+#endif
if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
fflush(stdout);
short fieldoffset[] ={};
if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
+#ifdef CACHE
if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+#endif
unsigned int mid = lhashSearch(oid);
int sd = getSock2(transReadSockPool, mid);
char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
/* Read object if found into local cache */
int size;
recv_data(sd, &size, sizeof(int));
+#ifdef CACHE
pthread_mutex_lock(&prefetchcache_mutex);
if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
pthread_mutex_unlock(&prefetchcache_mutex);
recv_data(sd, objheader, size);
prehashInsert(oid, objheader);
+ return TYPE(objheader);
+#else
+ char *buffer;
+ if((buffer = calloc(1, size)) == NULL) {
+ printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
+ fflush(stdout);
+ return 0;
+ }
+ recv_data(sd, buffer, size);
+ objheader = (objheader_t *)buffer;
+ unsigned short type = TYPE(objheader);
+ free(buffer);
+ return type;
+#endif
}
+#ifdef CACHE
}
+#endif
}
return TYPE(objheader);
}
printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
return;
} else {
+#ifdef CACHE
/* Clear from prefetch cache and free thread related data structure */
if((ptr = prehashSearch(oid)) != NULL) {
prehashRemove(oid);
}
+#endif
pthread_cond_signal(&(ndata->threadcond));
}
}