//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
#define BOOST_CAPY_ASYNC_EVENT_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/intrusive.hpp>
#include <boost/capy/concept/executor.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/ex/io_env.hpp>
#include <boost/capy/io_result.hpp>

#include <stop_token>

#include <atomic>
#include <coroutine>
#include <new>
#include <utility>

/*  async_event implementation notes
    =================================

    Same cancellation pattern as async_mutex (see that file for the
    full discussion on claimed_, stop_cb lifetime, member ordering,
    and threading assumptions).

    Key difference: set() wakes ALL waiters (broadcast), not one.
    It pops every waiter from the list and posts the ones it
    claims. Waiters already claimed by a stop callback are skipped.

    Because set() pops all waiters, a canceled waiter may have been
    removed from the list by set() before its await_resume runs.
    This requires a separate in_list_ flag (unlike async_mutex where
    active_ served double duty). await_resume only calls remove()
    when in_list_ is true.
*/

namespace boost {
namespace capy {

/** An asynchronous event for coroutines.

    This event provides a way to notify multiple coroutines that some
    condition has occurred. When a coroutine awaits an unset event, it
    suspends and is added to a wait queue. When the event is set, all
    waiting coroutines are resumed.

    @par Cancellation

    When a coroutine is suspended waiting for the event and its stop
    token is triggered, the waiter completes with `error::canceled`
    instead of waiting for `set()`.

    Cancellation only applies while the coroutine is suspended in the
    wait queue. If the event is already set when `wait()` is called,
    the wait completes immediately even if the stop token is already
    signaled.

    @par Zero Allocation

    No heap allocation occurs for wait operations.

    @par Thread Safety

    Distinct objects: Safe.@n
    Shared objects: Unsafe.

    The event operations are designed for single-threaded use on one
    executor. The stop callback may fire from any thread.

    This type is non-copyable and non-movable because suspended
    waiters hold intrusive pointers into the event's internal list.

    @par Example
    @code
    async_event event;

    task<> waiter() {
        auto [ec] = co_await event.wait();
        if(ec)
            co_return;
        // ... event was set ...
    }

    task<> notifier() {
        // ... do some work ...
        event.set();  // Wake all waiters
    }
    @endcode
*/
class async_event
{
public:
    class wait_awaiter;

private:
    bool set_ = false;
    detail::intrusive_list<wait_awaiter> waiters_;

public:
    /** Awaiter returned by wait().

        While suspended, the awaiter is linked into the owning
        event's waiter list and has a stop callback registered on
        the stop token supplied through `await_suspend`. Exactly one
        of `async_event::set()` or the stop callback resumes the
        coroutine; the winner is decided by an atomic exchange on
        `claimed_`.
    */
    class wait_awaiter
        : public detail::intrusive_list<wait_awaiter>::node
    {
        friend class async_event;

        async_event* e_;
        std::coroutine_handle<> h_;
        executor_ref ex_;

        // Declared before stop_cb_buf_: the callback
        // accesses these members, so they must still be
        // alive if the stop_cb_ destructor blocks.
        std::atomic<bool> claimed_{false};
        bool canceled_ = false;     // set when cancellation won the race
        bool active_ = false;       // true while stop_cb_buf_ holds a live stop_cb_t
        bool in_list_ = false;      // true while linked into e_->waiters_

        /** Stop-callback functor.

            Tries to claim the waiter. If it wins (the previous
            value of `claimed_` was false), it marks the wait as
            canceled and posts the coroutine handle to the waiter's
            executor. If `set()` already claimed the waiter, it does
            nothing.
        */
        struct cancel_fn
        {
            wait_awaiter* self_;

            void operator()() const noexcept
            {
                if(!self_->claimed_.exchange(
                    true, std::memory_order_acq_rel))
                {
                    self_->canceled_ = true;
                    self_->ex_.post(self_->h_);
                }
            }
        };

        using stop_cb_t =
            std::stop_callback<cancel_fn>;

        // Aligned storage for stop_cb_t. Declared last:
        // its destructor may block while the callback
        // accesses the members above.
#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable: 4324) // padded due to alignas
#endif
        alignas(stop_cb_t)
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
#ifdef _MSC_VER
# pragma warning(pop)
#endif

        /** Return the stop callback living in `stop_cb_buf_`.

            Only meaningful while `active_` is true, i.e. after the
            placement-new in `await_suspend` and before the explicit
            destructor call.
        */
        stop_cb_t& stop_cb_() noexcept
        {
            // The buffer is an array of unsigned char; the
            // stop_cb_t was created in it with placement new.
            // std::launder yields a pointer to that object
            // rather than to the underlying byte array.
            return *std::launder(
                reinterpret_cast<stop_cb_t*>(
                    stop_cb_buf_));
        }

    public:
        /** Destructor.

            Destroys the stop callback if it is still registered and
            unlinks the awaiter from the event's waiter list if it is
            still linked.
        */
        ~wait_awaiter()
        {
            if(active_)
                stop_cb_().~stop_cb_t();
            if(in_list_)
                e_->waiters_.remove(this);
        }

        /** Construct an awaiter bound to event `e`.

            @param e The event to wait on.
        */
        explicit wait_awaiter(async_event* e) noexcept
            : e_(e)
        {
        }

        /** Move constructor.

            Copies the bookkeeping state of `o` and clears `o`'s
            `active_` and `in_list_` flags so that its destructor
            does not touch the stop callback or the waiter list.

            The stop callback (`cancel_fn{this}`) and the waiter
            list (`push_back(this)`) both refer to the awaiter by
            address, and `stop_cb_buf_` is not transferred, so this
            is only sound for an awaiter that is not suspended.
            NOTE(review): presumably only invoked before
            `await_suspend` (e.g. when returning from `wait()`);
            confirm against callers.
        */
        wait_awaiter(wait_awaiter&& o) noexcept
            : e_(o.e_)
            , h_(o.h_)
            , ex_(o.ex_)
            , claimed_(o.claimed_.load(
                std::memory_order_relaxed))
            , canceled_(o.canceled_)
            , active_(std::exchange(o.active_, false))
            , in_list_(std::exchange(o.in_list_, false))
        {
        }

        wait_awaiter(wait_awaiter const&) = delete;
        wait_awaiter& operator=(wait_awaiter const&) = delete;
        wait_awaiter& operator=(wait_awaiter&&) = delete;

        /** Return true if the event is already set.

            When true the coroutine does not suspend and the stop
            token is never consulted.
        */
        bool await_ready() const noexcept
        {
            return e_->set_;
        }

        /** IoAwaitable protocol overload.

            If stop was already requested on `env->stop_token`, the
            wait is marked canceled and `h` is returned so the
            coroutine resumes immediately. Otherwise the awaiter
            records the handle and executor, links itself into the
            event's waiter list, registers the stop callback, and
            returns `std::noop_coroutine()`.

            @param h   Handle of the awaiting coroutine.
            @param env Environment providing the stop token and the
                       executor used to post the resumption.

            @return The coroutine to resume next.
        */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            if(env->stop_token.stop_requested())
            {
                canceled_ = true;
                return h;
            }
            // h_ and ex_ must be set before the stop callback is
            // registered: the callback reads both.
            h_ = h;
            ex_ = env->executor;
            e_->waiters_.push_back(this);
            in_list_ = true;
            ::new(stop_cb_buf_) stop_cb_t(
                env->stop_token, cancel_fn{this});
            active_ = true;
            return std::noop_coroutine();
        }

