1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
19  
#include <coroutine>
19  
#include <coroutine>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  

22  

23  
#include <concepts>
23  
#include <concepts>
24  
#include <coroutine>
24  
#include <coroutine>
25  
#include <cstddef>
25  
#include <cstddef>
26  
#include <exception>
26  
#include <exception>
27  
#include <new>
27  
#include <new>
28  
#include <span>
28  
#include <span>
29  
#include <stop_token>
29  
#include <stop_token>
30  
#include <system_error>
30  
#include <system_error>
31  
#include <utility>
31  
#include <utility>
32  

32  

33  
namespace boost {
33  
namespace boost {
34  
namespace capy {
34  
namespace capy {
35  

35  

36  
/** Type-erased wrapper for any WriteStream.
36  
/** Type-erased wrapper for any WriteStream.
37  

37  

38  
    This class provides type erasure for any type satisfying the
38  
    This class provides type erasure for any type satisfying the
39  
    @ref WriteStream concept, enabling runtime polymorphism for
39  
    @ref WriteStream concept, enabling runtime polymorphism for
40  
    write operations. It uses cached awaitable storage to achieve
40  
    write operations. It uses cached awaitable storage to achieve
41  
    zero steady-state allocation after construction.
41  
    zero steady-state allocation after construction.
42  

42  

43  
    The wrapper supports two construction modes:
43  
    The wrapper supports two construction modes:
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
      allocates storage and owns the stream.
45  
      allocates storage and owns the stream.
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
      pointed-to stream must outlive this wrapper.
47  
      pointed-to stream must outlive this wrapper.
48  

48  

49  
    @par Awaitable Preallocation
49  
    @par Awaitable Preallocation
50  
    The constructor preallocates storage for the type-erased awaitable.
50  
    The constructor preallocates storage for the type-erased awaitable.
51  
    This reserves all virtual address space at server startup
51  
    This reserves all virtual address space at server startup
52  
    so memory usage can be measured up front, rather than
52  
    so memory usage can be measured up front, rather than
53  
    allocating piecemeal as traffic arrives.
53  
    allocating piecemeal as traffic arrives.
54  

54  

55  
    @par Immediate Completion
55  
    @par Immediate Completion
56  
    Operations complete immediately without suspending when the
56  
    Operations complete immediately without suspending when the
57  
    buffer sequence is empty, or when the underlying stream's
57  
    buffer sequence is empty, or when the underlying stream's
58  
    awaitable reports readiness via `await_ready`.
58  
    awaitable reports readiness via `await_ready`.
59  

59  

60  
    @par Thread Safety
60  
    @par Thread Safety
61  
    Not thread-safe. Concurrent operations on the same wrapper
61  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    are undefined behavior.
62  
    are undefined behavior.
63  

63  

64  
    @par Example
64  
    @par Example
65  
    @code
65  
    @code
66  
    // Owning - takes ownership of the stream
66  
    // Owning - takes ownership of the stream
67  
    any_write_stream stream(socket{ioc});
67  
    any_write_stream stream(socket{ioc});
68  

68  

69  
    // Reference - wraps without ownership
69  
    // Reference - wraps without ownership
70  
    socket sock(ioc);
70  
    socket sock(ioc);
71  
    any_write_stream stream(&sock);
71  
    any_write_stream stream(&sock);
72  

72  

73  
    const_buffer buf(data, size);
73  
    const_buffer buf(data, size);
74  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
74  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75  
    @endcode
75  
    @endcode
76  

76  

77  
    @see any_read_stream, any_stream, WriteStream
77  
    @see any_read_stream, any_stream, WriteStream
78  
*/
78  
*/
79  
class any_write_stream
79  
class any_write_stream
80  
{
80  
{
81  
    struct vtable;
81  
    struct vtable;
82  

82  

83  
    template<WriteStream S>
83  
    template<WriteStream S>
84  
    struct vtable_for_impl;
84  
    struct vtable_for_impl;
85  

85  

86  
    // ordered for cache line coherence
86  
    // ordered for cache line coherence
87  
    void* stream_ = nullptr;
87  
    void* stream_ = nullptr;
88  
    vtable const* vt_ = nullptr;
88  
    vtable const* vt_ = nullptr;
89  
    void* cached_awaitable_ = nullptr;
89  
    void* cached_awaitable_ = nullptr;
90  
    void* storage_ = nullptr;
90  
    void* storage_ = nullptr;
91  
    bool awaitable_active_ = false;
91  
    bool awaitable_active_ = false;
92  

92  

93  
public:
93  
public:
94  
    /** Destructor.
94  
    /** Destructor.
95  

95  

96  
        Destroys the owned stream (if any) and releases the cached
96  
        Destroys the owned stream (if any) and releases the cached
97  
        awaitable storage.
97  
        awaitable storage.
98  
    */
98  
    */
99  
    ~any_write_stream();
99  
    ~any_write_stream();
100  

100  

101 -
    /** Construct a default instance.
101 +
    /** Default constructor.
102  

102  

103  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        wrapper result in undefined behavior.
104  
        wrapper result in undefined behavior.
105  
    */
105  
    */
106  
    any_write_stream() = default;
106  
    any_write_stream() = default;
107  

107  

108  
    /** Non-copyable.
108  
    /** Non-copyable.
109  

109  

110  
        The awaitable cache is per-instance and cannot be shared.
110  
        The awaitable cache is per-instance and cannot be shared.
111  
    */
111  
    */
112  
    any_write_stream(any_write_stream const&) = delete;
112  
    any_write_stream(any_write_stream const&) = delete;
113  
    any_write_stream& operator=(any_write_stream const&) = delete;
113  
    any_write_stream& operator=(any_write_stream const&) = delete;
114  

114  

115 -
    /** Construct by moving.
115 +
    /** Move constructor.
116  

116  

117  
        Transfers ownership of the wrapped stream (if owned) and
117  
        Transfers ownership of the wrapped stream (if owned) and
118  
        cached awaitable storage from `other`. After the move, `other` is
118  
        cached awaitable storage from `other`. After the move, `other` is
119  
        in a default-constructed state.
119  
        in a default-constructed state.
120  

120  

121  
        @param other The wrapper to move from.
121  
        @param other The wrapper to move from.
122  
    */
122  
    */
123  
    any_write_stream(any_write_stream&& other) noexcept
123  
    any_write_stream(any_write_stream&& other) noexcept
124  
        : stream_(std::exchange(other.stream_, nullptr))
124  
        : stream_(std::exchange(other.stream_, nullptr))
125  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
128  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
129  
    {
129  
    {
130  
    }
130  
    }
131  

131  

132 -
    /** Assign by moving.
132 +
    /** Move assignment operator.
133  

133  

134  
        Destroys any owned stream and releases existing resources,
134  
        Destroys any owned stream and releases existing resources,
135  
        then transfers ownership from `other`.
135  
        then transfers ownership from `other`.
136  

136  

137  
        @param other The wrapper to move from.
137  
        @param other The wrapper to move from.
138  
        @return Reference to this wrapper.
138  
        @return Reference to this wrapper.
139  
    */
139  
    */
140  
    any_write_stream&
140  
    any_write_stream&
141  
    operator=(any_write_stream&& other) noexcept;
141  
    operator=(any_write_stream&& other) noexcept;
142  

142  

143  
    /** Construct by taking ownership of a WriteStream.
143  
    /** Construct by taking ownership of a WriteStream.
144  

144  

145  
        Allocates storage and moves the stream into this wrapper.
145  
        Allocates storage and moves the stream into this wrapper.
146  
        The wrapper owns the stream and will destroy it.
146  
        The wrapper owns the stream and will destroy it.
147  

147  

148  
        @param s The stream to take ownership of.
148  
        @param s The stream to take ownership of.
149  
    */
149  
    */
150  
    template<WriteStream S>
150  
    template<WriteStream S>
151  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
151  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152  
    any_write_stream(S s);
152  
    any_write_stream(S s);
153  

153  

154  
    /** Construct by wrapping a WriteStream without ownership.
154  
    /** Construct by wrapping a WriteStream without ownership.
155  

155  

156  
        Wraps the given stream by pointer. The stream must remain
156  
        Wraps the given stream by pointer. The stream must remain
157  
        valid for the lifetime of this wrapper.
157  
        valid for the lifetime of this wrapper.
158  

158  

159  
        @param s Pointer to the stream to wrap.
159  
        @param s Pointer to the stream to wrap.
160  
    */
160  
    */
161  
    template<WriteStream S>
161  
    template<WriteStream S>
162  
    any_write_stream(S* s);
162  
    any_write_stream(S* s);
163  

163  

164  
    /** Check if the wrapper contains a valid stream.
164  
    /** Check if the wrapper contains a valid stream.
165  

165  

166  
        @return `true` if wrapping a stream, `false` if default-constructed
166  
        @return `true` if wrapping a stream, `false` if default-constructed
167  
            or moved-from.
167  
            or moved-from.
168  
    */
168  
    */
169  
    bool
169  
    bool
170  
    has_value() const noexcept
170  
    has_value() const noexcept
171  
    {
171  
    {
172  
        return stream_ != nullptr;
172  
        return stream_ != nullptr;
173  
    }
173  
    }
174  

174  

175  
    /** Check if the wrapper contains a valid stream.
175  
    /** Check if the wrapper contains a valid stream.
176  

176  

177  
        @return `true` if wrapping a stream, `false` if default-constructed
177  
        @return `true` if wrapping a stream, `false` if default-constructed
178  
            or moved-from.
178  
            or moved-from.
179  
    */
179  
    */
180  
    explicit
180  
    explicit
181  
    operator bool() const noexcept
181  
    operator bool() const noexcept
182  
    {
182  
    {
183  
        return has_value();
183  
        return has_value();
184  
    }
184  
    }
185  

185  

186  
    /** Initiate an asynchronous write operation.
186  
    /** Initiate an asynchronous write operation.
187  

187  

188  
        Writes data from the provided buffer sequence. The operation
188  
        Writes data from the provided buffer sequence. The operation
189  
        completes when at least one byte has been written, or an error
189  
        completes when at least one byte has been written, or an error
190  
        occurs.
190  
        occurs.
191  

191  

192  
        @param buffers The buffer sequence containing data to write.
192  
        @param buffers The buffer sequence containing data to write.
193  
            Passed by value to ensure the sequence lives in the
193  
            Passed by value to ensure the sequence lives in the
194  
            coroutine frame across suspension points.
194  
            coroutine frame across suspension points.
195  

195  

196  
        @return An awaitable yielding `(error_code,std::size_t)`.
196  
        @return An awaitable yielding `(error_code,std::size_t)`.
197  

197  

198  
        @par Immediate Completion
198  
        @par Immediate Completion
199  
        The operation completes immediately without suspending
199  
        The operation completes immediately without suspending
200  
        the calling coroutine when:
200  
        the calling coroutine when:
201  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202  
        @li The underlying stream's awaitable reports immediate
202  
        @li The underlying stream's awaitable reports immediate
203  
            readiness via `await_ready`.
203  
            readiness via `await_ready`.
204  

204  

205  
        @note This is a partial operation and may not process the
205  
        @note This is a partial operation and may not process the
206  
        entire buffer sequence. Use the composed @ref write algorithm
206  
        entire buffer sequence. Use the composed @ref write algorithm
207  
        for guaranteed complete transfer.
207  
        for guaranteed complete transfer.
208  

208  

209  
        @par Preconditions
209  
        @par Preconditions
210  
        The wrapper must contain a valid stream (`has_value() == true`).
210  
        The wrapper must contain a valid stream (`has_value() == true`).
211  
    */
211  
    */
212  
    template<ConstBufferSequence CB>
212  
    template<ConstBufferSequence CB>
213  
    auto
213  
    auto
214  
    write_some(CB buffers);
214  
    write_some(CB buffers);
215  

215  

216  
protected:
216  
protected:
217  
    /** Rebind to a new stream after move.
217  
    /** Rebind to a new stream after move.
218  

218  

219  
        Updates the internal pointer to reference a new stream object.
219  
        Updates the internal pointer to reference a new stream object.
220  
        Used by owning wrappers after move assignment when the owned
220  
        Used by owning wrappers after move assignment when the owned
221  
        object has moved to a new location.
221  
        object has moved to a new location.
222  

222  

223  
        @param new_stream The new stream to bind to. Must be the same
223  
        @param new_stream The new stream to bind to. Must be the same
224  
            type as the original stream.
224  
            type as the original stream.
225  

225  

226  
        @note Terminates if called with a stream of different type
226  
        @note Terminates if called with a stream of different type
227  
            than the original.
227  
            than the original.
228  
    */
228  
    */
229  
    template<WriteStream S>
229  
    template<WriteStream S>
230  
    void
230  
    void
231  
    rebind(S& new_stream) noexcept
231  
    rebind(S& new_stream) noexcept
232  
    {
232  
    {
233  
        if(vt_ != &vtable_for_impl<S>::value)
233  
        if(vt_ != &vtable_for_impl<S>::value)
234  
            std::terminate();
234  
            std::terminate();
235  
        stream_ = &new_stream;
235  
        stream_ = &new_stream;
236  
    }
236  
    }
237  
};
237  
};
238  

238  

 
239 +
//----------------------------------------------------------
 
240 +

239  
struct any_write_stream::vtable
241  
struct any_write_stream::vtable
240  
{
242  
{
241  
    // ordered by call frequency for cache line coherence
243  
    // ordered by call frequency for cache line coherence
242  
    void (*construct_awaitable)(
244  
    void (*construct_awaitable)(
243  
        void* stream,
245  
        void* stream,
244  
        void* storage,
246  
        void* storage,
245  
        std::span<const_buffer const> buffers);
247  
        std::span<const_buffer const> buffers);
246  
    bool (*await_ready)(void*);
248  
    bool (*await_ready)(void*);
247  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
249  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248  
    io_result<std::size_t> (*await_resume)(void*);
250  
    io_result<std::size_t> (*await_resume)(void*);
249  
    void (*destroy_awaitable)(void*) noexcept;
251  
    void (*destroy_awaitable)(void*) noexcept;
250  
    std::size_t awaitable_size;
252  
    std::size_t awaitable_size;
251  
    std::size_t awaitable_align;
253  
    std::size_t awaitable_align;
252  
    void (*destroy)(void*) noexcept;
254  
    void (*destroy)(void*) noexcept;
253  
};
255  
};
254  

256  

255  
template<WriteStream S>
257  
template<WriteStream S>
256  
struct any_write_stream::vtable_for_impl
258  
struct any_write_stream::vtable_for_impl
257  
{
259  
{
258  
    using Awaitable = decltype(std::declval<S&>().write_some(
260  
    using Awaitable = decltype(std::declval<S&>().write_some(
259  
        std::span<const_buffer const>{}));
261  
        std::span<const_buffer const>{}));
260  

262  

261  
    static void
263  
    static void
262  
    do_destroy_impl(void* stream) noexcept
264  
    do_destroy_impl(void* stream) noexcept
263  
    {
265  
    {
264  
        static_cast<S*>(stream)->~S();
266  
        static_cast<S*>(stream)->~S();
265  
    }
267  
    }
266  

268  

267  
    static void
269  
    static void
268  
    construct_awaitable_impl(
270  
    construct_awaitable_impl(
269  
        void* stream,
271  
        void* stream,
270  
        void* storage,
272  
        void* storage,
271  
        std::span<const_buffer const> buffers)
273  
        std::span<const_buffer const> buffers)
272  
    {
274  
    {
273  
        auto& s = *static_cast<S*>(stream);
275  
        auto& s = *static_cast<S*>(stream);
274  
        ::new(storage) Awaitable(s.write_some(buffers));
276  
        ::new(storage) Awaitable(s.write_some(buffers));
275  
    }
277  
    }
276  

278  

277  
    static constexpr vtable value = {
279  
    static constexpr vtable value = {
278  
        &construct_awaitable_impl,
280  
        &construct_awaitable_impl,
279  
        +[](void* p) {
281  
        +[](void* p) {
280  
            return static_cast<Awaitable*>(p)->await_ready();
282  
            return static_cast<Awaitable*>(p)->await_ready();
281  
        },
283  
        },
282  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
284  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283  
            return detail::call_await_suspend(
285  
            return detail::call_await_suspend(
284  
                static_cast<Awaitable*>(p), h, env);
286  
                static_cast<Awaitable*>(p), h, env);
285  
        },
287  
        },
286  
        +[](void* p) {
288  
        +[](void* p) {
287  
            return static_cast<Awaitable*>(p)->await_resume();
289  
            return static_cast<Awaitable*>(p)->await_resume();
288  
        },
290  
        },
289  
        +[](void* p) noexcept {
291  
        +[](void* p) noexcept {
290  
            static_cast<Awaitable*>(p)->~Awaitable();
292  
            static_cast<Awaitable*>(p)->~Awaitable();
291  
        },
293  
        },
292  
        sizeof(Awaitable),
294  
        sizeof(Awaitable),
293  
        alignof(Awaitable),
295  
        alignof(Awaitable),
294  
        &do_destroy_impl
296  
        &do_destroy_impl
295  
    };
297  
    };
296  
};
298  
};
297  

299  

 
300 +
//----------------------------------------------------------
 
301 +

298  
inline
302  
inline
299  
any_write_stream::~any_write_stream()
303  
any_write_stream::~any_write_stream()
300  
{
304  
{
301  
    if(storage_)
305  
    if(storage_)
302  
    {
306  
    {
303  
        vt_->destroy(stream_);
307  
        vt_->destroy(stream_);
304  
        ::operator delete(storage_);
308  
        ::operator delete(storage_);
305  
    }
309  
    }
306  
    if(cached_awaitable_)
310  
    if(cached_awaitable_)
307  
    {
311  
    {
308  
        if(awaitable_active_)
312  
        if(awaitable_active_)
309  
            vt_->destroy_awaitable(cached_awaitable_);
313  
            vt_->destroy_awaitable(cached_awaitable_);
310  
        ::operator delete(cached_awaitable_);
314  
        ::operator delete(cached_awaitable_);
311  
    }
315  
    }
312  
}
316  
}
313  

317  

314  
inline any_write_stream&
318  
inline any_write_stream&
315  
any_write_stream::operator=(any_write_stream&& other) noexcept
319  
any_write_stream::operator=(any_write_stream&& other) noexcept
316  
{
320  
{
317  
    if(this != &other)
321  
    if(this != &other)
318  
    {
322  
    {
319  
        if(storage_)
323  
        if(storage_)
320  
        {
324  
        {
321  
            vt_->destroy(stream_);
325  
            vt_->destroy(stream_);
322  
            ::operator delete(storage_);
326  
            ::operator delete(storage_);
323  
        }
327  
        }
324  
        if(cached_awaitable_)
328  
        if(cached_awaitable_)
325  
        {
329  
        {
326  
            if(awaitable_active_)
330  
            if(awaitable_active_)
327  
                vt_->destroy_awaitable(cached_awaitable_);
331  
                vt_->destroy_awaitable(cached_awaitable_);
328  
            ::operator delete(cached_awaitable_);
332  
            ::operator delete(cached_awaitable_);
329  
        }
333  
        }
330  
        stream_ = std::exchange(other.stream_, nullptr);
334  
        stream_ = std::exchange(other.stream_, nullptr);
331  
        vt_ = std::exchange(other.vt_, nullptr);
335  
        vt_ = std::exchange(other.vt_, nullptr);
332  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
333  
        storage_ = std::exchange(other.storage_, nullptr);
337  
        storage_ = std::exchange(other.storage_, nullptr);
334  
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
338  
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
335  
    }
339  
    }
336  
    return *this;
340  
    return *this;
337  
}
341  
}
338  

342  

339  
template<WriteStream S>
343  
template<WriteStream S>
340  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
344  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
341  
any_write_stream::any_write_stream(S s)
345  
any_write_stream::any_write_stream(S s)
342  
    : vt_(&vtable_for_impl<S>::value)
346  
    : vt_(&vtable_for_impl<S>::value)
343  
{
347  
{
344  
    struct guard {
348  
    struct guard {
345  
        any_write_stream* self;
349  
        any_write_stream* self;
346  
        bool committed = false;
350  
        bool committed = false;
347  
        ~guard() {
351  
        ~guard() {
348  
            if(!committed && self->storage_) {
352  
            if(!committed && self->storage_) {
349  
                self->vt_->destroy(self->stream_);
353  
                self->vt_->destroy(self->stream_);
350  
                ::operator delete(self->storage_);
354  
                ::operator delete(self->storage_);
351  
                self->storage_ = nullptr;
355  
                self->storage_ = nullptr;
352  
                self->stream_ = nullptr;
356  
                self->stream_ = nullptr;
353  
            }
357  
            }
354  
        }
358  
        }
355  
    } g{this};
359  
    } g{this};
356  

360  

357  
    storage_ = ::operator new(sizeof(S));
361  
    storage_ = ::operator new(sizeof(S));
358  
    stream_ = ::new(storage_) S(std::move(s));
362  
    stream_ = ::new(storage_) S(std::move(s));
359  

363  

360  
    // Preallocate the awaitable storage
364  
    // Preallocate the awaitable storage
361  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
365  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
362  

366  

363  
    g.committed = true;
367  
    g.committed = true;
364  
}
368  
}
365  

369  

366  
template<WriteStream S>
370  
template<WriteStream S>
367  
any_write_stream::any_write_stream(S* s)
371  
any_write_stream::any_write_stream(S* s)
368  
    : stream_(s)
372  
    : stream_(s)
369  
    , vt_(&vtable_for_impl<S>::value)
373  
    , vt_(&vtable_for_impl<S>::value)
370  
{
374  
{
371  
    // Preallocate the awaitable storage
375  
    // Preallocate the awaitable storage
372  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
376  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
373  
}
377  
}
 
378 +

 
379 +
//----------------------------------------------------------
374  

380  

375  
template<ConstBufferSequence CB>
381  
template<ConstBufferSequence CB>
376  
auto
382  
auto
377  
any_write_stream::write_some(CB buffers)
383  
any_write_stream::write_some(CB buffers)
378  
{
384  
{
379  
    struct awaitable
385  
    struct awaitable
380  
    {
386  
    {
381  
        any_write_stream* self_;
387  
        any_write_stream* self_;
382  
        const_buffer_array<detail::max_iovec_> ba_;
388  
        const_buffer_array<detail::max_iovec_> ba_;
383  

389  

384  
        awaitable(
390  
        awaitable(
385  
            any_write_stream* self,
391  
            any_write_stream* self,
386  
            CB const& buffers) noexcept
392  
            CB const& buffers) noexcept
387  
            : self_(self)
393  
            : self_(self)
388  
            , ba_(buffers)
394  
            , ba_(buffers)
389  
        {
395  
        {
390  
        }
396  
        }
391  

397  

392  
        bool
398  
        bool
393  
        await_ready() const noexcept
399  
        await_ready() const noexcept
394  
        {
400  
        {
395  
            return ba_.to_span().empty();
401  
            return ba_.to_span().empty();
396  
        }
402  
        }
397  

403  

398  
        std::coroutine_handle<>
404  
        std::coroutine_handle<>
399  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
405  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
400  
        {
406  
        {
401  
            self_->vt_->construct_awaitable(
407  
            self_->vt_->construct_awaitable(
402  
                self_->stream_,
408  
                self_->stream_,
403  
                self_->cached_awaitable_,
409  
                self_->cached_awaitable_,
404  
                ba_.to_span());
410  
                ba_.to_span());
405  
            self_->awaitable_active_ = true;
411  
            self_->awaitable_active_ = true;
406  

412  

407  
            if(self_->vt_->await_ready(self_->cached_awaitable_))
413  
            if(self_->vt_->await_ready(self_->cached_awaitable_))
408  
                return h;
414  
                return h;
409  

415  

410  
            return self_->vt_->await_suspend(
416  
            return self_->vt_->await_suspend(
411  
                self_->cached_awaitable_, h, env);
417  
                self_->cached_awaitable_, h, env);
412  
        }
418  
        }
413  

419  

414  
        io_result<std::size_t>
420  
        io_result<std::size_t>
415  
        await_resume()
421  
        await_resume()
416  
        {
422  
        {
417  
            if(!self_->awaitable_active_)
423  
            if(!self_->awaitable_active_)
418  
                return {{}, 0};
424  
                return {{}, 0};
419  
            struct guard {
425  
            struct guard {
420  
                any_write_stream* self;
426  
                any_write_stream* self;
421  
                ~guard() {
427  
                ~guard() {
422  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
428  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
423  
                    self->awaitable_active_ = false;
429  
                    self->awaitable_active_ = false;
424  
                }
430  
                }
425  
            } g{self_};
431  
            } g{self_};
426  
            return self_->vt_->await_resume(
432  
            return self_->vt_->await_resume(
427  
                self_->cached_awaitable_);
433  
                self_->cached_awaitable_);
428  
        }
434  
        }
429  
    };
435  
    };
430  
    return awaitable{this, buffers};
436  
    return awaitable{this, buffers};
431  
}
437  
}
432  

438  

433  
} // namespace capy
439  
} // namespace capy
434  
} // namespace boost
440  
} // namespace boost
435  

441  

436  
#endif
442  
#endif