• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include "rxcpp/operators/rx-timeout.hpp"
3 
4 using namespace std::chrono;
5 
6 SCENARIO("should timeout if the source never emits any items", "[timeout][operators]"){
7     GIVEN("a source"){
8         auto sc = rxsc::make_test();
9         auto so = rx::synchronize_in_one_worker(sc);
10         auto w = sc.create_worker();
11         const rxsc::test::messages<int> on;
12 
13         rxcpp::timeout_error ex("timeout has occurred");
14 
15         auto xs = sc.make_hot_observable({
16             on.next(150, 1)
17         });
18 
19         WHEN("timeout is set"){
20 
21             auto res = w.start(
__anon34a1c21a0102() 22                 [so, xs]() {
23                     return xs
24                         | rxo::timeout(milliseconds(10), so);
25                 }
26             );
27 
28             THEN("the error notification message is captured"){
29                 auto required = rxu::to_vector({
30                     on.error(211, ex)
31                 });
32                 auto actual = res.get_observer().messages();
33                 REQUIRE(required == actual);
34             }
35 
36             THEN("there was 1 subscription/unsubscription to the source"){
37                 auto required = rxu::to_vector({
38                     on.subscribe(200, 212)
39                 });
40                 auto actual = xs.subscriptions();
41                 REQUIRE(required == actual);
42             }
43         }
44     }
45 }
46 
47 SCENARIO("should not timeout if completed before the specified timeout duration", "[timeout][operators]"){
48     GIVEN("a source"){
49         auto sc = rxsc::make_test();
50         auto so = rx::synchronize_in_one_worker(sc);
51         auto w = sc.create_worker();
52         const rxsc::test::messages<int> on;
53 
54         auto xs = sc.make_hot_observable({
55             on.next(150, 1),
56             on.completed(250)
57         });
58 
59         WHEN("timeout is set"){
60 
61             auto res = w.start(
__anon34a1c21a0202() 62                 [so, xs]() {
63                     return xs.timeout(so, milliseconds(100));
64                 }
65             );
66 
67             THEN("the output only contains complete message"){
68                 auto required = rxu::to_vector({
69                     on.completed(251)
70                 });
71                 auto actual = res.get_observer().messages();
72                 REQUIRE(required == actual);
73             }
74 
75             THEN("there was 1 subscription/unsubscription to the source"){
76                 auto required = rxu::to_vector({
77                     on.subscribe(200, 250)
78                 });
79                 auto actual = xs.subscriptions();
80                 REQUIRE(required == actual);
81             }
82 
83         }
84     }
85 }
86 
87 SCENARIO("should not timeout if all items are emitted within the specified timeout duration", "[timeout][operators]"){
88     GIVEN("a source"){
89         auto sc = rxsc::make_test();
90         auto so = rx::synchronize_in_one_worker(sc);
91         auto w = sc.create_worker();
92         const rxsc::test::messages<int> on;
93 
94         auto xs = sc.make_hot_observable({
95             on.next(150, 1),
96             on.next(210, 2),
97             on.next(240, 3),
98             on.completed(250)
99         });
100 
101         WHEN("timeout is set"){
102 
103             auto res = w.start(
__anon34a1c21a0302() 104                 [so, xs]() {
105                     return xs.timeout(milliseconds(40), so);
106                 }
107             );
108 
109             THEN("the output contains the emitted items while subscribed"){
110                 auto required = rxu::to_vector({
111                     on.next(211, 2),
112                     on.next(241, 3),
113                     on.completed(251)
114                 });
115                 auto actual = res.get_observer().messages();
116                 REQUIRE(required == actual);
117             }
118 
119             THEN("there was 1 subscription/unsubscription to the source"){
120                 auto required = rxu::to_vector({
121                     on.subscribe(200, 250)
122                 });
123                 auto actual = xs.subscriptions();
124                 REQUIRE(required == actual);
125             }
126 
127         }
128     }
129 }
130 
131 SCENARIO("should timeout if there are no emitted items within the timeout duration", "[timeout][operators]"){
132     GIVEN("a source"){
133         auto sc = rxsc::make_test();
134         auto so = rx::synchronize_in_one_worker(sc);
135         auto w = sc.create_worker();
136         const rxsc::test::messages<int> on;
137 
138         rxcpp::timeout_error ex("timeout has occurred");
139 
140         auto xs = sc.make_hot_observable({
141             on.next(150, 1),
142             on.next(210, 2),
143             on.next(240, 3),
144             // -- no emissions
145             on.completed(300)
146         });
147 
148         WHEN("timeout is set"){
149 
150             auto res = w.start(
__anon34a1c21a0402() 151                 [so, xs]() {
152                     return xs.timeout(milliseconds(40), so);
153                 }
154             );
155 
156             THEN("an error notification message is captured"){
157                 auto required = rxu::to_vector({
158                     on.next(211, 2),
159                     on.next(241, 3),
160                     on.error(281, ex)
161                 });
162                 auto actual = res.get_observer().messages();
163                 REQUIRE(required == actual);
164             }
165 
166             THEN("there was 1 subscription/unsubscription to the source"){
167                 auto required = rxu::to_vector({
168                     on.subscribe(200, 282)
169                 });
170                 auto actual = xs.subscriptions();
171                 REQUIRE(required == actual);
172             }
173 
174         }
175     }
176 }
177 
178 SCENARIO("should not timeout if there is an error", "[timeout][operators]"){
179     GIVEN("a source"){
180         auto sc = rxsc::make_test();
181         auto so = rx::synchronize_in_one_worker(sc);
182         auto w = sc.create_worker();
183         const rxsc::test::messages<int> on;
184 
185         std::runtime_error ex("on_error from source");
186 
187         auto xs = sc.make_hot_observable({
188             on.next(150, 1),
189             on.error(250, ex)
190         });
191 
192         WHEN("timeout is set"){
193 
194             auto res = w.start(
__anon34a1c21a0502() 195                 [so, xs]() {
196                     return xs.timeout(milliseconds(100), so);
197                 }
198             );
199 
200             THEN("the output contains only an error message"){
201                 auto required = rxu::to_vector({
202                     on.error(251, ex)
203                 });
204                 auto actual = res.get_observer().messages();
205                 REQUIRE(required == actual);
206             }
207 
208             THEN("there was 1 subscription/unsubscription to the source"){
209                 auto required = rxu::to_vector({
210                     on.subscribe(200, 250)
211                 });
212                 auto actual = xs.subscriptions();
213                 REQUIRE(required == actual);
214             }
215 
216         }
217     }
218 }
219