1// 2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) 3// 4// Distributed under the Boost Software License, Version 1.0. (See accompanying 5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6// 7// Official repository: https://github.com/boostorg/beast 8// 9 10#ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP 11#define BOOST_BEAST_TEST_IMPL_STREAM_IPP 12 13#include <boost/beast/_experimental/test/stream.hpp> 14#include <boost/beast/core/bind_handler.hpp> 15#include <boost/beast/core/buffer_traits.hpp> 16#include <boost/make_shared.hpp> 17#include <stdexcept> 18#include <vector> 19 20namespace boost { 21namespace beast { 22namespace test { 23 24//------------------------------------------------------------------------------ 25 26template<class Executor> 27void basic_stream<Executor>::initiate_read( 28 boost::shared_ptr<detail::stream_state> const& in_, 29 std::unique_ptr<detail::stream_read_op_base>&& op, 30 std::size_t buf_size) 31{ 32 std::unique_lock<std::mutex> lock(in_->m); 33 34 ++in_->nread; 35 if(in_->op != nullptr) 36 BOOST_THROW_EXCEPTION( 37 std::logic_error{"in_->op != nullptr"}); 38 39 // test failure 40 error_code ec; 41 if(in_->fc && in_->fc->fail(ec)) 42 { 43 lock.unlock(); 44 (*op)(ec); 45 return; 46 } 47 48 // A request to read 0 bytes from a stream is a no-op. 49 if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0) 50 { 51 lock.unlock(); 52 (*op)(ec); 53 return; 54 } 55 56 // deliver error 57 if(in_->code != detail::stream_status::ok) 58 { 59 lock.unlock(); 60 (*op)(net::error::eof); 61 return; 62 } 63 64 // complete when bytes available or closed 65 in_->op = std::move(op); 66} 67 68//------------------------------------------------------------------------------ 69 70template<class Executor> 71basic_stream<Executor>:: 72~basic_stream() 73{ 74 close(); 75 in_->remove(); 76} 77 78template<class Executor> 79basic_stream<Executor>:: 80basic_stream(basic_stream&& other) 81{ 82 auto in = detail::stream_service::make_impl( 83 other.in_->exec, other.in_->fc); 84 in_ = std::move(other.in_); 85 out_ = std::move(other.out_); 86 other.in_ = in; 87} 88 89 90template<class Executor> 91basic_stream<Executor>& 92basic_stream<Executor>:: 93operator=(basic_stream&& other) 94{ 95 close(); 96 auto in = detail::stream_service::make_impl( 97 other.in_->exec, other.in_->fc); 98 in_->remove(); 99 in_ = std::move(other.in_); 100 out_ = std::move(other.out_); 101 other.in_ = in; 102 return *this; 103} 104 105//------------------------------------------------------------------------------ 106 107template<class Executor> 108basic_stream<Executor>:: 109basic_stream(executor_type exec) 110 : in_(detail::stream_service::make_impl(std::move(exec), nullptr)) 111{ 112} 113 114template<class Executor> 115basic_stream<Executor>:: 116basic_stream( 117 net::io_context& ioc, 118 fail_count& fc) 119 : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc)) 120{ 121} 122 123template<class Executor> 124basic_stream<Executor>:: 125basic_stream( 126 net::io_context& ioc, 127 string_view s) 128 : in_(detail::stream_service::make_impl(ioc.get_executor(), nullptr)) 129{ 130 in_->b.commit(net::buffer_copy( 131 in_->b.prepare(s.size()), 132 net::buffer(s.data(), s.size()))); 133} 134 135template<class Executor> 136basic_stream<Executor>:: 137basic_stream( 138 net::io_context& ioc, 139 fail_count& fc, 140 string_view s) 141 : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc)) 142{ 143 in_->b.commit(net::buffer_copy( 144 in_->b.prepare(s.size()), 145 net::buffer(s.data(), s.size()))); 146} 147 148template<class Executor> 149void 150basic_stream<Executor>:: 151connect(basic_stream& remote) 152{ 153 BOOST_ASSERT(! out_.lock()); 154 BOOST_ASSERT(! remote.out_.lock()); 155 std::lock(in_->m, remote.in_->m); 156 std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock}; 157 std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock}; 158 out_ = remote.in_; 159 remote.out_ = in_; 160 in_->code = detail::stream_status::ok; 161 remote.in_->code = detail::stream_status::ok; 162} 163 164template<class Executor> 165string_view 166basic_stream<Executor>:: 167str() const 168{ 169 auto const bs = in_->b.data(); 170 if(buffer_bytes(bs) == 0) 171 return {}; 172 net::const_buffer const b = *net::buffer_sequence_begin(bs); 173 return {static_cast<char const*>(b.data()), b.size()}; 174} 175 176template<class Executor> 177void 178basic_stream<Executor>:: 179append(string_view s) 180{ 181 std::lock_guard<std::mutex> lock{in_->m}; 182 in_->b.commit(net::buffer_copy( 183 in_->b.prepare(s.size()), 184 net::buffer(s.data(), s.size()))); 185} 186 187template<class Executor> 188void 189basic_stream<Executor>:: 190clear() 191{ 192 std::lock_guard<std::mutex> lock{in_->m}; 193 in_->b.consume(in_->b.size()); 194} 195 196template<class Executor> 197void 198basic_stream<Executor>:: 199close() 200{ 201 in_->cancel_read(); 202 203 // disconnect 204 { 205 auto out = out_.lock(); 206 out_.reset(); 207 208 // notify peer 209 if(out) 210 { 211 std::lock_guard<std::mutex> lock(out->m); 212 if(out->code == detail::stream_status::ok) 213 { 214 out->code = detail::stream_status::eof; 215 out->notify_read(); 216 } 217 } 218 } 219} 220 221template<class Executor> 222void 223basic_stream<Executor>:: 224close_remote() 225{ 226 std::lock_guard<std::mutex> lock{in_->m}; 227 if(in_->code == detail::stream_status::ok) 228 { 229 in_->code = detail::stream_status::eof; 230 in_->notify_read(); 231 } 232} 233 234template<class Executor> 235void 236teardown( 237 role_type, 238 basic_stream<Executor>& s, 239 boost::system::error_code& ec) 240{ 241 if( s.in_->fc && 242 s.in_->fc->fail(ec)) 243 return; 244 245 s.close(); 246 247 if( s.in_->fc && 248 s.in_->fc->fail(ec)) 249 ec = net::error::eof; 250 else 251 ec = {}; 252} 253 254//------------------------------------------------------------------------------ 255 256template<class Executor> 257basic_stream<Executor> 258connect(basic_stream<Executor>& to) 259{ 260 basic_stream<Executor> from(to.get_executor()); 261 from.connect(to); 262 return from; 263} 264 265template<class Executor> 266void 267connect(basic_stream<Executor>& s1, basic_stream<Executor>& s2) 268{ 269 s1.connect(s2); 270} 271 272} // test 273} // beast 274} // boost 275 276#endif 277