From 261884e89e0b9dca1232bff984dce2f79b8bce2b Mon Sep 17 00:00:00 2001 From: khizmax Date: Sat, 11 Mar 2017 10:29:39 +0300 Subject: [PATCH] Mutex-based flat-combining wait strategy: try to reduce starvation --- cds/algo/flat_combining/kernel.h | 2 +- cds/algo/flat_combining/wait_strategy.h | 69 +++++++++++++++++++++---- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/cds/algo/flat_combining/kernel.h b/cds/algo/flat_combining/kernel.h index 5e333e15..c4c2ce23 100644 --- a/cds/algo/flat_combining/kernel.h +++ b/cds/algo/flat_combining/kernel.h @@ -422,7 +422,7 @@ namespace cds { namespace algo { /// Marks \p rec as executed /** - This function should be called by container if \p batch_combine mode is used. + This function should be called by container if \p batch_combine() mode is used. For usual combining (see \p combine()) this function is excess. */ void operation_done( publication_record& rec ) diff --git a/cds/algo/flat_combining/wait_strategy.h b/cds/algo/flat_combining/wait_strategy.h index 947699d3..8a4a567b 100644 --- a/cds/algo/flat_combining/wait_strategy.h +++ b/cds/algo/flat_combining/wait_strategy.h @@ -212,8 +212,9 @@ namespace cds { namespace algo { namespace flat_combining { class single_mutex_single_condvar { //@cond - std::mutex m_mutex; + std::mutex m_mutex; std::condition_variable m_condvar; + bool m_wakeup; typedef std::unique_lock< std::mutex > unique_lock; //@endcond @@ -229,6 +230,11 @@ namespace cds { namespace algo { namespace flat_combining { typedef PublicationRecord type; ///< publication record type }; + /// Default ctor + single_mutex_single_condvar() + : m_wakeup( false ) + {} + /// Does nothing template void prepare( PublicationRecord& /*rec*/ ) @@ -240,23 +246,33 @@ namespace cds { namespace algo { namespace flat_combining { { if ( fc.get_operation( rec ) >= req_Operation ) { unique_lock lock( m_mutex ); - if ( fc.get_operation( rec ) >= req_Operation ) - return m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout; + if ( fc.get_operation( rec ) >= req_Operation ) { + if ( m_wakeup ) { + m_wakeup = false; + return true; + } + + bool ret = m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout; + m_wakeup = false; + return ret; + } } return false; } /// Calls condition variable function \p notify_all() template - void notify( FCKernel& /*fc*/, PublicationRecord& /*rec*/ ) + void notify( FCKernel& fc, PublicationRecord& /*rec*/ ) { - m_condvar.notify_all(); + wakeup( fc ); } /// Calls condition variable function \p notify_all() template void wakeup( FCKernel& /*fc*/ ) { + unique_lock lock( m_mutex ); + m_wakeup = true; m_condvar.notify_all(); } }; @@ -272,7 +288,8 @@ namespace cds { namespace algo { namespace flat_combining { class single_mutex_multi_condvar { //@cond - std::mutex m_mutex; + std::mutex m_mutex; + bool m_wakeup; typedef std::unique_lock< std::mutex > unique_lock; //@endcond @@ -294,6 +311,11 @@ namespace cds { namespace algo { namespace flat_combining { }; }; + /// Default ctor + single_mutex_multi_condvar() + : m_wakeup( false ) + {} + /// Does nothing template void prepare( PublicationRecord& /*rec*/ ) @@ -305,8 +327,17 @@ namespace cds { namespace algo { namespace flat_combining { { if ( fc.get_operation( rec ) >= req_Operation ) { unique_lock lock( m_mutex ); - if ( fc.get_operation( rec ) >= req_Operation ) - return rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout; + + if ( fc.get_operation( rec ) >= req_Operation ) { + if ( m_wakeup ) { + m_wakeup = false; + return true; + } + + bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout; + m_wakeup = false; + return ret; + } } return false; } @@ -315,6 +346,8 @@ namespace cds { namespace algo { namespace flat_combining { template void notify( FCKernel& /*fc*/, PublicationRecord& rec ) { + unique_lock lock( m_mutex ); + m_wakeup = true; rec.m_condvar.notify_one(); } @@ -351,6 +384,11 @@ namespace cds { namespace algo { namespace flat_combining { //@cond std::mutex m_mutex; std::condition_variable m_condvar; + bool m_wakeup; + + type() + : m_wakeup( false ) + {} //@endcond }; }; @@ -366,8 +404,17 @@ namespace cds { namespace algo { namespace flat_combining { { if ( fc.get_operation( rec ) >= req_Operation ) { unique_lock lock( rec.m_mutex ); - if ( fc.get_operation( rec ) >= req_Operation ) - return rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout; + + if ( fc.get_operation( rec ) >= req_Operation ) { + if ( rec.m_wakeup ) { + rec.m_wakeup = false; + return true; + } + + bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout; + rec.m_wakeup = false; + return ret; + } } return false; } @@ -376,6 +423,8 @@ namespace cds { namespace algo { namespace flat_combining { template void notify( FCKernel& /*fc*/, PublicationRecord& rec ) { + unique_lock lock( rec.m_mutex ); + rec.m_wakeup = true; rec.m_condvar.notify_one(); } -- 2.34.1