• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-filter.hpp>
4 #include <rxcpp/operators/rx-map.hpp>
5 #include <rxcpp/operators/rx-take.hpp>
6 #include <rxcpp/operators/rx-concat_map.hpp>
7 #include <rxcpp/operators/rx-observe_on.hpp>
8 
9 static const int static_tripletCount = 100;
10 
11 SCENARIO("concat_transform pythagorian ranges", "[!hide][range][concat_transform][pythagorian][perf]"){
12     const int& tripletCount = static_tripletCount;
13     GIVEN("some ranges"){
14         WHEN("generating pythagorian triplets"){
15             using namespace std::chrono;
16             typedef steady_clock clock;
17 
18             auto sc = rxsc::make_immediate();
19             //auto sc = rxsc::make_current_thread();
20             auto so = rx::identity_one_worker(sc);
21 
22             int c = 0;
23             int ct = 0;
24             int n = 1;
25             auto start = clock::now();
26             auto triples =
27                 rxs::range(1, so)
28                     .concat_transform(
__anon58f26a080102(int z)29                         [&c, so](int z){
30                             return rxs::range(1, z, 1, so)
31                                 .concat_transform(
32                                     [&c, so, z](int x){
33                                         return rxs::range(x, z, 1, so)
34                                             .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
35                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
36                                             // forget type to workaround lambda deduction bug on msvc 2013
37                                             .as_dynamic();},
38                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
39                                 // forget type to workaround lambda deduction bug on msvc 2013
40                                 .as_dynamic();},
__anon58f26a080602(int , std::tuple<int,int,int> triplet)41                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;});
42             triples
43                 .take(tripletCount)
44                 .subscribe(
__anon58f26a080702(int ,int ,int )45                     rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){++ct;}),
__anon58f26a080802(rxu::error_ptr)46                     [](rxu::error_ptr){abort();});
47             auto finish = clock::now();
48             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
49                    duration_cast<milliseconds>(start.time_since_epoch());
50             std::cout << "concat pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
51 
52         }
53     }
54 }
55 
56 SCENARIO("synchronize concat_transform pythagorian ranges", "[!hide][range][concat_transform][synchronize][pythagorian][perf]"){
57     const int& tripletCount = static_tripletCount;
58     GIVEN("some ranges"){
59         WHEN("generating pythagorian triplets"){
60             using namespace std::chrono;
61             typedef steady_clock clock;
62 
63             auto so = rx::synchronize_event_loop();
64 
65             int c = 0;
66             int n = 1;
67             auto start = clock::now();
68             auto triples =
69                 rxs::range(1, so)
70                     .concat_transform(
__anon58f26a080902(int z)71                         [&c, so](int z){
72                             return rxs::range(1, z, 1, so)
73                                 .concat_transform(
74                                     [&c, so, z](int x){
75                                         return rxs::range(x, z, 1, so)
76                                             .filter([&c, z, x](int y){
77                                                 ++c;
78                                                 if (x*x + y*y == z*z) {
79                                                     return true;}
80                                                 else {
81                                                     return false;}})
82                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
83                                             // forget type to workaround lambda deduction bug on msvc 2013
84                                             .as_dynamic();},
85                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
86                                     so)
87                                 // forget type to workaround lambda deduction bug on msvc 2013
88                                 .as_dynamic();},
__anon58f26a080e02(int , std::tuple<int,int,int> triplet)89                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
90                         so);
91             int ct = triples
92                 .take(tripletCount)
93                 .as_blocking()
94                 .count();
95 
96             auto finish = clock::now();
97             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
98                    duration_cast<milliseconds>(start.time_since_epoch());
99             std::cout << "concat sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
100         }
101     }
102 }
103 
104 SCENARIO("observe_on concat_transform pythagorian ranges", "[!hide][range][concat_transform][observe_on][pythagorian][perf]"){
105     const int& tripletCount = static_tripletCount;
106     GIVEN("some ranges"){
107         WHEN("generating pythagorian triplets"){
108             using namespace std::chrono;
109             typedef steady_clock clock;
110 
111             auto so = rx::observe_on_event_loop();
112 
113             int c = 0;
114             int n = 1;
115             auto start = clock::now();
116             auto triples =
117                 rxs::range(1, so)
118                     .concat_transform(
__anon58f26a080f02(int z)119                         [&c, so](int z){
120                             return rxs::range(1, z, 1, so)
121                                 .concat_transform(
122                                     [&c, so, z](int x){
123                                         return rxs::range(x, z, 1, so)
124                                             .filter([&c, z, x](int y){
125                                                 ++c;
126                                                 if (x*x + y*y == z*z) {
127                                                     return true;}
128                                                 else {
129                                                     return false;}})
130                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
131                                             // forget type to workaround lambda deduction bug on msvc 2013
132                                             .as_dynamic();},
133                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
134                                     so)
135                                 // forget type to workaround lambda deduction bug on msvc 2013
136                                 .as_dynamic();},
__anon58f26a081402(int , std::tuple<int,int,int> triplet)137                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
138                         so);
139 
140             int ct = triples
141                 .take(tripletCount)
142                 .as_blocking()
143                 .count();
144 
145             auto finish = clock::now();
146             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
147                    duration_cast<milliseconds>(start.time_since_epoch());
148             std::cout << "concat observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
149         }
150     }
151 }
152 
153 SCENARIO("serialize concat_transform pythagorian ranges", "[!hide][range][concat_transform][serialize][pythagorian][perf]"){
154     const int& tripletCount = static_tripletCount;
155     GIVEN("some ranges"){
156         WHEN("generating pythagorian triplets"){
157             using namespace std::chrono;
158             typedef steady_clock clock;
159 
160             auto so = rx::serialize_event_loop();
161 
162             int c = 0;
163             int n = 1;
164             auto start = clock::now();
165             auto triples =
166                 rxs::range(1, so)
167                     .concat_transform(
__anon58f26a081502(int z)168                         [&c, so](int z){
169                             return rxs::range(1, z, 1, so)
170                                 .concat_transform(
171                                     [&c, so, z](int x){
172                                         return rxs::range(x, z, 1, so)
173                                             .filter([&c, z, x](int y){
174                                                 ++c;
175                                                 if (x*x + y*y == z*z) {
176                                                     return true;}
177                                                 else {
178                                                     return false;}})
179                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
180                                             // forget type to workaround lambda deduction bug on msvc 2013
181                                             .as_dynamic();},
182                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
183                                     so)
184                                 // forget type to workaround lambda deduction bug on msvc 2013
185                                 .as_dynamic();},
__anon58f26a081a02(int , std::tuple<int,int,int> triplet)186                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
187                         so);
188 
189             int ct = triples
190                 .take(tripletCount)
191                 .as_blocking()
192                 .count();
193 
194             auto finish = clock::now();
195             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
196                    duration_cast<milliseconds>(start.time_since_epoch());
197             std::cout << "concat serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
198         }
199     }
200 }
201 
202 SCENARIO("concat_map completes", "[concat_map][transform][map][operators]"){
203     GIVEN("two cold observables. one of ints. one of strings."){
204         auto sc = rxsc::make_test();
205         auto w = sc.create_worker();
206         const rxsc::test::messages<int> i_on;
207         const rxsc::test::messages<std::string> s_on;
208 
209         auto xs = sc.make_cold_observable({
210             i_on.next(100, 4),
211             i_on.next(200, 2),
212             i_on.completed(500)
213         });
214 
215         auto ys = sc.make_cold_observable({
216             s_on.next(50, "foo"),
217             s_on.next(100, "bar"),
218             s_on.next(150, "baz"),
219             s_on.next(200, "qux"),
220             s_on.completed(250)
221         });
222 
223         WHEN("each int is mapped to the strings"){
224 
225             auto res = w.start(
__anon58f26a081b02() 226                 [&]() {
227                     return xs
228                         | rxo::concat_map(
229                             [&](int){
230                                 return ys;},
231                             [](int, std::string s){
232                                 return s;})
233                         // forget type to workaround lambda deduction bug on msvc 2013
234                         | rxo::as_dynamic();
235                 }
236             );
237 
238             THEN("the output contains strings repeated for each int"){
239                 auto required = rxu::to_vector({
240                     s_on.next(350, "foo"),
241                     s_on.next(400, "bar"),
242                     s_on.next(450, "baz"),
243                     s_on.next(500, "qux"),
244                     s_on.next(600, "foo"),
245                     s_on.next(650, "bar"),
246                     s_on.next(700, "baz"),
247                     s_on.next(750, "qux"),
248                     s_on.completed(800)
249                 });
250                 auto actual = res.get_observer().messages();
251                 REQUIRE(required == actual);
252             }
253 
254             THEN("there was one subscription and one unsubscription to the ints"){
255                 auto required = rxu::to_vector({
256                     i_on.subscribe(200, 700)
257                 });
258                 auto actual = xs.subscriptions();
259                 REQUIRE(required == actual);
260             }
261 
262             THEN("there were 2 subscription and unsubscription to the strings"){
263                 auto required = rxu::to_vector({
264                     s_on.subscribe(300, 550),
265                     s_on.subscribe(550, 800)
266                 });
267                 auto actual = ys.subscriptions();
268                 REQUIRE(required == actual);
269             }
270         }
271     }
272 }
273 
274 SCENARIO("concat_transform completes", "[concat_transform][transform][map][operators]"){
275     GIVEN("two cold observables. one of ints. one of strings."){
276         auto sc = rxsc::make_test();
277         auto w = sc.create_worker();
278         const rxsc::test::messages<int> i_on;
279         const rxsc::test::messages<std::string> s_on;
280 
281         auto xs = sc.make_cold_observable({
282             i_on.next(100, 4),
283             i_on.next(200, 2),
284             i_on.completed(500)
285         });
286 
287         auto ys = sc.make_cold_observable({
288             s_on.next(50, "foo"),
289             s_on.next(100, "bar"),
290             s_on.next(150, "baz"),
291             s_on.next(200, "qux"),
292             s_on.completed(250)
293         });
294 
295         WHEN("each int is mapped to the strings"){
296 
297             auto res = w.start(
__anon58f26a081e02() 298                 [&]() {
299                     return xs
300                         | rxo::concat_transform(
301                             [&](int){
302                                 return ys;},
303                             [](int, std::string s){
304                                 return s;})
305                         // forget type to workaround lambda deduction bug on msvc 2013
306                         | rxo::as_dynamic();
307                 }
308             );
309 
310             THEN("the output contains strings repeated for each int"){
311                 auto required = rxu::to_vector({
312                     s_on.next(350, "foo"),
313                     s_on.next(400, "bar"),
314                     s_on.next(450, "baz"),
315                     s_on.next(500, "qux"),
316                     s_on.next(600, "foo"),
317                     s_on.next(650, "bar"),
318                     s_on.next(700, "baz"),
319                     s_on.next(750, "qux"),
320                     s_on.completed(800)
321                 });
322                 auto actual = res.get_observer().messages();
323                 REQUIRE(required == actual);
324             }
325 
326             THEN("there was one subscription and one unsubscription to the ints"){
327                 auto required = rxu::to_vector({
328                     i_on.subscribe(200, 700)
329                 });
330                 auto actual = xs.subscriptions();
331                 REQUIRE(required == actual);
332             }
333 
334             THEN("there were 2 subscription and unsubscription to the strings"){
335                 auto required = rxu::to_vector({
336                     s_on.subscribe(300, 550),
337                     s_on.subscribe(550, 800)
338                 });
339                 auto actual = ys.subscriptions();
340                 REQUIRE(required == actual);
341             }
342         }
343 
344         WHEN("each int is mapped to the strings with coordinator"){
345 
346             auto res = w.start(
__anon58f26a082102() 347                 [&]() {
348                     return xs
349                         .concat_transform(
350                             [&](int){
351                                 return ys;},
352                             [](int, std::string s){
353                                 return s;},
354                             rx::identity_current_thread())
355                         // forget type to workaround lambda deduction bug on msvc 2013
356                         .as_dynamic();
357                 }
358             );
359 
360             THEN("the output contains strings repeated for each int"){
361                 auto required = rxu::to_vector({
362                     s_on.next(350, "foo"),
363                     s_on.next(400, "bar"),
364                     s_on.next(450, "baz"),
365                     s_on.next(500, "qux"),
366                     s_on.next(600, "foo"),
367                     s_on.next(650, "bar"),
368                     s_on.next(700, "baz"),
369                     s_on.next(750, "qux"),
370                     s_on.completed(800)
371                 });
372                 auto actual = res.get_observer().messages();
373                 REQUIRE(required == actual);
374             }
375 
376             THEN("there was one subscription and one unsubscription to the ints"){
377                 auto required = rxu::to_vector({
378                     i_on.subscribe(200, 700)
379                 });
380                 auto actual = xs.subscriptions();
381                 REQUIRE(required == actual);
382             }
383 
384             THEN("there were 2 subscription and unsubscription to the strings"){
385                 auto required = rxu::to_vector({
386                     s_on.subscribe(300, 550),
387                     s_on.subscribe(550, 800)
388                 });
389                 auto actual = ys.subscriptions();
390                 REQUIRE(required == actual);
391             }
392         }
393     }
394 }
395 
396 SCENARIO("concat_transform, no result selector, no coordination", "[concat_transform][transform][map][operators]"){
397     GIVEN("two cold observables. one of ints. one of strings."){
398         auto sc = rxsc::make_test();
399         auto w = sc.create_worker();
400         const rxsc::test::messages<int> i_on;
401         const rxsc::test::messages<std::string> s_on;
402 
403         auto xs = sc.make_cold_observable({
404             i_on.next(100, 4),
405             i_on.next(200, 2),
406             i_on.completed(500)
407         });
408 
409         auto ys = sc.make_cold_observable({
410             s_on.next(50, "foo"),
411             s_on.next(100, "bar"),
412             s_on.next(150, "baz"),
413             s_on.next(200, "qux"),
414             s_on.completed(250)
415         });
416 
417         WHEN("each int is mapped to the strings"){
418 
419             auto res = w.start(
__anon58f26a082402() 420                 [&]() {
421                     return xs
422                         .concat_transform(
423                             [&](int){
424                                 return ys;})
425                         // forget type to workaround lambda deduction bug on msvc 2013
426                         .as_dynamic();
427                 }
428             );
429 
430             THEN("the output contains strings repeated for each int"){
431                 auto required = rxu::to_vector({
432                     s_on.next(350, "foo"),
433                     s_on.next(400, "bar"),
434                     s_on.next(450, "baz"),
435                     s_on.next(500, "qux"),
436                     s_on.next(600, "foo"),
437                     s_on.next(650, "bar"),
438                     s_on.next(700, "baz"),
439                     s_on.next(750, "qux"),
440                     s_on.completed(800)
441                 });
442                 auto actual = res.get_observer().messages();
443                 REQUIRE(required == actual);
444             }
445 
446             THEN("there was one subscription and one unsubscription to the ints"){
447                 auto required = rxu::to_vector({
448                     i_on.subscribe(200, 700)
449                 });
450                 auto actual = xs.subscriptions();
451                 REQUIRE(required == actual);
452             }
453 
454             THEN("there were 2 subscription and unsubscription to the strings"){
455                 auto required = rxu::to_vector({
456                     s_on.subscribe(300, 550),
457                     s_on.subscribe(550, 800)
458                 });
459                 auto actual = ys.subscriptions();
460                 REQUIRE(required == actual);
461             }
462         }
463     }
464 }
465 
466 SCENARIO("concat_transform, no result selector, with coordination", "[concat_transform][transform][map][operators]"){
467     GIVEN("two cold observables. one of ints. one of strings."){
468         auto sc = rxsc::make_test();
469         auto w = sc.create_worker();
470         const rxsc::test::messages<int> i_on;
471         const rxsc::test::messages<std::string> s_on;
472 
473         auto xs = sc.make_cold_observable({
474             i_on.next(100, 4),
475             i_on.next(200, 2),
476             i_on.completed(500)
477         });
478 
479         auto ys = sc.make_cold_observable({
480             s_on.next(50, "foo"),
481             s_on.next(100, "bar"),
482             s_on.next(150, "baz"),
483             s_on.next(200, "qux"),
484             s_on.completed(250)
485         });
486 
487         WHEN("each int is mapped to the strings"){
488 
489             auto res = w.start(
__anon58f26a082602() 490                 [&]() {
491                     return xs
492                         .concat_transform(
493                             [&](int){
494                                 return ys;},
495                             rx::identity_current_thread())
496                         // forget type to workaround lambda deduction bug on msvc 2013
497                         .as_dynamic();
498                 }
499             );
500 
501             THEN("the output contains strings repeated for each int"){
502                 auto required = rxu::to_vector({
503                     s_on.next(350, "foo"),
504                     s_on.next(400, "bar"),
505                     s_on.next(450, "baz"),
506                     s_on.next(500, "qux"),
507                     s_on.next(600, "foo"),
508                     s_on.next(650, "bar"),
509                     s_on.next(700, "baz"),
510                     s_on.next(750, "qux"),
511                     s_on.completed(800)
512                 });
513                 auto actual = res.get_observer().messages();
514                 REQUIRE(required == actual);
515             }
516 
517             THEN("there was one subscription and one unsubscription to the ints"){
518                 auto required = rxu::to_vector({
519                     i_on.subscribe(200, 700)
520                 });
521                 auto actual = xs.subscriptions();
522                 REQUIRE(required == actual);
523             }
524 
525             THEN("there were 2 subscription and unsubscription to the strings"){
526                 auto required = rxu::to_vector({
527                     s_on.subscribe(300, 550),
528                     s_on.subscribe(550, 800)
529                 });
530                 auto actual = ys.subscriptions();
531                 REQUIRE(required == actual);
532             }
533         }
534     }
535 }
536