#define MSG_NOSIGNAL 0
#endif
+/***********************************************************
+ * Macros
+ **********************************************************/
#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-
-
-//Coordinator Messages
+/*****************************************
+ * Coordinator Messages
+ ***************************************/
#define READ_REQUEST 1
#define READ_MULT_REQUEST 2
#define MOVE_REQUEST 3
#define TRANS_PREFETCH 8
#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9
-//Participant Messages
+/*********************************
+ * Participant Messages
+ *******************************/
#define OBJECT_FOUND 10
#define OBJECT_NOT_FOUND 11
#define OBJECTS_FOUND 12
#define THREAD_NOTIFY_RESPONSE 25
#define TRANS_UNSUCESSFUL 26
-//Control bits for status of objects in Machine pile
-#define OBJ_LOCKED_BUT_VERSION_MATCH 14
-#define OBJ_UNLOCK_BUT_VERSION_MATCH 15
-#define VERSION_NO_MATCH 16
-
//Max number of objects
#define MAX_OBJECTS 20
+//Max remote-machine connections
+#define NUM_MACHINES 2
+#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
+//Transaction id per machine
+#define TID_LEN 20
#include <stdlib.h>
#include "threadnotify.h"
-#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
-#define TID_LEN 20
//bit designations for status field of objheader
#define DIRTY 0x01
#define NEW 0x02
struct ___Object___ * revertlist;
#endif
} transrecord_t;
+
// Structure is a shared structure that keeps track of responses from the participants
typedef struct thread_response {
char rcv_status;
//Structure to store mid and socketid information
typedef struct midSocketInfo {
- unsigned int mid;
+ unsigned int mid; /* To communicate with mid use sockid in this data structure*/
int sockid;
} midSocketInfo_t;
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
+void send_data(int fd, void *buf, int buflen);
+void recv_data(int fd, void *buf, int buflen);
/* Prototypes for object header */
unsigned int getNewOID(void);
void randomdelay();
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *, unsigned int);
-objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
+objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header
int transCommit(transrecord_t *record); //return 0 if successful
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-void *handleLocalReq(void *);
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);// returns object header from main object store after object is copied into it from remote machine
+void *handleLocalReq(void *);//handles Local requests
int transComProcess(local_thread_data_array_t *);
int transAbortProcess(local_thread_data_array_t *);
void transAbort(transrecord_t *trans);
void *transPrefetch(void *);
void *mcqProcess(void *);
void checkPrefetchTuples(prefetchqelem_t *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);
-prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
+prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets)
void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
int transPrefetchProcess(transrecord_t *, int **, short);
void sendPrefetchReq(prefetchpile_t*, int);