• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-filter.hpp>
4 #include <rxcpp/operators/rx-map.hpp>
5 #include <rxcpp/operators/rx-take.hpp>
6 #include <rxcpp/operators/rx-flat_map.hpp>
7 #include <rxcpp/operators/rx-observe_on.hpp>
8 
// Number of pythagorian triplets each perf scenario in this file produces
// before it stops (compared against directly or passed to take()).
static const int static_tripletCount = 100;
10 
11 SCENARIO("pythagorian for loops", "[!hide][for][pythagorian][perf]"){
12     const int& tripletCount = static_tripletCount;
13     GIVEN("a for loop"){
14         WHEN("generating pythagorian triplets"){
15             using namespace std::chrono;
16             typedef steady_clock clock;
17 
18             int c = 0;
19             int ct = 0;
20             int n = 1;
21             auto start = clock::now();
22             for(int z = 1;; ++z)
23             {
24                 for(int x = 1; x <= z; ++x)
25                 {
26                     for(int y = x; y <= z; ++y)
27                     {
28                         ++c;
29                         if(x*x + y*y == z*z)
30                         {
31                             ++ct;
32                             if(ct == tripletCount)
33                                 goto done;
34                         }
35                     }
36                 }
37             }
38             done:
39             auto finish = clock::now();
40             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
41                    duration_cast<milliseconds>(start.time_since_epoch());
42             std::cout << "pythagorian for   : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
43 
44         }
45     }
46 }
47 
48 SCENARIO("merge_transform pythagorian ranges", "[!hide][range][merge_transform][pythagorian][perf]"){
49     const int& tripletCount = static_tripletCount;
50     GIVEN("some ranges"){
51         WHEN("generating pythagorian triplets"){
52             using namespace std::chrono;
53             typedef steady_clock clock;
54 
55             auto so = rx::identity_immediate();
56 
57             int c = 0;
58             int ct = 0;
59             int n = 1;
60             auto start = clock::now();
61             auto triples =
62                 rxs::range(1, so)
63                     .merge_transform(
__anon63d55a350102(int z)64                         [&c, so](int z){
65                             return rxs::range(1, z, 1, so)
66                                 .flat_map(
67                                     [&c, so, z](int x){
68                                         return rxs::range(x, z, 1, so)
69                                             .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
70                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
71                                             // forget type to workaround lambda deduction bug on msvc 2013
72                                             .as_dynamic();},
73                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
74                                 // forget type to workaround lambda deduction bug on msvc 2013
75                                 .as_dynamic();},
__anon63d55a350602(int , std::tuple<int,int,int> triplet)76                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;});
77             triples
78                 .take(tripletCount)
79                 .subscribe(
__anon63d55a350702(int ,int ,int )80                     rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){
81                         ++ct;
82                     }));
83             auto finish = clock::now();
84             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
85                    duration_cast<milliseconds>(start.time_since_epoch());
86             std::cout << "merge pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
87 
88         }
89     }
90 }
91 
92 SCENARIO("synchronize merge_transform pythagorian ranges", "[!hide][range][merge_transform][synchronize][pythagorian][perf]"){
93     const int& tripletCount = static_tripletCount;
94     GIVEN("some ranges"){
95         WHEN("generating pythagorian triplets"){
96             using namespace std::chrono;
97             typedef steady_clock clock;
98 
99             auto so = rx::synchronize_event_loop();
100 
101             int c = 0;
102             int n = 1;
103             auto start = clock::now();
104             auto triples =
105                 rxs::range(1, so)
106                     .merge_transform(
__anon63d55a350802(int z)107                         [&c, so](int z){
108                             return rxs::range(1, z, 1, so)
109                                 .merge_transform(
110                                     [&c, so, z](int x){
111                                         return rxs::range(x, z, 1, so)
112                                             .filter([&c, z, x](int y){
113                                                 ++c;
114                                                 if (x*x + y*y == z*z) {
115                                                     return true;}
116                                                 else {
117                                                     return false;}})
118                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
119                                             // forget type to workaround lambda deduction bug on msvc 2013
120                                             .as_dynamic();},
121                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
122                                     so)
123                                 // forget type to workaround lambda deduction bug on msvc 2013
124                                 .as_dynamic();},
__anon63d55a350d02(int , std::tuple<int,int,int> triplet)125                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
126                         so);
127             int ct = triples
128                 .take(tripletCount)
129                 .as_blocking()
130                 .count();
131 
132             auto finish = clock::now();
133             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
134                    duration_cast<milliseconds>(start.time_since_epoch());
135             std::cout << "merge sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
136         }
137     }
138 }
139 
140 SCENARIO("observe_on merge_transform pythagorian ranges", "[!hide][range][merge_transform][observe_on][pythagorian][perf]"){
141     const int& tripletCount = static_tripletCount;
142     GIVEN("some ranges"){
143         WHEN("generating pythagorian triplets"){
144             using namespace std::chrono;
145             typedef steady_clock clock;
146 
147             auto so = rx::observe_on_event_loop();
148 
149             int c = 0;
150             int n = 1;
151             auto start = clock::now();
152             auto triples =
153                 rxs::range(1, so)
154                     .merge_transform(
__anon63d55a350e02(int z)155                         [&c, so](int z){
156                             return rxs::range(1, z, 1, so)
157                                 .merge_transform(
158                                     [&c, so, z](int x){
159                                         return rxs::range(x, z, 1, so)
160                                             .filter([&c, z, x](int y){
161                                                 ++c;
162                                                 if (x*x + y*y == z*z) {
163                                                     return true;}
164                                                 else {
165                                                     return false;}})
166                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
167                                             // forget type to workaround lambda deduction bug on msvc 2013
168                                             .as_dynamic();},
169                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
170                                     so)
171                                 // forget type to workaround lambda deduction bug on msvc 2013
172                                 .as_dynamic();},
__anon63d55a351302(int , std::tuple<int,int,int> triplet)173                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
174                         so);
175             int ct = triples
176                 .take(tripletCount)
177                 .as_blocking()
178                 .count();
179 
180             auto finish = clock::now();
181             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
182                    duration_cast<milliseconds>(start.time_since_epoch());
183             std::cout << "merge observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
184         }
185     }
186 }
187 
188 SCENARIO("serialize merge_transform pythagorian ranges", "[!hide][range][merge_transform][serialize][pythagorian][perf]"){
189     const int& tripletCount = static_tripletCount;
190     GIVEN("some ranges"){
191         WHEN("generating pythagorian triplets"){
192             using namespace std::chrono;
193             typedef steady_clock clock;
194 
195             auto so = rx::serialize_event_loop();
196 
197             int c = 0;
198             int n = 1;
199             auto start = clock::now();
200             auto triples =
201                 rxs::range(1, so)
202                     .merge_transform(
__anon63d55a351402(int z)203                         [&c, so](int z){
204                             return rxs::range(1, z, 1, so)
205                                 .merge_transform(
206                                     [&c, so, z](int x){
207                                         return rxs::range(x, z, 1, so)
208                                             .filter([&c, z, x](int y){
209                                                 ++c;
210                                                 if (x*x + y*y == z*z) {
211                                                     return true;}
212                                                 else {
213                                                     return false;}})
214                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
215                                             // forget type to workaround lambda deduction bug on msvc 2013
216                                             .as_dynamic();},
217                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
218                                     so)
219                                 // forget type to workaround lambda deduction bug on msvc 2013
220                                 .as_dynamic();},
__anon63d55a351902(int , std::tuple<int,int,int> triplet)221                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
222                         so);
223             int ct = triples
224                 .take(tripletCount)
225                 .as_blocking()
226                 .count();
227 
228             auto finish = clock::now();
229             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
230                    duration_cast<milliseconds>(start.time_since_epoch());
231             std::cout << "merge serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
232         }
233     }
234 }
235 
236 SCENARIO("flat_map completes", "[flat_map][map][operators]"){
237     GIVEN("two cold observables. one of ints. one of strings."){
238         auto sc = rxsc::make_test();
239         auto w = sc.create_worker();
240         const rxsc::test::messages<int> i_on;
241         const rxsc::test::messages<std::string> s_on;
242 
243         auto xs = sc.make_cold_observable({
244             i_on.next(100, 4),
245             i_on.next(200, 2),
246             i_on.next(300, 3),
247             i_on.next(400, 1),
248             i_on.completed(500)
249         });
250 
251         auto ys = sc.make_cold_observable({
252             s_on.next(50, "foo"),
253             s_on.next(100, "bar"),
254             s_on.next(150, "baz"),
255             s_on.next(200, "qux"),
256             s_on.completed(250)
257         });
258 
259         WHEN("each int is mapped to the strings"){
260 
261             auto res = w.start(
__anon63d55a351a02() 262                 [&]() {
263                     return xs
264                         | rxo::flat_map(
265                             [&](int){
266                                 return ys;},
267                             [](int, std::string s){
268                                 return s;})
269                         // forget type to workaround lambda deduction bug on msvc 2013
270                         | rxo::as_dynamic();
271                 }
272             );
273 
274             THEN("the output contains strings repeated for each int"){
275                 auto required = rxu::to_vector({
276                     s_on.next(350, "foo"),
277                     s_on.next(400, "bar"),
278                     s_on.next(450, "baz"),
279                     s_on.next(450, "foo"),
280                     s_on.next(500, "qux"),
281                     s_on.next(500, "bar"),
282                     s_on.next(550, "baz"),
283                     s_on.next(550, "foo"),
284                     s_on.next(600, "qux"),
285                     s_on.next(600, "bar"),
286                     s_on.next(650, "baz"),
287                     s_on.next(650, "foo"),
288                     s_on.next(700, "qux"),
289                     s_on.next(700, "bar"),
290                     s_on.next(750, "baz"),
291                     s_on.next(800, "qux"),
292                     s_on.completed(850)
293                 });
294                 auto actual = res.get_observer().messages();
295                 REQUIRE(required == actual);
296             }
297 
298             THEN("there was one subscription and one unsubscription to the ints"){
299                 auto required = rxu::to_vector({
300                     i_on.subscribe(200, 700)
301                 });
302                 auto actual = xs.subscriptions();
303                 REQUIRE(required == actual);
304             }
305 
306             THEN("there were four subscription and unsubscription to the strings"){
307                 auto required = rxu::to_vector({
308                     s_on.subscribe(300, 550),
309                     s_on.subscribe(400, 650),
310                     s_on.subscribe(500, 750),
311                     s_on.subscribe(600, 850)
312                 });
313                 auto actual = ys.subscriptions();
314                 REQUIRE(required == actual);
315             }
316         }
317     }
318 }
319 
320 SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){
321     GIVEN("two cold observables. one of ints. one of strings."){
322         auto sc = rxsc::make_test();
323         auto w = sc.create_worker();
324         const rxsc::test::messages<int> i_on;
325         const rxsc::test::messages<std::string> s_on;
326 
327         auto xs = sc.make_cold_observable({
328             i_on.next(100, 4),
329             i_on.next(200, 2),
330             i_on.next(300, 3),
331             i_on.next(400, 1),
332             i_on.completed(500)
333         });
334 
335         auto ys = sc.make_cold_observable({
336             s_on.next(50, "foo"),
337             s_on.next(100, "bar"),
338             s_on.next(150, "baz"),
339             s_on.next(200, "qux"),
340             s_on.completed(250)
341         });
342 
343         WHEN("each int is mapped to the strings"){
344 
345             auto res = w.start(
__anon63d55a351d02() 346                 [&]() {
347                     return xs
348                         | rxo::merge_transform(
349                             [&](int){
350                                 return ys;},
351                             [](int, std::string s){
352                                 return s;})
353                         // forget type to workaround lambda deduction bug on msvc 2013
354                         | rxo::as_dynamic();
355                 }
356             );
357 
358             THEN("the output contains strings repeated for each int"){
359                 auto required = rxu::to_vector({
360                     s_on.next(350, "foo"),
361                     s_on.next(400, "bar"),
362                     s_on.next(450, "baz"),
363                     s_on.next(450, "foo"),
364                     s_on.next(500, "qux"),
365                     s_on.next(500, "bar"),
366                     s_on.next(550, "baz"),
367                     s_on.next(550, "foo"),
368                     s_on.next(600, "qux"),
369                     s_on.next(600, "bar"),
370                     s_on.next(650, "baz"),
371                     s_on.next(650, "foo"),
372                     s_on.next(700, "qux"),
373                     s_on.next(700, "bar"),
374                     s_on.next(750, "baz"),
375                     s_on.next(800, "qux"),
376                     s_on.completed(850)
377                 });
378                 auto actual = res.get_observer().messages();
379                 REQUIRE(required == actual);
380             }
381 
382             THEN("there was one subscription and one unsubscription to the ints"){
383                 auto required = rxu::to_vector({
384                     i_on.subscribe(200, 700)
385                 });
386                 auto actual = xs.subscriptions();
387                 REQUIRE(required == actual);
388             }
389 
390             THEN("there were four subscription and unsubscription to the strings"){
391                 auto required = rxu::to_vector({
392                     s_on.subscribe(300, 550),
393                     s_on.subscribe(400, 650),
394                     s_on.subscribe(500, 750),
395                     s_on.subscribe(600, 850)
396                 });
397                 auto actual = ys.subscriptions();
398                 REQUIRE(required == actual);
399             }
400         }
401 
402         WHEN("each int is mapped to the strings with coordinator"){
403 
404             auto res = w.start(
__anon63d55a352002() 405                 [&]() {
406                     return xs
407                         .merge_transform(
408                             [&](int){
409                                 return ys;},
410                             [](int, std::string s){
411                                 return s;},
412                             rx::identity_current_thread())
413                         // forget type to workaround lambda deduction bug on msvc 2013
414                         .as_dynamic();
415                 }
416             );
417 
418             THEN("the output contains strings repeated for each int"){
419                 auto required = rxu::to_vector({
420                     s_on.next(350, "foo"),
421                     s_on.next(400, "bar"),
422                     s_on.next(450, "baz"),
423                     s_on.next(450, "foo"),
424                     s_on.next(500, "qux"),
425                     s_on.next(500, "bar"),
426                     s_on.next(550, "baz"),
427                     s_on.next(550, "foo"),
428                     s_on.next(600, "qux"),
429                     s_on.next(600, "bar"),
430                     s_on.next(650, "baz"),
431                     s_on.next(650, "foo"),
432                     s_on.next(700, "qux"),
433                     s_on.next(700, "bar"),
434                     s_on.next(750, "baz"),
435                     s_on.next(800, "qux"),
436                     s_on.completed(850)
437                 });
438                 auto actual = res.get_observer().messages();
439                 REQUIRE(required == actual);
440             }
441 
442             THEN("there was one subscription and one unsubscription to the ints"){
443                 auto required = rxu::to_vector({
444                     i_on.subscribe(200, 700)
445                 });
446                 auto actual = xs.subscriptions();
447                 REQUIRE(required == actual);
448             }
449 
450             THEN("there were four subscription and unsubscription to the strings"){
451                 auto required = rxu::to_vector({
452                     s_on.subscribe(300, 550),
453                     s_on.subscribe(400, 650),
454                     s_on.subscribe(500, 750),
455                     s_on.subscribe(600, 850)
456                 });
457                 auto actual = ys.subscriptions();
458                 REQUIRE(required == actual);
459             }
460         }
461     }
462 }
463 
464 SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){
465     GIVEN("two cold observables. one of ints. one of strings."){
466         auto sc = rxsc::make_test();
467         auto w = sc.create_worker();
468         const rxsc::test::messages<int> i_on;
469         const rxsc::test::messages<std::string> s_on;
470 
471         auto xs = sc.make_cold_observable({
472             i_on.next(100, 4),
473             i_on.next(200, 2),
474             i_on.next(300, 3),
475             i_on.next(400, 1),
476             i_on.next(500, 5),
477             i_on.next(700, 0)
478         });
479 
480         auto ys = sc.make_cold_observable({
481             s_on.next(50, "foo"),
482             s_on.next(100, "bar"),
483             s_on.next(150, "baz"),
484             s_on.next(200, "qux"),
485             s_on.completed(250)
486         });
487 
488         WHEN("each int is mapped to the strings"){
489 
490             auto res = w.start(
__anon63d55a352302() 491                 [&]() {
492                     return xs
493                         .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
494                         // forget type to workaround lambda deduction bug on msvc 2013
495                         .as_dynamic();
496                 }
497             );
498 
499             THEN("the output contains strings repeated for each int"){
500                 auto required = rxu::to_vector({
501                     s_on.next(350, "foo"),
502                     s_on.next(400, "bar"),
503                     s_on.next(450, "baz"),
504                     s_on.next(450, "foo"),
505                     s_on.next(500, "qux"),
506                     s_on.next(500, "bar"),
507                     s_on.next(550, "baz"),
508                     s_on.next(550, "foo"),
509                     s_on.next(600, "qux"),
510                     s_on.next(600, "bar"),
511                     s_on.next(650, "baz"),
512                     s_on.next(650, "foo"),
513                     s_on.next(700, "qux"),
514                     s_on.next(700, "bar"),
515                     s_on.next(750, "baz"),
516                     s_on.next(750, "foo"),
517                     s_on.next(800, "qux"),
518                     s_on.next(800, "bar"),
519                     s_on.next(850, "baz"),
520                     s_on.next(900, "qux"),
521                     s_on.next(950, "foo")
522                 });
523                 auto actual = res.get_observer().messages();
524                 REQUIRE(required == actual);
525             }
526 
527             THEN("there was one subscription and one unsubscription to the ints"){
528                 auto required = rxu::to_vector({
529                     i_on.subscribe(200, 1000)
530                 });
531                 auto actual = xs.subscriptions();
532                 REQUIRE(required == actual);
533             }
534 
535             THEN("there were four subscription and unsubscription to the strings"){
536                 auto required = rxu::to_vector({
537                     s_on.subscribe(300, 550),
538                     s_on.subscribe(400, 650),
539                     s_on.subscribe(500, 750),
540                     s_on.subscribe(600, 850),
541                     s_on.subscribe(700, 950),
542                     s_on.subscribe(900, 1000)
543                 });
544                 auto actual = ys.subscriptions();
545                 REQUIRE(required == actual);
546             }
547         }
548     }
549 }
550 
551 SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){
552     GIVEN("two cold observables. one of ints. one of strings."){
553         auto sc = rxsc::make_test();
554         auto w = sc.create_worker();
555         const rxsc::test::messages<int> i_on;
556         const rxsc::test::messages<std::string> s_on;
557 
558         auto xs = sc.make_cold_observable({
559             i_on.next(100, 4),
560             i_on.next(200, 2),
561             i_on.next(300, 3),
562             i_on.next(400, 1),
563             i_on.completed(500)
564         });
565 
566         std::runtime_error ex("filter on_error from inner source");
567 
568         auto ys = sc.make_cold_observable({
569             s_on.next(55, "foo"),
570             s_on.next(104, "bar"),
571             s_on.next(153, "baz"),
572             s_on.next(202, "qux"),
573             s_on.error(301, ex)
574         });
575 
576         WHEN("each int is mapped to the strings"){
577 
578             auto res = w.start(
__anon63d55a352602() 579                 [&]() {
580                     return xs
581                         .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
582                         // forget type to workaround lambda deduction bug on msvc 2013
583                         .as_dynamic();
584                 }
585             );
586 
587             THEN("the output contains strings repeated for each int"){
588                 auto required = rxu::to_vector({
589                     s_on.next(355, "foo"),
590                     s_on.next(404, "bar"),
591                     s_on.next(453, "baz"),
592                     s_on.next(455, "foo"),
593                     s_on.next(502, "qux"),
594                     s_on.next(504, "bar"),
595                     s_on.next(553, "baz"),
596                     s_on.next(555, "foo"),
597                     s_on.error(601, ex)
598                 });
599                 auto actual = res.get_observer().messages();
600                 REQUIRE(required == actual);
601             }
602 
603             THEN("there was one subscription and one unsubscription to the ints"){
604                 auto required = rxu::to_vector({
605                     i_on.subscribe(200, 601)
606                 });
607                 auto actual = xs.subscriptions();
608                 REQUIRE(required == actual);
609             }
610 
611             THEN("there were four subscription and unsubscription to the strings"){
612                 auto required = rxu::to_vector({
613                     s_on.subscribe(300, 601),
614                     s_on.subscribe(400, 601),
615                     s_on.subscribe(500, 601),
616                     s_on.subscribe(600, 601)
617                 });
618                 auto actual = ys.subscriptions();
619                 REQUIRE(required == actual);
620             }
621         }
622     }
623 }
624 
625 SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){
626     GIVEN("two cold observables. one of ints. one of strings."){
627         auto sc = rxsc::make_test();
628         auto w = sc.create_worker();
629         const rxsc::test::messages<int> i_on;
630         const rxsc::test::messages<std::string> s_on;
631 
632         auto xs = sc.make_cold_observable({
633             i_on.next(100, 4),
634             i_on.next(200, 2),
635             i_on.next(300, 3),
636             i_on.next(400, 1),
637             i_on.completed(500)
638         });
639 
640         auto ys = sc.make_cold_observable({
641             s_on.next(50, "foo"),
642             s_on.next(100, "bar"),
643             s_on.next(150, "baz"),
644             s_on.next(200, "qux"),
645             s_on.completed(250)
646         });
647 
648         WHEN("each int is mapped to the strings"){
649 
650             auto res = w.start(
__anon63d55a352902() 651                 [&]() {
652                     return xs
653                         .merge_transform(
654                             [&](int){
655                                 return ys;})
656                         // forget type to workaround lambda deduction bug on msvc 2013
657                         .as_dynamic();
658                 }
659             );
660 
661             THEN("the output contains strings repeated for each int"){
662                 auto required = rxu::to_vector({
663                     s_on.next(350, "foo"),
664                     s_on.next(400, "bar"),
665                     s_on.next(450, "baz"),
666                     s_on.next(450, "foo"),
667                     s_on.next(500, "qux"),
668                     s_on.next(500, "bar"),
669                     s_on.next(550, "baz"),
670                     s_on.next(550, "foo"),
671                     s_on.next(600, "qux"),
672                     s_on.next(600, "bar"),
673                     s_on.next(650, "baz"),
674                     s_on.next(650, "foo"),
675                     s_on.next(700, "qux"),
676                     s_on.next(700, "bar"),
677                     s_on.next(750, "baz"),
678                     s_on.next(800, "qux"),
679                     s_on.completed(850)
680                 });
681                 auto actual = res.get_observer().messages();
682                 REQUIRE(required == actual);
683             }
684 
685             THEN("there was one subscription and one unsubscription to the ints"){
686                 auto required = rxu::to_vector({
687                     i_on.subscribe(200, 700)
688                 });
689                 auto actual = xs.subscriptions();
690                 REQUIRE(required == actual);
691             }
692 
693             THEN("there were four subscription and unsubscription to the strings"){
694                 auto required = rxu::to_vector({
695                     s_on.subscribe(300, 550),
696                     s_on.subscribe(400, 650),
697                     s_on.subscribe(500, 750),
698                     s_on.subscribe(600, 850)
699                 });
700                 auto actual = ys.subscriptions();
701                 REQUIRE(required == actual);
702             }
703         }
704     }
705 }
706 
707 SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){
708     GIVEN("two cold observables. one of ints. one of strings."){
709         auto sc = rxsc::make_test();
710         auto w = sc.create_worker();
711         const rxsc::test::messages<int> i_on;
712         const rxsc::test::messages<std::string> s_on;
713 
714         auto xs = sc.make_cold_observable({
715             i_on.next(100, 4),
716             i_on.next(200, 2),
717             i_on.next(300, 3),
718             i_on.next(400, 1),
719             i_on.completed(500)
720         });
721 
722         auto ys = sc.make_cold_observable({
723             s_on.next(50, "foo"),
724             s_on.next(100, "bar"),
725             s_on.next(150, "baz"),
726             s_on.next(200, "qux"),
727             s_on.completed(250)
728         });
729 
730         WHEN("each int is mapped to the strings"){
731 
732             auto res = w.start(
__anon63d55a352b02() 733                 [&]() {
734                     return xs
735                         .merge_transform(
736                             [&](int){
737                                 return ys;},
738                             rx::identity_current_thread())
739                         // forget type to workaround lambda deduction bug on msvc 2013
740                         .as_dynamic();
741                 }
742             );
743 
744             THEN("the output contains strings repeated for each int"){
745                 auto required = rxu::to_vector({
746                     s_on.next(350, "foo"),
747                     s_on.next(400, "bar"),
748                     s_on.next(450, "baz"),
749                     s_on.next(450, "foo"),
750                     s_on.next(500, "qux"),
751                     s_on.next(500, "bar"),
752                     s_on.next(550, "baz"),
753                     s_on.next(550, "foo"),
754                     s_on.next(600, "qux"),
755                     s_on.next(600, "bar"),
756                     s_on.next(650, "baz"),
757                     s_on.next(650, "foo"),
758                     s_on.next(700, "qux"),
759                     s_on.next(700, "bar"),
760                     s_on.next(750, "baz"),
761                     s_on.next(800, "qux"),
762                     s_on.completed(850)
763                 });
764                 auto actual = res.get_observer().messages();
765                 REQUIRE(required == actual);
766             }
767 
768             THEN("there was one subscription and one unsubscription to the ints"){
769                 auto required = rxu::to_vector({
770                     i_on.subscribe(200, 700)
771                 });
772                 auto actual = xs.subscriptions();
773                 REQUIRE(required == actual);
774             }
775 
776             THEN("there were four subscription and unsubscription to the strings"){
777                 auto required = rxu::to_vector({
778                     s_on.subscribe(300, 550),
779                     s_on.subscribe(400, 650),
780                     s_on.subscribe(500, 750),
781                     s_on.subscribe(600, 850)
782                 });
783                 auto actual = ys.subscriptions();
784                 REQUIRE(required == actual);
785             }
786         }
787     }
788 }
789