1 #ifndef _CLIFFC_HASHTABLE_H
2 #define _CLIFFC_HASHTABLE_H
12 This header file declares and defines a simplified version of Cliff Click's
13 NonblockingHashMap. It contains all the necessary structures and main
14 functions. In simplified_cliffc_hashtable.cc file, it has the definition for
18 template<typename TypeK, typename TypeV, class Hash, class KeyEqualsTo, class ValEqualsTo>
19 class cliffc_hashtable;
22 Corresponding to the Object[] array in Cliff Click's Java implementation.
23 It keeps the first two slots for CHM (Hashtable control unit) and the hash
24 records (an array of hash used for fast negative key-equality check).
// kvs_data constructor body (fragment; surrounding lines elided in this listing).
// Backing-array layout: _data[0] = CHM control block, _data[1] = cached-hash
// array, _data[2..] = alternating key/value slot pairs.
// NOTE(review): real_size is computed from sizeof(atomic<void*>) rather than
// from the table size; for _size K/V pairs the slot array should presumably be
// _size * 2 + 2 entries -- confirm against the upstream implementation.
32 int real_size = sizeof(atomic<void*>) * 2 + 2;
33 _data = new atomic<void*>[real_size];
34 // The control block is initialized later, in resize()
35 // Init the hash record array
36 int *hashes = new int[_size];
// Loop body elided here; presumably zeroes each cached hash
// (keyeq() treats hashes[i] == 0 as "no hash recorded yet").
38 for (i = 0; i < _size; i++) {
42 // Initialize the array
43 _data[0].store(NULL, memory_order_relaxed);
44 _data[1].store(hashes, memory_order_relaxed);
45 // Init the data to Null slot
46 for (i = 2; i < real_size; i++) {
47 _data[i].store(NULL, memory_order_relaxed);
52 int *hashes = (int*) _data[1].load(memory_order_relaxed);
62 slot(bool prime, void *ptr) {
71 TypeK and TypeV should define their own copy constructor.
73 template<typename TypeK, typename TypeV, class Hash, class KeyEqualsTo, class ValEqualsTo>
74 class cliffc_hashtable {
76 # The synchronization we have for the hashtable gives us the property of
77 # serializability, so we should have a sequential hashtable when we check the
78 # correctness. The key thing is to identify all the commit point.
83 spec_hashtable<TypeK, TypeV*> __map;
84 spec_hashtable<TypeK, Tag> __id_map;
87 static bool _val_equals(TypeV *ptr1, TypeV *ptr2) {
91 # Update the tag for the current key slot if the corresponding tag
92 # is NULL, otherwise just return that tag. It will update the next
93 # available tag too if it requires a new tag for that key slot.
94 static Tag _getKeyTag(TypeK &key) {
95 if (__id_map.get(key) == NULL) {
96 Tag cur_tag = tag.current();
97 __id_map.put(key, cur_tag);
101 return __id_map.get(key);
117 PutIfAbsent(COND_PutIfAbsentSucc),
119 RemoveIfMatch(COND_RemoveIfMatchSucc),
121 ReplaceIfMatch(COND_ReplaceIfMatchSucc)
124 Write_interface -> Read_interface
130 The control structure for the hashtable
134 friend class cliffc_hashtable;
136 atomic<kvs_data*> _newkvs;
138 // Size of active K,V pairs
141 // Count of used slots
144 // The next part of the table to copy
145 atomic_int _copy_idx;
147 // Work-done reporting
148 atomic_int _copy_done;
// CHM constructor body (fragment): seeds the table-control counters.
// Relaxed stores are sufficient -- the CHM is published to other threads
// via the release store/CAS of the enclosing kvs_data.
152 _size.store(size, memory_order_relaxed);
153 _slots.store(0, memory_order_relaxed);
155 _copy_idx.store(0, memory_order_relaxed);
156 _copy_done.store(0, memory_order_relaxed);
163 // Heuristic to decide if the table is too full: only consider resizing
// once a probe chain has hit REPROBE_LIMIT AND the used-slot count exceeds
// the reprobe limit for this table length. (Return expression fragment;
// the leading "return" line is elided in this listing.)
164 bool table_full(int reprobe_cnt, int len) {
166 reprobe_cnt >= REPROBE_LIMIT &&
167 _slots.load(memory_order_relaxed) >= reprobe_limit(len);
// Allocate (or discover) the next, larger table. Returns the new kvs_data.
// Exactly one thread wins the CAS that installs _newkvs; losers discard
// their allocation and adopt the winner's table.
170 kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
171 kvs_data *newkvs = _newkvs.load(memory_order_acquire);
175 // No copy in-progress, start one; Only double the table size
176 int oldlen = kvs->_size;
177 int sz = _size.load(memory_order_relaxed);
180 // Just follow Cliff Click's heuristic to decide the new size
181 if (sz >= (oldlen >> 2)) { // If we are 25% full
182 newsz = oldlen << 1; // Double size
183 if (sz >= (oldlen >> 1))
184 newsz = oldlen << 2; // Double double size
187 // We do not record the record timestamp
188 if (newsz <= oldlen) newsz = oldlen << 1;
189 // Do not shrink ever
190 if (newsz < oldlen) newsz = oldlen;
192 // Last check cause the 'new' below is expensive
193 newkvs = _newkvs.load(memory_order_acquire);
194 if (newkvs != NULL) return newkvs;
196 newkvs = new kvs_data(newsz);
197 void *chm = (void*) new CHM(sz);
198 newkvs->_data[0].store(chm, memory_order_relaxed);
200 kvs_data *cur_newkvs;
201 // Another check after the slow allocation
202 if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
204 // CAS the _newkvs to the allocated table
// NOTE(review): the variable names are inverted relative to their roles --
// 'desired' holds the EXPECTED value (NULL) and 'expected' holds the
// DESIRED value (newkvs). The call happens to pass them in the correct
// positional order, but the names invite future misuse.
// NOTE(review): memory_order_release is not a valid FAILURE ordering for
// compare_exchange in C++11 (failure order may not be release/acq_rel).
205 kvs_data *desired = (kvs_data*) NULL;
206 kvs_data *expected = (kvs_data*) newkvs;
207 if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
208 memory_order_release)) {
209 // Lost the race: should clean up the allocated table and adopt the winner's
211 newkvs = _newkvs.load(memory_order_acquire);
// Claim chunks of the old table (min_copy_work slots at a time) and copy
// them into _newkvs. If copy_all is false the caller only does one chunk
// of helping; panic mode (copy everything) starts once the claim index
// runs past 2*oldlen.
216 void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
218 assert (get_chm(oldkvs) == this);
219 kvs_data *newkvs = _newkvs.load(memory_order_acquire);
220 int oldlen = oldkvs->_size;
// Cap each claimed chunk at 1024 slots.
221 int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
223 // Just follow Cliff Click's code here
224 int panic_start = -1;
226 while (_copy_done.load(memory_order_acquire) < oldlen) {
227 copyidx = _copy_idx.load(memory_order_acquire);
228 if (panic_start == -1) { // No panic
229 copyidx = _copy_idx.load(memory_order_acquire);
// CAS-claim the next chunk of min_copy_work slots.
// NOTE(review): memory_order_release is not a valid failure ordering
// for compare_exchange in C++11.
230 while (copyidx < (oldlen << 1) &&
231 !_copy_idx.compare_exchange_strong(copyidx, copyidx +
232 min_copy_work, memory_order_release, memory_order_release))
233 copyidx = _copy_idx.load(memory_order_relaxed);
234 if (!(copyidx < (oldlen << 1)))
235 panic_start = copyidx;
238 // Now copy the chunk of work we claimed
// Index is wrapped with & (oldlen - 1): oldlen must be a power of two.
240 for (int i = 0; i < min_copy_work; i++)
241 if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
245 copy_check_and_promote(topmap, oldkvs, workdone);
247 copyidx += min_copy_work;
248 if (!copy_all && panic_start == -1)
249 return; // We are done with the work we claim
251 copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
// Copy one slot (the caller observed a Prime there), credit the copy if this
// thread performed it, and optionally help the rest of the table copy along.
// Returns the new table to retry the caller's operation in.
254 kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
255 *oldkvs, int idx, void *should_help) {
256 kvs_data *newkvs = _newkvs.load(memory_order_acquire);
257 // We're only here cause the caller saw a Prime
258 if (copy_slot(topmap, idx, oldkvs,
259 _newkvs.load(memory_order_relaxed)))
260 copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
// should_help == NULL means an internal caller that must not recurse into helping.
261 return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
// Add 'workdone' to the count of copied slots; once the whole old table is
// copied, CAS the top-level _kvs pointer from the old table to the new one.
264 void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
265 oldkvs, int workdone) {
266 int oldlen = oldkvs->_size;
267 int copyDone = _copy_done.load(memory_order_relaxed);
// Retry loop (header elided in this listing): publish our work count.
270 copyDone = _copy_done.load(memory_order_relaxed);
271 if (_copy_done.compare_exchange_weak(copyDone, copyDone +
272 workdone, memory_order_relaxed, memory_order_relaxed))
277 // Promote the new table to the current table
278 if (copyDone + workdone == oldlen &&
279 topmap->_kvs.load(memory_order_acquire) == oldkvs)
// NOTE(review): memory_order_release is not a valid failure ordering
// for compare_exchange in C++11.
280 topmap->_kvs.compare_exchange_strong(oldkvs,
281 _newkvs.load(memory_order_relaxed), memory_order_release,
282 memory_order_release);
// Copy one key/value pair from the old table into the new table.
// Returns true iff THIS thread completed the copy (used to credit workdone).
// Protocol: box the old value with a Prime, insert the unboxed value into the
// new table (only if the key is absent there), then kill the old slot with
// TOMBPRIME.
285 bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
// A never-used slot: close it with a tombstone key so it can't be claimed.
288 while ((key_slot = key(oldkvs, idx)) == NULL)
289 CAS_key(oldkvs, idx, NULL, TOMBSTONE);
291 // First CAS old to Prime
292 slot *oldval = val(oldkvs, idx);
293 while (!is_prime(oldval)) {
294 slot *box = (oldval == NULL || oldval == TOMBSTONE)
295 ? TOMBPRIME : new slot(true, oldval->_ptr);
296 if (CAS_val(oldkvs, idx, oldval, box)) {
297 if (box == TOMBPRIME)
298 return 1; // Dead slot boxed directly to TOMBPRIME: copy done
299 // Otherwise we CAS'd the box
300 oldval = box; // Record updated oldval
303 oldval = val(oldkvs, idx); // Else re-try
306 if (oldval == TOMBPRIME) return false; // Copy already completed here
// Re-expose the old value, unboxed, in the new table (elided putIfMatch
// call continues on the next line; NULL expVal marks it as a copy).
308 slot *old_unboxed = new slot(false, oldval->_ptr);
309 int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
312 // Old value is exposed in the new table
313 while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
314 oldval = val(oldkvs, idx);
316 return copied_into_new;
323 static const int Default_Init_Size = 8; // Initial table size
325 static slot* const MATCH_ANY;
326 static slot* const NO_MATCH_OLD;
328 static slot* const TOMBPRIME;
329 static slot* const TOMBSTONE;
331 static const int REPROBE_LIMIT = 10; // Forces a table-resize
333 static Hash hashFunc;
334 static KeyEqualsTo keyEqualsTo;
335 static ValEqualsTo valEqualsTo;
337 atomic<kvs_data*> _kvs;
// Default constructor body (signature elided in this listing): build the
// initial table of Default_Init_Size slots with an empty CHM, then publish
// it with a release store so readers see a fully initialized table.
341 // Should initialize the CHM for the construction of the table
342 // For other CHM in kvs_data, they should be initialized in resize()
343 // because the size is determined dynamically
344 kvs_data *kvs = new kvs_data(Default_Init_Size);
345 void *chm = (void*) new CHM(0);
346 kvs->_data[0].store(chm, memory_order_relaxed);
347 _kvs.store(kvs, memory_order_release);
// Sized constructor: same as the default constructor but with a caller-chosen
// initial capacity.
// NOTE(review): indexing elsewhere uses fullhash & (len - 1), which assumes
// init_size is a power of two; this constructor does not round it up -- confirm
// callers only pass powers of two.
350 cliffc_hashtable(int init_size) {
351 // Should initialize the CHM for the construction of the table
352 // For other CHM in kvs_data, they should be initialized in resize()
353 // because the size is determined dynamically
354 kvs_data *kvs = new kvs_data(init_size);
355 void *chm = (void*) new CHM(0);
356 kvs->_data[0].store(chm, memory_order_relaxed);
357 _kvs.store(kvs, memory_order_release);
363 @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
366 @DefineVar: TypeV *_Old_Val = __map.get(key)
368 _equals_val(_Old_Val, __RET__)
// Public lookup: returns a pointer to the mapped value, or NULL on a miss
// (a TOMBSTONE hit is reported as NULL by get_impl).
// NOTE(review): the TypeK copy and the slot wrapper allocated here are never
// freed -- leaked on every call. Confirm whether this is intentional in the
// benchmark context.
371 TypeV* get(TypeK& key) {
372 void *key_ptr = (void*) new TypeK(key);
373 slot *key_slot = new slot(false, key_ptr);
374 int fullhash = hash(key_slot);
375 slot *V = get_impl(this, _kvs.load(memory_order_relaxed), key_slot, fullhash);
376 if (V == NULL) return NULL;
377 assert (!is_prime(V));
378 return (TypeV*) V->_ptr;
384 @Commit_point_set: Write_Val_Point
387 # Remember this old value at checking point
388 @DefineVar: TypeV *_Old_Val = __map.get(key)
389 @Code: __map.put(key, &val);
391 _equals_val(__RET__, _Old_Val)
// Unconditional insert/overwrite; returns the previous value (NULL if none).
// NO_MATCH_OLD makes putIfMatch ignore the existing value.
394 TypeV* put(TypeK& key, TypeV& val) {
395 return putIfMatch(key, val, NO_MATCH_OLD);
400 @Interface: PutIfAbsent
402 Write_Val_Point | PutIfAbsent_Fail_Point
403 @Condition: __map.get(key) == NULL
405 COND_PutIfAbsentSucc :: __RET__ == NULL
408 @DefineVar: TypeV *_Old_Val = __map.get(key)
411 __map.put(key, &value);
413 COND_SAT ? __RET__ == NULL : _equals_val(_Old_Val, __RET__)
// Insert only if the key has no live mapping; returns the previous value
// (NULL means the insert succeeded). TOMBSTONE as the expected-value token
// means "only install over a dead/absent entry".
416 TypeV* putIfAbsent(TypeK& key, TypeV& value) {
// Fixed: forwarded the undeclared name 'val' instead of the parameter 'value'.
417 return putIfMatch(key, value, TOMBSTONE);
422 @Interface: RemoveAny
423 @Commit_point_set: Write_Val_Point
426 @DefineVar: TypeV *_Old_Val = __map.get(key)
427 @Code: __map.put(key, NULL);
429 _equals_val(__RET__, _Old_Val)
// Unconditional remove: installs TOMBSTONE as the value regardless of the
// old one; returns the previous value (NULL if none).
// NOTE(review): TOMBSTONE is a slot* but the wrapper putIfMatch takes a
// TypeV& value parameter -- this does not typecheck as written; confirm the
// intended overload.
432 TypeV* remove(TypeK& key) {
433 return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
438 @Interface: RemoveIfMatch
440 Write_Val_Point | RemoveIfMatch_Fail_Point
442 _equals_val(__map.get(key), &val)
444 COND_RemoveIfMatchSucc :: __RET__ == true
449 __map.put(key, NULL);
451 COND_SAT ? __RET__ : !__RET__
// Conditional remove: remove the key only if it currently maps to 'val'.
// NOTE(review): val_slot is built but never used; the call should presumably
// pass val_slot (a slot*) rather than val (a TypeV&), and 'val == NULL' on a
// reference is suspect. Confirm against the upstream implementation.
454 bool remove(TypeK& key, TypeV& val) {
455 slot *val_slot = val == NULL ? NULL : new slot(false, val);
456 return putIfMatch(key, TOMBSTONE, val) == val;
462 @Interface: ReplaceAny
467 @DefineVar: TypeV *_Old_Val = __map.get(key)
469 _equals_val(__RET__, _Old_Val)
// Replace only if the key already has a live mapping (MATCH_ANY); returns the
// previous value, NULL if the key was absent and nothing was installed.
472 TypeV* replace(TypeK& key, TypeV& val) {
473 return putIfMatch(key, val, MATCH_ANY);
478 @Interface: ReplaceIfMatch
480 Write_Val_Point | ReplaceIfMatch_Fail_Point
482 _equals_val(__map.get(key), &oldval)
484 COND_ReplaceIfMatchSucc :: __RET__ == true
489 __map.put(key, &newval);
491 COND_SAT ? __RET__ : !__RET__
// Conditional replace: swap in newval only if the key currently maps to oldval.
// NOTE(review): oldval is a TypeV& but the expected-value parameter and the
// return of putIfMatch are slot* -- this comparison does not typecheck as
// written; confirm the intended conversion.
494 bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
495 return putIfMatch(key, newval, oldval) == oldval;
// Fetch the CHM control block stored in slot 0 of the backing array.
499 static CHM* get_chm(kvs_data* kvs) {
500 return (CHM*) kvs->_data[0].load(memory_order_relaxed);
// Fetch the cached-hash array stored in slot 1 of the backing array.
503 static int* get_hashes(kvs_data *kvs) {
504 return (int *) kvs->_data[1].load(memory_order_relaxed);
507 // Preserve happens-before semantics on newly inserted keys.
// Key for pair idx lives at _data[idx*2 + 2] (slots 0 and 1 are the header).
// The acquire load pairs with the release CAS in CAS_key().
508 static inline slot* key(kvs_data *kvs, int idx) {
509 assert (idx >= 0 && idx < kvs->_size);
510 // Corresponding to the volatile read in get_impl() and putIfMatch in
511 // Cliff Click's Java implementation
512 return (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
516 The atomic operation in val() function is a "potential" commit point,
517 which means in some case it is a real commit point while it is not for
518 some other cases. This so happens because the val() function is such a
519 fundamental function that many internal operation will call. Our
520 strategy is that we label any potential commit points and check if they
521 really are the commit points later.
523 // Preserve happens-before semantics on newly inserted values.
// Value for pair idx lives at _data[idx*2 + 3]; the acquire load pairs with
// the release CAS in CAS_val(). (Return statement elided in this listing.)
524 static inline slot* val(kvs_data *kvs, int idx) {
525 assert (idx >= 0 && idx < kvs->_size);
526 // Corresponding to the volatile read in get_impl() and putIfMatch in
527 // Cliff Click's Java implementation
528 slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
531 # This is a complicated potential commit point since many many functions are
533 @Potential_commit_point_define: true
534 @Label: Read_Val_Point
// Hash the key held in key_slot, then spread the bits (Cliff Click's mixer)
// so that power-of-two masking uses the high bits too.
// NOTE(review): the left shifts on a signed int can overflow, which is
// undefined behavior in C++ -- an unsigned intermediate would be safer.
542 static int hash(slot *key_slot) {
543 assert(key_slot != NULL && key_slot->_ptr != NULL);
544 TypeK* key = (TypeK*) key_slot->_ptr;
545 int h = hashFunc(*key);
546 // Spread bits according to Cliff Click's code
547 h += (h << 15) ^ 0xffffcd7d;
551 h += (h << 2) + (h << 14);
552 return h ^ (h >> 16);
555 // Heuristic to decide if reprobed too many times.
556 // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
557 // put it triggers a table resize. Several places MUST have exact agreement.
// Limit grows with table size: base REPROBE_LIMIT plus a quarter of the length.
558 static int reprobe_limit(int len) {
559 return REPROBE_LIMIT + (len >> 2);
// True iff the slot is non-NULL and carries the Prime (mid-copy) marker.
562 static inline bool is_prime(slot *val) {
563 return (val != NULL) && val->_prime;
566 // Check for key equality. Try direct pointer comparison first (fast
567 // negative test) and then the full 'equals' call.
// 'hash' is the slot index into the cached-hash array; hashes[hash] == 0
// means no hash has been recorded for that slot yet, so the cache cannot
// rule the match out. (Return expression fragment; leading lines elided.)
568 static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
570 // Caller should've checked this.
572 TypeK* key_ptr = (TypeK*) key_slot->_ptr;
575 ((hashes[hash] == 0 || hashes[hash] == fullhash) &&
577 keyEqualsTo(*key_ptr, *((TypeK*) K->_ptr)));
// Value equality on two slots: NULL slot/payload on either side is unequal;
// otherwise defer to the user-supplied ValEqualsTo functor.
580 static bool valeq(slot *val_slot1, slot *val_slot2) {
581 assert (val_slot1 != NULL);
// Fixed: ptr1 was declared TypeK* while cast (and used) as a TypeV*.
582 TypeV* ptr1 = (TypeV*) val_slot1->_ptr;
583 if (val_slot2 == NULL || ptr1 == NULL) return false;
584 return valEqualsTo(*ptr1, *((TypeV*) val_slot2->_ptr));
587 // Together with key() preserve the happens-before relationship on newly
// inserted keys: the release CAS here pairs with the acquire load in key().
// NOTE(review): memory_order_release is not a valid FAILURE ordering for
// compare_exchange in C++11; also 'expected' is a by-value local, so the
// caller never observes the witnessed value on failure.
589 static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
590 return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
591 desired, memory_order_release, memory_order_release);
595 Same as the val() function, we only label the CAS operation as the
596 potential commit point.
598 // Together with val() preserve the happens-before relationship on newly
// inserted values: the release CAS pairs with the acquire load in val().
// (Signature and return elided in this listing.)
// NOTE(review): memory_order_release is not a valid failure ordering for
// compare_exchange in C++11.
600 static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
602 bool res = kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
603 desired, memory_order_release, memory_order_release);
605 # If it is a successful put instead of a copy or any other internal
606 # operations, expected != NULL
608 @Potential_commit_point_define: __ATOMIC_RET__ == true
609 @Label: Write_Val_Point
// Core lookup: probe the table (linear reprobe under power-of-two masking),
// returning the value slot, NULL on a miss, or retrying in the new table if
// a copy is in progress. (Probe-loop header and some lines elided.)
615 slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
617 int len = kvs->_size;
618 CHM *chm = get_chm(kvs);
619 int *hashes = get_hashes(kvs);
621 int idx = fullhash & (len - 1);
624 slot *K = key(kvs, idx);
625 slot *V = val(kvs, idx);
628 @Commit_point_define: V == NULL
629 @Potential_commit_point_label: Read_Val_Point
630 @Label: Get_Success_Point_1
// An empty value slot under this key probe means the key was never inserted.
634 if (V == NULL) return NULL; // A miss
636 if (keyeq(K, key_slot, hashes, idx, fullhash)) {
637 // Key hit! Check if table-resize in progress
641 @Commit_point_define: true
642 @Potential_commit_point_label: Read_Val_Point
643 @Label: Get_Success_Point_2
646 return (V == TOMBSTONE) ? NULL : V; // Return this value
648 // Otherwise, finish the copy & retry in the new table
649 return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
650 idx, key_slot), key_slot, fullhash);
// NOTE(review): this tests the PROBE key 'key_slot' against TOMBSTONE; the
// matching check in putIfMatch tests the TABLE key 'K == TOMBSTONE', and
// putIfMatch also uses reprobe_limit(len) rather than the bare
// REPROBE_LIMIT. The comment above reprobe_limit() demands exact agreement
// between these sites -- confirm against the upstream implementation.
653 if (++reprobe_cnt >= REPROBE_LIMIT ||
654 key_slot == TOMBSTONE) {
655 // Retry in new table
656 // Atomic read (acquire) can be here
657 kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
660 @Commit_point_define_check: newkvs == NULL
661 @Label: Get_Success_Point_3
664 return newkvs == NULL ? NULL : get_impl(topmap,
665 topmap->help_copy(newkvs), key_slot, fullhash);
668 idx = (idx + 1) & (len - 1); // Reprobe by 1
672 // A wrapper of the essential function putIfMatch(): wraps the key and value
// in heap-allocated slots and translates the slot-level result (TOMBSTONE ->
// NULL) for callers. The key/value copies and slots are never freed.
673 TypeV* putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
674 // TODO: Should throw an exception rather return NULL
// Body of the old_val == NULL guard is elided in this listing.
675 if (old_val == NULL) {
678 void *key_ptr = (void*) new TypeK(key);
679 slot *key_slot = new slot(false, key_ptr);
681 void *val_ptr = (void*) new TypeV(value);
682 slot *value_slot = new slot(false, val_ptr);
683 slot *res = putIfMatch(this, _kvs.load(memory_order_relaxed), key_slot,
684 value_slot, old_val);
685 // Only when copy_slot() call putIfMatch() will it return NULL
686 assert (res != NULL);
687 assert (!is_prime(res));
688 return res == TOMBSTONE ? NULL : (TypeV*) res->_ptr;
692 Put, Remove, PutIfAbsent, etc will call this function. Return the old
693 value. If the returned value is equals to the expVal (or expVal is
694 NO_MATCH_OLD), then this function puts the val_slot to the table 'kvs'.
695 Only copy_slot will pass a NULL expVal, and putIfMatch only returns a
696 NULL if passed a NULL expVal.
// Core state machine: (1) claim or find the key slot, (2) decide whether the
// expected-value predicate allows an update, (3) CAS the value in, adjusting
// the live-pair count; forwards to the new table when a resize is under way.
// (Several interior lines are elided in this listing.)
698 static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
699 *key_slot, slot *val_slot, slot *expVal) {
700 assert (val_slot != NULL);
701 assert (!is_prime(val_slot));
702 assert (!is_prime(expVal));
704 int fullhash = hash(key_slot);
705 int len = kvs->_size;
706 CHM *chm = get_chm(kvs);
707 int *hashes = get_hashes(kvs);
708 int idx = fullhash & (len - 1);
716 while (true) { // Spin till we get a key slot
719 if (K == NULL) { // Get a free slot
// Removing a key that was never inserted: nothing to do.
720 if (val_slot == TOMBSTONE) return val_slot;
721 // Claim the null key-slot
722 if (CAS_key(kvs, idx, NULL, key_slot)) {
723 chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
724 hashes[idx] = fullhash; // Memorize full hash
727 K = key(kvs, idx); // CAS failed, get updated value
731 // Key slot not null, there exists a Key here
732 if (keyeq(K, key_slot, hashes, idx, fullhash))
735 // Notice that the logic here should be consistent with that of get.
736 // The first predicate means too many reprobes means nothing in the
// Probe chain exhausted or closed by a tombstone key: grow and retry there.
738 if (++reprobe_cnt >= reprobe_limit(len) ||
739 K == TOMBSTONE) { // Found a Tombstone key, no more keys
740 newkvs = chm->resize(topmap, kvs);
741 // Help along an existing copy
742 if (expVal != NULL) topmap->help_copy(newkvs);
743 return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
746 idx = (idx + 1) & (len - 1); // Reprobe
747 } // End of spinning till we get a Key slot
749 if (val_slot == V) return V; // Fast cutout for no-change
751 // Here it tries to resize cause it doesn't want other threads to stop
752 // its progress (eagerly try to resize soon)
753 newkvs = chm->_newkvs.load(memory_order_acquire);
754 if (newkvs == NULL &&
755 ((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
756 newkvs = chm->resize(topmap, kvs); // Force the copy to start
758 // Finish the copy and then put it in the new table
760 return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
761 expVal), key_slot, val_slot, expVal);
763 // Decided to update the existing table
765 assert (!is_prime(V));
// Expected-value predicate: bail out (returning the witnessed V) when the
// caller's expectation does not match the current value.
767 if (expVal != NO_MATCH_OLD &&
769 (expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
770 !(V == NULL && expVal == TOMBSTONE) &&
771 (expVal == NULL || !valeq(expVal, V))) {
774 @Commit_point_define: expVal == TOMBSTONE
775 @Potential_commit_point_label: Read_Val_Point
776 @Label: PutIfAbsent_Fail_Point
777 # This is a check for the PutIfAbsent() when the value
783 @Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
784 @Potential_commit_point_label: Read_Val_Point
785 @Label: RemoveIfMatch_Fail_Point
790 @Commit_point_define: !valeq(expVal, V)
791 @Potential_commit_point_label: Read_Val_Point
792 @Label: ReplaceIfMatch_Fail_Point
795 return V; // Do not update!
798 if (CAS_val(kvs, idx, V, val_slot)) {
801 # The only point where a successful put happens
802 @Commit_point_define: true
803 @Potential_commit_point_label: Write_Val_Point
804 @Label: Write_Success_Point
807 if (expVal != NULL) { // Not called by a table-copy
808 // CAS succeeded, should adjust size
809 // Both normal put's and table-copy calls putIfMatch, but
810 // table-copy does not increase the number of live K/V pairs
811 if ((V == NULL || V == TOMBSTONE) &&
812 val_slot != TOMBSTONE)
813 chm->_size.fetch_add(1, memory_order_relaxed);
814 if (!(V == NULL || V == TOMBSTONE) &&
815 val_slot == TOMBSTONE)
816 chm->_size.fetch_add(-1, memory_order_relaxed);
// Report TOMBSTONE (not NULL) for "no previous value" to normal callers,
// reserving NULL for the copy path (expVal == NULL).
818 return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
// CAS lost to a Prime: finish that slot's copy and retry in the new table.
823 return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
824 idx, expVal), key_slot, val_slot, expVal);
828 // Help along an existing table-resize. This is a fast cut-out wrapper.
829 kvs_data* help_copy(kvs_data *helper) {
830 kvs_data *topkvs = _kvs.load(memory_order_acquire);
831 CHM *topchm = get_chm(topkvs);
832 // No copy in progress
833 if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
834 topchm->help_copy_impl(this, topkvs, false);