switch(control) {
case READ_REQUEST:
+ printf("DEBUG -> Recv READ_REQUEST\n");
/* Read oid requested and search if available */
if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
perror("Error receiving object from cooridnator\n");
pthread_exit(NULL);
}
if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+ printf("Object 0x%x is not found in Main Object Store %s %d\n", oid, __FILE__, __LINE__);
pthread_exit(NULL);
}
h = (objheader_t *) srcObj;
case THREAD_NOTIFY_REQUEST:
size = sizeof(unsigned int);
- retval = recv((int)acceptfd, ptr, size, 0);
- numoid = *((unsigned int *) ptr);
+ retval = recv((int)acceptfd, &numoid, size, 0);
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
- retval = recv((int)acceptfd, ptr, size, 0);
+ bzero(&buffer, RECEIVE_BUFFER_SIZE);
+ retval = recv((int)acceptfd, &buffer, size, 0);
oidarry = calloc(numoid, sizeof(unsigned int));
- memcpy(oidarry, ptr, sizeof(unsigned int) * numoid);
+ memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
size = sizeof(unsigned int) * numoid;
versionarry = calloc(numoid, sizeof(unsigned short));
- memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid);
+ memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
size += sizeof(unsigned short) * numoid;
- mid = *((unsigned int *)(ptr+size));
+ mid = *((unsigned int *)(buffer+size));
size += sizeof(unsigned int);
- threadid = *((unsigned int *)(ptr+size));
+ threadid = *((unsigned int *)(buffer+size));
processReqNotify(numoid, oidarry, versionarry, mid, threadid);
break;
case THREAD_NOTIFY_RESPONSE:
size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
- retval = recv((int)acceptfd, ptr, size, 0);
+ bzero(&buffer, RECEIVE_BUFFER_SIZE);
+ retval = recv((int)acceptfd, &buffer, size, 0);
if(retval <= 0)
perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg");
else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval);
else {
- oid = *((unsigned int *)ptr);
+ oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
- version = *((unsigned short *)(ptr+size));
+ version = *((unsigned short *)(buffer+size));
size += sizeof(unsigned short);
- threadid = *((unsigned int *)(ptr+size));
+ threadid = *((unsigned int *)(buffer+size));
threadNotify(oid,version,threadid);
}
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
- int isArray = 0;
- unsigned int oid, index = 0;
- char *ptr, buffer[PRE_BUF_SIZE];
- void *mobj;
- unsigned int objoid;
- char control;
- objheader_t * header;
- int bytesRecvd;
-
- /* Repeatedly recv the oid and offset pairs sent for prefetch */
- while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
- count++;
- if(length == -1)
- break;
- sum = 0;
- index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
- // first 4 bytes are saved to send the
- // size of the buffer (that is computed at the end of the loop)
- bytesRecvd = 0;
- do {
- bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
- sizeof(unsigned int) - bytesRecvd, 0);
- } while (bytesRecvd < sizeof(unsigned int));
- numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
- N = numoffset * sizeof(short);
- short offset[numoffset];
- ptr = (char *)&offset;
- /* Recv the offset values per oid */
- do {
- n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
-
- /* Process each oid */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* send the oid, it's size, it's header and data */
- header = mobj;
- GETSIZE(size, header);
- size += sizeof(objheader_t);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
- }
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
- } else {
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
- }
- if((header = mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- break;
- } else {/* Obj Found */
- /* send the oid, it's size, it's header and data */
- GETSIZE(size, header);
- size+=sizeof(objheader_t);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer+index, header, size);
- index += size;
- isArray = 0;
- continue;
- }
- }
- }
- /* Check for overflow in the buffer */
- if (index >= PRE_BUF_SIZE) {
- printf("Char buffer is overflowing\n");
- return 1;
- }
- /* Send Prefetch response control message only once*/
- if(count == 1) {
- control = TRANS_PREFETCH_RESPONSE;
- if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
- }
-
- /* Add the buffer size into buffer as a parameter */
- *((unsigned int *)buffer)=index;
- /* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending oids found\n");
- return 1;
- }
- }
- return 0;
+ int i, length, sum, n, numbytes, numoffset, N, size, count = 0;
+ int isArray = 0, bytesRecvd;
+ unsigned int oid, index = 0;
+ unsigned int objoid, myIpAddr;
+ char *ptr, control, buffer[PRE_BUF_SIZE];
+ void *mobj;
+ objheader_t * header;
+
+#ifdef MAC
+ myIpAddr = getMyIpAddr("en1");
+#else
+ myIpAddr = getMyIpAddr("eth0");
+#endif
+
+ /* Repeatedly recv the oid and offset pairs sent for prefetch */
+ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+ count++;
+ if(length == -1)
+ break;
+ index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
+ // first 4 bytes are saved to send the
+ // size of the buffer (that is computed at the end of the loop)
+ bytesRecvd = 0;
+ do {
+ bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
+ sizeof(unsigned int) - bytesRecvd, 0);
+ } while (bytesRecvd < sizeof(unsigned int));
+ numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
+ N = numoffset * sizeof(short);
+ short offset[numoffset];
+ ptr = (char *)&offset;
+ sum = 0;
+ /* Recv the offset values per oid */
+ do {
+ n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
+ sum += n;
+ } while(sum < N && n != 0);
+
+ /* Process each oid */
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ *(buffer + index) = OBJECT_NOT_FOUND;
+ index += sizeof(char);
+ *((unsigned int *)(buffer+index)) = oid;
+ index += sizeof(unsigned int);
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* send the oid, it's size, it's header and data */
+ header = (objheader_t *)mobj;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ *(buffer + index) = OBJECT_FOUND;
+ index += sizeof(char);
+ *((unsigned int *)(buffer+index)) = oid;
+ index += sizeof(unsigned int);
+ *((int *)(buffer+index)) = size;
+ index += sizeof(int);
+ memcpy(buffer + index, header, size);
+ index += size;
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ isArray = 1;
+ }
+ if(isArray == 1) {
+ int elementsize = classsize[TYPE(header)];
+ objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
+ } else {
+ objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
+ }
+ if((header = mhashSearch(objoid)) == NULL) {
+ /* Obj not found, send oid */
+ *(buffer + index) = OBJECT_NOT_FOUND;
+ index += sizeof(char);
+ *((unsigned int *)(buffer+index)) = objoid;
+ index += sizeof(unsigned int);
+ break;
+ } else {/* Obj Found */
+ /* send the oid, it's size, it's header and data */
+ GETSIZE(size, header);
+ size+=sizeof(objheader_t);
+ *(buffer+index) = OBJECT_FOUND;
+ index += sizeof(char);
+ *((unsigned int *)(buffer+index)) = objoid;
+ index += sizeof(unsigned int);
+ *((int *)(buffer+index)) = size;
+ index += sizeof(int);
+ memcpy(buffer+index, header, size);
+ index += size;
+ isArray = 0;
+ continue;
+ }
+ }
+ }
+ /* Check for overflow in the buffer */
+ if (index >= PRE_BUF_SIZE) {
+ printf("Char buffer is overflowing\n");
+ return 1;
+ }
+ /* Send Prefetch response control message only once*/
+ if(count == 1){
+ control = TRANS_PREFETCH_RESPONSE;
+ if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+ perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+ return 1;
+ }
+ }
+
+ /* Add the buffer size into buffer as a parameter */
+ *((unsigned int *)buffer)=index;
+ /* Send the entire buffer with its size and oids found and not found */
+ if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+ perror("Error sending oids found\n");
+ return 1;
+ }
+ }
+ return 0;
}
void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
STATUS(header) &= ~(LOCK);
} else {
randomdelay();
- printf("DEBUG-> processReqNotify() Object is still locked\n");
+ printf("processReqNotify() Object is still locked\n");
goto checkversion;
}
}
#include "machinepile.h"
-int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
- prefetchpile_t *tmp = *head;
+prefetchpile_t *insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
+ prefetchpile_t *tmp = head;
+ prefetchpile_t *ptr;
objpile_t *objnode;
unsigned int *oidarray;
- int ntuples;
+ short *offvalues;
+ int i;
char found = 0;
while (tmp != NULL) {
if (tmp->mid == mid) { // Found a match with exsisting machine id
if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
printf("Calloc error: %s %d\n", __FILE__, __LINE__);
- return -1;
+ return NULL;
+ }
+ if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
}
/* Fill objpiles DS */
objnode->oid = oid;
objnode->numoffset = numoffset;
- objnode->offset = offset;
+ for(i = 0; i<numoffset; i++)
+ offvalues[i] = offset[i];
+ objnode->offset = offvalues;
objnode->next = tmp->objpiles;
tmp->objpiles = objnode;
found = 1;
}
tmp = tmp->next;
}
- if (!found) {// Not found => insert new mid DS
- if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
- printf("Calloc error: %s %d\n", __FILE__, __LINE__);
- return -1;
- }
- tmp->mid = mid;
- if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
- printf("Calloc error: %s %d\n", __FILE__, __LINE__);
- return -1;
+
+ tmp = head;
+ if(found != 1) {
+ if(tmp->mid == 0) {//First time
+ tmp->mid = mid;
+ if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ // Fill objpiles DS
+ objnode->oid = oid;
+ objnode->numoffset = numoffset;
+ for(i = 0; i<numoffset; i++)
+ offvalues[i] = *((short *)offset + i);
+ objnode->offset = offvalues;
+ objnode->next = NULL;
+ tmp->objpiles = objnode;
+ tmp->next = NULL;
+ } else {
+ if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ tmp->mid = mid;
+ if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ // Fill objpiles DS
+ objnode->oid = oid;
+ objnode->numoffset = numoffset;
+ for(i = 0; i<numoffset; i++)
+ offvalues[i] = *((short *)offset + i);
+ objnode->offset = offvalues;
+ objnode->next = NULL;
+ tmp->objpiles = objnode;
+ tmp->next = head;
+ head = tmp;
}
- /* Fill objpiles DS */
- objnode->oid = oid;
- objnode->numoffset = numoffset;
- objnode->offset = offset;
- objnode->next = tmp->objpiles; // i.e., objnode->next = NULL;
- tmp->objpiles = objnode;
- tmp->next = *head;
- *head = tmp;
}
- return 0;
+
+ return head;
}
-
-
#include <stdio.h>
#include <stdlib.h>
-int insertPile(int, unsigned int, short, short *, prefetchpile_t **);
+prefetchpile_t *insertPile(int, unsigned int, short, short *, prefetchpile_t *);
#endif
#include "mcpileq.h"
-mcpileq_t mcqueue;
+mcpileq_t mcqueue; //Global queue
void mcpileqInit(void) {
/* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */
}
/* Insert to the rear of machine pile queue */
-void mcpileenqueue(prefetchpile_t *node) {
- prefetchpile_t *tmp, *prev;
+void mcpileenqueue(prefetchpile_t *node, prefetchpile_t *tail) {
if(mcqueue.front == NULL && mcqueue.rear == NULL) {
- mcqueue.front = mcqueue.rear = node;
+ mcqueue.front = node;
+ mcqueue.rear = tail;
} else {
- tmp = mcqueue.rear->next = node;
- while(tmp != NULL) {
- prev = tmp;
- tmp = tmp->next;
- }
- mcqueue.rear = prev;
+ mcqueue.rear->next = node;
+ mcqueue.rear = tail;
}
}
prefetchpile_t *mcpiledequeue(void) {
prefetchpile_t *retnode;
if(mcqueue.front == NULL) {
- printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__);
+ printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__);
return NULL;
}
retnode = mcqueue.front;
while (prefetchpile_ptr != NULL)
{
- objpile_ptr = prefetchpile_ptr->objpiles;
- while (objpile_ptr != NULL)
- {
- if (objpile_ptr->numoffset > 0)
- free(objpile_ptr->offset);
- objpile_next_ptr = objpile_ptr->next;
+ prefetchpile_next_ptr = prefetchpile_ptr;
+ while(prefetchpile_ptr->objpiles != NULL) {
+ if(prefetchpile_ptr->objpiles->numoffset > 0) {
+ free(prefetchpile_ptr->objpiles->offset);
+ }
+ objpile_ptr = prefetchpile_ptr->objpiles;
+ prefetchpile_ptr->objpiles = objpile_ptr->next;
free(objpile_ptr);
- objpile_ptr = objpile_next_ptr;
}
- prefetchpile_next_ptr = prefetchpile_ptr->next;
- free(prefetchpile_ptr);
- prefetchpile_ptr = prefetchpile_next_ptr;
+ prefetchpile_ptr = prefetchpile_next_ptr->next;
+ free(prefetchpile_next_ptr);
}
}
-
-
}mcpileq_t;
void mcpileqInit(void);
-void mcpileenqueue(prefetchpile_t *);
+void mcpileenqueue(prefetchpile_t *, prefetchpile_t *);
prefetchpile_t *mcpiledequeue(void);
void mcpiledelete();
void mcpiledisplay();
objstr_t *objstrCreate(unsigned int size)
{
- objstr_t *tmp = malloc(sizeof(objstr_t) + size);
+ objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
tmp->size = size;
tmp->next = NULL;
tmp->top = tmp + 1; //points to end of objstr_t structure!
{ //end of list, all full
if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
{
- store->next = (objstr_t *)malloc(sizeof(objstr_t) + size);
+ store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size));
if (store->next == NULL)
return NULL;
store = store->next;
}
else
{
- store->next = malloc(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE);
+ store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE));
if (store->next == NULL)
return NULL;
store = store->next;
pqueue.front = pqueue.front->next;
if (pqueue.front == NULL)
pqueue.rear = NULL;
+ retnode->next = NULL;
return retnode;
}
void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
int qnodesize;
int len = 0;
+ int i;
/* Allocate for the queue node*/
char *node;
memcpy(node + len, endoffsets, ntuples*sizeof(short));
len += ntuples * sizeof(short);
memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
+
/* Lock and insert into primary prefetch queue */
pthread_mutex_lock(&pqueue.qlock);
pre_enqueue((prefetchqelem_t *)node);
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
+
pthread_detach(tPrefetch);
//Create and Initialize a pool of threads
/* This function initializes things required in the transaction start*/
transrecord_t *transStart()
{
- transrecord_t *tmp = malloc(sizeof(transrecord_t));
+ transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
#ifdef COMPILER
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
- char* ipaddr;
- midtoIP(machinenumber, ipaddr);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
pthread_mutex_t tlshrd;
thread_data_array_t *thread_data_array;
- if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
+ if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
printf("Malloc error %s, %d\n", __FILE__, __LINE__);
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
/* Close connection */
close(sd);
- //pthread_exit(NULL);
+ pthread_exit(NULL);
}
/* This function decides the reponse that needs to be sent to
/* This function opens a connection, places an object read request to the
* remote machine, reads the control message and object if available and
* copies the object and its header to the local cache.
- * TODO replace mnum and midtoIP() with MACHINE_IP address later */
+ * */
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
int sd, size, val;
perror("Error in socket\n");
return NULL;
}
+
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
midtoIP(mnum,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);
+
/* Open connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect\n");
free(localtdata->transinfo->objnotfound);
}
- //pthread_exit(NULL);
+ pthread_exit(NULL);
}
/* This function completes the ABORT process if the transaction is aborting */
}
printf("TRANS_ABORTED\n");
+
return 0;
}
/*This function completes the COMMIT process is the transaction is commiting*/
int transComProcess(local_thread_data_array_t *localtdata) {
- static int prevsize = 0, *prevptr;
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
unsigned int *oidmod, *oidcreated, *oidlocked;
}
/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
- char *ptr, *tmp;
- int ntuples, slength, i, machinenum;
- int maxoffset;
+ char *ptr;
+ int ntuples, i, machinenum, count=0;
unsigned int *oid;
short *endoffsets, *arryfields, *offset;
prefetchpile_t *head = NULL;
endoffsets = GET_PTR_EOFF(ptr, ntuples);
arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
/* Check for redundant tuples by comparing oids of each tuple */
for(i = 0; i < ntuples; i++) {
if(oid[i] == 0)
return NULL;
}
/* Insert into machine pile */
- offset = &arryfields[endoffsets[i-1]];
- insertPile(machinenum, oid[i], numoffset[i], offset, &head);
+ if(i == 0){
+ offset = &arryfields[0];
+ } else {
+ offset = &arryfields[endoffsets[i-1]];
+ }
+
+ if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){
+ printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
}
+
return head;
}
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
/* Find offset length for each tuple */
int numoffset[ntuples];//Number of offsets for each tuple
numoffset[0] = endoffsets[0];
/* Look in Prefetch cache */
checkPreCache(node, numoffset, oid[i],i);
}
-
}
+
/* Make machine groups */
- head = makePreGroups(node, numoffset);
+ if((head = makePreGroups(node, numoffset)) == NULL) {
+ printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
return head;
}
void *transPrefetch(void *t) {
prefetchqelem_t *qnode;
prefetchpile_t *pilehead = NULL;
+ prefetchpile_t *ptr = NULL, *piletail = NULL;
while(1) {
/* lock mutex of primary prefetch queue */
pthread_exit(NULL);
}
pthread_mutex_unlock(&pqueue.qlock);
+
/* Reduce redundant prefetch requests */
checkPrefetchTuples(qnode);
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
- pilehead = foundLocal(qnode);
+ if((pilehead = foundLocal(qnode)) == NULL) {
+ printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+
+ ptr = pilehead;
+ while(ptr != NULL) {
+ if(ptr->next == NULL) {
+ piletail = ptr;
+ }
+ ptr = ptr->next;
+ }
/* Lock mutex of pool queue */
pthread_mutex_lock(&mcqueue.qlock);
/* Update the pool queue with the new remote machine piles generated per prefetch call */
- mcpileenqueue(pilehead);
+ mcpileenqueue(pilehead, piletail);
/* Broadcast signal on machine pile queue */
pthread_cond_broadcast(&mcqueue.qcond);
/* Unlock mutex of machine pile queue */
pthread_mutex_unlock(&mcqueue.qlock);
/* Deallocate the prefetch queue pile node */
predealloc(qnode);
-
+ pthread_exit(NULL);
}
}
/* Deallocate the machine queue pile node */
mcdealloc(mcpilenode);
+ pthread_exit(NULL);
}
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
- int sd, i, offset, off, len, endpair, count = 0;
+ int sd, i, off, len, endpair, count = 0;
struct sockaddr_in serv_addr;
struct hostent *server;
char machineip[16], control;
objpile_t *tmp;
-
/* Send Trans Prefetch Request */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for SEND_PREFETCH_REQUEST\n");
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
- off = offset = 0;
+ off = 0;
count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
char oidnoffset[len];
+ bzero(oidnoffset, len);
memcpy(oidnoffset, &len, sizeof(int));
off = sizeof(int);
memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
off += sizeof(unsigned int);
for(i = 0; i < tmp->numoffset; i++) {
- memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
+ memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
off+=sizeof(short);
}
if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
close(sd);
return;
}
+
tmp = tmp->next;
}
char *ptr;
void *modptr, *oldptr;
+
/* Read prefetch response from the Remote machine */
if((val = read(sd, &control, sizeof(char))) <= 0) {
perror("No control response for Prefetch request sent\n");
perror("Size of buffer not recv\n");
return;
}
- memcpy(&bufsize, buffer, sizeof(unsigned int));
+ bufsize = *((unsigned int *) buffer);
ptr = buffer + sizeof(unsigned int);
/* Keep receiving the buffer containing oid info */
do {
n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
sum +=n;
} while(sum < bufsize && n != 0);
+
/* Decode the contents of the buffer */
index = sizeof(unsigned int);
while(index < (bufsize - sizeof(unsigned int))) {
if(buffer[index] == OBJECT_FOUND) {
/* Increment it to get the object */
index += sizeof(char);
- memcpy(&oid, buffer + index, sizeof(unsigned int));
+ oid = *((unsigned int *)(buffer+index));
index += sizeof(unsigned int);
/* For each object found add to Prefetch Cache */
- memcpy(&objsize, buffer + index, sizeof(int));
+ objsize = *((int *)(buffer+index));
index+=sizeof(int);
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
/* Increment it to get the object */
/* TODO: For each object not found query DHT for new location and retrieve the object */
index += sizeof(char);
- //memcpy(&oid, buffer + index, sizeof(unsigned int));
oid = *((unsigned int *)(buffer + index));
index += sizeof(unsigned int);
/* Throw an error */
return -1;
} else {
msg[0] = THREAD_NOTIFY_REQUEST;
- msg[1] = numoid;
+ *((unsigned int *)(&msg[1])) = numoid;
/* Send array of oids */
size = sizeof(unsigned int);
{
i = 0;
while(i < numoid) {
version = versionarry[i];
- *((unsigned short *)(&msg[1] + size)) = oid;
+ *((unsigned short *)(&msg[1] + size)) = version;
size += sizeof(unsigned short);
i++;
}
if (bytesSent < 0){
perror("reqNotify():send()");
status = -1;
- } else if (bytesSent != 1 + 5*sizeof(unsigned int)){
+ } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int)){
printf("reNotify(): error, sent %d bytes\n", bytesSent);
status = -1;
} else {
close(sock);
return status;
}
-
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
notifydata_t *ndata;
int i, objIsFound = 0, index;