1 //
2 // composed_8.cpp
3 // ~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <boost/asio/compose.hpp>
12 #include <boost/asio/coroutine.hpp>
13 #include <boost/asio/io_context.hpp>
14 #include <boost/asio/ip/tcp.hpp>
15 #include <boost/asio/steady_timer.hpp>
16 #include <boost/asio/use_future.hpp>
17 #include <boost/asio/write.hpp>
18 #include <functional>
19 #include <iostream>
20 #include <memory>
21 #include <sstream>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25
26 using boost::asio::ip::tcp;
27
28 // NOTE: This example requires the new boost::asio::async_compose function. For
29 // an example that works with the Networking TS style of completion tokens,
30 // please see an older version of asio.
31
32 //------------------------------------------------------------------------------
33
34 // This composed operation shows composition of multiple underlying operations,
35 // using asio's stackless coroutines support to express the flow of control. It
36 // automatically serialises a message, using its I/O streams insertion
37 // operator, before sending it N times on the socket. To do this, it must
38 // allocate a buffer for the encoded message and ensure this buffer's validity
39 // until all underlying async_write operation complete. A one second delay is
40 // inserted prior to each write operation, using a steady_timer.
41
42 #include <boost/asio/yield.hpp>
43
44 template <typename T, typename CompletionToken>
async_write_messages(tcp::socket & socket,const T & message,std::size_t repeat_count,CompletionToken && token)45 auto async_write_messages(tcp::socket& socket,
46 const T& message, std::size_t repeat_count,
47 CompletionToken&& token)
48 // The return type of the initiating function is deduced from the combination
49 // of CompletionToken type and the completion handler's signature. When the
50 // completion token is a simple callback, the return type is always void.
51 // In this example, when the completion token is boost::asio::yield_context
52 // (used for stackful coroutines) the return type would be also be void, as
53 // there is no non-error argument to the completion handler. When the
54 // completion token is boost::asio::use_future it would be std::future<void>.
55 //
56 // In C++14 we can omit the return type as it is automatically deduced from
57 // the return type of boost::asio::async_initiate.
58 {
59 // Encode the message and copy it into an allocated buffer. The buffer will
60 // be maintained for the lifetime of the composed asynchronous operation.
61 std::ostringstream os;
62 os << message;
63 std::unique_ptr<std::string> encoded_message(new std::string(os.str()));
64
65 // Create a steady_timer to be used for the delay between messages.
66 std::unique_ptr<boost::asio::steady_timer> delay_timer(
67 new boost::asio::steady_timer(socket.get_executor()));
68
69 // The boost::asio::async_compose function takes:
70 //
71 // - our asynchronous operation implementation,
72 // - the completion token,
73 // - the completion handler signature, and
74 // - any I/O objects (or executors) used by the operation
75 //
76 // It then wraps our implementation, which is implemented here as a stackless
77 // coroutine in a lambda, in an intermediate completion handler that meets the
78 // requirements of a conforming asynchronous operation. This includes
79 // tracking outstanding work against the I/O executors associated with the
80 // operation (in this example, this is the socket's executor).
81 //
82 // The first argument to our lambda is a reference to the enclosing
83 // intermediate completion handler. This intermediate completion handler is
84 // provided for us by the boost::asio::async_compose function, and takes care
85 // of all the details required to implement a conforming asynchronous
86 // operation. When calling an underlying asynchronous operation, we pass it
87 // this enclosing intermediate completion handler as the completion token.
88 //
89 // All arguments to our lambda after the first must be defaulted to allow the
90 // state machine to be started, as well as to allow the completion handler to
91 // match the completion signature of both the async_write and
92 // steady_timer::async_wait operations.
93 return boost::asio::async_compose<
94 CompletionToken, void(boost::system::error_code)>(
95 [
96 // The implementation holds a reference to the socket as it is used for
97 // multiple async_write operations.
98 &socket,
99
100 // The allocated buffer for the encoded message. The std::unique_ptr
101 // smart pointer is move-only, and as a consequence our lambda
102 // implementation is also move-only.
103 encoded_message = std::move(encoded_message),
104
105 // The repeat count remaining.
106 repeat_count,
107
108 // A steady timer used for introducing a delay.
109 delay_timer = std::move(delay_timer),
110
111 // The coroutine state.
112 coro = boost::asio::coroutine()
113 ]
114 (
115 auto& self,
116 const boost::system::error_code& error = {},
117 std::size_t = 0
118 ) mutable
119 {
120 reenter (coro)
121 {
122 while (repeat_count > 0)
123 {
124 --repeat_count;
125
126 delay_timer->expires_after(std::chrono::seconds(1));
127 yield delay_timer->async_wait(std::move(self));
128 if (error)
129 break;
130
131 yield boost::asio::async_write(socket,
132 boost::asio::buffer(*encoded_message), std::move(self));
133 if (error)
134 break;
135 }
136
137 // Deallocate the encoded message and delay timer before calling the
138 // user-supplied completion handler.
139 encoded_message.reset();
140 delay_timer.reset();
141
142 // Call the user-supplied handler with the result of the operation.
143 self.complete(error);
144 }
145 },
146 token, socket);
147 }
148
149 #include <boost/asio/unyield.hpp>
150
151 //------------------------------------------------------------------------------
152
test_callback()153 void test_callback()
154 {
155 boost::asio::io_context io_context;
156
157 tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
158 tcp::socket socket = acceptor.accept();
159
160 // Test our asynchronous operation using a lambda as a callback.
161 async_write_messages(socket, "Testing callback\r\n", 5,
162 [](const boost::system::error_code& error)
163 {
164 if (!error)
165 {
166 std::cout << "Messages sent\n";
167 }
168 else
169 {
170 std::cout << "Error: " << error.message() << "\n";
171 }
172 });
173
174 io_context.run();
175 }
176
177 //------------------------------------------------------------------------------
178
test_future()179 void test_future()
180 {
181 boost::asio::io_context io_context;
182
183 tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
184 tcp::socket socket = acceptor.accept();
185
186 // Test our asynchronous operation using the use_future completion token.
187 // This token causes the operation's initiating function to return a future,
188 // which may be used to synchronously wait for the result of the operation.
189 std::future<void> f = async_write_messages(
190 socket, "Testing future\r\n", 5, boost::asio::use_future);
191
192 io_context.run();
193
194 try
195 {
196 // Get the result of the operation.
197 f.get();
198 std::cout << "Messages sent\n";
199 }
200 catch (const std::exception& e)
201 {
202 std::cout << "Error: " << e.what() << "\n";
203 }
204 }
205
206 //------------------------------------------------------------------------------
207
main()208 int main()
209 {
210 test_callback();
211 test_future();
212 }
213