//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
22  
#include <boost/capy/error.hpp>
22  
#include <boost/capy/error.hpp>
23  
#include <boost/capy/ex/io_env.hpp>
23  
#include <boost/capy/ex/io_env.hpp>
24  
#include <boost/capy/io_result.hpp>
24  
#include <boost/capy/io_result.hpp>
25  
#include <boost/capy/io_task.hpp>
25  
#include <boost/capy/io_task.hpp>
26  

26  

27  
#include <concepts>
27  
#include <concepts>
28  
#include <coroutine>
28  
#include <coroutine>
29  
#include <cstddef>
29  
#include <cstddef>
30  
#include <exception>
30  
#include <exception>
31  
#include <new>
31  
#include <new>
32  
#include <span>
32  
#include <span>
33  
#include <stop_token>
33  
#include <stop_token>
34  
#include <system_error>
34  
#include <system_error>
35  
#include <utility>
35  
#include <utility>
36  

36  

37  
namespace boost {
37  
namespace boost {
38  
namespace capy {
38  
namespace capy {
39  

39  

40  
/** Type-erased wrapper for any BufferSource.
40  
/** Type-erased wrapper for any BufferSource.
41  

41  

42  
    This class provides type erasure for any type satisfying the
42  
    This class provides type erasure for any type satisfying the
43  
    @ref BufferSource concept, enabling runtime polymorphism for
43  
    @ref BufferSource concept, enabling runtime polymorphism for
44  
    buffer pull operations. It uses cached awaitable storage to achieve
44  
    buffer pull operations. It uses cached awaitable storage to achieve
45  
    zero steady-state allocation after construction.
45  
    zero steady-state allocation after construction.
46  

46  

47  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
47  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
48  
    satisfies only @ref BufferSource, the read operations are
48  
    satisfies only @ref BufferSource, the read operations are
49  
    synthesized using @ref pull and @ref consume with an extra
49  
    synthesized using @ref pull and @ref consume with an extra
50  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
50  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
51  
    and @ref ReadSource, the native read operations are forwarded
51  
    and @ref ReadSource, the native read operations are forwarded
52  
    directly across the virtual boundary, avoiding the copy.
52  
    directly across the virtual boundary, avoiding the copy.
53  

53  

54  
    The wrapper supports two construction modes:
54  
    The wrapper supports two construction modes:
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
56  
      allocates storage and owns the source.
56  
      allocates storage and owns the source.
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
58  
      pointed-to source must outlive this wrapper.
58  
      pointed-to source must outlive this wrapper.
59  

59  

60  
    Within each mode, the vtable is populated at compile time based
60  
    Within each mode, the vtable is populated at compile time based
61  
    on whether the wrapped type also satisfies @ref ReadSource:
61  
    on whether the wrapped type also satisfies @ref ReadSource:
62  
    - **BufferSource only**: @ref read_some and @ref read are
62  
    - **BufferSource only**: @ref read_some and @ref read are
63  
      synthesized from @ref pull and @ref consume, incurring one
63  
      synthesized from @ref pull and @ref consume, incurring one
64  
      buffer copy per operation.
64  
      buffer copy per operation.
65  
    - **BufferSource + ReadSource**: All read operations are
65  
    - **BufferSource + ReadSource**: All read operations are
66  
      forwarded natively through the type-erased boundary with
66  
      forwarded natively through the type-erased boundary with
67  
      no extra copy.
67  
      no extra copy.
68  

68  

69  
    @par Awaitable Preallocation
69  
    @par Awaitable Preallocation
70  
    The constructor preallocates storage for the type-erased awaitable.
70  
    The constructor preallocates storage for the type-erased awaitable.
71  
    This reserves all virtual address space at server startup
71  
    This reserves all virtual address space at server startup
72  
    so memory usage can be measured up front, rather than
72  
    so memory usage can be measured up front, rather than
73  
    allocating piecemeal as traffic arrives.
73  
    allocating piecemeal as traffic arrives.
74  

74  

75  
    @par Thread Safety
75  
    @par Thread Safety
76  
    Not thread-safe. Concurrent operations on the same wrapper
76  
    Not thread-safe. Concurrent operations on the same wrapper
77  
    are undefined behavior.
77  
    are undefined behavior.
78  

78  

79  
    @par Example
79  
    @par Example
80  
    @code
80  
    @code
81  
    // Owning - takes ownership of the source
81  
    // Owning - takes ownership of the source
82  
    any_buffer_source abs(some_buffer_source{args...});
82  
    any_buffer_source abs(some_buffer_source{args...});
83  

83  

84  
    // Reference - wraps without ownership
84  
    // Reference - wraps without ownership
85  
    some_buffer_source src;
85  
    some_buffer_source src;
86  
    any_buffer_source abs(&src);
86  
    any_buffer_source abs(&src);
87  

87  

88  
    const_buffer arr[16];
88  
    const_buffer arr[16];
89  
    auto [ec, bufs] = co_await abs.pull(arr);
89  
    auto [ec, bufs] = co_await abs.pull(arr);
90  

90  

91  
    // ReadSource interface also available
91  
    // ReadSource interface also available
92  
    char buf[64];
92  
    char buf[64];
93  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
93  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94  
    @endcode
94  
    @endcode
95  

95  

96  
    @see any_buffer_sink, BufferSource, ReadSource
96  
    @see any_buffer_sink, BufferSource, ReadSource
97  
*/
97  
*/
98  
class any_buffer_source
98  
class any_buffer_source
99  
{
99  
{
100  
    struct vtable;
100  
    struct vtable;
101  
    struct awaitable_ops;
101  
    struct awaitable_ops;
102  
    struct read_awaitable_ops;
102  
    struct read_awaitable_ops;
103  

103  

104  
    template<BufferSource S>
104  
    template<BufferSource S>
105  
    struct vtable_for_impl;
105  
    struct vtable_for_impl;
106  

106  

107  
    // hot-path members first for cache locality
107  
    // hot-path members first for cache locality
108  
    void* source_ = nullptr;
108  
    void* source_ = nullptr;
109  
    vtable const* vt_ = nullptr;
109  
    vtable const* vt_ = nullptr;
110  
    void* cached_awaitable_ = nullptr;
110  
    void* cached_awaitable_ = nullptr;
111  
    awaitable_ops const* active_ops_ = nullptr;
111  
    awaitable_ops const* active_ops_ = nullptr;
112  
    read_awaitable_ops const* active_read_ops_ = nullptr;
112  
    read_awaitable_ops const* active_read_ops_ = nullptr;
113  
    void* storage_ = nullptr;
113  
    void* storage_ = nullptr;
114  

114  

115  
public:
115  
public:
116  
    /** Destructor.
116  
    /** Destructor.
117  

117  

118  
        Destroys the owned source (if any) and releases the cached
118  
        Destroys the owned source (if any) and releases the cached
119  
        awaitable storage.
119  
        awaitable storage.
120  
    */
120  
    */
121  
    ~any_buffer_source();
121  
    ~any_buffer_source();
122  

122  

123 -
    /** Construct a default instance.
123 +
    /** Default constructor.
124  

124  

125  
        Constructs an empty wrapper. Operations on a default-constructed
125  
        Constructs an empty wrapper. Operations on a default-constructed
126  
        wrapper result in undefined behavior.
126  
        wrapper result in undefined behavior.
127  
    */
127  
    */
128  
    any_buffer_source() = default;
128  
    any_buffer_source() = default;
129  

129  

130  
    /** Non-copyable.
130  
    /** Non-copyable.
131  

131  

132  
        The awaitable cache is per-instance and cannot be shared.
132  
        The awaitable cache is per-instance and cannot be shared.
133  
    */
133  
    */
134  
    any_buffer_source(any_buffer_source const&) = delete;
134  
    any_buffer_source(any_buffer_source const&) = delete;
135  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
135  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
136  

136  

137 -
    /** Construct by moving.
137 +
    /** Move constructor.
138  

138  

139  
        Transfers ownership of the wrapped source (if owned) and
139  
        Transfers ownership of the wrapped source (if owned) and
140  
        cached awaitable storage from `other`. After the move, `other` is
140  
        cached awaitable storage from `other`. After the move, `other` is
141  
        in a default-constructed state.
141  
        in a default-constructed state.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
    */
144  
    */
145  
    any_buffer_source(any_buffer_source&& other) noexcept
145  
    any_buffer_source(any_buffer_source&& other) noexcept
146  
        : source_(std::exchange(other.source_, nullptr))
146  
        : source_(std::exchange(other.source_, nullptr))
147  
        , vt_(std::exchange(other.vt_, nullptr))
147  
        , vt_(std::exchange(other.vt_, nullptr))
148  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
148  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
149  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
150  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
150  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151  
        , storage_(std::exchange(other.storage_, nullptr))
151  
        , storage_(std::exchange(other.storage_, nullptr))
152  
    {
152  
    {
153  
    }
153  
    }
154  

154  

155 -
    /** Assign by moving.
155 +
    /** Move assignment operator.
156  

156  

157  
        Destroys any owned source and releases existing resources,
157  
        Destroys any owned source and releases existing resources,
158  
        then transfers ownership from `other`.
158  
        then transfers ownership from `other`.
159  

159  

160  
        @param other The wrapper to move from.
160  
        @param other The wrapper to move from.
161  
        @return Reference to this wrapper.
161  
        @return Reference to this wrapper.
162  
    */
162  
    */
163  
    any_buffer_source&
163  
    any_buffer_source&
164  
    operator=(any_buffer_source&& other) noexcept;
164  
    operator=(any_buffer_source&& other) noexcept;
165  

165  

166  
    /** Construct by taking ownership of a BufferSource.
166  
    /** Construct by taking ownership of a BufferSource.
167  

167  

168  
        Allocates storage and moves the source into this wrapper.
168  
        Allocates storage and moves the source into this wrapper.
169  
        The wrapper owns the source and will destroy it. If `S` also
169  
        The wrapper owns the source and will destroy it. If `S` also
170  
        satisfies @ref ReadSource, native read operations are
170  
        satisfies @ref ReadSource, native read operations are
171  
        forwarded through the virtual boundary.
171  
        forwarded through the virtual boundary.
172  

172  

173  
        @param s The source to take ownership of.
173  
        @param s The source to take ownership of.
174  
    */
174  
    */
175  
    template<BufferSource S>
175  
    template<BufferSource S>
176  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
176  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177  
    any_buffer_source(S s);
177  
    any_buffer_source(S s);
178  

178  

179  
    /** Construct by wrapping a BufferSource without ownership.
179  
    /** Construct by wrapping a BufferSource without ownership.
180  

180  

181  
        Wraps the given source by pointer. The source must remain
181  
        Wraps the given source by pointer. The source must remain
182  
        valid for the lifetime of this wrapper. If `S` also
182  
        valid for the lifetime of this wrapper. If `S` also
183  
        satisfies @ref ReadSource, native read operations are
183  
        satisfies @ref ReadSource, native read operations are
184  
        forwarded through the virtual boundary.
184  
        forwarded through the virtual boundary.
185  

185  

186  
        @param s Pointer to the source to wrap.
186  
        @param s Pointer to the source to wrap.
187  
    */
187  
    */
188  
    template<BufferSource S>
188  
    template<BufferSource S>
189  
    any_buffer_source(S* s);
189  
    any_buffer_source(S* s);
190  

190  

191  
    /** Check if the wrapper contains a valid source.
191  
    /** Check if the wrapper contains a valid source.
192  

192  

193  
        @return `true` if wrapping a source, `false` if default-constructed
193  
        @return `true` if wrapping a source, `false` if default-constructed
194  
            or moved-from.
194  
            or moved-from.
195  
    */
195  
    */
196  
    bool
196  
    bool
197  
    has_value() const noexcept
197  
    has_value() const noexcept
198  
    {
198  
    {
199  
        return source_ != nullptr;
199  
        return source_ != nullptr;
200  
    }
200  
    }
201  

201  

202  
    /** Check if the wrapper contains a valid source.
202  
    /** Check if the wrapper contains a valid source.
203  

203  

204  
        @return `true` if wrapping a source, `false` if default-constructed
204  
        @return `true` if wrapping a source, `false` if default-constructed
205  
            or moved-from.
205  
            or moved-from.
206  
    */
206  
    */
207  
    explicit
207  
    explicit
208  
    operator bool() const noexcept
208  
    operator bool() const noexcept
209  
    {
209  
    {
210  
        return has_value();
210  
        return has_value();
211  
    }
211  
    }
212  

212  

213  
    /** Consume bytes from the source.
213  
    /** Consume bytes from the source.
214  

214  

215  
        Advances the internal read position of the underlying source
215  
        Advances the internal read position of the underlying source
216  
        by the specified number of bytes. The next call to @ref pull
216  
        by the specified number of bytes. The next call to @ref pull
217  
        returns data starting after the consumed bytes.
217  
        returns data starting after the consumed bytes.
218  

218  

219  
        @param n The number of bytes to consume. Must not exceed the
219  
        @param n The number of bytes to consume. Must not exceed the
220  
        total size of buffers returned by the previous @ref pull.
220  
        total size of buffers returned by the previous @ref pull.
221  

221  

222  
        @par Preconditions
222  
        @par Preconditions
223  
        The wrapper must contain a valid source (`has_value() == true`).
223  
        The wrapper must contain a valid source (`has_value() == true`).
224  
    */
224  
    */
225  
    void
225  
    void
226  
    consume(std::size_t n) noexcept;
226  
    consume(std::size_t n) noexcept;
227  

227  

228  
    /** Pull buffer data from the source.
228  
    /** Pull buffer data from the source.
229  

229  

230  
        Fills the provided span with buffer descriptors from the
230  
        Fills the provided span with buffer descriptors from the
231  
        underlying source. The operation completes when data is
231  
        underlying source. The operation completes when data is
232  
        available, the source is exhausted, or an error occurs.
232  
        available, the source is exhausted, or an error occurs.
233  

233  

234  
        @param dest Span of const_buffer to fill.
234  
        @param dest Span of const_buffer to fill.
235  

235  

236  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
236  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237  
            On success with data, a non-empty span of filled buffers.
237  
            On success with data, a non-empty span of filled buffers.
238  
            On EOF, `ec == cond::eof` and span is empty.
238  
            On EOF, `ec == cond::eof` and span is empty.
239  

239  

240  
        @par Preconditions
240  
        @par Preconditions
241  
        The wrapper must contain a valid source (`has_value() == true`).
241  
        The wrapper must contain a valid source (`has_value() == true`).
242  
        The caller must not call this function again after a prior
242  
        The caller must not call this function again after a prior
243  
        call returned an error.
243  
        call returned an error.
244  
    */
244  
    */
245  
    auto
245  
    auto
246  
    pull(std::span<const_buffer> dest);
246  
    pull(std::span<const_buffer> dest);
247  

247  

248  
    /** Read some data into a mutable buffer sequence.
248  
    /** Read some data into a mutable buffer sequence.
249  

249  

250  
        Reads one or more bytes into the caller's buffers. May fill
250  
        Reads one or more bytes into the caller's buffers. May fill
251  
        less than the full sequence.
251  
        less than the full sequence.
252  

252  

253  
        When the wrapped type provides native @ref ReadSource support,
253  
        When the wrapped type provides native @ref ReadSource support,
254  
        the operation forwards directly. Otherwise it is synthesized
254  
        the operation forwards directly. Otherwise it is synthesized
255  
        from @ref pull, @ref buffer_copy, and @ref consume.
255  
        from @ref pull, @ref buffer_copy, and @ref consume.
256  

256  

257  
        @param buffers The buffer sequence to fill.
257  
        @param buffers The buffer sequence to fill.
258  

258  

259  
        @return An awaitable yielding `(error_code,std::size_t)`.
259  
        @return An awaitable yielding `(error_code,std::size_t)`.
260  

260  

261  
        @par Preconditions
261  
        @par Preconditions
262  
        The wrapper must contain a valid source (`has_value() == true`).
262  
        The wrapper must contain a valid source (`has_value() == true`).
263  
        The caller must not call this function again after a prior
263  
        The caller must not call this function again after a prior
264  
        call returned an error (including EOF).
264  
        call returned an error (including EOF).
265  

265  

266  
        @see pull, consume
266  
        @see pull, consume
267  
    */
267  
    */
268  
    template<MutableBufferSequence MB>
268  
    template<MutableBufferSequence MB>
269  
    io_task<std::size_t>
269  
    io_task<std::size_t>
270  
    read_some(MB buffers);
270  
    read_some(MB buffers);
271  

271  

272  
    /** Read data into a mutable buffer sequence.
272  
    /** Read data into a mutable buffer sequence.
273  

273  

274  
        Fills the provided buffer sequence completely. When the
274  
        Fills the provided buffer sequence completely. When the
275  
        wrapped type provides native @ref ReadSource support, each
275  
        wrapped type provides native @ref ReadSource support, each
276  
        window is forwarded directly. Otherwise the data is
276  
        window is forwarded directly. Otherwise the data is
277  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
277  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278  

278  

279  
        @param buffers The buffer sequence to fill.
279  
        @param buffers The buffer sequence to fill.
280  

280  

281  
        @return An awaitable yielding `(error_code,std::size_t)`.
281  
        @return An awaitable yielding `(error_code,std::size_t)`.
282  
            On success, `n == buffer_size(buffers)`.
282  
            On success, `n == buffer_size(buffers)`.
283  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
283  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
284  

284  

285  
        @par Preconditions
285  
        @par Preconditions
286  
        The wrapper must contain a valid source (`has_value() == true`).
286  
        The wrapper must contain a valid source (`has_value() == true`).
287  
        The caller must not call this function again after a prior
287  
        The caller must not call this function again after a prior
288  
        call returned an error (including EOF).
288  
        call returned an error (including EOF).
289  

289  

290  
        @see pull, consume
290  
        @see pull, consume
291  
    */
291  
    */
292  
    template<MutableBufferSequence MB>
292  
    template<MutableBufferSequence MB>
293  
    io_task<std::size_t>
293  
    io_task<std::size_t>
294  
    read(MB buffers);
294  
    read(MB buffers);
295  

295  

296  
protected:
296  
protected:
297  
    /** Rebind to a new source after move.
297  
    /** Rebind to a new source after move.
298  

298  

299  
        Updates the internal pointer to reference a new source object.
299  
        Updates the internal pointer to reference a new source object.
300  
        Used by owning wrappers after move assignment when the owned
300  
        Used by owning wrappers after move assignment when the owned
301  
        object has moved to a new location.
301  
        object has moved to a new location.
302  

302  

303  
        @param new_source The new source to bind to. Must be the same
303  
        @param new_source The new source to bind to. Must be the same
304  
            type as the original source.
304  
            type as the original source.
305  

305  

306  
        @note Terminates if called with a source of different type
306  
        @note Terminates if called with a source of different type
307  
            than the original.
307  
            than the original.
308  
    */
308  
    */
309  
    template<BufferSource S>
309  
    template<BufferSource S>
310  
    void
310  
    void
311  
    rebind(S& new_source) noexcept
311  
    rebind(S& new_source) noexcept
312  
    {
312  
    {
313  
        if(vt_ != &vtable_for_impl<S>::value)
313  
        if(vt_ != &vtable_for_impl<S>::value)
314  
            std::terminate();
314  
            std::terminate();
315  
        source_ = &new_source;
315  
        source_ = &new_source;
316  
    }
316  
    }
317  

317  

318  
private:
318  
private:
319  
    /** Forward a partial read through the vtable.
319  
    /** Forward a partial read through the vtable.
320  

320  

321  
        Constructs the underlying `read_some` awaitable in
321  
        Constructs the underlying `read_some` awaitable in
322  
        cached storage and returns a type-erased awaitable.
322  
        cached storage and returns a type-erased awaitable.
323  
    */
323  
    */
324  
    auto
324  
    auto
325  
    read_some_(std::span<mutable_buffer const> buffers);
325  
    read_some_(std::span<mutable_buffer const> buffers);
326  

326  

327  
    /** Forward a complete read through the vtable.
327  
    /** Forward a complete read through the vtable.
328  

328  

329  
        Constructs the underlying `read` awaitable in
329  
        Constructs the underlying `read` awaitable in
330  
        cached storage and returns a type-erased awaitable.
330  
        cached storage and returns a type-erased awaitable.
331  
    */
331  
    */
332  
    auto
332  
    auto
333  
    read_(std::span<mutable_buffer const> buffers);
333  
    read_(std::span<mutable_buffer const> buffers);
334  
};
334  
};
335  

335  

//----------------------------------------------------------

336  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
338  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
337  
struct any_buffer_source::awaitable_ops
339  
struct any_buffer_source::awaitable_ops
338  
{
340  
{
339  
    bool (*await_ready)(void*);
341  
    bool (*await_ready)(void*);
340  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
342  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
343  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
342  
    void (*destroy)(void*) noexcept;
344  
    void (*destroy)(void*) noexcept;
343  
};
345  
};
344  

346  

345  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
347  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
346  
struct any_buffer_source::read_awaitable_ops
348  
struct any_buffer_source::read_awaitable_ops
347  
{
349  
{
348  
    bool (*await_ready)(void*);
350  
    bool (*await_ready)(void*);
349  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
351  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
350  
    io_result<std::size_t> (*await_resume)(void*);
352  
    io_result<std::size_t> (*await_resume)(void*);
351  
    void (*destroy)(void*) noexcept;
353  
    void (*destroy)(void*) noexcept;
352  
};
354  
};
353  

355  

354  
struct any_buffer_source::vtable
356  
struct any_buffer_source::vtable
355  
{
357  
{
356  
    // BufferSource ops (always populated)
358  
    // BufferSource ops (always populated)
357  
    void (*destroy)(void*) noexcept;
359  
    void (*destroy)(void*) noexcept;
358  
    void (*do_consume)(void* source, std::size_t n) noexcept;
360  
    void (*do_consume)(void* source, std::size_t n) noexcept;
359  
    std::size_t awaitable_size;
361  
    std::size_t awaitable_size;
360  
    std::size_t awaitable_align;
362  
    std::size_t awaitable_align;
361  
    awaitable_ops const* (*construct_awaitable)(
363  
    awaitable_ops const* (*construct_awaitable)(
362  
        void* source,
364  
        void* source,
363  
        void* storage,
365  
        void* storage,
364  
        std::span<const_buffer> dest);
366  
        std::span<const_buffer> dest);
365  

367  

366  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
368  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
367  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
369  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
368  
        void* source,
370  
        void* source,
369  
        void* storage,
371  
        void* storage,
370  
        std::span<mutable_buffer const> buffers);
372  
        std::span<mutable_buffer const> buffers);
371  
    read_awaitable_ops const* (*construct_read_awaitable)(
373  
    read_awaitable_ops const* (*construct_read_awaitable)(
372  
        void* source,
374  
        void* source,
373  
        void* storage,
375  
        void* storage,
374  
        std::span<mutable_buffer const> buffers);
376  
        std::span<mutable_buffer const> buffers);
375  
};
377  
};
376  

378  

377  
template<BufferSource S>
379  
template<BufferSource S>
378  
struct any_buffer_source::vtable_for_impl
380  
struct any_buffer_source::vtable_for_impl
379  
{
381  
{
380  
    using PullAwaitable = decltype(std::declval<S&>().pull(
382  
    using PullAwaitable = decltype(std::declval<S&>().pull(
381  
        std::declval<std::span<const_buffer>>()));
383  
        std::declval<std::span<const_buffer>>()));
382  

384  

383  
    static void
385  
    static void
384  
    do_destroy_impl(void* source) noexcept
386  
    do_destroy_impl(void* source) noexcept
385  
    {
387  
    {
386  
        static_cast<S*>(source)->~S();
388  
        static_cast<S*>(source)->~S();
387  
    }
389  
    }
388  

390  

389  
    static void
391  
    static void
390  
    do_consume_impl(void* source, std::size_t n) noexcept
392  
    do_consume_impl(void* source, std::size_t n) noexcept
391  
    {
393  
    {
392  
        static_cast<S*>(source)->consume(n);
394  
        static_cast<S*>(source)->consume(n);
393  
    }
395  
    }
394  

396  

395  
    static awaitable_ops const*
397  
    static awaitable_ops const*
396  
    construct_awaitable_impl(
398  
    construct_awaitable_impl(
397  
        void* source,
399  
        void* source,
398  
        void* storage,
400  
        void* storage,
399  
        std::span<const_buffer> dest)
401  
        std::span<const_buffer> dest)
400  
    {
402  
    {
401  
        auto& s = *static_cast<S*>(source);
403  
        auto& s = *static_cast<S*>(source);
402  
        ::new(storage) PullAwaitable(s.pull(dest));
404  
        ::new(storage) PullAwaitable(s.pull(dest));
403  

405  

404  
        static constexpr awaitable_ops ops = {
406  
        static constexpr awaitable_ops ops = {
405  
            +[](void* p) {
407  
            +[](void* p) {
406  
                return static_cast<PullAwaitable*>(p)->await_ready();
408  
                return static_cast<PullAwaitable*>(p)->await_ready();
407  
            },
409  
            },
408  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
410  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
409  
                return detail::call_await_suspend(
411  
                return detail::call_await_suspend(
410  
                    static_cast<PullAwaitable*>(p), h, env);
412  
                    static_cast<PullAwaitable*>(p), h, env);
411  
            },
413  
            },
412  
            +[](void* p) {
414  
            +[](void* p) {
413  
                return static_cast<PullAwaitable*>(p)->await_resume();
415  
                return static_cast<PullAwaitable*>(p)->await_resume();
414  
            },
416  
            },
415  
            +[](void* p) noexcept {
417  
            +[](void* p) noexcept {
416  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
418  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
417  
            }
419  
            }
418  
        };
420  
        };
419  
        return &ops;
421  
        return &ops;
420  
    }
422  
    }
421  

423  

 
424 +
    //------------------------------------------------------
 
425 +
    // ReadSource forwarding (only instantiated when ReadSource<S>)
 
426 +

422  
    static read_awaitable_ops const*
427  
    static read_awaitable_ops const*
423  
    construct_read_some_awaitable_impl(
428  
    construct_read_some_awaitable_impl(
424  
        void* source,
429  
        void* source,
425  
        void* storage,
430  
        void* storage,
426  
        std::span<mutable_buffer const> buffers)
431  
        std::span<mutable_buffer const> buffers)
427  
        requires ReadSource<S>
432  
        requires ReadSource<S>
428  
    {
433  
    {
429  
        using Aw = decltype(std::declval<S&>().read_some(
434  
        using Aw = decltype(std::declval<S&>().read_some(
430  
            std::span<mutable_buffer const>{}));
435  
            std::span<mutable_buffer const>{}));
431  
        auto& s = *static_cast<S*>(source);
436  
        auto& s = *static_cast<S*>(source);
432  
        ::new(storage) Aw(s.read_some(buffers));
437  
        ::new(storage) Aw(s.read_some(buffers));
433  

438  

434  
        static constexpr read_awaitable_ops ops = {
439  
        static constexpr read_awaitable_ops ops = {
435  
            +[](void* p) {
440  
            +[](void* p) {
436  
                return static_cast<Aw*>(p)->await_ready();
441  
                return static_cast<Aw*>(p)->await_ready();
437  
            },
442  
            },
438  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
443  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
439  
                return detail::call_await_suspend(
444  
                return detail::call_await_suspend(
440  
                    static_cast<Aw*>(p), h, env);
445  
                    static_cast<Aw*>(p), h, env);
441  
            },
446  
            },
442  
            +[](void* p) {
447  
            +[](void* p) {
443  
                return static_cast<Aw*>(p)->await_resume();
448  
                return static_cast<Aw*>(p)->await_resume();
444  
            },
449  
            },
445  
            +[](void* p) noexcept {
450  
            +[](void* p) noexcept {
446  
                static_cast<Aw*>(p)->~Aw();
451  
                static_cast<Aw*>(p)->~Aw();
447  
            }
452  
            }
448  
        };
453  
        };
449  
        return &ops;
454  
        return &ops;
450  
    }
455  
    }
