• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-concat.hpp>
3 #include <rxcpp/operators/rx-group_by.hpp>
4 #include <rxcpp/operators/rx-reduce.hpp>
5 #include <rxcpp/operators/rx-map.hpp>
6 #include <rxcpp/operators/rx-merge.hpp>
7 #include <rxcpp/operators/rx-take.hpp>
8 #include <rxcpp/operators/rx-start_with.hpp>
9 #include <rxcpp/operators/rx-observe_on.hpp>
10 
11 #include <locale>
12 #include <sstream>
13 
14 SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){
15     GIVEN("a for loop"){
16         WHEN("partitioning pi series across all hardware threads"){
17 
18             std::atomic_int c;
19             c = 0;
__anon769080da0102(int k) 20             auto pi = [&](int k) {
21                 ++c;
22                 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
23             };
24 
25             using namespace std::chrono;
26             auto start = steady_clock::now();
27 
28             // share an output thread across all the producer threads
29             auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
30 
31             struct work
32             {
33                 int index;
34                 int first;
35                 int last;
36             };
37 
38             // use all available hardware threads
39             auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
40                 map(
__anon769080da0202(int index)41                     [](int index){
42                         static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
43                         int first = (chunk * index) + 1;
44                         int last =   chunk * (index + 1);
45                         return work{index, first, last};}
46                     ).
47                 group_by(
__anon769080da0302(work w) 48                     [](work w) -> int {return w.index % std::thread::hardware_concurrency();},
__anon769080da0402(work w)49                     [](work w){return w;}).
50                 map(
__anon769080da0502(rxcpp::grouped_observable<int, work> onproc) 51                     [=](rxcpp::grouped_observable<int, work> onproc) {
52                         auto key = onproc.get_key();
53                         // share a producer thread across all the ranges in this group of chunks
54                         auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
55                         return onproc.
56                             map(
57                                 [=](work w){
58                                     std::stringstream message;
59                                     message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
60 
61                                     return rxcpp::observable<>::range(w.first, w.last, producerthread).
62                                         map(pi).
63                                         sum(). // each thread maps and reduces its contribution to the answer
64                                         map(
65                                             [=](long double v){
66                                                 std::stringstream message;
67                                                 message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
68                                                 return std::make_tuple(message.str(), v);
69                                             }).
70                                         start_with(std::make_tuple(message.str(), 0.0L)).
71                                         as_dynamic();
72                                 }).
73                             concat(). // only subscribe to one range at a time in this group.
74                             observe_on(outputthread).
75                             map(rxcpp::util::apply_to(
76                                 [](std::string message, long double v){
77                                     std::cout << message << std::endl;
78                                     return v;
79                                 })).
80                             as_dynamic();
81                     }).
82                 merge().
83                 sum(). // reduces the contributions from all the threads to the answer
84                 as_blocking().
85                 last();
86 
87             std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
88             auto finish = steady_clock::now();
89             auto msElapsed = duration_cast<milliseconds>(finish-start);
90             std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
91 
92         }
93     }
94 }
95 
96 SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){
97     GIVEN("a for loop"){
98         WHEN("partitioning pi series across all hardware threads"){
99 
100             std::atomic_int c;
101             c = 0;
__anon769080da0902(int k) 102             auto pi = [&](int k) {
103                 ++c;
104                 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
105             };
106 
107             using namespace std::chrono;
108             auto start = steady_clock::now();
109 
110             struct work
111             {
112                 int index;
113                 int first;
114                 int last;
115             };
116 
117             // use all available hardware threads
118             auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
119                 map(
__anon769080da0a02(int index)120                     [](int index){
121                         static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
122                         int first = (chunk * index) + 1;
123                         int last =   chunk * (index + 1);
124                         return work{index, first, last};
125                     }).
126                 map(
__anon769080da0b02(work w)127                     [=](work w){
128                         std::stringstream message;
129                         message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
130 
131                         // create a new thread for every chunk
132                         return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()).
133                             map(pi).
134                             sum(). // each thread maps and reduces its contribution to the answer
135                             map(
136                                 [=](long double v){
137                                     std::stringstream message;
138                                     message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
139                                     return std::make_tuple(message.str(), v);
140                                 }).
141                             start_with(std::make_tuple(message.str(), 0.0L)).
142                             as_dynamic();
143                     }).
144                 merge(rxcpp::observe_on_new_thread()).
145                 map(rxcpp::util::apply_to(
__anon769080da0d02(std::string message, long double v)146                     [](std::string message, long double v){
147                         std::cout << message << std::endl;
148                         return v;
149                     })).
150                 sum(). // reduces the contributions from all the threads to the answer
151                 as_blocking().
152                 last();
153 
154             std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
155             auto finish = steady_clock::now();
156             auto msElapsed = duration_cast<milliseconds>(finish-start);
157             std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
158 
159         }
160     }
161 }
162 
// Predicate for the classic "C" locale: yields 1 when `c` is a whitespace
// character and 0 otherwise (the boolean result is carried in a char).
char whitespace(char c) {
    const std::locale& classic = std::locale::classic();
    const bool blank = std::isspace<char>(c, classic);
    return static_cast<char>(blank);
}
166 
// Returns `s` with leading and trailing whitespace removed; whitespace is
// classified with the classic "C" locale. Interior whitespace is kept and a
// string consisting only of whitespace becomes empty.
std::string trim(std::string s) {
    const auto isBlank = [](char c) {
        return std::isspace<char>(c, std::locale::classic());
    };
    // Cut the trailing run first. base() of the reverse iterator points one
    // past the last non-blank character (or at begin() when all are blank).
    s.erase(std::find_if_not(s.rbegin(), s.rend(), isBlank).base(), s.end());
    // Search for the leading run only after the erase above: a non-const
    // string operation may invalidate iterators obtained beforehand.
    s.erase(s.begin(), std::find_if_not(s.begin(), s.end(), isBlank));
    return s;
}
176 
// Case-insensitive "less than" for single characters: both sides are
// lower-cased with the classic "C" locale before being compared.
bool tolowerLess(char lhs, char rhs) {
    const std::locale& classic = std::locale::classic();
    const char lowerLhs = std::tolower(lhs, classic);
    const char lowerRhs = std::tolower(rhs, classic);
    return lowerLhs < lowerRhs;
}
180 
tolowerStringLess(const std::string & lhs,const std::string & rhs)181 bool tolowerStringLess(const std::string& lhs, const std::string& rhs) {
182     return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess);
183 }
184 
185 SCENARIO("group_by", "[group_by][operators]"){
186     GIVEN("1 hot observable of ints."){
187         auto sc = rxsc::make_test();
188         auto w = sc.create_worker();
189         const rxsc::test::messages<std::string> on;
190         int keyInvoked = 0;
191         int marbleInvoked = 0;
192 
193         auto xs = sc.make_hot_observable({
194             on.next(90, "error"),
195             on.next(110, "error"),
196             on.next(130, "error"),
197             on.next(220, "  foo"),
198             on.next(240, " FoO "),
199             on.next(270, "baR  "),
200             on.next(310, "foO "),
201             on.next(350, " Baz   "),
202             on.next(360, "  qux "),
203             on.next(390, "   bar"),
204             on.next(420, " BAR  "),
205             on.next(470, "FOO "),
206             on.next(480, "baz  "),
207             on.next(510, " bAZ "),
208             on.next(530, "    fOo    "),
209             on.completed(570),
210             on.next(580, "error"),
211             on.completed(600),
212             on.error(650, std::runtime_error("error in completed sequence"))
213         });
214 
215         WHEN("group normalized strings"){
216 
217             auto res = w.start(
__anon769080da0e02() 218                 [&]() {
219                     return xs
220                         .group_by(
221                             [&](std::string v){
222                                 ++keyInvoked;
223                                 return trim(std::move(v));
224                             },
225                             [&](std::string v){
226                                 ++marbleInvoked;
227                                 std::reverse(v.begin(), v.end());
228                                 return v;
229                             },
230                             tolowerStringLess)
231                         .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();})
232                         // forget type to workaround lambda deduction bug on msvc 2013
233                         .as_dynamic();
234                 }
235             );
236 
237             THEN("the output contains groups of group keys"){
238                 auto required = rxu::to_vector({
239                     on.next(220, "foo"),
240                     on.next(270, "baR"),
241                     on.next(350, "Baz"),
242                     on.next(360, "qux"),
243                     on.completed(570)
244                 });
245                 auto actual = res.get_observer().messages();
246                 REQUIRE(required == actual);
247             }
248 
249             THEN("there was one subscription and one unsubscription to the xs"){
250                 auto required = rxu::to_vector({
251                     on.subscribe(200, 570)
252                 });
253                 auto actual = xs.subscriptions();
254                 REQUIRE(required == actual);
255             }
256 
257             THEN("key selector was invoked for each value"){
258                 REQUIRE(12 == keyInvoked);
259             }
260 
261             THEN("marble selector was invoked for each value"){
262                 REQUIRE(12 == marbleInvoked);
263             }
264         }
265     }
266 }
267 
268 SCENARIO("group_by take 1", "[group_by][take][operators]"){
269     GIVEN("1 hot observable of ints."){
270         auto sc = rxsc::make_test();
271         auto w = sc.create_worker();
272         const rxsc::test::messages<long> on;
273         int keyInvoked = 0;
274         int marbleInvoked = 0;
275         int groupEmitted = 0;
276 
277         auto xs = sc.make_hot_observable({
278             on.next(130, -1),
279             on.next(220, 0),
280             on.next(240, -1),
281             on.next(270, 2),
282             on.next(310, -3),
283             on.next(350, 4),
284             on.next(360, -5),
285             on.next(390, 6),
286             on.next(420, -7),
287             on.next(470, 8),
288             on.next(480, -9),
289             on.completed(570)
290         });
291 
292         WHEN("1 group of ints is emitted"){
293 
294             auto res = w.start(
__anon769080da1202() 295                 [&]() {
296                     return xs
297                         | rxo::group_by(
298                             [&](long v) {
299                                 ++keyInvoked;
300                                 return v % 2;
301                             },
302                             [&](long v){
303                                 ++marbleInvoked;
304                                 return v;
305                             })
306                         | rxo::take(1)
307                         | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
308                             ++groupEmitted;
309                             return g;
310                         })
311                         | rxo::merge()
312                         // forget type to workaround lambda deduction bug on msvc 2013
313                         | rxo::as_dynamic();
314                 }
315             );
316 
317             THEN("the output contains groups of ints"){
318                 auto required = rxu::to_vector({
319                     on.next(220, 0),
320                     on.next(270, 2),
321                     on.next(350, 4),
322                     on.next(390, 6),
323                     on.next(470, 8),
324                     on.completed(570)
325                 });
326                 auto actual = res.get_observer().messages();
327                 REQUIRE(required == actual);
328             }
329 
330             THEN("there was one subscription and one unsubscription to the xs"){
331                 auto required = rxu::to_vector({
332                     on.subscribe(200, 570)
333                 });
334                 auto actual = xs.subscriptions();
335                 REQUIRE(required == actual);
336             }
337 
338             THEN("key selector was invoked for each value"){
339                 REQUIRE(10 == keyInvoked);
340             }
341 
342             THEN("marble selector was invoked for each value"){
343                 REQUIRE(5 == marbleInvoked);
344             }
345 
346             THEN("1 group emitted"){
347                 REQUIRE(1 == groupEmitted);
348             }
349         }
350     }
351 }
352 
353 SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){
354     GIVEN("1 hot observable of ints."){
355         auto sc = rxsc::make_test();
356         auto w = sc.create_worker();
357         const rxsc::test::messages<long> on;
358         int keyInvoked = 0;
359         int marbleInvoked = 0;
360         int groupEmitted = 0;
361 
362         auto xs = sc.make_hot_observable({
363             on.next(130, -1),
364             on.next(220, 0),
365             on.next(240, -1),
366             on.next(270, 2),
367             on.next(310, -3),
368             on.next(350, 4),
369             on.next(360, -5),
370             on.next(390, 6),
371             on.next(420, -7),
372         });
373 
374         WHEN("1 group of ints is emitted"){
375 
376             auto res = w.start(
__anon769080da1602() 377                 [&]() {
378                     return xs
379                         .group_by(
380                             [&](long v) {
381                                 ++keyInvoked;
382                                 return v % 2;
383                             },
384                             [&](long v){
385                                 ++marbleInvoked;
386                                 return v;
387                             })
388                         .take(1)
389                         .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
390                             ++groupEmitted;
391                             return g.take(4);
392                         })
393                         .merge()
394                         // forget type to workaround lambda deduction bug on msvc 2013
395                         .as_dynamic();
396                 }
397             );
398 
399             THEN("the output contains groups of ints"){
400                 auto required = rxu::to_vector({
401                     on.next(220, 0),
402                     on.next(270, 2),
403                     on.next(350, 4),
404                     on.next(390, 6),
405                     on.completed(390)
406                 });
407                 auto actual = res.get_observer().messages();
408                 REQUIRE(required == actual);
409             }
410 
411             THEN("there was one subscription and one unsubscription to the xs"){
412                 auto required = rxu::to_vector({
413                     on.subscribe(200, 390)
414                 });
415                 auto actual = xs.subscriptions();
416                 REQUIRE(required == actual);
417             }
418 
419             THEN("key selector was invoked for each value"){
420                 REQUIRE(7 == keyInvoked);
421             }
422 
423             THEN("marble selector was invoked for each value"){
424                 REQUIRE(4 == marbleInvoked);
425             }
426 
427             THEN("1 group emitted"){
428                 REQUIRE(1 == groupEmitted);
429             }
430         }
431     }
432 }