        /** Complete the wait.

            Destroys the stop callback if one was registered. If the
            wait was canceled, unlinks the awaiter from the waiter
            list when it is still linked (a call to `set()` may have
            popped it already) and reports `error::canceled`.

            @return An empty result on success, or a result holding
                    `error::canceled`.
        */
        io_result<> await_resume() noexcept
        {
            if(active_)
            {
                stop_cb_().~stop_cb_t();
                active_ = false;
            }
            if(canceled_)
            {
                if(in_list_)
                {
                    e_->waiters_.remove(this);
                    in_list_ = false;
                }
                return {make_error_code(
                    error::canceled)};
            }
            return {{}};
        }
    };

    /// Construct an unset event.
    async_event() = default;

    /// Copy constructor (deleted).
    async_event(async_event const&) = delete;

    /// Copy assignment (deleted).
    async_event& operator=(async_event const&) = delete;

    /// Move constructor (deleted).
    async_event(async_event&&) = delete;

    /// Move assignment (deleted).
    async_event& operator=(async_event&&) = delete;

    /** Returns an awaiter that waits until the event is set.

        If the event is already set, completes immediately.

        @return An awaitable yielding `(error_code)`.
    */
    wait_awaiter wait() noexcept
    {
        return wait_awaiter{this};
    }

    /** Sets the event.

        All waiting coroutines are resumed. Canceled waiters
        are skipped. Subsequent calls to wait() complete
        immediately until clear() is called.
    */
    void set()
    {
        set_ = true;
        for(;;)
        {
            auto* w = waiters_.pop_front();
            if(!w)
                break;
            // Record the unlink so the waiter's await_resume
            // and destructor do not call remove() again.
            w->in_list_ = false;
            // Post only the waiters this call claims; a waiter
            // whose stop callback claimed it first has already
            // been posted by that callback.
            if(!w->claimed_.exchange(
                true, std::memory_order_acq_rel))
            {
                w->ex_.post(w->h_);
            }
        }
    }

    /** Clears the event.

        Subsequent calls to wait() will suspend until
        set() is called again.
    */
    void clear() noexcept
    {
        set_ = false;
    }

    /** Returns true if the event is currently set.
    */
    bool is_set() const noexcept
    {
        return set_;
    }
};

} // namespace capy
} // namespace boost

#endif