451  

456  

452  
    static read_awaitable_ops const*
457  
    static read_awaitable_ops const*
453  
    construct_read_awaitable_impl(
458  
    construct_read_awaitable_impl(
454  
        void* source,
459  
        void* source,
455  
        void* storage,
460  
        void* storage,
456  
        std::span<mutable_buffer const> buffers)
461  
        std::span<mutable_buffer const> buffers)
457  
        requires ReadSource<S>
462  
        requires ReadSource<S>
458  
    {
463  
    {
459  
        using Aw = decltype(std::declval<S&>().read(
464  
        using Aw = decltype(std::declval<S&>().read(
460  
            std::span<mutable_buffer const>{}));
465  
            std::span<mutable_buffer const>{}));
461  
        auto& s = *static_cast<S*>(source);
466  
        auto& s = *static_cast<S*>(source);
462  
        ::new(storage) Aw(s.read(buffers));
467  
        ::new(storage) Aw(s.read(buffers));
463  

468  

464  
        static constexpr read_awaitable_ops ops = {
469  
        static constexpr read_awaitable_ops ops = {
465  
            +[](void* p) {
470  
            +[](void* p) {
466  
                return static_cast<Aw*>(p)->await_ready();
471  
                return static_cast<Aw*>(p)->await_ready();
467  
            },
472  
            },
468  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
473  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
469  
                return detail::call_await_suspend(
474  
                return detail::call_await_suspend(
470  
                    static_cast<Aw*>(p), h, env);
475  
                    static_cast<Aw*>(p), h, env);
471  
            },
476  
            },
472  
            +[](void* p) {
477  
            +[](void* p) {
473  
                return static_cast<Aw*>(p)->await_resume();
478  
                return static_cast<Aw*>(p)->await_resume();
474  
            },
479  
            },
475  
            +[](void* p) noexcept {
480  
            +[](void* p) noexcept {
476  
                static_cast<Aw*>(p)->~Aw();
481  
                static_cast<Aw*>(p)->~Aw();
477  
            }
482  
            }
478  
        };
483  
        };
