#include "addPrefetchEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
extern int numprefetchsites; // Number of prefetch sites
extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
* we take action accordingly */
void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
if(numLocal < ntuples) {
- /* prefetch not found locally(miss in cache) */
+ /* prefetch not found locally(miss in cache); turn on prefetching*/
evalPrefetch[siteid].operMode = 1;
evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
} else {
+ //Turn off prefetch site
if(getOperationMode(siteid) != 0) {
evalPrefetch[siteid].uselesscount--;
if(evalPrefetch[siteid].uselesscount <= 0) {
int updatePrefetchCache(trans_req_data_t *tdata) {
int retval;
char oidType;
- /* TODO commit it for now because objects read
- * are already copied to cache during remote reading */
- //oidType = 'R';
- //if(tdata->f.numread > 0) {
- // if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
- // printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
- // return -1;
- // }
- //}
+ /*//TODO comment it for now because remote objects read are already in the prefetch cache
+ oidType = 'R';
+ if(tdata->f.numread > 0) {
+ if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
+ printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ }
+ */
if(tdata->f.nummod > 0) {
oidType = 'M';
if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) {
int i;
for (i = 0; i < numoid; i++) {
unsigned int oid;
- if(oidType == 'R') {
- char * objread = (char *) oidarray;
- oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
- sizeof(unsigned short))*i));
- } else {
+ //if(oidType == 'R') {
+ // char * objread = (char *) oidarray;
+ // oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
+ // sizeof(unsigned short))*i));
+ //} else {
oid = oidarray[i];
- }
+ //}
pthread_mutex_lock(&prefetchcache_mutex);
objheader_t * header;
if((header = (objheader_t *) t_chashSearch(oid)) == NULL) {
newAddr->version += 1;
newAddr->notifylist = NULL;
}
+ STATUS(newAddr)=0;
+
//make an entry in prefetch lookup hashtable
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, newAddr);
- } else {
- prehashInsert(oid, newAddr);
- }
+ prehashInsert(oid, newAddr);
} //end of for
return 0;
}
#define _ADDPREFETCHENHANCE_H_
#include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "gCollect.h"
typedef struct prefetchCountStats {
#include <math.h>
#include <netinet/tcp.h>
#include "addUdpEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#ifdef ABORTREADERS
#include "abortreaders.h"
#endif
return 0;
}
-#if 0
-int invalidateObj(trans_req_data_t *tdata) {
- struct sockaddr_in clientaddr;
- int retval;
-
- bzero(&clientaddr, sizeof(clientaddr));
- clientaddr.sin_family = AF_INET;
- clientaddr.sin_port = htons(UDP_PORT);
- clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
- int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
- /* send single udp msg */
- if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
- printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- return 0;
-}
-
-#endif
-
/* Function sends a udp broadcast, also distinguishes
* msg size to be sent based on the total number of objects modified
* returns -1 on error and 0 on success */
return 0;
}
-#if 0
-
-/* Function sends a udp broadcast, also distinguishes
- * msg size to be sent based on the iteration flag
- * returns -1 on error and 0 on success */
-int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
- char writeBuffer[MAX_SIZE];
- int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
- int offset = 0;
- *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
- offset += sizeof(short);
- *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
- offset += sizeof(unsigned int);
- if(iteration == 0) { // iteration flag == zero, send single udp msg
- *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod)); //sizeof msg
- offset += sizeof(short);
- int i;
- for(i = 0; i < tdata->f.nummod; i++) {
- *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i]; //copy objects
- offset += sizeof(unsigned int);
- }
- } else { // iteration flag > zero, send multiple udp msg
- int numObj;
- if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0)
- numObj = maxObjsPerMsg;
- else
- numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg);
- *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
- offset += sizeof(short);
- int index = (iteration - 1) * maxObjsPerMsg;
- int i;
- for(i = 0; i < numObj; i++) {
- *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i];
- offset += sizeof(unsigned int);
- }
- }
- int n;
- if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
- perror("sendto error- ");
- printf("DEBUG-> sendto error: errorno %d\n", errno);
- return -1;
- }
- return 0;
-}
-#endif
-
/* Function searches given oid in prefetch cache and invalidates obj from cache
* returns -1 on error and 0 on success */
int invalidateFromPrefetchCache(char *buffer) {
#define _ADDUDPENHANCE_H
#include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
/*******************************
int createUdpSocket();
int udpInit();
void *udpListenBroadcast(void *);
-//int invalidateObj(trans_req_data_t *);
int invalidateObj(trans_req_data_t *, int, char, int*);
int invalidateFromPrefetchCache(char *);
-//int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*);
#endif
--- /dev/null
+#include "altprelookup.h"
+#include "dsmlock.h"
+#include "gCollect.h"
+extern objstr_t *prefetchcache;
+extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
+extern prefetchNodeInfo_t pNodeInfo;
+
+prehashtable_t pflookup; //Global prefetch cache table
+
+unsigned int prehashCreate(unsigned int size, float loadfactor) {
+ prehashlistnode_t *nodes;
+ int i;
+
+ // Allocate space for the hash table
+ if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ pflookup.table = nodes;
+ pflookup.size = size;
+ pflookup.mask = size -1;
+ pflookup.numelements = 0; // Initial number of elements in the hash
+ pflookup.loadfactor = loadfactor;
+ pflookup.threshold=loadfactor*size;
+
+ //Initilize
+ for(i=0;i<PRENUMLOCKS;i++){
+ pflookup.larray[i].lock=RW_LOCK_BIAS;
+ }
+
+ /*
+ //Intiliaze and set prefetch table mutex attribute
+ pthread_mutexattr_init(&pflookup.prefetchmutexattr);
+ //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
+ //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
+ pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
+
+ //Initialize mutex var
+ pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
+ //pthread_mutex_init(&pflookup.lock, NULL);
+ pthread_cond_init(&pflookup.cond, NULL);
+ */
+
+ return 0;
+}
+
+//Assign keys to bins inside hash table
+unsigned int prehashFunction(unsigned int key) {
+ return ( key & pflookup.mask) >> 1;
+}
+
+//Store oids and their pointers into hash
+void prehashInsert(unsigned int key, void *val) {
+
+ int isFound=0;
+ prehashlistnode_t *ptr, *tmp, *node;
+
+ if(pflookup.numelements > (pflookup.threshold)) {
+ //Resize
+ unsigned int newsize = pflookup.size << 1;
+ prehashResize(newsize);
+ }
+
+ unsigned int keyindex=key>>1;
+ volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+ while(!write_trylock(lockptr)) {
+ sched_yield();
+ }
+
+ ptr = &pflookup.table[keyindex&pflookup.mask];
+
+ if(ptr->key==0) { //Insert at the first bin of the table
+ ptr->key = key;
+ ptr->val = val;
+ atomic_inc(&pflookup.numelements);
+ } else {
+ tmp = ptr;
+ while(tmp != NULL) {
+ if(tmp->key == key) {
+ isFound=1;
+ tmp->val = val;//Replace value for an exsisting key
+ write_unlock(lockptr);
+ return;
+ }
+ tmp=tmp->next;
+ }
+ if(!isFound) { //Insert new key and value into the chain of linked list for the given bin
+ node = calloc(1, sizeof(prehashlistnode_t));
+ node->key = key;
+ node->val = val ;
+ node->next = ptr->next;
+ ptr->next=node;
+ atomic_inc(&pflookup.numelements);
+ }
+ }
+ write_unlock(lockptr);
+ return;
+}
+
+// Search for an address for a given oid
+void *prehashSearch(unsigned int key) {
+ int index;
+
+ unsigned int keyindex=key>>1;
+ volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+ while(!read_trylock(lockptr)) {
+ sched_yield();
+ }
+ prehashlistnode_t *node = &pflookup.table[keyindex&pflookup.mask];
+
+ do {
+ if(node->key == key) {
+ void * tmp=node->val;
+ read_unlock(lockptr);
+ return tmp;
+ }
+ node = node->next;
+ } while (node!=NULL);
+ read_unlock(lockptr);
+ return NULL;
+}
+
+unsigned int prehashRemove(unsigned int key) {
+ unsigned int keyindex = key >> 1;
+ volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+ prehashlistnode_t *node, *prev;
+
+ while(!write_trylock(lockptr)) {
+ sched_yield();
+ }
+ prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask];
+ // If there are no elements
+ //delete from first bin of table
+ if (curr->next == NULL && curr->key == key) {
+ curr->key = 0;
+ //TODO free(val) ?
+ curr->val = NULL;
+ atomic_dec(&(pflookup.numelements));
+ write_unlock(lockptr);
+ return 0;
+ }
+ //delete from first bin of table but elements follow in linked list
+ if (curr->next != NULL && curr->key == key) {
+ curr->key = curr->next->key;
+ curr->val = curr->next->val;
+ node = curr->next;
+ curr->next = node->next;
+ free(node);
+ atomic_dec(&(pflookup.numelements));
+ write_unlock(lockptr);
+ return 0;
+ }
+ prev = curr;
+ curr = curr->next;
+ //delete from elements in the linked list
+ for(; curr != NULL; curr = curr->next) {
+ if (curr->key == key) {
+ prev->next = curr->next;
+ free(curr);
+ atomic_dec(&(pflookup.numelements));
+ write_unlock(lockptr);
+ return 0;
+ }
+ prev = curr;
+ }
+ write_unlock(lockptr);
+ return 1;
+}
+
+unsigned int prehashResize(unsigned int newsize) {
+ prehashlistnode_t *node, *ptr; // curr and next keep track of the current and the next chashlistnodes in a linked list
+ unsigned int oldsize;
+ int i,index;
+ unsigned int mask;
+
+ for(i=0;i<PRENUMLOCKS;i++) {
+ volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+
+ while(!write_trylock(lockptr)) {
+ sched_yield();
+ }
+ }
+
+ if (pflookup.numelements < pflookup.threshold) {
+ //release lock and return
+ for(i=0;i<PRENUMLOCKS;i++) {
+ volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+ write_unlock(lockptr);
+ }
+ return;
+ }
+
+ ptr = pflookup.table;
+ oldsize = pflookup.size;
+
+ if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ pflookup.table = node; //Update the global hashtable upon resize()
+ pflookup.size = newsize;
+ pflookup.threshold=newsize*pflookup.loadfactor;
+ mask=pflookup.mask = newsize -1;
+
+ for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table
+ prehashlistnode_t * curr = &ptr[i];
+ prehashlistnode_t *tmp, *next;
+ int isfirst = 1;
+ do {
+ unsigned int key;
+ if ((key=curr->key) == 0) { //Exit inner loop if there the first element for a given bin/index is NULL
+ break; //key = val =0 for element if not present within the hash table
+ }
+ next = curr->next;
+ //index = (key & mask)>>1;
+ index = (key >> 1) & mask;
+ tmp=&pflookup.table[index];
+ // Insert into the new table
+ if(tmp->key==0) {
+ tmp->key=curr->key;
+ tmp->val=curr->val;
+ if (!isfirst)
+ free(curr);
+ } /*
+ NOTE: Add this case if you change this...
+ This case currently never happens because of the way things rehash....
+else if (isfirst) {
+ prehashlistnode_t * newnode = calloc(1, sizeof(prehashlistnode_t));
+ newnode->key = curr->key;
+ newnode->val = curr->val;
+ newnode->next = tmp->next;
+ tmp->next=newnode;
+ } */
+ else {
+ curr->next=tmp->next;
+ tmp->next=curr;
+ }
+
+ isfirst = 0;
+ curr = next;
+ } while(curr!=NULL);
+ }
+
+ free(ptr); //Free the memory of the old hash table
+ for(i=0;i<PRENUMLOCKS;i++) {
+ volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+ write_unlock(lockptr);
+ }
+ return 0;
+}
+
+//Note: This is based on the implementation of the inserting a key in the first position of the hashtable
+void prehashClear() {
+ /*
+#ifdef CACHE
+ int i, isFirstBin;
+ prehashlistnode_t *ptr, *prev, *curr;
+
+ pthread_mutex_lock(&pflookup.lock);
+
+ ptr = pflookup.table;
+ for(i = 0; i < pflookup.size; i++) {
+ prev = &ptr[i];
+ isFirstBin = 1;
+ while(prev->next != NULL) {
+ isFirstBin = 0;
+ curr = prev->next;
+ prev->next = curr->next;
+ free(curr);
+ }
+ if(isFirstBin == 1) {
+ prev->key = 0;
+ prev->next = NULL;
+ }
+ }
+ {
+ int stale;
+ pthread_mutex_unlock(&pflookup.lock);
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if (pNodeInfo.newstale==NULL) {
+ //transfer the list wholesale;
+ pNodeInfo.oldstale=pNodeInfo.oldptr;
+ pNodeInfo.newstale=pNodeInfo.newptr;
+ } else {
+ //merge the two lists
+ pNodeInfo.newstale->prev=pNodeInfo.oldptr;
+ pNodeInfo.newstale=pNodeInfo.newptr;
+ }
+ stale=STALL_THRESHOLD-pNodeInfo.stale_count;
+
+ if (stale>0&&stale>pNodeInfo.stall)
+ pNodeInfo.stall=stale;
+
+ pNodeInfo.stale_count+=pNodeInfo.os_count;
+ pNodeInfo.oldptr=getObjStr(DEFAULT_OBJ_STORE_SIZE);
+ pNodeInfo.newptr=pNodeInfo.oldptr;
+ pNodeInfo.os_count=1;
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ }
+#endif
+ */
+}
+
--- /dev/null
+#ifndef _PRELOOKUP_H_
+#define _PRELOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#define PLOADFACTOR 0.25
+#define PHASH_SIZE 1024
+
+typedef struct prehashlistnode {
+ unsigned int key;
+ void *val; //this can be cast to another type or used to point to a larger structure
+ struct prehashlistnode *next;
+} prehashlistnode_t;
+
+
+struct prelockarray {
+ volatile unsigned int lock;
+ int buf[15];
+};
+
+#define PRENUMLOCKS 16
+#define PRELOCKMASK (PRENUMLOCKS-1)
+
+
+struct objstr;
+
+typedef struct prehashtable {
+ prehashlistnode_t *table; // points to beginning of hash table
+ unsigned int size;
+ unsigned int mask;
+ unsigned int numelements;
+ unsigned int threshold;
+ double loadfactor;
+ struct prelockarray larray[PRENUMLOCKS];
+} prehashtable_t;
+
+/* Prototypes for hash*/
+unsigned int prehashCreate(unsigned int size, float loadfactor);
+unsigned int prehashFunction(unsigned int key);
+void prehashInsert(unsigned int key, void *val);
+void *prehashSearch(unsigned int key); //returns val, NULL if not found
+unsigned int prehashRemove(unsigned int key); //returns -1 if not found
+unsigned int prehashResize(unsigned int newsize);
+void prehashClear();
+/* end hash */
+
+#endif
+
#define RETRYINTERVAL 75 //N (For MatrixMultiply, 2DFFT benchmarks)
#define SHUTDOWNINTERVAL 1 //M
#define NUM_TRY_TO_COMMIT 2
+#define MEM_ALLOC_THRESHOLD 20485760//20MB
#include <stdlib.h>
#include <stdio.h>
char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
-void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
+char getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short);
-void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
+char getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
int *, int *, char *, unsigned int, unsigned short);
+void procRestObjs(char *, char *, int , int, int, unsigned int *, unsigned int *, int *, int *, int *, int *);
+void processVerNoMatch(unsigned int *, unsigned int *, int *, int *, int *, int *, unsigned int, unsigned short);
/* end server portion */
/* Prototypes for transactions */
char control,ctrl, response;
char *ptr;
void *srcObj;
-
#ifdef RECOVERY
void *dupeptr;
unsigned int transIDreceived;
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
- //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
threadNotify(oid,version,threadid);
free(buffer);
break;
+
#ifdef RECOVERY
case CLEAR_NOTIFY_LIST:
#ifdef DEBUG
return 0;
/* Read modified objects */
- //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes);
if(fixed.nummod != 0) {
if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
ptr = (char *) modptr;
- //printf("fixed.nummod= %d\n", fixed.nummod);
- //fflush(stdout);
- for(i = 0 ; i < fixed.nummod; i++) {
+ for(i = 0 ; i < fixed.nummod; i++){
int tmpsize=0;
headaddr = (objheader_t *) ptr;
oid = OID(headaddr);
oidmod[i] = oid;
GETSIZE(tmpsize, headaddr);
- //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid);
ptr += sizeof(objheader_t) + tmpsize;
}
#ifdef DEBUG
* Object store holds the modified objects involved in the transaction request */
ptr = (char *) modptr;
+ char retval;
+
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) { //Objs only read and not modified
#ifdef DEBUG
printf("%s -> oid : %u version : %d\n",__func__,oid,version);
#endif
- getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+ retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
&v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
} else { //Objs modified
if(i == fixed->numread) {
GETSIZE(tmpsize, headptr);
ptr += sizeof(objheader_t) + tmpsize;
- getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+ retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
&objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
&numBytes, &control, oid, version);
}
+ if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+ //unlock objects as soon versions mismatch or else
+ //locks cannot be acquired elsewhere
+ if (objlocked > 0) {
+ int useWriteUnlock = 0;
+ for(j = 0; j < objlocked; j++) {
+ if(oidlocked[j] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(headptr));
+ } else {
+ read_unlock(STATUSPTR(headptr));
+ }
+ }
+ if(v_nomatch > 0)
+ free(oidlocked);
+ }
+ objlocked=0;
+ break;
+ }
+ }
+
+ //go through rest of the objects for version mismatches
+ if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+ i++;
+ procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
}
- /* send TRANS_DISAGREE and objs*/
+ /* send TRANS_DISAGREE and objs that caused the ABORTS*/
if(v_nomatch > 0) {
#ifdef CACHE
char *objs = calloc(1, numBytes);
offset += size;
}
#endif
+ /*
if (objlocked > 0) {
int useWriteUnlock = 0;
for(j = 0; j < objlocked; j++) {
}
free(oidlocked);
}
+ */
-
#ifdef DEBUG
printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
#endif
if(control < 0)
printf("control = %d\n",control);
+ control=TRANS_DISAGREE;
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
send_data(acceptfd, &numBytes, sizeof(int));
send_data(acceptfd, objs, numBytes);
-
transinfo->objvernotmatch = oidvernotmatch;
transinfo->numvernotmatch = objvernotmatch;
free(objs);
}
/* Update Commit info for objects that are modified */
-void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
char *control, unsigned int oid, unsigned short version) {
/* Save the oids not found and number of oids not found for later use */
oidnotfound[*objnotfound] = oid;
(*objnotfound)++;
+ *control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
#endif
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
+ *control = TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
} else { //we are locked
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
+ *control = TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
#ifdef DEBUG
printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
#endif
+ return *control;
}
/* Update Commit info for objects that are read */
-void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
void *mobj;
/* Save the oids not found and number of oids not found for later use */
oidnotfound[*objnotfound] = oid;
(*objnotfound)++;
+ *control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
#ifdef DEBUG
printf("%s -> Obj found!!\n",__func__);
if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
+ *control = TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[(*objvernotmatch)++] = oid;
} else { /* Some other transaction has aquired a write lock on this object */
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
+ *control = TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
#ifdef DEBUG
printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
#endif
+ return *control;
+}
+
+void procRestObjs(char *objread,
+ char *objmod,
+ int index,
+ int numread,
+ int nummod,
+ unsigned int *oidnotfound,
+ unsigned int *oidvernotmatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
+ int *numBytes) {
+ int i;
+ unsigned int oid;
+ unsigned short version;
+
+ /* Process each oid in the machine pile/ group per thread */
+ for (i = index; i < numread+nummod; i++) {
+ if (i < numread) { //Objs only read and not modified
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(objread + incr));
+ incr += sizeof(unsigned int);
+ version = *((unsigned short *)(objread + incr));
+ } else { //Objs modified
+ objheader_t *headptr;
+ headptr = (objheader_t *) objmod;
+ oid = OID(headptr);
+ version = headptr->version;
+ int tmpsize;
+ GETSIZE(tmpsize, headptr);
+ objmod += sizeof(objheader_t) + tmpsize;
+ }
+ processVerNoMatch(oidnotfound,
+ oidvernotmatch,
+ objnotfound,
+ objvernotmatch,
+ v_nomatch,
+ numBytes,
+ oid,
+ version);
+ }
+ return;
+}
+
+void processVerNoMatch(unsigned int *oidnotfound,
+ unsigned int *oidvernotmatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
+ int *numBytes,
+ unsigned int oid,
+ unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*objnotfound] = oid;
+ (*objnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (version != ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
+ }
+ }
}
/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
#include "gCollect.h"
-#include "prelookup.h"
+#include "altprelookup.h"
extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
pNodeInfo.newptr->prev=tmp;
pNodeInfo.newptr=tmp;
pNodeInfo.os_count++;
-
+
if (pNodeInfo.os_count>PREFETCH_FLUSH_THRESHOLD) {
//remove oldest from linked list
objstr_t *tofree=pNodeInfo.oldptr;
}
void clearBlock(objstr_t *block) {
+
unsigned long int tmpbegin=(unsigned int)block;
unsigned long int tmpend=(unsigned int)block->top;
int i, j;
prehashlistnode_t *ptr;
- pthread_mutex_lock(&pflookup.lock);
+ int lockindex=0;
ptr = pflookup.table;
+ volatile unsigned int * lockptr_current=&pflookup.larray[lockindex].lock;
+ while(!write_trylock(lockptr_current)) {
+ sched_yield();
+ }
+
for(i = 0; i<pflookup.size; i++) {
+
prehashlistnode_t *orig=&ptr[i];
prehashlistnode_t *curr = orig;
prehashlistnode_t *next=curr->next;
if ((val>=tmpbegin)&(val<tmpend)) {
prehashlistnode_t *tmp=curr->next=next->next;
free(next);
- next=tmp;
+ next=curr;
//loop condition is broken now...need to check before incrementing
- if (next==NULL)
- break;
+ // if (next==NULL)
+ // break;
}
}
{
}
}
}
- }
- pthread_mutex_unlock(&pflookup.lock);
+
+ if(((i+1)&(pflookup.mask>>4))==0 && (i+1)<pflookup.size){
+ // try to grab new lock
+ lockindex++;
+ volatile unsigned int * lockptr_new=&pflookup.larray[lockindex].lock;
+ while(!write_trylock(lockptr_new)){
+ sched_yield();
+ }
+ write_unlock(lockptr_current);
+ lockptr_current=lockptr_new;
+ }
+
+ }// end of for (pflokup)
+
+ write_unlock(lockptr_current);
}
objstr_t *allocateNew(unsigned int size) {
#include "prefetch.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#include "sockpool.h"
#include "gCollect.h"
#include "altmlookup.h"
#include "llookup.h"
#include "plookup.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
-extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */
extern objstr_t *mainobjstore;
return 0; // got all the data
}
+int recvw(int fd, void *buf, int len, int flags) {
+ return recv(fd, buf, len, flags);
+}
+
+void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+ char *buf=(char *)buffer;
+ int numbytes=readbuffer->head-readbuffer->tail;
+ if (numbytes>buflen)
+ numbytes=buflen;
+ if (numbytes>0) {
+ memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+ readbuffer->tail+=numbytes;
+ buflen-=numbytes;
+ buf+=numbytes;
+ }
+ if (buflen==0) {
+ return;
+ }
+ if (buflen>=MAXBUF) {
+ recv_data(fd, buf, buflen);
+ return;
+ }
+
+ int maxbuf=MAXBUF;
+ int obufflen=buflen;
+ readbuffer->head=0;
+
+ while (buflen > 0) {
+ int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+ if (numbytes == -1) {
+ perror("recv");
+ exit(0);
+ }
+ bytesRecv+=numbytes;
+ buflen-=numbytes;
+ readbuffer->head+=numbytes;
+ maxbuf-=numbytes;
+ }
+ memcpy(buf,readbuffer->buf,obufflen);
+ readbuffer->tail=obufflen;
+}
+
int recv_data_errorcode(int fd, void *buf, int buflen) {
#ifdef DEBUG
printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
#endif
objcopy = getRemoteObj(machinenumber, oid);
-
-#ifdef RECOVERY
- if(transRetryFlag) {
- restoreDuplicationState(machinenumber);
-#ifdef DEBUG
- printf("%s -> Recall transRead2\n",__func__);
-#endif
- return transRead2(oid);
- }
-#endif
-
- if(objcopy == NULL) {
- printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
- return NULL;
- } else {
#ifdef TRANSSTATS
LOGEVENT('R');
nRemoteSend++;
#endif
-#ifdef COMPILER
+
+ if(objcopy!=NULL) {
#ifdef CACHE
//Copy object to prefetch cache
pthread_mutex_lock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
//make an entry in prefetch lookup hashtable
prehashInsert(oid, headerObj);
+ LOGEVENT('B');
#endif
+ }
+#ifdef RECOVERY
+ if(transRetryFlag) {
+ restoreDuplicationState(machinenumber);
+#ifdef DEBUG
+ printf("%s -> Recall transRead2\n",__func__);
+#endif
+ return transRead2(oid);
+ }
+#endif
+
+ if(objcopy == NULL) {
+ printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ } else {
+#ifdef COMPILER
return &objcopy[1];
#else
return objcopy;
OID(tmp) = getNewOID();
tmp->notifylist = NULL;
tmp->version = 1;
- //tmp->rcount = 1;
tmp->isBackup = 0;
STATUS(tmp) = NEW;
t_chashInsert(OID(tmp), tmp);
-
#ifdef COMPILER
return &tmp[1]; //want space after object header
#else
chashlistnode_t * ptr = c_table;
/* Represents number of bins in the chash table */
unsigned int size = c_size;
-
for(i = 0; i < size ; i++) {
chashlistnode_t * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
mid = myIpAddr;
}
+ //if(mid == myIpAddr) {
pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ //} else {
+ // if(bit)
+ // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ // else
+ // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ //}
if(numLiveHostsInSystem > 1) {
- if(makedirty) {
- STATUS(headeraddr) = DIRTY;
- pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
- }
- //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ if(makedirty) {
+ STATUS(headeraddr) = DIRTY;
+ //if(mid == myIpAddr) {
+ // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ //} else {
+ // if(bit)
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+ // else
+ // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+ // }
+ }
+ //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
}
#else
// Get machine location for object id (and whether local or not)
int treplyretryCount = 0;
/* Initialize timeout for exponential delay */
exponential_backoff.tv_sec = 0;
- exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+ exponential_backoff.tv_nsec = (long)(12000);//12 microsec
count_exponential_backoff = 0;
do {
treplyretry = 0;
}
int offset = 0;
int i;
- //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod);
for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
int size;
objheader_t *headeraddr;
return 1;
}
GETSIZE(size,headeraddr);
- //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr));
size+=sizeof(objheader_t);
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes);
- //fflush(stdout);
send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
- //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+ //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
#ifdef RECOVERY
/* send transaction id, number of machine involved, machine ids */
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
- } else {
- prehashInsert(oidToPrefetch, header);
- }
+ prehashInsert(oidToPrefetch, header);
length = length - size;
offset += size;
}
pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- treplyretryCount++;
+ //treplyretryCount++;
//if(treplyretryCount >= NUM_TRY_TO_COMMIT)
// exponentialdelay();
//else
return TRANS_ABORT;
#ifdef CACHE
/* clear objects from prefetch cache */
- cleanPCache();
+ //cleanPCache();
#endif
} else if(transagree == pilecount) {
/* Send Commit */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
/* If older version then update with new object ptr */
- if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
- prehashRemove(oid);
+ if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
prehashInsert(oid, modptr);
}
} else { /* Else add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
+#if 0
/* Lock the Prefetch Cache look up table*/
pthread_mutex_lock(&pflookup.lock);
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
pthread_mutex_unlock(&pflookup.lock);
+#endif
} else if(control == OBJECT_NOT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */