• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 
3 #include <rxcpp/rx-coroutine.hpp>
4 
5 #ifdef _RESUMABLE_FUNCTIONS_SUPPORTED
6 
7 SCENARIO("coroutine completes", "[coroutine]"){
8     GIVEN("a source") {
9         auto sc = rxsc::make_test();
10         auto w = sc.create_worker();
11         const rxsc::test::messages<int> on;
12 
13         auto xs = sc.make_hot_observable({
14             on.next(110, 1),
15             on.next(210, 2),
16             on.next(310, 10),
17             on.completed(350)
18         });
19 
20         WHEN("for co_await"){
21 
22             std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
23 
24             w.advance_to(rxsc::test::subscribed_time);
25 
__anon5129b6260102() 26             auto d = [&]() -> std::future<void> {
27                 RXCPP_TRY {
28                     for co_await (auto n : xs | rxo::as_dynamic()) {
29                         messages.push_back(on.next(w.clock(), n));
30                     }
31                     messages.push_back(on.completed(w.clock()));
32                 } RXCPP_CATCH(...) {
33                     messages.push_back(on.error(w.clock(), rxu::current_exception()));
34                 }
35             }();
36 
37             w.advance_to(rxsc::test::unsubscribed_time);
38 
39             THEN("the function completed"){
40                 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
41             }
42 
43             THEN("the output only contains true"){
44                 auto required = rxu::to_vector({
45                     on.next(210, 2),
46                     on.next(310, 10),
47                     on.completed(350)
48                 });
49                 auto actual = messages;
50                 REQUIRE(required == actual);
51             }
52 
53             THEN("there was 1 subscription/unsubscription to the source"){
54                 auto required = rxu::to_vector({
55                     on.subscribe(200, 350)
56                 });
57                 auto actual = xs.subscriptions();
58                 REQUIRE(required == actual);
59             }
60 
61         }
62     }
63 }
64 
65 SCENARIO("coroutine errors", "[coroutine]"){
66     GIVEN("a source") {
67         auto sc = rxsc::make_test();
68         auto w = sc.create_worker();
69         const rxsc::test::messages<int> on;
70 
71         std::runtime_error ex("error in source");
72 
73         auto xs = sc.make_hot_observable({
74             on.next(110, 1),
75             on.next(210, 2),
76             on.error(310, ex),
77             on.next(310, 10),
78             on.completed(350)
79         });
80 
81         WHEN("for co_await"){
82 
83             std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
84 
85             w.advance_to(rxsc::test::subscribed_time);
86 
__anon5129b6260202() 87             auto d = [&]() -> std::future<void> {
88                 RXCPP_TRY {
89                     for co_await (auto n : xs | rxo::as_dynamic()) {
90                         messages.push_back(on.next(w.clock(), n));
91                     }
92                     messages.push_back(on.completed(w.clock()));
93                 } RXCPP_CATCH(...) {
94                     messages.push_back(on.error(w.clock(), rxu::current_exception()));
95                 }
96             }();
97 
98             w.advance_to(rxsc::test::unsubscribed_time);
99 
100             THEN("the function completed"){
101                 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
102             }
103 
104             THEN("the output only contains true"){
105                 auto required = rxu::to_vector({
106                     on.next(210, 2),
107                     on.error(310, ex)
108                 });
109                 auto actual = messages;
110                 REQUIRE(required == actual);
111             }
112 
113             THEN("there was 1 subscription/unsubscription to the source"){
114                 auto required = rxu::to_vector({
115                     on.subscribe(200, 310)
116                 });
117                 auto actual = xs.subscriptions();
118                 REQUIRE(required == actual);
119             }
120 
121         }
122     }
123 }
124 
125 #endif
126