1 #ifndef _NDB_TXN_IMPL_H_
2 #define _NDB_TXN_IMPL_H_
// Constructor: forwards `flags` to transaction_base and stores the address
// of the caller-supplied string allocator `sa` (the allocator is not copied).
// Asserts that the calling thread is already inside an RCU region.
// When BTREE_LOCK_OWNERSHIP_CHECKING is defined it additionally calls
// concurrent_btree::NodeLockRegionBegin().
9 template <template <typename> class Protocol, typename Traits>
10 transaction<Protocol, Traits>::transaction(uint64_t flags, string_allocator_type &sa)
11 : transaction_base(flags), sa(&sa)
13 INVARIANT(rcu::s_instance.in_rcu_region());
14 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
15 concurrent_btree::NodeLockRegionBegin();
// Destructor: checks that the transaction was resolved (state is not
// TXN_ACTIVE) and that we are still inside an RCU region on entry, samples
// the current RCU nesting depth through rcu_guard_, and then runs the
// protocol hook on_post_rcu_region_completion() once the thread is asserted
// to be outside any RCU region.
// NOTE(review): the statement that actually leaves the RCU region, and any
// condition on cur_depth guarding the hook, sit between the two RCU
// invariants below -- confirm against the full definition.
// With BTREE_LOCK_OWNERSHIP_CHECKING defined, it also calls
// concurrent_btree::AssertAllNodeLocksReleased().
19 template <template <typename> class Protocol, typename Traits>
20 transaction<Protocol, Traits>::~transaction()
22 // transaction shouldn't fall out of scope w/o resolution
23 // resolution means TXN_EMBRYO, TXN_COMMITED, and TXN_ABRT
24 INVARIANT(state != TXN_ACTIVE);
25 INVARIANT(rcu::s_instance.in_rcu_region());
26 const unsigned cur_depth = rcu_guard_->sync()->depth();
29 INVARIANT(!rcu::s_instance.in_rcu_region());
30 cast()->on_post_rcu_region_completion();
32 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
33 concurrent_btree::AssertAllNodeLocksReleased();
// clear(): per the rationale below, the read/write/absent sets are
// deliberately NOT cleared here; their destructors do the work instead.
37 template <template <typename> class Protocol, typename Traits>
39 transaction<Protocol, Traits>::clear()
41 // it's actually *more* efficient to not call clear explicitly on the
42 // read/write/absent sets, and let the destructors do the clearing- this is
43 // because the destructors can take shortcuts since it knows the obj doesn't
44 // have to end in a valid state
// abort_impl(reason): records `reason` on the transaction and undoes
// pending inserts. For every write-set entry flagged as an insert, the
// (still locked) tuple is handed to cleanup_inserted_tuple_marker() together
// with the entry's key and btree.
// Can throw transaction_unusable_exception.
// NOTE(review): the state check that selects the throw is not shown next to
// it here -- confirm which states trigger it.
47 template <template <typename> class Protocol, typename Traits>
49 transaction<Protocol, Traits>::abort_impl(abort_reason reason)
59 throw transaction_unusable_exception();
// remember why we aborted so callers / debug dumps can report it
62 this->reason = reason;
64 // on abort, we need to go over all insert nodes and
66 typename write_set_map::iterator it = write_set.begin();
67 typename write_set_map::iterator it_end = write_set.end();
68 for (; it != it_end; ++it) {
69 dbtuple * const tuple = it->get_tuple();
70 if (it->is_insert()) {
// insert entries are expected to still hold their tuple lock at this point
71 INVARIANT(tuple->is_locked());
72 this->cleanup_inserted_tuple_marker(tuple, it->get_key(), it->get_btree());
// cleanup_inserted_tuple_marker(marker, key, btr): undoes an uncommitted
// insert. Removes `key` from `btr`, checks that the removed value is
// `marker`, clears the marker's "latest" flag and releases it.
//
// Preconditions (checked with INVARIANT): marker->version is
// dbtuple::MAX_TID, and marker is locked by the calling thread.
// If the btree removal fails and CHECK_INVARIANTS is defined, the offending
// key is printed to std::cerr (plus the key stored in the tuple when
// TUPLE_CHECK_KEY is defined).
80 template <template <typename> class Protocol, typename Traits>
82 transaction<Protocol, Traits>::cleanup_inserted_tuple_marker(
83 dbtuple *marker, const std::string &key, concurrent_btree *btr)
85 // XXX: this code should really live in txn_proto2_impl.h
86 INVARIANT(marker->version == dbtuple::MAX_TID);
87 INVARIANT(marker->is_locked());
88 INVARIANT(marker->is_lock_owner());
// `removed` receives the value that btr->remove() took out of the tree
89 typename concurrent_btree::value_type removed = 0;
90 const bool did_remove = btr->remove(varkey(key), &removed);
91 if (unlikely(!did_remove)) {
92 #ifdef CHECK_INVARIANTS
93 std::cerr << " *** could not remove key: " << util::hexify(key) << std::endl;
94 #ifdef TUPLE_CHECK_KEY
95 std::cerr << " *** original key : " << util::hexify(marker->key) << std::endl;
// the value removed from the tree must be exactly the marker we were given
100 INVARIANT(removed == (typename concurrent_btree::value_type) marker);
101 INVARIANT(marker->is_latest());
102 marker->clear_latest();
103 dbtuple::release(marker); // rcu free
// Maps a transaction_base::txn_state value to its enumerator name as a C
// string. A value matching none of the cases below trips ALWAYS_ASSERT(false).
108 transaction_state_to_cstr(transaction_base::txn_state state)
111 case transaction_base::TXN_EMBRYO: return "TXN_EMBRYO";
112 case transaction_base::TXN_ACTIVE: return "TXN_ACTIVE";
113 case transaction_base::TXN_ABRT: return "TXN_ABRT";
114 case transaction_base::TXN_COMMITED: return "TXN_COMMITED";
116 ALWAYS_ASSERT(false);
// Renders the set bits of `flags` as human-readable flag names using an
// ostringstream. TXN_FLAG_READ_ONLY is written either bare or prefixed with
// " | ".
// NOTE(review): presumably the " | " form is used when an earlier flag was
// already written; the selecting condition is not shown here -- confirm.
121 transaction_flags_to_str(uint64_t flags)
124 std::ostringstream oss;
125 if (flags & transaction_base::TXN_FLAG_LOW_LEVEL_SCAN) {
126 oss << "TXN_FLAG_LOW_LEVEL_SCAN";
129 if (flags & transaction_base::TXN_FLAG_READ_ONLY) {
131 oss << "TXN_FLAG_READ_ONLY";
133 oss << " | TXN_FLAG_READ_ONLY";
// dump_debug_info(): writes a human-readable description of this
// transaction to std::cerr: object address, state, abort reason, flags, and
// then every entry of the read set, write set and absent set (for the absent
// set: the btree node address and its recorded value).
140 template <template <typename> class Protocol, typename Traits>
142 transaction<Protocol, Traits>::dump_debug_info() const
144 std::cerr << "Transaction (obj=" << util::hexify(this) << ") -- state "
145 << transaction_state_to_cstr(state) << std::endl;
146 std::cerr << " Abort Reason: " << AbortReasonStr(reason) << std::endl;
147 std::cerr << " Flags: " << transaction_flags_to_str(flags) << std::endl;
148 std::cerr << " Read/Write sets:" << std::endl;
150 std::cerr << " === Read Set ===" << std::endl;
// one line per read-set entry, using the entry's own operator<<
152 for (typename read_set_map::const_iterator rs_it = read_set.begin();
153 rs_it != read_set.end(); ++rs_it)
154 std::cerr << *rs_it << std::endl;
156 std::cerr << " === Write Set ===" << std::endl;
// one line per write-set entry, using the entry's own operator<<
158 for (typename write_set_map::const_iterator ws_it = write_set.begin();
159 ws_it != write_set.end(); ++ws_it)
160 std::cerr << *ws_it << std::endl;
162 std::cerr << " === Absent Set ===" << std::endl;
// absent-set entries are (node, record) pairs; print the node address and record
164 for (typename absent_set_map::const_iterator as_it = absent_set.begin();
165 as_it != absent_set.end(); ++as_it)
166 std::cerr << " B-tree Node " << util::hexify(as_it->first)
167 << " : " << as_it->second << std::endl;
// get_txn_counters(): fills a name -> value map with per-transaction
// statistics. For each of the read, absent and write sets it reports the
// current size and a "...is_large?" entry holding the negation of the
// set's is_small_type().
171 template <template <typename> class Protocol, typename Traits>
172 std::map<std::string, uint64_t>
173 transaction<Protocol, Traits>::get_txn_counters() const
175 std::map<std::string, uint64_t> ret;
// read set statistics
178 ret["read_set_size"] = read_set.size();;
179 ret["read_set_is_large?"] = !read_set.is_small_type();
181 // max_absent_set_size
182 ret["absent_set_size"] = absent_set.size();
183 ret["absent_set_is_large?"] = !absent_set.is_small_type();
185 // max_write_set_size
186 ret["write_set_size"] = write_set.size();
187 ret["write_set_is_large?"] = !write_set.is_small_type();
// handle_last_tuple_in_group(last, did_group_insert): called from commit()
// for the final write-info entry of a run of entries that refer to the same
// tuple.
//
//   last             - the final entry of the run
//   did_group_insert - whether the run contained an insert by this txn
//
// If the run contained an insert, no lock is taken; when `last` itself is
// not the insert, its write-set entry is marked do-write so the latest
// overwrite gets applied. Otherwise the tuple is locked for writing and the
// entry is marked do-write.
// Returns false to tell the caller to abort, in two cases: the tuple still
// carries version MAX_TID (see the comment in the body), or after locking it
// is no longer the latest version / its version fails can_read_tid().
192 template <template <typename> class Protocol, typename Traits>
194 transaction<Protocol, Traits>::handle_last_tuple_in_group(
195 dbtuple_write_info &last,
196 bool did_group_insert)
198 if (did_group_insert) {
199 // don't need to lock
200 if (!last.is_insert())
201 // we inserted the last run, and then we did 1+ more overwrites
202 // to it, so we do NOT need to lock the node (again), but we DO
203 // need to apply the latest write
204 last.entry->set_do_write();
206 dbtuple *tuple = last.get_tuple();
207 if (unlikely(tuple->version == dbtuple::MAX_TID)) {
208 // if we race to put/insert w/ another txn which has inserted a new
209 // record, we *must* abort b/c the other txn could try to put/insert
210 // into a new record which we hold the lock on, so we must abort
213 // we could *not* abort if this txn did not insert any new records.
214 // we could also release our insert locks and try to acquire them
215 // again in sorted order
216 return false; // signal abort
218 const dbtuple::version_t v = tuple->lock(true); // lock for write
// the version word returned by lock() must agree with the tuple's own flag
219 INVARIANT(dbtuple::IsLatest(v) == tuple->is_latest());
221 if (unlikely(!dbtuple::IsLatest(v) ||
222 !cast()->can_read_tid(tuple->version))) {
223 // XXX(stephentu): overly conservative (with the can_read_tid() check)
224 return false; // signal abort
226 last.entry->set_do_write();
// commit(doThrow): attempts to commit the transaction. Phases, in order:
//   1. copy every write-set entry into `write_dbtuples` (with its position);
//   2. for non-snapshot txns with writes: sort `write_dbtuples`, walk it in
//      runs of entries sharing a tuple, finish each run via
//      handle_last_tuple_in_group() (which locks or signals abort), then
//      obtain a commit tid from the protocol (gen_commit_tid);
//   3. validate the read set (each tuple read must still be at the version
//      read) and the absent set (btree node versions unchanged);
//   4. apply the writes with write_record_at(), replacing the btree value
//      when the tuple head changed, and scheduling spill / logical-delete
//      work through the protocol hooks;
//   5. set state to TXN_COMMITED and report the tid via on_tid_finish().
// On the abort path, locked insert markers are cleaned up with
// cleanup_inserted_tuple_marker(), on_tid_finish() is called if a commit tid
// was generated, and transaction_abort_exception(reason) is thrown.
// A dbtuple::magic_failed_exception reaching the handler at the end is
// treated as fatal (ALWAYS_ASSERT(false)).
// NOTE(review): `doThrow` is not referenced in the lines shown here;
// presumably it selects throwing vs. returning on abort -- confirm.
231 template <template <typename> class Protocol, typename Traits>
233 transaction<Protocol, Traits>::commit(bool doThrow)
240 static std::string probe0_name(
241 std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
242 ANON_REGION(probe0_name.c_str(), &transaction_base::g_txn_commit_probe0_cg);
252 throw transaction_abort_exception(reason);
// write_dbtuples: sortable view of the write set
// commit_tid: (.first = tid was generated, .second = the tid)
256 dbtuple_write_info_vec write_dbtuples;
257 std::pair<bool, tid_t> commit_tid(false, 0);
259 // copy write tuples to vector for sorting
260 if (!write_set.empty()) {
// NOTE(review): this probe shares the ":lock_write_nodes:" label with probe2
// below although this region only copies entries -- confirm the label.
262 static std::string probe1_name(
263 std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
264 ANON_REGION(probe1_name.c_str(), &transaction_base::g_txn_commit_probe1_cg);
265 INVARIANT(!is_snapshot());
266 typename write_set_map::iterator it = write_set.begin();
267 typename write_set_map::iterator it_end = write_set.end();
268 for (size_t pos = 0; it != it_end; ++it, ++pos) {
// every insert entry is expected to already hold its tuple lock
269 INVARIANT(!it->is_insert() || it->get_tuple()->is_locked());
270 write_dbtuples.emplace_back(it->get_tuple(), &(*it), it->is_insert(), pos);
274 // read_only txns require consistent snapshots
275 INVARIANT(!is_snapshot() || read_set.empty());
276 INVARIANT(!is_snapshot() || write_set.empty());
277 INVARIANT(!is_snapshot() || absent_set.empty());
278 if (!is_snapshot()) {
279 // we don't have consistent tids, or not a read-only txn
282 if (!write_dbtuples.empty()) {
284 static std::string probe2_name(
285 std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
286 ANON_REGION(probe2_name.c_str(), &transaction_base::g_txn_commit_probe2_cg);
287 // lock the logical nodes in sort order
290 static std::string probe6_name(
291 std::string(__PRETTY_FUNCTION__) + std::string(":sort_write_nodes:")));
292 ANON_REGION(probe6_name.c_str(), &transaction_base::g_txn_commit_probe6_cg);
293 write_dbtuples.sort(); // in-place
// walk the sorted entries; `last_px` trails `it` by one element so that a
// change of tuple between the two marks the end of a run
295 typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
296 typename dbtuple_write_info_vec::iterator it_end = write_dbtuples.end();
297 dbtuple_write_info *last_px = nullptr;
298 bool inserted_last_run = false;
299 for (; it != it_end; last_px = &(*it), ++it) {
300 if (likely(last_px && last_px->tuple != it->tuple)) {
302 if (unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
303 abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
306 inserted_last_run = false;
308 if (it->is_insert()) {
// an insert must be the first entry of its run and already locked by us
309 INVARIANT(!last_px || last_px->tuple != it->tuple);
310 INVARIANT(it->is_locked());
311 INVARIANT(it->get_tuple()->is_locked());
312 INVARIANT(it->get_tuple()->is_lock_owner());
313 it->entry->set_do_write(); // all inserts are marked do-write
314 inserted_last_run = true;
316 INVARIANT(!it->is_locked());
// the loop only finishes a run when the next one starts, so the final run is
// handled here
319 if (likely(last_px) &&
320 unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
321 abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
324 commit_tid.first = true;
326 static std::string probe5_name(
327 std::string(__PRETTY_FUNCTION__) + std::string(":gen_commit_tid:")));
328 ANON_REGION(probe5_name.c_str(), &transaction_base::g_txn_commit_probe5_cg);
329 commit_tid.second = cast()->gen_commit_tid(write_dbtuples);
330 VERBOSE(std::cerr << "commit tid: " << g_proto_version_str(commit_tid.second) << std::endl);
332 VERBOSE(std::cerr << "commit tid: <read-only>" << std::endl);
335 // do read validation
338 static std::string probe3_name(
339 std::string(__PRETTY_FUNCTION__) + std::string(":read_validation:")));
340 ANON_REGION(probe3_name.c_str(), &transaction_base::g_txn_commit_probe3_cg);
342 // check the nodes we actually read are still the latest version
343 if (!read_set.empty()) {
344 typename read_set_map::iterator it = read_set.begin();
345 typename read_set_map::iterator it_end = read_set.end();
346 for (; it != it_end; ++it) {
347 VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple())
348 << " at snapshot_tid "
349 << g_proto_version_str(cast()->snapshot_tid())
// `found`: the tuple is also in write_dbtuples; the check below then uses
// is_latest_version() instead of stable_is_latest_version()
351 const bool found = sorted_dbtuples_contains(
352 write_dbtuples, it->get_tuple());
354 it->get_tuple()->is_latest_version(it->get_tid()) :
355 it->get_tuple()->stable_is_latest_version(it->get_tid())))
358 VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple()) << " at snapshot_tid "
359 << g_proto_version_str(cast()->snapshot_tid()) << " FAILED" << std::endl
360 << " txn read version: " << g_proto_version_str(it->get_tid()) << std::endl
361 << " tuple=" << *it->get_tuple() << std::endl);
363 //std::cerr << "failed tuple: " << *it->get_tuple() << std::endl;
365 abort_trap((reason = ABORT_REASON_READ_NODE_INTEREFERENCE));
370 // check btree versions have not changed
371 if (!absent_set.empty()) {
372 typename absent_set_map::iterator it = absent_set.begin();
373 typename absent_set_map::iterator it_end = absent_set.end();
374 for (; it != it_end; ++it) {
// compare the node's current version with the one recorded when it was read
375 const uint64_t v = concurrent_btree::ExtractVersionNumber(it->first);
376 if (unlikely(v != it->second.version)) {
377 VERBOSE(std::cerr << "expected node " << util::hexify(it->first) << " at v="
378 << it->second.version << ", got v=" << v << std::endl);
379 abort_trap((reason = ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED));
386 // commit actual records
387 if (!write_dbtuples.empty()) {
389 static std::string probe4_name(
390 std::string(__PRETTY_FUNCTION__) + std::string(":write_records:")));
391 ANON_REGION(probe4_name.c_str(), &transaction_base::g_txn_commit_probe4_cg);
// iterate the write set itself (not the sorted copy); do_write() is checked
// per entry below
392 typename write_set_map::iterator it = write_set.begin();
393 typename write_set_map::iterator it_end = write_set.end();
394 for (; it != it_end; ++it) {
395 if (unlikely(!it->do_write()))
397 dbtuple * const tuple = it->get_tuple();
398 INVARIANT(tuple->is_locked());
399 VERBOSE(std::cerr << "writing dbtuple " << util::hexify(tuple)
400 << " at commit_tid " << g_proto_version_str(commit_tid.second)
402 if (it->is_insert()) {
403 INVARIANT(tuple->version == dbtuple::MAX_TID);
404 tuple->version = commit_tid.second; // allows write_record_ret() to succeed
405 // w/o creating a new chain
408 const dbtuple::write_record_ret ret =
409 tuple->write_record_at(
410 cast(), commit_tid.second,
411 it->get_value(), it->get_writer());
412 bool unlock_head = false;
413 if (unlikely(ret.head_ != tuple)) {
414 // tuple was replaced by ret.head_
415 INVARIANT(ret.rest_ == tuple);
416 // XXX: write_record_at() should acquire this lock
417 ret.head_->lock(true);
419 // need to unlink tuple from underlying btree, replacing
420 // with ret.rest_ (atomically)
421 typename concurrent_btree::value_type old_v = 0;
422 if (it->get_btree()->insert(
423 varkey(it->get_key()), (typename concurrent_btree::value_type) ret.head_, &old_v, NULL))
424 // should already exist in tree
426 INVARIANT(old_v == (typename concurrent_btree::value_type) tuple);
427 // we don't RCU free this, because it is now part of the chain
428 // (the cleaners will take care of this)
429 ++evt_dbtuple_latest_replacement;
431 if (unlikely(ret.rest_))
432 // spill happened: schedule GC task
433 cast()->on_dbtuple_spill(ret.head_, ret.rest_);
434 if (!it->get_value())
435 // logical delete happened: schedule GC task
436 cast()->on_logical_delete(ret.head_, it->get_key(), it->get_btree());
437 if (unlikely(unlock_head))
440 VERBOSE(std::cerr << "dbtuple " << util::hexify(tuple) << " is_locked? " << tuple->is_locked() << std::endl);
443 // NB: we can no longer un-lock after doing the writes above
444 for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
445 it != write_dbtuples.end(); ++it) {
449 INVARIANT(!it->is_insert());
// success: publish the final state and let the protocol retire the tid
453 state = TXN_COMMITED;
454 if (commit_tid.first)
455 cast()->on_tid_finish(commit_tid.second);
460 // XXX: these values are possibly un-initialized
461 if (this->is_snapshot())
462 VERBOSE(std::cerr << "aborting txn @ snapshot_tid " << cast()->snapshot_tid() << std::endl);
464 VERBOSE(std::cerr << "aborting txn" << std::endl);
// abort path: undo inserts for entries that are still locked
466 for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
467 it != write_dbtuples.end(); ++it) {
468 if (it->is_locked()) {
469 if (it->is_insert()) {
470 INVARIANT(it->entry);
471 this->cleanup_inserted_tuple_marker(
472 it->tuple.get(), it->entry->get_key(), it->entry->get_btree());
474 // XXX: potential optimization: on unlock() for abort, we don't
475 // technically need to change the version number
478 INVARIANT(!it->is_insert());
// a tid may already have been generated before validation failed; retire it
483 if (commit_tid.first)
484 cast()->on_tid_finish(commit_tid.second);
487 throw transaction_abort_exception(reason);
491 } catch (dbtuple::magic_failed_exception &) {
493 ALWAYS_ASSERT(false);
// try_insert_new_tuple(btr, key, value, writer): allocates a fresh tuple,
// lets `writer` serialize `value` into it, and tries to publish it in `btr`
// under `*key` with insert_if_absent().
//
// Returns:
//   (nullptr, false) - the key already existed; the new tuple is released
//                      immediately (no RCU) and the failure counter is bumped;
//   (tuple,   true ) - inserted, but the btree node it landed in is tracked
//                      in the absent set at a different version than the
//                      insert observed; abort_trap() has been invoked with
//                      ABORT_REASON_WRITE_NODE_INTERFERENCE;
//   (tuple,   false) - inserted; a write-set entry flagged as insert was
//                      appended, and a tracked absent-set node version (if
//                      any) was bumped to the post-insert version.
// The new tuple is asserted to be latest, at version MAX_TID, locked and
// marked write-intent right after it is written.
498 template <template <typename> class Protocol, typename Traits>
499 std::pair< dbtuple *, bool >
500 transaction<Protocol, Traits>::try_insert_new_tuple(
501 concurrent_btree &btr,
502 const std::string *key,
504 dbtuple::tuple_writer_t writer)
508 value ? writer(dbtuple::TUPLE_WRITER_COMPUTE_NEEDED,
509 value, nullptr, 0) : 0;
511 // perf: ~900 tsc/alloc on istc11.csail.mit.edu
512 dbtuple * const tuple = dbtuple::alloc_first(sz, true);
514 writer(dbtuple::TUPLE_WRITER_DO_WRITE,
515 value, tuple->get_value_start(), 0);
516 INVARIANT(find_read_set(tuple) == read_set.end());
517 INVARIANT(tuple->is_latest());
518 INVARIANT(tuple->version == dbtuple::MAX_TID);
519 INVARIANT(tuple->is_locked());
520 INVARIANT(tuple->is_write_intent());
521 #ifdef TUPLE_CHECK_KEY
522 tuple->key.assign(key->data(), key->size());
523 tuple->tree = (void *) &btr;
526 // XXX: underlying btree api should return the existing value if insert
527 // fails- this would allow us to avoid having to do another search
528 typename concurrent_btree::insert_info_t insert_info;
529 if (unlikely(!btr.insert_if_absent(
530 varkey(*key), (typename concurrent_btree::value_type) tuple, &insert_info))) {
// NOTE(review): `key` is a pointer here (it is dereferenced for varkey above),
// so this may log the pointer rather than the key bytes -- confirm against
// util::hexify's overloads. The same applies to the success message below.
531 VERBOSE(std::cerr << "insert_if_absent failed for key: " << util::hexify(key) << std::endl);
532 tuple->clear_latest();
534 dbtuple::release_no_rcu(tuple);
535 ++transaction_base::g_evt_dbtuple_write_insert_failed;
536 return std::pair< dbtuple *, bool >(nullptr, false);
538 VERBOSE(std::cerr << "insert_if_absent suceeded for key: " << util::hexify(key) << std::endl
539 << " new dbtuple is " << util::hexify(tuple) << std::endl);
541 // too expensive to be practical
542 // INVARIANT(find_write_set(tuple) == write_set.end());
// record the insert in the write set (last argument flags it as an insert)
543 write_set.emplace_back(tuple, key, value, writer, &btr, true);
546 INVARIANT(insert_info.node);
547 if (!absent_set.empty()) {
548 auto it = absent_set.find(insert_info.node);
549 if (it != absent_set.end()) {
// the node was scanned earlier: its recorded version must match what the
// insert saw before modifying it, otherwise someone else changed the node
550 if (unlikely(it->second.version != insert_info.old_version)) {
551 abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
552 return std::make_pair(tuple, true);
554 VERBOSE(std::cerr << "bump node=" << util::hexify(it->first) << " from v=" << insert_info.old_version
555 << " -> v=" << insert_info.new_version << std::endl);
556 // otherwise, bump the version
557 it->second.version = insert_info.new_version;
// NOTE(review): this compares against `it->second`, while the surrounding
// code uses `it->second.version` -- confirm which is intended.
558 SINGLE_THREADED_INVARIANT(concurrent_btree::ExtractVersionNumber(it->first) == it->second);
561 return std::make_pair(tuple, false);
// do_tuple_read(tuple, value_reader): reads `tuple` on behalf of this
// transaction, delivering the value through `value_reader`.
//
// Snapshot transactions read at cast()->snapshot_tid(); all others read at
// dbtuple::MAX_TID. When Traits::read_own_writes is set, the write set is
// searched first and a pending value written by this txn is handed to the
// reader via dup(). Otherwise the value comes from tuple->stable_read(),
// which also reports the tid that was read (start_t).
//
// Throws transaction_abort_exception with
//   ABORT_REASON_UNSTABLE_READ   if stable_read() returns READ_FAILED,
//   ABORT_REASON_FUTURE_TID_READ if the protocol's can_read_tid(start_t)
//                                rejects the tid.
// Non-snapshot transactions append (tuple, start_t) to the read set.
564 template <template <typename> class Protocol, typename Traits>
565 template <typename ValueReader>
567 transaction<Protocol, Traits>::do_tuple_read(
568 const dbtuple *tuple, ValueReader &value_reader)
571 ++evt_local_search_lookups;
573 const bool is_snapshot_txn = is_snapshot();
574 const transaction_base::tid_t snapshot_tid = is_snapshot_txn ?
575 cast()->snapshot_tid() : static_cast<transaction_base::tid_t>(dbtuple::MAX_TID);
// filled in by stable_read() with the tid of the version actually read
576 transaction_base::tid_t start_t = 0;
578 if (Traits::read_own_writes) {
579 // this is why read_own_writes is not performant, because we have
581 auto write_set_it = find_write_set(const_cast<dbtuple *>(tuple));
582 if (unlikely(write_set_it != write_set.end())) {
583 ++evt_local_search_write_set_hits;
584 if (!write_set_it->get_value())
// reinterpret the pending write's value as the reader's value_type and copy
// it out using the txn's string allocator
586 const typename ValueReader::value_type * const px =
587 reinterpret_cast<const typename ValueReader::value_type *>(
588 write_set_it->get_value());
589 value_reader.dup(*px, this->string_allocator());
594 // do the actual tuple read
595 dbtuple::ReadStatus stat;
597 PERF_DECL(static std::string probe0_name(std::string(__PRETTY_FUNCTION__) + std::string(":do_read:")));
598 ANON_REGION(probe0_name.c_str(), &private_::txn_btree_search_probe0_cg);
600 stat = tuple->stable_read(snapshot_tid, start_t, value_reader, this->string_allocator(), is_snapshot_txn);
601 if (unlikely(stat == dbtuple::READ_FAILED)) {
602 const transaction_base::abort_reason r = transaction_base::ABORT_REASON_UNSTABLE_READ;
604 throw transaction_abort_exception(r);
607 if (unlikely(!cast()->can_read_tid(start_t))) {
608 const transaction_base::abort_reason r = transaction_base::ABORT_REASON_FUTURE_TID_READ;
610 throw transaction_abort_exception(r);
612 INVARIANT(stat == dbtuple::READ_EMPTY ||
613 stat == dbtuple::READ_RECORD);
// READ_EMPTY is recorded in v_empty
614 const bool v_empty = (stat == dbtuple::READ_EMPTY);
616 ++transaction_base::g_evt_read_logical_deleted_node_search;
617 if (!is_snapshot_txn)
618 // read-only txns do not need read-set tracking
619 // (b/c we know the values are consistent)
620 read_set.emplace_back(tuple, start_t);
// do_node_read(n, v): records that btree node `n` was observed at version
// `v`. The first observation stores `v` in the absent set; a later
// observation with a different version throws transaction_abort_exception
// with ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED.
624 template <template <typename> class Protocol, typename Traits>
626 transaction<Protocol, Traits>::do_node_read(
627 const typename concurrent_btree::node_opaque_t *n, uint64_t v)
632 auto it = absent_set.find(n);
633 if (it == absent_set.end()) {
// first time this node is seen: remember the version we read it at
634 absent_set[n].version = v;
635 } else if (it->second.version != v) {
636 const transaction_base::abort_reason r =
637 transaction_base::ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED;
639 throw transaction_abort_exception(r);
643 #endif /* _NDB_TXN_IMPL_H_ */