TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12 : #define BOOST_CAPY_EX_THREAD_POOL_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <coroutine>
16 : #include <boost/capy/ex/execution_context.hpp>
17 : #include <cstddef>
18 : #include <string_view>
19 :
20 : namespace boost {
21 : namespace capy {
22 :
23 : /** A pool of threads for executing work concurrently.
24 :
25 : Use this when you need to run coroutines on multiple threads
26 : without the overhead of creating and destroying threads for
27 : each task. Work items are distributed across the pool using
28 : a shared queue.
29 :
30 : @par Thread Safety
31 : Distinct objects: Safe.
32 : Shared objects: Unsafe, except where a member function documents otherwise (see @ref join and @ref stop).
33 :
34 : @par Example
35 : @code
36 : thread_pool pool(4); // 4 worker threads
37 : auto ex = pool.get_executor();
38 : ex.post(some_coroutine);
39 : // pool destructor stops the workers and destroys pending work items; call pool.join() first to wait for outstanding work
40 : @endcode
41 : */
42 : class BOOST_CAPY_DECL
43 : thread_pool
44 : : public execution_context
45 : {
46 : class impl; // implementation type; only forward-declared in this header
47 : impl* impl_; // pointer to the implementation object
48 :
49 : public:
50 : class executor_type; // executor handle type returned by get_executor()
51 :
52 : /** Destroy the thread pool.
53 :
54 : Signals all worker threads to stop, waits for them to
55 : finish, and destroys any pending work items.
56 : */
57 : ~thread_pool();
58 :
59 : /** Construct a thread pool.
60 :
61 : Creates a pool with the specified number of worker threads.
62 : If `num_threads` is zero, the number of threads is set to
63 : the hardware concurrency, or one if that cannot be determined.
64 :
65 : @param num_threads The number of worker threads, or zero
66 : for automatic selection.
67 :
68 : @param thread_name_prefix The prefix for worker thread names.
69 : Thread names appear as "{prefix}0", "{prefix}1", etc.
70 : The prefix is truncated to 12 characters. Defaults to
71 : "capy-pool-".
72 : */
73 : explicit
74 : thread_pool(
75 : std::size_t num_threads = 0,
76 : std::string_view thread_name_prefix = "capy-pool-");
77 :
78 : thread_pool(thread_pool const&) = delete; // not copy-constructible
79 : thread_pool& operator=(thread_pool const&) = delete; // not copy-assignable
80 :
81 : /** Wait for all outstanding work to complete.
82 :
83 : Releases the internal work guard, then blocks the calling
84 : thread until all outstanding work tracked by
85 : @ref executor_type::on_work_started and
86 : @ref executor_type::on_work_finished completes. After all
87 : work finishes, joins the worker threads.
88 :
89 : If @ref stop is called while `join()` is blocking, the
90 : pool stops without waiting for remaining work to
91 : complete. Worker threads finish their current item and
92 : exit; `join()` still waits for all threads to be joined
93 : before returning.
94 :
95 : This function is idempotent. The first call performs the
96 : join; subsequent calls return immediately.
97 :
98 : @par Preconditions
99 : Must not be called from a thread in this pool (undefined
100 : behavior).
101 :
102 : @par Postconditions
103 : All worker threads have been joined. The pool cannot be
104 : reused.
105 :
106 : @par Thread Safety
107 : May be called from any thread not in this pool.
108 : */
109 : void
110 : join() noexcept;
111 :
112 : /** Request all worker threads to stop.
113 :
114 : Signals all threads to exit after finishing their current
115 : work item. Queued work that has not started is abandoned.
116 : Does not wait for threads to exit.
117 :
118 : If @ref join is blocking on another thread, calling
119 : `stop()` causes it to stop waiting for outstanding
120 : work. The `join()` call still waits for worker threads
121 : to finish their current item and exit before returning.
122 : */
123 : void
124 : stop() noexcept;
125 :
126 : /** Return an executor for this thread pool.
127 :
128 : @return An executor associated with this thread pool.
129 : */
130 : executor_type
131 : get_executor() const noexcept;
132 : };
133 :
134 : //------------------------------------------------------------------------------
135 :
136 : /** An executor that submits work to a thread_pool.
137 :
138 : Executors are lightweight handles that can be copied and stored.
139 : All copies refer to the same underlying thread pool.
140 :
141 : @par Thread Safety
142 : Distinct objects: Safe.
143 : Shared objects: Safe.
144 : */
145 : class thread_pool::executor_type
146 : {
147 : friend class thread_pool; // gives thread_pool access to the private constructor below
148 :
149 : thread_pool* pool_ = nullptr; // null when the executor is default-constructed
150 :
151 : explicit // private constructor: binds the executor to `pool`
152 HIT 120 : executor_type(thread_pool& pool) noexcept
153 120 : : pool_(&pool)
154 : {
155 120 : }
156 :
157 : public:
158 : /// Default construct a null executor (`pool_` is null).
159 : executor_type() = default;
160 :
161 : /// Return the underlying thread pool. Dereferences `pool_`, so the executor must not be null.
162 : thread_pool&
163 337 : context() const noexcept
164 : {
165 337 : return *pool_;
166 : }
167 :
168 : /** Notify that work has started.
169 :
170 : Increments the outstanding work count. Must be paired
171 : with a subsequent call to @ref on_work_finished.
172 :
173 : @see on_work_finished, work_guard
174 : */
175 : BOOST_CAPY_DECL
176 : void
177 : on_work_started() const noexcept;
178 :
179 : /** Notify that work has finished.
180 :
181 : Decrements the outstanding work count. When the count
182 : reaches zero after @ref thread_pool::join has been called,
183 : the pool's worker threads are signaled to stop.
184 :
185 : @pre A preceding call to @ref on_work_started was made.
186 :
187 : @see on_work_started, work_guard
188 : */
189 : BOOST_CAPY_DECL
190 : void
191 : on_work_finished() const noexcept;
192 :
193 : /** Dispatch a coroutine for execution.
194 :
195 : Posts the coroutine to the thread pool for execution on a
196 : worker thread and returns `std::noop_coroutine()`. Thread
197 : pools never execute inline because no single thread "owns"
198 : the pool.
199 :
200 : @param h The coroutine handle to execute.
201 :
202 : @return `std::noop_coroutine()` always.
203 : */
204 : std::coroutine_handle<>
205 310 : dispatch(std::coroutine_handle<> h) const
206 : {
207 310 : post(h);
208 310 : return std::noop_coroutine();
209 : }
210 :
211 : /** Post a coroutine to the thread pool.
212 :
213 : The coroutine will be resumed on one of the pool's
214 : worker threads.
215 :
216 : @param h The coroutine handle to execute.
217 : */
218 : BOOST_CAPY_DECL
219 : void
220 : post(std::coroutine_handle<> h) const;
221 :
222 : /// Return true if two executors refer to the same thread pool. Two null executors compare equal.
223 : bool
224 13 : operator==(executor_type const& other) const noexcept
225 : {
226 13 : return pool_ == other.pool_;
227 : }
228 : };
229 :
230 : } // capy
231 : } // boost
232 :
233 : #endif
|