• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-take.hpp>
3 #include <rxcpp/operators/rx-map.hpp>
4 #include <rxcpp/operators/rx-observe_on.hpp>
5 
6 const int static_onnextcalls = 100000;
7 
8 SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){
9     const int& onnextcalls = static_onnextcalls;
10     GIVEN("a range"){
11         WHEN("multicasting a million ints"){
12             using namespace std::chrono;
13             typedef steady_clock clock;
14 
15             auto el = rx::observe_on_new_thread();
16 
17             for (int n = 0; n < 10; n++)
18             {
19                 std::atomic_bool disposed;
20                 std::atomic_bool done;
21                 auto c = std::make_shared<int>(0);
22 
23                 rx::composite_subscription cs;
__anon2d9421e70102()24                 cs.add([&](){
25                     if (!done) {abort();}
26                     disposed = true;
27                 });
28 
29                 auto start = clock::now();
30                 rxs::range<int>(1)
31                     .take(onnextcalls)
32                     .observe_on(el)
33                     .as_blocking()
34                     .subscribe(
35                         cs,
__anon2d9421e70202(int)36                         [c](int){
37                            ++(*c);
38                         },
__anon2d9421e70302()39                         [&](){
40                             done = true;
41                         });
42                 auto expected = onnextcalls;
43                 REQUIRE(*c == expected);
44                 auto finish = clock::now();
45                 auto msElapsed = duration_cast<milliseconds>(finish-start);
46                 std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
47             }
48         }
49     }
50 }
51 
52 SCENARIO("observe_on", "[observe][observe_on]"){
53     GIVEN("a source"){
54         auto sc = rxsc::make_test();
55         auto so = rx::synchronize_in_one_worker(sc);
56         auto w = sc.create_worker();
57         const rxsc::test::messages<int> on;
58 
59         auto xs = sc.make_hot_observable({
60             on.next(150, 1),
61             on.next(210, 2),
62             on.next(240, 3),
63             on.completed(300)
64         });
65 
66         WHEN("subscribe_on is specified"){
67 
68             auto res = w.start(
__anon2d9421e70402() 69                 [so, xs]() {
70                     return xs
71                          .observe_on(so);
72                 }
73             );
74 
75             THEN("the output contains items sent while subscribed"){
76                 auto required = rxu::to_vector({
77                     on.next(211, 2),
78                     on.next(241, 3),
79                     on.completed(301)
80                 });
81                 auto actual = res.get_observer().messages();
82                 REQUIRE(required == actual);
83             }
84 
85             THEN("there was 1 subscription/unsubscription to the source"){
86                 auto required = rxu::to_vector({
87                     on.subscribe(200, 300)
88                 });
89                 auto actual = xs.subscriptions();
90                 REQUIRE(required == actual);
91             }
92 
93         }
94     }
95 }
96 
97 SCENARIO("stream observe_on", "[observe][observe_on]"){
98     GIVEN("a source"){
99         auto sc = rxsc::make_test();
100         auto so = rx::synchronize_in_one_worker(sc);
101         auto w = sc.create_worker();
102         const rxsc::test::messages<int> on;
103 
104         auto xs = sc.make_hot_observable({
105             on.next(150, 1),
106             on.next(210, 2),
107             on.next(240, 3),
108             on.completed(300)
109         });
110 
111         WHEN("observe_on is specified"){
112 
113             auto res = w.start(
__anon2d9421e70502() 114                 [so, xs]() {
115                     return xs
116                          | rxo::observe_on(so);
117                 }
118             );
119 
120             THEN("the output contains items sent while subscribed"){
121                 auto required = rxu::to_vector({
122                     on.next(211, 2),
123                     on.next(241, 3),
124                     on.completed(301)
125                 });
126                 auto actual = res.get_observer().messages();
127                 REQUIRE(required == actual);
128             }
129 
130             THEN("there was 1 subscription/unsubscription to the source"){
131                 auto required = rxu::to_vector({
132                     on.subscribe(200, 300)
133                 });
134                 auto actual = xs.subscriptions();
135                 REQUIRE(required == actual);
136             }
137 
138         }
139     }
140 }
141 
142 class nocompare {
143 public:
144     int v;
145 };
146 
147 SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
148     GIVEN("a source"){
149         auto sc = rxsc::make_test();
150         auto so = rx::observe_on_one_worker(sc);
151         auto w = sc.create_worker();
152         const rxsc::test::messages<nocompare> in;
153         const rxsc::test::messages<int> out;
154 
155         auto xs = sc.make_hot_observable({
156             in.next(150, nocompare{1}),
157             in.next(210, nocompare{2}),
158             in.next(240, nocompare{3}),
159             in.completed(300)
160         });
161 
162         WHEN("observe_on is specified"){
163 
164             auto res = w.start(
__anon2d9421e70602() 165                 [so, xs]() {
166                     return xs
167                          | rxo::observe_on(so)
168                          | rxo::map([](nocompare v){ return v.v; })
169                          | rxo::as_dynamic();
170                 }
171             );
172 
173             THEN("the output contains items sent while subscribed"){
174                 auto required = rxu::to_vector({
175                     out.next(211, 2),
176                     out.next(241, 3),
177                     out.completed(301)
178                 });
179                 auto actual = res.get_observer().messages();
180                 REQUIRE(required == actual);
181             }
182 
183             THEN("there was 1 subscription/unsubscription to the source"){
184                 auto required = rxu::to_vector({
185                     out.subscribe(200, 300)
186                 });
187                 auto actual = xs.subscriptions();
188                 REQUIRE(required == actual);
189             }
190 
191         }
192     }
193 }
194