// TCP port the server binds to and announces in its "Listening on port" message.
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
// Size in bytes of the receive buffer in dstmAccept and of the reply buffer in
// handleTransReq; also passed to objstrCreate() for the temporary store there.
13 #define RECEIVE_BUFFER_SIZE 2048
// Table defined in another translation unit. It is indexed by an object
// header's `type` field; handleTransReq adds the entry to sizeof(objheader_t)
// to get the number of bytes to copy for an object.
15 extern int classsize[];
// Global pointer to the main object store; assigned from objstrCreate() below.
17 objstr_t *mainobjstore;
// NOTE(review): the header and braces of the enclosing function are not part of
// this view. The statements below cannot appear at file scope, so they
// presumably form the body of an initialization routine — confirm.
21 //todo:initialize main object store
22 //do we want this to be a global variable, or provide
23 //separate access functions and hide the structure?
// Create the main object store and publish it through the global pointer.
// NOTE(review): no NULL check on this result is visible here, whereas
// handleTransReq does check objstrCreate() for NULL — confirm and align.
24 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
// Create the two hash tables. A non-zero return takes the `if` branch, whose
// body is not visible (presumably the failure path — confirm).
25 if (mhashCreate(HASH_SIZE, LOADFACTOR))
28 if (lhashCreate(HASH_SIZE, LOADFACTOR))
// Disabled code: starting dstmListen on its own thread.
31 //pthread_t threadListen;
32 //pthread_create(&threadListen, NULL, dstmListen, NULL);
// Listening-socket setup followed by accept/dispatch.
// NOTE(review): the function header, braces and error-branch bodies are not
// part of this view; presumably this is the body of dstmListen — confirm.
39 int listenfd, acceptfd;
40 struct sockaddr_in my_addr;
41 struct sockaddr_in client_addr;
// Used as the address length for bind() and as the in/out length for accept().
42 socklen_t addrlength = sizeof(struct sockaddr);
43 pthread_t thread_dstm_accept;
// IPv4 stream socket.
// NOTE(review): a check of socket()'s return value is not visible here.
46 listenfd = socket(AF_INET, SOCK_STREAM, 0);
// Bind address: LISTEN_PORT in network byte order, any local address.
53 my_addr.sin_family = AF_INET;
54 my_addr.sin_port = htons(LISTEN_PORT);
55 my_addr.sin_addr.s_addr = INADDR_ANY;
// Zero the 8 padding bytes of sockaddr_in.
56 memset(&(my_addr.sin_zero), '\0', 8);
// bind() and listen() are both tested against -1; the bodies of those
// branches are not visible.
58 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
64 if (listen(listenfd, BACKLOG) == -1)
70 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
// Accept a connection and hand its descriptor to a new thread running
// dstmAccept. The descriptor travels inside the void* argument via a cast.
// NOTE(review): in the visible lines the results of accept() and
// pthread_create() are not checked, so a -1 descriptor would be passed to the
// thread. accept() also overwrites addrlength, so if this runs in a loop
// (not visible) the length should be reset before each call. Casting int to
// void* directly is implementation-defined; going through intptr_t is the
// usual form — confirm and fix.
73 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
74 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
// Thread entry point for one accepted connection.
// acceptfd: the connected socket descriptor, carried as an integer cast to
//           void* (it is cast back with (int) at every use below).
// Reads requests with recv() until it returns 0, handles them, then closes
// the descriptor.
// NOTE(review): braces, the request dispatch (switch/case lines) and several
// declarations (e.g. srcObj, h) are not part of this view.
79 void *dstmAccept(void *acceptfd)
81 int numbytes,i,choice, oid;
82 char buffer[RECEIVE_BUFFER_SIZE], control;
// Reads the descriptor flags; `size` is declared in the same statement. No
// use of fd_flags appears in the visible lines.
85 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
87 printf("Recieved connection: fd = %d\n", (int)acceptfd);
// Loop until recv() returns 0.
// NOTE(review): recv() returns -1 on error, which also satisfies `!= 0`, so
// the body runs on errors too; the "Error receiving" print below is presumably
// where that case is handled — confirm the loop cannot spin on a persistent
// error.
88 while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0)
90 printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
94 printf("DEBUG -> READ_REQUEST\n");
// The object id is read as an int starting at byte 1 of the request.
// NOTE(review): this is an unaligned int read through a cast, and no check
// that numbytes covers 1 + sizeof(int) bytes is visible.
95 oid = *((int *)(buffer+1));
97 printf("DEBUG -> Received oid is %d\n", oid);
// Look the object up in the hash table created with mhashCreate() and view
// the result as an object header.
99 srcObj = mhashSearch(oid);
100 h = (objheader_t *) srcObj;
// Byte 0 of the reply carries the status code. The condition choosing between
// the two assignments is not visible (presumably a NULL test on the lookup
// result — confirm).
102 buffer[0] = OBJECT_NOT_FOUND;
104 buffer[0] = OBJECT_FOUND;
// NOTE(review): sizeof(classsize[h->type]) is the size of one int table
// entry, not the value stored in it. handleTransReq computes the same quantity
// as sizeof(objheader_t) + classsize[tmp->type], so this is probably a bug that
// truncates the copied object — confirm.
105 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
// Copy the object after the status byte.
// NOTE(review): no check that 1 + size fits in RECEIVE_BUFFER_SIZE is visible.
106 memcpy(buffer+1, srcObj, size);
109 printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
// Sends the entire buffer (sizeof(buffer) bytes), not just the 1 + size bytes
// that were filled in.
112 if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
// The remaining request kinds only print debug output in the visible lines.
116 case READ_MULT_REQUEST:
117 printf("DEBUG-> READ_MULT_REQUEST\n");
120 printf("DEBUG -> MOVE_REQUEST\n");
122 case MOVE_MULT_REQUEST:
123 printf("DEBUG -> MOVE_MULT_REQUEST\n");
126 printf("DEBUG -> TRANS_REQUEST\n");
127 printf("Client sent %d\n",buffer[0]);
// The call into handleTransReq is currently disabled.
128 // handleTransReq(acceptfd, buffer);
131 printf("DEBUG -> TRANS_ABORT\n");
134 printf("DEBUG -> TRANS_COMMIT\n");
135 printf("Client sent %d\n",buffer[0]);
136 //TODO copy the objects into the machine
137 /*copy the object into the object store from its old
138 location in the objstore(pointer to its header is already stored before)*/
141 printf("Error receiving");
143 //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
144 //printf("%s", buffer);
// Close the connection; the body of the failure branch is not visible.
146 if (close((int)acceptfd) == -1)
151 printf("Closed connection: fd = %d\n", (int)acceptfd);
155 //TODO put __FILE__ __LINE__ for all error conditions
// Handles a transaction request received on a connection.
// acceptfd: connected socket descriptor used for the replies.
// buf:      request bytes; after a fixed_data_t prefix come two shorts and
//           then a sequence of object headers (see the offset arithmetic).
// For each object named in the request it looks the object up with
// mhashSearch(), checks its lock state and version, and sends a one-byte
// status in sendbuf[0].
// NOTE(review): this view is incomplete — braces, several declarations (list,
// tmp, mobj, tmpholder, top), counter updates, return statements and the end of
// the function are not visible. Comments below describe only what the visible
// lines show.
// NOTE(review): sendbuf is declared without an initializer and only
// sendbuf[0] is written in the visible lines, yet every send() transmits
// sizeof(sendbuf) bytes — the remaining bytes would be indeterminate stack
// contents. Confirm and send only the bytes that are set.
157 int handleTransReq(int acceptfd, char *buf) {
158 short numread = 0, nummod = 0;
160 int offset = 0, size,i;
161 int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
162 objheader_t *headptr = NULL;
166 char sendbuf[RECEIVE_BUFFER_SIZE];
// Skip the fixed-size prefix, then read two consecutive shorts.
169 offset = sizeof(fixed_data_t);
// NOTE(review): the first short is stored in `list`, whose declaration is not
// visible, while numread is never assigned in the visible lines and so stays 0
// for the calloc and the first loop below. Presumably this was meant to be
// `numread = ...` — confirm.
170 list = *((short *)(buf+offset));
171 offset += sizeof(short);
172 nummod = *((short *)(buf+offset));
173 offset += sizeof(short);
175 //Make an array to store the object headers for all objects that are only read
176 if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
177 perror("handleTransReq: Calloc error");
180 //Process each object id that is only read
181 for (i = 0; i < numread; i++) {
// View the bytes at the current offset as the client's copy of the header.
183 tmp = (objheader_t *) (buf + offset);
184 //find if object is still present in the same machine since TRANS_REQUEST
185 if ((mobj = mhashSearch(tmp->oid)) == NULL) {
188 sendbuf[0] = OBJECT_NOT_FOUND;
189 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
193 } else { // If obj found in machine (i.e. has not moved)
194 //Check if obj is locked
// NOTE(review): this test is true only when status >> 3 equals exactly 1;
// it assumes LOCK is bit 3 and that no higher status bits are set — confirm.
195 if ((((objheader_t *)mobj)->status >> 3) == 1) {
196 //Check version of the object
197 if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
200 sendbuf[0] = TRANS_DISAGREE;
201 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
205 } else {//If versions don't match ..HARD ABORT
208 sendbuf[0] = TRANS_DISAGREE_ABORT;
209 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
214 } else {// If object not locked then lock it
// The lock bit is set before the version comparison.
// NOTE(review): no unlock on the mismatch path is visible — confirm the lock
// is released elsewhere.
215 ((objheader_t *)mobj)->status |= LOCK;
216 if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
219 sendbuf[0] = TRANS_AGREE;
220 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
224 } else {//If versions don't match
227 sendbuf[0] = TRANS_DISAGREE_ABORT;
228 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
// Save the request's header and advance to the next one.
// NOTE(review): the destination is always headptr (element 0), so each
// iteration overwrites the previous header although the array was sized for
// numread entries; presumably headptr + i was intended — confirm.
235 memcpy(headptr, buf+offset, sizeof(objheader_t));
236 offset += sizeof(objheader_t);
// Temporary object store that receives copies of the modified objects.
240 if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
241 perror("handleTransReq: Calloc error");
245 //Process each object id that is only modified
246 for(i = 0; i < nummod; i++) {
248 tmp = (objheader_t *)(buf + offset);
249 //find if object is still present in the same machine since TRANS_REQUEST
250 if ((mobj = mhashSearch(tmp->oid)) == NULL) {
253 sendbuf[0] = OBJECT_NOT_FOUND;
254 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
258 } else { // If obj found in machine (i.e. has not moved)
259 //Check if obj is locked
260 if ((((objheader_t *)mobj)->status >> 3) == 1) {
261 //Check version of the object
262 if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
265 sendbuf[0] = TRANS_DISAGREE;
266 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
270 } else {//If versions don't match ..HARD ABORT
273 sendbuf[0] = TRANS_DISAGREE_ABORT;
274 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
279 } else {// If object not locked then lock it
280 ((objheader_t *)mobj)->status |= LOCK;
281 if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
284 sendbuf[0] = TRANS_AGREE;
285 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
289 } else {//If versions don't match
292 sendbuf[0] = TRANS_DISAGREE_ABORT;
293 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
// Bytes to copy for this object: header plus the classsize entry for its type.
// NOTE(review): tmp->type comes from the request; no range check against the
// classsize table or against the received length is visible.
301 size = sizeof(objheader_t) + classsize[tmp->type];
// Reserve room in the temporary store and copy header + payload from the
// request. How offset advances in this loop is not visible.
302 if ((top = objstrAlloc(tmpholder, size)) == NULL) {
303 perror("handleTransReq: Calloc error");
306 memcpy(top, buf+offset, size);
// Final reply. The condition of the first branch is not visible; the second
// branch sends TRANS_AGREE when transagree equals numread + nummod, and the
// last one sends TRANS_DISAGREE.
312 sendbuf[0] = TRANS_DISAGREE_ABORT;
313 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
317 } else if(transagree == numread+nummod) {
318 sendbuf[0] = TRANS_AGREE;
319 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
323 sendbuf[0] = TRANS_DISAGREE;
324 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {