• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
11#define BOOST_BEAST_TEST_IMPL_STREAM_IPP
12
13#include <boost/beast/_experimental/test/stream.hpp>
14#include <boost/beast/core/bind_handler.hpp>
15#include <boost/beast/core/buffer_traits.hpp>
16#include <boost/make_shared.hpp>
17#include <stdexcept>
18#include <vector>
19
20namespace boost {
21namespace beast {
22namespace test {
23
24//------------------------------------------------------------------------------
25
26template<class Executor>
27void basic_stream<Executor>::initiate_read(
28    boost::shared_ptr<detail::stream_state> const& in_,
29    std::unique_ptr<detail::stream_read_op_base>&& op,
30    std::size_t buf_size)
31{
32    std::unique_lock<std::mutex> lock(in_->m);
33
34    ++in_->nread;
35    if(in_->op != nullptr)
36        BOOST_THROW_EXCEPTION(
37            std::logic_error{"in_->op != nullptr"});
38
39    // test failure
40    error_code ec;
41    if(in_->fc && in_->fc->fail(ec))
42    {
43        lock.unlock();
44        (*op)(ec);
45        return;
46    }
47
48    // A request to read 0 bytes from a stream is a no-op.
49    if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
50    {
51        lock.unlock();
52        (*op)(ec);
53        return;
54    }
55
56    // deliver error
57    if(in_->code != detail::stream_status::ok)
58    {
59        lock.unlock();
60        (*op)(net::error::eof);
61        return;
62    }
63
64    // complete when bytes available or closed
65    in_->op = std::move(op);
66}
67
68//------------------------------------------------------------------------------
69
70template<class Executor>
71basic_stream<Executor>::
72~basic_stream()
73{
74    close();
75    in_->remove();
76}
77
78template<class Executor>
79basic_stream<Executor>::
80basic_stream(basic_stream&& other)
81{
82    auto in = detail::stream_service::make_impl(
83        other.in_->exec, other.in_->fc);
84    in_ = std::move(other.in_);
85    out_ = std::move(other.out_);
86    other.in_ = in;
87}
88
89
90template<class Executor>
91basic_stream<Executor>&
92basic_stream<Executor>::
93operator=(basic_stream&& other)
94{
95    close();
96    auto in = detail::stream_service::make_impl(
97        other.in_->exec, other.in_->fc);
98    in_->remove();
99    in_ = std::move(other.in_);
100    out_ = std::move(other.out_);
101    other.in_ = in;
102    return *this;
103}
104
105//------------------------------------------------------------------------------
106
107template<class Executor>
108basic_stream<Executor>::
109basic_stream(executor_type exec)
110    : in_(detail::stream_service::make_impl(std::move(exec), nullptr))
111{
112}
113
114template<class Executor>
115basic_stream<Executor>::
116basic_stream(
117    net::io_context& ioc,
118    fail_count& fc)
119    : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
120{
121}
122
123template<class Executor>
124basic_stream<Executor>::
125basic_stream(
126    net::io_context& ioc,
127    string_view s)
128    : in_(detail::stream_service::make_impl(ioc.get_executor(), nullptr))
129{
130    in_->b.commit(net::buffer_copy(
131        in_->b.prepare(s.size()),
132        net::buffer(s.data(), s.size())));
133}
134
135template<class Executor>
136basic_stream<Executor>::
137basic_stream(
138    net::io_context& ioc,
139    fail_count& fc,
140    string_view s)
141    : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
142{
143    in_->b.commit(net::buffer_copy(
144        in_->b.prepare(s.size()),
145        net::buffer(s.data(), s.size())));
146}
147
148template<class Executor>
149void
150basic_stream<Executor>::
151connect(basic_stream& remote)
152{
153    BOOST_ASSERT(! out_.lock());
154    BOOST_ASSERT(! remote.out_.lock());
155    std::lock(in_->m, remote.in_->m);
156    std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
157    std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
158    out_ = remote.in_;
159    remote.out_ = in_;
160    in_->code = detail::stream_status::ok;
161    remote.in_->code = detail::stream_status::ok;
162}
163
164template<class Executor>
165string_view
166basic_stream<Executor>::
167str() const
168{
169    auto const bs = in_->b.data();
170    if(buffer_bytes(bs) == 0)
171        return {};
172    net::const_buffer const b = *net::buffer_sequence_begin(bs);
173    return {static_cast<char const*>(b.data()), b.size()};
174}
175
176template<class Executor>
177void
178basic_stream<Executor>::
179append(string_view s)
180{
181    std::lock_guard<std::mutex> lock{in_->m};
182    in_->b.commit(net::buffer_copy(
183        in_->b.prepare(s.size()),
184        net::buffer(s.data(), s.size())));
185}
186
187template<class Executor>
188void
189basic_stream<Executor>::
190clear()
191{
192    std::lock_guard<std::mutex> lock{in_->m};
193    in_->b.consume(in_->b.size());
194}
195
196template<class Executor>
197void
198basic_stream<Executor>::
199close()
200{
201    in_->cancel_read();
202
203    // disconnect
204    {
205        auto out = out_.lock();
206        out_.reset();
207
208        // notify peer
209        if(out)
210        {
211            std::lock_guard<std::mutex> lock(out->m);
212            if(out->code == detail::stream_status::ok)
213            {
214                out->code = detail::stream_status::eof;
215                out->notify_read();
216            }
217        }
218    }
219}
220
221template<class Executor>
222void
223basic_stream<Executor>::
224close_remote()
225{
226    std::lock_guard<std::mutex> lock{in_->m};
227    if(in_->code == detail::stream_status::ok)
228    {
229        in_->code = detail::stream_status::eof;
230        in_->notify_read();
231    }
232}
233
234template<class Executor>
235void
236teardown(
237    role_type,
238    basic_stream<Executor>& s,
239    boost::system::error_code& ec)
240{
241    if( s.in_->fc &&
242        s.in_->fc->fail(ec))
243        return;
244
245    s.close();
246
247    if( s.in_->fc &&
248        s.in_->fc->fail(ec))
249        ec = net::error::eof;
250    else
251        ec = {};
252}
253
254//------------------------------------------------------------------------------
255
256template<class Executor>
257basic_stream<Executor>
258connect(basic_stream<Executor>& to)
259{
260    basic_stream<Executor> from(to.get_executor());
261    from.connect(to);
262    return from;
263}
264
265template<class Executor>
266void
267connect(basic_stream<Executor>& s1, basic_stream<Executor>& s2)
268{
269    s1.connect(s2);
270}
271
272} // test
273} // beast
274} // boost
275
276#endif
277