#ifndef _DSMDEBUG_H_
#define _DSMDEBUG_H_
+#include <sys/time.h>
+
#define TABORT1(s) {printf("%s\n", s); fflush(stdout);}
#define TABORT2(s, msg) {printf("%s(): %s\n", s, msg); fflush(stdout);}
#define TABORT3(func, s, msg, d) {printf("%s(): %s: for %s = %d\n", func, s, msg, d); fflush(stdout);}
#define TABORT8(func, s, d) {printf("%s(): %s = %d\n", func, s, d); fflush(stdout);}
#define TABORT9(func, a, b, c, d, val1, val2, val3) {printf("%s(): %s for %s =%x, %s = %d, %s = %x\n", func, a, b, val1, c, val2, d, val3); fflush(stdout);}
+#define ARRAY_SIZE 10100
+#define GETSTARTDELAY(start, count) { \
+ struct timeval tv; \
+ count++; \
+ gettimeofday(&tv, NULL); \
+ start = tv.tv_sec+(tv.tv_usec/1000000.0); \
+}
+
+#define GETSTART(start) { \
+ struct timeval tv; \
+ gettimeofday(&tv, NULL); \
+ start = tv.tv_sec+(tv.tv_usec/1000000.0); \
+}
+
+#define GETENDDELAY(start, end, time) { \
+ struct timeval tv; \
+ gettimeofday(&tv, NULL); \
+ end = tv.tv_sec+(tv.tv_usec/1000000.0); \
+ time = (end-start); \
+}
#endif
sockPoolHashTable_t *transPResponseSocketPool;
+
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
return 1;
}
-
recv_data((int)acceptfd, &control, sizeof(char));
-
/* Process the new control message */
switch(control) {
case TRANS_ABORT:
read_unlock(STATUSPTR(header));
}
}
-
- /* Send ack to Coordinator */
- sendctrl = TRANS_UNSUCESSFUL;
- send_data((int)acceptfd, &sendctrl, sizeof(char));
break;
case TRANS_COMMIT:
//TODO Use fixed.trans_id TID since Client may have died
break;
}
-
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) { //Objs only read and not modified
- int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
incr *= i;
oid = *((unsigned int *)(objread + incr));
incr += sizeof(unsigned int);
printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
return 0;
}
-
return control;
}
}
}
//TODO Update location lookup table
-
- /* Send ack to coordinator */
- control = TRANS_SUCESSFUL;
- send_data((int)acceptfd, &control, sizeof(char));
return 0;
}
#define PREFETCH_CACHE_SIZE 1048576 //1MB
#define CONFIG_FILENAME "dstm.conf"
+
/* Global Variables */
extern int classsize[];
pfcstats_t *evalPrefetch;
local_thread_data_array_t *ltdata;
int firsttime=1;
+
do {
treplyctrl=0;
trecvcount = 0;
else
pile=pile_ptr;
firsttime=0;
-
/* Create the packet to be sent in TRANS_REQUEST */
/* Count the number of participants */
threadnum++;
pile = pile->next;
}
+
/* Free attribute and wait for the other threads */
pthread_attr_destroy(&attr);
-
for (i = 0; i < threadnum; i++) {
rc = pthread_join(thread[i], NULL);
if(rc) {
}
free(thread_data_array[i].buffer);
}
-
/* Free resources */
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
char machineip[16], retval;
tdata = (thread_data_array_t *) threadarg;
-
if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
printf("transRequest(): socket create error\n");
pthread_exit(NULL);
}
/* Send objects that are modified */
+ void *modptr;
+ if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) {
+ printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ int offset = 0;
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
int size;
if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
}
GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
- send_data(sd, headeraddr, size);
+ memcpy(modptr+offset, headeraddr, size);
+ offset+=size;
}
-
+ send_data(sd, modptr, tdata->buffer->f.sum_bytes);
+ free(modptr);
/* Read control message from Participant */
recv_data(sd, &control, sizeof(char));
+
/* Recv Objects if participant sends TRANS_DISAGREE */
#ifdef CACHE
if(control == TRANS_DISAGREE) {
}
}
#endif
-
recvcontrol = control;
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
}
pthread_mutex_unlock(tdata->lock);
- /* clear objects from prefetch cache */
- /*
- if(*(tdata->replyctrl) == TRANS_ABORT) {
- int i;
- for(i=0; i<tdata->buffer->f.nummod; i++) {
- unsigned int oid = tdata->buffer->oidmod[i];
- objheader_t *header;
- if((header = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- }
- }
- for(i=0; i<tdata->buffer->f.numread; i++) {
- char *objread = tdata->buffer->objread;
- unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
- sizeof(unsigned short))*i));
- objheader_t *header;
- if((header = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- }
- }
- }
- */
-
#ifdef CACHE
if(*(tdata->replyctrl) == TRANS_COMMIT) {
int retval;
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
-
- recv_data((int)sd, &control, sizeof(char));
-
- if(control == TRANS_UNSUCESSFUL) {
- //printf("DEBUG-> TRANS_ABORTED\n");
- } else if(control == TRANS_SUCESSFUL) {
- //printf("DEBUG-> TRANS_SUCCESSFUL\n");
- } else {
- //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
- }
pthread_exit(NULL);
}
char control;
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
-
for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
written onto the shared array */
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
-#ifdef CACHE
-#if 0
- /* Turn prefetching on */
- int i;
- for (i=0; i<numprefetchsites; i++)
- evalPrefetch[i].operMode = 1;
-#endif
-#endif
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
/* If the decided response is TRANS_COMMIT */
retval = TRANS_COMMIT;
}
-
return retval;
}
int numread, i;
unsigned int oid;
unsigned short version;
-
localtdata = (local_thread_data_array_t *) threadarg;
-
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
}
pthread_mutex_unlock(localtdata->tdata->lock);
-
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
}
+
/* Free memory */
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
free(localtdata->transinfo->objnotfound);
}
pthread_exit(NULL);
-
}
/* Commit info for objects modified */