479  
        return &ops;
484  
        return &ops;
480  
    }
485  
    }
481  

486  

 
487 +
    //------------------------------------------------------
 
488 +

482  
    static consteval std::size_t
489  
    static consteval std::size_t
483  
    compute_max_size() noexcept
490  
    compute_max_size() noexcept
484  
    {
491  
    {
485  
        std::size_t s = sizeof(PullAwaitable);
492  
        std::size_t s = sizeof(PullAwaitable);
486  
        if constexpr (ReadSource<S>)
493  
        if constexpr (ReadSource<S>)
487  
        {
494  
        {
488  
            using RS = decltype(std::declval<S&>().read_some(
495  
            using RS = decltype(std::declval<S&>().read_some(
489  
                std::span<mutable_buffer const>{}));
496  
                std::span<mutable_buffer const>{}));
490  
            using R = decltype(std::declval<S&>().read(
497  
            using R = decltype(std::declval<S&>().read(
491  
                std::span<mutable_buffer const>{}));
498  
                std::span<mutable_buffer const>{}));
492  

499  

493  
            if(sizeof(RS) > s) s = sizeof(RS);
500  
            if(sizeof(RS) > s) s = sizeof(RS);
494  
            if(sizeof(R) > s) s = sizeof(R);
501  
            if(sizeof(R) > s) s = sizeof(R);
495  
        }
502  
        }
496  
        return s;
503  
        return s;
497  
    }
504  
    }
