LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 119 119
Test Date: 2026-03-06 20:40:21 Functions: 100.0 % 24 24

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/detail/intrusive.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <algorithm>
      15                 : #include <atomic>
      16                 : #include <condition_variable>
      17                 : #include <cstdio>
      18                 : #include <mutex>
      19                 : #include <thread>
      20                 : #include <vector>
      21                 : 
      22                 : /*
      23                 :     Thread pool implementation using a shared work queue.
      24                 : 
      25                 :     Work items are coroutine handles wrapped in intrusive list nodes, stored
      26                 :     in a single queue protected by a mutex. Worker threads wait on a
      27                 :     condition_variable until work is available or stop is requested.
      28                 : 
      29                 :     Threads are started lazily on first post() via std::call_once to avoid
      30                 :     spawning threads for pools that are constructed but never used. Each
      31                 :     thread is named with a configurable prefix plus index for debugger
      32                 :     visibility.
      33                 : 
      34                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      35                 :     outstanding_work_ counter. join() blocks until this counter reaches
      36                 :     zero, then signals workers to stop and joins threads.
      37                 : 
      38                 :     Two shutdown paths, plus the destructor, which combines them:
      39                 :     - join(): waits for outstanding work to drain, then stops workers.
      40                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      41                 :     - Destructor: stop() then join() (abandon + wait for threads).
      42                 : */
      43                 : 
      44                 : namespace boost {
      45                 : namespace capy {
      46                 : 
      47                 : //------------------------------------------------------------------------------
      48                 : 
      49                 : class thread_pool::impl
      50                 : {
      51                 :     struct work : detail::intrusive_queue<work>::node
      52                 :     {
      53                 :         std::coroutine_handle<> h_;
      54                 : 
      55 HIT         734 :         explicit work(std::coroutine_handle<> h) noexcept
      56             734 :             : h_(h)
      57                 :         {
      58             734 :         }
      59                 : 
      60             555 :         void run()
      61                 :         {
      62             555 :             auto h = h_;
      63             555 :             delete this;
      64             555 :             h.resume();
      65             555 :         }
      66                 : 
      67             179 :         void destroy()
      68                 :         {
      69             179 :             auto h = h_;
      70             179 :             delete this;
      71             179 :             if(h && h != std::noop_coroutine())
      72             127 :                 h.destroy();
      73             179 :         }
      74                 :     };
      75                 : 
      76                 :     std::mutex mutex_;
      77                 :     std::condition_variable cv_;
      78                 :     detail::intrusive_queue<work> q_;
      79                 :     std::vector<std::thread> threads_;
      80                 :     std::atomic<std::size_t> outstanding_work_{0};
      81                 :     bool stop_{false};
      82                 :     bool joined_{false};
      83                 :     std::size_t num_threads_;
      84                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      85                 :     std::once_flag start_flag_;
      86                 : 
      87                 : public:
      88             124 :     ~impl()
      89                 :     {
      90             303 :         while(auto* w = q_.pop())
      91             179 :             w->destroy();
      92             124 :     }
      93                 : 
      94             124 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      95             124 :         : num_threads_(num_threads)
      96                 :     {
      97             124 :         if(num_threads_ == 0)
      98               4 :             num_threads_ = std::max(
      99               2 :                 std::thread::hardware_concurrency(), 1u);
     100                 : 
     101                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     102             124 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     103             124 :         thread_name_prefix_[n] = '\0';
     104             124 :     }
     105                 : 
     106                 :     void
     107             734 :     post(std::coroutine_handle<> h)
     108                 :     {
     109             734 :         ensure_started();
     110             734 :         auto* w = new work(h);
     111                 :         {
     112             734 :             std::lock_guard<std::mutex> lock(mutex_);
     113             734 :             q_.push(w);
     114             734 :         }
     115             734 :         cv_.notify_one();
     116             734 :     }
     117                 : 
     118                 :     void
     119             314 :     on_work_started() noexcept
     120                 :     {
     121             314 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     122             314 :     }
     123                 : 
     124                 :     void
     125             314 :     on_work_finished() noexcept
     126                 :     {
     127             314 :         if(outstanding_work_.fetch_sub(
     128             314 :             1, std::memory_order_acq_rel) == 1)
     129                 :         {
     130              63 :             std::lock_guard<std::mutex> lock(mutex_);
     131              63 :             if(joined_ && !stop_)
     132               4 :                 stop_ = true;
     133              63 :             cv_.notify_all();
     134              63 :         }
     135             314 :     }
     136                 : 
     137                 :     void
     138             134 :     join() noexcept
     139                 :     {
     140                 :         {
     141             134 :             std::unique_lock<std::mutex> lock(mutex_);
     142             134 :             if(joined_)
     143              10 :                 return;
     144             124 :             joined_ = true;
     145                 : 
     146             124 :             if(outstanding_work_.load(
     147             124 :                 std::memory_order_acquire) == 0)
     148                 :             {
     149              70 :                 stop_ = true;
     150              70 :                 cv_.notify_all();
     151                 :             }
     152                 :             else
     153                 :             {
     154              54 :                 cv_.wait(lock, [this]{
     155              59 :                     return stop_;
     156                 :                 });
     157                 :             }
     158             134 :         }
     159                 : 
     160             273 :         for(auto& t : threads_)
     161             149 :             if(t.joinable())
     162             149 :                 t.join();
     163                 :     }
     164                 : 
     165                 :     void
     166             126 :     stop() noexcept
     167                 :     {
     168                 :         {
     169             126 :             std::lock_guard<std::mutex> lock(mutex_);
     170             126 :             stop_ = true;
     171             126 :         }
     172             126 :         cv_.notify_all();
     173             126 :     }
     174                 : 
     175                 : private:
     176                 :     void
     177             734 :     ensure_started()
     178                 :     {
     179             734 :         std::call_once(start_flag_, [this]{
     180              79 :             threads_.reserve(num_threads_);
     181             228 :             for(std::size_t i = 0; i < num_threads_; ++i)
     182             298 :                 threads_.emplace_back([this, i]{ run(i); });
     183              79 :         });
     184             734 :     }
     185                 : 
     186                 :     void
     187             149 :     run(std::size_t index)
     188                 :     {
     189                 :         // Build name; set_current_thread_name truncates to platform limits.
     190                 :         char name[16];
     191             149 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     192             149 :         set_current_thread_name(name);
     193                 : 
     194                 :         for(;;)
     195                 :         {
     196             704 :             work* w = nullptr;
     197                 :             {
     198             704 :                 std::unique_lock<std::mutex> lock(mutex_);
     199             704 :                 cv_.wait(lock, [this]{
     200            1149 :                     return !q_.empty() ||
     201            1149 :                         stop_;
     202                 :                 });
     203             704 :                 if(stop_)
     204             298 :                     return;
     205             555 :                 w = q_.pop();
     206             704 :             }
     207             555 :             if(w)
     208             555 :                 w->run();
     209             555 :         }
     210                 :     }
     211                 : };
     212                 : 
     213                 : //------------------------------------------------------------------------------
     214                 : 
     215             124 : thread_pool::
     216                 : ~thread_pool()
     217                 : {
     218             124 :     impl_->stop();
     219             124 :     impl_->join();
     220             124 :     shutdown();
     221             124 :     destroy();
     222             124 :     delete impl_;
     223             124 : }
     224                 : 
     225             124 : thread_pool::
     226             124 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     227             124 :     : impl_(new impl(num_threads, thread_name_prefix))
     228                 : {
     229             124 :     this->set_frame_allocator(std::allocator<void>{});
     230             124 : }
     231                 : 
     232                 : void
     233              10 : thread_pool::
     234                 : join() noexcept
     235                 : {
     236              10 :     impl_->join();
     237              10 : }
     238                 : 
     239                 : void
     240               2 : thread_pool::
     241                 : stop() noexcept
     242                 : {
     243               2 :     impl_->stop();
     244               2 : }
     245                 : 
     246                 : //------------------------------------------------------------------------------
     247                 : 
     248                 : thread_pool::executor_type
     249             120 : thread_pool::
     250                 : get_executor() const noexcept
     251                 : {
     252             120 :     return executor_type(
     253             120 :         const_cast<thread_pool&>(*this));
     254                 : }
     255                 : 
     256                 : void
     257             314 : thread_pool::executor_type::
     258                 : on_work_started() const noexcept
     259                 : {
     260             314 :     pool_->impl_->on_work_started();
     261             314 : }
     262                 : 
     263                 : void
     264             314 : thread_pool::executor_type::
     265                 : on_work_finished() const noexcept
     266                 : {
     267             314 :     pool_->impl_->on_work_finished();
     268             314 : }
     269                 : 
     270                 : void
     271             734 : thread_pool::executor_type::
     272                 : post(std::coroutine_handle<> h) const
     273                 : {
     274             734 :     pool_->impl_->post(h);
     275             734 : }
     276                 : 
     277                 : } // capy
     278                 : } // boost
        

Generated by: LCOV version 2.3