[/
          Copyright Oliver Kowalke 2014.
 Distributed under the Boost Software License, Version 1.0.
    (See accompanying file LICENSE_1_0.txt or copy at
          http://www.boost.org/LICENSE_1_0.txt
]

[section:motivation Motivation]

To support a broad range of execution control behaviour, the coroutine types
of __acoro__ can be used to ['escape-and-reenter] loops, to
['escape-and-reenter] recursive computations, and for ['cooperative]
multitasking. This helps to solve problems in a much simpler and more elegant
way than with a single flow of control alone.


[heading event-driven model]

The event-driven model is a programming paradigm where the flow of a program is
determined by events. The events are generated by multiple independent sources
and an event-dispatcher, waiting on all external sources, triggers callback
functions (event-handlers) whenever one of those events is detected (event-loop).
The application is divided into event selection (detection) and event handling.

[$../../../../libs/coroutine2/doc/images/event_model.png [align center]]

The resulting applications are highly scalable, flexible and responsive, and
their components are loosely coupled. This makes the event-driven model
suitable for user-interface applications, rule-based production systems, or
applications dealing with asynchronous I/O (for instance, network servers).
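
The dispatch cycle described above can be sketched in a few lines of standard
C++. This is a minimal, single-threaded illustration under our own assumptions,
not part of __acoro__: the names ['Event], ['EventLoop], ['on()], ['post()] and
['run()] are hypothetical, and the independent event sources are simulated by
calls to ['post()].

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical event: 'type' selects the handlers, 'payload' carries the data.
struct Event {
    std::string type;
    std::string payload;
};

// Minimal single-threaded event dispatcher (illustrative only).
class EventLoop {
public:
    using Handler = std::function<void(const Event&)>;

    // register a callback (event-handler) for one event type
    void on(const std::string& type, Handler h) {
        handlers_[type].push_back(std::move(h));
    }

    // event sources enqueue events; nothing is handled until run()
    void post(Event e) { queue_.push(std::move(e)); }

    // event-loop: take the next event (selection), call its handlers (handling);
    // events posted by a handler are processed later, in FIFO order
    void run() {
        while (!queue_.empty()) {
            Event e = std::move(queue_.front());
            queue_.pop();
            auto it = handlers_.find(e.type);
            if (it == handlers_.end()) continue;   // no handler: event is dropped
            std::vector<Handler> hs = it->second;  // copy: a handler may call on()
            for (const Handler& h : hs) h(e);
        }
    }

private:
    std::map<std::string, std::vector<Handler>> handlers_;
    std::queue<Event> queue_;
};
```

Each handler runs to completion before the loop selects the next event, so a
handler that has to wait for a further event must return to the loop and keep
its state somewhere else. That is the structural cost discussed in the next
section.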


[heading event-based asynchronous paradigm]

A classic synchronous console program issues an I/O request (e.g. for user
input or filesystem data) and blocks until the request is complete.

In contrast, an asynchronous I/O function initiates the physical operation but
immediately returns to its caller, even though the operation is not yet
complete. A program written to leverage this functionality does not block: it
can proceed with other work (including other I/O requests in parallel) while
the original operation is still pending. When the operation completes, the
program is notified. Because asynchronous applications spend less overall time
waiting for operations, they can outperform synchronous programs.

Events are one of the paradigms for asynchronous execution, but
not all asynchronous systems use events.
Although asynchronous programming can be done using threads, threads come with
their own costs:

* hard to program (traps for the unwary)
* high memory requirements (each thread needs its own stack)
* large overhead for creating and maintaining thread state
* expensive context switching between threads

The event-based asynchronous model avoids those issues:

* simpler because of the single stream of instructions
* much less expensive context switches

The downside of this paradigm is a sub-optimal program
structure. An event-driven program is required to split its code into
multiple small callback functions, i.e. the code is organized in a sequence of
small steps that execute intermittently. An algorithm that would usually be expressed
as a hierarchy of functions and loops must be transformed into callbacks. The
complete state has to be stored in a data structure while the control flow
returns to the event-loop.
As a consequence, event-driven applications are often tedious and confusing to
write. Each callback introduces a new scope, a separate error callback, and so
on. The sequential algorithm is split across multiple call stacks,
making the application hard to debug. Exception handlers are restricted to
local handlers: it is impossible to wrap a sequence of events into a single
try-catch block.
The use of local variables, while/for loops, recursion etc. together with the
event-loop is not possible. The code becomes less expressive.
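
To make this transformation concrete, the following plain C++ sketch writes the
same loop twice: once sequentially, and once split into callbacks whose loop
counter and running sum have to live in a struct between steps. It assumes a
hypothetical queue-based ['async_read()] standing in for a real asynchronous
operation; ['ready_queue], ['run_loop()], ['sum_sequential()] and
['SumSession] are illustrative names only.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event-loop: completed operations are queued
// as callbacks and invoked one after another by run_loop().
std::deque<std::function<void()>> ready_queue;

void run_loop() {
    while (!ready_queue.empty()) {
        auto cb = std::move(ready_queue.front());
        ready_queue.pop_front();
        cb();
    }
}

// Hypothetical asynchronous read: returns immediately; 'cb' is invoked later
// from run_loop() with input[pos], or with 0 once the input is exhausted.
void async_read(const std::vector<int>& input, std::size_t pos,
                std::function<void(int)> cb) {
    int value = pos < input.size() ? input[pos] : 0;
    ready_queue.push_back([cb, value] { cb(value); });
}

// Sequential formulation: the loop and its state are local variables.
int sum_sequential(const std::vector<int>& input) {
    int sum = 0;
    for (std::size_t pos = 0; pos < input.size() && input[pos] != 0; ++pos)
        sum += input[pos];
    return sum;
}

// Callback formulation of the same loop: 'pos' and 'sum' become member
// variables because control returns to the event-loop between the steps.
struct SumSession {
    const std::vector<int>& input;
    std::size_t pos = 0;
    int sum = 0;
    bool done = false;

    explicit SumSession(const std::vector<int>& in) : input(in) {}

    void start() {                                   // issue the next read
        async_read(input, pos, [this](int v) { on_read(v); });
    }

    void on_read(int value) {
        if (value == 0) { done = true; return; }     // loop exit condition
        sum += value;                                // loop body
        ++pos;                                       // loop increment
        start();                                     // "next iteration"
    }
};
```

Calling ['start()] only registers the first read; nothing happens until
['run_loop()] drives the callbacks, and the result is available only after
the queue has drained.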

In the past, code using asio's ['asynchronous operations] was cluttered with
callback functions.

        class session
        {
        public:
            session(boost::asio::io_service& io_service) :
                  socket_(io_service) // construct a TCP-socket from io_service
            {}

            boost::asio::ip::tcp::socket& socket(){
                return socket_;
            }

            void start(){
                // initiate asynchronous read; handle_read() is callback-function
                socket_.async_read_some(boost::asio::buffer(data_,max_length),
                    boost::bind(&session::handle_read,this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }

        private:
            void handle_read(const boost::system::error_code& error,
                             std::size_t bytes_transferred){
                if (!error)
                    // initiate asynchronous write; handle_write() is callback-function
                    boost::asio::async_write(socket_,
                        boost::asio::buffer(data_,bytes_transferred),
                        boost::bind(&session::handle_write,this,
                            boost::asio::placeholders::error));
                else
                    delete this; // error or connection closed: end of session
            }

            void handle_write(const boost::system::error_code& error){
                if (!error)
                    // initiate asynchronous read; handle_read() is callback-function
                    socket_.async_read_some(boost::asio::buffer(data_,max_length),
                        boost::bind(&session::handle_read,this,
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                else
                    delete this; // error or connection closed: end of session
            }

            boost::asio::ip::tcp::socket socket_;
            enum { max_length=1024 };
            char data_[max_length];
        };

In this example, a simple echo server, the logic is split across three member
functions, and local state (such as the data buffer) has to be moved into
member variables.

With its new ['asynchronous result] feature, __boost_asio__ provides a
framework that combines the event-driven model with coroutines, hiding the
complexity of event-driven programming and permitting the style of classic
sequential code. The application is not required to pass callback functions to
asynchronous operations, and local state is kept in local variables. Therefore
the code is much easier to read and understand.
[footnote Christopher Kohlhoff,
[@ http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n3964.pdf
N3964 - Library Foundations for Asynchronous Operations, Revision 1]].

        void session(boost::asio::ip::tcp::socket socket,
                     boost::asio::yield_context yield){
            // 'socket' is an already connected TCP-socket;
            // 'yield' is the coroutine context passed in by boost::asio::spawn()
            try{
                for(;;){
                    // local data-buffer
                    char data[1024];

                    boost::system::error_code ec;

                    // read data asynchronously from the socket;
                    // the execution context is suspended until
                    // some bytes have been read
                    std::size_t length=socket.async_read_some(
                            boost::asio::buffer(data),
                            yield[ec]);
                    if (ec==boost::asio::error::eof)
                        break; //connection closed cleanly by peer
                    else if(ec)
                        throw boost::system::system_error(ec); //some other error

                    // write the received bytes asynchronously
                    boost::asio::async_write(
                            socket,
                            boost::asio::buffer(data,length),
                            yield[ec]);
                    if (ec==boost::asio::error::eof)
                        break; //connection closed cleanly by peer
                    else if(ec)
                        throw boost::system::system_error(ec); //some other error
                }
            } catch(std::exception const& e){
                std::cerr<<"Exception: "<<e.what()<<"\n";
            }
        }

In contrast to the previous example, this one reads like sequential code and
keeps its data in a local variable (['data]) while still using asynchronous
operations (['async_read_some()], ['async_write()]). The algorithm is
implemented in one function, and error handling is done by one try-catch block.

[heading recursive descent parsing]
Coroutines let you invert the flow of control so you can ask a recursive descent
parser for parsed symbols.

        #include <boost/coroutine2/all.hpp>
        #include <cctype>
        #include <cstdio>
        #include <functional>
        #include <istream>
        #include <sstream>
        #include <stdexcept>
        #include <string>

        // thrown when the input is not a well-formed expression
        struct parser_error : std::runtime_error{
            parser_error() : std::runtime_error("parser error") {}
        };

        class Parser{
           char next;
           std::istream& is;
           std::function<void(char)> cb;

           char pull(){
                return std::char_traits<char>::to_char_type(is.get());
           }

           void scan(){
               do{
                   next=pull();
               }
               while(std::isspace(static_cast<unsigned char>(next)));
           }

        public:
           Parser(std::istream& is_,std::function<void(char)> cb_) :
              next(), is(is_), cb(cb_)
            {}

           void run() {
              scan();
              E();
           }

        private:
           void E(){
              T();
              while (next=='+'||next=='-'){
                 cb(next);
                 scan();
                 T();
              }
           }

           void T(){
              S();
              while (next=='*'||next=='/'){
                 cb(next);
                 scan();
                 S();
              }
           }

           void S(){
              if (std::isdigit(static_cast<unsigned char>(next))){
                 cb(next);
                 scan();
              }
              else if(next=='('){
                 cb(next);
                 scan();
                 E();
                 if (next==')'){
                     cb(next);
                     scan();
                 }else{
                     throw parser_error();
                 }
              }
              else{
                 throw parser_error();
              }
           }
        };

        typedef boost::coroutines2::coroutine< char > coro_t;

        int main() {
            std::istringstream is("1+1");
            // invert control flow
            coro_t::pull_type seq(
                    boost::coroutines2::fixedsize_stack(),
                    [&is](coro_t::push_type & yield) {
                        // create parser with callback function
                        Parser p( is,
                                  [&yield](char ch){
                                    // resume user-code
                                    yield(ch);
                                  });
                        // start recursive parsing
                        p.run();
                    });

            // user-code pulls parsed data from parser
            // invert control flow
            for(char c:seq){
                std::printf("Parsed: %c\n",c);
            }
        }

This problem does not map at all well to communicating between independent
threads. It makes no sense for either side to proceed independently of the
other. You want them to pass control back and forth.

There's yet another advantage to using coroutines. This recursive descent parser
throws an exception when parsing fails. With a coroutine implementation, an
exception escaping the coroutine-function is rethrown in the context that
resumed the coroutine, so you need only wrap the calling code in try/catch.

With communicating threads, you would have to arrange to catch the exception
and pass along the exception pointer on the same queue you're using to deliver
the other events. You would then have to rethrow the exception to unwind the
recursive document processing.
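
The following sketch shows what that arrangement looks like with standard
threads. It is an illustration under stated assumptions rather than code from
this library: ['produce_symbols()] is a hypothetical stand-in for the recursive
parser (it accepts only digits, '+' and '-'), and ['Channel] is a minimal
blocking queue whose items are a symbol, a ['std::exception_ptr], or an end
marker.

```cpp
#include <cassert>
#include <cctype>
#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <variant>

// One queue item: a parsed symbol, a forwarded exception, or end-of-stream.
struct EndOfStream {};
using Item = std::variant<char, std::exception_ptr, EndOfStream>;

// Minimal blocking queue shared by the producer and consumer threads.
class Channel {
public:
    void push(Item item) {
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(item)); }
        cv_.notify_one();
    }
    Item pop() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        Item item = std::move(q_.front());
        q_.pop();
        return item;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Item> q_;
};

// Hypothetical stand-in for the parser thread: emits digits and '+'/'-',
// throws std::runtime_error on any other non-space character.
void produce_symbols(const std::string& text, Channel& out) {
    try {
        for (char c : text) {
            unsigned char u = static_cast<unsigned char>(c);
            if (std::isspace(u)) continue;
            if (!std::isdigit(u) && c != '+' && c != '-')
                throw std::runtime_error("parser error");
            out.push(c);
        }
        out.push(EndOfStream{});
    } catch (...) {
        // an exception cannot cross the thread boundary by itself:
        // capture it and send it down the same queue as the symbols
        out.push(std::current_exception());
    }
}

// Consumer: collects symbols and rethrows a forwarded exception in this thread.
std::string consume_symbols(const std::string& text) {
    Channel ch;
    std::thread producer(produce_symbols, std::cref(text), std::ref(ch));
    std::string symbols;
    try {
        for (;;) {
            Item item = ch.pop();
            if (auto c = std::get_if<char>(&item)) { symbols += *c; continue; }
            if (auto ep = std::get_if<std::exception_ptr>(&item))
                std::rethrow_exception(*ep);
            break;  // EndOfStream
        }
    } catch (...) {
        producer.join();  // never leave a joinable std::thread behind
        throw;
    }
    producer.join();
    return symbols;
}
```

Even in this reduced form the exception alone needs dedicated machinery: the
producer has to catch everything and capture it with
['std::current_exception()], and the consumer has to inspect every item and
call ['std::rethrow_exception()].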

The coroutine solution maps very naturally to the problem space.


[heading 'same fringe' problem]

The advantages of suspending at an arbitrary call depth can be seen
particularly clearly with the use of a recursive function, such as traversal
of trees.
If traversing two different trees in the same deterministic order produces the
same list of leaf nodes, then both trees have the same fringe.

[$../../../../libs/coroutine2/doc/images/same_fringe.png [align center]]

Both trees in the picture have the same fringe even though the structure of the
trees is different.

The same fringe problem could be solved using coroutines by iterating over the
leaf nodes and comparing this sequence via ['std::equal()]. The range of data
values is generated by function ['traverse()] which recursively traverses the
tree and passes each node's data value to its __push_coro__.
__push_coro__ suspends the recursive computation and transfers the data value to
the main execution context.
__pull_coro_it__, created from __pull_coro__, steps over those data values and
delivers them to ['std::equal()] for comparison. Each increment of
__pull_coro_it__ resumes ['traverse()]. Upon return from
['iterator::operator++()], either a new data value is available, or tree
traversal is finished (the iterator then compares equal to the end iterator).

In effect, the coroutine iterator presents a flattened view of the recursive
data structure.

        struct node{
            typedef std::shared_ptr<node> ptr_t;

            // Each tree node has an optional left subtree,
            // an optional right subtree and a value of its own.
            // The value is considered to be between the left
            // subtree and the right.
            ptr_t       left,right;
            std::string value;

            // construct leaf
            node(const std::string& v):
                left(),right(),value(v)
            {}
            // construct nonleaf
            node(ptr_t l,const std::string& v,ptr_t r):
                left(l),right(r),value(v)
            {}

            static ptr_t create(const std::string& v){
                return ptr_t(new node(v));
            }

            static ptr_t create(ptr_t l,const std::string& v,ptr_t r){
                return ptr_t(new node(l,v,r));
            }
        };

        node::ptr_t create_left_tree_from(const std::string& root){
            /* --------
                 root
                 / \
                b   e
               / \
              a   c
             -------- */
            return node::create(
                    node::create(
                        node::create("a"),
                        "b",
                        node::create("c")),
                    root,
                    node::create("e"));
        }

        node::ptr_t create_right_tree_from(const std::string& root){
            /* --------
                 root
                 / \
                a   d
                   / \
                  c   e
               -------- */
            return node::create(
                    node::create("a"),
                    root,
                    node::create(
                        node::create("c"),
                        "d",
                        node::create("e")));
        }

        typedef boost::coroutines2::coroutine<std::string>   coro_t;

        // recursively walk the tree, delivering values in order
        void traverse(node::ptr_t n,
                      coro_t::push_type& out){
            if(n->left) traverse(n->left,out);
            out(n->value);
            if(n->right) traverse(n->right,out);
        }

        // evaluation
        {
            node::ptr_t left_d(create_left_tree_from("d"));
            coro_t::pull_type left_d_reader([&](coro_t::push_type & out){
                                                traverse(left_d,out);
                                            });

            node::ptr_t right_b(create_right_tree_from("b"));
            coro_t::pull_type right_b_reader([&](coro_t::push_type & out){
                                                traverse(right_b,out);
                                             });

            std::cout << "left tree from d == right tree from b? "
                      << std::boolalpha
                      << std::equal(begin(left_d_reader),
                                    end(left_d_reader),
                                    begin(right_b_reader))
                      << std::endl;
        }
        {
            node::ptr_t left_d(create_left_tree_from("d"));
            coro_t::pull_type left_d_reader([&](coro_t::push_type & out){
                                                traverse(left_d,out);
                                            });

            node::ptr_t right_x(create_right_tree_from("x"));
            coro_t::pull_type right_x_reader([&](coro_t::push_type & out){
                                                 traverse(right_x,out);
                                             });

            std::cout << "left tree from d == right tree from x? "
                      << std::boolalpha
                      << std::equal(begin(left_d_reader),
                                    end(left_d_reader),
                                    begin(right_x_reader))
                      << std::endl;
        }
        std::cout << "Done" << std::endl;

        output:
        left tree from d == right tree from b? true
        left tree from d == right tree from x? false
        Done
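
For comparison, here is a sketch of the same comparison without coroutines,
using a different technique: an explicit-stack in-order iterator. The stack of
pending nodes replaces the call stack that ['traverse()] keeps implicitly while
it is suspended. ['Node], ['leaf()], ['tree()], ['InOrderCursor] and
['same_sequence()] are hypothetical names, and, like ['traverse()], the cursor
yields every node's value in order, not only the leaves.

```cpp
#include <cassert>
#include <memory>
#include <stack>
#include <string>

// Simplified node type modelled on 'node' in the example above.
struct Node {
    std::shared_ptr<Node> left, right;
    std::string value;
};
using NodePtr = std::shared_ptr<Node>;

NodePtr leaf(const std::string& v) {
    return std::make_shared<Node>(Node{nullptr, nullptr, v});
}
NodePtr tree(NodePtr l, const std::string& v, NodePtr r) {
    return std::make_shared<Node>(Node{l, r, v});
}

// In-order cursor: the recursion's implicit call stack is replaced by an
// explicit stack of nodes whose value has not been delivered yet.
class InOrderCursor {
public:
    explicit InOrderCursor(NodePtr root) { push_left(root); }

    bool done() const { return pending_.empty(); }

    // precondition: !done()
    std::string next() {
        NodePtr n = pending_.top();
        pending_.pop();
        push_left(n->right);  // the right subtree follows n's own value
        return n->value;
    }

private:
    // descend along left children, remembering every node on the way
    void push_left(NodePtr n) {
        for (; n; n = n->left) pending_.push(n);
    }
    std::stack<NodePtr> pending_;
};

// Compare the in-order value sequences of two trees element by element,
// stopping at the first mismatch; no complete list is ever built.
bool same_sequence(NodePtr a, NodePtr b) {
    InOrderCursor ca(a), cb(b);
    while (!ca.done() && !cb.done()) {
        if (ca.next() != cb.next()) return false;
    }
    return ca.done() && cb.done();
}
```

The traversal state now has to be managed by hand and the logic is spread
across ['push_left()] and ['next()]; with a coroutine, the plain recursive
['traverse()] is reused unchanged.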


[heading chaining coroutines]

The following code shows how coroutines can be chained: each stage pulls
values from its upstream coroutine and pushes its results downstream.

        #include <boost/coroutine2/all.hpp>
        #include <cctype>
        #include <functional>
        #include <iomanip>
        #include <iostream>
        #include <sstream>
        #include <string>

        typedef boost::coroutines2::coroutine<std::string> coro_t;
        using std::placeholders::_1;

        // deliver each line of input stream to sink as a separate string
        void readlines(coro_t::push_type& sink,std::istream& in){
            std::string line;
            while(std::getline(in,line))
                sink(line);
        }

        void tokenize(coro_t::push_type& sink, coro_t::pull_type& source){
            // This tokenizer doesn't happen to be stateful: you could reasonably
            // implement it with a single call to push each new token downstream. But
            // I've worked with stateful tokenizers, in which the meaning of input
            // characters depends in part on their position within the input line.
            for(std::string line:source){
                std::string::size_type pos=0;
                while(pos<line.length()){
                    unsigned char ch=static_cast<unsigned char>(line[pos]);
                    if(line[pos]=='"'){
                        std::string token;
                        ++pos;              // skip open quote
                        while(pos<line.length()&&line[pos]!='"')
                            token+=line[pos++];
                        ++pos;              // skip close quote
                        sink(token);        // pass token downstream
                    } else if (std::isspace(ch)){
                        ++pos;              // outside quotes, ignore whitespace
                    } else if (std::isalpha(ch)){
                        std::string token;
                        while (pos < line.length() &&
                               std::isalpha(static_cast<unsigned char>(line[pos])))
                            token += line[pos++];
                        sink(token);        // pass token downstream
                    } else {                // punctuation
                        sink(std::string(1,line[pos++]));
                    }
                }
            }
        }

        void only_words(coro_t::push_type& sink,coro_t::pull_type& source){
            for(std::string token:source){
                if (!token.empty() &&
                    std::isalpha(static_cast<unsigned char>(token[0])))
                    sink(token);
            }
        }

        void trace(coro_t::push_type& sink, coro_t::pull_type& source){
            for(std::string token:source){
                std::cout << "trace: '" << token << "'\n";
                sink(token);
            }
        }

        struct FinalEOL{
            ~FinalEOL(){
                std::cout << std::endl;
            }
        };

        void layout(coro_t::pull_type& source,int num,int width){
            // Finish the last line when we leave by whatever means
            FinalEOL eol;

            // Pull values from upstream, lay them out 'num' to a line
            for (;;){
                for (int i = 0; i < num; ++i){
                    // when we exhaust the input, stop
                    if (!source) return;

                    std::cout << std::setw(width) << source.get();
                    // now that we've handled this item, advance to next
                    source();
                }
                // after 'num' items, line break
                std::cout << std::endl;
            }
        }

        // For example purposes, instead of having a separate text file in the
        // local filesystem, construct an istringstream to read.
        std::string data(
            "This is the first line.\n"
            "This, the second.\n"
            "The third has \"a phrase\"!\n"
            );

        {
            std::cout << "\nfilter:\n";
            std::istringstream infile(data);
            coro_t::pull_type reader(std::bind(readlines, _1, std::ref(infile)));
            coro_t::pull_type tokenizer(std::bind(tokenize, _1, std::ref(reader)));
            coro_t::pull_type filter(std::bind(only_words, _1, std::ref(tokenizer)));
            coro_t::pull_type tracer(std::bind(trace, _1, std::ref(filter)));
            for(std::string token:tracer){
                // just iterate, we're already pulling through tracer
            }
        }

        {
            std::cout << "\nlayout() as coroutine::push_type:\n";
            std::istringstream infile(data);
            coro_t::pull_type reader(std::bind(readlines, _1, std::ref(infile)));
            coro_t::pull_type tokenizer(std::bind(tokenize, _1, std::ref(reader)));
            coro_t::pull_type filter(std::bind(only_words, _1, std::ref(tokenizer)));
            coro_t::push_type writer(std::bind(layout, _1, 5, 15));
            for(std::string token:filter){
                writer(token);
            }
        }

        {
            std::cout << "\nfiltering output:\n";
            std::istringstream infile(data);
            coro_t::pull_type reader(std::bind(readlines,_1,std::ref(infile)));
            coro_t::pull_type tokenizer(std::bind(tokenize,_1,std::ref(reader)));
            coro_t::push_type writer(std::bind(layout,_1,5,15));
            // Because of the symmetry of the API, we can use any of these
            // chaining functions in a push_type coroutine chain as well.
            coro_t::push_type filter(std::bind(only_words,std::ref(writer),_1));
            for(std::string token:tokenizer){
                filter(token);
            }
        }
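
To see what the coroutine chain buys, it can be compared with an eager pipeline
in plain C++ that runs each stage to completion and hands a ['std::vector] of
all results to the next stage. The stage logic below mirrors ['readlines()],
['tokenize()] and ['only_words()] above without Boost.Coroutine2; the
['_eager] function names are hypothetical, and this is a sketch for comparison,
not part of the library.

```cpp
#include <cassert>
#include <cctype>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Eager counterparts of the coroutine stages: each stage consumes the complete
// output of the previous stage, stored in a std::vector.
std::vector<std::string> readlines_eager(std::istream& in) {
    std::vector<std::string> lines;
    std::string line;
    while (std::getline(in, line)) lines.push_back(line);
    return lines;
}

std::vector<std::string> tokenize_eager(const std::vector<std::string>& lines) {
    std::vector<std::string> tokens;
    for (const std::string& line : lines) {
        std::string::size_type pos = 0;
        while (pos < line.length()) {
            unsigned char c = static_cast<unsigned char>(line[pos]);
            if (line[pos] == '"') {              // quoted phrase is one token
                std::string token;
                ++pos;                           // skip open quote
                while (pos < line.length() && line[pos] != '"')
                    token += line[pos++];
                ++pos;                           // skip close quote
                tokens.push_back(token);
            } else if (std::isspace(c)) {
                ++pos;                           // ignore whitespace
            } else if (std::isalpha(c)) {        // word
                std::string token;
                while (pos < line.length() &&
                       std::isalpha(static_cast<unsigned char>(line[pos])))
                    token += line[pos++];
                tokens.push_back(token);
            } else {                             // punctuation
                tokens.push_back(std::string(1, line[pos++]));
            }
        }
    }
    return tokens;
}

std::vector<std::string> only_words_eager(const std::vector<std::string>& tokens) {
    std::vector<std::string> words;
    for (const std::string& t : tokens)
        if (!t.empty() && std::isalpha(static_cast<unsigned char>(t[0])))
            words.push_back(t);
    return words;
}
```

The eager version yields the same words, but it materializes every intermediate
list in memory and no stage can start before its predecessor has finished. The
coroutine chain hands over one line or token at a time, and the same stage
functions can be arranged as either a pull or a push chain.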

[endsect]
