Fixed memory leaks in threaded URCU (general_threaded, signal_threaded)
authorkhizmax <libcds.dev@gmail.com>
Fri, 20 Nov 2015 21:30:02 +0000 (00:30 +0300)
committerkhizmax <libcds.dev@gmail.com>
Fri, 20 Nov 2015 21:30:02 +0000 (00:30 +0300)
cds/urcu/details/gpb.h
cds/urcu/details/gpt.h
cds/urcu/details/sig_buffered.h
cds/urcu/details/sig_threaded.h
cds/urcu/dispose_thread.h

index 94e27b65de3e51feb7d7afe32c7255e0dce5eeaf..9dd22509b12bb7d6a4d4eae515d32100d49fc4cc 100644 (file)
@@ -4,6 +4,7 @@
 #define CDSLIB_URCU_DETAILS_GPB_H
 
 #include <mutex>
+#include <limits>
 #include <cds/urcu/details/gp.h>
 #include <cds/algo/backoff_strategy.h>
 #include <cds/container/vyukov_mpmc_cycle_queue.h>
@@ -94,7 +95,7 @@ namespace cds { namespace urcu {
 
         ~general_buffered()
         {
-            clear_buffer( (uint64_t) -1 );
+            clear_buffer( std::numeric_limits< uint64_t >::max());
         }
 
         void flip_and_wait()
@@ -147,7 +148,7 @@ namespace cds { namespace urcu {
         static void Destruct( bool bDetachAll = false )
         {
             if ( isUsed() ) {
-                instance()->clear_buffer( (uint64_t) -1 );
+                instance()->clear_buffer( std::numeric_limits< uint64_t >::max());
                 if ( bDetachAll )
                     instance()->m_ThreadList.detach_all();
                 delete instance();
index 4699ef2f3b9f710bc44d60d2617b19df07912806..626b18d83caf8cb199758ee1366196f63faa9b4f 100644 (file)
@@ -4,6 +4,7 @@
 #define CDSLIB_URCU_DETAILS_GPT_H
 
 #include <mutex>    //unique_lock
+#include <limits>
 #include <cds/urcu/details/gp.h>
 #include <cds/urcu/dispose_thread.h>
 #include <cds/algo/backoff_strategy.h>
@@ -74,11 +75,11 @@ namespace cds { namespace urcu {
 
     protected:
         //@cond
-        buffer_type                     m_Buffer;
-        atomics::atomic<uint64_t>    m_nCurEpoch;
-        lock_type                       m_Lock;
-        size_t const                    m_nCapacity;
-        disposer_thread                 m_DisposerThread;
+        buffer_type               m_Buffer;
+        atomics::atomic<uint64_t> m_nCurEpoch;
+        lock_type                 m_Lock;
+        size_t const              m_nCapacity;
+        disposer_thread           m_DisposerThread;
         //@endcond
 
     public:
@@ -151,7 +152,7 @@ namespace cds { namespace urcu {
                 if ( bDetachAll )
                     pThis->m_ThreadList.detach_all();
 
-                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));
+                pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
 
                 delete pThis;
                 singleton_ptr::s_pRCU = nullptr;
@@ -176,7 +177,7 @@ namespace cds { namespace urcu {
         template <typename ForwardIterator>
         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
         {
-            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
+            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
             while ( itFirst != itLast ) {
                 epoch_retired_ptr ep( *itFirst, nEpoch );
                 ++itFirst;
@@ -188,7 +189,7 @@ namespace cds { namespace urcu {
         template <typename Func>
         void batch_retire( Func e )
         {
-            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
+            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
             for ( retired_ptr p{ e() }; p.m_p; ) {
                 epoch_retired_ptr ep( p, nEpoch );
                 p = e();
@@ -208,15 +209,13 @@ namespace cds { namespace urcu {
         {
             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
 
-            atomics::atomic_thread_fence( atomics::memory_order_acquire );
             {
                 std::unique_lock<lock_type> sl( m_Lock );
                 flip_and_wait();
                 flip_and_wait();
-
-                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
             }
-            atomics::atomic_thread_fence( atomics::memory_order_release );
+
+            m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
         }
         void force_dispose()
         {
index d1c4569c54c7bc12f4732a79e130825866d720fb..f9760f97c9ff76876a08f19225bac870deb2a244 100644 (file)
@@ -7,6 +7,7 @@
 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
 
 #include <mutex>
+#include <limits>
 #include <cds/algo/backoff_strategy.h>
 #include <cds/container/vyukov_mpmc_cycle_queue.h>
 
@@ -97,7 +98,7 @@ namespace cds { namespace urcu {
 
         ~signal_buffered()
         {
-            clear_buffer( (uint64_t) -1 );
+            clear_buffer( std::numeric_limits< uint64_t >::max() );
         }
 
         void clear_buffer( uint64_t nEpoch )
@@ -145,7 +146,7 @@ namespace cds { namespace urcu {
         static void Destruct( bool bDetachAll = false )
         {
             if ( isUsed() ) {
-                instance()->clear_buffer( (uint64_t) -1 );
+                instance()->clear_buffer( std::numeric_limits< uint64_t >::max());
                 if ( bDetachAll )
                     instance()->m_ThreadList.detach_all();
                 delete instance();
index f4031862ee193ad891cd983213e0f5a6e776f9a9..465700b5deb0e2a79dbe7a49c039b7326fb427ab 100644 (file)
@@ -3,10 +3,11 @@
 #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
 #define CDSLIB_URCU_DETAILS_SIG_THREADED_H
 
-#include <mutex>    //unique_lock
 #include <cds/urcu/details/sh.h>
 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
 
+#include <mutex>    //unique_lock
+#include <limits>
 #include <cds/urcu/dispose_thread.h>
 #include <cds/algo/backoff_strategy.h>
 #include <cds/container/vyukov_mpmc_cycle_queue.h>
@@ -150,7 +151,7 @@ namespace cds { namespace urcu {
                 if ( bDetachAll )
                     pThis->m_ThreadList.detach_all();
 
-                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));
+                pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
 
                 delete pThis;
                 singleton_ptr::s_pRCU = nullptr;
index 553a4d6b0e60f9c60e19b5726d432c7a8285b2bc..1329af6d0f69980610f3f77e84a1669f66d5ccd8 100644 (file)
@@ -72,6 +72,8 @@ namespace cds { namespace urcu {
             uint64_t        nCurEpoch;
             bool            bQuit = false;
 
+            epoch_retired_ptr rest;
+
             while ( !bQuit ) {
                 {
                     unique_lock lock( m_Mutex );
@@ -93,23 +95,30 @@ namespace cds { namespace urcu {
                     m_pBuffer = nullptr;
                 }
 
+                if ( rest.m_p ) {
+                    assert( rest.m_nEpoch < nCurEpoch );
+                    rest.free();
+                }
+
                 if ( pBuffer )
-                    dispose_buffer( pBuffer, nCurEpoch );
+                    rest = dispose_buffer( pBuffer, nCurEpoch );
             }
         }
 
-        void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
+        epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
         {
             epoch_retired_ptr p;
-            while ( pBuf->pop( p ) ) {
+            while ( pBuf->pop( p )) {
                 if ( p.m_nEpoch <= nCurEpoch ) {
                     p.free();
                 }
                 else {
-                    pBuf->push( p );
+                    if ( !pBuf->push( p ))
+                        return p;
                     break;
                 }
             }
+            return epoch_retired_ptr();
         }
         //@endcond