• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 #include <rxcpp/rx-lite.hpp>
3 #include <rxcpp/operators/rx-reduce.hpp>
4 #include <rxcpp/operators/rx-filter.hpp>
5 #include <rxcpp/operators/rx-map.hpp>
6 #include <rxcpp/operators/rx-tap.hpp>
7 #include <rxcpp/operators/rx-concat_map.hpp>
8 #include <rxcpp/operators/rx-flat_map.hpp>
9 #include <rxcpp/operators/rx-concat.hpp>
10 #include <rxcpp/operators/rx-merge.hpp>
11 #include <rxcpp/operators/rx-repeat.hpp>
12 #include <rxcpp/operators/rx-publish.hpp>
13 #include <rxcpp/operators/rx-ref_count.hpp>
14 #include <rxcpp/operators/rx-window.hpp>
15 #include <rxcpp/operators/rx-window_toggle.hpp>
16 #include <rxcpp/operators/rx-start_with.hpp>
17 namespace Rx {
18 using namespace rxcpp;
19 using namespace rxcpp::sources;
20 using namespace rxcpp::operators;
21 using namespace rxcpp::util;
22 }
23 using namespace Rx;
24 
25 #include <regex>
26 #include <random>
27 using namespace std;
28 using namespace std::chrono;
29 
main()30 int main()
31 {
32     random_device rd;   // non-deterministic generator
33     mt19937 gen(rd());
34     uniform_int_distribution<> dist(4, 18);
35 
36     // for testing purposes, produce byte stream that from lines of text
37     auto bytes = range(0, 10) |
38         flat_map([&](int i){
39             auto body = from((uint8_t)('A' + i)) |
40                 repeat(dist(gen)) |
41                 as_dynamic();
42             auto delim = from((uint8_t)'\r');
43             return from(body, delim) | concat();
44         }) |
45         window(17) |
46         flat_map([](observable<uint8_t> w){
47             return w |
48                 reduce(
49                     vector<uint8_t>(),
50                     [](vector<uint8_t> v, uint8_t b){
51                         v.push_back(b);
52                         return v;
53                     }) |
54                 as_dynamic();
55         }) |
56         tap([](vector<uint8_t>& v){
57             // print input packet of bytes
58             copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
59             cout << endl;
60         });
61 
62     //
63     // recover lines of text from byte stream
64     //
65 
66     auto removespaces = [](string s){
67         s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
68         return s;
69     };
70 
71     // create strings split on \r
72     auto strings = bytes |
73         concat_map([](vector<uint8_t> v){
74             string s(v.begin(), v.end());
75             regex delim(R"/(\r)/");
76             cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
77             cregex_token_iterator end;
78             vector<string> splits(cursor, end);
79             return iterate(move(splits));
80         }) |
81         filter([](const string& s){
82             return !s.empty();
83         }) |
84         publish() |
85         ref_count();
86 
87     // filter to last string in each line
88     auto closes = strings |
89         filter(
90             [](const string& s){
91                 return s.back() == '\r';
92             }) |
93         Rx::map([](const string&){return 0;});
94 
95     // group strings by line
96     auto linewindows = strings |
97         window_toggle(closes | start_with(0), [=](int){return closes;});
98 
99     // reduce the strings for a line into one string
100     auto lines = linewindows |
101         flat_map([&](observable<string> w) {
102             return w | start_with<string>("") | sum() | Rx::map(removespaces);
103         });
104 
105     // print result
106     lines |
107         subscribe<string>(println(cout));
108 
109     return 0;
110 }
111