/***********************************************************
* Macros
**********************************************************/
-#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
-#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
-#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
-#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+#define GET_NTUPLES(x) ((int *)(x))
+#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
/*****************************************
* Coordinator Messages
***************************************/
void prefetch(int, unsigned int *, unsigned short *, short*);
void *transPrefetch(void *);
void *mcqProcess(void *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *foundLocal(char *);// returns node with prefetch elements(oids, offsets)
int lookupObject(unsigned int * oid, short offset);
int transPrefetchProcess(transrecord_t *, int **, short);
void sendPrefetchReq(prefetchpile_t*, int);
/* Coordinator => Machine that initiates the transaction request call for commiting a transaction
* Participant => Machines that host the objects involved in a transaction commit */
+#include <netinet/tcp.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
while(1)
{
int retval;
+ int flag=1;
acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+ setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
do {
retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
} while(retval!=0);
/* Receive control messages from other machines */
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ if (ret==0)
+ return;
if (ret==-1) {
printf("DEBUG -> RECV Error!.. retrying\n");
break;
return retnode;
}
-void mcpiledelete(void) {
- /* Remove each element */
- while(mcqueue.front != NULL)
- delqnode();
-}
-
-
void mcpiledisplay() {
int mid;
void mcpileqInit(void);
void mcpileenqueue(prefetchpile_t *, prefetchpile_t *);
prefetchpile_t *mcpiledequeue(void);
-void mcpiledelete();
void mcpiledisplay();
void mcdealloc(prefetchpile_t *);
#include "queue.h"
-primarypfq_t pqueue; //Global queue
+volatile int headoffset, tailoffset;
+char * memory;
+pthread_mutex_t qlock;
+pthread_mutexattr_t qlockattr;
+pthread_cond_t qcond;
+
+
+#define QSIZE 1000000 //1 MB
void queueInit(void) {
/* Intitialize primary queue */
- pqueue.front = pqueue.rear = NULL;
- pthread_mutexattr_init(&pqueue.qlockattr);
- pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
- pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr);
- pthread_cond_init(&pqueue.qcond, NULL);
+ headoffset=0;
+ tailoffset=0;
+ memory=malloc(QSIZE);
+ pthread_mutexattr_init(&qlockattr);
+ pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+ pthread_mutex_init(&qlock, &qlockattr);
+ pthread_cond_init(&qcond, NULL);
}
-/* Delete the node pointed to by the front ptr of the queue */
-void delqnode() {
- prefetchqelem_t *delnode;
- if(pqueue.front == NULL) {
- printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
- return;
- } else if (pqueue.front == pqueue.rear) {
- free(pqueue.front);
- pqueue.front = pqueue.rear = NULL;
+void * getmemory(int size) {
+ int tmpoffset=headoffset+size+sizeof(int);
+ if (tmpoffset>QSIZE) {
+ //Wait for tail to go past end
+ tmpoffset=size+sizeof(int);
+ while(headoffset<tailoffset)
+ ;
+ //Wait for tail to go past new start
+ while(tailoffset<tmpoffset)
+ ;
+ *((int *)(memory+headoffset))=-1;
+ *((int*)memory)=size+sizeof(int);
+ return memory+sizeof(int);
} else {
- delnode = pqueue.front;
- pqueue.front = pqueue.front->next;
- free(delnode);
+ while(headoffset<tailoffset&&tailoffset<tmpoffset)
+ ;
+ *((int*)(memory+headoffset))=size+sizeof(int);
+ return memory+headoffset+sizeof(int);
}
}
-void queueDelete(void) {
- /* Remove each element */
- while(pqueue.front != NULL)
- delqnode();
+void movehead(int size) {
+ int tmpoffset=headoffset+size+sizeof(int);
+ if (tmpoffset>QSIZE) {
+ headoffset=size+sizeof(int);
+ } else
+ headoffset=tmpoffset;
+ pthread_cond_signal(&qcond);//wake the other thread up
}
-/* Inserts to the rear of primary prefetch queue */
-void pre_enqueue(prefetchqelem_t *qnode) {
- if(pqueue.front == NULL) {
- pqueue.front = pqueue.rear = qnode;
- qnode->next=NULL;
- } else {
- qnode->next = NULL;
- pqueue.rear->next = qnode;
- pqueue.rear = qnode;
+void * gettail() {
+ while(tailoffset==headoffset) {
+ //Sleep
+ pthread_mutex_lock(&qlock);
+ if (tailoffset==headoffset)
+ pthread_cond_wait(&qcond, &qlock);
+ pthread_mutex_unlock(&qlock);
}
-}
-
-/* Return the node pointed to by the front ptr of the queue */
-prefetchqelem_t *pre_dequeue(void) {
- prefetchqelem_t *retnode;
- if (pqueue.front == NULL) {
- printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
- return NULL;
+ if (*((int *)(memory+tailoffset))==-1) {
+ tailoffset=0;//do loop
}
- retnode = pqueue.front;
- pqueue.front = pqueue.front->next;
- if (pqueue.front == NULL)
- pqueue.rear = NULL;
- retnode->next = NULL;
-
- return retnode;
+
+ return memory+tailoffset+sizeof(int);
}
-void queueDisplay() {
- int offset = sizeof(prefetchqelem_t);
- int *ptr;
- int ntuples;
- char *ptr1;
- prefetchqelem_t *tmp = pqueue.front;
- while(tmp != NULL) {
- ptr1 = (char *) tmp;
- ptr = (int *)(ptr1 + offset);
- ntuples = *ptr;
- tmp = tmp->next;
- }
+void inctail() {
+ int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
+ if (tmpoffset>QSIZE)
+ tailoffset=0;
+ else
+ tailoffset=tmpoffset;
}
-void predealloc(prefetchqelem_t *node) {
- free(node);
+
+void predealloc() {
+ free(memory);
}
#include<pthread.h>
#include<string.h>
-// DS that contains information to be shared between threads.
-typedef struct prefetchqelem {
- struct prefetchqelem *next;
-} prefetchqelem_t;
-
-typedef struct primarypfq {
- prefetchqelem_t *front, *rear;
- pthread_mutex_t qlock;
- pthread_mutexattr_t qlockattr;
- pthread_cond_t qcond;
-} primarypfq_t;
-
-
void queueInit(void);
-void delqnode();
-void queueDelete(void);
-void pre_enqueue(prefetchqelem_t *);
-prefetchqelem_t *pre_dequeue(void);
-void queueDisplay();
-void predealloc(prefetchqelem_t *);
+void * getmemory(int size);
+void movehead(int size);
+void * gettail();
+void inctail();
+void predealloc();
#endif
#include "sockpool.h"
-
+#include <netinet/tcp.h>
#if defined(__i386__)
inline static int test_and_set(volatile unsigned int *addr) {
int createNewSocket(unsigned int mid) {
int sd;
+ int flag=1;
if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
return -1;
}
+ setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
struct sockaddr_in remoteAddr;
bzero(&remoteAddr, sizeof(remoteAddr));
remoteAddr.sin_family = AF_INET;
/* Global Variables */
extern int classsize[];
-extern primarypfq_t pqueue; //Shared prefetch queue
objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
int numbytes;
while (size > 0) {
numbytes = recv(fd, buffer, size, 0);
+ if (numbytes==0)
+ return 0;
if (numbytes == -1) {
return -1;
}
buffer += numbytes;
size -= numbytes;
}
- return 0;
+ return 1;
}
void printhex(unsigned char *ptr, int numBytes) {
* populates the shared primary prefetch queue*/
void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
- int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
- char * node= malloc(qnodesize);
+ int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
+ char * node= getmemory(qnodesize);
/* Set queue node values */
- int len = sizeof(prefetchqelem_t);
+ int len;
int top=endoffsets[ntuples-1];
- *((int *)(node+len))=ntuples;
- len += sizeof(int);
+ *((int *)(node))=ntuples;
+ len = sizeof(int);
memcpy(node+len, oids, ntuples*sizeof(unsigned int));
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
/* Lock and insert into primary prefetch queue */
- pthread_mutex_lock(&pqueue.qlock);
- pre_enqueue((prefetchqelem_t *)node);
- pthread_cond_signal(&pqueue.qcond);
- pthread_mutex_unlock(&pqueue.qlock);
+ movehead(qnodesize);
}
/* This function starts up the transaction runtime. */
return 0;
}
-prefetchpile_t *foundLocal(prefetchqelem_t *node) {
- char * ptr = (char *) node;
+prefetchpile_t *foundLocal(char *ptr) {
int ntuples = *(GET_NTUPLES(ptr));
unsigned int * oidarray = GET_PTR_OID(ptr);
unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
void *transPrefetch(void *t) {
while(1) {
/* lock mutex of primary prefetch queue */
- pthread_mutex_lock(&pqueue.qlock);
- /* while primary queue is empty, then wait */
- while(pqueue.front == NULL) {
- pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
- }
-
- /* dequeue node to create a machine piles and finally unlock mutex */
- prefetchqelem_t *qnode = pre_dequeue();
- pthread_mutex_unlock(&pqueue.qlock);
-
+ void *node=gettail();
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
- prefetchpile_t *pilehead = foundLocal(qnode);
+ prefetchpile_t *pilehead = foundLocal(node);
if (pilehead!=NULL) {
// Get sock from shared pool
/* Deallocated pilehead */
mcdealloc(pilehead);
-
}
// Deallocate the prefetch queue pile node
- predealloc(qnode);
+ inctail();
}
}
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ //printf("oid %d found\n",oid);
size = size - (sizeof(char) + sizeof(unsigned int));
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {