+++ /dev/null
-#ifndef SIMPLIFIED_CLIFFC_HASHTABLE_H
-#define SIMPLIFIED_CLIFFC_HASHTABLE_H
-
-#include <iostream>
-#include <atomic>
-#include <memory>
-#include <assert.h>
-
-using namespace std;
-
-
-
-/**
- This header file declares and defines a simplified version of Cliff Click's
- NonblockingHashMap. It contains all the necessary structrues and main
- functions. In simplified_cliffc_hashtable.cc file, it has the definition for
- the static fields.
-*/
-
-template<typename TypeK, typename TypeV>
-class cliffc_hashtable;
-
-/**
- Corresponding the the Object[] array in Cliff Click's Java implementation.
- It keeps the first two slots for CHM (Hashtable control unit) and the hash
- records (an array of hash used for fast negative key-equality check).
-*/
-struct kvs_data {
- int _size;
- atomic<void*> *_data;
-
- kvs_data(int sz) {
- _size = sz;
- int real_size = sizeof(atomic<void*>) * 2 + 2;
- _data = new atomic<void*>[real_size];
- // The control block should be initialized in resize()
- // Init the hash record array
- int *hashes = new int[_size];
- int i;
- for (i = 0; i < _size; i++) {
- hashes[i] = 0;
- }
- _data[1].store(hashes, memory_order_relaxed);
- // Init the data to Null slot
- for (i = 2; i < real_size; i++) {
- _data[i].store(NULL, memory_order_relaxed);
- }
- }
-
- ~kvs_data() {
- int *hashes = (int*) _data[1].load(memory_order_relaxed);
- delete hashes;
- delete[] _data;
- }
-};
-
-struct slot {
- bool _prime;
- shared_ptr<void> _ptr;
-
- slot(bool prime, shared_ptr<void> ptr) {
- _prime = prime;
- _ptr = ptr;
- }
-
-};
-
-
-/**
- TypeK must have defined function "int hashCode()" which return the hash
- code for the its object, and "int equals(TypeK anotherKey)" which is
- used to judge equality.
- TypeK and TypeV should define their own copy constructor.
- To make the memory management safe and similar to Cliff Click's Java
- implementation, we use shared_ptr instead of normal pointer in terms of the
- pointers that point to TypeK and TypeV.
-*/
-template<typename TypeK, typename TypeV>
-class cliffc_hashtable {
- /**
- # The synchronization we have for the hashtable gives us the property of
- # serializability, so we should have a sequential hashtable when we check the
- # correctness. The key thing is to identify all the commit point.
-
- @Begin
- @Options:
- LANG = C;
- CLASS = cliffc_hashtable;
- @Global_define:
- @DeclareVar:
- spec_hashtable<TypeK, TypeV*> map;
- spec_hashtable<TypeK, Tag> id_map;
- Tag tag;
- @InitVar:
- map = spec_hashtable<TypeK, TypeV*>();
- id_map = spec_hashtable<TypeK, TypeV*>();
- tag = Tag();
- @DefineFunc:
- bool equals_val(TypeV *ptr1, TypeV *ptr2) {
- // ...
- }
-
- @DefineFunc:
- # Update the tag for the current key slot if the corresponding tag
- # is NULL, otherwise just return that tag. It will update the next
- # available tag too if it requires a new tag for that key slot.
- Tag getKeyTag(TypeK &key) {
- if (id_map.get(key) == NULL) {
- Tag cur_tag = tag.current();
- id_map.put(key, cur_tag);
- tag.next();
- return cur_tag;
- } else {
- return id_map.get(key);
- }
- }
-
- @Interface_cluster:
- Read_interface = {
- Get,
- PutIfAbsent,
- RemoveAny,
- RemoveIfMatch,
- ReplaceAny,
- ReplaceIfMatch
- }
-
- Write_interface = {
- Put,
- PutIfAbsent(COND_PutIfAbsentSucc),
- RemoveAny,
- RemoveIfMatch(COND_RemoveIfMatchSucc),
- ReplaceAny,
- ReplaceIfMatch(COND_ReplaceIfMatchSucc)
- }
- @Happens_before:
- Write_interface -> Read_interface
- @End
- */
-
-friend class CHM;
- /**
- The control structure for the hashtable
- */
- private:
- class CHM {
- friend class cliffc_hashtable;
- private:
- atomic<kvs_data*> _newkvs;
-
- // Size of active K,V pairs
- atomic_int _size;
-
- // Count of used slots
- atomic_int _slots;
-
- // The next part of the table to copy
- atomic_int _copy_idx;
-
- // Work-done reporting
- atomic_int _copy_done;
-
- public:
- CHM(int size) {
- _size.store(size, memory_order_relaxed);
- _slots.store(0, memory_order_relaxed);
-
- _copy_idx.store(0, memory_order_relaxed);
- _copy_done.store(0, memory_order_release);
- }
-
- ~CHM() {}
-
- private:
-
- // Heuristic to decide if the table is too full
- bool table_full(int reprobe_cnt, int len) {
- return
- reprobe_cnt >= REPROBE_LIMIT &&
- _slots.load(memory_order_relaxed) >= reprobe_limit(len);
- }
-
- kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
- kvs_data *newkvs = _newkvs.load(memory_order_acquire);
- if (newkvs != NULL)
- return newkvs;
-
- // No copy in-progress, start one; Only double the table size
- int oldlen = kvs->_size;
- int sz = _size.load(memory_order_relaxed);
- int newsz = sz;
-
- // Just follow Cliff Click's heuristic to decide the new size
- if (sz >= (oldlen >> 2)) { // If we are 25% full
- newsz = oldlen << 1; // Double size
- if (sz >= (oldlen >> 1))
- newsz = oldlen << 2; // Double double size
- }
-
- // We do not record the record timestamp
- if (newsz <= oldlen) newsz = oldlen << 1;
- // Do not shrink ever
- if (newsz < oldlen) newsz = oldlen;
-
- // Last check cause the 'new' below is expensive
- newkvs = _newkvs.load(memory_order_acquire);
- if (newkvs != NULL) return newkvs;
-
- newkvs = new kvs_data(newsz);
- void *chm = (void*) new CHM(sz);
- newkvs->_data[0].store(chm, memory_order_relaxed);
-
- kvs_data *cur_newkvs;
- // Another check after the slow allocation
- if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
- return cur_newkvs;
- // CAS the _newkvs to the allocated table
- kvs_data *desired = (kvs_data*) NULL;
- kvs_data *expected = (kvs_data*) newkvs;
- if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
- memory_order_release)) {
- // Should clean the allocated area
- delete newkvs;
- newkvs = _newkvs.load(memory_order_acquire);
- }
- return newkvs;
- }
-
- void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
- bool copy_all) {
- assert (get_chm(oldkvs) == this);
- kvs_data *newkvs = _newkvs.load(memory_order_acquire);
- int oldlen = oldkvs->_size;
- int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
-
- // Just follow Cliff Click's code here
- int panic_start = -1;
- int copyidx;
- while (_copy_done.load(memory_order_acquire) < oldlen) {
- copyidx = _copy_idx.load(memory_order_acquire);
- if (panic_start == -1) { // No painc
- copyidx = _copy_idx.load(memory_order_acquire);
- while (copyidx < (oldlen << 1) &&
- !_copy_idx.compare_exchange_strong(copyidx, copyidx +
- min_copy_work, memory_order_release, memory_order_release))
- copyidx = _copy_idx.load(memory_order_relaxed);
- if (!(copyidx < (oldlen << 1)))
- panic_start = copyidx;
- }
-
- // Now copy the chunk of work we claimed
- int workdone = 0;
- for (int i = 0; i < min_copy_work; i++)
- if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
- newkvs))
- workdone++;
- if (workdone > 0)
- copy_check_and_promote(topmap, oldkvs, workdone);
-
- copyidx += min_copy_work;
- if (!copy_all && panic_start == -1)
- return; // We are done with the work we claim
- }
- copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
- }
-
- kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
- *oldkvs, int idx, void *should_help) {
- kvs_data *newkvs = _newkvs.load(memory_order_acquire);
- // We're only here cause the caller saw a Prime
- if (copy_slot(topmap, idx, oldkvs, _newkvs))
- copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
- return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
- }
-
- void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
- oldkvs, int workdone) {
- int oldlen = oldkvs->_size;
- int copyDone = _copy_done.load(memory_order_relaxed);
- if (workdone > 0) {
- while (true) {
- copyDone = _copy_done.load(memory_order_relaxed);
- if (_copy_done.compare_exchange_weak(copyDone, copyDone +
- workdone, memory_order_relaxed, memory_order_relaxed))
- break;
- }
- }
-
- // Promote the new table to the current table
- if (copyDone + workdone == oldlen &&
- topmap->_kvs.load(memory_order_acquire) == oldkvs)
- topmap->_kvs.compare_exchange_strong(oldkvs, _newkvs, memory_order_release,
- memory_order_release);
- }
-
- bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
- kvs_data *newkvs) {
- slot *key_slot;
- while ((key_slot = key(oldkvs, idx)) == NULL)
- CAS_key(oldkvs, idx, NULL, TOMBSTONE);
-
- // First CAS old to Prime
- slot *oldval = val(oldkvs, idx, NULL);
- while (!is_prime(oldval)) {
- slot *box = (oldval == NULL || oldval == TOMBSTONE)
- ? TOMBPRIME : new slot(true, oldval->_ptr);
- if (CAS_val(oldkvs, idx, oldval, box)) {
- if (box == TOMBPRIME)
- return 1; // Copy done
- // Otherwise we CAS'd the box
- oldval = box; // Record updated oldval
- break;
- }
- oldval = val(oldkvs, idx, NULL); // Else re-try
- }
-
- if (oldval == TOMBPRIME) return false; // Copy already completed here
-
- slot *old_unboxed = new slot(false, oldval->_ptr);
- int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
- NULL) == NULL);
-
- // Old value is exposed in the new table
- while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
- oldval = val(oldkvs, idx, NULL);
-
- return copied_into_new;
- }
- };
-
-
-
- private:
- static const int Default_Init_Size = 8; // Intial table size
-
- static slot* const MATCH_ANY;
- static slot* const NO_MATCH_OLD;
-
- static slot* const TOMBPRIME;
- static slot* const TOMBSTONE;
-
- static const int REPROBE_LIMIT = 10; // Forces a table-resize
-
- atomic<kvs_data*> _kvs;
-
- public:
- cliffc_hashtable() {
- // Should initialize the CHM for the construction of the table
- // For other CHM in kvs_data, they should be initialzed in resize()
- // because the size is determined dynamically
- kvs_data *kvs = new kvs_data(Default_Init_Size);
- void *chm = (void*) new CHM(0);
- kvs->_data[0].store(chm, memory_order_relaxed);
- _kvs.store(kvs, memory_order_release);
- }
-
- cliffc_hashtable(int init_size) {
- // Should initialize the CHM for the construction of the table
- // For other CHM in kvs_data, they should be initialzed in resize()
- // because the size is determined dynamically
- kvs_data *kvs = new kvs_data(init_size);
- void *chm = (void*) new CHM(0);
- kvs->_data[0].store(chm, memory_order_relaxed);
- _kvs.store(kvs, memory_order_release);
- }
-
- /**
- @Begin
- @Interface: Get
- @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
- @ID: __sequential.getKeyTag(key)
- @Action:
- TypeV *_Old_Val = __sequential.map.get(key)
- @Post_check:
- __sequential.equals_val(_Old_Val, __RET__)
- @End
- */
- shared_ptr<TypeV> get(TypeK& key) {
- void *key_ptr = (void*) new TypeK(key);
- slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
- int fullhash = hash(key_slot);
- slot *V = get_impl(this, _kvs, key_slot, fullhash);
- if (V == NULL) return NULL;
- assert (!is_prime(V));
- return static_pointer_cast<TypeV>(V->_ptr);
- }
-
- /**
- @Begin
- @Interface: Put
- @Commit_point_set: Write_Val_Point
- @ID: __sequential.getKeyTag(key)
- @Action:
- # Remember this old value at checking point
- TypeV *_Old_Val = __sequential.map.get(key)
- __sequential.map.put(key, &val);
- @Post_check:
- __sequential.equals_val(__RET__, _Old_Val)
- @End
- */
- shared_ptr<TypeV> put(TypeK& key, TypeV& val) {
- return putIfMatch(key, val, NO_MATCH_OLD);
- }
-
- /**
- @Begin
- @Interface: PutIfAbsent
- @Commit_point_set:
- Write_Val_Point | PutIfAbsent_Fail_Point
- @Condition: __sequential.map.get(key) == NULL
- @HB_condition:
- COND_PutIfAbsentSucc :: __RET__ == NULL
- @ID: __sequential.getKeyTag(key)
- @Action:
- TypeV *_Old_Val = __sequential.map.get(key)
- if (__COND_SAT__)
- __sequential.map.put(key, &value);
- @Post_check:
- __COND_SAT__ ? __RET__ == NULL : __sequential.equals_val(_Old_Val, __RET__)
- @End
- */
- shared_ptr<TypeV> putIfAbsent(TypeK& key, TypeV& value) {
- return putIfMatch(key, val, TOMBSTONE);
- }
-
- /**
- @Begin
- @Interface: RemoveAny
- @Commit_point_set: Write_Val_Point
- @ID: __sequential.getKeyTag(key)
- @Action:
- TypeV *_Old_Val = __sequential.map.get(key)
- __sequential.map.put(key, NULL);
- @Post_check:
- __sequential.equals_val(__RET__, _Old_Val)
- @End
- */
- shared_ptr<TypeV> remove(TypeK& key) {
- return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
- }
-
- /**
- @Begin
- @Interface: RemoveIfMatch
- @Commit_point_set:
- Write_Val_Point | RemoveIfMatch_Fail_Point
- @Condition:
- __sequential.equals_val(__sequential.map.get(key), &val)
- @HB_condition:
- COND_RemoveIfMatchSucc :: __RET__ == true
- @ID: __sequential.getKeyTag(key)
- @Action:
- if (__COND_SAT__)
- __sequential.map.put(key, NULL);
- @Post_check:
- __COND_SAT__ ? __RET__ : !__RET__
- @End
- */
- bool remove(TypeK& key, TypeV& val) {
- slot *val_slot = val == NULL ? NULL : new slot(false, val);
- return putIfMatch(key, TOMBSTONE, val) == val;
-
- }
-
- /**
- @Begin
- @Interface: ReplaceAny
- @Commit_point_set:
- Write_Val_Point
- @ID: __sequential.getKeyTag(key)
- @Action:
- TypeV *_Old_Val = __sequential.map.get(key)
- @Post_check:
- __sequential.equals_val(__RET__, _Old_Val)
- @End
- */
- shared_ptr<TypeV> replace(TypeK& key, TypeV& val) {
- return putIfMatch(key, val, MATCH_ANY);
- }
-
- /**
- @Begin
- @Interface: ReplaceIfMatch
- @Commit_point_set:
- Write_Val_Point | ReplaceIfMatch_Fail_Point
- @Condition:
- __sequential.equals_val(__sequential.map.get(key), &oldval)
- @HB_condition:
- COND_ReplaceIfMatchSucc :: __RET__ == true
- @ID: __sequential.getKeyTag(key)
- @Action:
- if (__COND_SAT__)
- __sequential.map.put(key, &newval);
- @Post_check:
- __COND_SAT__ ? __RET__ : !__RET__
- @End
- */
- bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
- return putIfMatch(key, newval, oldval) == oldval;
- }
-
- private:
- static CHM* get_chm(kvs_data* kvs) {
- return (CHM*) kvs->_data[0].load(memory_order_relaxed);
- }
-
- static int* get_hashes(kvs_data *kvs) {
- return (int *) kvs->_data[1].load(memory_order_relaxed);
- }
-
- // Preserve happens-before semantics on newly inserted keys
- static inline slot* key(kvs_data *kvs, int idx) {
- assert (idx >= 0 && idx < kvs->_size);
- // Corresponding to the volatile read in get_impl() and putIfMatch in
- // Cliff Click's Java implementation
- return (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
- }
-
- /**
- The atomic operation in val() function is a "potential" commit point,
- which means in some case it is a real commit point while it is not for
- some other cases. This so happens because the val() function is such a
- fundamental function that many internal operation will call. Our
- strategy is that we label any potential commit points and check if they
- really are the commit points later.
- */
- // Preserve happens-before semantics on newly inserted values
- static inline slot* val(kvs_data *kvs, int idx) {
- assert (idx >= 0 && idx < kvs->_size);
- // Corresponding to the volatile read in get_impl() and putIfMatch in
- // Cliff Click's Java implementation
- slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
- /**
- @Begin
- # This is a complicated potential commit point since many many functions are
- # calling val().
- @Potential_commit_point_define: true
- @Label: Read_Val_Point
- @End
- */
- return res;
-
-
- }
-
- static int hash(slot *key_slot) {
- assert(key_slot != NULL && key_slot->_ptr != NULL);
- shared_ptr<TypeK> key = static_pointer_cast<TypeK>(key_slot->_ptr);
- int h = key->hashCode();
- // Spread bits according to Cliff Click's code
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >> 10);
- h += (h << 3);
- h ^= (h >> 6);
- h += (h << 2) + (h << 14);
- return h ^ (h >> 16);
- }
-
- // Heuristic to decide if reprobed too many times.
- // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
- // put it triggers a table resize. Several places MUST have exact agreement.
- static int reprobe_limit(int len) {
- return REPROBE_LIMIT + (len >> 2);
- }
-
- static inline bool is_prime(slot *val) {
- return (val != NULL) && val->_prime;
- }
-
- // Check for key equality. Try direct pointer comparison first (fast
- // negative teset) and then the full 'equals' call
- static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
- int fullhash) {
- // Caller should've checked this.
- assert (K != NULL);
- shared_ptr<TypeK> key_ptr = static_pointer_cast<TypeK>(key_slot->_ptr);
- return
- K == key_slot ||
- ((hashes[hash] == 0 || hashes[hash] == fullhash) &&
- K != TOMBSTONE &&
- key_ptr->equals(K->_ptr));
- }
-
- static bool valeq(slot *val_slot1, slot *val_slot2) {
- assert (val_slot1 != NULL);
- shared_ptr<TypeK> ptr1 = static_pointer_cast<TypeV>(val_slot1->_ptr);
- if (val_slot2 == NULL || ptr1 == NULL) return false;
- return ptr1->equals(val_slot2->_ptr);
- }
-
- // Together with key() preserve the happens-before relationship on newly
- // inserted keys
- static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
- return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
- desired, memory_order_release, memory_order_release);
- }
-
- /**
- Same as the val() function, we only label the CAS operation as the
- potential commit point.
- */
- // Together with val() preserve the happens-before relationship on newly
- // inserted values
- static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
- *desired) {
- bool res = kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
- desired, memory_order_release, memory_order_release);
- /**
- # If it is a successful put instead of a copy or any other internal
- # operantions, expected != NULL
- @Begin
- @Potential_commit_point_define: __ATOMIC_RET__ == true
- @Label: Write_Val_Point
- @End
- */
- return res;
- }
-
- slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
- fullhash) {
- int len = kvs->_size;
- CHM *chm = get_chm(kvs);
- int *hashes = get_hashes(kvs);
-
- int idx = fullhash & (len - 1);
- int reprobe_cnt = 0;
- while (true) {
- slot *K = key(kvs, idx);
- slot *V = val(kvs, idx);
- /**
- @Begin
- @Commit_point_define: V == NULL
- @Potential_commit_point_label: Read_Val_Point
- @Label: Get_Success_Point_1
- @End
- */
-
- if (V == NULL) return NULL; // A miss
-
- if (keyeq(K, key_slot, hashes, idx, fullhash)) {
- // Key hit! Check if table-resize in progress
- if (!is_prime(V)) {
- /**
- @Begin
- @Commit_point_define: true
- @Potential_commit_point_label: Read_Val_Point
- @Label: Get_Success_Point_2
- @End
- */
- return (V == TOMBSTONE) ? NULL : V; // Return this value
- }
- // Otherwise, finish the copy & retry in the new table
- return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
- idx, key_slot), key_slot, fullhash);
- }
-
- if (++reprobe_cnt >= REPROBE_LIMIT ||
- key_slot == TOMBSTONE) {
- // Retry in new table
- // Atomic read (acquire) can be here
- kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
- /**
- @Begin
- @Commit_point_define_check: newkvs == NULL
- @Label: Get_Success_Point_3
- @End
- */
- return newkvs == NULL ? NULL : get_impl(topmap,
- topmap->help_copy(newkvs), key_slot, fullhash);
- }
-
- idx = (idx + 1) & (len - 1); // Reprobe by 1
- }
- }
-
- // A wrapper of the essential function putIfMatch()
- shared_ptr<TypeV> putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
- // TODO: Should throw an exception rather return NULL
- if (old_val == NULL) {
- return NULL;
- }
- void *key_ptr = (void*) new TypeK(key);
- slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
-
- void *val_ptr = (void*) new TypeV(value);
- slot *value_slot = new slot(false, shared_ptr<void>(val_ptr));
- slot *res = putIfMatch(this, _kvs, key_slot, value_slot, old_val);
- // Only when copy_slot() call putIfMatch() will it return NULL
- assert (res != NULL);
- assert (!is_prime(res));
- return res == TOMBSTONE ? NULL : static_pointer_cast<TypeV>(res->_ptr);
- }
-
- /**
- Put, Remove, PutIfAbsent, etc will call this function. Return the old
- value. If the returned value is equals to the expVal (or expVal is
- NO_MATCH_OLD), then this function puts the val_slot to the table 'kvs'.
- Only copy_slot will pass a NULL expVal, and putIfMatch only returns a
- NULL if passed a NULL expVal.
- */
- static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
- *key_slot, slot *val_slot, slot *expVal) {
- assert (val_slot != NULL);
- assert (!is_prime(val_slot));
- assert (!is_prime(expVal));
-
- int fullhash = hash(key_slot);
- int len = kvs->_size;
- CHM *chm = get_chm(kvs);
- int *hashes = get_hashes(kvs);
- int idx = fullhash & (len - 1);
-
- // Claim a key slot
- int reprobe_cnt = 0;
- slot *K;
- slot *V;
- kvs_data *newkvs;
-
- while (true) { // Spin till we get a key slot
- K = key(kvs, idx);
- V = val(kvs, idx, NULL);
- if (K == NULL) { // Get a free slot
- if (val_slot == TOMBSTONE) return val_slot;
- // Claim the null key-slot
- if (CAS_key(kvs, idx, NULL, key_slot)) {
- chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
- hashes[idx] = fullhash; // Memorize full hash
- break;
- }
- K = key(kvs, idx); // CAS failed, get updated value
- assert (K != NULL);
- }
-
- // Key slot not null, there exists a Key here
- if (keyeq(K, key_slot, hashes, idx, fullhash))
- break; // Got it
-
- // Notice that the logic here should be consistent with that of get.
- // The first predicate means too many reprobes means nothing in the
- // old table.
- if (++reprobe_cnt >= reprobe_limit(len) ||
- K == TOMBSTONE) { // Found a Tombstone key, no more keys
- newkvs = chm->resize(topmap, kvs);
- // Help along an existing copy
- if (expVal != NULL) topmap->help_copy(newkvs);
- return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
- }
-
- idx = (idx + 1) & (len - 1); // Reprobe
- } // End of spinning till we get a Key slot
-
- if (val_slot == V) return V; // Fast cutout for no-change
-
- // Here it tries to resize cause it doesn't want other threads to stop
- // its progress (eagerly try to resize soon)
- newkvs = chm->_newkvs.load(memory_order_acquire);
- if (newkvs == NULL &&
- ((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
- newkvs = chm->resize(topmap, kvs); // Force the copy to start
-
- // Finish the copy and then put it in the new table
- if (newkvs != NULL)
- return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
- expVal), key_slot, val_slot, expVal);
-
- // Decided to update the existing table
- while (true) {
- assert (!is_prime(V));
-
- if (expVal != NO_MATCH_OLD &&
- V != expVal &&
- (expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
- !(V == NULL && expVal == TOMBSTONE) &&
- (expVal == NULL || !valeq(expVal, V))) {
- /**
- @Begin
- @Commit_point_define: expVal == TOMBSTONE
- @Potential_commit_point_label: Read_Val_Point
- @Label: PutIfAbsent_Fail_Point
- # This is a check for the PutIfAbsent() when the value
- # is not absent
- @End
- */
- /**
- @Begin
- @Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
- @Potential_commit_point_label: Read_Val_Point
- @Label: RemoveIfMatch_Fail_Point
- @End
- */
- /**
- @Begin
- @Commit_point_define: !valeq(expVal, V)
- @Potential_commit_point_label: Read_Val_Point
- @Label: ReplaceIfMatch_Fail_Point
- @End
- */
- return V; // Do not update!
- }
-
- if (CAS_val(kvs, idx, V, val_slot)) {
- /**
- @Begin
- # The only point where a successful put happens
- @Commit_point_define: true
- @Potential_commit_point_label: Write_Val_Point
- @Label: Write_Success_Point
- @End
- */
- if (expVal != NULL) { // Not called by a table-copy
- // CAS succeeded, should adjust size
- // Both normal put's and table-copy calls putIfMatch, but
- // table-copy does not increase the number of live K/V pairs
- if ((V == NULL || V == TOMBSTONE) &&
- val_slot != TOMBSTONE)
- chm->_size.fetch_add(1, memory_order_relaxed);
- if (!(V == NULL || V == TOMBSTONE) &&
- val_slot == TOMBSTONE)
- chm->_size.fetch_add(-1, memory_order_relaxed);
- }
- return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
- }
- // Else CAS failed
- V = val(kvs, idx, NULL);
- if (is_prime(V))
- return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
- idx, expVal), key_slot, val_slot, expVal);
- }
- }
-
- // Help along an existing table-resize. This is a fast cut-out wrapper.
- kvs_data* help_copy(kvs_data *helper) {
- kvs_data *topkvs = _kvs.load(memory_order_acquire);
- CHM *topchm = get_chm(topkvs);
- // No cpy in progress
- if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
- topchm->help_copy_impl(this, topkvs, false);
- return helper;
- }
-};
-
-#endif