From: khizmax Date: Fri, 20 Nov 2015 21:30:02 +0000 (+0300) Subject: Fixed memory leaks in threaded URCU (general_threaded, signal_threaded) X-Git-Tag: v2.1.0~65 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=e9dfb7c5dfcd5e8deaecaa4f553c2bae6016ce7c;p=libcds.git Fixed memory leaks in threaded URCU (general_threaded, signal_threaded) --- diff --git a/cds/urcu/details/gpb.h b/cds/urcu/details/gpb.h index 94e27b65..9dd22509 100644 --- a/cds/urcu/details/gpb.h +++ b/cds/urcu/details/gpb.h @@ -4,6 +4,7 @@ #define CDSLIB_URCU_DETAILS_GPB_H #include +#include #include #include #include @@ -94,7 +95,7 @@ namespace cds { namespace urcu { ~general_buffered() { - clear_buffer( (uint64_t) -1 ); + clear_buffer( std::numeric_limits< uint64_t >::max()); } void flip_and_wait() @@ -147,7 +148,7 @@ namespace cds { namespace urcu { static void Destruct( bool bDetachAll = false ) { if ( isUsed() ) { - instance()->clear_buffer( (uint64_t) -1 ); + instance()->clear_buffer( std::numeric_limits< uint64_t >::max()); if ( bDetachAll ) instance()->m_ThreadList.detach_all(); delete instance(); diff --git a/cds/urcu/details/gpt.h b/cds/urcu/details/gpt.h index 4699ef2f..626b18d8 100644 --- a/cds/urcu/details/gpt.h +++ b/cds/urcu/details/gpt.h @@ -4,6 +4,7 @@ #define CDSLIB_URCU_DETAILS_GPT_H #include //unique_lock +#include #include #include #include @@ -74,11 +75,11 @@ namespace cds { namespace urcu { protected: //@cond - buffer_type m_Buffer; - atomics::atomic m_nCurEpoch; - lock_type m_Lock; - size_t const m_nCapacity; - disposer_thread m_DisposerThread; + buffer_type m_Buffer; + atomics::atomic m_nCurEpoch; + lock_type m_Lock; + size_t const m_nCapacity; + disposer_thread m_DisposerThread; //@endcond public: @@ -151,7 +152,7 @@ namespace cds { namespace urcu { if ( bDetachAll ) pThis->m_ThreadList.detach_all(); - pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire )); + pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max()); delete pThis; singleton_ptr::s_pRCU = nullptr; @@ -176,7 +177,7 @@ namespace cds { namespace urcu { template void batch_retire( ForwardIterator itFirst, ForwardIterator itLast ) { - uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed ); + uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire ); while ( itFirst != itLast ) { epoch_retired_ptr ep( *itFirst, nEpoch ); ++itFirst; @@ -188,7 +189,7 @@ namespace cds { namespace urcu { template void batch_retire( Func e ) { - uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed ); + uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire ); for ( retired_ptr p{ e() }; p.m_p; ) { epoch_retired_ptr ep( p, nEpoch ); p = e(); @@ -208,15 +209,13 @@ namespace cds { namespace urcu { { uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release ); - atomics::atomic_thread_fence( atomics::memory_order_acquire ); { std::unique_lock sl( m_Lock ); flip_and_wait(); flip_and_wait(); - - m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync ); } - atomics::atomic_thread_fence( atomics::memory_order_release ); + + m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync ); } void force_dispose() { diff --git a/cds/urcu/details/sig_buffered.h b/cds/urcu/details/sig_buffered.h index d1c4569c..f9760f97 100644 --- a/cds/urcu/details/sig_buffered.h +++ b/cds/urcu/details/sig_buffered.h @@ -7,6 +7,7 @@ #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED #include +#include #include #include @@ -97,7 +98,7 @@ namespace cds { namespace urcu { ~signal_buffered() { - clear_buffer( (uint64_t) -1 ); + clear_buffer( std::numeric_limits< uint64_t >::max() ); } void clear_buffer( uint64_t nEpoch ) @@ -145,7 +146,7 @@ namespace cds { namespace urcu { static void Destruct( bool bDetachAll = false ) { if ( isUsed() ) { - instance()->clear_buffer( (uint64_t) -1 ); + instance()->clear_buffer( std::numeric_limits< uint64_t >::max()); if ( bDetachAll ) instance()->m_ThreadList.detach_all(); delete instance(); diff --git a/cds/urcu/details/sig_threaded.h b/cds/urcu/details/sig_threaded.h index f4031862..465700b5 100644 --- a/cds/urcu/details/sig_threaded.h +++ b/cds/urcu/details/sig_threaded.h @@ -3,10 +3,11 @@ #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H #define CDSLIB_URCU_DETAILS_SIG_THREADED_H -#include //unique_lock #include #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED +#include //unique_lock +#include #include #include #include @@ -150,7 +151,7 @@ namespace cds { namespace urcu { if ( bDetachAll ) pThis->m_ThreadList.detach_all(); - pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire )); + pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max()); delete pThis; singleton_ptr::s_pRCU = nullptr; diff --git a/cds/urcu/dispose_thread.h b/cds/urcu/dispose_thread.h index 553a4d6b..1329af6d 100644 --- a/cds/urcu/dispose_thread.h +++ b/cds/urcu/dispose_thread.h @@ -72,6 +72,8 @@ namespace cds { namespace urcu { uint64_t nCurEpoch; bool bQuit = false; + epoch_retired_ptr rest; + while ( !bQuit ) { { unique_lock lock( m_Mutex ); @@ -93,23 +95,30 @@ namespace cds { namespace urcu { m_pBuffer = nullptr; } + if ( rest.m_p ) { + assert( rest.m_nEpoch < nCurEpoch ); + rest.free(); + } + if ( pBuffer ) - dispose_buffer( pBuffer, nCurEpoch ); + rest = dispose_buffer( pBuffer, nCurEpoch ); } } - void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch ) + epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch ) { epoch_retired_ptr p; - while ( pBuf->pop( p ) ) { + while ( pBuf->pop( p )) { if ( p.m_nEpoch <= nCurEpoch ) { p.free(); } else { - pBuf->push( p ); + if ( !pBuf->push( p )) + return p; break; } } + return epoch_retired_ptr(); } //@endcond