From ba8111dac1d4c79c8edcc955275df61d11d53831 Mon Sep 17 00:00:00 2001
From: adash <adash>
Date: Mon, 30 Jul 2007 19:47:14 +0000
Subject: [PATCH] bug fixes and add machine pile queue DS that saves oids and
 offsets meant for remote machines

---
 Robust/src/Runtime/DSTM/interface/dstm.h      |  29 ++--
 .../src/Runtime/DSTM/interface/machinepile.c  |  48 ++++++
 .../src/Runtime/DSTM/interface/machinepile.h  |  10 ++
 Robust/src/Runtime/DSTM/interface/mcpileq.c   |  77 ++++++++++
 Robust/src/Runtime/DSTM/interface/mcpileq.h   |  37 +++++
 Robust/src/Runtime/DSTM/interface/queue.c     |   6 +-
 Robust/src/Runtime/DSTM/interface/trans.c     | 138 ++++++++++--------
 7 files changed, 262 insertions(+), 83 deletions(-)
 create mode 100644 Robust/src/Runtime/DSTM/interface/machinepile.c
 create mode 100644 Robust/src/Runtime/DSTM/interface/machinepile.h
 create mode 100644 Robust/src/Runtime/DSTM/interface/mcpileq.c
 create mode 100644 Robust/src/Runtime/DSTM/interface/mcpileq.h

diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h
index c937a57b..957a68e9 100644
--- a/Robust/src/Runtime/DSTM/interface/dstm.h
+++ b/Robust/src/Runtime/DSTM/interface/dstm.h
@@ -5,6 +5,12 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+#define GET_NTUPLES(x) 	((int *)(x + sizeof(prefetchqelem_t)))
+#define GET_PTR_OID(x) 	((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+
+
 //Coordinator Messages
 #define READ_REQUEST 		1
 #define READ_MULT_REQUEST 	2
@@ -42,6 +48,7 @@
 #include <pthread.h>
 #include "clookup.h"
 #include "queue.h"
+#include "mcpileq.h"
 
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 #define TID_LEN 20
@@ -135,27 +142,15 @@ typedef struct objinfo {
 
 //Structure for members within prefetch tuples
 typedef struct member {
-	 short offset;
-	 short index;
-	 struct member *next;
- }trans_member_t;
-
-
-//Structure for prefetching tuples generated by teh compiler
- typedef struct prefetchpile{
-	 int mid;
-	 int *oids;
-
-	 int **numofarrys;
-	 struct prefetchpile *next;
- }prefetchpile_t;
-
-//Structure per Oid in the prefetch call
+	short offset;
+	short index;
+	struct member *next;
+}trans_member_t;
 
 /*
 //Structure that holds the compiler generated prefetch data
 typedef struct compprefetchdata {
-	transrecord_t *record;
+transrecord_t *record;
 } compprefetchdata_t;
 */
 
diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c
new file mode 100644
index 00000000..58fe1b92
--- /dev/null
+++ b/Robust/src/Runtime/DSTM/interface/machinepile.c
@@ -0,0 +1,48 @@
+#include "machinepile.h"
+
+int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
+	prefetchpile_t *tmp = head;
+	objpile_t *objnode;
+	unsigned int *oidarray;
+	int ntuples;
+	char found = 0;
+
+	while (tmp != NULL) {
+		if (tmp->mid == mid) { // Found a match with exsisting machine id
+			if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+				printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+				return -1;
+			}
+			/* Fill objpiles DS */
+			objnode->oid = oid;
+			objnode->numoffset = numoffset;
+			objnode->offset = offset;
+			objnode->next = tmp->objpiles;
+			tmp->objpiles = objnode;
+			found = 1;
+			break;
+		}
+		tmp = tmp->next;
+	}
+	if (!found) {// Not found => insert new mid DS
+		if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+			printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+			return -1;
+		}
+		tmp->mid = mid;
+		if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+			printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+			return -1;
+		}
+		/* Fill objpiles DS */
+		objnode->oid = oid;
+		objnode->numoffset = numoffset;
+		objnode->offset = offset;
+		objnode->next = tmp->objpiles; // i.e., objnode->next = NULL;
+		tmp->objpiles = objnode;
+		tmp->next = head;
+		head = tmp;
+	}
+	return 0;
+}
+
diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h
new file mode 100644
index 00000000..b8ca3d69
--- /dev/null
+++ b/Robust/src/Runtime/DSTM/interface/machinepile.h
@@ -0,0 +1,10 @@
+#ifndef _MACHINEPILE_H_
+#define _MACHINEPILE_H_
+
+#include "mcpileq.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+int insertPile(int, unsigned int, short, short *, prefetchpile_t *);
+
+#endif
diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c
new file mode 100644
index 00000000..bbb608d6
--- /dev/null
+++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c
@@ -0,0 +1,77 @@
+#include "mcpileq.h"
+
+mcpileq_t mcqueue;
+
+void mcpileqInit(void) {
+	/* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
+	mcqueue.front = mcqueue.rear = NULL;
+	pthread_mutex_init(&mcqueue.qlock, NULL); 
+	pthread_cond_init(&mcqueue.qcond, NULL); 
+}
+
+/* Insert to the rear of machine pile queue */
+void mcpileenqueue(prefetchpile_t *node) {
+	if(mcqueue.front == NULL && mcqueue.rear == NULL) {
+		mcqueue.front = mcqueue.rear = node;
+	} else {
+		node->next = NULL;
+		mcqueue.rear->next = node;
+		mcqueue.rear = node;
+	}
+}
+
+/* Return the node pointed to by the front ptr of the queue */
+prefetchpile_t *mcpiledequeue(void) {
+	prefetchpile_t *retnode;
+	if(mcqueue.front == NULL) {
+		printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__);
+		return NULL;
+	}
+	retnode = mcqueue.front;
+	mcqueue.front = mcqueue.front->next;
+
+	return retnode;
+}
+
+/* Delete the node pointed to by the front ptr of the queue */
+void delnode() {
+	prefetchpile_t *delnode;
+	if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+		printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+		return;
+	} else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) {
+		printf("TEST1\n");
+		free(mcqueue.front);
+		mcqueue.front = mcqueue.rear = NULL;
+	} else {
+		delnode = mcqueue.front;
+		mcqueue.front = mcqueue.front->next;
+		printf("TEST2\n");
+		free(delnode);
+	}
+}
+
+void mcpiledelete(void) {
+	/* Remove each element */
+	while(mcqueue.front != NULL)
+		delqnode();
+	mcqueue.front = mcqueue.rear = NULL;
+}
+
+
+void mcpiledisplay() {
+	int mid;
+
+	prefetchpile_t *tmp = mcqueue.front;
+	while(tmp != NULL) {
+		printf("Remote machine id = %d\n", tmp->mid);
+		tmp = tmp->next;
+	}
+}
+
+
+
+
+
+
+
diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h
new file mode 100644
index 00000000..7add8deb
--- /dev/null
+++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h
@@ -0,0 +1,37 @@
+#ifndef _MCPILEQ_H_
+#define _MCPILEQ_H_
+
+#include<pthread.h>
+#include<stdio.h>
+#include<stdlib.h>
+#include<string.h>
+
+//Structure to make machine groups when prefetching
+typedef struct objpile { 
+	unsigned int oid;
+	short numoffset;
+	short *offset;
+	struct objpile *next;
+}objpile_t;
+
+//Structure for prefetching tuples generated by the compiler
+typedef struct prefetchpile {
+	int mid;
+	objpile_t *objpiles;
+	struct prefetchpile *next;
+}prefetchpile_t;
+
+typedef struct mcpileq {
+	prefetchpile_t *front, *rear;
+	pthread_mutex_t qlock;
+	pthread_cond_t qcond;
+}mcpileq_t;
+
+void mcpileqInit(void);
+void mcpileenqueue(prefetchpile_t *);
+prefetchpile_t *mcpiledequeue(void);
+void delnode();
+void mcpiledelete();
+void mcpiledisplay();
+
+#endif
diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c
index f164528f..298e0d11 100644
--- a/Robust/src/Runtime/DSTM/interface/queue.c
+++ b/Robust/src/Runtime/DSTM/interface/queue.c
@@ -3,13 +3,13 @@
 primarypfq_t pqueue; //Global queue
 
 void queueInit(void) {
-	/* Intitialize primary thread */
+	/* Intitialize primary queue */
 	pqueue.front = pqueue.rear = NULL;
 	pthread_mutex_init(&pqueue.qlock, NULL);
 	pthread_cond_init(&pqueue.qcond, NULL);
 }
 
