}
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
- size = sizeof(objheader_t) + sizeof(classsize[TYPE(h)]);
+ GETSIZE(size, h);
+ size += sizeof(objheader_t);
+
if (h == NULL) {
ctrl = OBJECT_NOT_FOUND;
if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
ptr = (char *) modptr;
for(i = 0 ; i < fixed.nummod; i++) {
- headaddr = (objheader_t *) ptr;
- oid = OID(headaddr);
- oidmod[i] = oid;
- ptr += sizeof(objheader_t) + classsize[TYPE(headaddr)];
+ int tmpsize;
+ headaddr = (objheader_t *) ptr;
+ oid = OID(headaddr);
+ oidmod[i] = oid;
+ GETSIZE(tmpsize, headaddr);
+ ptr += sizeof(objheader_t) + tmpsize;
}
/*Process the information read */
/* Set all ref counts as 1 and do garbage collection */
ptr = modptr;
for(i = 0; i< fixed->nummod; i++) {
- tmp_header = (objheader_t *)ptr;
- tmp_header->rcount = 0;
- ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
+ int tmpsize;
+ tmp_header = (objheader_t *)ptr;
+ tmp_header->rcount = 0;
+ GETSIZE(tmpsize, tmp_header);
+ ptr += sizeof(objheader_t) + tmpsize;
}
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< transinfo->numlocked; i++) {
incr += sizeof(unsigned int);
version = *((short *)(objread + incr));
} else {//Objs modified
- headptr = (objheader_t *) ptr;
- oid = OID(headptr);
- version = headptr->version;
- ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
+ int tmpsize;
+ headptr = (objheader_t *) ptr;
+ oid = OID(headptr);
+ version = headptr->version;
+ GETSIZE(tmpsize, headptr);
+ ptr += sizeof(objheader_t) + tmpsize;
}
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
/* Change ptr address in mhash table */
mhashRemove(oidmod[i]);
mhashInsert(oidmod[i], (((char *)modptr) + offset));
- offset += sizeof(objheader_t) + classsize[TYPE(header)];
-
+ {
+ int tmpsize;
+ GETSIZE(tmpsize,header);
+ offset += sizeof(objheader_t) + tmpsize;
+ }
/* Update object version number */
header = (objheader_t *) mhashSearch(oidmod[i]);
header->version += 1;
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
- unsigned int oid, index = 0;
- char *ptr, buffer[PRE_BUF_SIZE];
- void *mobj;
- unsigned int objoid;
- char *header, control;
- objheader_t * head;
- int bytesRecvd;
-
- /* Repeatedly recv the oid and offset pairs sent for prefetch */
- while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
- count++;
- if(length == -1)
- break;
- sum = 0;
- index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
- // first 4 bytes are saved to send the
- // size of the buffer (that is computed at the end of the loop)
- bytesRecvd = 0;
- do {
- bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
- sizeof(unsigned int) - bytesRecvd, 0);
- } while (bytesRecvd < sizeof(unsigned int));
- numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
- N = numoffset * sizeof(short);
- short offset[numoffset];
- ptr = (char *)&offset;
- /* Recv the offset values per oid */
- do {
- n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
-
- /* Process each oid */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* send the oid, it's size, it's header and data */
- header = (char *) mobj;
- head = (objheader_t *) header;
- size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
- if((header = (char *) mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- break;
- } else {/* Obj Found */
- /* send the oid, it's size, it's header and data */
- head = (objheader_t *) header;
- size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- continue;
- }
- }
- }
- /* Check for overflow in the buffer */
- if (index >= PRE_BUF_SIZE) {
- printf("Char buffer is overflowing\n");
- return 1;
- }
- /* Send Prefetch response control message only once*/
- if(count == 1) {
- control = TRANS_PREFETCH_RESPONSE;
- if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
- }
-
- /* Add the buffer size into buffer as a parameter */
- *((unsigned int *)buffer)=index;
- /* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending oids found\n");
- return 1;
- }
+ int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
+ unsigned int oid, index = 0;
+ char *ptr, buffer[PRE_BUF_SIZE];
+ void *mobj;
+ unsigned int objoid;
+ char control;
+ objheader_t * header;
+ int bytesRecvd;
+
+ /* Repeatedly recv the oid and offset pairs sent for prefetch */
+ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+ count++;
+ if(length == -1)
+ break;
+ sum = 0;
+ index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
+ // first 4 bytes are saved to send the
+ // size of the buffer (that is computed at the end of the loop)
+ bytesRecvd = 0;
+ do {
+ bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
+ sizeof(unsigned int) - bytesRecvd, 0);
+ } while (bytesRecvd < sizeof(unsigned int));
+ numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
+ N = numoffset * sizeof(short);
+ short offset[numoffset];
+ ptr = (char *)&offset;
+ /* Recv the offset values per oid */
+ do {
+ n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
+ sum += n;
+ } while(sum < N && n != 0);
+
+ /* Process each oid */
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ *(buffer + index) = OBJECT_NOT_FOUND;
+ index += sizeof(char);
+ memcpy(buffer+index, &oid, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* send the oid, it's size, it's header and data */
+ header = mobj;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ *(buffer + index) = OBJECT_FOUND;
+ index += sizeof(char);
+ memcpy(buffer+index, &oid, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ memcpy(buffer+index, &size, sizeof(int));
+ index += sizeof(int);
+ memcpy(buffer + index, header, size);
+ index += size;
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ objoid = *((int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
+ if((header = mhashSearch(objoid)) == NULL) {
+ /* Obj not found, send oid */
+ *(buffer + index) = OBJECT_NOT_FOUND;
+ index += sizeof(char);
+ memcpy(buffer+index, &oid, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ break;
+ } else {/* Obj Found */
+ /* send the oid, it's size, it's header and data */
+ GETSIZE(size, header);
+ size+=sizeof(objheader_t);
+ *(buffer + index) = OBJECT_FOUND;
+ index += sizeof(char);
+ memcpy(buffer+index, &oid, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ memcpy(buffer+index, &size, sizeof(int));
+ index += sizeof(int);
+ memcpy(buffer + index, header, size);
+ index += size;
+ continue;
}
- return 0;
+ }
+ }
+ /* Check for overflow in the buffer */
+ if (index >= PRE_BUF_SIZE) {
+ printf("Char buffer is overflowing\n");
+ return 1;
+ }
+ /* Send Prefetch response control message only once*/
+ if(count == 1) {
+ control = TRANS_PREFETCH_RESPONSE;
+ if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+ perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+ return 1;
+ }
+ }
+
+ /* Add the buffer size into buffer as a parameter */
+ *((unsigned int *)buffer)=index;
+ /* Send the entire buffer with its size and oids found and not found */
+ if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+ perror("Error sending oids found\n");
+ return 1;
+ }
+ }
+ return 0;
}
//Add oid into a machine that is already present in the pile linked list structure
while(tmp != NULL) {
if (tmp->mid == mid) {
- if (STATUS(headeraddr) & NEW) {
- tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
- tmp->numcreated = tmp->numcreated + 1;
- tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
- }else if (STATUS(headeraddr) & DIRTY) {
- tmp->oidmod[tmp->nummod] = OID(headeraddr);
- tmp->nummod = tmp->nummod + 1;
- tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
- } else {
- offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
- *((unsigned int *)(tmp->objread + offset))=OID(headeraddr);
- offset += sizeof(unsigned int);
- memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
- tmp->numread = tmp->numread + 1;
- }
- found = 1;
- break;
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated = tmp->numcreated + 1;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ }else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod = tmp->nummod + 1;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(tmp->objread + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
+ tmp->numread = tmp->numread + 1;
+ }
+ found = 1;
+ break;
}
tmp = tmp->next;
}
//Add oid for any new machine
if (!found) {
- if((ptr = pCreate(num_objs)) == NULL) {
- return NULL;
- }
- ptr->mid = mid;
- if (STATUS(headeraddr) & NEW) {
- ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
- ptr->numcreated = ptr->numcreated + 1;
- ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
- } else if (STATUS(headeraddr) & DIRTY) {
- ptr->oidmod[ptr->nummod] = OID(headeraddr);
- ptr->nummod = ptr->nummod + 1;
- ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
- } else {
- *((unsigned int *)ptr->objread)=OID(headeraddr);
- memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
- ptr->numread = ptr->numread + 1;
- }
- ptr->next = pile;
- pile = ptr;
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated = ptr->numcreated + 1;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod = ptr->nummod + 1;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
+ ptr->numread = ptr->numread + 1;
+ }
+ ptr->next = pile;
+ pile = ptr;
}
-
+
/* Clear Flags */
STATUS(headeraddr) &= ~(NEW);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
tmp = mhashSearch(oid);
- size = sizeof(objheader_t)+classsize[TYPE(tmp)];
+ GETSIZE(size, tmp);
+ size += sizeof(objheader_t);
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
#endif
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
found = 1;
- size = sizeof(objheader_t)+classsize[TYPE(tmp)];
+ GETSIZE(size, tmp);
+ size+=sizeof(objheader_t);
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
/* Insert into cache's lookup table */
/* Check Prefetch cache again */
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
found = 1;
- size = sizeof(objheader_t)+classsize[TYPE(tmp)];
+ GETSIZE(size, tmp);
+ size+=sizeof(objheader_t);
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
/* Insert into cache's lookup table */
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
int size;
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
- size=sizeof(objheader_t)+classsize[TYPE(headeraddr)];
+ GETSIZE(size,headeraddr);
+ size+=sizeof(objheader_t);
if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
perror("Error sending obj modified for thread\n");
pthread_exit(NULL);
/* Write modified objects into the mainobject store */
for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
- size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+ GETSIZE(size,headeraddr);
+ size+=sizeof(objheader_t);
memcpy(modptr+offset, headeraddr, size);
offset += size;
}
/* Write new objects into the mainobject store */
for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
- size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+ GETSIZE(size, headeraddr);
+ size+=sizeof(objheader_t);
memcpy(modptr+offset, headeraddr, size);
offset += size;
}
incr += sizeof(unsigned int);
version = *((short *)(localtdata->tdata->buffer->objread + incr));
} else {//Objs modified
+ int tmpsize;
headptr = (objheader_t *)ptr;
oid = OID(headptr);
version = headptr->version;
- ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
+ GETSIZE(tmpsize, headptr);
+ ptr += sizeof(objheader_t) + tmpsize;
}
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
/* Set all ref counts as 1 and do garbage collection */
ptr = (char *)modptr;
for(i = 0; i< nummod; i++) {
+ int tmpsize;
tmp_header = (objheader_t *)ptr;
tmp_header->rcount = 0;
- ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
+ GETSIZE(tmpsize, tmp_header);
+ ptr += sizeof(objheader_t) + tmpsize;
}
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< numlocked; i++) {
/* Process each modified object saved in the mainobject store */
for(i = 0; i < nummod; i++) {
+ int tmpsize;
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
/* Change ptr address in mhash table */
mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
mhashInsert(oidmod[i], (((char *)modptr) + offset));
- offset += sizeof(objheader_t) + classsize[TYPE(header)];
+ GETSIZE(tmpsize, header);
+ offset += sizeof(objheader_t) + tmpsize;
/* Update object version number */
header = (objheader_t *) mhashSearch(oidmod[i]);
for (i = 0; i < numcreated; i++)
{
+ int tmpsize;
header = (objheader_t *)(((char *)modptr) + offset);
mhashInsert(oidcreated[i], (((char *)modptr) + offset));
- offset += sizeof(objheader_t) + classsize[TYPE(header)];
+ GETSIZE(tmpsize, header);
+ offset += sizeof(objheader_t) + tmpsize;
lhashInsert(oidcreated[i], myIpAddr);
}
if(control == TRANS_PREFETCH_RESPONSE) {
/*For each oid and offset tuple sent as prefetch request to remote machine*/
while(i < count) {
- /* Clear contents of buffer */
- memset(buffer, 0, RECEIVE_BUFFER_SIZE);
sum = 0;
index = 0;
/* Read the size of buffer to be received */
index += sizeof(unsigned int);
/* For each object found add to Prefetch Cache */
memcpy(&objsize, buffer + index, sizeof(int));
+ index+=sizeof(int);
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);