return 1; //failure
//Initialize socket pool
- if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) {
+ if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
return 0;
}
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
- int length;
- char *recvbuffer, *sendbuffer, control;
- unsigned int oid, mid;
- objheader_t *header;
- struct sockaddr_in remoteAddr;
- oidmidpair_t oidmid;
-
- do {
- recv_data((int)acceptfd, &length, sizeof(int));
- if(length != -1) {
- recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
- oid = oidmid.oid;
- mid = oidmid.mid;
- size = length - sizeof(int) - (2 * sizeof(unsigned int));
- numoffset = size/sizeof(short);
- short offsetarry[numoffset];
- recv_data((int) acceptfd, offsetarry, size);
-
- int sd = -1;
- if((sd = getSock(transPResponseSocketPool, mid)) == -1) {
- printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
- exit(-1);
- }
-
- /*Process each oid */
- if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- } else { /* Object Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
-
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
- }
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
- unsigned short length = ao->___length___;
- /* Check if array out of bounds */
- if(offsetarry[i]< 0 || offsetarry[i] >= length) {
- break;
- }
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
- } else {
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
- }
-
- if((header = mhashSearch(oid)) == NULL) {
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- break;
- } else {/* Obj Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- }
- isArray = 0;
- }
- }
-
- //Release socket
- int status;
- if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) {
- printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__);
- return -1;
- }
- }
- } while (length != -1);
- return 0;
+ int i, size, objsize, numoffset = 0;
+ int length;
+ char *recvbuffer, *sendbuffer, control;
+ unsigned int oid, mid=-1;
+ objheader_t *header;
+ oidmidpair_t oidmid;
+ int sd = -1;
+
+ while(1) {
+ recv_data((int)acceptfd, &length, sizeof(int));
+ if(length == -1)
+ break;
+ recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ if (mid != oidmid.mid) {
+ if (mid!=-1) {
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ }
+ mid=oidmid.mid;
+ if((sd = getSockWithLock(transPResponseSocketPool, mid)) == -1) {
+ printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+ exit(-1);
+ }
+ }
+ size = length - sizeof(int) - (2 * sizeof(unsigned int));
+ numoffset = size/sizeof(short);
+ short offsetarry[numoffset];
+ recv_data((int) acceptfd, offsetarry, size);
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ sendbuffer = calloc(1, size);
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ control = TRANS_PREFETCH_RESPONSE;
+
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ } else { /* Object Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ sendbuffer = calloc(1, size);
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ break;
+ } else {/* Obj Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ }
+ }
+ }
+ }
+ //Release socket
+ if (mid!=-1)
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+
+ return 0;
}
int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
- int numbytes = 0;
-
send_data(sd, control, sizeof(char));
/* Send the buffer with its size */
int length = *(size);
#define MAXSPINS 1000
-inline void Lock(unsigned int *s) {
+inline void Lock(volatile unsigned int *s) {
while(test_and_set(s)) {
int i=0;
while(*s) {
Lock(&sockhash->mylock);
ptr=&sockhash->table[key];
- while(ptr!=NULL) {
+ while(*ptr!=NULL) {
if (mid == (*ptr)->mid) {
socknode_t *tmp=*ptr;
sd = tmp->sd;
ptr=&sockhash->table[key];
- while(ptr!=NULL) {
+ while(*ptr!=NULL) {
if (mid == (*ptr)->mid) {
socknode_t *tmp=*ptr;
sd = tmp->sd;
socknode_t *inusenode = calloc(1, sizeof(socknode_t));
inusenode->next=sockhash->inuse;
sockhash->inuse=inusenode;
- inusenode->next=sockhash;
- sockhash=inusenode;
return sd;
} else {
return -1;
}
}
+int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
+ socknode_t **ptr;
+ int key = mid%(sockhash->size);
+ int sd;
+
+ ptr=&sockhash->table[key];
+
+ while(*ptr!=NULL) {
+ if (mid == (*ptr)->mid) {
+ return (*ptr)->sd;
+ }
+ ptr=&((*ptr)->next);
+ }
+ if((sd = createNewSocket(mid)) != -1) {
+ *ptr=calloc(1, sizeof(socknode_t));
+ (*ptr)->mid=mid;
+ (*ptr)->sd=sd;
+ return sd;
+ } else {
+ return -1;
+ }
+}
+
+
void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
Lock(&sockhash->mylock);
inusenode->next = sockhash->inuse;
volatile unsigned int mylock;
} sockPoolHashTable_t;
-sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float);
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int);
int getSock(sockPoolHashTable_t *, unsigned int);
+int getSock2(sockPoolHashTable_t *, unsigned int);
int getSockWithLock(sockPoolHashTable_t *, unsigned int);
-int freeSock(sockPoolHashTable_t *, unsigned int, int);
-int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
+void freeSock(sockPoolHashTable_t *, unsigned int, int);
+void freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
void insToList(sockPoolHashTable_t *, socknode_t *);
void insToListWithLock(sockPoolHashTable_t *, socknode_t *);
int createNewSocket(unsigned int);
-int CompareAndSwap(int *, int, int);
-void InitLock(SpinLock *);
-void Lock (SpinLock *);
-void UnLock (SpinLock *);
#if 0
/************************************************
if(ntuples > 0) {
int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
char * node;
-
+
if((node = calloc(1, qnodesize)) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
return;
/* This function starts up the transaction runtime. */
int dstmStartup(const char * option) {
- pthread_t thread_Listen;
- pthread_attr_t attr;
- int master=option!=NULL && strcmp(option, "master")==0;
-
- if (processConfigFile() != 0)
- return 0; //TODO: return error value, cause main program to exit
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ int master=option!=NULL && strcmp(option, "master")==0;
+
+ if (processConfigFile() != 0)
+ return 0; //TODO: return error value, cause main program to exit
#ifdef COMPILER
- if (!master)
- threadcount--;
+ if (!master)
+ threadcount--;
#endif
-
- //Initialize socket pool
- if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
- printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
- return 0;
- }
- if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
- printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
- return 0;
- }
-
- dstmInit();
- transInit();
-
- if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- return 1;
- } else {
- dstmListen();
- return 0;
- }
-
+
+ //Initialize socket pool
+ transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
+ transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+
+ dstmInit();
+ transInit();
+
+ if (master) {
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+ return 1;
+ } else {
+ dstmListen();
+ return 0;
+ }
}
//TODO Use this later
* */
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
- int size, val;
- struct sockaddr_in serv_addr;
- char machineip[16];
- char control;
- objheader_t *h;
- void *objcopy = NULL;
-
- int sd;
- if((sd = getSock(transReadSockPool, mnum)) == -1) {
- printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
- return NULL;
- }
+ int size, val;
+ struct sockaddr_in serv_addr;
+ char machineip[16];
+ char control;
+ objheader_t *h;
+ void *objcopy = NULL;
+
+ int sd = getSock2(transReadSockPool, mnum);
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ send_data(sd, readrequest, sizeof(readrequest));
+
+ /* Read response from the Participant */
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ objcopy = NULL;
+ } else {
+ /* Read object if found into local cache */
+ recv_data(sd, &size, sizeof(int));
+ objcopy = objstrAlloc(record->cache, size);
+ recv_data(sd, objcopy, size);
- char readrequest[sizeof(char)+sizeof(unsigned int)];
- readrequest[0] = READ_REQUEST;
- *((unsigned int *)(&readrequest[1])) = oid;
- send_data(sd, readrequest, sizeof(readrequest));
-
- /* Read response from the Participant */
- recv_data(sd, &control, sizeof(char));
-
- switch(control) {
- case OBJECT_NOT_FOUND:
- objcopy = NULL;
- break;
- case OBJECT_FOUND:
- /* Read object if found into local cache */
- recv_data(sd, &size, sizeof(int));
- objcopy = objstrAlloc(record->cache, size);
- recv_data(sd, objcopy, size);
-
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, oid, objcopy);
- break;
- default:
- printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
- break;
- }
-
- int status;
- if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
- printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
- return NULL;
- }
-
- return objcopy;
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, oid, objcopy);
+ }
+
+ // freeSock(transReadSockPool, mnum, sd);
+
+ return objcopy;
}
/* This function handles the local objects involved in a transaction commiting process.
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
- while(1) {
- /* lock mutex of primary prefetch queue */
- pthread_mutex_lock(&pqueue.qlock);
- /* while primary queue is empty, then wait */
- while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
- pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
- }
-
- /* dequeue node to create a machine piles and finally unlock mutex */
- prefetchqelem_t *qnode;
- if((qnode = pre_dequeue()) == NULL) {
- printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&pqueue.qlock);
- continue;
- }
- pthread_mutex_unlock(&pqueue.qlock);
-
- /* Reduce redundant prefetch requests */
- checkPrefetchTuples(qnode);
- /* Check if the tuples are found locally, if yes then reduce them further*/
- /* and group requests by remote machine ids by calling the makePreGroups() */
- prefetchpile_t *pilehead = NULL;
- if((pilehead = foundLocal(qnode)) == NULL) {
- printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
- pre_enqueue(qnode);
- continue;
- }
-
- // Get sock from shared pool
- int sd = -1;
- if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
- printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
- exit(-1);
- }
-
- /* Send Prefetch Request */
- prefetchpile_t *ptr = pilehead;
- while(ptr != NULL) {
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
- }
-
- /* Release socket */
- int status;
- if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
- printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
- return;
- }
-
- /* Deallocated pilehead */
- mcdealloc(pilehead);
-
- // Deallocate the prefetch queue pile node
- predealloc(qnode);
- }
+ while(1) {
+ /* lock mutex of primary prefetch queue */
+ pthread_mutex_lock(&pqueue.qlock);
+ /* while primary queue is empty, then wait */
+ while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+ }
+
+ /* dequeue node to create a machine piles and finally unlock mutex */
+ prefetchqelem_t *qnode;
+ if((qnode = pre_dequeue()) == NULL) {
+ printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pqueue.qlock);
+ continue;
+ }
+ pthread_mutex_unlock(&pqueue.qlock);
+
+ /* Reduce redundant prefetch requests */
+ checkPrefetchTuples(qnode);
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ prefetchpile_t *pilehead = foundLocal(qnode);
+
+ // Get sock from shared pool
+ int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ /* Release socket */
+ // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
+
+ // Deallocate the prefetch queue pile node
+ predealloc(qnode);
+ }
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
- int off, len, endpair, count = 0;
- char control;
- objpile_t *tmp;
-
- /* Send TRANS_PREFETCH control message */
- control = TRANS_PREFETCH;
- send_data(sd, &control, sizeof(char));
-
- /* Send Oids and offsets in pairs */
- tmp = mcpilenode->objpiles;
- while(tmp != NULL) {
- off = 0;
- count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
- len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
- char oidnoffset[len];
- bzero(oidnoffset, len);
- *((int*)oidnoffset) = len;
- off = sizeof(int);
- *((unsigned int *)(oidnoffset + off)) = tmp->oid;
- off += sizeof(unsigned int);
- *((unsigned int *)(oidnoffset + off)) = myIpAddr;
- off += sizeof(unsigned int);
- int i;
- for(i = 0; i < tmp->numoffset; i++) {
- *((short*)(oidnoffset + off)) = tmp->offset[i];
- off+=sizeof(short);
- }
- send_data(sd, oidnoffset, len);
- tmp = tmp->next;
- }
-
- /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
- endpair = -1;
- send_data(sd, &endpair, sizeof(int));
-
- return;
+ int off, len, endpair, count = 0;
+ char control;
+ objpile_t *tmp;
+
+ /* Send TRANS_PREFETCH control message */
+ control = TRANS_PREFETCH;
+ send_data(sd, &control, sizeof(char));
+
+ /* Send Oids and offsets in pairs */
+ tmp = mcpilenode->objpiles;
+ while(tmp != NULL) {
+ off = 0;
+ count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
+ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len];
+ bzero(oidnoffset, len);
+ *((int*)oidnoffset) = len;
+ off = sizeof(int);
+ *((unsigned int *)(oidnoffset + off)) = tmp->oid;
+ off += sizeof(unsigned int);
+ *((unsigned int *)(oidnoffset + off)) = myIpAddr;
+ off += sizeof(unsigned int);
+ int i;
+ for(i = 0; i < tmp->numoffset; i++) {
+ *((short*)(oidnoffset + off)) = tmp->offset[i];
+ off+=sizeof(short);
+ }
+ send_data(sd, oidnoffset, len);
+ tmp = tmp->next;
+ }
+
+ /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+ endpair = -1;
+ send_data(sd, &endpair, sizeof(int));
+
+ return;
}
int getPrefetchResponse(int sd) {