-/* Removes the first element of the queue */
+/* Delete the node pointed to by the front ptr of the queue */
 void delqnode() {
 	prefetchqelem_t *delnode;
 	if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
@@ -45,6 +45,7 @@ void enqueue(prefetchqelem_t *qnode) {
 	}
 }
 
+/* Return the node pointed to by the front ptr of the queue */
 prefetchqelem_t *dequeue(void) {
 	prefetchqelem_t *retnode;
 	if (pqueue.front == NULL) {
@@ -52,7 +53,6 @@ prefetchqelem_t *dequeue(void) {
 		return NULL;
 	}
 	retnode = pqueue.front;
-	//TODO make this atomic
 	pqueue.front = pqueue.front->next;
 
 	return retnode;
diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c
index 2bf2d019..2ddf7d26 100644
--- a/Robust/src/Runtime/DSTM/interface/trans.c
+++ b/Robust/src/Runtime/DSTM/interface/trans.c
@@ -1,6 +1,7 @@
 #include "dstm.h"
 #include "ip.h"
 #include "clookup.h"
+#include "machinepile.h"
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
@@ -20,15 +21,17 @@
 #define RECEIVE_BUFFER_SIZE 2048
 #define NUM_THREADS 10
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
-
+/*
 #define GET_NTUPLES(x) 	((int *)(x + sizeof(prefetchqelem_t)))
 #define GET_PTR_OID(x) 	((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
 #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+*/
 
 /* Global Variables */
 extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
+extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
@@ -41,6 +44,16 @@ inline int arrayLength(int *array) {
 		;
 	return i;
 }
+inline int findmax(int *array, int arraylength) {
+	int max, i;
+	max = array[0];
+	for(i = 0; i < arraylength; i++){
+		if(array[i] > max) {
+			max = array[i];
+		}
+	}
+	return max;
+}
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
 void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
@@ -84,6 +97,8 @@ void transInit() {
 			return; //Failure
 	//Initialize primary shared queue
 	queueInit();
+	//Initialize machine pile w/prefetch oids and offsets shared queue
+	mcpileqInit();
 	//Create the primary prefetch thread 
 	pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
 	//Create and Initialize a pool of threads 
@@ -919,9 +934,19 @@ void *transPrefetch(void *prefdata) {
 		}
 		pthread_mutex_unlock(&pqueue.qlock);
 		/* Reduce redundant prefetch requests */
-		/* Group Requests by where objects are located */
-	
+		checkPrefetchTuples(qnode);
+		/* Check if the tuples are found locally, if yes then reduce them further*/ 
+		/* and group requests by remote machine ids by calling the makePreGroups() */
+		foundLocal(qnode);
+		
+		/* Lock mutex of pool queue */
+		pthread_mutex_lock(&mcqueue.qlock);
+		/* Update the pool queue with the new remote machine piles generated per prefetch call */
+
 
+		/* Broadcast signal on pool queue */
+
+		/* Unlock mutex of pool queue */
 
 	}
 }
@@ -984,24 +1009,19 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
 						k++;
 					}	
 				} else {
-					printf("i = %d, j = %d\n", i, j);
 					k = endoffsets[i-1];
 					index = endoffsets[j-1];
 					printf("Value of slength = %d\n", slength);
 					for(count = 0; count < slength; count++) {
-						printf("Value of count =%d\n", count);
 						if(arryfields[k] != arryfields[index]) {
 							break;
 						}
 						index++;
 						k++;
 					}
-					printf("Value of count =%d\n", count);
 				}
-				printf("The value of sindex = %d\n", sindex);
 
 				if(slength == count) {
-					printf("DEBUG-> Inside slength if %d\n", sindex);
 					oid[sindex] = -1;
 				}
 			}
@@ -1058,22 +1078,50 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopc
 	 * and copy left over offsets into the arrayoffsetfieldarray*/
 	oid[iter] = objoid;
 	numoffset[iter] = numoffset[iter] - (i+1);
