• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include "rxcpp/operators/rx-combine_latest.hpp"
3 
4 SCENARIO("combine_latest interleaved with tail", "[combine_latest][join][operators]"){
5     GIVEN("2 hot observables of ints."){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto o1 = sc.make_hot_observable({
11             on.next(150, 1),
12             on.next(215, 2),
13             on.next(225, 4),
14             on.completed(230)
15         });
16 
17         auto o2 = sc.make_hot_observable({
18             on.next(150, 1),
19             on.next(220, 3),
20             on.next(230, 5),
21             on.next(235, 6),
22             on.next(240, 7),
23             on.completed(250)
24         });
25 
26         WHEN("each int is combined with the latest from the other source"){
27 
28             auto res = w.start(
__anonbe168adc0102() 29                 [&]() {
30                     return o2
31                         .combine_latest(
32                             [](int v2, int v1){
33                                 return v2 + v1;
34                             },
35                             o1
36                         )
37                         // forget type to workaround lambda deduction bug on msvc 2013
38                         .as_dynamic();
39                 }
40             );
41 
42             THEN("the output contains combined ints"){
43                 auto required = rxu::to_vector({
44                     on.next(220, 2 + 3),
45                     on.next(225, 4 + 3),
46                     on.next(230, 4 + 5),
47                     on.next(235, 4 + 6),
48                     on.next(240, 4 + 7),
49                     on.completed(250)
50                 });
51                 auto actual = res.get_observer().messages();
52                 REQUIRE(required == actual);
53             }
54 
55             THEN("there was one subscription and one unsubscription to the o1"){
56                 auto required = rxu::to_vector({
57                     on.subscribe(200, 230)
58                 });
59                 auto actual = o1.subscriptions();
60                 REQUIRE(required == actual);
61             }
62 
63             THEN("there was one subscription and one unsubscription to the o2"){
64                 auto required = rxu::to_vector({
65                     on.subscribe(200, 250)
66                 });
67                 auto actual = o2.subscriptions();
68                 REQUIRE(required == actual);
69             }
70         }
71     }
72 }
73 
74 SCENARIO("combine_latest consecutive", "[combine_latest][join][operators]"){
75     GIVEN("2 hot observables of ints."){
76         auto sc = rxsc::make_test();
77         auto w = sc.create_worker();
78         const rxsc::test::messages<int> on;
79 
80         auto o1 = sc.make_hot_observable({
81             on.next(150, 1),
82             on.next(215, 2),
83             on.next(225, 4),
84             on.completed(230)
85         });
86 
87         auto o2 = sc.make_hot_observable({
88             on.next(150, 1),
89             on.next(235, 6),
90             on.next(240, 7),
91             on.completed(250)
92         });
93 
94         WHEN("each int is combined with the latest from the other source"){
95 
96             auto res = w.start(
__anonbe168adc0302() 97                 [&]() {
98                     return o2
99                         .combine_latest(
100                             [](int v2, int v1){
101                                 return v2 + v1;
102                             },
103                             o1
104                         )
105                         // forget type to workaround lambda deduction bug on msvc 2013
106                         .as_dynamic();
107                 }
108             );
109 
110             THEN("the output contains combined ints"){
111                 auto required = rxu::to_vector({
112                     on.next(235, 4 + 6),
113                     on.next(240, 4 + 7),
114                     on.completed(250)
115                 });
116                 auto actual = res.get_observer().messages();
117                 REQUIRE(required == actual);
118             }
119 
120             THEN("there was one subscription and one unsubscription to the o1"){
121                 auto required = rxu::to_vector({
122                     on.subscribe(200, 230)
123                 });
124                 auto actual = o1.subscriptions();
125                 REQUIRE(required == actual);
126             }
127 
128             THEN("there was one subscription and one unsubscription to the o2"){
129                 auto required = rxu::to_vector({
130                     on.subscribe(200, 250)
131                 });
132                 auto actual = o2.subscriptions();
133                 REQUIRE(required == actual);
134             }
135         }
136     }
137 }
138 
139 SCENARIO("combine_latest consecutive ends with error left", "[combine_latest][join][operators]"){
140     GIVEN("2 hot observables of ints."){
141         auto sc = rxsc::make_test();
142         auto w = sc.create_worker();
143         const rxsc::test::messages<int> on;
144 
145         std::runtime_error ex("combine_latest on_error from source");
146 
147         auto o1 = sc.make_hot_observable({
148             on.next(150, 1),
149             on.next(215, 2),
150             on.next(225, 4),
151             on.error(230, ex)
152         });
153 
154         auto o2 = sc.make_hot_observable({
155             on.next(150, 1),
156             on.next(235, 6),
157             on.next(240, 7),
158             on.completed(250)
159         });
160 
161         WHEN("each int is combined with the latest from the other source"){
162 
163             auto res = w.start(
__anonbe168adc0502() 164                 [&]() {
165                     return o2
166                         .combine_latest(
167                             [](int v2, int v1){
168                                 return v2 + v1;
169                             },
170                             o1
171                         )
172                         // forget type to workaround lambda deduction bug on msvc 2013
173                         .as_dynamic();
174                 }
175             );
176 
177             THEN("the output contains only an error"){
178                 auto required = rxu::to_vector({
179                     on.error(230, ex)
180                 });
181                 auto actual = res.get_observer().messages();
182                 REQUIRE(required == actual);
183             }
184 
185             THEN("there was one subscription and one unsubscription to the o1"){
186                 auto required = rxu::to_vector({
187                     on.subscribe(200, 230)
188                 });
189                 auto actual = o1.subscriptions();
190                 REQUIRE(required == actual);
191             }
192 
193             THEN("there was one subscription and one unsubscription to the o2"){
194                 auto required = rxu::to_vector({
195                     on.subscribe(200, 230)
196                 });
197                 auto actual = o2.subscriptions();
198                 REQUIRE(required == actual);
199             }
200         }
201     }
202 }
203 
204 SCENARIO("combine_latest consecutive ends with error right", "[combine_latest][join][operators]"){
205     GIVEN("2 hot observables of ints."){
206         auto sc = rxsc::make_test();
207         auto w = sc.create_worker();
208         const rxsc::test::messages<int> on;
209 
210         std::runtime_error ex("combine_latest on_error from source");
211 
212         auto o1 = sc.make_hot_observable({
213             on.next(150, 1),
214             on.next(215, 2),
215             on.next(225, 4),
216             on.completed(250)
217         });
218 
219         auto o2 = sc.make_hot_observable({
220             on.next(150, 1),
221             on.next(235, 6),
222             on.next(240, 7),
223             on.error(245, ex)
224         });
225 
226         WHEN("each int is combined with the latest from the other source"){
227 
228             auto res = w.start(
__anonbe168adc0702() 229                 [&]() {
230                     return o2
231                         .combine_latest(
232                             [](int v2, int v1){
233                                 return v2 + v1;
234                             },
235                             o1
236                         )
237                         // forget type to workaround lambda deduction bug on msvc 2013
238                         .as_dynamic();
239                 }
240             );
241 
242             THEN("the output contains combined ints followed by an error"){
243                 auto required = rxu::to_vector({
244                     on.next(235, 4 + 6),
245                     on.next(240, 4 + 7),
246                     on.error(245, ex)
247                 });
248                 auto actual = res.get_observer().messages();
249                 REQUIRE(required == actual);
250             }
251 
252             THEN("there was one subscription and one unsubscription to the o1"){
253                 auto required = rxu::to_vector({
254                     on.subscribe(200, 245)
255                 });
256                 auto actual = o1.subscriptions();
257                 REQUIRE(required == actual);
258             }
259 
260             THEN("there was one subscription and one unsubscription to the o2"){
261                 auto required = rxu::to_vector({
262                     on.subscribe(200, 245)
263                 });
264                 auto actual = o2.subscriptions();
265                 REQUIRE(required == actual);
266             }
267         }
268     }
269 }
270 
271 SCENARIO("combine_latest never N", "[combine_latest][join][operators]"){
272     GIVEN("N never completed hot observables of ints."){
273         auto sc = rxsc::make_test();
274         auto w = sc.create_worker();
275         const rxsc::test::messages<int> on;
276 
277         const int N = 4;
278 
279         std::vector<rxcpp::test::testable_observable<int>> n;
280         for (int i = 0; i < N; ++i) {
281             n.push_back(
282                 sc.make_hot_observable({
283                     on.next(150, 1)
284                 })
285             );
286         }
287 
288         WHEN("each int is combined with the latest from the other source"){
289 
290             auto res = w.start(
__anonbe168adc0902() 291                 [&]() {
292                     return n[0]
293                         .combine_latest(
294                             [](int v0, int v1, int v2, int v3){
295                                 return v0 + v1 + v2 + v3;
296                             },
297                             n[1], n[2], n[3]
298                         )
299                         // forget type to workaround lambda deduction bug on msvc 2013
300                         .as_dynamic();
301                 }
302             );
303 
304             THEN("the output is empty"){
305                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
306                 auto actual = res.get_observer().messages();
307                 REQUIRE(required == actual);
308             }
309 
310             THEN("there was one subscription and one unsubscription to each observable"){
311 
__anonbe168adc0b02(rxcpp::test::testable_observable<int> &s)312                 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){
313                     auto required = rxu::to_vector({
314                         on.subscribe(200, 1000)
315                     });
316                     auto actual = s.subscriptions();
317                     REQUIRE(required == actual);
318                 });
319             }
320         }
321     }
322 }
323 
324 SCENARIO("combine_latest empty N", "[combine_latest][join][operators]"){
325     GIVEN("N empty hot observables of ints."){
326         auto sc = rxsc::make_test();
327         auto w = sc.create_worker();
328         const rxsc::test::messages<int> on;
329 
330         const int N = 4;
331 
332         std::vector<rxcpp::test::testable_observable<int>> e;
333         for (int i = 0; i < N; ++i) {
334             e.push_back(
335                 sc.make_hot_observable({
336                     on.next(150, 1),
337                     on.completed(210 + 10 * i)
338                 })
339             );
340         }
341 
342         WHEN("each int is combined with the latest from the other source"){
343 
344             auto res = w.start(
__anonbe168adc0c02() 345                 [&]() {
346                     return e[0]
347                         .combine_latest(
348                             [](int v0, int v1, int v2, int v3){
349                                 return v0 + v1 + v2 + v3;
350                             },
351                             e[1], e[2], e[3]
352                         )
353                         // forget type to workaround lambda deduction bug on msvc 2013
354                         .as_dynamic();
355                 }
356             );
357 
358             THEN("the output contains only complete message"){
359                 auto required = rxu::to_vector({
360                     on.completed(200 + 10 * N)
361                 });
362                 auto actual = res.get_observer().messages();
363                 REQUIRE(required == actual);
364             }
365 
366             THEN("there was one subscription and one unsubscription to each observable"){
367 
368                 int i = 0;
__anonbe168adc0e02(rxcpp::test::testable_observable<int> &s)369                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
370                     auto required = rxu::to_vector({
371                         on.subscribe(200, 200 + 10 * ++i)
372                     });
373                     auto actual = s.subscriptions();
374                     REQUIRE(required == actual);
375                 });
376             }
377         }
378     }
379 }
380 
381 SCENARIO("combine_latest never/empty", "[combine_latest][join][operators]"){
382     GIVEN("2 hot observables of ints."){
383         auto sc = rxsc::make_test();
384         auto w = sc.create_worker();
385         const rxsc::test::messages<int> on;
386 
387         auto n = sc.make_hot_observable({
388             on.next(150, 1)
389         });
390 
391         auto e = sc.make_hot_observable({
392             on.next(150, 1),
393             on.completed(210)
394         });
395 
396         WHEN("each int is combined with the latest from the other source"){
397 
398             auto res = w.start(
__anonbe168adc0f02() 399                 [&]() {
400                     return n
401                         .combine_latest(
402                             [](int v2, int v1){
403                                 return v2 + v1;
404                             },
405                             e
406                         )
407                         // forget type to workaround lambda deduction bug on msvc 2013
408                         .as_dynamic();
409                 }
410             );
411 
412             THEN("the output is empty"){
413                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
414                 auto actual = res.get_observer().messages();
415                 REQUIRE(required == actual);
416             }
417 
418             THEN("there was one subscription and one unsubscription to the n"){
419                 auto required = rxu::to_vector({
420                     on.subscribe(200, 1000)
421                 });
422                 auto actual = n.subscriptions();
423                 REQUIRE(required == actual);
424             }
425 
426             THEN("there was one subscription and one unsubscription to the e"){
427                 auto required = rxu::to_vector({
428                     on.subscribe(200, 210)
429                 });
430                 auto actual = e.subscriptions();
431                 REQUIRE(required == actual);
432             }
433         }
434     }
435 }
436 
437 SCENARIO("combine_latest empty/never", "[combine_latest][join][operators]"){
438     GIVEN("2 hot observables of ints."){
439         auto sc = rxsc::make_test();
440         auto w = sc.create_worker();
441         const rxsc::test::messages<int> on;
442 
443         auto e = sc.make_hot_observable({
444             on.next(150, 1),
445             on.completed(210)
446         });
447 
448         auto n = sc.make_hot_observable({
449             on.next(150, 1)
450         });
451 
452         WHEN("each int is combined with the latest from the other source"){
453 
454             auto res = w.start(
__anonbe168adc1102() 455                 [&]() {
456                     return e
457                         .combine_latest(
458                             [](int v2, int v1){
459                                 return v2 + v1;
460                             },
461                             n
462                         )
463                         // forget type to workaround lambda deduction bug on msvc 2013
464                         .as_dynamic();
465                 }
466             );
467 
468             THEN("the output is empty"){
469                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
470                 auto actual = res.get_observer().messages();
471                 REQUIRE(required == actual);
472             }
473 
474             THEN("there was one subscription and one unsubscription to the e"){
475                 auto required = rxu::to_vector({
476                     on.subscribe(200, 210)
477                 });
478                 auto actual = e.subscriptions();
479                 REQUIRE(required == actual);
480             }
481 
482             THEN("there was one subscription and one unsubscription to the n"){
483                 auto required = rxu::to_vector({
484                     on.subscribe(200, 1000)
485                 });
486                 auto actual = n.subscriptions();
487                 REQUIRE(required == actual);
488             }
489         }
490     }
491 }
492 
493 SCENARIO("combine_latest empty/return", "[combine_latest][join][operators]"){
494     GIVEN("2 hot observables of ints."){
495         auto sc = rxsc::make_test();
496         auto w = sc.create_worker();
497         const rxsc::test::messages<int> on;
498 
499         auto e = sc.make_hot_observable({
500             on.next(150, 1),
501             on.completed(210)
502         });
503 
504         auto o = sc.make_hot_observable({
505             on.next(150, 1),
506             on.next(215, 2),
507             on.completed(220)
508         });
509 
510         WHEN("each int is combined with the latest from the other source"){
511 
512             auto res = w.start(
__anonbe168adc1302() 513                 [&]() {
514                     return e
515                         .combine_latest(
516                             [](int v2, int v1){
517                                 return v2 + v1;
518                             },
519                             o
520                         )
521                         // forget type to workaround lambda deduction bug on msvc 2013
522                         .as_dynamic();
523                 }
524             );
525 
526             THEN("the output contains only complete message"){
527                 auto required = rxu::to_vector({
528                     on.completed(220)
529                 });
530                 auto actual = res.get_observer().messages();
531                 REQUIRE(required == actual);
532             }
533 
534             THEN("there was one subscription and one unsubscription to the e"){
535                 auto required = rxu::to_vector({
536                     on.subscribe(200, 210)
537                 });
538                 auto actual = e.subscriptions();
539                 REQUIRE(required == actual);
540             }
541 
542             THEN("there was one subscription and one unsubscription to the o"){
543                 auto required = rxu::to_vector({
544                     on.subscribe(200, 220)
545                 });
546                 auto actual = o.subscriptions();
547                 REQUIRE(required == actual);
548             }
549         }
550     }
551 }
552 
553 SCENARIO("combine_latest return/empty", "[combine_latest][join][operators]"){
554     GIVEN("2 hot observables of ints."){
555         auto sc = rxsc::make_test();
556         auto w = sc.create_worker();
557         const rxsc::test::messages<int> on;
558 
559         auto o = sc.make_hot_observable({
560             on.next(150, 1),
561             on.next(215, 2),
562             on.completed(220)
563         });
564 
565         auto e = sc.make_hot_observable({
566             on.next(150, 1),
567             on.completed(210)
568         });
569 
570         WHEN("each int is combined with the latest from the other source"){
571 
572             auto res = w.start(
__anonbe168adc1502() 573                 [&]() {
574                     return o
575                         .combine_latest(
576                             [](int v2, int v1){
577                                 return v2 + v1;
578                             },
579                             e
580                         )
581                         // forget type to workaround lambda deduction bug on msvc 2013
582                         .as_dynamic();
583                 }
584             );
585 
586             THEN("the output contains only complete message"){
587                 auto required = rxu::to_vector({
588                     on.completed(220)
589                 });
590                 auto actual = res.get_observer().messages();
591                 REQUIRE(required == actual);
592             }
593 
594             THEN("there was one subscription and one unsubscription to the o"){
595                 auto required = rxu::to_vector({
596                     on.subscribe(200, 220)
597                 });
598                 auto actual = o.subscriptions();
599                 REQUIRE(required == actual);
600             }
601 
602             THEN("there was one subscription and one unsubscription to the e"){
603                 auto required = rxu::to_vector({
604                     on.subscribe(200, 210)
605                 });
606                 auto actual = e.subscriptions();
607                 REQUIRE(required == actual);
608             }
609         }
610     }
611 }
612 
613 SCENARIO("combine_latest never/return", "[combine_latest][join][operators]"){
614     GIVEN("2 hot observables of ints."){
615         auto sc = rxsc::make_test();
616         auto w = sc.create_worker();
617         const rxsc::test::messages<int> on;
618 
619         auto n = sc.make_hot_observable({
620             on.next(150, 1)
621         });
622 
623         auto o = sc.make_hot_observable({
624             on.next(150, 1),
625             on.next(215, 2),
626             on.completed(220)
627         });
628 
629         WHEN("each int is combined with the latest from the other source"){
630 
631             auto res = w.start(
__anonbe168adc1702() 632                 [&]() {
633                     return n
634                         .combine_latest(
635                             [](int v2, int v1){
636                                 return v2 + v1;
637                             },
638                             o
639                         )
640                         // forget type to workaround lambda deduction bug on msvc 2013
641                         .as_dynamic();
642                 }
643             );
644 
645             THEN("the output is empty"){
646                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
647                 auto actual = res.get_observer().messages();
648                 REQUIRE(required == actual);
649             }
650 
651             THEN("there was one subscription and one unsubscription to the n"){
652                 auto required = rxu::to_vector({
653                     on.subscribe(200, 1000)
654                 });
655                 auto actual = n.subscriptions();
656                 REQUIRE(required == actual);
657             }
658 
659             THEN("there was one subscription and one unsubscription to the o"){
660                 auto required = rxu::to_vector({
661                     on.subscribe(200, 220)
662                 });
663                 auto actual = o.subscriptions();
664                 REQUIRE(required == actual);
665             }
666         }
667     }
668 }
669 
670 SCENARIO("combine_latest return/never", "[combine_latest][join][operators]"){
671     GIVEN("2 hot observables of ints."){
672         auto sc = rxsc::make_test();
673         auto w = sc.create_worker();
674         const rxsc::test::messages<int> on;
675 
676         auto o = sc.make_hot_observable({
677             on.next(150, 1),
678             on.next(215, 2),
679             on.completed(220)
680         });
681 
682         auto n = sc.make_hot_observable({
683             on.next(150, 1)
684         });
685 
686         WHEN("each int is combined with the latest from the other source"){
687 
688             auto res = w.start(
__anonbe168adc1902() 689                 [&]() {
690                     return o
691                         .combine_latest(
692                             [](int v2, int v1){
693                                 return v2 + v1;
694                             },
695                             n
696                         )
697                         // forget type to workaround lambda deduction bug on msvc 2013
698                         .as_dynamic();
699                 }
700             );
701 
702             THEN("the output is empty"){
703                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
704                 auto actual = res.get_observer().messages();
705                 REQUIRE(required == actual);
706             }
707 
708             THEN("there was one subscription and one unsubscription to the n"){
709                 auto required = rxu::to_vector({
710                     on.subscribe(200, 1000)
711                 });
712                 auto actual = n.subscriptions();
713                 REQUIRE(required == actual);
714             }
715 
716             THEN("there was one subscription and one unsubscription to the o"){
717                 auto required = rxu::to_vector({
718                     on.subscribe(200, 220)
719                 });
720                 auto actual = o.subscriptions();
721                 REQUIRE(required == actual);
722             }
723         }
724     }
725 }
726 
727 
728 SCENARIO("combine_latest return/return", "[combine_latest][join][operators]"){
729     GIVEN("2 hot observables of ints."){
730         auto sc = rxsc::make_test();
731         auto w = sc.create_worker();
732         const rxsc::test::messages<int> on;
733 
734         auto o1 = sc.make_hot_observable({
735             on.next(150, 1),
736             on.next(215, 2),
737             on.completed(230)
738         });
739 
740         auto o2 = sc.make_hot_observable({
741             on.next(150, 1),
742             on.next(220, 3),
743             on.completed(240)
744         });
745 
746         WHEN("each int is combined with the latest from the other source"){
747 
748             auto res = w.start(
__anonbe168adc1b02() 749                 [&]() {
750                     return o1
751                         .combine_latest(
752                             [](int v2, int v1){
753                              return v2 + v1;
754                             },
755                             o2
756                         )
757                         // forget type to workaround lambda deduction bug on msvc 2013
758                         .as_dynamic();
759                 }
760             );
761 
762             THEN("the output contains combined ints"){
763                 auto required = rxu::to_vector({
764                     on.next(220, 2 + 3),
765                     on.completed(240)
766                 });
767                 auto actual = res.get_observer().messages();
768                 REQUIRE(required == actual);
769             }
770 
771             THEN("there was one subscription and one unsubscription to the o1"){
772                 auto required = rxu::to_vector({
773                     on.subscribe(200, 230)
774                 });
775                 auto actual = o1.subscriptions();
776                 REQUIRE(required == actual);
777             }
778 
779             THEN("there was one subscription and one unsubscription to the o2"){
780                 auto required = rxu::to_vector({
781                     on.subscribe(200, 240)
782                 });
783                 auto actual = o2.subscriptions();
784                 REQUIRE(required == actual);
785             }
786         }
787     }
788 }
789 
790 SCENARIO("combine_latest empty/error", "[combine_latest][join][operators]"){
791     GIVEN("2 hot observables of ints."){
792         auto sc = rxsc::make_test();
793         auto w = sc.create_worker();
794         const rxsc::test::messages<int> on;
795 
796         std::runtime_error ex("combine_latest on_error from source");
797 
798         auto emp = sc.make_hot_observable({
799             on.next(150, 1),
800             on.completed(230)
801         });
802 
803         auto err = sc.make_hot_observable({
804             on.next(150, 1),
805             on.error(220, ex)
806         });
807 
808         WHEN("each int is combined with the latest from the other source"){
809 
810             auto res = w.start(
__anonbe168adc1d02() 811                 [&]() {
812                     return emp
813                         .combine_latest(
814                             [](int v2, int v1){
815                                 return v2 + v1;
816                             },
817                             err
818                         )
819                         // forget type to workaround lambda deduction bug on msvc 2013
820                         .as_dynamic();
821                 }
822             );
823 
824             THEN("the output contains only error message"){
825                 auto required = rxu::to_vector({
826                     on.error(220, ex)
827                 });
828                 auto actual = res.get_observer().messages();
829                 REQUIRE(required == actual);
830             }
831 
832             THEN("there was one subscription and one unsubscription to the emp"){
833                 auto required = rxu::to_vector({
834                     on.subscribe(200, 220)
835                 });
836                 auto actual = emp.subscriptions();
837                 REQUIRE(required == actual);
838             }
839 
840             THEN("there was one subscription and one unsubscription to the err"){
841                 auto required = rxu::to_vector({
842                     on.subscribe(200, 220)
843                 });
844                 auto actual = err.subscriptions();
845                 REQUIRE(required == actual);
846             }
847         }
848     }
849 }
850 
851 SCENARIO("combine_latest error/empty", "[combine_latest][join][operators]"){
852     GIVEN("2 hot observables of ints."){
853         auto sc = rxsc::make_test();
854         auto w = sc.create_worker();
855         const rxsc::test::messages<int> on;
856 
857         std::runtime_error ex("combine_latest on_error from source");
858 
859         auto err = sc.make_hot_observable({
860             on.next(150, 1),
861             on.error(220, ex)
862         });
863 
864         auto emp = sc.make_hot_observable({
865             on.next(150, 1),
866             on.completed(230)
867         });
868 
869         WHEN("each int is combined with the latest from the other source"){
870 
871             auto res = w.start(
__anonbe168adc1f02() 872                 [&]() {
873                     return err
874                         .combine_latest(
875                             [](int v2, int v1){
876                                 return v2 + v1;
877                             },
878                             emp
879                         )
880                         // forget type to workaround lambda deduction bug on msvc 2013
881                         .as_dynamic();
882                 }
883             );
884 
885             THEN("the output contains only error message"){
886                 auto required = rxu::to_vector({
887                     on.error(220, ex)
888                 });
889                 auto actual = res.get_observer().messages();
890                 REQUIRE(required == actual);
891             }
892 
893             THEN("there was one subscription and one unsubscription to the emp"){
894                 auto required = rxu::to_vector({
895                     on.subscribe(200, 220)
896                 });
897                 auto actual = emp.subscriptions();
898                 REQUIRE(required == actual);
899             }
900 
901             THEN("there was one subscription and one unsubscription to the err"){
902                 auto required = rxu::to_vector({
903                     on.subscribe(200, 220)
904                 });
905                 auto actual = err.subscriptions();
906                 REQUIRE(required == actual);
907             }
908         }
909     }
910 }
911 
912 SCENARIO("combine_latest return/error", "[combine_latest][join][operators]"){
913     GIVEN("2 hot observables of ints."){
914         auto sc = rxsc::make_test();
915         auto w = sc.create_worker();
916         const rxsc::test::messages<int> on;
917 
918         std::runtime_error ex("combine_latest on_error from source");
919 
920         auto o = sc.make_hot_observable({
921             on.next(150, 1),
922             on.next(210, 2),
923             on.completed(230)
924         });
925 
926         auto err = sc.make_hot_observable({
927             on.next(150, 1),
928             on.error(220, ex)
929         });
930 
931         WHEN("each int is combined with the latest from the other source"){
932 
933             auto res = w.start(
__anonbe168adc2102() 934                 [&]() {
935                     return o
936                         .combine_latest(
937                             [](int v2, int v1){
938                                 return v2 + v1;
939                             },
940                             err
941                         )
942                         // forget type to workaround lambda deduction bug on msvc 2013
943                         .as_dynamic();
944                 }
945             );
946 
947             THEN("the output contains only error message"){
948                 auto required = rxu::to_vector({
949                     on.error(220, ex)
950                 });
951                 auto actual = res.get_observer().messages();
952                 REQUIRE(required == actual);
953             }
954 
955             THEN("there was one subscription and one unsubscription to the ret"){
956                 auto required = rxu::to_vector({
957                     on.subscribe(200, 220)
958                 });
959                 auto actual = o.subscriptions();
960                 REQUIRE(required == actual);
961             }
962 
963             THEN("there was one subscription and one unsubscription to the err"){
964                 auto required = rxu::to_vector({
965                     on.subscribe(200, 220)
966                 });
967                 auto actual = err.subscriptions();
968                 REQUIRE(required == actual);
969             }
970         }
971     }
972 }
973 
974 SCENARIO("combine_latest error/return", "[combine_latest][join][operators]"){
975     GIVEN("2 hot observables of ints."){
976         auto sc = rxsc::make_test();
977         auto w = sc.create_worker();
978         const rxsc::test::messages<int> on;
979 
980         std::runtime_error ex("combine_latest on_error from source");
981 
982         auto err = sc.make_hot_observable({
983             on.next(150, 1),
984             on.error(220, ex)
985         });
986 
987         auto ret = sc.make_hot_observable({
988             on.next(150, 1),
989             on.next(210, 2),
990             on.completed(230)
991         });
992 
993         WHEN("each int is combined with the latest from the other source"){
994 
995             auto res = w.start(
__anonbe168adc2302() 996                 [&]() {
997                     return err
998                         .combine_latest(
999                             [](int v2, int v1){
1000                                 return v2 + v1;
1001                             },
1002                             ret
1003                         )
1004                         // forget type to workaround lambda deduction bug on msvc 2013
1005                         .as_dynamic();
1006                 }
1007             );
1008 
1009             THEN("the output contains only error message"){
1010                 auto required = rxu::to_vector({
1011                     on.error(220, ex)
1012                 });
1013                 auto actual = res.get_observer().messages();
1014                 REQUIRE(required == actual);
1015             }
1016 
1017             THEN("there was one subscription and one unsubscription to the ret"){
1018                 auto required = rxu::to_vector({
1019                     on.subscribe(200, 220)
1020                 });
1021                 auto actual = ret.subscriptions();
1022                 REQUIRE(required == actual);
1023             }
1024 
1025             THEN("there was one subscription and one unsubscription to the err"){
1026                 auto required = rxu::to_vector({
1027                     on.subscribe(200, 220)
1028                 });
1029                 auto actual = err.subscriptions();
1030                 REQUIRE(required == actual);
1031             }
1032         }
1033     }
1034 }
1035 
1036 SCENARIO("combine_latest error/error", "[combine_latest][join][operators]"){
1037     GIVEN("2 hot observables of ints."){
1038         auto sc = rxsc::make_test();
1039         auto w = sc.create_worker();
1040         const rxsc::test::messages<int> on;
1041 
1042         std::runtime_error ex1("combine_latest on_error from source 1");
1043         std::runtime_error ex2("combine_latest on_error from source 2");
1044 
1045         auto err1 = sc.make_hot_observable({
1046             on.next(150, 1),
1047             on.error(220, ex1)
1048         });
1049 
1050         auto err2 = sc.make_hot_observable({
1051             on.next(150, 1),
1052             on.error(230, ex2)
1053         });
1054 
1055         WHEN("each int is combined with the latest from the other source"){
1056 
1057             auto res = w.start(
__anonbe168adc2502() 1058                 [&]() {
1059                     return err1
1060                         .combine_latest(
1061                             [](int v2, int v1){
1062                                 return v2 + v1;
1063                             },
1064                             err2
1065                         )
1066                         // forget type to workaround lambda deduction bug on msvc 2013
1067                         .as_dynamic();
1068                 }
1069             );
1070 
1071             THEN("the output contains only error message"){
1072                 auto required = rxu::to_vector({
1073                     on.error(220, ex1)
1074                 });
1075                 auto actual = res.get_observer().messages();
1076                 REQUIRE(required == actual);
1077             }
1078 
1079             THEN("there was one subscription and one unsubscription to the err1"){
1080                 auto required = rxu::to_vector({
1081                     on.subscribe(200, 220)
1082                 });
1083                 auto actual = err1.subscriptions();
1084                 REQUIRE(required == actual);
1085             }
1086 
1087             THEN("there was one subscription and one unsubscription to the err2"){
1088                 auto required = rxu::to_vector({
1089                     on.subscribe(200, 220)
1090                 });
1091                 auto actual = err2.subscriptions();
1092                 REQUIRE(required == actual);
1093             }
1094         }
1095     }
1096 }
1097 
1098 SCENARIO("combine_latest next+error/error", "[combine_latest][join][operators]"){
1099     GIVEN("2 hot observables of ints."){
1100         auto sc = rxsc::make_test();
1101         auto w = sc.create_worker();
1102         const rxsc::test::messages<int> on;
1103 
1104         std::runtime_error ex1("combine_latest on_error from source 1");
1105         std::runtime_error ex2("combine_latest on_error from source 2");
1106 
1107         auto err1 = sc.make_hot_observable({
1108             on.next(150, 1),
1109             on.next(210, 2),
1110             on.error(220, ex1)
1111         });
1112 
1113         auto err2 = sc.make_hot_observable({
1114             on.next(150, 1),
1115             on.error(230, ex2)
1116         });
1117 
1118         WHEN("each int is combined with the latest from the other source"){
1119 
1120             auto res = w.start(
__anonbe168adc2702() 1121                 [&]() {
1122                     return err1
1123                         .combine_latest(
1124                             [](int v2, int v1){
1125                                 return v2 + v1;
1126                             },
1127                             err2
1128                         )
1129                         // forget type to workaround lambda deduction bug on msvc 2013
1130                         .as_dynamic();
1131                 }
1132             );
1133 
1134             THEN("the output contains only error message"){
1135                 auto required = rxu::to_vector({
1136                     on.error(220, ex1)
1137                 });
1138                 auto actual = res.get_observer().messages();
1139                 REQUIRE(required == actual);
1140             }
1141 
1142             THEN("there was one subscription and one unsubscription to the err1"){
1143                 auto required = rxu::to_vector({
1144                     on.subscribe(200, 220)
1145                 });
1146                 auto actual = err1.subscriptions();
1147                 REQUIRE(required == actual);
1148             }
1149 
1150             THEN("there was one subscription and one unsubscription to the err2"){
1151                 auto required = rxu::to_vector({
1152                     on.subscribe(200, 220)
1153                 });
1154                 auto actual = err2.subscriptions();
1155                 REQUIRE(required == actual);
1156             }
1157         }
1158     }
1159 }
1160 
1161 SCENARIO("combine_latest error/next+error", "[combine_latest][join][operators]"){
1162     GIVEN("2 hot observables of ints."){
1163         auto sc = rxsc::make_test();
1164         auto w = sc.create_worker();
1165         const rxsc::test::messages<int> on;
1166 
1167         std::runtime_error ex1("combine_latest on_error from source 1");
1168         std::runtime_error ex2("combine_latest on_error from source 2");
1169 
1170         auto err1 = sc.make_hot_observable({
1171             on.next(150, 1),
1172             on.error(230, ex1)
1173         });
1174 
1175         auto err2 = sc.make_hot_observable({
1176             on.next(150, 1),
1177             on.next(210, 2),
1178             on.error(220, ex2)
1179         });
1180 
1181         WHEN("each int is combined with the latest from the other source"){
1182 
1183             auto res = w.start(
__anonbe168adc2902() 1184                 [&]() {
1185                     return err1
1186                         .combine_latest(
1187                             [](int v2, int v1){
1188                                 return v2 + v1;
1189                             },
1190                             err2
1191                         )
1192                         // forget type to workaround lambda deduction bug on msvc 2013
1193                         .as_dynamic();
1194                 }
1195             );
1196 
1197             THEN("the output contains only error message"){
1198                 auto required = rxu::to_vector({
1199                     on.error(220, ex2)
1200                 });
1201                 auto actual = res.get_observer().messages();
1202                 REQUIRE(required == actual);
1203             }
1204 
1205             THEN("there was one subscription and one unsubscription to the err1"){
1206                 auto required = rxu::to_vector({
1207                     on.subscribe(200, 220)
1208                 });
1209                 auto actual = err1.subscriptions();
1210                 REQUIRE(required == actual);
1211             }
1212 
1213             THEN("there was one subscription and one unsubscription to the err2"){
1214                 auto required = rxu::to_vector({
1215                     on.subscribe(200, 220)
1216                 });
1217                 auto actual = err2.subscriptions();
1218                 REQUIRE(required == actual);
1219             }
1220         }
1221     }
1222 }
1223 
1224 SCENARIO("combine_latest never/error", "[combine_latest][join][operators]"){
1225     GIVEN("2 hot observables of ints."){
1226         auto sc = rxsc::make_test();
1227         auto w = sc.create_worker();
1228         const rxsc::test::messages<int> on;
1229 
1230         std::runtime_error ex("combine_latest on_error from source");
1231 
1232         auto n = sc.make_hot_observable({
1233             on.next(150, 1)
1234         });
1235 
1236         auto err = sc.make_hot_observable({
1237             on.next(150, 1),
1238             on.error(220, ex)
1239         });
1240 
1241         WHEN("each int is combined with the latest from the other source"){
1242 
1243             auto res = w.start(
__anonbe168adc2b02() 1244                 [&]() {
1245                     return n
1246                         .combine_latest(
1247                             [](int v2, int v1){
1248                                 return v2 + v1;
1249                             },
1250                             err
1251                         )
1252                         // forget type to workaround lambda deduction bug on msvc 2013
1253                         .as_dynamic();
1254                 }
1255             );
1256 
1257             THEN("the output contains only error message"){
1258                 auto required = rxu::to_vector({
1259                     on.error(220, ex)
1260                 });
1261                 auto actual = res.get_observer().messages();
1262                 REQUIRE(required == actual);
1263             }
1264 
1265             THEN("there was one subscription and one unsubscription to the n"){
1266                 auto required = rxu::to_vector({
1267                     on.subscribe(200, 220)
1268                 });
1269                 auto actual = n.subscriptions();
1270                 REQUIRE(required == actual);
1271             }
1272 
1273             THEN("there was one subscription and one unsubscription to the err"){
1274                 auto required = rxu::to_vector({
1275                     on.subscribe(200, 220)
1276                 });
1277                 auto actual = err.subscriptions();
1278                 REQUIRE(required == actual);
1279             }
1280         }
1281     }
1282 }
1283 
1284 SCENARIO("combine_latest error/never", "[combine_latest][join][operators]"){
1285     GIVEN("2 hot observables of ints."){
1286         auto sc = rxsc::make_test();
1287         auto w = sc.create_worker();
1288         const rxsc::test::messages<int> on;
1289 
1290         std::runtime_error ex("combine_latest on_error from source");
1291 
1292         auto err = sc.make_hot_observable({
1293             on.next(150, 1),
1294             on.error(220, ex)
1295         });
1296 
1297         auto n = sc.make_hot_observable({
1298             on.next(150, 1)
1299         });
1300 
1301         WHEN("each int is combined with the latest from the other source"){
1302 
1303             auto res = w.start(
__anonbe168adc2d02() 1304                 [&]() {
1305                     return err
1306                         .combine_latest(
1307                             [](int v2, int v1){
1308                                 return v2 + v1;
1309                             },
1310                             n
1311                         )
1312                         // forget type to workaround lambda deduction bug on msvc 2013
1313                         .as_dynamic();
1314                 }
1315             );
1316 
1317             THEN("the output contains only error message"){
1318                 auto required = rxu::to_vector({
1319                     on.error(220, ex)
1320                 });
1321                 auto actual = res.get_observer().messages();
1322                 REQUIRE(required == actual);
1323             }
1324 
1325             THEN("there was one subscription and one unsubscription to the n"){
1326                 auto required = rxu::to_vector({
1327                     on.subscribe(200, 220)
1328                 });
1329                 auto actual = n.subscriptions();
1330                 REQUIRE(required == actual);
1331             }
1332 
1333             THEN("there was one subscription and one unsubscription to the err"){
1334                 auto required = rxu::to_vector({
1335                     on.subscribe(200, 220)
1336                 });
1337                 auto actual = err.subscriptions();
1338                 REQUIRE(required == actual);
1339             }
1340         }
1341     }
1342 }
1343 
1344 SCENARIO("combine_latest error after completed left", "[combine_latest][join][operators]"){
1345     GIVEN("2 hot observables of ints."){
1346         auto sc = rxsc::make_test();
1347         auto w = sc.create_worker();
1348         const rxsc::test::messages<int> on;
1349 
1350         std::runtime_error ex("combine_latest on_error from source");
1351 
1352         auto ret = sc.make_hot_observable({
1353             on.next(150, 1),
1354             on.next(210, 2),
1355             on.completed(215)
1356         });
1357 
1358         auto err = sc.make_hot_observable({
1359             on.next(150, 1),
1360             on.error(220, ex)
1361         });
1362 
1363         WHEN("each int is combined with the latest from the other source"){
1364 
1365             auto res = w.start(
__anonbe168adc2f02() 1366                 [&]() {
1367                     return ret
1368                         .combine_latest(
1369                             [](int v2, int v1){
1370                                 return v2 + v1;
1371                             },
1372                             err
1373                         )
1374                         // forget type to workaround lambda deduction bug on msvc 2013
1375                         .as_dynamic();
1376                 }
1377             );
1378 
1379             THEN("the output contains only error message"){
1380                 auto required = rxu::to_vector({
1381                     on.error(220, ex)
1382                 });
1383                 auto actual = res.get_observer().messages();
1384                 REQUIRE(required == actual);
1385             }
1386 
1387             THEN("there was one subscription and one unsubscription to the ret"){
1388                 auto required = rxu::to_vector({
1389                     on.subscribe(200, 215)
1390                 });
1391                 auto actual = ret.subscriptions();
1392                 REQUIRE(required == actual);
1393             }
1394 
1395             THEN("there was one subscription and one unsubscription to the err"){
1396                 auto required = rxu::to_vector({
1397                     on.subscribe(200, 220)
1398                 });
1399                 auto actual = err.subscriptions();
1400                 REQUIRE(required == actual);
1401             }
1402         }
1403     }
1404 }
1405 
1406 SCENARIO("combine_latest error after completed right", "[combine_latest][join][operators]"){
1407     GIVEN("2 hot observables of ints."){
1408         auto sc = rxsc::make_test();
1409         auto w = sc.create_worker();
1410         const rxsc::test::messages<int> on;
1411 
1412         std::runtime_error ex("combine_latest on_error from source");
1413 
1414         auto err = sc.make_hot_observable({
1415             on.next(150, 1),
1416             on.error(220, ex)
1417         });
1418 
1419         auto ret = sc.make_hot_observable({
1420             on.next(150, 1),
1421             on.next(210, 2),
1422             on.completed(215)
1423         });
1424 
1425         WHEN("each int is combined with the latest from the other source"){
1426 
1427             auto res = w.start(
__anonbe168adc3102() 1428                 [&]() {
1429                     return err
1430                         .combine_latest(
1431                             [](int v2, int v1){
1432                                 return v2 + v1;
1433                             },
1434                             ret
1435                         )
1436                         // forget type to workaround lambda deduction bug on msvc 2013
1437                         .as_dynamic();
1438                 }
1439             );
1440 
1441             THEN("the output contains only error message"){
1442                 auto required = rxu::to_vector({
1443                     on.error(220, ex)
1444                 });
1445                 auto actual = res.get_observer().messages();
1446                 REQUIRE(required == actual);
1447             }
1448 
1449             THEN("there was one subscription and one unsubscription to the ret"){
1450                 auto required = rxu::to_vector({
1451                     on.subscribe(200, 215)
1452                 });
1453                 auto actual = ret.subscriptions();
1454                 REQUIRE(required == actual);
1455             }
1456 
1457             THEN("there was one subscription and one unsubscription to the err"){
1458                 auto required = rxu::to_vector({
1459                     on.subscribe(200, 220)
1460                 });
1461                 auto actual = err.subscriptions();
1462                 REQUIRE(required == actual);
1463             }
1464         }
1465     }
1466 }
1467 
1468 SCENARIO("combine_latest selector throws", "[combine_latest][join][operators][!throws]"){
1469     GIVEN("2 hot observables of ints."){
1470         auto sc = rxsc::make_test();
1471         auto w = sc.create_worker();
1472         const rxsc::test::messages<int> on;
1473 
1474         std::runtime_error ex("combine_latest on_error from source");
1475 
1476         auto o1 = sc.make_hot_observable({
1477             on.next(150, 1),
1478             on.next(215, 2),
1479             on.completed(230)
1480         });
1481 
1482         auto o2 = sc.make_hot_observable({
1483             on.next(150, 1),
1484             on.next(220, 3),
1485             on.completed(240)
1486         });
1487 
1488         WHEN("each int is combined with the latest from the other source"){
1489 
1490             auto res = w.start(
__anonbe168adc3302() 1491                 [&]() {
1492                     return o1
1493                         .combine_latest(
1494                             // Note for trying to handle this test case when exceptions are disabled
1495                             // with RXCPP_USE_EXCEPTIONS == 0:
1496                             //
1497                             // It seems that this test is in particular testing that the
1498                             // combine_latest selector (aggregate function) thrown exceptions
1499                             // are being translated into an on_error.
1500                             //
1501                             // Since there appears to be no way to give combine_latest
1502                             // an Observable that would call on_error directly (as opposed
1503                             // to a regular function that's converted into an observable),
1504                             // this test is meaningless when exceptions are disabled
1505                             // since any selectors with 'throw' will not even compile.
1506                             //
1507                             // Attempting to change this to e.g.
1508                             //    o1.combineLatest(o2).map ... unconditional onError
1509                             // would defeat the purpose of the test since its the combineLatest
1510                             // implementation that's supposed to be doing the error forwarding.
1511                             [&ex](int, int) -> int {
1512                                 rxu::throw_exception(ex);
1513                             },
1514                             o2
1515                         )
1516                         // forget type to workaround lambda deduction bug on msvc 2013
1517                         .as_dynamic();
1518                 }
1519             );
1520 
1521             THEN("the output contains only error"){
1522                 auto required = rxu::to_vector({
1523                     on.error(220, ex)
1524                 });
1525                 auto actual = res.get_observer().messages();
1526                 REQUIRE(required == actual);
1527             }
1528 
1529             THEN("there was one subscription and one unsubscription to the o1"){
1530                 auto required = rxu::to_vector({
1531                     on.subscribe(200, 220)
1532                 });
1533                 auto actual = o1.subscriptions();
1534                 REQUIRE(required == actual);
1535             }
1536 
1537             THEN("there was one subscription and one unsubscription to the o2"){
1538                 auto required = rxu::to_vector({
1539                     on.subscribe(200, 220)
1540                 });
1541                 auto actual = o2.subscriptions();
1542                 REQUIRE(required == actual);
1543             }
1544         }
1545     }
1546 }
1547 
1548 SCENARIO("combine_latest selector throws N", "[combine_latest][join][operators][!throws]"){
1549     GIVEN("N hot observables of ints."){
1550         auto sc = rxsc::make_test();
1551         auto w = sc.create_worker();
1552         const rxsc::test::messages<int> on;
1553 
1554         const int N = 4;
1555 
1556         std::runtime_error ex("combine_latest on_error from source");
1557 
1558         std::vector<rxcpp::test::testable_observable<int>> e;
1559         for (int i = 0; i < N; ++i) {
1560             e.push_back(
1561                 sc.make_hot_observable({
1562                     on.next(210 + 10 * i, 1),
1563                     on.completed(500)
1564                 })
1565             );
1566         }
1567 
1568         WHEN("each int is combined with the latest from the other source"){
1569 
1570             auto res = w.start(
__anonbe168adc3502() 1571                 [&]() {
1572                     return e[0]
1573                         .combine_latest(
1574                             [&ex](int, int, int, int) -> int {
1575                                 rxu::throw_exception(ex);
1576                             },
1577                             e[1], e[2], e[3]
1578                         )
1579                         // forget type to workaround lambda deduction bug on msvc 2013
1580                         .as_dynamic();
1581                 }
1582             );
1583 
1584             THEN("the output contains only error"){
1585                 auto required = rxu::to_vector({
1586                     on.error(200 + 10 * N, ex)
1587                 });
1588                 auto actual = res.get_observer().messages();
1589                 REQUIRE(required == actual);
1590             }
1591 
1592             THEN("there was one subscription and one unsubscription to each observable"){
1593 
__anonbe168adc3702(rxcpp::test::testable_observable<int> &s)1594                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
1595                     auto required = rxu::to_vector({
1596                         on.subscribe(200, 200 + 10 * N)
1597                     });
1598                     auto actual = s.subscriptions();
1599                     REQUIRE(required == actual);
1600                 });
1601             }
1602         }
1603     }
1604 }
1605 
1606 SCENARIO("combine_latest typical N", "[combine_latest][join][operators]"){
1607     GIVEN("N hot observables of ints."){
1608         auto sc = rxsc::make_test();
1609         auto w = sc.create_worker();
1610         const rxsc::test::messages<int> on;
1611 
1612         const int N = 4;
1613 
1614         std::vector<rxcpp::test::testable_observable<int>> o;
1615         for (int i = 0; i < N; ++i) {
1616             o.push_back(
1617                 sc.make_hot_observable({
1618                     on.next(150, 1),
1619                     on.next(210 + 10 * i, i + 1),
1620                     on.next(410 + 10 * i, i + N + 1),
1621                     on.completed(800)
1622                 })
1623             );
1624         }
1625 
1626         WHEN("each int is combined with the latest from the other source"){
1627 
1628             auto res = w.start(
__anonbe168adc3802() 1629                 [&]() {
1630                     return o[0]
1631                         .combine_latest(
1632                             [](int v0, int v1, int v2, int v3) {
1633                                 return v0 + v1 + v2 + v3;
1634                             },
1635                             o[1], o[2], o[3]
1636                         )
1637                         // forget type to workaround lambda deduction bug on msvc 2013
1638                         .as_dynamic();
1639                 }
1640             );
1641 
1642             THEN("the output contains combined ints"){
1643                 auto required = rxu::to_vector({
1644                     on.next(200 + 10 * N, N * (N + 1) / 2)
1645                 });
1646                 for (int i = 0; i < N; ++i) {
1647                     required.push_back(on.next(410 + 10 * i, N * (N + 1) / 2 + N + N * i));
1648                 }
1649                 required.push_back(on.completed(800));
1650                 auto actual = res.get_observer().messages();
1651                 REQUIRE(required == actual);
1652             }
1653 
1654             THEN("there was one subscription and one unsubscription to each observable"){
1655 
__anonbe168adc3a02(rxcpp::test::testable_observable<int> &s)1656                 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){
1657                     auto required = rxu::to_vector({
1658                         on.subscribe(200, 800)
1659                     });
1660                     auto actual = s.subscriptions();
1661                     REQUIRE(required == actual);
1662                 });
1663             }
1664         }
1665     }
1666 }
1667