//@endcond
protected:
- atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
- publication_record_type * m_pHead; ///< Head of publication list
+ atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
+ publication_record_type* m_pHead; ///< Head of active publication list
+ publication_record_type* m_pAllocatedHead; ///< Head of allocated publication list
boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
- mutable global_lock_type m_Mutex; ///< Global mutex
- mutable stat m_Stat; ///< Internal statistics
+ mutable global_lock_type m_Mutex; ///< Global mutex
+ mutable stat m_Stat; ///< Internal statistics
unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
unsigned int const m_nCombinePassCount; ///< Number of combining passes
wait_strategy m_waitStrategy; ///< Wait strategy
)
: m_nCount(0)
, m_pHead( nullptr )
+ , m_pAllocatedHead( nullptr )
, m_pThreadRec( tls_cleanup )
, m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask
, m_nCombinePassCount( nCombinePassCount )
{
- init();
+ assert( m_pThreadRec.get() == nullptr );
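+ // The record created here is the persistent head of both the active and the allocated lists
+ // and also serves as the constructing thread's own publication record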
+ publication_record_type* pRec = cxx11_allocator().New();
+ m_pAllocatedHead =
+ m_pHead = pRec;
+ m_pThreadRec.reset( pRec );
+ m_Stat.onCreatePubRecord();
}
- /// Destroys the objects and mark all publication records as inactive
+ /// Destroys the object and all publication records
~kernel()
{
m_pThreadRec.reset(); // calls tls_cleanup()
// delete all publication records
- for ( publication_record* p = m_pHead; p; ) {
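+ // the allocated list links every record ever created, including those already unlinked from the active list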
+ for ( publication_record* p = m_pAllocatedHead; p; ) {
publication_record * pRec = p;
- p = p->pNext.load( memory_model::memory_order_relaxed );
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
free_publication_record( static_cast<publication_record_type *>( pRec ));
- m_Stat.onDeletePubRecord();
}
}
pRec = cxx11_allocator().New();
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();
- }
- if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+ // Insert the new record into the allocated list
+ assert( m_pAllocatedHead != nullptr );
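+ // Link the new record right after the allocated-list head with a CAS loop;
+ // the allocated list is what the destructor and compact_list() use to reclaim record memory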
+ publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
+ do {
+ pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
+ } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
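+ // Link the record into the active publication list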
+ publish( pRec );
+ }
+ else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
publish( pRec );
assert( pRec->op() == req_EmptyRecord );
pRec->nState.store( removed, memory_model::memory_order_release );
}
- static void free_publication_record( publication_record_type* pRec )
+ void free_publication_record( publication_record_type* pRec )
{
cxx11_allocator().Delete( pRec );
- }
-
- void init()
- {
- assert( m_pThreadRec.get() == nullptr );
- publication_record_type* pRec = cxx11_allocator().New();
- m_pHead = pRec;
- m_pThreadRec.reset( pRec );
- m_Stat.onCreatePubRecord();
+ m_Stat.onDeletePubRecord();
}
void publish( publication_record_type* pRec )
{
assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
- pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
- pRec->nState.store( active, memory_model::memory_order_release );
+ pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+ pRec->nState.store( active, memory_model::memory_order_relaxed );
// Insert record to publication list
if ( m_pHead != static_cast<publication_record *>(pRec)) {
}
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
template <class Container>
bool combining_pass( Container& owner, unsigned int nCurAge )
{
- publication_record* pPrev = nullptr;
publication_record* p = m_pHead;
bool bOpDone = false;
while ( p ) {
- switch ( p->nState.load( memory_model::memory_order_acquire )) {
- case active:
- if ( p->op() >= req_Operation ) {
- p->nAge.store( nCurAge, memory_model::memory_order_release );
- owner.fc_apply( static_cast<publication_record_type*>(p));
- operation_done( *p );
- bOpDone = true;
- }
- break;
- case inactive:
- // Only m_pHead can be inactive in the publication list
- assert( p == m_pHead );
- break;
- case removed:
- // The record should be removed (except m_pHead)
- if ( pPrev ) {
- p = unlink_and_delete_record( pPrev, p );
- continue;
- }
- break;
- default:
- /// ??? That is impossible
- assert(false);
+ switch ( p->nState.load( memory_model::memory_order_acquire ) ) {
+ case active:
+ if ( p->op() >= req_Operation ) {
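+ // The record holds a pending request: execute it on behalf of the owner thread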
+ p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+ owner.fc_apply( static_cast<publication_record_type*>( p ) );
+ operation_done( *p );
+ bOpDone = true;
+ }
+ break;
+ case inactive:
+ // Only m_pHead can be inactive in the publication list
+ assert( p == m_pHead );
+ break;
+ case removed:
+ // Such a record is unlinked and its memory reclaimed during the compacting phase
+ break;
+ default:
+ // Unreachable: a publication record can only be active, inactive or removed
+ assert( false );
}
- pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
return bOpDone;
void batch_combining( Container& owner )
{
// The thread is a combiner
- assert( !m_Mutex.try_lock());
+ assert( !m_Mutex.try_lock() );
unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
- owner.fc_process( begin(), end());
+ owner.fc_process( begin(), end() );
combining_pass( owner, nCurAge );
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
m_Stat.onPassiveWaitIteration();
// Wait while operation processing
- if ( m_waitStrategy.wait( *this, *pRec ))
+ if ( m_waitStrategy.wait( *this, *pRec ) )
m_Stat.onWakeupByNotifying();
- if ( m_Mutex.try_lock()) {
+ if ( m_Mutex.try_lock() ) {
if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
// Operation is done
m_Mutex.unlock();
return true;
}
- void compact_list( unsigned int const nCurAge )
+ void compact_list( unsigned int nCurAge )
{
- // Thinning publication list
- publication_record * pPrev = nullptr;
- for ( publication_record * p = m_pHead; p; ) {
- if ( p->nState.load( memory_model::memory_order_acquire ) == active
- && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
- {
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ // Compacts the publication list
+ // This function is called only by the combiner thread
+
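+ // Pass 1 unlinks stale and removed records from the active list,
+ // pass 2 walks the allocated list and frees the removed records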
+ try_again:
+ publication_record * pPrev = m_pHead;
+ for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+ switch ( p->nState.load( memory_model::memory_order_relaxed ) ) {
+ case active:
+ if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+ {
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_release, atomics::memory_order_relaxed ))
+ memory_model::memory_order_acquire, atomics::memory_order_relaxed ) )
{
p->nState.store( inactive, memory_model::memory_order_release );
p = pNext;
continue;
}
}
+ break;
+
+ case removed:
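+ // The owner thread has released this record: unlink it from the active list;
+ // its memory is reclaimed below via the allocated list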
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+ p = pNext;
+ continue;
+ }
+ else {
+ // The CAS can fail only at the head of the list, where other threads may concurrently publish new records; rescan the list
+ assert( pPrev == m_pHead );
+ goto try_again;
+ }
}
pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
- m_Stat.onCompactPublicationList();
- }
-
- publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
- {
- // m_pHead is persistent node and cannot be deleted
- assert( pPrev != nullptr );
- assert( p != m_pHead );
+ // Iterate over the allocated list and free the records marked as removed
+ pPrev = m_pAllocatedHead;
+ for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+ if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+ publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+ if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+ free_publication_record( static_cast<publication_record_type *>( p ));
+ p = pNext;
+ continue;
+ }
+ }
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
- if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
- {
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
+ pPrev = p;
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
}
- return pNext;
+
+ m_Stat.onCompactPublicationList();
}
//@endcond
};