/* Coordinator => Machine that initiates the transaction request call for commiting a transaction
* Participant => Machines that host the objects involved in a transaction commit */
+#include <netinet/tcp.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
extern int classsize[];
extern int numHostsInSystem;
+extern pthread_mutex_t notifymutex;
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
+pthread_mutex_t lockObjHeader;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
sockPoolHashTable_t *transPResponseSocketPool;
pthread_mutexattr_init(&mainobjstore_mutex_attr);
pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+ pthread_mutex_init(&lockObjHeader,NULL);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
return 1; //failure
//Initialize socket pool
- if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) {
+ if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
return 0;
}
return 0;
}
-/* This function starts the thread to listen on a socket
- * for tranaction calls */
-void *dstmListen()
-{
- int listenfd, acceptfd;
- struct sockaddr_in my_addr;
- struct sockaddr_in client_addr;
- socklen_t addrlength = sizeof(struct sockaddr);
- pthread_t thread_dstm_accept;
- int i;
- int setsockflag=1;
-
- listenfd = socket(AF_INET, SOCK_STREAM, 0);
- if (listenfd == -1)
- {
- perror("socket");
- exit(1);
- }
- if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
- perror("socket");
- exit(1);
- }
+int startlistening() {
+ int listenfd;
+ struct sockaddr_in my_addr;
+ socklen_t addrlength = sizeof(struct sockaddr);
+ int setsockflag=1;
+
+ listenfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (listenfd == -1) {
+ perror("socket");
+ exit(1);
+ }
+
+ if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
#ifdef MAC
- if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
- perror("socket");
- exit(1);
- }
+ if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
#endif
+
+ my_addr.sin_family = AF_INET;
+ my_addr.sin_port = htons(LISTEN_PORT);
+ my_addr.sin_addr.s_addr = INADDR_ANY;
+ memset(&(my_addr.sin_zero), '\0', 8);
+
+ if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
+ perror("bind");
+ exit(1);
+ }
+
+ if (listen(listenfd, BACKLOG) == -1) {
+ perror("listen");
+ exit(1);
+ }
+ return listenfd;
+}
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(LISTEN_PORT);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- memset(&(my_addr.sin_zero), '\0', 8);
-
- if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
- {
- perror("bind");
- exit(1);
- }
-
- if (listen(listenfd, BACKLOG) == -1)
- {
- perror("listen");
- exit(1);
- }
-
- printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
- while(1)
- {
- int retval;
- acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
- do {
- retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
- } while(retval!=0);
- pthread_detach(thread_dstm_accept);
- }
+/* This function starts the thread to listen on a socket
+ * for tranaction calls */
+void *dstmListen(void *lfd) {
+ int listenfd=(int)lfd;
+ int acceptfd;
+ struct sockaddr_in client_addr;
+ socklen_t addrlength = sizeof(struct sockaddr);
+ pthread_t thread_dstm_accept;
+
+ printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+ while(1) {
+ int retval;
+ int flag=1;
+ acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+ setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
+ do {
+ retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+ } while(retval!=0);
+ pthread_detach(thread_dstm_accept);
+ }
}
/* This function accepts a new connection request, decodes the control message in the connection
* and accordingly calls other functions to process new requests */
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
- /*
- transinfo.objlocked = NULL;
- transinfo.objnotfound = NULL;
- transinfo.modptr = NULL;
- transinfo.numlocked = 0;
- transinfo.numnotfound = 0;
- */
-
/* Receive control messages from other machines */
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
- if (ret==-1)
+ if (ret==0)
+ break;
+ if (ret==-1) {
+ printf("DEBUG -> RECV Error!.. retrying\n");
break;
+ }
switch(control) {
case READ_REQUEST:
/* Read oid requested and search if available */
printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
return 1;
}
- STATUS(((objheader_t *)header)) &= ~(LOCK);
+ UnLock(STATUSPTR(header));
}
/* Send ack to Coordinator */
int tmpsize;
headptr = (objheader_t *) ptr;
oid = OID(headptr);
- version = headptr->version;
- GETSIZE(tmpsize, headptr);
- ptr += sizeof(objheader_t) + tmpsize;
- }
-
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else {/* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- STATUS(headptr) &= ~(LOCK);
- }
- free(oidlocked);
- }
- send_data(acceptfd, &control, sizeof(char));
- return control;
- }
- } else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- STATUS(headptr) &= ~(LOCK);
- }
- free(oidlocked);
- }
-
- /* Send TRANS_DISAGREE to Coordinator */
- send_data(acceptfd, &control, sizeof(char));
- return control;
- }
- }
- }
+ version = headptr->version;
+ GETSIZE(tmpsize, headptr);
+ ptr += sizeof(objheader_t) + tmpsize;
+ }
+
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[objnotfound] = oid;
+ objnotfound++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (test_and_set(STATUSPTR(mobj))) {
+ //don't have lock
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ v_matchlock++;
+ } else {/* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ UnLock(STATUSPTR(headptr));
+ }
+ free(oidlocked);
+ }
+ send_data(acceptfd, &control, sizeof(char));
+ return control;
+ }
+ } else {/* If Obj is not locked then lock object */
+ /* Save all object oids that are locked on this machine during this transaction request call */
+ oidlocked[objlocked] = OID(((objheader_t *)mobj));
+ objlocked++;
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ v_matchnolock++;
+ } else { /* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ UnLock(STATUSPTR(headptr));
+ }
+ free(oidlocked);
+ }
+
+ /* Send TRANS_DISAGREE to Coordinator */
+ send_data(acceptfd, &control, sizeof(char));
+ return control;
+ }
+ }
+ }
}
/* Decide what control message to send to Coordinator */
* addresses in lookup table and also changes version number
* Sends an ACK back to Coordinator */
int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
- objheader_t *header;
- objheader_t *newheader;
- int i = 0, offset = 0;
- char control;
- int tmpsize;
-
- /* Process each modified object saved in the mainobject store */
- for(i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize,header);
- pthread_mutex_lock(&mainobjstore_mutex);
- memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
- header->version += 1;
- /* If threads are waiting on this object to be updated, notify them */
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
- }
- pthread_mutex_unlock(&mainobjstore_mutex);
- offset += sizeof(objheader_t) + tmpsize;
- }
-
- if (nummod > 0)
- free(modptr);
-
- /* Unlock locked objects */
- for(i = 0; i < numlocked; i++) {
- if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- STATUS(header) &= ~(LOCK);
- }
- //TODO Update location lookup table
-
- /* Send ack to coordinator */
- control = TRANS_SUCESSFUL;
- send_data((int)acceptfd, &control, sizeof(char));
- return 0;
+ objheader_t *header;
+ objheader_t *newheader;
+ int i = 0, offset = 0;
+ char control;
+ int tmpsize;
+
+ /* Process each modified object saved in the mainobject store */
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize,header);
+ memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+ header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ offset += sizeof(objheader_t) + tmpsize;
+ }
+
+ if (nummod > 0)
+ free(modptr);
+
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+ //TODO Update location lookup table
+
+ /* Send ack to coordinator */
+ control = TRANS_SUCESSFUL;
+ send_data((int)acceptfd, &control, sizeof(char));
+ return 0;
}
/* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
- int length;
- char *recvbuffer, *sendbuffer, control;
- unsigned int oid, mid;
- objheader_t *header;
- struct sockaddr_in remoteAddr;
- oidmidpair_t oidmid;
-
- do {
- recv_data((int)acceptfd, &length, sizeof(int));
- if(length != -1) {
- recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
- oid = oidmid.oid;
- mid = oidmid.mid;
- size = length - sizeof(int) - (2 * sizeof(unsigned int));
- numoffset = size/sizeof(short);
- short offsetarry[numoffset];
- recv_data((int) acceptfd, offsetarry, size);
-
- int sd = -1;
- if((sd = getSock(transPResponseSocketPool, mid)) == -1) {
- printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
- exit(-1);
- }
-
- /*Process each oid */
- if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- } else { /* Object Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
-
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
- }
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
- unsigned short length = ao->___length___;
- /* Check if array out of bounds */
- if(offsetarry[i]< 0 || offsetarry[i] >= length) {
- break;
- }
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
- } else {
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
- }
-
- if((header = mhashSearch(oid)) == NULL) {
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- break;
- } else {/* Obj Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- }
- isArray = 0;
- }
- }
-
- //Release socket
- int status;
- if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) {
- printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__);
- return -1;
- }
- }
- } while (length != -1);
- return 0;
+ int i, size, objsize, numoffset = 0;
+ int length;
+ char *recvbuffer, control;
+ unsigned int oid, mid=-1;
+ objheader_t *header;
+ oidmidpair_t oidmid;
+ int sd = -1;
+
+ while(1) {
+ recv_data((int)acceptfd, &numoffset, sizeof(int));
+ if(numoffset == -1)
+ break;
+ recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ if (mid != oidmid.mid) {
+ if (mid!=-1) {
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ }
+ mid=oidmid.mid;
+ sd = getSockWithLock(transPResponseSocketPool, mid);
+ }
+ short offsetarry[numoffset];
+ recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ char sendbuffer[size];
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ } else { /* Object Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size];
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ /* Don't continue if we hit a NULL pointer */
+ if (oid==0)
+ break;
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ char sendbuffer[size];
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ break;
+ } else {/* Obj Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size];
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ }
+ }
+ }
+ }
+ //Release socket
+ if (mid!=-1)
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+
+ return 0;
}
-int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
- int numbytes = 0;
-
+void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
send_data(sd, control, sizeof(char));
/* Send the buffer with its size */
int length = *(size);
send_data(sd, sendbuffer, length);
- free(sendbuffer);
- return 0;
}
void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
- objheader_t *header;
- unsigned int oid;
- unsigned short newversion;
- char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
- int sd;
- struct sockaddr_in remoteAddr;
- int bytesSent;
- int size;
-
- int i = 0;
- while(i < numoid) {
- oid = *(oidarry + i);
- if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return;
- } else {
- /* Check to see if versions are same */
-checkversion:
- if ((STATUS(header) & LOCK) != LOCK) {
- //FIXME make locking atomic
- STATUS(header) |= LOCK;
- newversion = header->version;
- if(newversion == *(versionarry + i)) {
- //Add to the notify list
- if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
- printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
- return;
- }
- STATUS(header) &= ~(LOCK);
- } else {
- STATUS(header) &= ~(LOCK);
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("processReqNotify():socket()");
- return;
- }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return;
- } else {
- //Send Update notification
- msg[0] = THREAD_NOTIFY_RESPONSE;
- *((unsigned int *)&msg[1]) = oid;
- size = sizeof(unsigned int);
- *((unsigned short *)(&msg[1]+size)) = newversion;
- size += sizeof(unsigned short);
- *((unsigned int *)(&msg[1]+size)) = threadid;
- size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
- send_data(sd, msg, size);
- }
- close(sd);
- }
- } else {
- randomdelay();
- goto checkversion;
- }
- }
- i++;
+ objheader_t *header;
+ unsigned int oid;
+ unsigned short newversion;
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
+ int sd;
+ struct sockaddr_in remoteAddr;
+ int bytesSent;
+ int size;
+ int i = 0;
+
+ while(i < numoid) {
+ oid = *(oidarry + i);
+ if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ /* Check to see if versions are same */
+ checkversion:
+ if (test_and_set(STATUSPTR(header))==0) {
+ //have lock
+ newversion = header->version;
+ if(newversion == *(versionarry + i)) {
+ //Add to the notify list
+ if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+ printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ UnLock(STATUSPTR(header));
+ } else {
+ UnLock(STATUSPTR(header));
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("processReqNotify():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return;
+ } else {
+ //Send Update notification
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+size)) = newversion;
+ size += sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+size)) = threadid;
+ size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sd, msg, size);
+ }
+ close(sd);
}
- free(oidarry);
- free(versionarry);
+ } else {
+ randomdelay();
+ goto checkversion;
+ }
+ }
+ i++;
+ }
+ free(oidarry);
+ free(versionarry);
}