/* Set queue node values */
int len;
int top=endoffsets[ntuples-1];
+
*((int *)(node))=ntuples;
len = sizeof(int);
memcpy(node+len, oids, ntuples*sizeof(unsigned int));
pthread_t thread_Listen;
pthread_attr_t attr;
int master=option!=NULL && strcmp(option, "master")==0;
-
+ int fd;
+
if (processConfigFile() != 0)
return 0; //TODO: return error value, cause main program to exit
#ifdef COMPILER
dstmInit();
transInit();
+ fd=startlistening();
if (master) {
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+ pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
- dstmListen();
+ dstmListen((void *)fd);
return 0;
}
}
objcopy = (objheader_t *) objstrAlloc(record->cache, size);
memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
+ STATUS(objcopy)=0;
chashInsert(record->lookupTable, OID(objheader), objcopy);
#ifdef COMPILER
return &objcopy[1];
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
-
+ STATUS(objcopy)=0;
#ifdef COMPILER
return &objcopy[1];
#else
/* This function creates objects in the transaction record */
objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
- tmp->notifylist = NULL;
OID(tmp) = getNewOID();
tmp->version = 1;
tmp->rcount = 1;
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
int i, j, rc, val;
- int pilecount, offset, threadnum = 0, trecvcount = 0;
+ int pilecount, offset, threadnum, trecvcount;
char control;
char transid[TID_LEN];
trans_req_data_t *tosend;
trans_commit_data_t transinfo;
static int newtid = 0;
- char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
- char localstat = 0;
+ char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
thread_data_array_t *thread_data_array;
local_thread_data_array_t *ltdata;
+ int firsttime=1;
do {
+ treplyctrl=0;
trecvcount = 0;
threadnum = 0;
treplyretry = 0;
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
- pile_ptr = pile = createPiles(record);
-
+ if (firsttime)
+ pile_ptr = pile = createPiles(record);
+ else
+ pile=pile_ptr;
+ firsttime=0;
+
/* Create the packet to be sent in TRANS_REQUEST */
/* Count the number of participants */
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_mutex_init(&tlock, NULL);
pthread_cond_init(&tcond, NULL);
-
+
/* Process each machine pile */
while(pile != NULL) {
//Create transaction id
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
free(listmid);
- pDelete(pile_ptr);
+
+ if (!treplyretry)
+ pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
/* Read control message from Participant */
recv_data(sd, &control, sizeof(char));
recvcontrol = control;
-
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
- pthread_mutex_lock(&atomicObjLock);
- if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
- pthread_mutex_unlock(&atomicObjLock);
+ if (test_and_set(STATUSPTR(mobj))) {
if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
v_matchlock++;
} else {/* If versions don't match ...HARD ABORT */
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
}
- } else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- pthread_mutex_unlock(&atomicObjLock);
+ } else {
+ //we're locked
/* Save all object oids that are locked on this machine during this transaction request call */
oidlocked[objlocked] = OID(((objheader_t *)mobj));
objlocked++;
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- STATUS(((objheader_t *)header)) &= ~(LOCK);
+ UnLock(STATUSPTR(header));
}
return 0;
/*This function completes the COMMIT process is the transaction is commiting*/
int transComProcess(local_thread_data_array_t *localtdata) {
- objheader_t *header, *tcptr;
- int i, nummod, tmpsize, numcreated, numlocked;
- unsigned int *oidmod, *oidcreated, *oidlocked;
- void *ptrcreate;
-
- nummod = localtdata->tdata->buffer->f.nummod;
- oidmod = localtdata->tdata->buffer->oidmod;
- numcreated = localtdata->tdata->buffer->f.numcreated;
- oidcreated = localtdata->tdata->buffer->oidcreated;
- numlocked = localtdata->transinfo->numlocked;
- oidlocked = localtdata->transinfo->objlocked;
-
- for (i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- /* Copy from transaction cache -> main object store */
- if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
- printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize, header);
- pthread_mutex_lock(&mainobjstore_mutex);
- char *tmptcptr = (char *) tcptr;
- memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
- header->version += 1;
- pthread_mutex_lock(¬ifymutex);
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
- }
- pthread_mutex_unlock(¬ifymutex);
- pthread_mutex_unlock(&mainobjstore_mutex);
- }
- /* If object is newly created inside transaction then commit it */
- for (i = 0; i < numcreated; i++) {
- if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
- printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize, header);
- tmpsize += sizeof(objheader_t);
- pthread_mutex_lock(&mainobjstore_mutex);
- if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
- printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mainobjstore_mutex);
- return 1;
- }
- pthread_mutex_unlock(&mainobjstore_mutex);
- memcpy(ptrcreate, header, tmpsize);
- mhashInsert(oidcreated[i], ptrcreate);
- lhashInsert(oidcreated[i], myIpAddr);
- }
- /* Unlock locked objects */
- for(i = 0; i < numlocked; i++) {
- if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- STATUS(header) &= ~(LOCK);
- }
-
- return 0;
+ objheader_t *header, *tcptr;
+ int i, nummod, tmpsize, numcreated, numlocked;
+ unsigned int *oidmod, *oidcreated, *oidlocked;
+ void *ptrcreate;
+
+ nummod = localtdata->tdata->buffer->f.nummod;
+ oidmod = localtdata->tdata->buffer->oidmod;
+ numcreated = localtdata->tdata->buffer->f.numcreated;
+ oidcreated = localtdata->tdata->buffer->oidcreated;
+ numlocked = localtdata->transinfo->numlocked;
+ oidlocked = localtdata->transinfo->objlocked;
+
+ for (i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ /* Copy from transaction cache -> main object store */
+ if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ char *tmptcptr = (char *) tcptr;
+ memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+ header->version += 1;
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ }
+ /* If object is newly created inside transaction then commit it */
+ for (i = 0; i < numcreated; i++) {
+ if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ tmpsize += sizeof(objheader_t);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+ printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ memcpy(ptrcreate, header, tmpsize);
+ mhashInsert(oidcreated[i], ptrcreate);
+ lhashInsert(oidcreated[i], myIpAddr);
+ }
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+
+ return 0;
}
prefetchpile_t *foundLocal(char *ptr) {
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
+ STATUS(modptr)=0;
+
/* Insert the oid and its address into the prefetch hash lookup table */
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
/* This function sends notification request per thread waiting on object(s) whose version
* changes */
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
- int sock,i;
- objheader_t *objheader;
- struct sockaddr_in remoteAddr;
- char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
- char *ptr;
- int bytesSent;
- int status, size;
- unsigned short version;
- unsigned int oid,mid;
- static unsigned int threadid = 0;
- pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
- pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
- notifydata_t *ndata;
-
- oid = oidarry[0];
- if((mid = lhashSearch(oid)) == 0) {
- printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
- return;
- }
-
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("reqNotify():socket()");
- return -1;
- }
-
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- /* Generate unique threadid */
- threadid++;
-
- /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
- if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
- printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
- return -1;
- }
- ndata->numoid = numoid;
- ndata->threadid = threadid;
- ndata->oidarry = oidarry;
- ndata->versionarry = versionarry;
- ndata->threadcond = threadcond;
- ndata->threadnotify = threadnotify;
- if((status = notifyhashInsert(threadid, ndata)) != 0) {
- printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
- free(ndata);
- return -1;
- }
-
- /* Send number of oids, oidarry, version array, machine id and threadid */
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("reqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- free(ndata);
- return -1;
- } else {
- msg[0] = THREAD_NOTIFY_REQUEST;
- *((unsigned int *)(&msg[1])) = numoid;
- /* Send array of oids */
- size = sizeof(unsigned int);
- {
- i = 0;
- while(i < numoid) {
- oid = oidarry[i];
- *((unsigned int *)(&msg[1] + size)) = oid;
- size += sizeof(unsigned int);
- i++;
- }
- }
-
- /* Send array of version */
- {
- i = 0;
- while(i < numoid) {
- version = versionarry[i];
- *((unsigned short *)(&msg[1] + size)) = version;
- size += sizeof(unsigned short);
- i++;
- }
- }
-
- *((unsigned int *)(&msg[1] + size)) = myIpAddr;
- size += sizeof(unsigned int);
- *((unsigned int *)(&msg[1] + size)) = threadid;
- pthread_mutex_lock(&(ndata->threadnotify));
- size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
- send_data(sock, msg, size);
- pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
- pthread_mutex_unlock(&(ndata->threadnotify));
- }
-
- pthread_cond_destroy(&threadcond);
- pthread_mutex_destroy(&threadnotify);
- free(ndata);
- close(sock);
- return status;
+ int sock,i;
+ objheader_t *objheader;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
+ char *ptr;
+ int bytesSent;
+ int status, size;
+ unsigned short version;
+ unsigned int oid,mid;
+ static unsigned int threadid = 0;
+ pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+ pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
+ notifydata_t *ndata;
+
+ oid = oidarry[0];
+ if((mid = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+ return;
+ }
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("reqNotify():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ /* Generate unique threadid */
+ threadid++;
+
+ /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+ if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+ printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ ndata->numoid = numoid;
+ ndata->threadid = threadid;
+ ndata->oidarry = oidarry;
+ ndata->versionarry = versionarry;
+ ndata->threadcond = threadcond;
+ ndata->threadnotify = threadnotify;
+ if((status = notifyhashInsert(threadid, ndata)) != 0) {
+ printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+ free(ndata);
+ return -1;
+ }
+
+ /* Send number of oids, oidarry, version array, machine id and threadid */
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("reqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ free(ndata);
+ return -1;
+ } else {
+ msg[0] = THREAD_NOTIFY_REQUEST;
+ *((unsigned int *)(&msg[1])) = numoid;
+ /* Send array of oids */
+ size = sizeof(unsigned int);
+ {
+ i = 0;
+ while(i < numoid) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
+ i++;
+ }
+ }
+
+ /* Send array of version */
+ {
+ i = 0;
+ while(i < numoid) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = version;
+ size += sizeof(unsigned short);
+ i++;
+ }
+ }
+
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+ size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = threadid;
+ pthread_mutex_lock(&(ndata->threadnotify));
+ size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+ send_data(sock, msg, size);
+ pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+ pthread_mutex_unlock(&(ndata->threadnotify));
+ }
+
+ pthread_cond_destroy(&threadcond);
+ pthread_mutex_destroy(&threadnotify);
+ free(ndata);
+ close(sock);
+ return status;
}
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
}
int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
- threadlist_t *ptr;
- unsigned int mid;
- struct sockaddr_in remoteAddr;
- char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
- int sock, status, size, bytesSent;
-
- while(*head != NULL) {
- ptr = *head;
- mid = ptr->mid;
- //create a socket connection to that machine
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("notifyAll():socket()");
- return -1;
- }
-
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
- //send Thread Notify response and threadid to that machine
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("notifyAll():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
- fflush(stdout);
- } else {
- bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
- msg[0] = THREAD_NOTIFY_RESPONSE;
- *((unsigned int *)&msg[1]) = oid;
- size = sizeof(unsigned int);
- *((unsigned short *)(&msg[1]+ size)) = version;
- size+= sizeof(unsigned short);
- *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
-
- size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
- send_data(sock, msg, size);
- }
- //close socket
- close(sock);
- // Update head
- *head = ptr->next;
- free(ptr);
- }
- return status;
+ threadlist_t *ptr;
+ unsigned int mid;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+ int sock, status, size, bytesSent;
+
+ while(*head != NULL) {
+ ptr = *head;
+ mid = ptr->mid;
+ //create a socket connection to that machine
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("notifyAll():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+ //send Thread Notify response and threadid to that machine
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("notifyAll():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ fflush(stdout);
+ } else {
+ bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+ size)) = version;
+ size+= sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
+
+ size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sock, msg, size);
+ }
+ //close socket
+ close(sock);
+ // Update head
+ *head = ptr->next;
+ free(ptr);
+ }
+ return status;
}
void transAbort(transrecord_t *trans) {