• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-concat.hpp>
3 #include <rxcpp/operators/rx-reduce.hpp>
4 #include <rxcpp/operators/rx-observe_on.hpp>
5 
6 const int static_onnextcalls = 1000000;
7 
8 SCENARIO("synchronize concat ranges", "[!hide][range][synchronize][concat][perf]"){
9     const int& onnextcalls = static_onnextcalls;
10     GIVEN("some ranges"){
11         WHEN("generating ints"){
12             using namespace std::chrono;
13             typedef steady_clock clock;
14 
15             auto so = rx::synchronize_event_loop();
16 
17             int n = 1;
18             auto sectionCount = onnextcalls / 3;
19             auto start = clock::now();
20             auto c = rxs::range(0, sectionCount - 1, 1, so)
21                 .concat(
22                     so,
23                     rxs::range(sectionCount, sectionCount * 2 - 1, 1, so),
24                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
25                 .as_blocking()
26                 .count();
27 
28             auto finish = clock::now();
29             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
30                    duration_cast<milliseconds>(start.time_since_epoch());
31             std::cout << "concat sync ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
32         }
33     }
34 }
35 
36 SCENARIO("observe_on concat ranges", "[!hide][range][observe_on][concat][perf]"){
37     const int& onnextcalls = static_onnextcalls;
38     GIVEN("some ranges"){
39         WHEN("generating ints"){
40             using namespace std::chrono;
41             typedef steady_clock clock;
42 
43             auto so = rx::observe_on_event_loop();
44 
45             int n = 1;
46             auto sectionCount = onnextcalls / 3;
47             auto start = clock::now();
48             int c = rxs::range(0, sectionCount - 1, 1, so)
49                 .concat(
50                     so,
51                     rxs::range(sectionCount, sectionCount * 2 - 1, 1, so),
52                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
53                 .as_blocking()
54                 .count();
55 
56             auto finish = clock::now();
57             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
58                    duration_cast<milliseconds>(start.time_since_epoch());
59             std::cout << "concat observe_on ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
60         }
61     }
62 }
63 
64 SCENARIO("serialize concat ranges", "[!hide][range][serialize][concat][perf]"){
65     const int& onnextcalls = static_onnextcalls;
66     GIVEN("some ranges"){
67         WHEN("generating ints"){
68             using namespace std::chrono;
69             typedef steady_clock clock;
70 
71             auto so = rx::serialize_event_loop();
72 
73             int n = 1;
74             auto sectionCount = onnextcalls / 3;
75             auto start = clock::now();
76             int c = rxs::range(0, sectionCount - 1, 1, so)
77                 .concat(
78                     so,
79                     rxs::range(sectionCount, sectionCount * 2 - 1, 1, so),
80                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
81                 .as_blocking()
82                 .count();
83 
84             auto finish = clock::now();
85             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
86                    duration_cast<milliseconds>(start.time_since_epoch());
87             std::cout << "concat serial ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
88         }
89     }
90 }
91 
92 
93 SCENARIO("concat completes", "[concat][join][operators]"){
94     GIVEN("1 hot observable with 3 cold observables of ints."){
95         auto sc = rxsc::make_test();
96         auto w = sc.create_worker();
97         const rxsc::test::messages<int> on;
98         const rxsc::test::messages<rx::observable<int>> o_on;
99 
100         auto ys1 = sc.make_cold_observable({
101             on.next(10, 101),
102             on.next(20, 102),
103             on.next(110, 103),
104             on.next(120, 104),
105             on.next(210, 105),
106             on.next(220, 106),
107             on.completed(230)
108         });
109 
110         auto ys2 = sc.make_cold_observable({
111             on.next(10, 201),
112             on.next(20, 202),
113             on.next(30, 203),
114             on.next(40, 204),
115             on.completed(50)
116         });
117 
118         auto ys3 = sc.make_cold_observable({
119             on.next(10, 301),
120             on.next(20, 302),
121             on.next(30, 303),
122             on.next(40, 304),
123             on.next(120, 305),
124             on.completed(150)
125         });
126 
127         auto xs = sc.make_hot_observable({
128             o_on.next(300, ys1),
129             o_on.next(400, ys2),
130             o_on.next(500, ys3),
131             o_on.completed(600)
132         });
133 
134         WHEN("each int is merged"){
135 
136             auto res = w.start(
__anon34dfc1eb0102() 137                 [&]() {
138                     return xs
139                         | rxo::concat()
140                         // forget type to workaround lambda deduction bug on msvc 2013
141                         | rxo::as_dynamic();
142                 }
143             );
144 
145             THEN("the output contains merged ints"){
146                 auto required = rxu::to_vector({
147                     on.next(310, 101),
148                     on.next(320, 102),
149                     on.next(410, 103),
150                     on.next(420, 104),
151                     on.next(510, 105),
152                     on.next(520, 106),
153                     on.next(540, 201),
154                     on.next(550, 202),
155                     on.next(560, 203),
156                     on.next(570, 204),
157                     on.next(590, 301),
158                     on.next(600, 302),
159                     on.next(610, 303),
160                     on.next(620, 304),
161                     on.next(700, 305),
162                     on.completed(730)
163                 });
164                 auto actual = res.get_observer().messages();
165                 REQUIRE(required == actual);
166             }
167 
168             THEN("there was one subscription and one unsubscription to the ints"){
169                 auto required = rxu::to_vector({
170                     on.subscribe(200, 600)
171                 });
172                 auto actual = xs.subscriptions();
173                 REQUIRE(required == actual);
174             }
175 
176             THEN("there was one subscription and one unsubscription to the ys1"){
177                 auto required = rxu::to_vector({
178                     on.subscribe(300, 530)
179                 });
180                 auto actual = ys1.subscriptions();
181                 REQUIRE(required == actual);
182             }
183 
184             THEN("there was one subscription and one unsubscription to the ys2"){
185                 auto required = rxu::to_vector({
186                     on.subscribe(530, 580)
187                 });
188                 auto actual = ys2.subscriptions();
189                 REQUIRE(required == actual);
190             }
191 
192             THEN("there was one subscription and one unsubscription to the ys3"){
193                 auto required = rxu::to_vector({
194                     on.subscribe(580, 730)
195                 });
196                 auto actual = ys3.subscriptions();
197                 REQUIRE(required == actual);
198             }
199         }
200     }
201 }
202