/*
This file is a part of libcds - Concurrent Data Structures library
(C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/
//@endcond
protected:
- atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
- publication_record_type * m_pHead; ///< Head of publication list
+ atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
+ publication_record_type* m_pHead; ///< Head of active publication list
+ publication_record_type* m_pAllocatedHead; ///< Head of allocated publication list
boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
- mutable global_lock_type m_Mutex; ///< Global mutex
- mutable stat m_Stat; ///< Internal statistics
+ mutable global_lock_type m_Mutex; ///< Global mutex
+ mutable stat m_Stat; ///< Internal statistics
unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
unsigned int const m_nCombinePassCount; ///< Number of combining passes
wait_strategy m_waitStrategy; ///< Wait strategy
)
: m_nCount(0)
, m_pHead( nullptr )
+ , m_pAllocatedHead( nullptr )
, m_pThreadRec( tls_cleanup )
, m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask
, m_nCombinePassCount( nCombinePassCount )
{
- init();
+ assert( m_pThreadRec.get() == nullptr );
+ publication_record_type* pRec = cxx11_allocator().New();
+ m_pAllocatedHead =
+ m_pHead = pRec;
+ m_pThreadRec.reset( pRec );
+ m_Stat.onCreatePubRecord();
}
- /// Destroys the objects and mark all publication records as inactive
+ /// Destroys the object and all publication records
~kernel()
{
m_pThreadRec.reset(); // calls tls_cleanup()
// delete all publication records
- for ( publication_record* p = m_pHead; p; ) {
+ for ( publication_record* p = m_pAllocatedHead; p; ) {
publication_record * pRec = p;
- p = p->pNext.load( memory_model::memory_order_relaxed );
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
free_publication_record( static_cast<publication_record_type *>( pRec ));
}
}
pRec = cxx11_allocator().New();
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();
- }
- if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+ // Insert in allocated list
+ assert( m_pAllocatedHead != nullptr );
+ publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_relaxed );
+ do {
+ pRec->pNextAllocated.store( p, memory_model::memory_order_release );
+ } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
+ publish( pRec );
+ }
+ else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
publish( pRec );
assert( pRec->op() == req_EmptyRecord );
/// Marks \p rec as executed
/**
This function should be called by container if \p batch_combine() mode is used.
For usual combining (see \p combine()) this function is excess.
*/
void operation_done( publication_record& rec )
pRec->nState.store( removed, memory_model::memory_order_release );
}
- static void free_publication_record( publication_record_type* pRec )
+ void free_publication_record( publication_record_type* pRec )
{
cxx11_allocator().Delete( pRec );
- }
-
- void init()
- {
- assert( m_pThreadRec.get() == nullptr );
- publication_record_type* pRec = cxx11_allocator().New();
- m_pHead = pRec;
- m_pThreadRec.reset( pRec );
- m_Stat.onCreatePubRecord();
+ m_Stat.onDeletePubRecord();
}
void publish( publication_record_type* pRec )
{
assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
- pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
- pRec->nState.store( active, memory_model::memory_order_release );
+ pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+ pRec->nState.store( active, memory_model::memory_order_relaxed );
// Insert record to publication list
if ( m_pHead != static_cast<publication_record *>(pRec)) {
- publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
+ publication_record * p = m_pHead->pNext.load( memory_model::memory_order_relaxed );
if ( p != static_cast<publication_record *>( pRec )) {
do {
- pRec->pNext.store( p, memory_model::memory_order_relaxed );
+ pRec->pNext.store( p, memory_model::memory_order_release );
// Failed CAS changes p
} while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
memory_model::memory_order_release, atomics::memory_order_acquire ));
}
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
template <class Container>
bool combining_pass( Container& owner, unsigned int nCurAge )
{
- publication_record* pPrev = nullptr;
publication_record* p = m_pHead;
bool bOpDone = false;
while ( p ) {
switch ( p->nState.load( memory_model::memory_order_acquire )) {
- case active:
- if ( p->op() >= req_Operation ) {
- p->nAge.store( nCurAge, memory_model::memory_order_release );
- owner.fc_apply( static_cast<publication_record_type*>(p));
- operation_done( *p );
- bOpDone = true;
- }
- break;
- case inactive:
- // Only m_pHead can be inactive in the publication list
- assert( p == m_pHead );
- break;
- case removed:
- // The record should be removed
- p = unlink_and_delete_record( pPrev, p );
- continue;
- default:
- /// ??? That is impossible
- assert(false);
+ case active:
+ if ( p->op( memory_model::memory_order_acquire ) >= req_Operation ) {
+ p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+ owner.fc_apply( static_cast<publication_record_type*>( p ));
+ operation_done( *p );
+ bOpDone = true;
+ }
+ break;
+ case inactive:
+ // Only m_pHead can be inactive in the publication list
+ assert( p == m_pHead );
+ break;
+ case removed:
+ // Such record will be removed on compacting phase
+ break;
+ default:
+ /// ??? That is impossible
+ assert( false );
}
- pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
return bOpDone;
combining_pass( owner, nCurAge );
m_Stat.onCombining();
- if ( (nCurAge & m_nCompactFactor) == 0 )
+ if ( ( nCurAge & m_nCompactFactor ) == 0 )
compact_list( nCurAge );
}
- bool wait_for_combining( publication_record_type * pRec )
+ bool wait_for_combining( publication_record_type* pRec )
{
m_waitStrategy.prepare( *pRec );
m_Stat.onPassiveWait();
return true;
}
- void compact_list( unsigned int const nCurAge )
+ void compact_list( unsigned int nCurAge )
{
- // Thinning publication list
- publication_record * pPrev = nullptr;
- for ( publication_record * p = m_pHead; p; ) {
- if ( p->nState.load( memory_model::memory_order_acquire ) == active
- && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
- {
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ // Compacts publication list
+ // This function is called only by combiner thread
+
+ try_again:
+ publication_record * pPrev = m_pHead;
+ for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+ switch ( p->nState.load( memory_model::memory_order_relaxed )) {
+ case active:
+ if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+ {
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_release, atomics::memory_order_relaxed ))
+ memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
{
p->nState.store( inactive, memory_model::memory_order_release );
p = pNext;
continue;
}
}
+ break;
+
+ case removed:
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+ p = pNext;
+ continue;
+ }
+ else {
+ // CAS can be failed only in beginning of list
+ assert( pPrev == m_pHead );
+ goto try_again;
+ }
}
pPrev = p;
p = p->pNext.load( memory_model::memory_order_acquire );
}
- m_Stat.onCompactPublicationList();
- }
-
- publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
- {
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
- if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
- {
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
+ // Iterate over allocated list to find removed records
+ pPrev = m_pAllocatedHead;
+ for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+ if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+ publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+ if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+ free_publication_record( static_cast<publication_record_type *>( p ));
+ p = pNext;
+ continue;
+ }
}
- return pNext;
- }
- else {
- m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
- return m_pHead;
+
+ pPrev = p;
+ p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
}
+
+ m_Stat.onCompactPublicationList();
}
//@endcond
};
} // namespace flat_combining
}} // namespace cds::algo
/*
    CppMem model (http://svr-pes20-cppmem.cl.cam.ac.uk/cppmem/)

    // Combiner thread - slave (waiting) thread
int main() {
    atomic_int y = 0; // pRec->op
    int x = 0;        // pRec->data
    {{{
       { // slave thread (not combiner)
         // Op data
         x = 1;
         // Annotate request (op)
         y.store(1, release);
         // Wait while request done
         y.load(acquire).readsvalue(2);
         // Read result
         r2=x;
       }
    |||
       { // Combiner thread
         // Read request (op)
         r1=y.load(acquire).readsvalue(1);
         // Execute request - change request data
         x = 2;
         // store "request processed" flag (pRec->op := req_Response)
         y.store(2, release);
       }
    }}};
    return 0;
}

*/
+
#endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H