include/boost/capy/ex/async_mutex.hpp

98.9% Lines (93/94) 100.0% Functions (20/20)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_mutex implementation notes
28 ================================
29
30 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 async_mutex::waiters_.
33
34 Cancellation via stop_token
35 ---------------------------
36 A std::stop_callback is registered in await_suspend. Two actors can
37 race to resume the suspended coroutine: unlock() and the stop callback.
38 An atomic bool `claimed_` resolves the race -- whoever does
39 claimed_.exchange(true) and reads false wins. The loser does nothing.
40
41 The stop callback calls ex_.post(h_). The stop_callback is
42 destroyed later in await_resume. cancel_fn touches no members
43 after post returns (same pattern as delete-this).
44
45 unlock() pops waiters from the front. If the popped waiter was
46 already claimed by the stop callback, unlock() skips it and tries
47 the next. await_resume removes the (still-linked) canceled waiter
48 via waiters_.remove(this).
49
50 The stop_callback lives in a union to suppress automatic
51 construction/destruction. Placement new in await_suspend, explicit
52 destructor call in await_resume and ~lock_awaiter.
53
54 Member ordering constraint
55 --------------------------
56 The union containing stop_cb_ must be declared AFTER the members
57 the callback accesses (h_, ex_, claimed_, canceled_). If the
58 stop_cb_ destructor blocks waiting for a concurrent callback, those
59 members must still be alive (C++ destroys in reverse declaration
60 order).
61
62 active_ flag
63 ------------
64 Tracks both list membership and stop_cb_ lifetime (they are always
65 set and cleared together). Used by the destructor to clean up if the
66 coroutine is destroyed while suspended (e.g. execution_context
67 shutdown).
68
69 Cancellation scope
70 ------------------
71 Cancellation only takes effect while the coroutine is suspended in
72 the wait queue. If the mutex is unlocked, await_ready acquires it
73 immediately without checking the stop token. This is intentional:
74 the fast path has no token access and no overhead.
75
76 Threading assumptions
77 ---------------------
78 - All list mutations happen on the executor thread (await_suspend,
79 await_resume, unlock, ~lock_awaiter).
80 - The stop callback may fire from any thread, but only touches
81 claimed_ (atomic) and then calls post. It never touches the
82 list.
83 - ~lock_awaiter must be called from the executor thread. This is
84 guaranteed during normal shutdown but NOT if the coroutine frame
85 is destroyed from another thread while a stop callback could
86 fire (precondition violation, same as cppcoro/folly).
87 */
88
89 namespace boost {
90 namespace capy {
91
92 /** An asynchronous mutex for coroutines.
93
94 This mutex provides mutual exclusion for coroutines without blocking.
95 When a coroutine attempts to acquire a locked mutex, it suspends and
96 is added to an intrusive wait queue. When the holder unlocks, the next
97 waiter is resumed with the lock held.
98
99 @par Cancellation
100
101 When a coroutine is suspended waiting for the mutex and its stop
102 token is triggered, the waiter completes with `error::canceled`
103 instead of acquiring the lock.
104
105 Cancellation only applies while the coroutine is suspended in the
106 wait queue. If the mutex is unlocked when `lock()` is called, the
107 lock is acquired immediately even if the stop token is already
108 signaled.
109
110 @par Zero Allocation
111
112 No heap allocation occurs for lock operations.
113
114 @par Thread Safety
115
116 Distinct objects: Safe.@n
117 Shared objects: Unsafe.
118
119 The mutex operations are designed for single-threaded use on one
120 executor. The stop callback may fire from any thread.
121
122 This type is non-copyable and non-movable because suspended
123 waiters hold intrusive pointers into the mutex's internal list.
124
125 @par Example
126 @code
127 async_mutex cm;
128
129 task<> protected_operation() {
130 auto [ec] = co_await cm.lock();
131 if(ec)
132 co_return;
133 // ... critical section ...
134 cm.unlock();
135 }
136
137 // Or with RAII:
138 task<> protected_operation() {
139 auto [ec, guard] = co_await cm.scoped_lock();
140 if(ec)
141 co_return;
142 // ... critical section ...
143 // unlocks automatically
144 }
145 @endcode
146 */
147 class async_mutex
148 {
149 public:
150 class lock_awaiter;
151 class lock_guard;
152 class lock_guard_awaiter;
153
154 private:
155 bool locked_ = false;
156 detail::intrusive_list<lock_awaiter> waiters_;
157
158 public:
159 /** Awaiter returned by lock().
160 */
161 class lock_awaiter
162 : public detail::intrusive_list<lock_awaiter>::node
163 {
164 friend class async_mutex;
165
166 async_mutex* m_;
167 std::coroutine_handle<> h_;
168 executor_ref ex_;
169
170 // These members must be declared before stop_cb_
171 // (see comment on the union below).
172 std::atomic<bool> claimed_{false};
173 bool canceled_ = false;
174 bool active_ = false;
175
176 struct cancel_fn
177 {
178 lock_awaiter* self_;
179
180 6x void operator()() const noexcept
181 {
182 6x if(!self_->claimed_.exchange(
183 true, std::memory_order_acq_rel))
184 {
185 6x self_->canceled_ = true;
186 6x self_->ex_.post(self_->h_);
187 }
188 6x }
189 };
190
191 using stop_cb_t =
192 std::stop_callback<cancel_fn>;
193
194 // Aligned storage for stop_cb_t. Declared last:
195 // its destructor may block while the callback
196 // accesses the members above.
197 #ifdef _MSC_VER
198 # pragma warning(push)
199 # pragma warning(disable: 4324) // padded due to alignas
200 #endif
201 alignas(stop_cb_t)
202 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
203 #ifdef _MSC_VER
204 # pragma warning(pop)
205 #endif
206
207 17x stop_cb_t& stop_cb_() noexcept
208 {
209 return *reinterpret_cast<stop_cb_t*>(
210 17x stop_cb_buf_);
211 }
212
213 public:
214 70x ~lock_awaiter()
215 {
216 70x if(active_)
217 {
218 3x stop_cb_().~stop_cb_t();
219 3x m_->waiters_.remove(this);
220 }
221 70x }
222
223 35x explicit lock_awaiter(async_mutex* m) noexcept
224 35x : m_(m)
225 {
226 35x }
227
228 35x lock_awaiter(lock_awaiter&& o) noexcept
229 35x : m_(o.m_)
230 35x , h_(o.h_)
231 35x , ex_(o.ex_)
232 35x , claimed_(o.claimed_.load(
233 std::memory_order_relaxed))
234 35x , canceled_(o.canceled_)
235 35x , active_(std::exchange(o.active_, false))
236 {
237 35x }
238
239 lock_awaiter(lock_awaiter const&) = delete;
240 lock_awaiter& operator=(lock_awaiter const&) = delete;
241 lock_awaiter& operator=(lock_awaiter&&) = delete;
242
243 35x bool await_ready() const noexcept
244 {
245 35x if(!m_->locked_)
246 {
247 16x m_->locked_ = true;
248 16x return true;
249 }
250 19x return false;
251 }
252
253 /** IoAwaitable protocol overload. */
254 std::coroutine_handle<>
255 19x await_suspend(
256 std::coroutine_handle<> h,
257 io_env const* env) noexcept
258 {
259 19x if(env->stop_token.stop_requested())
260 {
261 2x canceled_ = true;
262 2x return h;
263 }
264 17x h_ = h;
265 17x ex_ = env->executor;
266 17x m_->waiters_.push_back(this);
267 51x ::new(stop_cb_buf_) stop_cb_t(
268 17x env->stop_token, cancel_fn{this});
269 17x active_ = true;
270 17x return std::noop_coroutine();
271 }
272
273 32x io_result<> await_resume() noexcept
274 {
275 32x if(active_)
276 {
277 14x stop_cb_().~stop_cb_t();
278 14x if(canceled_)
279 {
280 6x m_->waiters_.remove(this);
281 6x active_ = false;
282 6x return {make_error_code(
283 6x error::canceled)};
284 }
285 8x active_ = false;
286 }
287 26x if(canceled_)
288 2x return {make_error_code(
289 2x error::canceled)};
290 24x return {{}};
291 }
292 };
293
294 /** RAII lock guard for async_mutex.
295
296 Automatically unlocks the mutex when destroyed.
297 */
298 class [[nodiscard]] lock_guard
299 {
300 async_mutex* m_;
301
302 public:
303 5x ~lock_guard()
304 {
305 5x if(m_)
306 2x m_->unlock();
307 5x }
308
309 2x lock_guard() noexcept
310 2x : m_(nullptr)
311 {
312 2x }
313
314 2x explicit lock_guard(async_mutex* m) noexcept
315 2x : m_(m)
316 {
317 2x }
318
319 1x lock_guard(lock_guard&& o) noexcept
320 1x : m_(std::exchange(o.m_, nullptr))
321 {
322 1x }
323
324 lock_guard& operator=(lock_guard&& o) noexcept
325 {
326 if(this != &o)
327 {
328 if(m_)
329 m_->unlock();
330 m_ = std::exchange(o.m_, nullptr);
331 }
332 return *this;
333 }
334
335 lock_guard(lock_guard const&) = delete;
336 lock_guard& operator=(lock_guard const&) = delete;
337 };
338
339 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
340 */
341 class lock_guard_awaiter
342 {
343 async_mutex* m_;
344 lock_awaiter inner_;
345
346 public:
347 4x explicit lock_guard_awaiter(async_mutex* m) noexcept
348 4x : m_(m)
349 4x , inner_(m)
350 {
351 4x }
352
353 4x bool await_ready() const noexcept
354 {
355 4x return inner_.await_ready();
356 }
357
358 /** IoAwaitable protocol overload. */
359 std::coroutine_handle<>
360 2x await_suspend(
361 std::coroutine_handle<> h,
362 io_env const* env) noexcept
363 {
364 2x return inner_.await_suspend(h, env);
365 }
366
367 4x io_result<lock_guard> await_resume() noexcept
368 {
369 4x auto r = inner_.await_resume();
370 4x if(r.ec)
371 2x return {r.ec, {}};
372 2x return {{}, lock_guard(m_)};
373 4x }
374 };
375
376 /// Construct an unlocked mutex.
377 async_mutex() = default;
378
379 /// Copy constructor (deleted).
380 async_mutex(async_mutex const&) = delete;
381
382 /// Copy assignment (deleted).
383 async_mutex& operator=(async_mutex const&) = delete;
384
385 /// Move constructor (deleted).
386 async_mutex(async_mutex&&) = delete;
387
388 /// Move assignment (deleted).
389 async_mutex& operator=(async_mutex&&) = delete;
390
391 /** Returns an awaiter that acquires the mutex.
392
393 @return An awaitable yielding `(error_code)`.
394 */
395 31x lock_awaiter lock() noexcept
396 {
397 31x return lock_awaiter{this};
398 }
399
400 /** Returns an awaiter that acquires the mutex with RAII.
401
402 @return An awaitable yielding `(error_code,lock_guard)`.
403 */
404 4x lock_guard_awaiter scoped_lock() noexcept
405 {
406 4x return lock_guard_awaiter(this);
407 }
408
409 /** Releases the mutex.
410
411 If waiters are queued, the next eligible waiter is
412 resumed with the lock held. Canceled waiters are
413 skipped. If no eligible waiter remains, the mutex
414 becomes unlocked.
415 */
416 24x void unlock() noexcept
417 {
418 for(;;)
419 {
420 24x auto* waiter = waiters_.pop_front();
421 24x if(!waiter)
422 {
423 16x locked_ = false;
424 16x return;
425 }
426 8x if(!waiter->claimed_.exchange(
427 true, std::memory_order_acq_rel))
428 {
429 8x waiter->ex_.post(waiter->h_);
430 8x return;
431 }
432 }
433 }
434
435 /** Returns true if the mutex is currently locked.
436 */
437 26x bool is_locked() const noexcept
438 {
439 26x return locked_;
440 }
441 };
442
443 } // namespace capy
444 } // namespace boost
445
446 #endif
447