- int sd, size, val;
- struct sockaddr_in serv_addr;
- struct hostent *server;
- char control;
- char machineip[16];
- objheader_t *h;
- void *objcopy;
-
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket\n");
- return NULL;
- }
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
- midtoIP(mnum,machineip);
- machineip[15] = '\0';
- serv_addr.sin_addr.s_addr = inet_addr(machineip);
- /* Open connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect\n");
- return NULL;
- }
- char readrequest[sizeof(char)+sizeof(unsigned int)];
- readrequest[0] = READ_REQUEST;
- *((unsigned int *)(&readrequest[1])) = oid;
- if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
- perror("Error sending message\n");
- return NULL;
- }
-
-#ifdef DEBUG1
- printf("DEBUG -> ready to rcv ...\n");
-#endif
- /* Read response from the Participant */
- if((val = read(sd, &control, sizeof(char))) <= 0) {
- perror("No control response for getRemoteObj sent\n");
- return NULL;
- }
- switch(control) {
- case OBJECT_NOT_FOUND:
- return NULL;
- case OBJECT_FOUND:
- /* Read object if found into local cache */
- if((val = read(sd, &size, sizeof(int))) <= 0) {
- perror("No size is read from the participant\n");
- return NULL;
- }
- objcopy = objstrAlloc(record->cache, size);
- if((val = read(sd, objcopy, size)) <= 0) {
- perror("No objects are read from the remote participant\n");
- return NULL;
- }
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, oid, objcopy);
- break;
- default:
- printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
- return NULL;
- }
- /* Close connection */
- close(sd);
- return objcopy;
-}
-
-/* This function handles the local objects involved in a transaction commiting process.
- * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
- * Note Coordinator = local machine
- * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
- * based on common agreement it either commits or aborts the transaction.
- * It also frees the memory resources */
-void *handleLocalReq(void *threadarg) {
- int val, i = 0, size, offset = 0;
- short version;
- char control = 0, *ptr;
- unsigned int oid;
- unsigned int *oidnotfound = NULL, *oidlocked = NULL;
- void *mobj, *modptr;
- objheader_t *headptr, *headeraddr;
- local_thread_data_array_t *localtdata;
-
- localtdata = (local_thread_data_array_t *) threadarg;
-
- /* Counters and arrays to formulate decision on control message to be sent */
- oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0;
- int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
- /* modptr points to the beginning of the object store
- * created at the Pariticipant */
- pthread_mutex_lock(&mainobjstore_mutex);
- if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
- printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mainobjstore_mutex);
- return NULL;
- }
- pthread_mutex_unlock(&mainobjstore_mutex);
- /* Write modified objects into the mainobject store */
- for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
- headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
- GETSIZE(size,headeraddr);
- size+=sizeof(objheader_t);
- memcpy(modptr+offset, headeraddr, size);
- offset += size;
- }
- /* Write new objects into the mainobject store */
- for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
- headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
- GETSIZE(size, headeraddr);
- size+=sizeof(objheader_t);
- memcpy(modptr+offset, headeraddr, size);
- offset += size;
- }
-
- ptr = modptr;
- offset = 0; //Reset
-
- /* Process each oid in the machine pile/ group per thread */
- for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
- if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
- int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
- incr *= i;
- oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
- incr += sizeof(unsigned int);
- version = *((short *)(localtdata->tdata->buffer->objread + incr));
- } else {//Objs modified
- int tmpsize;
- headptr = (objheader_t *)ptr;
- oid = OID(headptr);
- version = headptr->version;
- GETSIZE(tmpsize, headptr);
- ptr += sizeof(objheader_t) + tmpsize;
- }
-
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
- /* Save the oids not found and number of oids not found for later use */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if (STATUS(((objheader_t *)mobj)) & LOCK) {
- if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */
- v_matchlock++;
- } else {/* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
- } else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- //TODO Remove this for Testing
- //randomdelay(); -- Why is this here. BCD
-
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
- }
- }
- }
-
- /* Condition to send TRANS_AGREE */
- if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
- }
- /* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- //TODO currently the only soft abort case that is supported is when object locked by previous
- //transaction => v_matchlock > 0
- //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported
- /* Send number of oids not found and the missing oids if objects are missing in the machine */
- /* TODO Remember to store the oidnotfound for later use
- if(objnotfound != 0) {
- int size = sizeof(unsigned int)* objnotfound;
- }
- */
- }
-
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objlocked = oidlocked;
- localtdata->transinfo->objnotfound = oidnotfound;
- localtdata->transinfo->modptr = modptr;
- localtdata->transinfo->numlocked = objlocked;
- localtdata->transinfo->numnotfound = objnotfound;
-
- /* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
- pthread_mutex_lock(localtdata->tdata->lock);
- (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
- if (decideResponse(localtdata->tdata) != 0) {
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(localtdata->tdata->lock);
- return NULL;
- }
- pthread_cond_broadcast(localtdata->tdata->threshold);
- } else {
- pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
- }
- pthread_mutex_unlock(localtdata->tdata->lock);
-
- /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
- if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
- printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
- return NULL;
- }
- }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
- if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
- printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
- return NULL;
- }
- }
-
- /* Free memory */
-
- if (localtdata->transinfo->objlocked != NULL) {
- free(localtdata->transinfo->objlocked);
- localtdata->transinfo->objlocked = NULL;
- }
- if (localtdata->transinfo->objnotfound != NULL) {
- free(localtdata->transinfo->objnotfound);
- localtdata->transinfo->objnotfound = NULL;
- }
-
- pthread_exit(NULL);