//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/capy
//

#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/detail/intrusive.hpp>
#include <boost/capy/test/thread_name.hpp>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>

/*
    Thread pool implementation using a shared work queue.

    Work items are coroutine handles wrapped in intrusive list nodes, stored
    in a single queue protected by a mutex. Worker threads wait on a
    condition_variable until work is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
*/

37  
namespace boost {
44  
namespace boost {
38  
namespace capy {
45  
namespace capy {
39  

46  

40  
//------------------------------------------------------------------------------
47  
//------------------------------------------------------------------------------
41  

48  

42  
class thread_pool::impl
49  
class thread_pool::impl
43  
{
50  
{
44  
    struct work : detail::intrusive_queue<work>::node
51  
    struct work : detail::intrusive_queue<work>::node
45  
    {
52  
    {
46  
        std::coroutine_handle<> h_;
53  
        std::coroutine_handle<> h_;
47  

54  

48  
        explicit work(std::coroutine_handle<> h) noexcept
55  
        explicit work(std::coroutine_handle<> h) noexcept
49  
            : h_(h)
56  
            : h_(h)
50  
        {
57  
        {
51  
        }
58  
        }
52  

59  

53  
        void run()
60  
        void run()
54  
        {
61  
        {
55  
            auto h = h_;
62  
            auto h = h_;
56  
            delete this;
63  
            delete this;
57  
            h.resume();
64  
            h.resume();
58  
        }
65  
        }
59  

66  

60  
        void destroy()
67  
        void destroy()
61  
        {
68  
        {
 
69 +
            auto h = h_;
62  
            delete this;
70  
            delete this;
 
71 +
            if(h && h != std::noop_coroutine())
 
72 +
                h.destroy();
63  
        }
73  
        }
64  
    };
74  
    };
65  

75  

66  
    std::mutex mutex_;
76  
    std::mutex mutex_;
67  
    std::condition_variable cv_;
77  
    std::condition_variable cv_;
68  
    detail::intrusive_queue<work> q_;
78  
    detail::intrusive_queue<work> q_;
69  
    std::vector<std::thread> threads_;
79  
    std::vector<std::thread> threads_;
 
80 +
    std::atomic<std::size_t> outstanding_work_{0};
70  
    bool stop_{false};
81  
    bool stop_{false};
 
82 +
    bool joined_{false};
71  
    std::size_t num_threads_;
83  
    std::size_t num_threads_;
72  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
84  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
73  
    std::once_flag start_flag_;
85  
    std::once_flag start_flag_;
74  

86  

75  
public:
87  
public:
76  
    ~impl()
88  
    ~impl()
77 -
        stop();
 
78 -
        for(auto& t : threads_)
 
79 -
            if(t.joinable())
 
80 -
                t.join();
 
81 -

 
82  
    {
89  
    {
83  
        while(auto* w = q_.pop())
90  
        while(auto* w = q_.pop())
84  
            w->destroy();
91  
            w->destroy();
85  
    }
92  
    }
86  

93  

87  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
94  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
88  
        : num_threads_(num_threads)
95  
        : num_threads_(num_threads)
89  
    {
96  
    {
90  
        if(num_threads_ == 0)
97  
        if(num_threads_ == 0)
91 -
            num_threads_ = std::thread::hardware_concurrency();
98 +
            num_threads_ = std::max(
92 -
        if(num_threads_ == 0)
99 +
                std::thread::hardware_concurrency(), 1u);
93 -
            num_threads_ = 1;
 
94  

100  

95  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
101  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
102  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97  
        thread_name_prefix_[n] = '\0';
103  
        thread_name_prefix_[n] = '\0';
98  
    }
104  
    }
99  

105  

100  
    void
106  
    void
101  
    post(std::coroutine_handle<> h)
107  
    post(std::coroutine_handle<> h)
102  
    {
108  
    {
103  
        ensure_started();
109  
        ensure_started();
104  
        auto* w = new work(h);
110  
        auto* w = new work(h);
105  
        {
111  
        {
106  
            std::lock_guard<std::mutex> lock(mutex_);
112  
            std::lock_guard<std::mutex> lock(mutex_);
107  
            q_.push(w);
113  
            q_.push(w);
108  
        }
114  
        }
109  
        cv_.notify_one();
115  
        cv_.notify_one();
110  
    }
116  
    }
111  

117  

112  
    void
118  
    void
 
119 +
    on_work_started() noexcept
 
120 +
    {
 
121 +
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
 
122 +
    }
 
123 +

 
124 +
    void
 
125 +
    on_work_finished() noexcept
 
126 +
    {
 
127 +
        if(outstanding_work_.fetch_sub(
 
128 +
            1, std::memory_order_acq_rel) == 1)
 
129 +
        {
 
130 +
            std::lock_guard<std::mutex> lock(mutex_);
 
131 +
            if(joined_ && !stop_)
 
132 +
                stop_ = true;
 
133 +
            cv_.notify_all();
 
134 +
        }
 
135 +
    }
 
136 +

 
137 +
    void
113  
    join() noexcept
138  
    join() noexcept
114  
    {
139  
    {
115 -
        stop();
140 +
        {
 
141 +
            std::unique_lock<std::mutex> lock(mutex_);
 
142 +
            if(joined_)
 
143 +
                return;
 
144 +
            joined_ = true;
 
145 +

 
146 +
            if(outstanding_work_.load(
 
147 +
                std::memory_order_acquire) == 0)
 
148 +
            {
 
149 +
                stop_ = true;
 
150 +
                cv_.notify_all();
 
151 +
            }
 
152 +
            else
 
153 +
            {
 
154 +
                cv_.wait(lock, [this]{
 
155 +
                    return stop_;
 
156 +
                });
 
157 +
            }
 
158 +
        }
 
159 +

116  
        for(auto& t : threads_)
160  
        for(auto& t : threads_)
117  
            if(t.joinable())
161  
            if(t.joinable())
118  
                t.join();
162  
                t.join();
119  
    }
163  
    }
120  

164  

121  
    void
165  
    void
122  
    stop() noexcept
166  
    stop() noexcept
123  
    {
167  
    {
124  
        {
168  
        {
125  
            std::lock_guard<std::mutex> lock(mutex_);
169  
            std::lock_guard<std::mutex> lock(mutex_);
126  
            stop_ = true;
170  
            stop_ = true;
127  
        }
171  
        }
128  
        cv_.notify_all();
172  
        cv_.notify_all();
129  
    }
173  
    }
130  

174  

131  
private:
175  
private:
132  
    void
176  
    void
133  
    ensure_started()
177  
    ensure_started()
134  
    {
178  
    {
135  
        std::call_once(start_flag_, [this]{
179  
        std::call_once(start_flag_, [this]{
136  
            threads_.reserve(num_threads_);
180  
            threads_.reserve(num_threads_);
137  
            for(std::size_t i = 0; i < num_threads_; ++i)
181  
            for(std::size_t i = 0; i < num_threads_; ++i)
138  
                threads_.emplace_back([this, i]{ run(i); });
182  
                threads_.emplace_back([this, i]{ run(i); });
139  
        });
183  
        });
140  
    }
184  
    }
141  

185  

142  
    void
186  
    void
143  
    run(std::size_t index)
187  
    run(std::size_t index)
144  
    {
188  
    {
145  
        // Build name; set_current_thread_name truncates to platform limits.
189  
        // Build name; set_current_thread_name truncates to platform limits.
146  
        char name[16];
190  
        char name[16];
147  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
191  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
148  
        set_current_thread_name(name);
192  
        set_current_thread_name(name);
149  

193  

150  
        for(;;)
194  
        for(;;)
151  
        {
195  
        {
152  
            work* w = nullptr;
196  
            work* w = nullptr;
153  
            {
197  
            {
154  
                std::unique_lock<std::mutex> lock(mutex_);
198  
                std::unique_lock<std::mutex> lock(mutex_);
155  
                cv_.wait(lock, [this]{
199  
                cv_.wait(lock, [this]{
156  
                    return !q_.empty() ||
200  
                    return !q_.empty() ||
157  
                        stop_;
201  
                        stop_;
158  
                });
202  
                });
159 -
                if(stop_ && q_.empty())
203 +
                if(stop_)
160  
                    return;
204  
                    return;
161  
                w = q_.pop();
205  
                w = q_.pop();
162  
            }
206  
            }
163  
            if(w)
207  
            if(w)
164  
                w->run();
208  
                w->run();
165  
        }
209  
        }
166  
    }
210  
    }
167  
};
211  
};
168  

212  

169  
//------------------------------------------------------------------------------
213  
//------------------------------------------------------------------------------
170  

214  

171  
thread_pool::
215  
thread_pool::
172  
~thread_pool()
216  
~thread_pool()
173  
{
217  
{
 
218 +
    impl_->stop();
174  
    impl_->join();
219  
    impl_->join();
175  
    shutdown();
220  
    shutdown();
176  
    destroy();
221  
    destroy();
177  
    delete impl_;
222  
    delete impl_;
178  
}
223  
}
179  

224  

180  
thread_pool::
225  
thread_pool::
181  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
226  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
182  
    : impl_(new impl(num_threads, thread_name_prefix))
227  
    : impl_(new impl(num_threads, thread_name_prefix))
183  
{
228  
{
184  
    this->set_frame_allocator(std::allocator<void>{});
229  
    this->set_frame_allocator(std::allocator<void>{});
185  
}
230  
}
186  

231  

187  
void
232  
void
188  
thread_pool::
233  
thread_pool::
 
234 +
join() noexcept
 
235 +
{
 
236 +
    impl_->join();
 
237 +
}
 
238 +

 
239 +
void
 
240 +
thread_pool::
189  
stop() noexcept
241  
stop() noexcept
190  
{
242  
{
191  
    impl_->stop();
243  
    impl_->stop();
192  
}
244  
}
193  

245  

194  
//------------------------------------------------------------------------------
246  
//------------------------------------------------------------------------------
195  

247  

196  
thread_pool::executor_type
248  
thread_pool::executor_type
197  
thread_pool::
249  
thread_pool::
198  
get_executor() const noexcept
250  
get_executor() const noexcept
199  
{
251  
{
200  
    return executor_type(
252  
    return executor_type(
201  
        const_cast<thread_pool&>(*this));
253  
        const_cast<thread_pool&>(*this));
 
254 +
}
 
255 +

 
256 +
void
 
257 +
thread_pool::executor_type::
 
258 +
on_work_started() const noexcept
 
259 +
{
 
260 +
    pool_->impl_->on_work_started();
 
261 +
}
 
262 +

 
263 +
void
 
264 +
thread_pool::executor_type::
 
265 +
on_work_finished() const noexcept
 
266 +
{
 
267 +
    pool_->impl_->on_work_finished();
202  
}
268  
}
203  

269  

204  
void
270  
void
205  
thread_pool::executor_type::
271  
thread_pool::executor_type::
206  
post(std::coroutine_handle<> h) const
272  
post(std::coroutine_handle<> h) const
207  
{
273  
{
208  
    pool_->impl_->post(h);
274  
    pool_->impl_->post(h);
209  
}
275  
}
210  

276  

211  
} // capy
277  
} // capy
212  
} // boost
278  
} // boost