• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // composed_7.cpp
3 // ~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <boost/asio/compose.hpp>
12 #include <boost/asio/io_context.hpp>
13 #include <boost/asio/ip/tcp.hpp>
14 #include <boost/asio/steady_timer.hpp>
15 #include <boost/asio/use_future.hpp>
16 #include <boost/asio/write.hpp>
17 #include <functional>
18 #include <iostream>
19 #include <memory>
20 #include <sstream>
21 #include <string>
22 #include <type_traits>
23 #include <utility>
24 
25 using boost::asio::ip::tcp;
26 
27 // NOTE: This example requires the new boost::asio::async_compose function. For
28 // an example that works with the Networking TS style of completion tokens,
29 // please see an older version of asio.
30 
31 //------------------------------------------------------------------------------
32 
// This composed operation shows composition of multiple underlying operations.
// It automatically serialises a message, using its I/O streams insertion
// operator, before sending it N times on the socket. To do this, it must
// allocate a buffer for the encoded message and ensure this buffer's validity
// until all underlying async_write operations complete. A one second delay is
// inserted prior to each write operation, using a steady_timer.
39 
40 // In this example, the composed operation's logic is implemented as a state
41 // machine within a hand-crafted function object.
42 struct async_write_messages_implementation
43 {
44   // The implementation holds a reference to the socket as it is used for
45   // multiple async_write operations.
46   tcp::socket& socket_;
47 
48   // The allocated buffer for the encoded message. The std::unique_ptr smart
49   // pointer is move-only, and as a consequence our implementation is also
50   // move-only.
51   std::unique_ptr<std::string> encoded_message_;
52 
53   // The repeat count remaining.
54   std::size_t repeat_count_;
55 
56   // A steady timer used for introducing a delay.
57   std::unique_ptr<boost::asio::steady_timer> delay_timer_;
58 
59   // To manage the cycle between the multiple underlying asychronous
60   // operations, our implementation is a state machine.
61   enum { starting, waiting, writing } state_;
62 
63   // The first argument to our function object's call operator is a reference
64   // to the enclosing intermediate completion handler. This intermediate
65   // completion handler is provided for us by the boost::asio::async_compose
66   // function, and takes care of all the details required to implement a
67   // conforming asynchronous operation. When calling an underlying asynchronous
68   // operation, we pass it this enclosing intermediate completion handler
69   // as the completion token.
70   //
71   // All arguments after the first must be defaulted to allow the state machine
72   // to be started, as well as to allow the completion handler to match the
73   // completion signature of both the async_write and steady_timer::async_wait
74   // operations.
75   template <typename Self>
operator ()async_write_messages_implementation76   void operator()(Self& self,
77       const boost::system::error_code& error = boost::system::error_code(),
78       std::size_t = 0)
79   {
80     if (!error)
81     {
82       switch (state_)
83       {
84       case starting:
85       case writing:
86         if (repeat_count_ > 0)
87         {
88           --repeat_count_;
89           state_ = waiting;
90           delay_timer_->expires_after(std::chrono::seconds(1));
91           delay_timer_->async_wait(std::move(self));
92           return; // Composed operation not yet complete.
93         }
94         break; // Composed operation complete, continue below.
95       case waiting:
96         state_ = writing;
97         boost::asio::async_write(socket_,
98             boost::asio::buffer(*encoded_message_), std::move(self));
99         return; // Composed operation not yet complete.
100       }
101     }
102 
103     // This point is reached only on completion of the entire composed
104     // operation.
105 
106     // Deallocate the encoded message and delay timer before calling the
107     // user-supplied completion handler.
108     encoded_message_.reset();
109     delay_timer_.reset();
110 
111     // Call the user-supplied handler with the result of the operation.
112     self.complete(error);
113   }
114 };
115 
116 template <typename T, typename CompletionToken>
async_write_messages(tcp::socket & socket,const T & message,std::size_t repeat_count,CompletionToken && token)117 auto async_write_messages(tcp::socket& socket,
118     const T& message, std::size_t repeat_count,
119     CompletionToken&& token)
120   // The return type of the initiating function is deduced from the combination
121   // of CompletionToken type and the completion handler's signature. When the
122   // completion token is a simple callback, the return type is always void.
123   // In this example, when the completion token is boost::asio::yield_context
124   // (used for stackful coroutines) the return type would be also be void, as
125   // there is no non-error argument to the completion handler. When the
126   // completion token is boost::asio::use_future it would be std::future<void>.
127   -> typename boost::asio::async_result<
128     typename std::decay<CompletionToken>::type,
129     void(boost::system::error_code)>::return_type
130 {
131   // Encode the message and copy it into an allocated buffer. The buffer will
132   // be maintained for the lifetime of the composed asynchronous operation.
133   std::ostringstream os;
134   os << message;
135   std::unique_ptr<std::string> encoded_message(new std::string(os.str()));
136 
137   // Create a steady_timer to be used for the delay between messages.
138   std::unique_ptr<boost::asio::steady_timer> delay_timer(
139       new boost::asio::steady_timer(socket.get_executor()));
140 
141   // The boost::asio::async_compose function takes:
142   //
143   // - our asynchronous operation implementation,
144   // - the completion token,
145   // - the completion handler signature, and
146   // - any I/O objects (or executors) used by the operation
147   //
148   // It then wraps our implementation in an intermediate completion handler
149   // that meets the requirements of a conforming asynchronous operation. This
150   // includes tracking outstanding work against the I/O executors associated
151   // with the operation (in this example, this is the socket's executor).
152   return boost::asio::async_compose<
153     CompletionToken, void(boost::system::error_code)>(
154       async_write_messages_implementation{
155         socket, std::move(encoded_message),
156         repeat_count, std::move(delay_timer),
157         async_write_messages_implementation::starting},
158       token, socket);
159 }
160 
161 //------------------------------------------------------------------------------
162 
test_callback()163 void test_callback()
164 {
165   boost::asio::io_context io_context;
166 
167   tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
168   tcp::socket socket = acceptor.accept();
169 
170   // Test our asynchronous operation using a lambda as a callback.
171   async_write_messages(socket, "Testing callback\r\n", 5,
172       [](const boost::system::error_code& error)
173       {
174         if (!error)
175         {
176           std::cout << "Messages sent\n";
177         }
178         else
179         {
180           std::cout << "Error: " << error.message() << "\n";
181         }
182       });
183 
184   io_context.run();
185 }
186 
187 //------------------------------------------------------------------------------
188 
test_future()189 void test_future()
190 {
191   boost::asio::io_context io_context;
192 
193   tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
194   tcp::socket socket = acceptor.accept();
195 
196   // Test our asynchronous operation using the use_future completion token.
197   // This token causes the operation's initiating function to return a future,
198   // which may be used to synchronously wait for the result of the operation.
199   std::future<void> f = async_write_messages(
200       socket, "Testing future\r\n", 5, boost::asio::use_future);
201 
202   io_context.run();
203 
204   try
205   {
206     // Get the result of the operation.
207     f.get();
208     std::cout << "Messages sent\n";
209   }
210   catch (const std::exception& e)
211   {
212     std::cout << "Error: " << e.what() << "\n";
213   }
214 }
215 
216 //------------------------------------------------------------------------------
217 
main()218 int main()
219 {
220   test_callback();
221   test_future();
222 }
223