TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/detail/intrusive.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <algorithm>
15 : #include <atomic>
16 : #include <condition_variable>
17 : #include <cstdio>
18 : #include <mutex>
19 : #include <thread>
20 : #include <vector>
21 :
22 : /*
23 : Thread pool implementation using a shared work queue.
24 :
25 : Work items are coroutine handles wrapped in intrusive list nodes, stored
26 : in a single queue protected by a mutex. Worker threads wait on a
27 : condition_variable until work is available or stop is requested.
28 :
29 : Threads are started lazily on first post() via std::call_once to avoid
30 : spawning threads for pools that are constructed but never used. Each
31 : thread is named with a configurable prefix plus index for debugger
32 : visibility.
33 :
34 : Work tracking: on_work_started/on_work_finished maintain an atomic
35 : outstanding_work_ counter. join() blocks until this counter reaches
36 : zero, then signals workers to stop and joins threads.
37 :
38 : Two shutdown paths:
39 : - join(): waits for outstanding work to drain, then stops workers.
40 : - stop(): immediately signals workers to exit; queued work is abandoned.
41 : - Destructor: stop() then join() (abandon + wait for threads).
42 : */
43 :
44 : namespace boost {
45 : namespace capy {
46 :
47 : //------------------------------------------------------------------------------
48 :
49 : class thread_pool::impl
50 : {
51 : struct work : detail::intrusive_queue<work>::node
52 : {
53 : std::coroutine_handle<> h_;
54 :
55 HIT 734 : explicit work(std::coroutine_handle<> h) noexcept
56 734 : : h_(h)
57 : {
58 734 : }
59 :
60 555 : void run()
61 : {
62 555 : auto h = h_;
63 555 : delete this;
64 555 : h.resume();
65 555 : }
66 :
67 179 : void destroy()
68 : {
69 179 : auto h = h_;
70 179 : delete this;
71 179 : if(h && h != std::noop_coroutine())
72 127 : h.destroy();
73 179 : }
74 : };
75 :
76 : std::mutex mutex_;
77 : std::condition_variable cv_;
78 : detail::intrusive_queue<work> q_;
79 : std::vector<std::thread> threads_;
80 : std::atomic<std::size_t> outstanding_work_{0};
81 : bool stop_{false};
82 : bool joined_{false};
83 : std::size_t num_threads_;
84 : char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 : std::once_flag start_flag_;
86 :
87 : public:
88 124 : ~impl()
89 : {
90 303 : while(auto* w = q_.pop())
91 179 : w->destroy();
92 124 : }
93 :
94 124 : impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 124 : : num_threads_(num_threads)
96 : {
97 124 : if(num_threads_ == 0)
98 4 : num_threads_ = std::max(
99 2 : std::thread::hardware_concurrency(), 1u);
100 :
101 : // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102 124 : auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 124 : thread_name_prefix_[n] = '\0';
104 124 : }
105 :
106 : void
107 734 : post(std::coroutine_handle<> h)
108 : {
109 734 : ensure_started();
110 734 : auto* w = new work(h);
111 : {
112 734 : std::lock_guard<std::mutex> lock(mutex_);
113 734 : q_.push(w);
114 734 : }
115 734 : cv_.notify_one();
116 734 : }
117 :
118 : void
119 314 : on_work_started() noexcept
120 : {
121 314 : outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 314 : }
123 :
124 : void
125 314 : on_work_finished() noexcept
126 : {
127 314 : if(outstanding_work_.fetch_sub(
128 314 : 1, std::memory_order_acq_rel) == 1)
129 : {
130 63 : std::lock_guard<std::mutex> lock(mutex_);
131 63 : if(joined_ && !stop_)
132 4 : stop_ = true;
133 63 : cv_.notify_all();
134 63 : }
135 314 : }
136 :
137 : void
138 134 : join() noexcept
139 : {
140 : {
141 134 : std::unique_lock<std::mutex> lock(mutex_);
142 134 : if(joined_)
143 10 : return;
144 124 : joined_ = true;
145 :
146 124 : if(outstanding_work_.load(
147 124 : std::memory_order_acquire) == 0)
148 : {
149 70 : stop_ = true;
150 70 : cv_.notify_all();
151 : }
152 : else
153 : {
154 54 : cv_.wait(lock, [this]{
155 59 : return stop_;
156 : });
157 : }
158 134 : }
159 :
160 273 : for(auto& t : threads_)
161 149 : if(t.joinable())
162 149 : t.join();
163 : }
164 :
165 : void
166 126 : stop() noexcept
167 : {
168 : {
169 126 : std::lock_guard<std::mutex> lock(mutex_);
170 126 : stop_ = true;
171 126 : }
172 126 : cv_.notify_all();
173 126 : }
174 :
175 : private:
176 : void
177 734 : ensure_started()
178 : {
179 734 : std::call_once(start_flag_, [this]{
180 79 : threads_.reserve(num_threads_);
181 228 : for(std::size_t i = 0; i < num_threads_; ++i)
182 298 : threads_.emplace_back([this, i]{ run(i); });
183 79 : });
184 734 : }
185 :
186 : void
187 149 : run(std::size_t index)
188 : {
189 : // Build name; set_current_thread_name truncates to platform limits.
190 : char name[16];
191 149 : std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 149 : set_current_thread_name(name);
193 :
194 : for(;;)
195 : {
196 704 : work* w = nullptr;
197 : {
198 704 : std::unique_lock<std::mutex> lock(mutex_);
199 704 : cv_.wait(lock, [this]{
200 1149 : return !q_.empty() ||
201 1149 : stop_;
202 : });
203 704 : if(stop_)
204 298 : return;
205 555 : w = q_.pop();
206 704 : }
207 555 : if(w)
208 555 : w->run();
209 555 : }
210 : }
211 : };
212 :
213 : //------------------------------------------------------------------------------
214 :
215 124 : thread_pool::
216 : ~thread_pool()
217 : {
218 124 : impl_->stop();
219 124 : impl_->join();
220 124 : shutdown();
221 124 : destroy();
222 124 : delete impl_;
223 124 : }
224 :
225 124 : thread_pool::
226 124 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227 124 : : impl_(new impl(num_threads, thread_name_prefix))
228 : {
229 124 : this->set_frame_allocator(std::allocator<void>{});
230 124 : }
231 :
232 : void
233 10 : thread_pool::
234 : join() noexcept
235 : {
236 10 : impl_->join();
237 10 : }
238 :
239 : void
240 2 : thread_pool::
241 : stop() noexcept
242 : {
243 2 : impl_->stop();
244 2 : }
245 :
246 : //------------------------------------------------------------------------------
247 :
248 : thread_pool::executor_type
249 120 : thread_pool::
250 : get_executor() const noexcept
251 : {
252 120 : return executor_type(
253 120 : const_cast<thread_pool&>(*this));
254 : }
255 :
256 : void
257 314 : thread_pool::executor_type::
258 : on_work_started() const noexcept
259 : {
260 314 : pool_->impl_->on_work_started();
261 314 : }
262 :
263 : void
264 314 : thread_pool::executor_type::
265 : on_work_finished() const noexcept
266 : {
267 314 : pool_->impl_->on_work_finished();
268 314 : }
269 :
270 : void
271 734 : thread_pool::executor_type::
272 : post(std::coroutine_handle<> h) const
273 : {
274 734 : pool_->impl_->post(h);
275 734 : }
276 :
277 : } // capy
278 : } // boost
|