#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
#include "gCollect.h"
#ifdef COMPILER
#include "thread.h"
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
pthread_mutex_t notifymutex;
pthread_mutex_t atomicObjLock;
//Initialize socket pool
transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+ transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
dstmInit();
transInit();
pthread_mutex_t tlshrd;
thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-
ltdata = calloc(1, sizeof(local_thread_data_array_t));
thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
thread_data_array[threadnum].replyctrl = &treplyctrl;
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
+ thread_data_array[threadnum].pilehead = pile_ptr;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
do {
objheader_t *headeraddr;
char control, recvcontrol;
char machineip[16], retval;
-
+
tdata = (thread_data_array_t *) threadarg;
-
- /* Send Trans Request */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) <= 0) {
- printf("transRequest():error %d\n", errno);
- perror("transRequest() socket error");
- pthread_exit(NULL);
- }
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
- /* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- printf("transRequest():error %d, sd= %d\n", errno, sd);
- perror("transRequest() connect");
- close(sd);
+
+ if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
+ printf("transRequest(): socket create error\n");
pthread_exit(NULL);
}
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- close(sd);
pthread_exit(NULL);
}
pthread_mutex_unlock(&prefetchcache_mutex);
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- close(sd);
pthread_exit(NULL);
}
} else {
//printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
}
-
- /* Close connection */
- close(sd);
pthread_exit(NULL);
}
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
/* clear objects from prefetch cache */
- for (i = 0; i < tdata->buffer->f.numread; i++) {
- prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
- }
- for (i = 0; i < tdata->buffer->f.nummod; i++) {
- prehashRemove(tdata->buffer->oidmod[i]);
- }
+ cleanPCache(tdata);
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
- /* update prefetch cache */
- /* For objects read */
- char oidType;
int retval;
- oidType = 'R';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- oidType = 'M';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+ if((retval = updatePrefetchCache(tdata)) != 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
return;
}
+
/* Invalidate objects in other machine cache */
if(tdata->buffer->f.nummod > 0) {
if((retval = invalidateObj(tdata)) != 0) {
return;
}
-/* This function updates the prefetch cache when commiting objects
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
- int i;
- for (i = 0; i < numoid; i++) {
- //find address object
- objheader_t *header, *newAddr;
- int size;
- unsigned int oid;
- if(oidType == 'R') {
- oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
- } else {
- oid = tdata->buffer->oidmod[i];
- }
- pthread_mutex_lock(&prefetchcache_mutex);
- header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
- header->version += 1;
- //copy object into prefetch cache
- GETSIZE(size, header);
- if ((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
- printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- return -1;
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(newAddr, header, (size + sizeof(objheader_t)));
- //make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, newAddr);
- } else {
- prehashInsert(oid, newAddr);
- }
- }
- return 0;
-}
-
-
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function
chashDelete(trans->lookupTable);
free(trans);
}
+
+/* This function inserts necessary information into
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+ plistnode_t *ptr, *tmp;
+ int found = 0, offset = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is already present in the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ }else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+ tmp->numread ++;
+ }
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ offset = sizeof(unsigned int);
+ *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+ ptr->numread ++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ /* Clear Flags */
+ STATUS(headeraddr) =0;
+
+ return pile;
+}