if(ntuples > 0) {
int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
char * node;
-
+
if((node = calloc(1, qnodesize)) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
return;
/* This function starts up the transaction runtime. */
int dstmStartup(const char * option) {
- pthread_t thread_Listen;
- pthread_attr_t attr;
- int master=option!=NULL && strcmp(option, "master")==0;
-
- if (processConfigFile() != 0)
- return 0; //TODO: return error value, cause main program to exit
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ int master=option!=NULL && strcmp(option, "master")==0;
+
+ if (processConfigFile() != 0)
+ return 0; //TODO: return error value, cause main program to exit
#ifdef COMPILER
- if (!master)
- threadcount--;
+ if (!master)
+ threadcount--;
#endif
-
- //Initialize socket pool
- if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
- printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
- return 0;
- }
- if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
- printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
- return 0;
- }
-
- dstmInit();
- transInit();
-
- if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- return 1;
- } else {
- dstmListen();
- return 0;
- }
-
+
+ //Initialize socket pool
+ transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
+ transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+
+ dstmInit();
+ transInit();
+
+ if (master) {
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+ return 1;
+ } else {
+ dstmListen();
+ return 0;
+ }
}
//TODO Use this later
* */
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
- int size, val;
- struct sockaddr_in serv_addr;
- char machineip[16];
- char control;
- objheader_t *h;
- void *objcopy = NULL;
-
- int sd;
- if((sd = getSock(transReadSockPool, mnum)) == -1) {
- printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
- return NULL;
- }
+ int size, val;
+ struct sockaddr_in serv_addr;
+ char machineip[16];
+ char control;
+ objheader_t *h;
+ void *objcopy = NULL;
+
+ int sd = getSock2(transReadSockPool, mnum);
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ send_data(sd, readrequest, sizeof(readrequest));
+
+ /* Read response from the Participant */
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ objcopy = NULL;
+ } else {
+ /* Read object if found into local cache */
+ recv_data(sd, &size, sizeof(int));
+ objcopy = objstrAlloc(record->cache, size);
+ recv_data(sd, objcopy, size);
- char readrequest[sizeof(char)+sizeof(unsigned int)];
- readrequest[0] = READ_REQUEST;
- *((unsigned int *)(&readrequest[1])) = oid;
- send_data(sd, readrequest, sizeof(readrequest));
-
- /* Read response from the Participant */
- recv_data(sd, &control, sizeof(char));
-
- switch(control) {
- case OBJECT_NOT_FOUND:
- objcopy = NULL;
- break;
- case OBJECT_FOUND:
- /* Read object if found into local cache */
- recv_data(sd, &size, sizeof(int));
- objcopy = objstrAlloc(record->cache, size);
- recv_data(sd, objcopy, size);
-
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, oid, objcopy);
- break;
- default:
- printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
- break;
- }
-
- int status;
- if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
- printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
- return NULL;
- }
-
- return objcopy;
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, oid, objcopy);
+ }
+
+ // freeSock(transReadSockPool, mnum, sd);
+
+ return objcopy;
}
/* This function handles the local objects involved in a transaction commiting process.
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
- while(1) {
- /* lock mutex of primary prefetch queue */
- pthread_mutex_lock(&pqueue.qlock);
- /* while primary queue is empty, then wait */
- while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
- pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
- }
-
- /* dequeue node to create a machine piles and finally unlock mutex */
- prefetchqelem_t *qnode;
- if((qnode = pre_dequeue()) == NULL) {
- printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&pqueue.qlock);
- continue;
- }
- pthread_mutex_unlock(&pqueue.qlock);
-
- /* Reduce redundant prefetch requests */
- checkPrefetchTuples(qnode);
- /* Check if the tuples are found locally, if yes then reduce them further*/
- /* and group requests by remote machine ids by calling the makePreGroups() */
- prefetchpile_t *pilehead = NULL;
- if((pilehead = foundLocal(qnode)) == NULL) {
- printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
- pre_enqueue(qnode);
- continue;
- }
-
- // Get sock from shared pool
- int sd = -1;
- if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
- printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
- exit(-1);
- }
-
- /* Send Prefetch Request */
- prefetchpile_t *ptr = pilehead;
- while(ptr != NULL) {
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
- }
-
- /* Release socket */
- int status;
- if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
- printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
- return;
- }
-
- /* Deallocated pilehead */
- mcdealloc(pilehead);
-
- // Deallocate the prefetch queue pile node
- predealloc(qnode);
- }
+ while(1) {
+ /* lock mutex of primary prefetch queue */
+ pthread_mutex_lock(&pqueue.qlock);
+ /* while primary queue is empty, then wait */
+ while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+ }
+
+ /* dequeue node to create a machine piles and finally unlock mutex */
+ prefetchqelem_t *qnode;
+ if((qnode = pre_dequeue()) == NULL) {
+ printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pqueue.qlock);
+ continue;
+ }
+ pthread_mutex_unlock(&pqueue.qlock);
+
+ /* Reduce redundant prefetch requests */
+ checkPrefetchTuples(qnode);
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ prefetchpile_t *pilehead = foundLocal(qnode);
+
+ // Get sock from shared pool
+ int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ /* Release socket */
+ // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
+
+ // Deallocate the prefetch queue pile node
+ predealloc(qnode);
+ }
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
- int off, len, endpair, count = 0;
- char control;
- objpile_t *tmp;
-
- /* Send TRANS_PREFETCH control message */
- control = TRANS_PREFETCH;
- send_data(sd, &control, sizeof(char));
-
- /* Send Oids and offsets in pairs */
- tmp = mcpilenode->objpiles;
- while(tmp != NULL) {
- off = 0;
- count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
- len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
- char oidnoffset[len];
- bzero(oidnoffset, len);
- *((int*)oidnoffset) = len;
- off = sizeof(int);
- *((unsigned int *)(oidnoffset + off)) = tmp->oid;
- off += sizeof(unsigned int);
- *((unsigned int *)(oidnoffset + off)) = myIpAddr;
- off += sizeof(unsigned int);
- int i;
- for(i = 0; i < tmp->numoffset; i++) {
- *((short*)(oidnoffset + off)) = tmp->offset[i];
- off+=sizeof(short);
- }
- send_data(sd, oidnoffset, len);
- tmp = tmp->next;
- }
-
- /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
- endpair = -1;
- send_data(sd, &endpair, sizeof(int));
-
- return;
+ int off, len, endpair, count = 0;
+ char control;
+ objpile_t *tmp;
+
+ /* Send TRANS_PREFETCH control message */
+ control = TRANS_PREFETCH;
+ send_data(sd, &control, sizeof(char));
+
+ /* Send Oids and offsets in pairs */
+ tmp = mcpilenode->objpiles;
+ while(tmp != NULL) {
+ off = 0;
+ count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
+ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len];
+ bzero(oidnoffset, len);
+ *((int*)oidnoffset) = len;
+ off = sizeof(int);
+ *((unsigned int *)(oidnoffset + off)) = tmp->oid;
+ off += sizeof(unsigned int);
+ *((unsigned int *)(oidnoffset + off)) = myIpAddr;
+ off += sizeof(unsigned int);
+ int i;
+ for(i = 0; i < tmp->numoffset; i++) {
+ *((short*)(oidnoffset + off)) = tmp->offset[i];
+ off+=sizeof(short);
+ }
+ send_data(sd, oidnoffset, len);
+ tmp = tmp->next;
+ }
+
+ /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+ endpair = -1;
+ send_data(sd, &endpair, sizeof(int));
+
+ return;
}
int getPrefetchResponse(int sd) {