• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-delay.hpp>
3 
4 using namespace std::chrono;
5 
6 SCENARIO("delay - never", "[delay][operators]"){
7     GIVEN("a source"){
8         auto sc = rxsc::make_test();
9         auto so = rx::synchronize_in_one_worker(sc);
10         auto w = sc.create_worker();
11         const rxsc::test::messages<int> on;
12 
13         auto xs = sc.make_hot_observable({
14             on.next(150, 1)
15         });
16 
17         WHEN("values are delayed"){
18 
19             auto res = w.start(
__anond930f8850102() 20                 [so, xs]() {
21                     return xs | rxo::delay(milliseconds(10), so);
22                 }
23             );
24 
25             THEN("the output is empty"){
26                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
27                 auto actual = res.get_observer().messages();
28                 REQUIRE(required == actual);
29             }
30 
31             THEN("there was 1 subscription/unsubscription to the source"){
32                 auto required = rxu::to_vector({
33                     on.subscribe(200, 1001)
34                 });
35                 auto actual = xs.subscriptions();
36                 REQUIRE(required == actual);
37             }
38         }
39     }
40 }
41 
42 SCENARIO("delay - empty", "[delay][operators]"){
43     GIVEN("a source"){
44         auto sc = rxsc::make_test();
45         auto so = rx::synchronize_in_one_worker(sc);
46         auto w = sc.create_worker();
47         const rxsc::test::messages<int> on;
48 
49         auto xs = sc.make_hot_observable({
50             on.next(150, 1),
51             on.completed(250)
52         });
53 
54         WHEN("values are delayed"){
55 
56             auto res = w.start(
__anond930f8850202() 57                 [so, xs]() {
58                     return xs.delay(so, milliseconds(10));
59                 }
60             );
61 
62             THEN("the output only contains complete message"){
63                 auto required = rxu::to_vector({
64                     on.completed(260)
65                 });
66                 auto actual = res.get_observer().messages();
67                 REQUIRE(required == actual);
68             }
69 
70             THEN("there was 1 subscription/unsubscription to the source"){
71                 auto required = rxu::to_vector({
72                     on.subscribe(200, 250)
73                 });
74                 auto actual = xs.subscriptions();
75                 REQUIRE(required == actual);
76             }
77 
78         }
79     }
80 }
81 
82 SCENARIO("delay - return", "[delay][operators]"){
83     GIVEN("a source"){
84         auto sc = rxsc::make_test();
85         auto so = rx::synchronize_in_one_worker(sc);
86         auto w = sc.create_worker();
87         const rxsc::test::messages<int> on;
88 
89         auto xs = sc.make_hot_observable({
90             on.next(150, 1),
91             on.next(210, 2),
92             on.next(240, 3),
93             on.completed(300)
94         });
95 
96         WHEN("values are delayed"){
97 
98             auto res = w.start(
__anond930f8850302() 99                 [so, xs]() {
100                     return xs.delay(milliseconds(10), so);
101                 }
102             );
103 
104             THEN("the output only contains delayed items sent while subscribed"){
105                 auto required = rxu::to_vector({
106                     on.next(220, 2),
107                     on.next(250, 3),
108                     on.completed(310)
109                 });
110                 auto actual = res.get_observer().messages();
111                 REQUIRE(required == actual);
112             }
113 
114             THEN("there was 1 subscription/unsubscription to the source"){
115                 auto required = rxu::to_vector({
116                     on.subscribe(200, 300)
117                 });
118                 auto actual = xs.subscriptions();
119                 REQUIRE(required == actual);
120             }
121 
122         }
123     }
124 }
125 
126 SCENARIO("delay - throw", "[delay][operators]"){
127     GIVEN("a source"){
128         auto sc = rxsc::make_test();
129         auto so = rx::synchronize_in_one_worker(sc);
130         auto w = sc.create_worker();
131         const rxsc::test::messages<int> on;
132 
133         std::runtime_error ex("delay on_error from source");
134 
135         auto xs = sc.make_hot_observable({
136             on.next(150, 1),
137             on.error(250, ex)
138         });
139 
140         WHEN("values are delayed"){
141 
142             auto res = w.start(
__anond930f8850402() 143                 [so, xs]() {
144                     return xs.delay(milliseconds(10), so);
145                 }
146             );
147 
148             THEN("the output only contains only error"){
149                 auto required = rxu::to_vector({
150                     on.error(251, ex)
151                 });
152                 auto actual = res.get_observer().messages();
153                 REQUIRE(required == actual);
154             }
155 
156             THEN("there was 1 subscription/unsubscription to the source"){
157                 auto required = rxu::to_vector({
158                     on.subscribe(200, 250)
159                 });
160                 auto actual = xs.subscriptions();
161                 REQUIRE(required == actual);
162             }
163 
164         }
165     }
166 }
167