1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 #include <array> 7 8 SCENARIO("ref_count other diamond sample"){ 9 printf("//! [ref_count other diamond sample]\n"); 10 11 /* 12 * Implements the following diamond graph chain with publish+ref_count without using threads. 13 * This version is composable because it does not use connect explicitly. 14 * 15 * Values 16 * / \ 17 * *2 *100 18 * \ / 19 * Merge 20 * | 21 * RefCount 22 */ 23 24 std::array<double, 5> a={{1.0, 2.0, 3.0, 4.0, 5.0}}; 25 auto values = rxcpp::observable<>::iterate(a) 26 // The root of the chain is only subscribed to once. __anonbe012e380102(double v) 27 .tap([](double v) { printf("[0] OnNext: %lf\n", v); }) 28 .publish(); 29 __anonbe012e380202(double v) 30 auto values_to_long = values.map([](double v) { return (long) v; }); 31 32 // Left side multiplies by 2. 33 auto left = values_to_long.map( __anonbe012e380302(long v) 34 [](long v) -> long {printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2L;} ); 35 36 // Right side multiplies by 100. 37 auto right = values_to_long.map( __anonbe012e380402(long v) 38 [](long v) -> long {printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100L; }); 39 40 // Merge the left,right sides together. 41 // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...]. 42 auto merged = left.merge(right); 43 44 // When this value is subscribed to, it calls connect on values. 45 auto connect_on_subscribe = merged.ref_count(values); 46 47 // This immediately starts emitting all values and blocks until they are completed. 48 connect_on_subscribe.subscribe( __anonbe012e380502(long v) 49 [](long v) { printf("[3] OnNext: %ld\n", v); }, __anonbe012e380602() 50 [&]() { printf("[3] OnCompleted:\n"); }); 51 52 printf("//! [ref_count other diamond sample]\n"); 53 } 54 55 // see also examples/doxygen/publish.cpp for non-ref_count diamonds 56