LCOV - code coverage report
Current view: top level - capy/ex - async_event.hpp (source / functions)
Test: coverage_remapped.info
Test Date: 2026-03-06 20:40:21
Coverage summary (Coverage / Total / Hit): Lines: 100.0 % / 68 / 68; Functions: 100.0 % / 13 / 13

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
      11                 : #define BOOST_CAPY_ASYNC_EVENT_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/intrusive.hpp>
      15                 : #include <boost/capy/concept/executor.hpp>
      16                 : #include <boost/capy/error.hpp>
      17                 : #include <boost/capy/ex/io_env.hpp>
      18                 : #include <boost/capy/io_result.hpp>
      19                 : 
      20                 : #include <stop_token>
      21                 : 
      22                 : #include <atomic>
      23                 : #include <coroutine>
      24                 : #include <new>
      25                 : #include <utility>
      26                 : 
      27                 : /*  async_event implementation notes
      28                 :     =================================
      29                 : 
      30                 :     Same cancellation pattern as async_mutex (see that file for the
      31                 :     full discussion on claimed_, stop_cb lifetime, member ordering,
      32                 :     and threading assumptions).
      33                 : 
      34                 :     Key difference: set() wakes ALL waiters (broadcast), not one.
      35                 :     It pops every waiter from the list and posts the ones it
      36                 :     claims. Waiters already claimed by a stop callback are skipped.
      37                 : 
      38                 :     Because set() pops all waiters, a canceled waiter may have been
      39                 :     removed from the list by set() before its await_resume runs.
      40                 :     This requires a separate in_list_ flag (unlike async_mutex where
      41                 :     active_ served double duty). await_resume only calls remove()
      42                 :     when in_list_ is true.
      43                 : */
      44                 : 
      45                 : namespace boost {
      46                 : namespace capy {
      47                 : 
      48                 : /** An asynchronous event for coroutines.
      49                 : 
      50                 :     This event provides a way to notify multiple coroutines that some
      51                 :     condition has occurred. When a coroutine awaits an unset event, it
      52                 :     suspends and is added to a wait queue. When the event is set, all
      53                 :     waiting coroutines are resumed.
      54                 : 
      55                 :     @par Cancellation
      56                 : 
      57                 :     When a coroutine is suspended waiting for the event and its stop
      58                 :     token is triggered, the waiter completes with `error::canceled`
      59                 :     instead of waiting for `set()`.
      60                 : 
      61                 :     Cancellation only applies while the coroutine is suspended in the
      62                 :     wait queue. If the event is already set when `wait()` is called,
      63                 :     the wait completes immediately even if the stop token is already
      64                 :     signaled.
      65                 : 
      66                 :     @par Zero Allocation
      67                 : 
      68                 :     No heap allocation occurs for wait operations.
      69                 : 
      70                 :     @par Thread Safety
      71                 : 
      72                 :     Distinct objects: Safe.@n
      73                 :     Shared objects: Unsafe.
      74                 : 
      75                 :     The event operations are designed for single-threaded use on one
      76                 :     executor. The stop callback may fire from any thread.
      77                 : 
      78                 :     This type is non-copyable and non-movable because suspended
      79                 :     waiters hold intrusive pointers into the event's internal list.
      80                 : 
      81                 :     @par Example
      82                 :     @code
      83                 :     async_event event;
      84                 : 
      85                 :     task<> waiter() {
      86                 :         auto [ec] = co_await event.wait();
      87                 :         if(ec)
      88                 :             co_return;
      89                 :         // ... event was set ...
      90                 :     }
      91                 : 
      92                 :     task<> notifier() {
      93                 :         // ... do some work ...
      94                 :         event.set();  // Wake all waiters
      95                 :     }
      96                 :     @endcode
      97                 : */
      98                 : class async_event
      99                 : {
     100                 : public:
     101                 :     class wait_awaiter; // forward declaration so waiters_ can name the node type
     102                 : 
     103                 : private:
     104                 :     bool set_ = false; // current event state; written by set() and clear()
     105                 :     detail::intrusive_list<wait_awaiter> waiters_; // awaiters linked in by await_suspend
     106                 : 
     107                 : public:
     108                 :     /** Awaiter returned by wait(). It is an intrusive list node that await_suspend links into the event's waiter list.
     109                 :         await_resume yields error::canceled when the wait was canceled, otherwise a success result. */
     110                 :     class wait_awaiter
     111                 :         : public detail::intrusive_list<wait_awaiter>::node
     112                 :     {
     113                 :         friend class async_event;
     114                 : 
     115                 :         async_event* e_; // event being waited on (set by the constructor)
     116                 :         std::coroutine_handle<> h_; // suspended coroutine; posted by set() or cancel_fn
     117                 :         executor_ref ex_; // executor copied from io_env in await_suspend
     118                 : 
     119                 :         // Declared before stop_cb_buf_: the callback
     120                 :         // accesses these members, so they must still be
     121                 :         // alive if the stop_cb_ destructor blocks.
     122                 :         std::atomic<bool> claimed_{false}; // flipped by whichever of set() / cancel_fn posts h_ first
     123                 :         bool canceled_ = false; // true when await_resume must report error::canceled
     124                 :         bool active_ = false; // true while a stop_cb_t is constructed in stop_cb_buf_
     125                 :         bool in_list_ = false; // true while this node is linked into e_->waiters_
     126                 : 
     127                 :         struct cancel_fn // callable installed as the stop callback
     128                 :         {
     129                 :             wait_awaiter* self_;
     130                 : 
     131 HIT          21 :             void operator()() const noexcept
     132                 :             {
     133              21 :                 if(!self_->claimed_.exchange( // claim the waiter; do nothing if set() already claimed it
     134                 :                     true, std::memory_order_acq_rel))
     135                 :                 {
     136              20 :                     self_->canceled_ = true; // makes await_resume return error::canceled
     137              20 :                     self_->ex_.post(self_->h_); // hand the coroutine to the waiter's executor
     138                 :                 }
     139              21 :             }
     140                 :         };
     141                 : 
     142                 :         using stop_cb_t =
     143                 :             std::stop_callback<cancel_fn>;
     144                 : 
     145                 :         // Aligned storage for stop_cb_t. Declared last:
     146                 :         // its destructor may block while the callback
     147                 :         // accesses the members above.
     148                 : #ifdef _MSC_VER
     149                 : # pragma warning(push)
     150                 : # pragma warning(disable: 4324) // padded due to alignas
     151                 : #endif
     152                 :         alignas(stop_cb_t)
     153                 :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     154                 : #ifdef _MSC_VER
     155                 : # pragma warning(pop)
     156                 : #endif
     157                 : 
     158              37 :         stop_cb_t& stop_cb_() noexcept // views stop_cb_buf_ as a stop_cb_t; callers check active_ first
     159                 :         {
     160                 :             return *reinterpret_cast<stop_cb_t*>(
     161              37 :                 stop_cb_buf_);
     162                 :         }
     163                 : 
     164                 :     public:
     165             251 :         ~wait_awaiter()
     166                 :         {
     167             251 :             if(active_) // a stop_cb_t is still constructed: run its destructor
     168               1 :                 stop_cb_().~stop_cb_t();
     169             251 :             if(in_list_) // still linked: unlink from the event's waiter list
     170               1 :                 e_->waiters_.remove(this);
     171             251 :         }
     172                 : 
     173              57 :         explicit wait_awaiter(async_event* e) noexcept
     174              57 :             : e_(e)
     175                 :         {
     176              57 :         }
     177                 : 
     178             194 :         wait_awaiter(wait_awaiter&& o) noexcept // copies state; o.active_ / o.in_list_ are cleared so o's destructor does no cleanup
     179             194 :             : e_(o.e_)
     180             194 :             , h_(o.h_)
     181             194 :             , ex_(o.ex_)
     182             194 :             , claimed_(o.claimed_.load(
     183                 :                 std::memory_order_relaxed))
     184             194 :             , canceled_(o.canceled_)
     185             194 :             , active_(std::exchange(o.active_, false))
     186             194 :             , in_list_(std::exchange(o.in_list_, false))
     187                 :         { // NOTE(review): stop_cb_buf_ contents and list linkage are not transferred; presumably moves only happen before await_suspend - confirm
     188             194 :         }
     189                 : 
     190                 :         wait_awaiter(wait_awaiter const&) = delete;
     191                 :         wait_awaiter& operator=(wait_awaiter const&) = delete;
     192                 :         wait_awaiter& operator=(wait_awaiter&&) = delete;
     193                 : 
     194              57 :         bool await_ready() const noexcept
     195                 :         {
     196              57 :             return e_->set_; // no suspension when the event is already set
     197                 :         }
     198                 : 
     199                 :         /** IoAwaitable protocol overload. Enqueues this waiter and installs the stop callback; env supplies the executor and stop token. */
     200                 :         std::coroutine_handle<>
     201              47 :         await_suspend(
     202                 :             std::coroutine_handle<> h,
     203                 :             io_env const* env) noexcept
     204                 :         {
     205              47 :             if(env->stop_token.stop_requested()) // stop already requested: do not enqueue
     206                 :             {
     207              10 :                 canceled_ = true;
     208              10 :                 return h; // resume h right away; await_resume reports error::canceled
     209                 :             }
     210              37 :             h_ = h;
     211              37 :             ex_ = env->executor;
     212              37 :             e_->waiters_.push_back(this); // join the event's waiter list
     213              37 :             in_list_ = true;
     214             111 :             ::new(stop_cb_buf_) stop_cb_t( // placement-construct the stop callback in the buffer
     215              37 :                 env->stop_token, cancel_fn{this});
     216              37 :             active_ = true; // destructor / await_resume must now destroy the callback
     217              37 :             return std::noop_coroutine(); // nothing to resume here; h_ is posted later by set() or cancel_fn
     218                 :         }
     219                 : 
     220              54 :         io_result<> await_resume() noexcept // cleans up, then reports cancellation or success
     221                 :         {
     222              54 :             if(active_) // destroy the stop callback before reporting
     223                 :             {
     224              36 :                 stop_cb_().~stop_cb_t();
     225              36 :                 active_ = false;
     226                 :             }
     227              54 :             if(canceled_)
     228                 :             {
     229              30 :                 if(in_list_) // set() has not popped this waiter, so unlink it here
     230                 :                 {
     231              20 :                     e_->waiters_.remove(this);
     232              20 :                     in_list_ = false;
     233                 :                 }
     234              30 :                 return {make_error_code(
     235              30 :                     error::canceled)};
     236                 :             }
     237              24 :             return {{}}; // not canceled: NOTE(review) presumably a default-constructed (no-error) code - confirm against io_result
     238                 :         }
     239                 :     };
     240                 : 
     241                 :     /// Construct an unset event.
     242              20 :     async_event() = default;
     243                 : 
     244                 :     /// Copy constructor (deleted).
     245                 :     async_event(async_event const&) = delete;
     246                 : 
     247                 :     /// Copy assignment (deleted).
     248                 :     async_event& operator=(async_event const&) = delete;
     249                 : 
     250                 :     /// Move constructor (deleted).
     251                 :     async_event(async_event&&) = delete;
     252                 : 
     253                 :     /// Move assignment (deleted).
     254                 :     async_event& operator=(async_event&&) = delete;
     255                 : 
     256                 :     /** Returns an awaiter that waits until the event is set.
     257                 : 
     258                 :         If the event is already set, completes immediately.
     259                 : 
     260                 :         @return An awaitable yielding `(error_code)`.
     261                 :     */
     262              57 :     wait_awaiter wait() noexcept
     263                 :     {
     264              57 :         return wait_awaiter{this};
     265                 :     }
     266                 : 
     267                 :     /** Sets the event.
     268                 : 
     269                 :         All waiting coroutines are resumed. Canceled waiters
     270                 :         are skipped. Subsequent calls to wait() complete
     271                 :         immediately until clear() is called.
     272                 :     */
     273              23 :     void set()
     274                 :     {
     275              23 :         set_ = true; // later awaiters see await_ready() return true
     276                 :         for(;;)
     277                 :         {
     278              39 :             auto* w = waiters_.pop_front(); // drain front to back; the loop ends on a null result
     279              39 :             if(!w)
     280              23 :                 break;
     281              16 :             w->in_list_ = false; // popped: await_resume / ~wait_awaiter must not remove() it again
     282              16 :             if(!w->claimed_.exchange( // post only if no stop callback claimed this waiter first
     283                 :                 true, std::memory_order_acq_rel))
     284                 :             {
     285              16 :                 w->ex_.post(w->h_); // hand the coroutine to the waiter's executor
     286                 :             }
     287              16 :         }
     288              23 :     }
     289                 : 
     290                 :     /** Clears the event.
     291                 : 
     292                 :         Subsequent calls to wait() will suspend until
     293                 :         set() is called again.
     294                 :     */
     295               2 :     void clear() noexcept
     296                 :     {
     297               2 :         set_ = false;
     298               2 :     }
     299                 : 
     300                 :     /** Returns true if the event is currently set.
     301                 :     */
     302               9 :     bool is_set() const noexcept
     303                 :     {
     304               9 :         return set_;
     305                 :     }
     306                 : };
     307                 : 
     308                 : } // namespace capy
     309                 : } // namespace boost
     310                 : 
     311                 : #endif
        

Generated by: LCOV version 2.3