498  

505  

499  
    static consteval std::size_t
506  
    static consteval std::size_t
500  
    compute_max_align() noexcept
507  
    compute_max_align() noexcept
501  
    {
508  
    {
502  
        std::size_t a = alignof(PullAwaitable);
509  
        std::size_t a = alignof(PullAwaitable);
503  
        if constexpr (ReadSource<S>)
510  
        if constexpr (ReadSource<S>)
504  
        {
511  
        {
505  
            using RS = decltype(std::declval<S&>().read_some(
512  
            using RS = decltype(std::declval<S&>().read_some(
506  
                std::span<mutable_buffer const>{}));
513  
                std::span<mutable_buffer const>{}));
507  
            using R = decltype(std::declval<S&>().read(
514  
            using R = decltype(std::declval<S&>().read(
508  
                std::span<mutable_buffer const>{}));
515  
                std::span<mutable_buffer const>{}));
509  

516  

510  
            if(alignof(RS) > a) a = alignof(RS);
517  
            if(alignof(RS) > a) a = alignof(RS);
511  
            if(alignof(R) > a) a = alignof(R);
518  
            if(alignof(R) > a) a = alignof(R);
512  
        }
519  
        }
513  
        return a;
520  
        return a;
514  
    }
521  
    }
515  

522  

