src/ex/thread_pool.cpp

100.0% Lines (119/119) 100.0% Functions (24/24)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are coroutine handles wrapped in intrusive list nodes, stored
26 in a single queue protected by a mutex. Worker threads wait on a
27 condition_variable until work is available or stop is requested.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Work tracking: on_work_started/on_work_finished maintain an atomic
35 outstanding_work_ counter. join() blocks until this counter reaches
36 zero, then signals workers to stop and joins threads.
37
38 Three shutdown paths:
39 - join(): waits for outstanding work to drain, then stops workers.
40 - stop(): immediately signals workers to exit; queued work is abandoned.
41 - Destructor: stop() then join() (abandon + wait for threads).
42 */
43
44 namespace boost {
45 namespace capy {
46
47 //------------------------------------------------------------------------------
48
class thread_pool::impl
{
    // One queued unit of work: a coroutine handle wrapped in an
    // intrusive-queue node. Nodes are heap-allocated in post() and
    // freed in run() (normal path) or destroy() (abandoned path).
    struct work : detail::intrusive_queue<work>::node
    {
        std::coroutine_handle<> h_;

        explicit work(std::coroutine_handle<> h) noexcept
            : h_(h)
        {
        }

        // Resume the coroutine. The handle is copied out and the node
        // deleted first, because `this` must not be touched once the
        // coroutine is running (it may complete, reschedule, etc.).
        void run()
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        // Abandon the work item without running it: free the node and
        // destroy the coroutine frame. noop_coroutine has no frame to
        // destroy, so it is skipped.
        void destroy()
        {
            auto h = h_;
            delete this;
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    };

    std::mutex mutex_;                          // guards q_, stop_, joined_
    std::condition_variable cv_;                // signals work available / stop
    detail::intrusive_queue<work> q_;           // pending work items
    std::vector<std::thread> threads_;          // worker threads (lazily started)
    std::atomic<std::size_t> outstanding_work_{0}; // work-guard counter
    bool stop_{false};                          // workers must exit
    bool joined_{false};                        // join() already requested
    std::size_t num_threads_;
    char thread_name_prefix_[13]{};             // 12 chars max + null terminator
    std::once_flag start_flag_;                 // ensures threads start once

public:
    // Precondition (implied): stop()/join() have run, so no worker is
    // touching q_. Any work still queued is abandoned, destroying the
    // coroutine frames so they do not leak.
    ~impl()
    {
        while(auto* w = q_.pop())
            w->destroy();
    }

    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        // 0 means "pick for me": use hardware concurrency, but at
        // least one thread (hardware_concurrency() may return 0).
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue a coroutine for execution on the pool. Starts the worker
    // threads on first use. notify_one is issued outside the lock so a
    // woken worker does not immediately block on mutex_.
    void
    post(std::coroutine_handle<> h)
    {
        ensure_started();
        auto* w = new work(h);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            q_.push(w);
        }
        cv_.notify_one();
    }

    // Increment the outstanding-work count (executor work guard).
    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the outstanding-work count. The thread that drops the
    // count to zero (fetch_sub returning 1) wakes join(): if join() is
    // already waiting (joined_ set), it also flips stop_ so the wait
    // predicate becomes true and workers can shut down.
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            cv_.notify_all();
        }
    }

    // Block until outstanding work drains (or stop() was called), then
    // join all worker threads. Idempotent: only the first caller does
    // the work; later callers return immediately.
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing pending: tell workers to exit now.
                stop_ = true;
                cv_.notify_all();
            }
            else
            {
                // Wait for on_work_finished() (or stop()) to set stop_.
                cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        // Lock released before joining so workers can finish cleanly.
        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Signal all workers to exit immediately. Work left in the queue is
    // not run; it is destroyed later by ~impl().
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
    }

private:
    // Lazily launch the worker threads exactly once (see file comment:
    // avoids spawning threads for pools that are never used).
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker loop: name the thread, then repeatedly take one item off
    // the queue and run it until stop_ is observed.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        for(;;)
        {
            work* w = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]{
                    return !q_.empty() ||
                        stop_;
                });
                // stop_ wins over pending work: remaining items are
                // abandoned and destroyed by ~impl().
                if(stop_)
                    return;
                w = q_.pop();
            }
            // Run outside the lock so other workers can dequeue.
            if(w)
                w->run();
        }
    }
};
212
213 //------------------------------------------------------------------------------
214
thread_pool::
~thread_pool()
{
    // Signal workers to exit immediately; any still-queued work is
    // abandoned (its coroutine frames are destroyed in impl::~impl).
    impl_->stop();
    // Wait for worker threads to actually terminate.
    impl_->join();
    // shutdown()/destroy() come from the base execution context —
    // presumably tearing down registered services; confirm against the
    // base class declaration in the header.
    shutdown();
    destroy();
    delete impl_;
}
224
thread_pool::
// num_threads == 0 selects hardware concurrency (at least 1); the
// prefix is truncated to 12 characters for thread naming.
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    // Worker threads are not started here; impl starts them lazily on
    // the first post(). Default the coroutine frame allocator.
    this->set_frame_allocator(std::allocator<void>{});
}
231
// Block until outstanding work completes, then join the worker
// threads. Safe to call more than once.
void
thread_pool::
join() noexcept
{
    impl_->join();
}
238
// Signal workers to exit immediately; queued work is abandoned rather
// than run.
void
thread_pool::
stop() noexcept
{
    impl_->stop();
}
245
246 //------------------------------------------------------------------------------
247
// Return an executor bound to this pool. The const_cast is needed
// because the accessor is const but the executor must hold a mutable
// reference to post work; it never mutates through a truly-const pool
// in practice (the pool object is always created non-const).
thread_pool::executor_type
thread_pool::
get_executor() const noexcept
{
    return executor_type(
        const_cast<thread_pool&>(*this));
}
255
// Work-guard begin: bump the pool's outstanding-work counter so
// join() keeps waiting while this work is pending.
void
thread_pool::executor_type::
on_work_started() const noexcept
{
    pool_->impl_->on_work_started();
}
262
// Work-guard end: decrement the counter; the final decrement wakes a
// pending join() so the pool can shut down.
void
thread_pool::executor_type::
on_work_finished() const noexcept
{
    pool_->impl_->on_work_finished();
}
269
// Schedule a coroutine for resumption on one of the pool's worker
// threads (FIFO via the shared queue).
void
thread_pool::executor_type::
post(std::coroutine_handle<> h) const
{
    pool_->impl_->post(h);
}
276
277 } // capy
278 } // boost
279