-	if(iter == 0)
-		endoffsets[iter] = numoffset[iter];
-	else
-		endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
 	for(k = 0; k < numoffset[iter] ; k++) {
-		arryfields[k] = arryfields[counter+1];
-		counter++;
+		arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
 	}
 
 	if(flag == 0) {
 		oid[iter] = -1;
 		numoffset[iter] = 0;
-		endoffsets[iter] = 0;
 	}
 }
 
+/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
+void makePreGroups(prefetchqelem_t *node, int *numoffset) {
+	char *ptr, *tmp;
+	int ntuples, slength, i, machinenum;
+	int maxoffset;
+	unsigned int *oid;
+	short *endoffsets, *arryfields, *offset; 
+	prefetchpile_t *head = NULL;
+
+	/* Check for the case x.y.z and a.b.c are same oids */ 
+ 	ptr = (char *) node;
+	ntuples = *(GET_NTUPLES(ptr));
+	oid = GET_PTR_OID(ptr);
+	endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+	arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
+	/* Check for redundant tuples by comparing oids of each tuple */
+	for(i = 0; i < ntuples; i++) {
+		if(oid[i] == -1)
+			continue;
+		/* For each tuple make piles */
+		if ((machinenum = lhashSearch(oid[i])) == 0) {
+			printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
+			return;
+		}
+		/* Insert into machine pile */
+		offset = &arryfields[endoffsets[i-1]];
+		insertPile(machinenum, oid[i], numoffset[i], offset, head);
+	}
+
+	return;
+}
+
+
 /* This function checks if the oids within the prefetch tuples are available locally.
  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
  * the prefetchqelem_t node to represent a new prefetch tuple */
@@ -1110,75 +1158,39 @@ void foundLocal(prefetchqelem_t *node) {
 				index = endoffsets[i - 1];
 			for(j = 0 ; j < numoffset[i] ; j++) {
 				objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+				/*If oid found locally then 
+				 *assign the latest oid found as the new oid 
+				 *and copy left over offsets into the arrayoffsetfieldarray*/
+				oid[i] = objoid;
+				numoffset[i] = numoffset[i] - (j+1);
+				for(k = 0; k < numoffset[i]; k++)
+					arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
 				index++;
 				/*New offset oid not found */
 				if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
 					flag = 1;
-					checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
+					checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
 					break;
 				} else 
 					flag = 0;
 			}
-			/*If oid not found locally then 
-			 *assign the latest oid found as the new oid 
-			 *and copy left over offsets into the arrayoffsetfieldarray*/
-			oid[i] = objoid;
-			numoffset[i] = numoffset[i] - (j+1);
-			if(i == 0)
-				endoffsets[i] = numoffset[i];
-			else 
-				endoffsets[i] = numoffset[i] - endoffsets[i - 1];
-			for(k = 0; k < numoffset[i]; k++) {
-				arryfields[k] = arryfields[j+1];
-				j++;
-			}
+		
 			/*If all offset oids are found locally,make the prefetch tuple invalid */
 			if(flag == 0) {
 				oid[i] = -1;
 				numoffset[i] = 0;
-				endoffsets[i] = 0;
 			}
 		} else {
 			oidnfound = 1;
 			/* Look in Prefetch cache */
-			checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
+			checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
 		}
 
 	}
+	// Make machine groups
+	makePreGroups(node, numoffset);
 }
 
-void makePreGroups(prefetchqelem_t *node) {
-	char *ptr, *tmp;
-	int ntuples, slength, i, machinenum;
-	unsigned int *oid;
-	short *endoffsets, *arryfields; 
-
-
-	/* Check for the case x.y.z and a.b.c are same oids */ 
- 	ptr = (char *) node;
-	ntuples = *(GET_NTUPLES(ptr));
-	oid = GET_PTR_OID(ptr);
-	endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-	arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-	/* Find offset length for each tuple */
-	int numoffset[ntuples];
-	numoffset[0] = endoffsets[0];
-	for(i = 1; i<ntuples; i++) {
-		numoffset[i] = endoffsets[i] - endoffsets[i-1];
-	}
-
-	/* Check for redundant tuples by comparing oids of each tuple */
-	for(i = 0; i < ntuples; i++) {
-		if(oid[i] == -1)
-			continue;
-		/* For each tuple make piles */
-		if ((machinenum = lhashSearch(oid[i])) == 0) {
-			printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
-			return;
-		}
-	}
-
-}
 
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
-- 
2.34.1