516  
    static consteval vtable
523  
    static consteval vtable
517  
    make_vtable() noexcept
524  
    make_vtable() noexcept
518  
    {
525  
    {
519  
        vtable v{};
526  
        vtable v{};
520  
        v.destroy = &do_destroy_impl;
527  
        v.destroy = &do_destroy_impl;
521  
        v.do_consume = &do_consume_impl;
528  
        v.do_consume = &do_consume_impl;
522  
        v.awaitable_size = compute_max_size();
529  
        v.awaitable_size = compute_max_size();
523  
        v.awaitable_align = compute_max_align();
530  
        v.awaitable_align = compute_max_align();
524  
        v.construct_awaitable = &construct_awaitable_impl;
531  
        v.construct_awaitable = &construct_awaitable_impl;
525  
        v.construct_read_some_awaitable = nullptr;
532  
        v.construct_read_some_awaitable = nullptr;
526  
        v.construct_read_awaitable = nullptr;
533  
        v.construct_read_awaitable = nullptr;
527  

534  

528  
        if constexpr (ReadSource<S>)
535  
        if constexpr (ReadSource<S>)
529  
        {
536  
        {
530  
            v.construct_read_some_awaitable =
537  
            v.construct_read_some_awaitable =
531  
                &construct_read_some_awaitable_impl;
538  
                &construct_read_some_awaitable_impl;
532  
            v.construct_read_awaitable =
539  
            v.construct_read_awaitable =
533  
                &construct_read_awaitable_impl;
540  
                &construct_read_awaitable_impl;
534  
        }
541  
        }
535  
        return v;
542  
        return v;
536  
    }
543  
    }
