1
2 #include <rxcpp/rx-lite.hpp>
3 #include <rxcpp/operators/rx-reduce.hpp>
4 #include <rxcpp/operators/rx-filter.hpp>
5 #include <rxcpp/operators/rx-map.hpp>
6 #include <rxcpp/operators/rx-tap.hpp>
7 #include <rxcpp/operators/rx-concat_map.hpp>
8 #include <rxcpp/operators/rx-flat_map.hpp>
9 #include <rxcpp/operators/rx-concat.hpp>
10 #include <rxcpp/operators/rx-merge.hpp>
11 #include <rxcpp/operators/rx-repeat.hpp>
12 #include <rxcpp/operators/rx-publish.hpp>
13 #include <rxcpp/operators/rx-ref_count.hpp>
14 #include <rxcpp/operators/rx-window.hpp>
15 #include <rxcpp/operators/rx-window_toggle.hpp>
16 #include <rxcpp/operators/rx-start_with.hpp>
17 namespace Rx {
18 using namespace rxcpp;
19 using namespace rxcpp::sources;
20 using namespace rxcpp::operators;
21 using namespace rxcpp::util;
22 }
23 using namespace Rx;
24
25 #include <regex>
26 #include <random>
27 using namespace std;
28 using namespace std::chrono;
29
main()30 int main()
31 {
32 random_device rd; // non-deterministic generator
33 mt19937 gen(rd());
34 uniform_int_distribution<> dist(4, 18);
35
36 // for testing purposes, produce byte stream that from lines of text
37 auto bytes = range(0, 10) |
38 flat_map([&](int i){
39 auto body = from((uint8_t)('A' + i)) |
40 repeat(dist(gen)) |
41 as_dynamic();
42 auto delim = from((uint8_t)'\r');
43 return from(body, delim) | concat();
44 }) |
45 window(17) |
46 flat_map([](observable<uint8_t> w){
47 return w |
48 reduce(
49 vector<uint8_t>(),
50 [](vector<uint8_t> v, uint8_t b){
51 v.push_back(b);
52 return v;
53 }) |
54 as_dynamic();
55 }) |
56 tap([](vector<uint8_t>& v){
57 // print input packet of bytes
58 copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
59 cout << endl;
60 });
61
62 //
63 // recover lines of text from byte stream
64 //
65
66 auto removespaces = [](string s){
67 s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
68 return s;
69 };
70
71 // create strings split on \r
72 auto strings = bytes |
73 concat_map([](vector<uint8_t> v){
74 string s(v.begin(), v.end());
75 regex delim(R"/(\r)/");
76 cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
77 cregex_token_iterator end;
78 vector<string> splits(cursor, end);
79 return iterate(move(splits));
80 }) |
81 filter([](const string& s){
82 return !s.empty();
83 }) |
84 publish() |
85 ref_count();
86
87 // filter to last string in each line
88 auto closes = strings |
89 filter(
90 [](const string& s){
91 return s.back() == '\r';
92 }) |
93 Rx::map([](const string&){return 0;});
94
95 // group strings by line
96 auto linewindows = strings |
97 window_toggle(closes | start_with(0), [=](int){return closes;});
98
99 // reduce the strings for a line into one string
100 auto lines = linewindows |
101 flat_map([&](observable<string> w) {
102 return w | start_with<string>("") | sum() | Rx::map(removespaces);
103 });
104
105 // print result
106 lines |
107 subscribe<string>(println(cout));
108
109 return 0;
110 }
111