objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
+pthread_mutex_t threadnotify_mutex = PTHREAD_MUTEX_INITIALIZER;
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
+
+ int i;
transinfo.objlocked = NULL;
transinfo.objnotfound = NULL;
/* Receive control messages from other machines */
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
- if (retval == 0) {
- pthread_exit(NULL); // Testing connection
- }
- perror("Error in receiving control from coordinator\n");
+ perror("Error: in receiving control from coordinator\n");
pthread_exit(NULL);
}
switch(control) {
case READ_REQUEST:
- printf("DEBUG -> Recv READ_REQUEST\n");
/* Read oid requested and search if available */
if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
- perror("Error receiving object from cooridnator\n");
+ perror("Error: receiving 0x0 object from cooridnator\n");
pthread_exit(NULL);
}
if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Object 0x%x is not found in Main Object Store %s %d\n", oid, __FILE__, __LINE__);
+ printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
pthread_exit(NULL);
}
h = (objheader_t *) srcObj;
break;
case READ_MULT_REQUEST:
- printf("DEBUG-> READ_MULT_REQUEST\n");
break;
case MOVE_REQUEST:
- printf("DEBUG -> MOVE_REQUEST\n");
break;
case MOVE_MULT_REQUEST:
- printf("DEBUG -> MOVE_MULT_REQUEST\n");
break;
case TRANS_REQUEST:
/* Read transaction request */
- printf("DEBUG -> Recv TRANS_REQUEST\n");
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
- printf("Error in readClientReq\n");
+ printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
break;
case TRANS_PREFETCH:
- printf("DEBUG -> Recv TRANS_PREFETCH\n");
if((val = prefetchReq((int)acceptfd)) != 0) {
- printf("Error in transPrefetch\n");
+ printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
break;
if (retval <= 0)
perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
else if (retval != sizeof(unsigned int))
- printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
- retval);
+ printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
+ retval, __FILE__, __LINE__);
else
{
objType = getObjType(oid);
case THREAD_NOTIFY_REQUEST:
size = sizeof(unsigned int);
- retval = recv((int)acceptfd, &numoid, size, 0);
+ bzero(&buffer, RECEIVE_BUFFER_SIZE);
+ retval = recv((int)acceptfd, &buffer, size, 0);
+ numoid = *((unsigned int *) &buffer);
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
bzero(&buffer, RECEIVE_BUFFER_SIZE);
retval = recv((int)acceptfd, &buffer, size, 0);
- oidarry = calloc(numoid, sizeof(unsigned int));
- memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
- size = sizeof(unsigned int) * numoid;
- versionarry = calloc(numoid, sizeof(unsigned short));
- memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
- size += sizeof(unsigned short) * numoid;
- mid = *((unsigned int *)(buffer+size));
- size += sizeof(unsigned int);
- threadid = *((unsigned int *)(buffer+size));
- processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+ if(retval <=0)
+ perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST");
+ else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid))
+ printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval,
+ __FILE__, __LINE__);
+ else {
+ oidarry = calloc(numoid, sizeof(unsigned int));
+ memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
+ size = sizeof(unsigned int) * numoid;
+ versionarry = calloc(numoid, sizeof(unsigned short));
+ memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
+ size += sizeof(unsigned short) * numoid;
+ mid = *((unsigned int *)(buffer+size));
+ size += sizeof(unsigned int);
+ threadid = *((unsigned int *)(buffer+size));
+ processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+ }
break;
bzero(&buffer, RECEIVE_BUFFER_SIZE);
retval = recv((int)acceptfd, &buffer, size, 0);
if(retval <= 0)
- perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg");
+ perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE");
else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
- printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval);
+ printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n",
+ retval, __FILE__, __LINE__);
else {
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
break;
default:
- printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
+ printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
}
/* Close connection */
/*Process the information read */
if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
- printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
+ printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
/* Free resources */
if(oidmod != NULL) {
free(oidmod);
* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
- char *ptr, control, sendctrl;
+ char control, sendctrl;
objheader_t *tmp_header;
void *header;
int i = 0, val, retval;
/* Send reply to the Coordinator */
if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
- printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
+ printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
return 1;
}
- /* Read new control message from Coordiator */
- if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
- perror("Error in receiving control message\n");
- return 1;
- }
+ do {
+ retval = recv((int)acceptfd, &control, sizeof(char), 0);
+ } while(retval < sizeof(char));
/* Process the new control message */
switch(control) {
}
/* Send ack to Coordinator */
- sendctrl = TRANS_SUCESSFUL;
+ sendctrl = TRANS_UNSUCESSFUL;
if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
+ perror("Error: In sending ACK to coordinator\n");
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
}
return 1;
}
- ptr = NULL;
break;
case TRANS_COMMIT:
/* Invoke the transCommit process() */
if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
- printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
+ printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
- //TODO expect another transrequest from client
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
break;
default:
- printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
+ printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
//TODO Use fixed.trans_id TID since Client may have died
break;
}
/* Decide what control message to send to Coordinator */
if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
- printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__);
+ printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
return 0;
}
/* Process each modified object saved in the mainobject store */
for(i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
GETSIZE(tmpsize,header);
pthread_mutex_lock(&mainobjstore_mutex);
- memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t));
+ memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
header->version += 1;
/* If threads are waiting on this object to be updated, notify them */
if(header->notifylist != NULL) {
/* Unlock locked objects */
for(i = 0; i < numlocked; i++) {
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
STATUS(header) &= ~(LOCK);
control = TRANS_SUCESSFUL;
if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
+ return 1;
}
return 0;
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, size, count = 0;
- int isArray = 0, bytesRecvd;
+ int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
+ int isArray = 0;
unsigned int oid, index = 0;
- unsigned int objoid, myIpAddr;
- char *ptr, control, buffer[PRE_BUF_SIZE];
+ char *ptr, buffer[PRE_BUF_SIZE];
void *mobj;
+ unsigned int objoid;
+ char control;
objheader_t * header;
+ int bytesRecvd;
+/*
+ unsigned int myIpAddr;
#ifdef MAC
myIpAddr = getMyIpAddr("en1");
#else
myIpAddr = getMyIpAddr("eth0");
#endif
-
+*/
/* Repeatedly recv the oid and offset pairs sent for prefetch */
while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
count++;
}
/* Check for overflow in the buffer */
if (index >= PRE_BUF_SIZE) {
- printf("Char buffer is overflowing\n");
+ printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
return 1;
}
/* Send Prefetch response control message only once*/
if(count == 1){
control = TRANS_PREFETCH_RESPONSE;
if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+ perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
return 1;
}
}
/* Add the buffer size into buffer as a parameter */
*((unsigned int *)buffer)=index;
+
/* Send the entire buffer with its size and oids found and not found */
if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending oids found\n");
+ perror("Error: sending oids found\n");
return 1;
}
}
objheader_t *header;
unsigned int oid;
unsigned short newversion;
- char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
int sd;
struct sockaddr_in remoteAddr;
int bytesSent;
int status, size, retry = 0;
-
+
int i = 0;
while(i < numoid) {
oid = *(oidarry + i);
if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
- printf("processReqNotify(): Object is not found in mlookup %s, %d\n", __FILE__, __LINE__);
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return;
} else {
/* Check to see if versions are same */
checkversion:
if ((STATUS(header) & LOCK) != LOCK) {
STATUS(header) |= LOCK;
- if(header->version == *(versionarry + i)) {
+ newversion = header->version;
+ if(newversion == *(versionarry + i)) {
//Add to the notify list
- insNode(header->notifylist, threadid, mid);
+ if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+ printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ STATUS(header) &= ~(LOCK);
} else {
+ STATUS(header) &= ~(LOCK);
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
perror("processReqNotify():socket()");
return;
remoteAddr.sin_addr.s_addr = htonl(mid);
if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("processReqNotify():error %d connecting to %s:%d\n", errno,
+ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
status = -1;
} else {
//Send Update notification
msg[0] = THREAD_NOTIFY_RESPONSE;
- msg[1] = oid;
+ *((unsigned int *)&msg[1]) = oid;
size = sizeof(unsigned int);
*((unsigned short *)(&msg[1]+size)) = newversion;
size += sizeof(unsigned short);
perror("processReqNotify():send()");
status = -1;
} else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
- printf("processReqNotify(): error, sent %d bytes\n", bytesSent);
+ printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n",
+ bytesSent, __FILE__, __LINE__);
status = -1;
} else {
status = 0;
}
close(sd);
}
- STATUS(header) &= ~(LOCK);
} else {
randomdelay();
- printf("processReqNotify() Object is still locked\n");
goto checkversion;
}
}
+ i++;
}
free(oidarry);
free(versionarry);
unsigned int oidsPerBlock;
unsigned int oidMin;
unsigned int oidMax;
+void *mlist[10000];
+pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
int qnodesize;
int len = 0;
- int i;
-
+ int i, rc;
+
+ //do {
+ // rc=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+ //} while(rc!=0);
+
/* Allocate for the queue node*/
char *node;
if(ntuples > 0) {
- qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
+ qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(unsigned short);
if((node = calloc(1, qnodesize)) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
return;
len += sizeof(int);
memcpy(node + len, oids, ntuples*sizeof(unsigned int));
len += ntuples * sizeof(unsigned int);
- memcpy(node + len, endoffsets, ntuples*sizeof(short));
- len += ntuples * sizeof(short);
- memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
-
+ memcpy(node + len, endoffsets, ntuples*sizeof(unsigned short));
+ len += ntuples * sizeof(unsigned short);
+ memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(unsigned short));
/* Lock and insert into primary prefetch queue */
pthread_mutex_lock(&pqueue.qlock);
pre_enqueue((prefetchqelem_t *)node);
if (processConfigFile() != 0)
return 0; //TODO: return error value, cause main program to exit
+ //TODO Remove after testing
+ //Initializing the global array
+ int i;
+ for (i = 0; i < 10000; i++)
+ mlist[i] = NULL;
+ ////////
dstmInit();
transInit();
queueInit();
//Initialize machine pile w/prefetch oids and offsets shared queue
mcpileqInit();
+
//Create the primary prefetch thread
-
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
-
pthread_detach(tPrefetch);
//Create and Initialize a pool of threads
transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+ //TODO Remove after testing
+ //Filling the global array when transansaction's record lookupTable
+ //is calloced
+ pthread_mutex_lock(&mlock);
+ int ii;
+ for (ii = 0; ii < 10000; ii++) {
+ if (mlist[ii] == NULL) {
+ mlist[ii] = (void *)tmp->lookupTable;
+ break;
+ }
+ }
+ if (ii == 10000) { fprintf(stderr, "Error"); }
+ pthread_mutex_unlock(&mlock);
+ ////////////
#ifdef COMPILER
tmp->revertlist=NULL;
#endif
/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)objheader, size);
+ objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, OID(objheader), objcopy);
#ifdef COMPILER
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
+ objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ memcpy(objcopy, tmp, size);
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, OID(tmp), objcopy);
#ifdef COMPILER
/*If object not found in prefetch cache then block until object appears in the prefetch cache */
pthread_mutex_lock(&pflookup.lock);
while(!found) {
- rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
+ rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
/* Check Prefetch cache again */
if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
found = 1;
GETSIZE(size,tmp);
size+=sizeof(objheader_t);
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
+ objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ memcpy(objcopy, tmp, size);
chashInsert(record->lookupTable, OID(tmp), objcopy);
pthread_mutex_unlock(&pflookup.lock);
#ifdef COMPILER
return objcopy;
#endif
} else if (rc == ETIMEDOUT) {
- printf("Wait timed out\n");
- pthread_mutex_unlock(&pflookup.lock);
- break;
+ pthread_mutex_unlock(&pflookup.lock);
+ break;
}
}
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
- printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+ printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
#endif
}
}
/* This function creates objects in the transaction record */
objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
{
- objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
- tmp->notifylist = NULL;
- OID(tmp) = getNewOID();
- tmp->version = 1;
- tmp->rcount = 1;
- STATUS(tmp) = NEW;
- chashInsert(record->lookupTable, OID(tmp), tmp);
+ objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
+ tmp->notifylist = NULL;
+ OID(tmp) = getNewOID();
+ tmp->version = 1;
+ tmp->rcount = 1;
+ STATUS(tmp) = NEW;
+ chashInsert(record->lookupTable, OID(tmp), tmp);
#ifdef COMPILER
- return &tmp[1]; //want space after object header
+ return &tmp[1]; //want space after object header
#else
- return tmp;
+ return tmp;
#endif
}
unsigned int machinenum;
void *localmachinenum;
objheader_t *headeraddr;
-
+
ptr = record->lookupTable->table;
size = record->lookupTable->size;
}
//Get machine location for object id (and whether local or not)
- if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) {
+ if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
machinenum = myIpAddr;
} else if ((machinenum = lhashSearch(curr->key)) == 0) {
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
int i, j, rc, val;
- int pilecount, offset, threadnum, trecvcount;
+ int pilecount, offset, threadnum = 0, trecvcount = 0;
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
trans_req_data_t *tosend;
char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
char localstat = 0;
- do {
- trecvcount = 0;
- threadnum = 0;
- treplyretry = 0;
+ /* Look through all the objects in the transaction record and make piles
+ * for each machine involved in the transaction*/
+ pile_ptr = pile = createPiles(record);
- /* Look through all the objects in the transaction record and make piles
- * for each machine involved in the transaction*/
- pile_ptr = pile = createPiles(record);
+ /* Create the packet to be sent in TRANS_REQUEST */
- /* Create the packet to be sent in TRANS_REQUEST */
+ /* Count the number of participants */
+ pilecount = pCount(pile);
- /* Count the number of participants */
- pilecount = pCount(pile);
+ /* Create a list of machine ids(Participants) involved in transaction */
+ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ pListMid(pile, listmid);
- /* Create a list of machine ids(Participants) involved in transaction */
- if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- //free(record);
- return 1;
- }
- pListMid(pile, listmid);
+ /* Initialize thread variables,
+ * Spawn a thread for each Participant involved in a transaction */
+ pthread_t thread[pilecount];
+ pthread_attr_t attr;
+ pthread_cond_t tcond;
+ pthread_mutex_t tlock;
+ pthread_mutex_t tlshrd;
- /* Initialize thread variables,
- * Spawn a thread for each Participant involved in a transaction */
- pthread_t thread[pilecount];
- pthread_attr_t attr;
- pthread_cond_t tcond;
- pthread_mutex_t tlock;
- pthread_mutex_t tlshrd;
+ thread_data_array_t *thread_data_array;
+ if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ return 1;
+ }
- thread_data_array_t *thread_data_array;
- if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
- printf("Malloc error %s, %d\n", __FILE__, __LINE__);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- //free(record);
- return 1;
- }
+ local_thread_data_array_t *ltdata;
+ if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
+ return 1;
+ }
+
+ thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
- local_thread_data_array_t *ltdata;
- if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
+ /* Initialize and set thread detach attribute */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_mutex_init(&tlock, NULL);
+ pthread_cond_init(&tcond, NULL);
+
+ /* Process each machine pile */
+ while(pile != NULL) {
+ //Create transaction id
+ newtid++;
+ if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
free(thread_data_array);
- //free(record);
+ free(ltdata);
return 1;
}
-
- thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
-
- /* Initialize and set thread detach attribute */
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_mutex_init(&tlock, NULL);
- pthread_cond_init(&tcond, NULL);
-
- /* Process each machine pile */
- while(pile != NULL) {
- //Create transaction id
- newtid++;
- if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ tosend->f.control = TRANS_REQUEST;
+ sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+ tosend->f.mcount = pilecount;
+ tosend->f.numread = pile->numread;
+ tosend->f.nummod = pile->nummod;
+ tosend->f.numcreated = pile->numcreated;
+ tosend->f.sum_bytes = pile->sum_bytes;
+ tosend->listmid = listmid;
+ tosend->objread = pile->objread;
+ tosend->oidmod = pile->oidmod;
+ tosend->oidcreated = pile->oidcreated;
+ thread_data_array[threadnum].thread_id = threadnum;
+ thread_data_array[threadnum].mid = pile->mid;
+ thread_data_array[threadnum].buffer = tosend;
+ thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+ thread_data_array[threadnum].threshold = &tcond;
+ thread_data_array[threadnum].lock = &tlock;
+ thread_data_array[threadnum].count = &trecvcount;
+ thread_data_array[threadnum].replyctrl = &treplyctrl;
+ thread_data_array[threadnum].replyretry = &treplyretry;
+ thread_data_array[threadnum].rec = record;
+ /* If local do not create any extra connection */
+ if(pile->mid != myIpAddr) { /* Not local */
+ do {
+ rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
+ } while(rc!=0);
+ if(rc) {
+ perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- //free(record);
return 1;
}
- tosend->f.control = TRANS_REQUEST;
- sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
- tosend->f.mcount = pilecount;
- tosend->f.numread = pile->numread;
- tosend->f.nummod = pile->nummod;
- tosend->f.numcreated = pile->numcreated;
- tosend->f.sum_bytes = pile->sum_bytes;
- tosend->listmid = listmid;
- tosend->objread = pile->objread;
- tosend->oidmod = pile->oidmod;
- tosend->oidcreated = pile->oidcreated;
- thread_data_array[threadnum].thread_id = threadnum;
- thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].buffer = tosend;
- thread_data_array[threadnum].recvmsg = rcvd_control_msg;
- thread_data_array[threadnum].threshold = &tcond;
- thread_data_array[threadnum].lock = &tlock;
- thread_data_array[threadnum].count = &trecvcount;
- thread_data_array[threadnum].replyctrl = &treplyctrl;
- thread_data_array[threadnum].replyretry = &treplyretry;
- thread_data_array[threadnum].rec = record;
- /* If local do not create any extra connection */
- if(pile->mid != myIpAddr) { /* Not local */
- do {
- rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
- } while(rc!=0);
- if(rc) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
- //free(record);
- return 1;
- }
- } else { /*Local*/
- ltdata->tdata = &thread_data_array[threadnum];
- ltdata->transinfo = &transinfo;
- do {
+ } else { /*Local*/
+ ltdata->tdata = &thread_data_array[threadnum];
+ ltdata->transinfo = &transinfo;
+ do {
val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
- } while(val!=0);
- if(val) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
- //free(record);
- return 1;
- }
- }
-
- threadnum++;
- pile = pile->next;
- }
-
- /* Free attribute and wait for the other threads */
- pthread_attr_destroy(&attr);
-
- for (i = 0; i < pilecount; i++) {
- rc = pthread_join(thread[i], NULL);
- if(rc)
- {
- printf("ERROR return code from pthread_join() is %d\n", rc);
+ } while(val!=0);
+ if(val) {
+ perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
- for (j = i; j < pilecount; j++)
- free(thread_data_array[j].buffer);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- //free(record);
return 1;
}
- free(thread_data_array[i].buffer);
}
- /* Free resources */
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- free(listmid);
- pDelete(pile_ptr);
- free(thread_data_array);
- free(ltdata);
+ threadnum++;
+ pile = pile->next;
+ }
- /* wait a random amount of time before retrying to commit transaction*/
- if(treplyretry == 1) {
- randomdelay();
+ /* Free attribute and wait for the other threads */
+ pthread_attr_destroy(&attr);
+
+ for (i = 0; i < threadnum; i++) {
+ rc = pthread_join(thread[i], NULL);
+ if(rc)
+ {
+ printf("Error: return code from pthread_join() is %d\n", rc);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ for (j = i; j < threadnum; j++) {
+ free(thread_data_array[j].buffer);
+ }
+ return 1;
}
-
- /* Retry trans commit procedure if not sucessful in the first try */
- } while (treplyretry == 1);
+ free(thread_data_array[i].buffer);
+ }
+
+ /* Free resources */
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ free(listmid);
+ pDelete(pile_ptr);
- /* Free Resources */
- objstrDelete(record->cache);
- chashDelete(record->lookupTable);
- free(record);
+
+ if(treplyctrl == TRANS_ABORT) {
+ /* Free Resources */
+ objstrDelete(record->cache);
+ //TODO Remove after testing
+ pthread_mutex_lock(&mlock);
+ int count = 0, jj = 0, ii = 0;
+ for (ii = 0; ii < 10000; ii++) {
+ if (mlist[ii] == (void *) record->lookupTable) {
+ count++;
+ jj = ii;
+ }
+ }
+ if (count==2 || count == 0) {
+ fprintf(stderr, "TRANS_ABORT CASE: Count for same addr:%d\n", count);
+ }
+ if (count == 1)
+ mlist[jj] = 0;
+ pthread_mutex_unlock(&mlock);
+ ////////////
+ chashDelete(record->lookupTable);
+ free(record);
+ free(thread_data_array);
+ free(ltdata);
+ return TRANS_ABORT;
+ } else if(treplyctrl == TRANS_COMMIT) {
+ /* Free Resources */
+ objstrDelete(record->cache);
+ //TODO Remove after Testing
+ pthread_mutex_lock(&mlock);
+ int count = 0, jj = 0, ii = 0;
+ for (ii = 0; ii < 10000; ii++) {
+ if (mlist[ii] == (void *) record->lookupTable) {count++; jj = ii;}
+ }
+ if (count==2 || count == 0) {
+ fprintf(stderr, "TRANS_COMMIT CASE: Count for same addr:%d\n", count);
+ }
+ if (count == 1) mlist[jj] = 0;
+ pthread_mutex_unlock(&mlock);
+ ///////////////////////
+ chashDelete(record->lookupTable);
+ free(record);
+ free(thread_data_array);
+ free(ltdata);
+ return 0;
+ } else {
+ //TODO Add other cases
+ printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n");
+ exit(-1);
+ }
+
return 0;
}
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
char machineip[16], retval;
+
tdata = (thread_data_array_t *) threadarg;
/* Send Trans Request */
close(sd);
pthread_exit(NULL);
}
+
recvcontrol = control;
/* Update common data structure and increment count */
pthread_exit(NULL);
}
+ if ((retval = recv((int)sd, &control, sizeof(char), 0))<= 0) {
+ printf("Error: In receiving control %s,%d\n", __FILE__, __LINE__);
+ close(sd);
+ pthread_exit(NULL);
+ }
+
+ if(control == TRANS_UNSUCESSFUL) {
+ //printf("DEBUG-> TRANS_ABORTED\n");
+ } else if(control == TRANS_SUCESSFUL) {
+ //printf("DEBUG-> TRANS_SUCCESSFUL\n");
+ } else {
+ //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
+ }
+
/* Close connection */
close(sd);
pthread_exit(NULL);
*(tdata->replyretry) = 0;
/* clear objects from prefetch cache */
for (i = 0; i < tdata->buffer->f.numread; i++)
- prehashRemove(*(unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
for (i = 0; i < tdata->buffer->f.nummod; i++)
prehashRemove(tdata->buffer->oidmod[i]);
} else if(transagree == tdata->buffer->f.mcount){
* It returns a char that is only needed to check the correctness of execution of this function inside
* transRequest()*/
char sendResponse(thread_data_array_t *tdata, int sd) {
- int n, N, sum, oidcount = 0;
+ int n, N, sum, oidcount = 0, control;
char *ptr, retval = 0;
unsigned int *oidnotfound;
if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
+ return 0;
}
return retval;
objheader_t *h;
void *objcopy;
+
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket\n");
return NULL;
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
- if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
+ if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
perror("Error sending message\n");
return NULL;
}
return NULL;
}
objcopy = objstrAlloc(record->cache, size);
- if((val = read(sd, objcopy, size)) <= 0) {
+ if((val = read(sd, (char *)objcopy, size)) <= 0) {
perror("No objects are read from the remote participant\n");
return NULL;
}
if (i < localtdata->tdata->buffer->f.numread) {
int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
incr *= i;
- oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
- version = *((short *)(localtdata->tdata->buffer->objread + incr + sizeof(unsigned int)));
+ oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
+ version = *((short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
} else { // Objects Modified
int tmpsize;
headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL\n");
+ printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
return NULL;
}
oid = OID(headptr);
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
- if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {
+ if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
v_matchlock++;
} else {/* If versions don't match ...HARD ABORT */
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
- printf("TRANS_ABORTED\n");
-
return 0;
}
oidcreated = localtdata->tdata->buffer->oidcreated;
numlocked = localtdata->transinfo->numlocked;
oidlocked = localtdata->transinfo->objlocked;
+
for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
GETSIZE(tmpsize, header);
-
pthread_mutex_lock(&mainobjstore_mutex);
- memcpy(header, tcptr, tmpsize + sizeof(objheader_t));
+ memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
header->version += 1;
- /* If threads are waiting on this object to be updated, notify them */
if(header->notifylist != NULL) {
notifyAll(&header->notifylist, OID(header), header->version);
}
-
pthread_mutex_unlock(&mainobjstore_mutex);
}
/* If object is newly created inside transaction then commit it */
tmpsize += sizeof(objheader_t);
pthread_mutex_lock(&mainobjstore_mutex);
if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
- printf("Error: transComProcess() failed objstrAlloc\n");
+ printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&mainobjstore_mutex);
return 1;
}
pthread_mutex_unlock(&mainobjstore_mutex);
- memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t));
+ memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
int ntuples, i, machinenum, count=0;
unsigned int *oid;
short *endoffsets, *arryfields, *offset;
- prefetchpile_t *head = NULL;
+ prefetchpile_t *head = NULL, *tmp = NULL;
/* Check for the case x.y.z and a.b.c are same oids */
ptr = (char *) node;
/* Check for redundant tuples by comparing oids of each tuple */
for(i = 0; i < ntuples; i++) {
- if(oid[i] == 0)
+ if(oid[i] == 0){
+ if(head->next != NULL) {
+ if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+ printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ tmp->mid = myIpAddr;
+ tmp->next = head;
+ head = tmp;
+ } else {
+ head->mid = myIpAddr;
+ }
continue;
+ }
/* For each tuple make piles */
if ((machinenum = lhashSearch(oid[i])) == 0) {
printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-
+
/* Find offset length for each tuple */
int numoffset[ntuples];//Number of offsets for each tuple
numoffset[0] = endoffsets[0];
pthread_mutex_unlock(&mcqueue.qlock);
/* Deallocate the prefetch queue pile node */
predealloc(qnode);
- pthread_exit(NULL);
}
}
/*Initiate connection to remote host and send request */
/* Process Request */
- sendPrefetchReq(mcpilenode, tid);
+ if(mcpilenode->mid != myIpAddr)
+ sendPrefetchReq(mcpilenode);
/* Deallocate the machine queue pile node */
mcdealloc(mcpilenode);
- pthread_exit(NULL);
}
}
-void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
+void sendPrefetchReq(prefetchpile_t *mcpilenode) {
int sd, i, off, len, endpair, count = 0;
struct sockaddr_in serv_addr;
struct hostent *server;
while(tmp != NULL) {
off = 0;
count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
- len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short));
char oidnoffset[len];
bzero(oidnoffset, len);
- memcpy(oidnoffset, &len, sizeof(int));
+ *((unsigned int*)oidnoffset) = len;
+ //memcpy(oidnoffset, &len, sizeof(int));
off = sizeof(int);
- memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
+ *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid;
+ //memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
off += sizeof(unsigned int);
for(i = 0; i < tmp->numoffset; i++) {
- memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
- off+=sizeof(short);
+ *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i];
+ //memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
+ off+=sizeof(unsigned short);
}
- if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
+
+ if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) {
perror("Error sending fixed bytes for thread\n");
close(sd);
return;
}
-
+
tmp = tmp->next;
}
char *ptr;
void *modptr, *oldptr;
-
/* Read prefetch response from the Remote machine */
if((val = read(sd, &control, sizeof(char))) <= 0) {
perror("No control response for Prefetch request sent\n");
index+=sizeof(int);
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
- printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&prefetchcache_mutex);
return;
}
int bytesSent;
int status, size;
unsigned short version;
- unsigned int oid, threadid, mid;
- pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification
- pthread_cond_t threadcond;
+ unsigned int oid,mid;
+ static unsigned int threadid = 0;
+ pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+ pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
notifydata_t *ndata;
//FIXME currently all oids belong to one machine
remoteAddr.sin_addr.s_addr = htonl(mid);
/* Generate unique threadid */
- threadid = (unsigned int) pthread_self();
+ threadid++;
/* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
ndata->oidarry = oidarry;
ndata->versionarry = versionarry;
ndata->threadcond = threadcond;
+ ndata->threadnotify = threadnotify;
if((status = notifyhashInsert(threadid, ndata)) != 0) {
printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
free(ndata);
size += sizeof(unsigned int);
*((unsigned int *)(&msg[1] + size)) = threadid;
- pthread_mutex_lock(&threadnotify);
- bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int) , 0);
+ pthread_mutex_lock(&(ndata->threadnotify));
+ bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0);
if (bytesSent < 0){
perror("reqNotify():send()");
status = -1;
- } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int)){
- printf("reNotify(): error, sent %d bytes\n", bytesSent);
+ } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){
+ printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__);
status = -1;
} else {
status = 0;
}
- pthread_cond_wait(&threadcond, &threadnotify);
- pthread_mutex_unlock(&threadnotify);
+
+ pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+ pthread_mutex_unlock(&(ndata->threadnotify));
}
+ pthread_cond_destroy(&threadcond);
+ pthread_mutex_destroy(&threadnotify);
+ free(ndata);
close(sock);
return status;
}
+
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
notifydata_t *ndata;
int i, objIsFound = 0, index;
}
}
if(objIsFound == 0){
+ printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
return;
} else {
if(version <= ndata->versionarry[index]){
+ printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
return;
} else {
/* Clear from prefetch cache and free thread related data structure */
- if((ptr = prehashSearch(oid)) == NULL) {
- //TODO Ask about freeing
- printf("threadnotify(): No such oid %s, %d\n", __FILE__, __LINE__);
- pthread_cond_signal(&ndata->threadcond);
- free(ndata);
- return;
- } else {
+ if((ptr = prehashSearch(oid)) != NULL) {
prehashRemove(oid);
- pthread_cond_signal(&ndata->threadcond);
- free(ndata);
}
+ pthread_cond_signal(&(ndata->threadcond));
}
}
}
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
int sock, status, size, bytesSent;
+
while(*head != NULL) {
ptr = *head;
mid = ptr->mid;
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
status = -1;
} else {
+ bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
- msg[1] = oid;
+ *((unsigned int *)&msg[1]) = oid;
size = sizeof(unsigned int);
*((unsigned short *)(&msg[1]+ size)) = version;
size+= sizeof(unsigned short);
*((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
- bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
+ bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0);
if (bytesSent < 0){
perror("notifyAll():send()");
status = -1;
} else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
- printf("notifyAll(): error, sent %d bytes\n", bytesSent);
+ printf("notifyAll(): error, sent %d bytes %s, %d\n",
+ bytesSent, __FILE__, __LINE__);
status = -1;
} else {
status = 0;