537  

544  

538  
    static constexpr vtable value = make_vtable();
545  
    static constexpr vtable value = make_vtable();
539  
};
546  
};
540  

547  

 
548 +
//----------------------------------------------------------
 
549 +

541  
inline
550  
inline
542  
any_buffer_source::~any_buffer_source()
551  
any_buffer_source::~any_buffer_source()
543  
{
552  
{
544  
    if(storage_)
553  
    if(storage_)
545  
    {
554  
    {
546  
        vt_->destroy(source_);
555  
        vt_->destroy(source_);
547  
        ::operator delete(storage_);
556  
        ::operator delete(storage_);
548  
    }
557  
    }
549  
    if(cached_awaitable_)
558  
    if(cached_awaitable_)
550  
        ::operator delete(cached_awaitable_);
559  
        ::operator delete(cached_awaitable_);
551  
}
560  
}
552  

561  

553  
// Move assignment.
//
// Releases everything *this currently holds, then takes over the state
// of `other`, leaving `other` with all pointers null. Self-assignment is
// a no-op.
//
// Before the cached awaitable storage is freed, any awaitable still
// alive in it (active_ops_ or active_read_ops_ non-null) is destroyed;
// previously its destructor was skipped.
//
// NOTE(review): assumes active_ops_ / active_read_ops_ are
// null-initialized in the class definition (not visible here) - confirm.
inline any_buffer_source&
any_buffer_source::operator=(any_buffer_source&& other) noexcept
{
    if(this == &other)
        return *this;

    // Tear down the current state: in-flight awaitable first, then the
    // owned source (only when storage_ is set), then the cached storage.
    if(cached_awaitable_)
    {
        if(active_ops_)
            active_ops_->destroy(cached_awaitable_);
        else if(active_read_ops_)
            active_read_ops_->destroy(cached_awaitable_);
    }
    if(storage_)
    {
        vt_->destroy(source_);
        ::operator delete(storage_);
    }
    if(cached_awaitable_)
        ::operator delete(cached_awaitable_);

    // Steal other's state and null it out so its destructor is a no-op.
    source_ = std::exchange(other.source_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    active_ops_ = std::exchange(other.active_ops_, nullptr);
    active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
    return *this;
}
574  

583  

575  
template<BufferSource S>
584  
template<BufferSource S>
576  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
585  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
577  
any_buffer_source::any_buffer_source(S s)
586  
any_buffer_source::any_buffer_source(S s)
578  
    : vt_(&vtable_for_impl<S>::value)
587  
    : vt_(&vtable_for_impl<S>::value)
579  
{
588  
{
580  
    struct guard {
589  
    struct guard {
581  
        any_buffer_source* self;
590  
        any_buffer_source* self;
582  
        bool committed = false;
591  
        bool committed = false;
583  
        ~guard() {
592  
        ~guard() {
584  
            if(!committed && self->storage_) {
593  
            if(!committed && self->storage_) {
585  
                self->vt_->destroy(self->source_);
594  
                self->vt_->destroy(self->source_);
586  
                ::operator delete(self->storage_);
595  
                ::operator delete(self->storage_);
587  
                self->storage_ = nullptr;
596  
                self->storage_ = nullptr;
588  
                self->source_ = nullptr;
597  
                self->source_ = nullptr;
589  
            }
598  
            }
590  
        }
599  
        }
591  
    } g{this};
600  
    } g{this};
592  

601  

593  
    storage_ = ::operator new(sizeof(S));
602  
    storage_ = ::operator new(sizeof(S));
594  
    source_ = ::new(storage_) S(std::move(s));
603  
    source_ = ::new(storage_) S(std::move(s));
595  

604  

596  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
597  

606  

598  
    g.committed = true;
607  
    g.committed = true;
599  
}
608  
}
600  

609  

601  
template<BufferSource S>
610  
template<BufferSource S>
602  
any_buffer_source::any_buffer_source(S* s)
611  
any_buffer_source::any_buffer_source(S* s)
603  
    : source_(s)
612  
    : source_(s)
604  
    , vt_(&vtable_for_impl<S>::value)
613  
    , vt_(&vtable_for_impl<S>::value)
605  
{
614  
{
606  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
615  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
607  
}
616  
}
608  

617  

 
618 +
//----------------------------------------------------------
 
619 +

609  
inline void
620  
inline void
610  
any_buffer_source::consume(std::size_t n) noexcept
621  
any_buffer_source::consume(std::size_t n) noexcept
611  
{
622  
{
612  
    vt_->do_consume(source_, n);
623  
    vt_->do_consume(source_, n);
613  
}
624  
}
614  

625  

615  
inline auto
626  
inline auto
616  
any_buffer_source::pull(std::span<const_buffer> dest)
627  
any_buffer_source::pull(std::span<const_buffer> dest)
617  
{
628  
{
618  
    struct awaitable
629  
    struct awaitable
619  
    {
630  
    {
620  
        any_buffer_source* self_;
631  
        any_buffer_source* self_;
621  
        std::span<const_buffer> dest_;
632  
        std::span<const_buffer> dest_;
622  

633  

623  
        bool
634  
        bool
624  
        await_ready()
635  
        await_ready()
625  
        {
636  
        {
626  
            self_->active_ops_ = self_->vt_->construct_awaitable(
637  
            self_->active_ops_ = self_->vt_->construct_awaitable(
627  
                self_->source_,
638  
                self_->source_,
628  
                self_->cached_awaitable_,
639  
                self_->cached_awaitable_,
629  
                dest_);
640  
                dest_);
630  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
641  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
631  
        }
642  
        }
632  

643  

633  
        std::coroutine_handle<>
644  
        std::coroutine_handle<>
634  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
645  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
635  
        {
646  
        {
636  
            return self_->active_ops_->await_suspend(
647  
            return self_->active_ops_->await_suspend(
637  
                self_->cached_awaitable_, h, env);
648  
                self_->cached_awaitable_, h, env);
638  
        }
649  
        }
639  

650  

640  
        io_result<std::span<const_buffer>>
651  
        io_result<std::span<const_buffer>>
641  
        await_resume()
652  
        await_resume()
642  
        {
653  
        {
643  
            struct guard {
654  
            struct guard {
644  
                any_buffer_source* self;
655  
                any_buffer_source* self;
645  
                ~guard() {
656  
                ~guard() {
646  
                    self->active_ops_->destroy(self->cached_awaitable_);
657  
                    self->active_ops_->destroy(self->cached_awaitable_);
647  
                    self->active_ops_ = nullptr;
658  
                    self->active_ops_ = nullptr;
648  
                }
659  
                }
649  
            } g{self_};
660  
            } g{self_};
650  
            return self_->active_ops_->await_resume(
661  
            return self_->active_ops_->await_resume(
651  
                self_->cached_awaitable_);
662  
                self_->cached_awaitable_);
652  
        }
663  
        }
653  
    };
664  
    };
654  
    return awaitable{this, dest};
665  
    return awaitable{this, dest};
655  
}
666  
}
656  

667  

 
668 +
//----------------------------------------------------------
 
669 +
// Private helpers for native ReadSource forwarding
 
670 +

657  
inline auto
671  
inline auto
658  
any_buffer_source::read_some_(
672  
any_buffer_source::read_some_(
659  
    std::span<mutable_buffer const> buffers)
673  
    std::span<mutable_buffer const> buffers)
660  
{
674  
{
661  
    struct awaitable
675  
    struct awaitable
662  
    {
676  
    {
663  
        any_buffer_source* self_;
677  
        any_buffer_source* self_;
664  
        std::span<mutable_buffer const> buffers_;
678  
        std::span<mutable_buffer const> buffers_;
665  

679  

666  
        bool
680  
        bool
667  
        await_ready() const noexcept
681  
        await_ready() const noexcept
668  
        {
682  
        {
669  
            return false;
683  
            return false;
670  
        }
684  
        }
671  

685  

672  
        std::coroutine_handle<>
686  
        std::coroutine_handle<>
673  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
687  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
674  
        {
688  
        {
675  
            self_->active_read_ops_ =
689  
            self_->active_read_ops_ =
676  
                self_->vt_->construct_read_some_awaitable(
690  
                self_->vt_->construct_read_some_awaitable(
677  
                    self_->source_,
691  
                    self_->source_,
678  
                    self_->cached_awaitable_,
692  
                    self_->cached_awaitable_,
679  
                    buffers_);
693  
                    buffers_);
680  

694  

681  
            if(self_->active_read_ops_->await_ready(
695  
            if(self_->active_read_ops_->await_ready(
682  
                self_->cached_awaitable_))
696  
                self_->cached_awaitable_))
683  
                return h;
697  
                return h;
684  

698  

685  
            return self_->active_read_ops_->await_suspend(
699  
            return self_->active_read_ops_->await_suspend(
686  
                self_->cached_awaitable_, h, env);
700  
                self_->cached_awaitable_, h, env);
687  
        }
701  
        }
688  

702  

689  
        io_result<std::size_t>
703  
        io_result<std::size_t>
690  
        await_resume()
704  
        await_resume()
691  
        {
705  
        {
692  
            struct guard {
706  
            struct guard {
693  
                any_buffer_source* self;
707  
                any_buffer_source* self;
694  
                ~guard() {
708  
                ~guard() {
695  
                    self->active_read_ops_->destroy(
709  
                    self->active_read_ops_->destroy(
696  
                        self->cached_awaitable_);
710  
                        self->cached_awaitable_);
697  
                    self->active_read_ops_ = nullptr;
711  
                    self->active_read_ops_ = nullptr;
698  
                }
712  
                }
699  
            } g{self_};
713  
            } g{self_};
700  
            return self_->active_read_ops_->await_resume(
714  
            return self_->active_read_ops_->await_resume(
701  
                self_->cached_awaitable_);
715  
                self_->cached_awaitable_);
702  
        }
716  
        }
703  
    };
717  
    };
704  
    return awaitable{this, buffers};
718  
    return awaitable{this, buffers};
705  
}
719  
}
706  

720  

707  
inline auto
721  
inline auto
708  
any_buffer_source::read_(
722  
any_buffer_source::read_(
709  
    std::span<mutable_buffer const> buffers)
723  
    std::span<mutable_buffer const> buffers)
710  
{
724  
{
711  
    struct awaitable
725  
    struct awaitable
712  
    {
726  
    {
713  
        any_buffer_source* self_;
727  
        any_buffer_source* self_;
714  
        std::span<mutable_buffer const> buffers_;
728  
        std::span<mutable_buffer const> buffers_;
715  

729  

716  
        bool
730  
        bool
717  
        await_ready() const noexcept
731  
        await_ready() const noexcept
718  
        {
732  
        {
719  
            return false;
733  
            return false;
720  
        }
734  
        }
721  

735  

722  
        std::coroutine_handle<>
736  
        std::coroutine_handle<>
723  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
737  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
724  
        {
738  
        {
725  
            self_->active_read_ops_ =
739  
            self_->active_read_ops_ =
726  
                self_->vt_->construct_read_awaitable(
740  
                self_->vt_->construct_read_awaitable(
727  
                    self_->source_,
741  
                    self_->source_,
728  
                    self_->cached_awaitable_,
742  
                    self_->cached_awaitable_,
729  
                    buffers_);
743  
                    buffers_);
730  

744  

731  
            if(self_->active_read_ops_->await_ready(
745  
            if(self_->active_read_ops_->await_ready(
732  
                self_->cached_awaitable_))
746  
                self_->cached_awaitable_))
733  
                return h;
747  
                return h;
734  

748  

735  
            return self_->active_read_ops_->await_suspend(
749  
            return self_->active_read_ops_->await_suspend(
736  
                self_->cached_awaitable_, h, env);
750  
                self_->cached_awaitable_, h, env);
737  
        }
751  
        }
738  

752  

739  
        io_result<std::size_t>
753  
        io_result<std::size_t>
740  
        await_resume()
754  
        await_resume()
741  
        {
755  
        {
742  
            struct guard {
756  
            struct guard {
743  
                any_buffer_source* self;
757  
                any_buffer_source* self;
744  
                ~guard() {
758  
                ~guard() {
745  
                    self->active_read_ops_->destroy(
759  
                    self->active_read_ops_->destroy(
746  
                        self->cached_awaitable_);
760  
                        self->cached_awaitable_);
747  
                    self->active_read_ops_ = nullptr;
761  
                    self->active_read_ops_ = nullptr;
748  
                }
762  
                }
749  
            } g{self_};
763  
            } g{self_};
750  
            return self_->active_read_ops_->await_resume(
764  
            return self_->active_read_ops_->await_resume(
751  
                self_->cached_awaitable_);
765  
                self_->cached_awaitable_);
752  
        }
766  
        }
753  
    };
767  
    };
754  
    return awaitable{this, buffers};
768  
    return awaitable{this, buffers};
755  
}
769  
}
756  

770  

 
771 +
//----------------------------------------------------------
 
772 +
// Public ReadSource methods
 
773 +

757  
template<MutableBufferSequence MB>
774  
template<MutableBufferSequence MB>
758  
io_task<std::size_t>
775  
io_task<std::size_t>
759  
any_buffer_source::read_some(MB buffers)
776  
any_buffer_source::read_some(MB buffers)
760  
{
777  
{
761  
    buffer_param<MB> bp(buffers);
778  
    buffer_param<MB> bp(buffers);
762  
    auto dest = bp.data();
779  
    auto dest = bp.data();
763  
    if(dest.empty())
780  
    if(dest.empty())
764  
        co_return {{}, 0};
781  
        co_return {{}, 0};
765  

782  

766  
    // Native ReadSource path
783  
    // Native ReadSource path
767  
    if(vt_->construct_read_some_awaitable)
784  
    if(vt_->construct_read_some_awaitable)
768  
        co_return co_await read_some_(dest);
785  
        co_return co_await read_some_(dest);
769  

786  

770  
    // Synthesized path: pull + buffer_copy + consume
787  
    // Synthesized path: pull + buffer_copy + consume
771  
    const_buffer arr[detail::max_iovec_];
788  
    const_buffer arr[detail::max_iovec_];
772  
    auto [ec, bufs] = co_await pull(arr);
789  
    auto [ec, bufs] = co_await pull(arr);
773  
    if(ec)
790  
    if(ec)
774  
        co_return {ec, 0};
791  
        co_return {ec, 0};
775  

792  

776  
    auto n = buffer_copy(dest, bufs);
793  
    auto n = buffer_copy(dest, bufs);
777  
    consume(n);
794  
    consume(n);
778  
    co_return {{}, n};
795  
    co_return {{}, n};
779  
}
796  
}
780  

797  

781  
template<MutableBufferSequence MB>
798  
template<MutableBufferSequence MB>
782  
io_task<std::size_t>
799  
io_task<std::size_t>
783  
any_buffer_source::read(MB buffers)
800  
any_buffer_source::read(MB buffers)
784  
{
801  
{
785  
    buffer_param<MB> bp(buffers);
802  
    buffer_param<MB> bp(buffers);
786  
    std::size_t total = 0;
803  
    std::size_t total = 0;
787  

804  

788  
    // Native ReadSource path
805  
    // Native ReadSource path
789  
    if(vt_->construct_read_awaitable)
806  
    if(vt_->construct_read_awaitable)
790  
    {
807  
    {
791  
        for(;;)
808  
        for(;;)
792  
        {
809  
        {
793  
            auto dest = bp.data();
810  
            auto dest = bp.data();
794  
            if(dest.empty())
811  
            if(dest.empty())
795  
                break;
812  
                break;
796  

813  

797  
            auto [ec, n] = co_await read_(dest);
814  
            auto [ec, n] = co_await read_(dest);
798  
            total += n;
815  
            total += n;
799  
            if(ec)
816  
            if(ec)
800  
                co_return {ec, total};
817  
                co_return {ec, total};
801  
            bp.consume(n);
818  
            bp.consume(n);
802  
        }
819  
        }
803  
        co_return {{}, total};
820  
        co_return {{}, total};
804  
    }
821  
    }
805  

822  

806  
    // Synthesized path: pull + buffer_copy + consume
823  
    // Synthesized path: pull + buffer_copy + consume
807  
    for(;;)
824  
    for(;;)
808  
    {
825  
    {
809  
        auto dest = bp.data();
826  
        auto dest = bp.data();
810  
        if(dest.empty())
827  
        if(dest.empty())
811  
            break;
828  
            break;
812  

829  

813  
        const_buffer arr[detail::max_iovec_];
830  
        const_buffer arr[detail::max_iovec_];
814  
        auto [ec, bufs] = co_await pull(arr);
831  
        auto [ec, bufs] = co_await pull(arr);
815  

832  

816  
        if(ec)
833  
        if(ec)
817  
            co_return {ec, total};
834  
            co_return {ec, total};
818  

835  

819  
        auto n = buffer_copy(dest, bufs);
836  
        auto n = buffer_copy(dest, bufs);
820  
        consume(n);
837  
        consume(n);
821  
        total += n;
838  
        total += n;
822  
        bp.consume(n);
839  
        bp.consume(n);
823  
    }
840  
    }
824  

841  

825  
    co_return {{}, total};
842  
    co_return {{}, total};
826  
}
843  
}
 
844 +

 
845 +
//----------------------------------------------------------
827  

846  

828  
static_assert(BufferSource<any_buffer_source>);
847  
static_assert(BufferSource<any_buffer_source>);
829  
static_assert(ReadSource<any_buffer_source>);
848  
static_assert(ReadSource<any_buffer_source>);
830  

849  

831  
} // namespace capy
850  
} // namespace capy
832  
} // namespace boost
851  
} // namespace boost
833  

852  

834  
#endif
853  
#endif