3 #include "methodheaders.h"
/* Per-thread counter; checkObjects() reloads it from *counter_reset_pointer. */
4 __thread int transaction_check_counter;
/* Per-thread jump target used by the abort paths (_longjmp(aborttrans, 1)). */
5 __thread jmp_buf aborttrans;
/* Per-thread gate: checktrans() is only evaluated while this is non-zero. */
6 __thread int abortenabled;
/* Per-thread pointer to the reload value for transaction_check_counter. */
7 __thread int * counter_reset_pointer;
/* Declared here, defined elsewhere; not referenced in the reviewed lines. */
8 extern unsigned int myIpAddr;
/* Socket pool handed to getSock2WithLock() by verify(). */
9 extern sockPoolHashTable_t *transRequestSockPool;
/*
 * errorhandler - takes a signal number and a sigcontext (presumably installed
 * as a signal handler; the registration is not visible here).
 *
 * When abortenabled is set and checktrans() returns non-zero, the handler
 * unblocks `sig`, deletes t_cache with objstrDelete() and jumps back to
 * aborttrans with value 1. The other visible statements print an error
 * message and forward the signal to threadhandler().
 *
 * NOTE(review): some lines of this function were not visible during review
 * (the declaration of `toclear`, the end of the if-block, the closing brace),
 * so the exact control flow around the printf/threadhandler calls must be
 * confirmed against the full file.
 * NOTE(review): printf() is not on the POSIX async-signal-safe list; if this
 * really runs in signal context, consider write(2) instead.
 */
