/*
This file is a part of libcds - Concurrent Data Structures library
- (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/
-
+
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
of a <i>publication list</i>. The publication list is a list of thread-local records
of a size proportional to the number of threads that are concurrently accessing the shared object.
- Each thread \p t accessing the structure to perform an invocation of some method \p m
+ Each thread \p t accessing the structure to perform an invocation of some method \p f()
on the shared object executes the following sequence of steps:
<ol>
- <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
+ <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
sequentially to the shared object in the <i>request</i> field of your thread local publication
record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
be used to receive the response. If your thread local publication record is marked as active
<li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
field waiting for a response to the invocation (one can add a yield at this point to allow other threads
on the same core to run). Once in a while, while spinning, check that the lock is still taken and that your
- record is active. If your record is inactive proceed to step 5. Once the response is available,
- reset the request field to null and return the response.</li>
+      record is active (you may use any \p wait_strategy instead of spinning). If your record is inactive, proceed to step 5.
+ Once the response is available, reset the request field to null and return the response.</li>
<li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
return to spinning in step 2.</li>
<li>Otherwise, you hold the lock and are a combiner.
<ul>
<li>Increment the combining pass count by one.</li>
<li>Execute a \p fc_apply() by traversing the publication list from the head,
- combining all nonnull method call invocations, setting the <i>age</i> of each of these records
+ combining all non-null method call invocations, setting the <i>age</i> of each of these records
to the current <i>count</i>, applying the combined method calls to the structure D, and returning
responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
<li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
void onInvokeExclusive() { ++m_nInvokeExclusive; }
void onWakeupByNotifying() { ++m_nWakeupByNotifying; }
void onPassiveToCombiner() { ++m_nPassiveToCombiner; }
-
+
//@endcond
};
//@endcond
protected:
- atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
- publication_record_type * m_pHead; ///< Head of publication list
+ atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
+ publication_record_type* m_pHead; ///< Head of active publication list
+ publication_record_type* m_pAllocatedHead; ///< Head of allocated publication list
boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
- mutable global_lock_type m_Mutex; ///< Global mutex
- mutable stat m_Stat; ///< Internal statistics
+ mutable global_lock_type m_Mutex; ///< Global mutex
+ mutable stat m_Stat; ///< Internal statistics
unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
unsigned int const m_nCombinePassCount; ///< Number of combining passes
wait_strategy m_waitStrategy; ///< Wait strategy
)
: m_nCount(0)
, m_pHead( nullptr )
+ , m_pAllocatedHead( nullptr )
, m_pThreadRec( tls_cleanup )
, m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask
, m_nCombinePassCount( nCombinePassCount )
{
- init();
+ assert( m_pThreadRec.get() == nullptr );
+ publication_record_type* pRec = cxx11_allocator().New();
+ m_pAllocatedHead =
+ m_pHead = pRec;
+ m_pThreadRec.reset( pRec );
+ m_Stat.onCreatePubRecord();
}
- /// Destroys the objects and mark all publication records as inactive
+ /// Destroys the object and all publication records
~kernel()
{
- // mark all publication record as detached
- for ( publication_record* p = m_pHead; p; ) {
- p->pOwner = nullptr;
+ m_pThreadRec.reset(); // calls tls_cleanup()
+ // delete all publication records
+ for ( publication_record* p = m_pAllocatedHead; p; ) {
publication_record * pRec = p;
- p = p->pNext.load( memory_model::memory_order_relaxed );
- if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
- free_publication_record( static_cast<publication_record_type *>( pRec ));
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+ free_publication_record( static_cast<publication_record_type *>( pRec ));
}
}
if ( !pRec ) {
// Allocate new publication record
pRec = cxx11_allocator().New();
- pRec->pOwner = reinterpret_cast<void *>( this );
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();
- }
- if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+ // Insert in allocated list
+ assert( m_pAllocatedHead != nullptr );
+ publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_relaxed );
+ do {
+ pRec->pNextAllocated.store( p, memory_model::memory_order_release );
+ } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
+ publish( pRec );
+ }
+ else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
publish( pRec );
assert( pRec->op() == req_EmptyRecord );
/// Marks publication record for the current thread as empty
void release_record( publication_record_type * pRec )
{
- assert( pRec->is_done() );
+ assert( pRec->is_done());
pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
}
Some operations in flat combining containers should be called in exclusive mode,
i.e. the current thread should become the combiner to process the operation.
The typical example is \p empty() function.
-
+
\p %invoke_exclusive() allows doing that: the current thread becomes the combiner,
invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
Instead, after the \p f call completes, the current thread wakes up a pending thread, if any.
/// Marks \p rec as executed
/**
- This function should be called by container if \p batch_combine mode is used.
- For usual combining (see \p combine() ) this function is excess.
+ This function should be called by container if \p batch_combine() mode is used.
+        For usual combining (see \p combine()) this function is unnecessary.
*/
void operation_done( publication_record& rec )
{
{
// Thread done
// pRec that is TLS data should be excluded from publication list
- if ( pRec ) {
- if ( pRec->pOwner ) {
- // kernel is alive
- pRec->nState.store( removed, memory_model::memory_order_release );
- }
- else {
- // kernel already deleted
- free_publication_record( pRec );
- }
- }
+ pRec->nState.store( removed, memory_model::memory_order_release );
}
- static void free_publication_record( publication_record_type* pRec )
+ void free_publication_record( publication_record_type* pRec )
{
cxx11_allocator().Delete( pRec );
- }
-
- void init()
- {
- assert( m_pThreadRec.get() == nullptr );
- publication_record_type* pRec = cxx11_allocator().New();
- m_pHead = pRec;
- pRec->pOwner = this;
- m_pThreadRec.reset( pRec );
- m_Stat.onCreatePubRecord();
+ m_Stat.onDeletePubRecord();
}
void publish( publication_record_type* pRec )
{
assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
- pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
- pRec->nState.store( active, memory_model::memory_order_release );
+ pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+ pRec->nState.store( active, memory_model::memory_order_relaxed );
// Insert record to publication list
- if ( m_pHead != static_cast<publication_record *>(pRec) ) {
- publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
+ if ( m_pHead != static_cast<publication_record *>(pRec)) {
+ publication_record * p = m_pHead->pNext.load( memory_model::memory_order_relaxed );
if ( p != static_cast<publication_record *>( pRec )) {
do {
- pRec->pNext = p;
+ pRec->pNext.store( p, memory_model::memory_order_release );
// Failed CAS changes p
} while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
- memory_model::memory_order_release, atomics::memory_order_relaxed ));
+ memory_model::memory_order_release, atomics::memory_order_acquire ));
m_Stat.onActivatePubRecord();
}
}
template <class Container>
void try_combining( Container& owner, publication_record_type* pRec )
{
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
}
else {
// There is another combiner, wait while it executes our request
- if ( !wait_for_combining( pRec ) ) {
+ if ( !wait_for_combining( pRec )) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
template <class Container>
void try_batch_combining( Container& owner, publication_record_type * pRec )
{
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
}
else {
// There is another combiner, wait while it executes our request
- if ( !wait_for_combining( pRec ) ) {
+ if ( !wait_for_combining( pRec )) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
void combining( Container& owner )
{
// The thread is a combiner
- assert( !m_Mutex.try_lock() );
+ assert( !m_Mutex.try_lock());
unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
}
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
template <class Container>
bool combining_pass( Container& owner, unsigned int nCurAge )
{
- publication_record* pPrev = nullptr;
publication_record* p = m_pHead;
bool bOpDone = false;
while ( p ) {
switch ( p->nState.load( memory_model::memory_order_acquire )) {
- case active:
- if ( p->op() >= req_Operation ) {
- p->nAge.store( nCurAge, memory_model::memory_order_release );
- owner.fc_apply( static_cast<publication_record_type*>(p) );
- operation_done( *p );
- bOpDone = true;
- }
- break;
- case inactive:
- // Only m_pHead can be inactive in the publication list
- assert( p == m_pHead );
- break;
- case removed:
- // The record should be removed
- p = unlink_and_delete_record( pPrev, p );
- continue;
- default:
- /// ??? That is impossible
- assert(false);
+ case active:
+ if ( p->op( memory_model::memory_order_acquire ) >= req_Operation ) {
+ p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+ owner.fc_apply( static_cast<publication_record_type*>( p ));
+ operation_done( *p );
+ bOpDone = true;
+ }
+ break;
+ case inactive:
+ // Only m_pHead can be inactive in the publication list
+ assert( p == m_pHead );
+ break;
+ case removed:
+ // Such record will be removed on compacting phase
+ break;
+ default:
+ /// ??? That is impossible
+ assert( false );
}
- pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
return bOpDone;
void batch_combining( Container& owner )
{
// The thread is a combiner
- assert( !m_Mutex.try_lock() );
+ assert( !m_Mutex.try_lock());
unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
- owner.fc_process( begin(), end() );
+ owner.fc_process( begin(), end());
combining_pass( owner, nCurAge );
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
- bool wait_for_combining( publication_record_type * pRec )
+ bool wait_for_combining( publication_record_type* pRec )
{
m_waitStrategy.prepare( *pRec );
m_Stat.onPassiveWait();
if ( m_waitStrategy.wait( *this, *pRec ))
m_Stat.onWakeupByNotifying();
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
// Operation is done
m_Mutex.unlock();
return true;
}
- void compact_list( unsigned int const nCurAge )
+ void compact_list( unsigned int nCurAge )
{
- // Thinning publication list
- publication_record * pPrev = nullptr;
- for ( publication_record * p = m_pHead; p; ) {
- if ( p->nState.load( memory_model::memory_order_acquire ) == active
- && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
- {
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ // Compacts publication list
+ // This function is called only by combiner thread
+
+ try_again:
+ publication_record * pPrev = m_pHead;
+ for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+ switch ( p->nState.load( memory_model::memory_order_relaxed )) {
+ case active:
+ if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+ {
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_release, atomics::memory_order_relaxed ))
+ memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
{
p->nState.store( inactive, memory_model::memory_order_release );
p = pNext;
continue;
}
}
+ break;
+
+ case removed:
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+ p = pNext;
+ continue;
+ }
+ else {
+                    // CAS can fail only at the beginning of the list
+ assert( pPrev == m_pHead );
+ goto try_again;
+ }
}
pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
- m_Stat.onCompactPublicationList();
- }
-
- publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
- {
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
- if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_release, atomics::memory_order_relaxed ))
- {
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
+ // Iterate over allocated list to find removed records
+ pPrev = m_pAllocatedHead;
+ for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+ if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+ publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+ if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+ free_publication_record( static_cast<publication_record_type *>( p ));
+ p = pNext;
+ continue;
+ }
}
- return pNext;
- }
- else {
- m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
- return m_pHead;
+
+ pPrev = p;
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
}
+
+ m_Stat.onCompactPublicationList();
}
//@endcond
};
} // namespace flat_combining
}} // namespace cds::algo
+/*
+ CppMem model (http://svr-pes20-cppmem.cl.cam.ac.uk/cppmem/)
+
+ // Combiner thread - slave (waiting) thread
+int main() {
+ atomic_int y = 0; // pRec->op
+ int x = 0; // pRec->data
+ {{{
+ { // slave thread (not combiner)
+ // Op data
+ x = 1;
+ // Annotate request (op)
+ y.store(1, release);
+ // Wait while request done
+ y.load(acquire).readsvalue(2);
+ // Read result
+ r2=x;
+ }
+ |||
+ { // Combiner thread
+ // Read request (op)
+ r1=y.load(acquire).readsvalue(1);
+ // Execute request - change request data
+ x = 2;
+ // store "request processed" flag (pRec->op := req_Response)
+ y.store(2, release);
+ }
+ }}};
+ return 0;
+}
+
+*/
+
#endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H