TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 : #define BOOST_CAPY_ASYNC_EVENT_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/error.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/io_result.hpp>
19 :
20 : #include <stop_token>
21 :
22 : #include <atomic>
23 : #include <coroutine>
24 : #include <new>
25 : #include <utility>
26 :
27 : /* async_event implementation notes
28 : =================================
29 :
30 : Same cancellation pattern as async_mutex (see that file for the
31 : full discussion on claimed_, stop_cb lifetime, member ordering,
32 : and threading assumptions).
33 :
34 : Key difference: set() wakes ALL waiters (broadcast), not one.
35 : It pops every waiter from the list and posts the ones it
36 : claims. Waiters already claimed by a stop callback are skipped.
37 :
38 : Because set() pops all waiters, a canceled waiter may have been
39 : removed from the list by set() before its await_resume runs.
40 : This requires a separate in_list_ flag (unlike async_mutex where
41 : active_ served double duty). await_resume only calls remove()
42 : when in_list_ is true.
43 : */
44 :
45 : namespace boost {
46 : namespace capy {
47 :
48 : /** An asynchronous event for coroutines.
49 :
50 : This event provides a way to notify multiple coroutines that some
51 : condition has occurred. When a coroutine awaits an unset event, it
52 : suspends and is added to a wait queue. When the event is set, all
53 : waiting coroutines are resumed.
54 :
55 : @par Cancellation
56 :
57 : When a coroutine is suspended waiting for the event and its stop
58 : token is triggered, the waiter completes with `error::canceled`
59 : instead of waiting for `set()`.
60 :
61 : Cancellation only applies while the coroutine is suspended in the
62 : wait queue. If the event is already set when `wait()` is called,
63 : the wait completes immediately even if the stop token is already
64 : signaled.
65 :
66 : @par Zero Allocation
67 :
68 : No heap allocation occurs for wait operations.
69 :
70 : @par Thread Safety
71 :
72 : Distinct objects: Safe.@n
73 : Shared objects: Unsafe.
74 :
75 : The event operations are designed for single-threaded use on one
76 : executor. The stop callback may fire from any thread.
77 :
78 : This type is non-copyable and non-movable because suspended
79 : waiters hold intrusive pointers into the event's internal list.
80 :
81 : @par Example
82 : @code
83 : async_event event;
84 :
85 : task<> waiter() {
86 : auto [ec] = co_await event.wait();
87 : if(ec)
88 : co_return;
89 : // ... event was set ...
90 : }
91 :
92 : task<> notifier() {
93 : // ... do some work ...
94 : event.set(); // Wake all waiters
95 : }
96 : @endcode
97 : */
98 : class async_event
99 : {
100 : public:
101 : class wait_awaiter;
102 :
103 : private:
104 : bool set_ = false;
105 : detail::intrusive_list<wait_awaiter> waiters_;
106 :
107 : public:
108 : /** Awaiter returned by wait().
109 : */
110 : class wait_awaiter
111 : : public detail::intrusive_list<wait_awaiter>::node
112 : {
113 : friend class async_event;
114 :
115 : async_event* e_;
116 : std::coroutine_handle<> h_;
117 : executor_ref ex_;
118 :
119 : // Declared before stop_cb_buf_: the callback
120 : // accesses these members, so they must still be
121 : // alive if the stop_cb_ destructor blocks.
122 : std::atomic<bool> claimed_{false};
123 : bool canceled_ = false;
124 : bool active_ = false;
125 : bool in_list_ = false;
126 :
127 : struct cancel_fn
128 : {
129 : wait_awaiter* self_;
130 :
131 HIT 21 : void operator()() const noexcept
132 : {
133 21 : if(!self_->claimed_.exchange(
134 : true, std::memory_order_acq_rel))
135 : {
136 20 : self_->canceled_ = true;
137 20 : self_->ex_.post(self_->h_);
138 : }
139 21 : }
140 : };
141 :
142 : using stop_cb_t =
143 : std::stop_callback<cancel_fn>;
144 :
145 : // Aligned storage for stop_cb_t. Declared last:
146 : // its destructor may block while the callback
147 : // accesses the members above.
148 : #ifdef _MSC_VER
149 : # pragma warning(push)
150 : # pragma warning(disable: 4324) // padded due to alignas
151 : #endif
152 : alignas(stop_cb_t)
153 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
154 : #ifdef _MSC_VER
155 : # pragma warning(pop)
156 : #endif
157 :
158 37 : stop_cb_t& stop_cb_() noexcept
159 : {
160 : return *reinterpret_cast<stop_cb_t*>(
161 37 : stop_cb_buf_);
162 : }
163 :
164 : public:
165 251 : ~wait_awaiter()
166 : {
167 251 : if(active_)
168 1 : stop_cb_().~stop_cb_t();
169 251 : if(in_list_)
170 1 : e_->waiters_.remove(this);
171 251 : }
172 :
173 57 : explicit wait_awaiter(async_event* e) noexcept
174 57 : : e_(e)
175 : {
176 57 : }
177 :
178 194 : wait_awaiter(wait_awaiter&& o) noexcept
179 194 : : e_(o.e_)
180 194 : , h_(o.h_)
181 194 : , ex_(o.ex_)
182 194 : , claimed_(o.claimed_.load(
183 : std::memory_order_relaxed))
184 194 : , canceled_(o.canceled_)
185 194 : , active_(std::exchange(o.active_, false))
186 194 : , in_list_(std::exchange(o.in_list_, false))
187 : {
188 194 : }
189 :
190 : wait_awaiter(wait_awaiter const&) = delete;
191 : wait_awaiter& operator=(wait_awaiter const&) = delete;
192 : wait_awaiter& operator=(wait_awaiter&&) = delete;
193 :
194 57 : bool await_ready() const noexcept
195 : {
196 57 : return e_->set_;
197 : }
198 :
199 : /** IoAwaitable protocol overload. */
200 : std::coroutine_handle<>
201 47 : await_suspend(
202 : std::coroutine_handle<> h,
203 : io_env const* env) noexcept
204 : {
205 47 : if(env->stop_token.stop_requested())
206 : {
207 10 : canceled_ = true;
208 10 : return h;
209 : }
210 37 : h_ = h;
211 37 : ex_ = env->executor;
212 37 : e_->waiters_.push_back(this);
213 37 : in_list_ = true;
214 111 : ::new(stop_cb_buf_) stop_cb_t(
215 37 : env->stop_token, cancel_fn{this});
216 37 : active_ = true;
217 37 : return std::noop_coroutine();
218 : }
219 :
220 54 : io_result<> await_resume() noexcept
221 : {
222 54 : if(active_)
223 : {
224 36 : stop_cb_().~stop_cb_t();
225 36 : active_ = false;
226 : }
227 54 : if(canceled_)
228 : {
229 30 : if(in_list_)
230 : {
231 20 : e_->waiters_.remove(this);
232 20 : in_list_ = false;
233 : }
234 30 : return {make_error_code(
235 30 : error::canceled)};
236 : }
237 24 : return {{}};
238 : }
239 : };
240 :
241 : /// Construct an unset event.
242 20 : async_event() = default;
243 :
244 : /// Copy constructor (deleted).
245 : async_event(async_event const&) = delete;
246 :
247 : /// Copy assignment (deleted).
248 : async_event& operator=(async_event const&) = delete;
249 :
250 : /// Move constructor (deleted).
251 : async_event(async_event&&) = delete;
252 :
253 : /// Move assignment (deleted).
254 : async_event& operator=(async_event&&) = delete;
255 :
256 : /** Returns an awaiter that waits until the event is set.
257 :
258 : If the event is already set, completes immediately.
259 :
260 : @return An awaitable yielding `(error_code)`.
261 : */
262 57 : wait_awaiter wait() noexcept
263 : {
264 57 : return wait_awaiter{this};
265 : }
266 :
267 : /** Sets the event.
268 :
269 : All waiting coroutines are resumed. Canceled waiters
270 : are skipped. Subsequent calls to wait() complete
271 : immediately until clear() is called.
272 : */
273 23 : void set()
274 : {
275 23 : set_ = true;
276 : for(;;)
277 : {
278 39 : auto* w = waiters_.pop_front();
279 39 : if(!w)
280 23 : break;
281 16 : w->in_list_ = false;
282 16 : if(!w->claimed_.exchange(
283 : true, std::memory_order_acq_rel))
284 : {
285 16 : w->ex_.post(w->h_);
286 : }
287 16 : }
288 23 : }
289 :
290 : /** Clears the event.
291 :
292 : Subsequent calls to wait() will suspend until
293 : set() is called again.
294 : */
295 2 : void clear() noexcept
296 : {
297 2 : set_ = false;
298 2 : }
299 :
300 : /** Returns true if the event is currently set.
301 : */
302 9 : bool is_set() const noexcept
303 : {
304 9 : return set_;
305 : }
306 : };
307 :
308 : } // namespace capy
309 : } // namespace boost
310 :
311 : #endif
|