--- /dev/null
+#include "dstm.h"
+
+/* Make a queue of prefetchpile_t type */
+prefetchpile_t poolqueue; //Global queue for machine piles
+
+/* Create new machine group */
+prefetchpile_t *createPile(int numoffsets) {
+ prefetchpile_t *pile;
+ if((pile = calloc(1, sizeof(prefetchpile_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+ /* Create a new object pile */
+ if((pile->objpiles = calloc(1, sizeof(objpile_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+ /* Create a ptr to the offset array for a given prefetch oid tuple */
+ if((pile->objpiles->offset = calloc(numoffsets, sizeof(short))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+ pile->next = NULL;
+
+ return pile;
+}
+
+/* Into into prefetch pile*/
+void pileIns(prefetchpile_t *pile, short *endoffsets, short* arryfields, unsigned int *oid,int mnum,int noffsets, int index) {
+ prefetchpile_t *tmp, *ptr;
+ objpile_t *opile;
+ short *offsetarry;
+ int found = 0, k;
+
+ tmp = pile;
+ while(tmp != NULL) {
+ //Check if mnum already exists in the pile
+ if(tmp->mid == mnum) {
+ /* Create a new object pile */
+ if((opile = calloc(1, sizeof(objpile_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ opile->next = tmp->objpiles;
+ tmp->objpiles = opile;
+
+ tmp->objpiles->oid = oid[index];
+ if(index == 0)
+ k = 0;
+ else
+ k = endoffsets[index -1];
+ //Copy the offset values into objpile
+ for(i = 0; i < numoffsets[i]; i++) {
+ ptr->objpile->offsets[i] = arryfields[k];
+ k++;
+ }
+ /* Create a ptr to the offset array for a given prefetch oid tuple */
+ if((offsetarry = calloc(numoffsets, sizeof(short))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+
+ //Add New machine pile to the linked list
+ if(!found) {
+ if((ptr = createPile(noffsets)) == NULL) {
+ printf("No new pile created %s %d\n", __FILE__, __LINE__);
+ return;
+ }
+ ptr->mid = mnum;
+ ptr->objpile->oid = oid[index];
+ if(index == 0)
+ k = 0;
+ else
+ k = endoffsets[index -1];
+ //Copy the offset values into objpile
+ for(i = 0; i < numoffsets[i]; i++) {
+ ptr->objpile->offsets[i] = arryfields[k];
+ k++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ return pile;
+}
+
+/* Insert into object pile */
+
+
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
+#include <errno.h>
#include <time.h>
#include <string.h>
#include <pthread.h>
/* Allocate for the queue node*/
char *node;
qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
- if((node = calloc(1,qnodesize)) == NULL) {
+ if((node = calloc(1, qnodesize)) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
return;
}
return;
}
}
+ //TODO when to deletethreads
}
/* This function stops the threads spawned */
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
- int size;
+ int size, rc, found = 0;
void *buf;
- /* Search local cache */
+ struct timespec ts;
+ struct timeval tp;
+
+ rc = gettimeofday(&tp, NULL);
+
+ /* Convert from timeval to timespec */
+ ts.tv_nsec = tp.tv_usec * 1000;
+
+ /* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
- // tmp = mhashSearch(oid);
+ tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, objheader->oid, objcopy);
return(objcopy);
- } else { /* If not found in machine look up */
+ } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+ found = 1;
+ size = sizeof(objheader_t)+classsize[tmp->type];
+ objcopy = objstrAlloc(record->cache, size);
+ memcpy(objcopy, (void *)tmp, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, tmp->oid, objcopy);
+ return(objcopy);
+ } else { /* If not found anywhere, then block until object appears in prefetch cache */
+ pthread_mutex_lock(&pflookup.lock);
+ while(!found) {
+ rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
+ if(rc == ETIMEDOUT) {
+ printf("Wait timed out\n");
+ /* Check Prefetch cache again */
+ if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+ found = 1;
+ size = sizeof(objheader_t)+classsize[tmp->type];
+ objcopy = objstrAlloc(record->cache, size);
+ memcpy(objcopy, (void *)tmp, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, tmp->oid, objcopy);
+ return(objcopy);
+ } else {
+ pthread_mutex_unlock(&pflookup.lock);
+ break;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ }
+ }
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
/* Check if local or not */
if((localmachinenum = mhashSearch(curr->key)) != NULL) {
+ /* Set the pile->local flag*/
pile->local = 1; //True i.e. local
}
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+ //TODO currently the only soft abort case that is supported is when object locked by previous
+ //transaction => v_matchlock > 0
+ //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported
/* Send number of oids not found and the missing oids if objects are missing in the machine */
/* TODO Remember to store the oidnotfound for later use
if(objnotfound != 0) {
header = mhashSearch(objlocked[i]);// find the header address
((objheader_t *)header)->status &= ~(LOCK);
}
- //TODO/* Unset the bit for local objects */
/* Send ack to Coordinator */
printf("DEBUG-> TRANS_SUCCESSFUL\n");
}
//TODO Update location lookup table
- //TODO/* Unset the bit for local objects */
/* Send ack to Coordinator */
printf("DEBUG-> TRANS_SUCESSFUL\n");
mcpileenqueue(pilehead);
/* Broadcast signal on machine pile queue */
pthread_cond_broadcast(&mcqueue.qcond);
- /* Unlock mutex of mcahine pile queue */
+ /* Unlock mutex of machine pile queue */
pthread_mutex_unlock(&mcqueue.qlock);
+ /* Deallocate the prefetch queue pile node */
+ predealloc(qnode);
+
}
}
while(1) {
/* Lock mutex of mc pile queue */
pthread_mutex_lock(&mcqueue.qlock);
- /* while mc pile queue is empty, then wait */
+ /* When mc pile queue is empty, wait */
while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
}
- /* dequeue node to send remote machine connections*/
+ /* Dequeue node to send remote machine connections*/
if((mcpilenode = mcpiledequeue()) == NULL) {
printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
return NULL;
sendPrefetchReq(mcpilenode, tid);
/* TODO: For each object not found query DHT for new location and retrieve the object */
- /* Deallocate the dequeued node */
+ /* Deallocate the machine queue pile node */
+ mcdealloc(mcpilenode);
}
}
memcpy(modptr, buffer+index, objsize);
index += sizeof(int);
/* Add pointer and oid to hash table */
- //TODO Do we need a version comparison herei ??
+ //TODO Do we need a version comparison here??
prehashInsert(oid, modptr);
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);