TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/error.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/io_result.hpp>
19 :
20 : #include <stop_token>
21 :
22 : #include <atomic>
23 : #include <coroutine>
24 : #include <new>
25 : #include <utility>
26 :
27 : /* async_mutex implementation notes
28 : ================================
29 :
30 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 : async_mutex::waiters_.
33 :
34 : Cancellation via stop_token
35 : ---------------------------
36 : A std::stop_callback is registered in await_suspend. Two actors can
37 : race to resume the suspended coroutine: unlock() and the stop callback.
38 : An atomic bool `claimed_` resolves the race -- whoever does
39 : claimed_.exchange(true) and reads false wins. The loser does nothing.
40 :
41 : The stop callback calls ex_.post(h_). The stop_callback is
42 : destroyed later in await_resume. cancel_fn touches no members
43 : after post returns (same pattern as delete-this).
44 :
45 : unlock() pops waiters from the front. If the popped waiter was
46 : already claimed by the stop callback, unlock() skips it and tries
47 : the next. await_resume removes the (still-linked) canceled waiter
48 : via waiters_.remove(this).
49 :
The stop_callback lives in aligned raw storage (stop_cb_buf_) to
suppress automatic construction/destruction. Placement new in
await_suspend, explicit destructor call in await_resume and
~lock_awaiter.

Member ordering constraint
--------------------------
The buffer holding the stop_callback must be declared AFTER the
members the callback accesses (h_, ex_, claimed_, canceled_). If the
stop_cb_ destructor blocks waiting for a concurrent callback, those
members must still be alive (C++ destroys in reverse declaration
order).
61 :
62 : active_ flag
63 : ------------
64 : Tracks both list membership and stop_cb_ lifetime (they are always
65 : set and cleared together). Used by the destructor to clean up if the
66 : coroutine is destroyed while suspended (e.g. execution_context
67 : shutdown).
68 :
69 : Cancellation scope
70 : ------------------
71 : Cancellation only takes effect while the coroutine is suspended in
72 : the wait queue. If the mutex is unlocked, await_ready acquires it
73 : immediately without checking the stop token. This is intentional:
74 : the fast path has no token access and no overhead.
75 :
76 : Threading assumptions
77 : ---------------------
78 : - All list mutations happen on the executor thread (await_suspend,
79 : await_resume, unlock, ~lock_awaiter).
80 : - The stop callback may fire from any thread, but only touches
81 : claimed_ (atomic) and then calls post. It never touches the
82 : list.
83 : - ~lock_awaiter must be called from the executor thread. This is
84 : guaranteed during normal shutdown but NOT if the coroutine frame
85 : is destroyed from another thread while a stop callback could
86 : fire (precondition violation, same as cppcoro/folly).
87 : */
88 :
89 : namespace boost {
90 : namespace capy {
91 :
92 : /** An asynchronous mutex for coroutines.
93 :
94 : This mutex provides mutual exclusion for coroutines without blocking.
95 : When a coroutine attempts to acquire a locked mutex, it suspends and
96 : is added to an intrusive wait queue. When the holder unlocks, the next
97 : waiter is resumed with the lock held.
98 :
99 : @par Cancellation
100 :
101 : When a coroutine is suspended waiting for the mutex and its stop
102 : token is triggered, the waiter completes with `error::canceled`
103 : instead of acquiring the lock.
104 :
105 : Cancellation only applies while the coroutine is suspended in the
106 : wait queue. If the mutex is unlocked when `lock()` is called, the
107 : lock is acquired immediately even if the stop token is already
108 : signaled.
109 :
110 : @par Zero Allocation
111 :
112 : No heap allocation occurs for lock operations.
113 :
114 : @par Thread Safety
115 :
116 : Distinct objects: Safe.@n
117 : Shared objects: Unsafe.
118 :
119 : The mutex operations are designed for single-threaded use on one
120 : executor. The stop callback may fire from any thread.
121 :
122 : This type is non-copyable and non-movable because suspended
123 : waiters hold intrusive pointers into the mutex's internal list.
124 :
125 : @par Example
126 : @code
127 : async_mutex cm;
128 :
129 : task<> protected_operation() {
130 : auto [ec] = co_await cm.lock();
131 : if(ec)
132 : co_return;
133 : // ... critical section ...
134 : cm.unlock();
135 : }
136 :
137 : // Or with RAII:
138 : task<> protected_operation() {
139 : auto [ec, guard] = co_await cm.scoped_lock();
140 : if(ec)
141 : co_return;
142 : // ... critical section ...
143 : // unlocks automatically
144 : }
145 : @endcode
146 : */
class async_mutex
{
public:
    class lock_awaiter;
    class lock_guard;
    class lock_guard_awaiter;

private:
    // true while some coroutine holds the lock. Only mutated on the
    // executor thread (await_ready and unlock).
    bool locked_ = false;
    // FIFO queue of suspended lock_awaiters (intrusive; no allocation).
    detail::intrusive_list<lock_awaiter> waiters_;

public:
    /** Awaiter returned by lock().

        On resume, yields `io_result<>` containing `error::canceled`
        if the waiter's stop token fired while queued, or success
        with the lock held.
    */
    class lock_awaiter
        : public detail::intrusive_list<lock_awaiter>::node
    {
        friend class async_mutex;

        async_mutex* m_;                // owning mutex (never null)
        std::coroutine_handle<> h_;     // suspended coroutine; set in await_suspend
        executor_ref ex_;               // executor used to post the resumption

        // These members must be declared before stop_cb_buf_
        // (see comment on the buffer below).
        std::atomic<bool> claimed_{false};  // race arbiter: unlock() vs stop callback
        bool canceled_ = false;             // set by the winner when that winner is cancellation
        bool active_ = false;               // true while queued AND stop_cb_ is constructed

        // Invoked by the stop_callback; may run on ANY thread.
        // Touches only claimed_ (atomic) and then posts; it never
        // touches the intrusive list, and it touches no members of
        // *this after post() returns (same pattern as delete-this).
        struct cancel_fn
        {
            lock_awaiter* self_;

            void operator()() const noexcept
            {
                // Whoever flips claimed_ false->true owns the resume.
                if(!self_->claimed_.exchange(
                    true, std::memory_order_acq_rel))
                {
                    self_->canceled_ = true;
                    self_->ex_.post(self_->h_);
                }
            }
        };

        using stop_cb_t =
            std::stop_callback<cancel_fn>;

        // Aligned raw storage for stop_cb_t, so construction and
        // destruction are explicit (placement new in await_suspend,
        // manual destructor call in await_resume / ~lock_awaiter).
        // Declared last: its destructor may block while a concurrent
        // callback still reads the members above, which must
        // therefore outlive it (reverse declaration-order destruction).
#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable: 4324) // padded due to alignas
#endif
        alignas(stop_cb_t)
        unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
#ifdef _MSC_VER
# pragma warning(pop)
#endif

        // Access the stop_callback stored in stop_cb_buf_.
        // Valid only while active_ is true.
        stop_cb_t& stop_cb_() noexcept
        {
            return *reinterpret_cast<stop_cb_t*>(
                stop_cb_buf_);
        }

    public:
        // Cleans up if the coroutine frame is destroyed while still
        // suspended in the queue (e.g. execution_context shutdown).
        // Must run on the executor thread (mutates the list).
        ~lock_awaiter()
        {
            if(active_)
            {
                stop_cb_().~stop_cb_t();
                m_->waiters_.remove(this);
            }
        }

        explicit lock_awaiter(async_mutex* m) noexcept
            : m_(m)
        {
        }

        // NOTE(review): this move assumes the source has not yet
        // suspended — the list links and stop_cb_buf_ are NOT
        // transferred, so moving an awaiter with o.active_ == true
        // would leave the queue and callback pointing at the old
        // object. Moves only occur when the awaiter is returned from
        // lock() / forwarded before co_await — confirm no other use.
        lock_awaiter(lock_awaiter&& o) noexcept
            : m_(o.m_)
            , h_(o.h_)
            , ex_(o.ex_)
            , claimed_(o.claimed_.load(
                std::memory_order_relaxed))
            , canceled_(o.canceled_)
            , active_(std::exchange(o.active_, false))
        {
        }

        lock_awaiter(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter&&) = delete;

        // Fast path: acquire immediately if unlocked. Deliberately
        // does not consult the stop token (see class notes on
        // cancellation scope).
        bool await_ready() const noexcept
        {
            if(!m_->locked_)
            {
                m_->locked_ = true;
                return true;
            }
            return false;
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            // Token already signaled: resume ourselves immediately;
            // await_resume sees canceled_ with active_ still false.
            if(env->stop_token.stop_requested())
            {
                canceled_ = true;
                return h;
            }
            h_ = h;
            ex_ = env->executor;
            // Enqueue BEFORE registering the callback: once the
            // stop_callback is constructed it may fire (and post)
            // from another thread at any moment.
            m_->waiters_.push_back(this);
            ::new(stop_cb_buf_) stop_cb_t(
                env->stop_token, cancel_fn{this});
            active_ = true;
            return std::noop_coroutine();
        }

        // Runs on the executor thread after unlock() or the stop
        // callback posted us (or synchronously after the early
        // cancel path above).
        io_result<> await_resume() noexcept
        {
            if(active_)
            {
                // Destroying the stop_callback first guarantees no
                // concurrent cancel_fn is still running afterwards.
                stop_cb_().~stop_cb_t();
                if(canceled_)
                {
                    // NOTE(review): if unlock() raced the cancel
                    // post, it may already have popped this node
                    // before skipping it; this remove then sees an
                    // unlinked node. Assumes intrusive_list::remove
                    // is safe in that state — confirm against
                    // detail/intrusive.hpp.
                    m_->waiters_.remove(this);
                    active_ = false;
                    return {make_error_code(
                        error::canceled)};
                }
                active_ = false;
            }
            // Early-cancel path (never suspended into the queue).
            if(canceled_)
                return {make_error_code(
                    error::canceled)};
            return {{}};
        }
    };

    /** RAII lock guard for async_mutex.

        Automatically unlocks the mutex when destroyed.
    */
    class [[nodiscard]] lock_guard
    {
        async_mutex* m_;    // null when empty / moved-from

        public:
        ~lock_guard()
        {
            if(m_)
                m_->unlock();
        }

        // Empty guard: owns nothing, destructor is a no-op.
        lock_guard() noexcept
            : m_(nullptr)
        {
        }

        // Adopt ownership of an already-held lock on *m.
        explicit lock_guard(async_mutex* m) noexcept
            : m_(m)
        {
        }

        lock_guard(lock_guard&& o) noexcept
            : m_(std::exchange(o.m_, nullptr))
        {
        }

        lock_guard& operator=(lock_guard&& o) noexcept
        {
            if(this != &o)
            {
                // Release our current lock (if any) before adopting.
                if(m_)
                    m_->unlock();
                m_ = std::exchange(o.m_, nullptr);
            }
            return *this;
        }

        lock_guard(lock_guard const&) = delete;
        lock_guard& operator=(lock_guard const&) = delete;
    };

    /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.

        Wraps lock_awaiter and converts its result into
        `(error_code, lock_guard)`.
    */
    class lock_guard_awaiter
    {
        async_mutex* m_;
        lock_awaiter inner_;

    public:
        explicit lock_guard_awaiter(async_mutex* m) noexcept
            : m_(m)
            , inner_(m)
        {
        }

        bool await_ready() const noexcept
        {
            return inner_.await_ready();
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            return inner_.await_suspend(h, env);
        }

        io_result<lock_guard> await_resume() noexcept
        {
            auto r = inner_.await_resume();
            if(r.ec)
                return {r.ec, {}};  // canceled: empty guard
            return {{}, lock_guard(m_)};
        }
    };

    /// Construct an unlocked mutex.
    async_mutex() = default;

    /// Copy constructor (deleted).
    async_mutex(async_mutex const&) = delete;

    /// Copy assignment (deleted).
    async_mutex& operator=(async_mutex const&) = delete;

    /// Move constructor (deleted).
    async_mutex(async_mutex&&) = delete;

    /// Move assignment (deleted).
    async_mutex& operator=(async_mutex&&) = delete;

    /** Returns an awaiter that acquires the mutex.

        @return An awaitable yielding `(error_code)`.
    */
    lock_awaiter lock() noexcept
    {
        return lock_awaiter{this};
    }

    /** Returns an awaiter that acquires the mutex with RAII.

        @return An awaitable yielding `(error_code,lock_guard)`.
    */
    lock_guard_awaiter scoped_lock() noexcept
    {
        return lock_guard_awaiter(this);
    }

    /** Releases the mutex.

        If waiters are queued, the next eligible waiter is
        resumed with the lock held. Canceled waiters are
        skipped. If no eligible waiter remains, the mutex
        becomes unlocked.

        Must be called on the executor thread (mutates the
        wait list).
    */
    void unlock() noexcept
    {
        for(;;)
        {
            auto* waiter = waiters_.pop_front();
            if(!waiter)
            {
                // Queue empty: lock becomes free.
                locked_ = false;
                return;
            }
            if(!waiter->claimed_.exchange(
                true, std::memory_order_acq_rel))
            {
                // We won the claim race: transfer ownership.
                // locked_ stays true — the lock passes directly
                // to the resumed waiter.
                waiter->ex_.post(waiter->h_);
                return;
            }
            // Stop callback claimed this waiter first; it will
            // resume with error::canceled. Skip to the next.
        }
    }

    /** Returns true if the mutex is currently locked.
    */
    bool is_locked() const noexcept
    {
        return locked_;
    }
};
442 :
443 : } // namespace capy
444 : } // namespace boost
445 :
446 : #endif
|