1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
21 int numTransCommit = 0;
22 int numTransAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
29 /* ==================================================
31 * This function starts up the transaction runtime.
32 * ==================================================
38 /* ======================================
40 * - create an object store of given size
41 * ======================================
43 objstr_t *objstrCreate(unsigned int size) {
45 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
46 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
51 tmp->top = tmp + 1; //points to end of objstr_t structure!
56 while(t_cache->next!=NULL) {
57 objstr_t *next=t_cache->next;
58 t_cache->next=t_reserve;
62 t_cache->top=t_cache+1;
65 //free entire list, starting at store
66 void objstrDelete(objstr_t *store) {
68 while (store != NULL) {
76 /* =================================================
78 * This function initializes things required in the
80 * =================================================
83 //Transaction start is currently free...commit and aborting is not
86 /* =======================================================
88 * This function creates objects in the transaction record
89 * =======================================================
91 objheader_t *transCreateObj(void * ptr, unsigned int size) {
92 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
93 objheader_t *retval=&tmp[1];
94 tmp->lock=RW_LOCK_BIAS;
97 // don't insert into table
98 if (newobjs->offset<MAXOBJLIST) {
99 newobjs->objs[newobjs->offset++]=retval;
101 struct objlist *tmp=malloc(sizeof(struct objlist));
107 return retval; //want space after object header
110 /* This functions inserts randowm wait delays in the order of msec
111 * Mostly used when transaction commits retry*/
118 req.tv_nsec = (long)(t%4); //1-11 microsec
119 nanosleep(&req, NULL);
123 /* ==============================================
125 * - allocate space in an object store
126 * ==============================================
128 void *objstrAlloc(unsigned int size) {
131 objstr_t *store=t_cache;
137 if (OSFREE(store)>=size) {
142 if ((store=store->next)==NULL)
147 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
148 objstr_t **otmp=&t_reserve;
150 while((ptr=*otmp)!=NULL) {
151 if (ptr->size>=newsize) {
156 ptr->top=((char *)(&ptr[1]))+size;
161 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
166 os->top=((char *)nptr)+size;
171 /* =============================================================
173 * -finds the objects either in main heap
174 * -copies the object into the transaction cache
175 * =============================================================
177 __attribute__((pure)) void *transRead(void * oid) {
178 objheader_t *tmp, *objheader;
179 objheader_t *objcopy;
182 //quick case for new objects
183 if (((struct ___Object___ *)oid)->___objstatus___ & NEW)
186 /* Read from the main heap */
188 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
189 GETSIZE(size, header);
190 size += sizeof(objheader_t);
191 objcopy = (objheader_t *) objstrAlloc(size);
192 memcpy(objcopy, header, size);
193 /* Insert into cache's lookup table */
195 t_chashInsert(oid, &objcopy[1]);
200 struct objlist *ptr=newobjs;
201 while(ptr->next!=NULL) {
202 struct objlist *tmp=ptr->next;
210 /* ================================================================
212 * - This function initiates the transaction commit process
213 * - goes through the transaction cache and decides
215 * ================================================================
222 /* Look through all the objects in the transaction hash table */
223 int finalResponse = traverseCache();
224 if(finalResponse == TRANS_ABORT) {
236 if(finalResponse == TRANS_COMMIT) {
248 /* wait a random amount of time before retrying to commit transaction*/
249 if(finalResponse == TRANS_SOFT_ABORT) {
256 printf("Error: in %s() Unknown outcome", __func__);
262 /* ==================================================
264 * - goes through the transaction cache and
265 * - decides if a transaction should commit or abort
266 * ==================================================
268 int traverseCache() {
269 /* Create info to keep track of objects that can be locked */
270 int numoidrdlocked=0;
271 int numoidwrlocked=0;
272 void * rdlocked[200];
273 void * wrlocked[200];
278 if (c_numelements<200) {
279 oidrdlocked=rdlocked;
280 oidwrlocked=wrlocked;
282 int size=c_numelements*sizeof(void*);
283 oidrdlocked=malloc(size);
284 oidwrlocked=malloc(size);
286 chashlistnode_t *ptr = c_table;
287 /* Represents number of bins in the chash table */
288 unsigned int size = c_size;
289 for(i = 0; i<size; i++) {
290 chashlistnode_t *curr = &ptr[i];
291 /* Inner loop to traverse the linked list of the cache lookupTable */
292 while(curr != NULL) {
293 //if the first bin in hash table is empty
294 if(curr->key == NULL)
296 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
298 unsigned int version = headeraddr->version;
299 objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
301 if(STATUS(headeraddr) & DIRTY) {
302 /* Read from the main heap and compare versions */
303 if(write_trylock(&header->lock)) { //can aquire write lock
304 if (version == header->version) {/* versions match */
305 /* Keep track of objects locked */
306 oidwrlocked[numoidwrlocked++] = OID(header);
308 oidwrlocked[numoidwrlocked++] = OID(header);
309 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
312 } else { /* cannot aquire lock */
313 if(version == header->version) /* versions match */
316 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
321 /* Read from the main heap and compare versions */
322 if(read_trylock(&header->lock)) { //can further aquire read locks
323 if(version == header->version) {/* versions match */
324 oidrdlocked[numoidrdlocked++] = OID(header);
326 oidrdlocked[numoidrdlocked++] = OID(header);
327 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
330 } else { /* cannot aquire lock */
331 if(version == header->version)
334 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
344 /* Decide the final response */
346 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
347 return TRANS_SOFT_ABORT;
349 transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
355 /* ==================================
358 * =================================
360 int transAbortProcess(void **oidrdlocked, int *numoidrdlocked, void **oidwrlocked, int *numoidwrlocked) {
363 /* Release read locks */
364 for(i=0; i< *numoidrdlocked; i++) {
365 /* Read from the main heap */
366 header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
367 read_unlock(&header->lock);
370 /* Release write locks */
371 for(i=0; i< *numoidwrlocked; i++) {
372 /* Read from the main heap */
373 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
374 write_unlock(&header->lock);
376 if (c_numelements>=200) {
382 /* ==================================
385 * =================================
387 int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
388 void ** oidwrlocked, int *numoidwrlocked) {
392 struct objlist *ptr=newobjs;
397 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
402 /* Copy from transaction cache -> main object store */
403 for (i = 0; i < *numoidwrlocked; i++) {
404 /* Read from the main heap */
405 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
407 GETSIZE(tmpsize, header);
408 struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
409 struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
410 dst->___cachedCode___=src->___cachedCode___;
411 dst->___cachedHash___=src->___cachedHash___;
412 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
413 header->version += 1;
416 /* Release read locks */
417 for(i=0; i< *numoidrdlocked; i++) {
418 /* Read from the main heap */
419 header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
420 read_unlock(&header->lock);
423 /* Release write locks */
424 for(i=0; i< *numoidwrlocked; i++) {
425 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
426 write_unlock(&header->lock);
428 if (c_numelements>=200) {