• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include "rxcpp/operators/rx-amb.hpp"
3 
// Scenario "amb never 3": amb over three hot inner observables that never
// terminate. Their only values are scheduled at 100, 110 and 120, i.e. before
// the inner subscription time (300) that the assertions below expect, so no
// value is expected to reach the observer.
// Asserted outcome: empty output; xs subscribed over [200, 400]; each inner
// source subscribed over [300, 1000].
// NOTE(review): 200 and 1000 are presumably the test worker's default
// subscribe/dispose times - confirm against rxsc::test.
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
4 SCENARIO("amb never 3", "[amb][join][operators]"){
5     GIVEN("1 cold observable with 3 hot observables of ints."){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9         const rxsc::test::messages<rx::observable<int>> o_on;
10 
        // Three hot sources, each with a single early value and no terminal event.
11         auto ys1 = sc.make_hot_observable({
12             on.next(100, 1)
13         });
14 
15         auto ys2 = sc.make_hot_observable({
16             on.next(110, 2)
17         });
18 
19         auto ys3 = sc.make_hot_observable({
20             on.next(120, 3)
21         });
22 
        // Cold outer source: hands out all three inner sources at the same
        // relative time, then completes.
23         auto xs = sc.make_cold_observable({
24             o_on.next(100, ys1),
25             o_on.next(100, ys2),
26             o_on.next(100, ys3),
27             o_on.completed(200)
28         });
29 
        // NOTE(review): the WHEN label says a source is selected, but the
        // assertions below expect no output at all.
30         WHEN("the first observable is selected to produce ints"){
31 
            // This is the only scenario here that uses the pipe form
            // (`| rxo::amb()`) instead of the member form (`.amb()`).
32             auto res = w.start(
__anon371989c60102() 33                 [&]() {
34                     return xs
35                         | rxo::amb()
36                         // forget type to workaround lambda deduction bug on msvc 2013
37                         | rxo::as_dynamic();
38                 }
39             );
40 
41             THEN("the output is empty"){
42                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
43                 auto actual = res.get_observer().messages();
44                 REQUIRE(required == actual);
45             }
46 
47             THEN("there was one subscription and one unsubscription to the xs"){
48                 auto required = rxu::to_vector({
49                     on.subscribe(200, 400)
50                 });
51                 auto actual = xs.subscriptions();
52                 REQUIRE(required == actual);
53             }
54 
55             THEN("there was one subscription and one unsubscription to the ys1"){
56                 auto required = rxu::to_vector({
57                     on.subscribe(300, 1000)
58                 });
59                 auto actual = ys1.subscriptions();
60                 REQUIRE(required == actual);
61             }
62 
63             THEN("there was one subscription and one unsubscription to the ys2"){
64                 auto required = rxu::to_vector({
65                     on.subscribe(300, 1000)
66                 });
67                 auto actual = ys2.subscriptions();
68                 REQUIRE(required == actual);
69             }
70 
71             THEN("there was one subscription and one unsubscription to the ys3"){
72                 auto required = rxu::to_vector({
73                     on.subscribe(300, 1000)
74                 });
75                 auto actual = ys3.subscriptions();
76                 REQUIRE(required == actual);
77             }
78         }
79     }
80 }
81 
// Scenario "amb never empty": amb over two hot inner observables where ys1
// never terminates and ys2 completes at 400 without a value that the expected
// subscription window [300, 400] would cover.
// Asserted outcome: the output is a single completed(400); xs subscribed over
// [200, 350]; both inner sources subscribed over [300, 400].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
82 SCENARIO("amb never empty", "[amb][join][operators]"){
83     GIVEN("1 cold observable with 2 hot observables of ints."){
84         auto sc = rxsc::make_test();
85         auto w = sc.create_worker();
86         const rxsc::test::messages<int> on;
87         const rxsc::test::messages<rx::observable<int>> o_on;
88 
        // ys1: one early value, no terminal event.
89         auto ys1 = sc.make_hot_observable({
90             on.next(100, 1)
91         });
92 
        // ys2: one early value, then completion at 400.
93         auto ys2 = sc.make_hot_observable({
94             on.next(110, 2),
95             on.completed(400)
96         });
97 
        // Cold outer source: emits both inner sources, then completes at
        // relative time 150 (the assertions expect xs to end at 350).
98         auto xs = sc.make_cold_observable({
99             o_on.next(100, ys1),
100             o_on.next(100, ys2),
101             o_on.completed(150)
102         });
103 
104         WHEN("the first observable is selected to produce ints"){
105 
106             auto res = w.start(
__anon371989c60202() 107                 [&]() {
108                     return xs
109                         .amb()
110                         // forget type to workaround lambda deduction bug on msvc 2013
111                         .as_dynamic();
112                 }
113             );
114 
115             THEN("the output contains only complete message"){
116                 auto required = rxu::to_vector({
117                     on.completed(400)
118                 });
119                 auto actual = res.get_observer().messages();
120                 REQUIRE(required == actual);
121             }
122 
123             THEN("there was one subscription and one unsubscription to the xs"){
124                 auto required = rxu::to_vector({
125                     on.subscribe(200, 350)
126                 });
127                 auto actual = xs.subscriptions();
128                 REQUIRE(required == actual);
129             }
130 
131             THEN("there was one subscription and one unsubscription to the ys1"){
132                 auto required = rxu::to_vector({
133                     on.subscribe(300, 400)
134                 });
135                 auto actual = ys1.subscriptions();
136                 REQUIRE(required == actual);
137             }
138 
139             THEN("there was one subscription and one unsubscription to the ys2"){
140                 auto required = rxu::to_vector({
141                     on.subscribe(300, 400)
142                 });
143                 auto actual = ys2.subscriptions();
144                 REQUIRE(required == actual);
145             }
146         }
147     }
148 }
149 
// Scenario "amb completes": amb over three cold inner observables whose first
// values are at relative times 10, 20 and 30, so ys1 is the first to emit.
// Asserted outcome: the output is ys1's sequence shifted by 300 (values at
// 310/410/510, completed at 610); xs subscribed over [200, 300]; ys1
// subscribed over [300, 610]; ys2 and ys3 subscribed over [300, 310], i.e.
// released at the time of ys1's first value.
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
150 SCENARIO("amb completes", "[amb][join][operators]"){
151     GIVEN("1 cold observable with 3 cold observables of ints."){
152         auto sc = rxsc::make_test();
153         auto w = sc.create_worker();
154         const rxsc::test::messages<int> on;
155         const rxsc::test::messages<rx::observable<int>> o_on;
156 
        // ys1 has the earliest first value (relative time 10).
157         auto ys1 = sc.make_cold_observable({
158             on.next(10, 101),
159             on.next(110, 102),
160             on.next(210, 103),
161             on.completed(310)
162         });
163 
164         auto ys2 = sc.make_cold_observable({
165             on.next(20, 201),
166             on.next(120, 202),
167             on.next(220, 203),
168             on.completed(320)
169         });
170 
171         auto ys3 = sc.make_cold_observable({
172             on.next(30, 301),
173             on.next(130, 302),
174             on.next(230, 303),
175             on.completed(330)
176         });
177 
        // Outer source emits all candidates and completes at the same
        // relative time (100).
178         auto xs = sc.make_cold_observable({
179             o_on.next(100, ys1),
180             o_on.next(100, ys2),
181             o_on.next(100, ys3),
182             o_on.completed(100)
183         });
184 
185         WHEN("the first observable is selected to produce ints"){
186 
187             auto res = w.start(
__anon371989c60302() 188                 [&]() {
189                     return xs
190                         .amb()
191                         // forget type to workaround lambda deduction bug on msvc 2013
192                         .as_dynamic();
193                 }
194             );
195 
196             THEN("the output contains ints from the first observable"){
197                 auto required = rxu::to_vector({
198                     on.next(310, 101),
199                     on.next(410, 102),
200                     on.next(510, 103),
201                     on.completed(610)
202                 });
203                 auto actual = res.get_observer().messages();
204                 REQUIRE(required == actual);
205             }
206 
207             THEN("there was one subscription and one unsubscription to the xs"){
208                 auto required = rxu::to_vector({
209                     on.subscribe(200, 300)
210                 });
211                 auto actual = xs.subscriptions();
212                 REQUIRE(required == actual);
213             }
214 
215             THEN("there was one subscription and one unsubscription to the ys1"){
216                 auto required = rxu::to_vector({
217                     on.subscribe(300, 610)
218                 });
219                 auto actual = ys1.subscriptions();
220                 REQUIRE(required == actual);
221             }
222 
223             THEN("there was one subscription and one unsubscription to the ys2"){
224                 auto required = rxu::to_vector({
225                     on.subscribe(300, 310)
226                 });
227                 auto actual = ys2.subscriptions();
228                 REQUIRE(required == actual);
229             }
230 
231             THEN("there was one subscription and one unsubscription to the ys3"){
232                 auto required = rxu::to_vector({
233                     on.subscribe(300, 310)
234                 });
235                 auto actual = ys3.subscriptions();
236                 REQUIRE(required == actual);
237             }
238         }
239     }
240 }
241 
// Scenario "amb winner throws": same setup as a three-way cold race in which
// ys1 emits first, except that ys1 ends with on.error(310, ex) instead of
// completing.
// Asserted outcome: the output is ys1's three values at 310/410/510 followed
// by error(610, ex); xs subscribed over [200, 300]; ys1 subscribed over
// [300, 610]; ys2 and ys3 subscribed over [300, 310].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
242 SCENARIO("amb winner throws", "[amb][join][operators]"){
243     GIVEN("1 cold observable with 3 cold observables of ints."){
244         auto sc = rxsc::make_test();
245         auto w = sc.create_worker();
246         const rxsc::test::messages<int> on;
247         const rxsc::test::messages<rx::observable<int>> o_on;
248 
        // The same exception object is used to build both the source error
        // and the expected error message.
249         std::runtime_error ex("amb on_error from source");
250 
        // ys1 emits first (relative time 10) and terminates with an error.
251         auto ys1 = sc.make_cold_observable({
252             on.next(10, 101),
253             on.next(110, 102),
254             on.next(210, 103),
255             on.error(310, ex)
256         });
257 
258         auto ys2 = sc.make_cold_observable({
259             on.next(20, 201),
260             on.next(120, 202),
261             on.next(220, 203),
262             on.completed(320)
263         });
264 
265         auto ys3 = sc.make_cold_observable({
266             on.next(30, 301),
267             on.next(130, 302),
268             on.next(230, 303),
269             on.completed(330)
270         });
271 
272         auto xs = sc.make_cold_observable({
273             o_on.next(100, ys1),
274             o_on.next(100, ys2),
275             o_on.next(100, ys3),
276             o_on.completed(100)
277         });
278 
279         WHEN("the first observable is selected to produce ints"){
280 
281             auto res = w.start(
__anon371989c60402() 282                 [&]() {
283                     return xs
284                         .amb()
285                         // forget type to workaround lambda deduction bug on msvc 2013
286                         .as_dynamic();
287                 }
288             );
289 
290             THEN("the output contains ints from the first observable"){
291                 auto required = rxu::to_vector({
292                     on.next(310, 101),
293                     on.next(410, 102),
294                     on.next(510, 103),
295                     on.error(610, ex)
296                 });
297                 auto actual = res.get_observer().messages();
298                 REQUIRE(required == actual);
299             }
300 
301             THEN("there was one subscription and one unsubscription to the xs"){
302                 auto required = rxu::to_vector({
303                     on.subscribe(200, 300)
304                 });
305                 auto actual = xs.subscriptions();
306                 REQUIRE(required == actual);
307             }
308 
309             THEN("there was one subscription and one unsubscription to the ys1"){
310                 auto required = rxu::to_vector({
311                     on.subscribe(300, 610)
312                 });
313                 auto actual = ys1.subscriptions();
314                 REQUIRE(required == actual);
315             }
316 
317             THEN("there was one subscription and one unsubscription to the ys2"){
318                 auto required = rxu::to_vector({
319                     on.subscribe(300, 310)
320                 });
321                 auto actual = ys2.subscriptions();
322                 REQUIRE(required == actual);
323             }
324 
325             THEN("there was one subscription and one unsubscription to the ys3"){
326                 auto required = rxu::to_vector({
327                     on.subscribe(300, 310)
328                 });
329                 auto actual = ys3.subscriptions();
330                 REQUIRE(required == actual);
331             }
332         }
333     }
334 }
335 
// Scenario "amb loser throws": three cold inner observables where ys2 consists
// only of on.error(20, ex), but ys1 has an earlier first value (relative
// time 10).
// Asserted outcome: the error does not appear in the output; the output is
// ys1's values at 310/410/510 and completed(610); xs subscribed over
// [200, 300]; ys1 subscribed over [300, 610]; ys2 and ys3 subscribed over
// [300, 310], which ends before ys2's error would be due (300 + 20).
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
336 SCENARIO("amb loser throws", "[amb][join][operators]"){
337     GIVEN("1 cold observable with 3 cold observables of ints."){
338         auto sc = rxsc::make_test();
339         auto w = sc.create_worker();
340         const rxsc::test::messages<int> on;
341         const rxsc::test::messages<rx::observable<int>> o_on;
342 
343         std::runtime_error ex("amb on_error from source");
344 
        // ys1 emits first and completes normally.
345         auto ys1 = sc.make_cold_observable({
346             on.next(10, 101),
347             on.next(110, 102),
348             on.next(210, 103),
349             on.completed(310)
350         });
351 
        // ys2 only errors, at a relative time later than ys1's first value.
352         auto ys2 = sc.make_cold_observable({
353             on.error(20, ex)
354         });
355 
356         auto ys3 = sc.make_cold_observable({
357             on.next(30, 301),
358             on.next(130, 302),
359             on.next(230, 303),
360             on.completed(330)
361         });
362 
363         auto xs = sc.make_cold_observable({
364             o_on.next(100, ys1),
365             o_on.next(100, ys2),
366             o_on.next(100, ys3),
367             o_on.completed(100)
368         });
369 
370         WHEN("the first observable is selected to produce ints"){
371 
372             auto res = w.start(
__anon371989c60502() 373                 [&]() {
374                     return xs
375                         .amb()
376                         // forget type to workaround lambda deduction bug on msvc 2013
377                         .as_dynamic();
378                 }
379             );
380 
381             THEN("the output contains ints from the first observable"){
382                 auto required = rxu::to_vector({
383                     on.next(310, 101),
384                     on.next(410, 102),
385                     on.next(510, 103),
386                     on.completed(610)
387                 });
388                 auto actual = res.get_observer().messages();
389                 REQUIRE(required == actual);
390             }
391 
392             THEN("there was one subscription and one unsubscription to the xs"){
393                 auto required = rxu::to_vector({
394                     on.subscribe(200, 300)
395                 });
396                 auto actual = xs.subscriptions();
397                 REQUIRE(required == actual);
398             }
399 
400             THEN("there was one subscription and one unsubscription to the ys1"){
401                 auto required = rxu::to_vector({
402                     on.subscribe(300, 610)
403                 });
404                 auto actual = ys1.subscriptions();
405                 REQUIRE(required == actual);
406             }
407 
408             THEN("there was one subscription and one unsubscription to the ys2"){
409                 auto required = rxu::to_vector({
410                     on.subscribe(300, 310)
411                 });
412                 auto actual = ys2.subscriptions();
413                 REQUIRE(required == actual);
414             }
415 
416             THEN("there was one subscription and one unsubscription to the ys3"){
417                 auto required = rxu::to_vector({
418                     on.subscribe(300, 310)
419                 });
420                 auto actual = ys3.subscriptions();
421                 REQUIRE(required == actual);
422             }
423         }
424     }
425 }
426 
// Scenario "amb throws before selection": three cold inner observables where
// ys2's on.error(50, ex) is scheduled before the first value of ys1 (110) or
// ys3 (130), so an error is the first event from any candidate.
// Asserted outcome: the output is only error(350, ex); xs subscribed over
// [200, 300] (it completes at relative time 100); all three inner sources
// subscribed over [300, 350].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
427 SCENARIO("amb throws before selection", "[amb][join][operators]"){
428     GIVEN("1 cold observable with 3 cold observables of ints."){
429         auto sc = rxsc::make_test();
430         auto w = sc.create_worker();
431         const rxsc::test::messages<int> on;
432         const rxsc::test::messages<rx::observable<int>> o_on;
433 
434         std::runtime_error ex("amb on_error from source");
435 
436         auto ys1 = sc.make_cold_observable({
437             on.next(110, 1),
438             on.completed(200)
439         });
440 
        // Earliest event among the candidates: an error at relative time 50.
441         auto ys2 = sc.make_cold_observable({
442             on.error(50, ex)
443         });
444 
445         auto ys3 = sc.make_cold_observable({
446             on.next(130, 3),
447             on.completed(300)
448         });
449 
450         auto xs = sc.make_cold_observable({
451             o_on.next(100, ys1),
452             o_on.next(100, ys2),
453             o_on.next(100, ys3),
454             o_on.completed(100)
455         });
456 
        // NOTE(review): the WHEN label says a source is selected, but the
        // assertions below expect only an error.
457         WHEN("the first observable is selected to produce ints"){
458 
459             auto res = w.start(
__anon371989c60602() 460                 [&]() {
461                     return xs
462                         .amb()
463                         // forget type to workaround lambda deduction bug on msvc 2013
464                         .as_dynamic();
465                 }
466             );
467 
468             THEN("the output contains only an error"){
469                 auto required = rxu::to_vector({
470                     on.error(350, ex)
471                 });
472                 auto actual = res.get_observer().messages();
473                 REQUIRE(required == actual);
474             }
475 
476             THEN("there was one subscription and one unsubscription to the xs"){
477                 auto required = rxu::to_vector({
478                     on.subscribe(200, 300)
479                 });
480                 auto actual = xs.subscriptions();
481                 REQUIRE(required == actual);
482             }
483 
484             THEN("there was one subscription and one unsubscription to the ys1"){
485                 auto required = rxu::to_vector({
486                     on.subscribe(300, 350)
487                 });
488                 auto actual = ys1.subscriptions();
489                 REQUIRE(required == actual);
490             }
491 
492             THEN("there was one subscription and one unsubscription to the ys2"){
493                 auto required = rxu::to_vector({
494                     on.subscribe(300, 350)
495                 });
496                 auto actual = ys2.subscriptions();
497                 REQUIRE(required == actual);
498             }
499 
500             THEN("there was one subscription and one unsubscription to the ys3"){
501                 auto required = rxu::to_vector({
502                     on.subscribe(300, 350)
503                 });
504                 auto actual = ys3.subscriptions();
505                 REQUIRE(required == actual);
506             }
507         }
508     }
509 }
510 
// Scenario "amb throws before selection and emission end": same inner sources
// as the "throws before selection" case (ys2 errors at relative time 50 before
// any value), but xs completes at relative time 500, i.e. it would still be
// live when the error is due.
// Asserted outcome: the output is only error(350, ex); xs is subscribed over
// [200, 350], so it is released at the error time rather than at its own
// completion; all three inner sources subscribed over [300, 350].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
511 SCENARIO("amb throws before selection and emission end", "[amb][join][operators]"){
512     GIVEN("1 cold observable with 3 cold observables of ints."){
513         auto sc = rxsc::make_test();
514         auto w = sc.create_worker();
515         const rxsc::test::messages<int> on;
516         const rxsc::test::messages<rx::observable<int>> o_on;
517 
518         std::runtime_error ex("amb on_error from source");
519 
520         auto ys1 = sc.make_cold_observable({
521             on.next(110, 1),
522             on.completed(200)
523         });
524 
525         auto ys2 = sc.make_cold_observable({
526             on.error(50, ex)
527         });
528 
529         auto ys3 = sc.make_cold_observable({
530             on.next(130, 3),
531             on.completed(300)
532         });
533 
        // Outer completion (500) is later than every inner event in this test.
534         auto xs = sc.make_cold_observable({
535             o_on.next(100, ys1),
536             o_on.next(100, ys2),
537             o_on.next(100, ys3),
538             o_on.completed(500)
539         });
540 
        // NOTE(review): the WHEN label says a source is selected, but the
        // assertions below expect only an error.
541         WHEN("the first observable is selected to produce ints"){
542 
543             auto res = w.start(
__anon371989c60702() 544                 [&]() {
545                     return xs
546                         .amb()
547                         // forget type to workaround lambda deduction bug on msvc 2013
548                         .as_dynamic();
549                 }
550             );
551 
552             THEN("the output contains only an error"){
553                 auto required = rxu::to_vector({
554                     on.error(350, ex)
555                 });
556                 auto actual = res.get_observer().messages();
557                 REQUIRE(required == actual);
558             }
559 
560             THEN("there was one subscription and one unsubscription to the xs"){
561                 auto required = rxu::to_vector({
562                     on.subscribe(200, 350)
563                 });
564                 auto actual = xs.subscriptions();
565                 REQUIRE(required == actual);
566             }
567 
568             THEN("there was one subscription and one unsubscription to the ys1"){
569                 auto required = rxu::to_vector({
570                     on.subscribe(300, 350)
571                 });
572                 auto actual = ys1.subscriptions();
573                 REQUIRE(required == actual);
574             }
575 
576             THEN("there was one subscription and one unsubscription to the ys2"){
577                 auto required = rxu::to_vector({
578                     on.subscribe(300, 350)
579                 });
580                 auto actual = ys2.subscriptions();
581                 REQUIRE(required == actual);
582             }
583 
584             THEN("there was one subscription and one unsubscription to the ys3"){
585                 auto required = rxu::to_vector({
586                     on.subscribe(300, 350)
587                 });
588                 auto actual = ys3.subscriptions();
589                 REQUIRE(required == actual);
590             }
591         }
592     }
593 }
594 
// Scenario "amb loser comes when winner has already emitted": xs delivers ys1
// and ys2 at relative time 100 but ys3 only at relative time 200, which is
// later than ys1's first value (expected at 310 in the output below).
// Asserted outcome: the output is ys1's values at 310/410/510 and
// completed(610); xs subscribed over [200, 400]; ys1 subscribed over
// [300, 610]; ys2 subscribed over [300, 310]; ys3 is never subscribed.
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
595 SCENARIO("amb loser comes when winner has already emitted", "[amb][join][operators]"){
596     GIVEN("1 cold observable with 3 cold observables of ints."){
597         auto sc = rxsc::make_test();
598         auto w = sc.create_worker();
599         const rxsc::test::messages<int> on;
600         const rxsc::test::messages<rx::observable<int>> o_on;
601 
602         auto ys1 = sc.make_cold_observable({
603             on.next(10, 101),
604             on.next(110, 102),
605             on.next(210, 103),
606             on.completed(310)
607         });
608 
609         auto ys2 = sc.make_cold_observable({
610             on.next(20, 201),
611             on.next(120, 202),
612             on.next(220, 203),
613             on.completed(320)
614         });
615 
616         auto ys3 = sc.make_cold_observable({
617             on.next(30, 301),
618             on.next(130, 302),
619             on.next(230, 303),
620             on.completed(330)
621         });
622 
        // ys3 is handed out 100 ticks after ys1 and ys2.
623         auto xs = sc.make_cold_observable({
624             o_on.next(100, ys1),
625             o_on.next(100, ys2),
626             o_on.next(200, ys3),
627             o_on.completed(200)
628         });
629 
630         WHEN("the first observable is selected to produce ints"){
631 
632             auto res = w.start(
__anon371989c60802() 633                 [&]() {
634                     return xs
635                         .amb()
636                         // forget type to workaround lambda deduction bug on msvc 2013
637                         .as_dynamic();
638                 }
639             );
640 
641             THEN("the output contains ints from the first observable"){
642                 auto required = rxu::to_vector({
643                     on.next(310, 101),
644                     on.next(410, 102),
645                     on.next(510, 103),
646                     on.completed(610)
647                 });
648                 auto actual = res.get_observer().messages();
649                 REQUIRE(required == actual);
650             }
651 
652             THEN("there was one subscription and one unsubscription to the xs"){
653                 auto required = rxu::to_vector({
654                     on.subscribe(200, 400)
655                 });
656                 auto actual = xs.subscriptions();
657                 REQUIRE(required == actual);
658             }
659 
660             THEN("there was one subscription and one unsubscription to the ys1"){
661                 auto required = rxu::to_vector({
662                     on.subscribe(300, 610)
663                 });
664                 auto actual = ys1.subscriptions();
665                 REQUIRE(required == actual);
666             }
667 
668             THEN("there was one subscription and one unsubscription to the ys2"){
669                 auto required = rxu::to_vector({
670                     on.subscribe(300, 310)
671                 });
672                 auto actual = ys2.subscriptions();
673                 REQUIRE(required == actual);
674             }
675 
            // The late candidate must have an empty subscription log.
676             THEN("there were no subscriptions to the ys3"){
677                 auto required = std::vector<rxn::subscription>();
678                 auto actual = ys3.subscriptions();
679                 REQUIRE(required == actual);
680             }
681         }
682     }
683 }
684 
// Scenario "amb empty list": the outer observable emits no inner observables
// at all and just completes at relative time 200.
// Asserted outcome: the output is a single completed(400); xs subscribed over
// [200, 400].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
685 SCENARIO("amb empty list", "[amb][join][operators]"){
686     GIVEN("1 empty cold observable of observables of ints."){
687         auto sc = rxsc::make_test();
688         auto w = sc.create_worker();
689         const rxsc::test::messages<int> on;
690         const rxsc::test::messages<rx::observable<int>> o_on;
691 
        // Outer source with a completion event only.
692         auto xs = sc.make_cold_observable({
693             o_on.completed(200)
694         });
695 
        // NOTE(review): the WHEN label says a source is selected, but there
        // are no inner sources in this scenario.
696         WHEN("the first observable is selected to produce ints"){
697 
698             auto res = w.start(
__anon371989c60902() 699                 [&]() {
700                     return xs
701                         .amb()
702                         // forget type to workaround lambda deduction bug on msvc 2013
703                         .as_dynamic();
704                 }
705             );
706 
            // NOTE(review): "comlpete" in the label below is a typo for
            // "complete"; left as-is because the label is a runtime section name.
707             THEN("the output contains only comlpete message"){
708                 auto required = rxu::to_vector({
709                     on.completed(400)
710                 });
711                 auto actual = res.get_observer().messages();
712                 REQUIRE(required == actual);
713             }
714 
715             THEN("there was one subscription and one unsubscription to the xs"){
716                 auto required = rxu::to_vector({
717                     on.subscribe(200, 400)
718                 });
719                 auto actual = xs.subscriptions();
720                 REQUIRE(required == actual);
721             }
722         }
723     }
724 }
725 
// Scenario "amb source throws before selection": the outer observable emits
// three cold inner observables and then errors, all at relative time 100,
// which is before any inner source's first value is due.
// Asserted outcome: the output is only error(300, ex); xs subscribed over
// [200, 300]; each inner source has one subscription record [300, 300], i.e.
// subscribed and released at the same virtual time.
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
726 SCENARIO("amb source throws before selection", "[amb][join][operators]"){
727     GIVEN("1 cold observable with 3 cold observables of ints."){
728         auto sc = rxsc::make_test();
729         auto w = sc.create_worker();
730         const rxsc::test::messages<int> on;
731         const rxsc::test::messages<rx::observable<int>> o_on;
732 
733         std::runtime_error ex("amb on_error from source");
734 
735         auto ys1 = sc.make_cold_observable({
736             on.next(10, 101),
737             on.next(110, 102),
738             on.next(210, 103),
739             on.completed(310)
740         });
741 
742         auto ys2 = sc.make_cold_observable({
743             on.next(20, 201),
744             on.next(120, 202),
745             on.next(220, 203),
746             on.completed(320)
747         });
748 
749         auto ys3 = sc.make_cold_observable({
750             on.next(30, 301),
751             on.next(130, 302),
752             on.next(230, 303),
753             on.completed(330)
754         });
755 
        // The outer source itself fails, in the same tick as it emits the
        // candidates.
756         auto xs = sc.make_cold_observable({
757             o_on.next(100, ys1),
758             o_on.next(100, ys2),
759             o_on.next(100, ys3),
760             o_on.error(100, ex)
761         });
762 
        // NOTE(review): the WHEN label says a source is selected, but the
        // assertions below expect only an error.
763         WHEN("the first observable is selected to produce ints"){
764 
765             auto res = w.start(
__anon371989c60a02() 766                 [&]() {
767                     return xs
768                         .amb()
769                         // forget type to workaround lambda deduction bug on msvc 2013
770                         .as_dynamic();
771                 }
772             );
773 
774             THEN("the output contains only an error"){
775                 auto required = rxu::to_vector({
776                     on.error(300, ex)
777                 });
778                 auto actual = res.get_observer().messages();
779                 REQUIRE(required == actual);
780             }
781 
782             THEN("there was one subscription and one unsubscription to the xs"){
783                 auto required = rxu::to_vector({
784                     on.subscribe(200, 300)
785                 });
786                 auto actual = xs.subscriptions();
787                 REQUIRE(required == actual);
788             }
789 
790             THEN("there was one subscription and one unsubscription to the ys1"){
791                 auto required = rxu::to_vector({
792                     on.subscribe(300, 300)
793                 });
794                 auto actual = ys1.subscriptions();
795                 REQUIRE(required == actual);
796             }
797 
798             THEN("there was one subscription and one unsubscription to the ys2"){
799                 auto required = rxu::to_vector({
800                     on.subscribe(300, 300)
801                 });
802                 auto actual = ys2.subscriptions();
803                 REQUIRE(required == actual);
804             }
805 
806             THEN("there was one subscription and one unsubscription to the ys3"){
807                 auto required = rxu::to_vector({
808                     on.subscribe(300, 300)
809                 });
810                 auto actual = ys3.subscriptions();
811                 REQUIRE(required == actual);
812             }
813         }
814     }
815 }
816 
// Scenario "amb source throws after selection": the outer observable emits
// three cold inner observables at relative time 100 and errors at relative
// time 300, by which point ys1 has already produced values.
// Asserted outcome: the output is ys1's first two values (310, 410) followed
// by error(500, ex), so ys1's third value (due at 510) is not delivered; xs
// subscribed over [200, 500]; ys1 subscribed over [300, 500]; ys2 and ys3
// subscribed over [300, 310].
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
817 SCENARIO("amb source throws after selection", "[amb][join][operators]"){
818     GIVEN("1 cold observable with 3 cold observables of ints."){
819         auto sc = rxsc::make_test();
820         auto w = sc.create_worker();
821         const rxsc::test::messages<int> on;
822         const rxsc::test::messages<rx::observable<int>> o_on;
823 
824         std::runtime_error ex("amb on_error from source");
825 
826         auto ys1 = sc.make_cold_observable({
827             on.next(10, 101),
828             on.next(110, 102),
829             on.next(210, 103),
830             on.completed(310)
831         });
832 
833         auto ys2 = sc.make_cold_observable({
834             on.next(20, 201),
835             on.next(120, 202),
836             on.next(220, 203),
837             on.completed(320)
838         });
839 
840         auto ys3 = sc.make_cold_observable({
841             on.next(30, 301),
842             on.next(130, 302),
843             on.next(230, 303),
844             on.completed(330)
845         });
846 
        // The outer error is scheduled 200 ticks after the candidates are
        // emitted.
847         auto xs = sc.make_cold_observable({
848             o_on.next(100, ys1),
849             o_on.next(100, ys2),
850             o_on.next(100, ys3),
851             o_on.error(300, ex)
852         });
853 
854         WHEN("the first observable is selected to produce ints"){
855 
856             auto res = w.start(
__anon371989c60b02() 857                 [&]() {
858                     return xs
859                         .amb()
860                         // forget type to workaround lambda deduction bug on msvc 2013
861                         .as_dynamic();
862                 }
863             );
864 
865             THEN("the output contains ints from the first observable"){
866                 auto required = rxu::to_vector({
867                     on.next(310, 101),
868                     on.next(410, 102),
869                     on.error(500, ex)
870                 });
871                 auto actual = res.get_observer().messages();
872                 REQUIRE(required == actual);
873             }
874 
875             THEN("there was one subscription and one unsubscription to the xs"){
876                 auto required = rxu::to_vector({
877                     on.subscribe(200, 500)
878                 });
879                 auto actual = xs.subscriptions();
880                 REQUIRE(required == actual);
881             }
882 
883             THEN("there was one subscription and one unsubscription to the ys1"){
884                 auto required = rxu::to_vector({
885                     on.subscribe(300, 500)
886                 });
887                 auto actual = ys1.subscriptions();
888                 REQUIRE(required == actual);
889             }
890 
891             THEN("there was one subscription and one unsubscription to the ys2"){
892                 auto required = rxu::to_vector({
893                     on.subscribe(300, 310)
894                 });
895                 auto actual = ys2.subscriptions();
896                 REQUIRE(required == actual);
897             }
898 
899             THEN("there was one subscription and one unsubscription to the ys3"){
900                 auto required = rxu::to_vector({
901                     on.subscribe(300, 310)
902                 });
903                 auto actual = ys3.subscriptions();
904                 REQUIRE(required == actual);
905             }
906         }
907     }
908 }
909 
// Scenario "amb never empty, custom coordination": two hot inner observables
// (ys1 never terminates, ys2 completes at 400) raced through the amb overload
// that takes a coordination, here rx::synchronize_in_one_worker(sc).
// Asserted outcome: the output is a single completed(401). Only the output is
// checked; no subscription windows are asserted in this scenario.
// NOTE(review): ys2 completes at 400, so the expected 401 is presumably one
// extra tick added by the coordination rescheduling delivery - confirm.
// NOTE(review): the leading numbers and the `__anon...()` prefix on some lines
// look like code-browser residue rather than C++; left untouched here.
910 SCENARIO("amb never empty, custom coordination", "[amb][join][operators]"){
911     GIVEN("1 cold observable with 2 hot observables of ints."){
912         auto sc = rxsc::make_test();
        // Coordination built on the test scheduler; passed to amb below.
913         auto so = rx::synchronize_in_one_worker(sc);
914         auto w = sc.create_worker();
915         const rxsc::test::messages<int> on;
916         const rxsc::test::messages<rx::observable<int>> o_on;
917 
918         auto ys1 = sc.make_hot_observable({
919             on.next(100, 1)
920         });
921 
922         auto ys2 = sc.make_hot_observable({
923             on.next(110, 2),
924             on.completed(400)
925         });
926 
927         auto xs = sc.make_cold_observable({
928             o_on.next(100, ys1),
929             o_on.next(100, ys2),
930             o_on.completed(150)
931         });
932 
933         WHEN("the first observable is selected to produce ints"){
934 
935             auto res = w.start(
__anon371989c60c02() 936                 [&]() {
937                     return xs
938                         .amb(so)
939                         // forget type to workaround lambda deduction bug on msvc 2013
940                         .as_dynamic();
941                 }
942             );
943 
944             THEN("the output contains only complete message"){
945                 auto required = rxu::to_vector({
946                     on.completed(401)
947                 });
948                 auto actual = res.get_observer().messages();
949                 REQUIRE(required == actual);
950             }
951         }
952     }
953 }
954