extern int classsize[];
objstr_t *mainobjstore;
+pthread_mutex_t mainobjstore_mutex;
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
int dstmInit(void)
{
- mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+ mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+ pthread_mutex_init(&mainobjstore_mutex, NULL);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
if(fixed.nummod != 0) { // If pile contains more than one modified object,
// allocate new object store and recv all modified objects
// TODO deallocate this space
+ pthread_mutex_lock(&mainobjstore_mutex);
if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mainobjstore_mutex);
return 1;
}
+ pthread_mutex_unlock(&mainobjstore_mutex);
sum = 0;
do { // Recv the objs that are modified by the Coordinator
n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
unsigned int objoid;
char *header, control;
objheader_t * head;
+ int bytesRecvd;
/* Repeatedly recv the oid and offset pairs sent for prefetch */
while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
// first 4 bytes are saved to send the
// size of the buffer (that is computed at the end of the loop)
- oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+ bytesRecvd = 0;
+ do {
+ bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
+ sizeof(unsigned int) - bytesRecvd, 0);
+ } while (bytesRecvd < sizeof(unsigned int));
numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
N = numoffset * sizeof(short);
short offset[numoffset];
}
/* Add the buffer size into buffer as a parameter */
- memcpy(buffer, &index, sizeof(unsigned int));
+ *((unsigned int *)buffer)=index;
/* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
+ if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
perror("Error sending oids found\n");
return 1;
}
void mcpileenqueue(prefetchpile_t *node) {
prefetchpile_t *tmp, *prev;
if(mcqueue.front == NULL && mcqueue.rear == NULL) {
- tmp = mcqueue.front = node;
+ mcqueue.front = mcqueue.rear = node;
+ /*tmp = mcqueue.front = node;
while(tmp != NULL) {
prev = tmp;
tmp = tmp->next;
}
- mcqueue.rear = prev;
+ mcqueue.rear = prev;*/
} else {
tmp = mcqueue.rear->next = node;
while(tmp != NULL) {
}
retnode = mcqueue.front;
mcqueue.front = mcqueue.front->next;
+ if (mcqueue.front == NULL)
+ mcqueue.rear = NULL;
retnode->next = NULL;
return retnode;
}
}
+/* Delete prefetchpile_t and everything it points to */
void mcdealloc(prefetchpile_t *node) {
- /* Remove the offset ptr and linked lists of objpile_t */
- objpile_t *delnode;
- while(node->objpiles != NULL) {
- node->objpiles->offset = NULL;
- delnode = node->objpiles;
- node->objpiles = node->objpiles->next;
- free(delnode);
- node->objpiles->next = NULL;
+ prefetchpile_t *prefetchpile_ptr;
+ prefetchpile_t *prefetchpile_next_ptr;
+ objpile_t *objpile_ptr;
+ objpile_t *objpile_next_ptr;
+
+ prefetchpile_ptr = node;
+
+ while (prefetchpile_ptr != NULL)
+ {
+ objpile_ptr = prefetchpile_ptr->objpiles;
+ while (objpile_ptr != NULL)
+ {
+ if (objpile_ptr->numoffset > 0)
+ free(objpile_ptr->offset);
+ objpile_next_ptr = objpile_ptr->next;
+ free(objpile_ptr);
+ objpile_ptr = objpile_next_ptr;
+ }
+ prefetchpile_next_ptr = prefetchpile_ptr->next;
+ free(prefetchpile_ptr);
+ prefetchpile_ptr = prefetchpile_next_ptr;
}
- free(node);
- node->next = NULL;
}
+
+
extern int classsize[];
extern primarypfq_t pqueue; // shared prefetch queue
extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
-objstr_t *prefetchcache; //Global Prefetch cache
+objstr_t *prefetchcache; //Global Prefetch cache
+pthread_mutex_t prefetchcache_mutex;
+extern pthread_mutex_t mainobjstore_mutex;
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch;
int t, rc;
//Create and initialize prefetch cache structure
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+ pthread_mutex_init(&prefetchcache_mutex, NULL);
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR))
return; //Failure
/* modptr points to the beginning of the object store
* created at the Pariticipant */
+ pthread_mutex_lock(&mainobjstore_mutex);
if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mainobjstore_mutex);
return NULL;
}
+ pthread_mutex_unlock(&mainobjstore_mutex);
/* Write modified objects into the mainobject store */
for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
}
/* Insert into machine pile */
offset = &arryfields[endoffsets[i-1]];
- insertPile(machinenum, oid[i], numoffset[i], offset, head);
+ insertPile(machinenum, oid[i], numoffset[i], offset, &head);
}
-
return head;
}
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
- int sd, i, offset, off, len, endpair, numoffsets, count = 0;
+ int sd, i, offset, off, len, endpair, count = 0;
struct sockaddr_in serv_addr;
struct hostent *server;
char machineip[16], control;
off = sizeof(int);
memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
off += sizeof(unsigned int);
- for(i = 0; i < numoffsets; i++) {
- offset = off + (i * sizeof(short));
- memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
+ for(i = 0; i < tmp->numoffset; i++) {
+ memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
+ off+=sizeof(short);
}
if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
perror("Error sending fixed bytes for thread\n");
index += sizeof(char);
memcpy(&oid, buffer + index, sizeof(unsigned int));
index += sizeof(unsigned int);
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
/* For each object found add to Prefetch Cache */
memcpy(&objsize, buffer + index, sizeof(int));
+ pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
return;
}
+ pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(modptr, buffer+index, objsize);
- index += sizeof(int);
+ index += objsize;
/* Insert the oid and its address into the prefetch hash lookup table */
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
} else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
/* Throw an error */
printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
exit(-1);
- } else
+ } else {
printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
+ return;
+ }
}
i++;
if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
{
prefetch(1, &oid, &numoffsets, NULL);
- pthread_mutex_lock(&pflookup.lock);
while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+ {
+ pthread_mutex_lock(&pflookup.lock);
pthread_cond_wait(&pflookup.cond, &pflookup.lock);
- pthread_mutex_unlock(&pflookup.lock);
+ pthread_mutex_unlock(&pflookup.lock);
+ }
}
}