12 void errorhandler(int sig, struct sigcontext ctx) {
13 if (abortenabled&&checktrans()) {
/* Build a one-signal set and unblock it: _longjmp() does not restore the
 * signal mask, so `sig` would otherwise stay blocked after leaving here. */
15 sigemptyset(&toclear);
16 sigaddset(&toclear, sig);
17 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
/* Drop the transaction cache before unwinding to the abort point. */
21 objstrDelete(t_cache);
23 _longjmp(aborttrans, 1);
/* Reached when the abort path above is not taken (presumably after the
 * closing brace of the if-block, which is not visible). */
25 printf("Error in System at %s, %s(), %d\n", __FILE__, __func__, __LINE__);
27 threadhandler(sig, ctx);
32 * returns 0 when read set objects are consistent
33 * returns 1 when objects are inconsistent
/*
 * Body of the read-set consistency check (presumably checktrans(); the
 * signature line was not visible during review - TODO confirm).
 *
 * Walks all c_size buckets of c_table. For each cached object header:
 *   - objects flagged NEW are skipped;
 *   - objects found through mhashSearch() are compared by version right here,
 *     and a mismatch returns 1 immediately;
 *   - the remaining objects are (presumably in an else branch that is not
 *     visible) mapped to a machine with lhashSearch() and appended to a
 *     per-machine list via createList().
 * The collected list is then passed to verify().
 *
 * NOTE(review): the declarations of `i` and `tmp`, the inner list-traversal
 * loop and the final return statement were not visible during review.
 */
36 /* Create info to keep track of numelements */
37 unsigned int size = c_size;
38 chashlistnode_t *ptr = c_table;
/* Head of the per-machine list built by createList(). */
40 nodeElem_t *head=NULL;
43 for(i = 0; i< size; i++) {
44 chashlistnode_t *curr = &ptr[i];
45 /* Inner loop to traverse the linked list of the cache lookupTable */
/* The cached value is treated as the object's header. */
49 objheader_t *headeraddr=(objheader_t*) curr->val;
50 unsigned int machinenum;
53 if (STATUS(headeraddr) & NEW) {
54 //new objects cannot be stale
55 } else if ((tmp=mhashSearch(curr->key)) != NULL) {
/* Found via mhashSearch(): compare the version found there with the cached one. */
58 if (tmp->version!=headeraddr->version) {
/* NOTE(review): if `head` already holds nodes at this point, confirm that the
 * lines before this return (not visible) release them; otherwise they leak. */
61 return 1; //return 1 when objects are inconsistent
/* Not found via mhashSearch(): queue the object for the machine reported by lhashSearch(). */
64 machinenum = lhashSearch(curr->key);
65 head = createList(head, headeraddr, machinenum, c_numelements);
71 /* Send oid and versions for checking */
75 int retval = verify(head);
/*
 * createList - record one object's (oid, version) pair in the list node that
 * belongs to machine `mid`.
 *
 *   head          - current list head
 *   headeraddr    - header of the object to record
 *   mid           - machine id the object is associated with
 *   c_numelements - capacity (in records) used when a new node is allocated
 *
 * Objects flagged DIRTY are written into the node's objmod array at slot
 * nummod; the other branch (presumably the else of that test) writes into
 * objread at slot numread. Every record is packed as an unsigned int oid
 * immediately followed by a 16-bit version, i.e.
 * sizeof(unsigned int) + sizeof(short) bytes per slot.
 *
 * NOTE(review): the search for an existing node, the counter increments, the
 * list linking and the return statement were not visible during review.
 * NOTE(review): with the usual 4-byte int / 2-byte short the record stride is
 * 6 bytes, so every other oid store through `unsigned int *` is misaligned;
 * that is undefined behavior in ISO C and traps on strict-alignment targets.
 */
80 nodeElem_t * createList(nodeElem_t *head, objheader_t *headeraddr, unsigned int mid,
81 unsigned int c_numelements) {
83 nodeElem_t *ptr, *tmp;
84 int found = 0, offset = 0;
/* Existing node `tmp`: append to its modified-object records ... */
89 if (STATUS(headeraddr) & DIRTY) {
90 offset = (sizeof(unsigned int) + sizeof(short)) * tmp->nummod;
91 *((unsigned int *)(((char *)tmp->objmod) + offset))=OID(headeraddr);
92 offset += sizeof(unsigned int);
93 *((unsigned short *)(((char *)tmp->objmod) + offset)) = headeraddr->version;
/* ... or to its read-object records. */
96 offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
97 *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
98 offset += sizeof(unsigned int);
99 *((unsigned short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
107 //Add oid for any new machine
/* NOTE(review): makehead() is called twice in a row here; the node returned by
 * the first call is overwritten by the second assignment and never freed, so
 * each new machine leaks one node plus its two arrays. The unconditional first
 * call looks redundant. */
110 ptr = makehead(c_numelements);
111 if((ptr = makehead(c_numelements)) == NULL) {
112 printf("Error in Allocating memory %s, %d\n", __func__, __LINE__);
/* Fresh node `ptr`: same record layout as above. */
116 if (STATUS(headeraddr) & DIRTY) {
117 offset = (sizeof(unsigned int) + sizeof(short)) * ptr->nummod;
118 *((unsigned int *)(((char *)ptr->objmod) + offset))=OID(headeraddr);
119 offset += sizeof(unsigned int);
120 *((unsigned short *)(((char *)ptr->objmod) + offset)) = headeraddr->version;
123 offset = (sizeof(unsigned int) + sizeof(short)) * ptr->numread;
124 *((unsigned int *)(((char *)ptr->objread) + offset))=OID(headeraddr);
125 offset += sizeof(unsigned int);
126 *((unsigned short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
/*
 * makehead - allocate one zero-initialized list node together with its two
 * record arrays.
 *
 *   numelements - number of records each array (objmod, objread) can hold;
 *                 one record is sizeof(unsigned int) + sizeof(unsigned short)
 *                 bytes.
 *
 * Each failed calloc() prints "Calloc error" with the function name and line.
 * NOTE(review): the declaration of `head`, the statements following each
 * error message and the final return were not visible during review; confirm
 * that earlier allocations are released when a later calloc() fails.
 */
135 nodeElem_t * makehead(unsigned int numelements) {
137 //Create the first element
138 if((head = calloc(1, sizeof(nodeElem_t))) == NULL) {
139 printf("Calloc error %s %d\n", __func__, __LINE__);
/* Array for the records of modified objects. */
143 if ((head->objmod = calloc(numelements,sizeof(unsigned int) + sizeof(unsigned short))) == NULL) {
144 printf("Calloc error %s %d\n", __func__, __LINE__);
/* Array for the records of objects that were only read. */
149 if ((head->objread = calloc(numelements,sizeof(unsigned int) + sizeof(unsigned short))) == NULL) {
150 printf("Calloc error %s %d\n", __func__, __LINE__);
/* Both record counts start at zero (calloc already zeroed the node; this
 * assignment just makes it explicit). */
157 head->nummod = head->numread = 0;
162 //Delete the entire list
/* Takes the first node of the list. NOTE(review): the body was not visible
 * during review; only the two local node pointers below are shown. */
163 void deletehead(nodeElem_t *head) {
164 nodeElem_t *next, *tmp;
177 /* returns 0 => Inconsistent Objects found, abort transaction */
178 /* returns 1 => consistent objects found, error in system */
179 /* Process the linked list of objects */
/*
 * verify - send every node of `pile` to the machine it names (pile->mid) and
 * collect one reply byte per machine.
 *
 * Per node, three messages go out on the socket obtained from
 * transRequestSockPool: an objData_t header (control = CHECK_OBJECTS plus the
 * two record counts), then the packed read records, then the packed modified
 * records. Afterwards one control byte is received per node and stored in
 * getReplyCtrl[].
 *
 * NOTE(review): the return convention documented above (0 = inconsistent) is
 * the inverse of the one stated for the read-set check that calls this
 * function (1 = inconsistent); confirm how the caller maps the result.
 * NOTE(review): the definitions of numNode, sock, sd, size, control, i and
 * pilecount, the list advance, and all return statements were not visible
 * during review. Confirm that pilecount can never reach numNode, since it
 * indexes the variable-length arrays tosend[] and sock[].
 */
180 int verify(nodeElem_t *pile) {
181 /* create and initialize an array of sockets and reply receiving buffer */
183 char getReplyCtrl[numNode];
185 for(i=0; i<numNode; i++) {
190 /* send objects for consistency check to remote machine */
191 objData_t tosend[numNode];
193 while(pile != NULL) {
194 /* send total bytes */
195 tosend[pilecount].control = CHECK_OBJECTS;
196 tosend[pilecount].numread = pile->numread;
197 tosend[pilecount].nummod = pile->nummod;
/* A negative descriptor means no socket could be obtained for this machine. */
199 if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
200 printf("Error: Getting a socket descriptor at %s(), %s(), %d\n", __FILE__, __func__, __LINE__);
/* Remember the descriptor so the reply can be read from it later. */
203 sock[pilecount] = sd;
205 /* Send starting information of data */
206 send_data(sd, &(tosend[pilecount]), sizeof(objData_t));
209 /* Send objects that are read */
211 size=(sizeof(unsigned int)+sizeof(unsigned short)) * pile->numread;
212 send_data(sd, (char *)pile->objread, size);
215 /* Send objects that are modified */
217 size=(sizeof(unsigned int)+sizeof(unsigned short)) * pile->nummod;
218 send_data(sd, (char *)pile->objmod, size);
222 } // end of pile processing
225 int countConsistent = 0;
227 /* Recv replies from remote machines */
228 for(i = 0; i<numNode; i++) {
/* One control byte per machine. */
232 recv_data(sd, &control, sizeof(char));
233 getReplyCtrl[i] = control;
234 if(control == OBJ_INCONSISTENT) { /* Inconsistent */
242 /* Decide final response */
244 printf("Inconsistent Object-> Abort Transaction\n");
/* Every machine reported its objects as consistent. */
248 if(countConsistent == numNode) {
/*
 * checkObjects - consistency check for the current transaction.
 *
 * If aborting is enabled and checktrans() returns non-zero, prints
 * "Loop Abort", sets both *counter_reset_pointer and
 * transaction_check_counter to HIGH_CHECK_FREQUENCY, deletes t_cache and
 * jumps back to aborttrans with value 1. The last visible statement reloads
 * transaction_check_counter from *counter_reset_pointer (presumably on the
 * non-abort path).
 *
 * NOTE(review): some lines, including the closing braces, were not visible
 * during review.
 */
256 void checkObjects() {
257 if (abortenabled&&checktrans()) {
258 printf("Loop Abort\n");
/* Store HIGH_CHECK_FREQUENCY as the new reload value and as the current counter. */
259 transaction_check_counter=(*counter_reset_pointer=HIGH_CHECK_FREQUENCY);
263 objstrDelete(t_cache);
/* Does not return: control resumes at the matching setjmp of aborttrans. */
265 _longjmp(aborttrans, 1);
267 transaction_check_counter=*counter_reset_pointer;
270 /* Check excessive memory allocation */
/*
 * check_mem_alloc - same abort sequence as checkObjects(), but it prints
 * "Excessive Allocation" and also resets trans_allocation_bytes to 0. The last
 * visible statement resets trans_allocation_bytes again (presumably on the
 * non-abort path).
 *
 * NOTE(review): some lines, including the closing braces, were not visible
 * during review.
 */
271 void check_mem_alloc() {
272 if (abortenabled&&checktrans()) {
273 printf("Excessive Allocation\n");
274 trans_allocation_bytes=0;
/* Store HIGH_CHECK_FREQUENCY as the new reload value and as the current counter. */
275 transaction_check_counter=(*counter_reset_pointer=HIGH_CHECK_FREQUENCY);
279 objstrDelete(t_cache);
/* Does not return: control resumes at the matching setjmp of aborttrans. */
281 _longjmp(aborttrans, 1);
283 trans_allocation_bytes=0;
286 /* Obtain a backtrace and print it to stdout */
/*
 * NOTE(review): the function signature and the declarations of `array`,
 * `strings`, `size` and `i` were not visible during review.
 * - backtrace() returns an int; "%zd" is only correct if `size` is declared
 *   with a signed size type (use "%zu" for size_t, "%d" for int).
 * - backtrace_symbols() can return NULL and its result must be released with
 *   free(); confirm both are handled in the lines not shown.
 */
/* Capture up to 100 return addresses of the current call stack. */
293 size = backtrace(array, 100);
/* Translate the addresses into printable strings. */
294 strings = backtrace_symbols(array, size);
296 printf("Obtained %zd stack frames.\n", size);
297 for (i = 0; i < size; i++)
298 printf("%s\n", strings[i]);
/*
 * checkObjVersion - receive the (oid, version) records announced by a peer
 * and answer on `sd` with a single control byte.
 *
 *   readbuffer - buffered-read state passed through to recv_data_buf()
 *   sd         - socket descriptor to receive from and reply on
 *   numread    - number of records for objects that were read
 *   nummod     - number of records for objects that were modified
 *
 * Each record is an unsigned int oid followed by an unsigned short version.
 * Both record sets get the same treatment: OBJ_INCONSISTENT is sent when the
 * oid is not found by mhashSearch(), when the object is write locked, or
 * (presumably in the else of the version test) when the versions differ.
 * OBJ_CONSISTENT is sent from the final test on v_match.
 *
 * NOTE(review): the declarations of objread, objmod, header, control, i and
 * v_match, the statements following each send_data() call, and the end of the
 * function were not visible during review.
 */
302 void checkObjVersion(struct readstruct * readbuffer, int sd, unsigned int numread, unsigned int nummod) {
306 /* Recv objects read with versions */
307 int size=(sizeof(unsigned int)+sizeof(unsigned short)) * numread;
310 recv_data_buf(sd, readbuffer, objread, size);
313 /* Recv objects modified with versions */
314 size=(sizeof(unsigned int)+sizeof(unsigned short)) * nummod;
317 recv_data_buf(sd, readbuffer, objmod, size);
322 for(i=0; i<numread; i++) {
/* NOTE(review): as visible, `size` is one record length and does not depend
 * on i; presumably a line that was not visible scales it by i so that it
 * becomes the offset of record i - confirm. */
323 size = sizeof(unsigned int)+sizeof(unsigned short);
325 unsigned int oid = *((unsigned int *)(objread + size));
/* The version sits right after the oid inside the record. */
326 size += sizeof(unsigned int);
327 unsigned short version = *((unsigned short *)(objread + size));
329 if((header = mhashSearch(oid)) == NULL) { /* Obj not found */
330 control = OBJ_INCONSISTENT;
331 send_data(sd, &control, sizeof(char));
334 if(is_write_locked(STATUSPTR(header))) { //object write locked
335 control = OBJ_INCONSISTENT;
336 send_data(sd, &control, sizeof(char));
/* Version received from the peer vs. the version in the header found above. */
341 if(version == header->version)
344 control = OBJ_INCONSISTENT;
345 send_data(sd, &control, sizeof(char));
349 } // end of objects read
351 for(i=0; i<nummod; i++) {
352 //unsigned int oid = objmod[i].oid;
353 //unsigned short version = objmod[i].version;
/* Same record decoding as in the read loop (same NOTE about the offset). */
354 size = sizeof(unsigned int)+sizeof(unsigned short);
356 unsigned int oid = *((unsigned int *)(objmod + size));
357 size += sizeof(unsigned int);
358 unsigned short version = *((unsigned short *)(objmod + size));
360 if((header = mhashSearch(oid)) == NULL) { /* Obj not found */
361 control = OBJ_INCONSISTENT;
362 send_data(sd, &control, sizeof(char));
365 if(is_write_locked(STATUSPTR(header))) { //object write locked
366 control = OBJ_INCONSISTENT;
367 send_data(sd, &control, sizeof(char));
371 if(version == header->version)
374 control = OBJ_INCONSISTENT;
375 send_data(sd, &control, sizeof(char));
379 } // end of objects modified
/* NOTE(review): this is an assignment, not a comparison. It overwrites v_match
 * with numread + nummod, so the branch is taken whenever that sum is non-zero,
 * whatever was matched above. `v_match == (numread + nummod)` was presumably
 * intended. */
381 if(v_match = (numread + nummod)) {
382 control = OBJ_CONSISTENT;
383 send_data(sd, &control, sizeof(char));