1 #include "../test.h"
2 #include <rxcpp/operators/rx-concat.hpp>
3 #include <rxcpp/operators/rx-group_by.hpp>
4 #include <rxcpp/operators/rx-reduce.hpp>
5 #include <rxcpp/operators/rx-map.hpp>
6 #include <rxcpp/operators/rx-merge.hpp>
7 #include <rxcpp/operators/rx-take.hpp>
8 #include <rxcpp/operators/rx-start_with.hpp>
9 #include <rxcpp/operators/rx-observe_on.hpp>
10
11 #include <locale>
12 #include <sstream>
13
14 SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){
15 GIVEN("a for loop"){
16 WHEN("partitioning pi series across all hardware threads"){
17
18 std::atomic_int c;
19 c = 0;
__anon769080da0102(int k) 20 auto pi = [&](int k) {
21 ++c;
22 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
23 };
24
25 using namespace std::chrono;
26 auto start = steady_clock::now();
27
28 // share an output thread across all the producer threads
29 auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
30
31 struct work
32 {
33 int index;
34 int first;
35 int last;
36 };
37
38 // use all available hardware threads
39 auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
40 map(
__anon769080da0202(int index)41 [](int index){
42 static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
43 int first = (chunk * index) + 1;
44 int last = chunk * (index + 1);
45 return work{index, first, last};}
46 ).
47 group_by(
__anon769080da0302(work w) 48 [](work w) -> int {return w.index % std::thread::hardware_concurrency();},
__anon769080da0402(work w)49 [](work w){return w;}).
50 map(
__anon769080da0502(rxcpp::grouped_observable<int, work> onproc) 51 [=](rxcpp::grouped_observable<int, work> onproc) {
52 auto key = onproc.get_key();
53 // share a producer thread across all the ranges in this group of chunks
54 auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
55 return onproc.
56 map(
57 [=](work w){
58 std::stringstream message;
59 message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
60
61 return rxcpp::observable<>::range(w.first, w.last, producerthread).
62 map(pi).
63 sum(). // each thread maps and reduces its contribution to the answer
64 map(
65 [=](long double v){
66 std::stringstream message;
67 message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
68 return std::make_tuple(message.str(), v);
69 }).
70 start_with(std::make_tuple(message.str(), 0.0L)).
71 as_dynamic();
72 }).
73 concat(). // only subscribe to one range at a time in this group.
74 observe_on(outputthread).
75 map(rxcpp::util::apply_to(
76 [](std::string message, long double v){
77 std::cout << message << std::endl;
78 return v;
79 })).
80 as_dynamic();
81 }).
82 merge().
83 sum(). // reduces the contributions from all the threads to the answer
84 as_blocking().
85 last();
86
87 std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
88 auto finish = steady_clock::now();
89 auto msElapsed = duration_cast<milliseconds>(finish-start);
90 std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
91
92 }
93 }
94 }
95
96 SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){
97 GIVEN("a for loop"){
98 WHEN("partitioning pi series across all hardware threads"){
99
100 std::atomic_int c;
101 c = 0;
__anon769080da0902(int k) 102 auto pi = [&](int k) {
103 ++c;
104 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
105 };
106
107 using namespace std::chrono;
108 auto start = steady_clock::now();
109
110 struct work
111 {
112 int index;
113 int first;
114 int last;
115 };
116
117 // use all available hardware threads
118 auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
119 map(
__anon769080da0a02(int index)120 [](int index){
121 static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
122 int first = (chunk * index) + 1;
123 int last = chunk * (index + 1);
124 return work{index, first, last};
125 }).
126 map(
__anon769080da0b02(work w)127 [=](work w){
128 std::stringstream message;
129 message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
130
131 // create a new thread for every chunk
132 return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()).
133 map(pi).
134 sum(). // each thread maps and reduces its contribution to the answer
135 map(
136 [=](long double v){
137 std::stringstream message;
138 message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
139 return std::make_tuple(message.str(), v);
140 }).
141 start_with(std::make_tuple(message.str(), 0.0L)).
142 as_dynamic();
143 }).
144 merge(rxcpp::observe_on_new_thread()).
145 map(rxcpp::util::apply_to(
__anon769080da0d02(std::string message, long double v)146 [](std::string message, long double v){
147 std::cout << message << std::endl;
148 return v;
149 })).
150 sum(). // reduces the contributions from all the threads to the answer
151 as_blocking().
152 last();
153
154 std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
155 auto finish = steady_clock::now();
156 auto msElapsed = duration_cast<milliseconds>(finish-start);
157 std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
158
159 }
160 }
161 }
162
// Predicate: is `c` a whitespace character in the classic "C" locale?
// The bool produced by std::isspace is converted to the declared `char`
// return type, so the result is non-zero for whitespace and zero otherwise.
char whitespace(char c) {
    const std::locale& classicLocale = std::locale::classic();
    const bool isSpace = std::isspace<char>(c, classicLocale);
    return isSpace;
}
166
trim(std::string s)167 std::string trim(std::string s) {
168 auto first = std::find_if_not(s.begin(), s.end(), whitespace);
169 auto last = std::find_if_not(s.rbegin(), s.rend(), whitespace);
170 if (last != s.rend()) {
171 s.erase(s.end() - (last-s.rbegin()), s.end());
172 }
173 s.erase(s.begin(), first);
174 return s;
175 }
176
// Case-insensitive "less than" for single characters: both operands are
// lower-cased with the classic "C" locale before being compared.
bool tolowerLess(char lhs, char rhs) {
    const std::locale& classicLocale = std::locale::classic();
    const char loweredLhs = std::tolower(lhs, classicLocale);
    const char loweredRhs = std::tolower(rhs, classicLocale);
    return loweredLhs < loweredRhs;
}
180
tolowerStringLess(const std::string & lhs,const std::string & rhs)181 bool tolowerStringLess(const std::string& lhs, const std::string& rhs) {
182 return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess);
183 }
184
185 SCENARIO("group_by", "[group_by][operators]"){
186 GIVEN("1 hot observable of ints."){
187 auto sc = rxsc::make_test();
188 auto w = sc.create_worker();
189 const rxsc::test::messages<std::string> on;
190 int keyInvoked = 0;
191 int marbleInvoked = 0;
192
193 auto xs = sc.make_hot_observable({
194 on.next(90, "error"),
195 on.next(110, "error"),
196 on.next(130, "error"),
197 on.next(220, " foo"),
198 on.next(240, " FoO "),
199 on.next(270, "baR "),
200 on.next(310, "foO "),
201 on.next(350, " Baz "),
202 on.next(360, " qux "),
203 on.next(390, " bar"),
204 on.next(420, " BAR "),
205 on.next(470, "FOO "),
206 on.next(480, "baz "),
207 on.next(510, " bAZ "),
208 on.next(530, " fOo "),
209 on.completed(570),
210 on.next(580, "error"),
211 on.completed(600),
212 on.error(650, std::runtime_error("error in completed sequence"))
213 });
214
215 WHEN("group normalized strings"){
216
217 auto res = w.start(
__anon769080da0e02() 218 [&]() {
219 return xs
220 .group_by(
221 [&](std::string v){
222 ++keyInvoked;
223 return trim(std::move(v));
224 },
225 [&](std::string v){
226 ++marbleInvoked;
227 std::reverse(v.begin(), v.end());
228 return v;
229 },
230 tolowerStringLess)
231 .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();})
232 // forget type to workaround lambda deduction bug on msvc 2013
233 .as_dynamic();
234 }
235 );
236
237 THEN("the output contains groups of group keys"){
238 auto required = rxu::to_vector({
239 on.next(220, "foo"),
240 on.next(270, "baR"),
241 on.next(350, "Baz"),
242 on.next(360, "qux"),
243 on.completed(570)
244 });
245 auto actual = res.get_observer().messages();
246 REQUIRE(required == actual);
247 }
248
249 THEN("there was one subscription and one unsubscription to the xs"){
250 auto required = rxu::to_vector({
251 on.subscribe(200, 570)
252 });
253 auto actual = xs.subscriptions();
254 REQUIRE(required == actual);
255 }
256
257 THEN("key selector was invoked for each value"){
258 REQUIRE(12 == keyInvoked);
259 }
260
261 THEN("marble selector was invoked for each value"){
262 REQUIRE(12 == marbleInvoked);
263 }
264 }
265 }
266 }
267
268 SCENARIO("group_by take 1", "[group_by][take][operators]"){
269 GIVEN("1 hot observable of ints."){
270 auto sc = rxsc::make_test();
271 auto w = sc.create_worker();
272 const rxsc::test::messages<long> on;
273 int keyInvoked = 0;
274 int marbleInvoked = 0;
275 int groupEmitted = 0;
276
277 auto xs = sc.make_hot_observable({
278 on.next(130, -1),
279 on.next(220, 0),
280 on.next(240, -1),
281 on.next(270, 2),
282 on.next(310, -3),
283 on.next(350, 4),
284 on.next(360, -5),
285 on.next(390, 6),
286 on.next(420, -7),
287 on.next(470, 8),
288 on.next(480, -9),
289 on.completed(570)
290 });
291
292 WHEN("1 group of ints is emitted"){
293
294 auto res = w.start(
__anon769080da1202() 295 [&]() {
296 return xs
297 | rxo::group_by(
298 [&](long v) {
299 ++keyInvoked;
300 return v % 2;
301 },
302 [&](long v){
303 ++marbleInvoked;
304 return v;
305 })
306 | rxo::take(1)
307 | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
308 ++groupEmitted;
309 return g;
310 })
311 | rxo::merge()
312 // forget type to workaround lambda deduction bug on msvc 2013
313 | rxo::as_dynamic();
314 }
315 );
316
317 THEN("the output contains groups of ints"){
318 auto required = rxu::to_vector({
319 on.next(220, 0),
320 on.next(270, 2),
321 on.next(350, 4),
322 on.next(390, 6),
323 on.next(470, 8),
324 on.completed(570)
325 });
326 auto actual = res.get_observer().messages();
327 REQUIRE(required == actual);
328 }
329
330 THEN("there was one subscription and one unsubscription to the xs"){
331 auto required = rxu::to_vector({
332 on.subscribe(200, 570)
333 });
334 auto actual = xs.subscriptions();
335 REQUIRE(required == actual);
336 }
337
338 THEN("key selector was invoked for each value"){
339 REQUIRE(10 == keyInvoked);
340 }
341
342 THEN("marble selector was invoked for each value"){
343 REQUIRE(5 == marbleInvoked);
344 }
345
346 THEN("1 group emitted"){
347 REQUIRE(1 == groupEmitted);
348 }
349 }
350 }
351 }
352
353 SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){
354 GIVEN("1 hot observable of ints."){
355 auto sc = rxsc::make_test();
356 auto w = sc.create_worker();
357 const rxsc::test::messages<long> on;
358 int keyInvoked = 0;
359 int marbleInvoked = 0;
360 int groupEmitted = 0;
361
362 auto xs = sc.make_hot_observable({
363 on.next(130, -1),
364 on.next(220, 0),
365 on.next(240, -1),
366 on.next(270, 2),
367 on.next(310, -3),
368 on.next(350, 4),
369 on.next(360, -5),
370 on.next(390, 6),
371 on.next(420, -7),
372 });
373
374 WHEN("1 group of ints is emitted"){
375
376 auto res = w.start(
__anon769080da1602() 377 [&]() {
378 return xs
379 .group_by(
380 [&](long v) {
381 ++keyInvoked;
382 return v % 2;
383 },
384 [&](long v){
385 ++marbleInvoked;
386 return v;
387 })
388 .take(1)
389 .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
390 ++groupEmitted;
391 return g.take(4);
392 })
393 .merge()
394 // forget type to workaround lambda deduction bug on msvc 2013
395 .as_dynamic();
396 }
397 );
398
399 THEN("the output contains groups of ints"){
400 auto required = rxu::to_vector({
401 on.next(220, 0),
402 on.next(270, 2),
403 on.next(350, 4),
404 on.next(390, 6),
405 on.completed(390)
406 });
407 auto actual = res.get_observer().messages();
408 REQUIRE(required == actual);
409 }
410
411 THEN("there was one subscription and one unsubscription to the xs"){
412 auto required = rxu::to_vector({
413 on.subscribe(200, 390)
414 });
415 auto actual = xs.subscriptions();
416 REQUIRE(required == actual);
417 }
418
419 THEN("key selector was invoked for each value"){
420 REQUIRE(7 == keyInvoked);
421 }
422
423 THEN("marble selector was invoked for each value"){
424 REQUIRE(4 == marbleInvoked);
425 }
426
427 THEN("1 group emitted"){
428 REQUIRE(1 == groupEmitted);
429 }
430 }
431 }
432 }