1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("buffer count sample"){ 7 printf("//! [buffer count sample]\n"); 8 auto values = rxcpp::observable<>::range(1, 5).buffer(2); 9 values. 10 subscribe( __anon4592fcc90102(std::vector<int> v)11 [](std::vector<int> v){ 12 printf("OnNext:"); 13 std::for_each(v.begin(), v.end(), [](int a){ 14 printf(" %d", a); 15 }); 16 printf("\n"); 17 }, __anon4592fcc90302()18 [](){printf("OnCompleted\n");}); 19 printf("//! [buffer count sample]\n"); 20 } 21 22 SCENARIO("buffer count+skip sample"){ 23 printf("//! [buffer count+skip sample]\n"); 24 auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3); 25 values. 26 subscribe( __anon4592fcc90402(std::vector<int> v)27 [](std::vector<int> v){ 28 printf("OnNext:"); 29 std::for_each(v.begin(), v.end(), [](int a){ 30 printf(" %d", a); 31 }); 32 printf("\n"); 33 }, __anon4592fcc90602()34 [](){printf("OnCompleted\n");}); 35 printf("//! [buffer count+skip sample]\n"); 36 } 37 38 #include "main.hpp" 39 40 SCENARIO("buffer period+skip+coordination sample"){ 41 printf("//! [buffer period+skip+coordination sample]\n"); 42 printf("[thread %s] Start task\n", get_pid().c_str()); 43 auto period = std::chrono::milliseconds(4); 44 auto skip = std::chrono::milliseconds(6); 45 auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). __anon4592fcc90702(long v)46 map([](long v){ 47 printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v); 48 return v; 49 }). 50 take(7). 51 buffer_with_time(period, skip, rxcpp::observe_on_new_thread()); 52 values. 53 as_blocking(). 54 subscribe( __anon4592fcc90802(std::vector<long> v)55 [](std::vector<long> v){ 56 printf("[thread %s] OnNext:", get_pid().c_str()); 57 std::for_each(v.begin(), v.end(), [](long a){ 58 printf(" %ld", a); 59 }); 60 printf("\n"); 61 }, __anon4592fcc90a02()62 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 63 printf("[thread %s] Finish task\n", get_pid().c_str()); 64 printf("//! [buffer period+skip+coordination sample]\n"); 65 } 66 67 SCENARIO("buffer period+skip sample"){ 68 printf("//! [buffer period+skip sample]\n"); 69 auto period = std::chrono::milliseconds(4); 70 auto skip = std::chrono::milliseconds(6); 71 auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). 72 take(7). 73 buffer_with_time(period, skip); 74 values. 75 subscribe( __anon4592fcc90b02(std::vector<long> v)76 [](std::vector<long> v){ 77 printf("OnNext:"); 78 std::for_each(v.begin(), v.end(), [](long a){ 79 printf(" %ld", a); 80 }); 81 printf("\n"); 82 }, __anon4592fcc90d02()83 [](){printf("OnCompleted\n");}); 84 printf("//! [buffer period+skip sample]\n"); 85 } 86 87 SCENARIO("buffer period+skip overlapping sample"){ 88 printf("//! [buffer period+skip overlapping sample]\n"); 89 auto period = std::chrono::milliseconds(6); 90 auto skip = std::chrono::milliseconds(4); 91 auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). 92 take(7). 93 buffer_with_time(period, skip); 94 values. 95 subscribe( __anon4592fcc90e02(std::vector<long> v)96 [](std::vector<long> v){ 97 printf("OnNext:"); 98 std::for_each(v.begin(), v.end(), [](long a){ 99 printf(" %ld", a); 100 }); 101 printf("\n"); 102 }, __anon4592fcc91002()103 [](){printf("OnCompleted\n");}); 104 printf("//! [buffer period+skip overlapping sample]\n"); 105 } 106 107 SCENARIO("buffer period+skip empty sample"){ 108 printf("//! [buffer period+skip empty sample]\n"); 109 auto period = std::chrono::milliseconds(2); 110 auto skip = std::chrono::milliseconds(4); 111 auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). 112 buffer_with_time(period, skip); 113 values. 114 subscribe( __anon4592fcc91102(std::vector<long> v)115 [](std::vector<long> v){ 116 printf("OnNext:"); 117 std::for_each(v.begin(), v.end(), [](long a){ 118 printf(" %ld", a); 119 }); 120 printf("\n"); 121 }, __anon4592fcc91302()122 [](){printf("OnCompleted\n");}); 123 printf("//! [buffer period+skip empty sample]\n"); 124 } 125 126 SCENARIO("buffer period+coordination sample"){ 127 printf("//! [buffer period+coordination sample]\n"); 128 auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). 129 take(7). 130 buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread()); 131 values. 132 as_blocking(). 133 subscribe( __anon4592fcc91402(std::vector<long> v)134 [](std::vector<long> v){ 135 printf("OnNext:"); 136 std::for_each(v.begin(), v.end(), [](long a){ 137 printf(" %ld", a); 138 }); 139 printf("\n"); 140 }, __anon4592fcc91602()141 [](){printf("OnCompleted\n");}); 142 printf("//! [buffer period+coordination sample]\n"); 143 } 144 145 SCENARIO("buffer period sample"){ 146 printf("//! [buffer period sample]\n"); 147 auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). 148 take(7). 149 buffer_with_time(std::chrono::milliseconds(4)); 150 values. 151 subscribe( __anon4592fcc91702(std::vector<long> v)152 [](std::vector<long> v){ 153 printf("OnNext:"); 154 std::for_each(v.begin(), v.end(), [](long a){ 155 printf(" %ld", a); 156 }); 157 printf("\n"); 158 }, __anon4592fcc91902()159 [](){printf("OnCompleted\n");}); 160 printf("//! [buffer period sample]\n"); 161 } 162 163 SCENARIO("buffer period+count+coordination sample"){ 164 printf("//! [buffer period+count+coordination sample]\n"); 165 auto int1 = rxcpp::observable<>::range(1L, 3L); 166 auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); 167 auto values = int1. 168 concat(int2). 169 buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop()); 170 values. 171 as_blocking(). 172 subscribe( __anon4592fcc91a02(std::vector<long> v)173 [](std::vector<long> v){ 174 printf("OnNext:"); 175 std::for_each(v.begin(), v.end(), [](long a){ 176 printf(" %ld", a); 177 }); 178 printf("\n"); 179 }, __anon4592fcc91c02()180 [](){printf("OnCompleted\n");}); 181 printf("//! [buffer period+count+coordination sample]\n"); 182 } 183 184 SCENARIO("buffer period+count sample"){ 185 printf("//! [buffer period+count sample]\n"); 186 auto int1 = rxcpp::observable<>::range(1L, 3L); 187 auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); 188 auto values = int1. 189 concat(int2). 190 buffer_with_time_or_count(std::chrono::milliseconds(20), 2); 191 values. 192 subscribe( __anon4592fcc91d02(std::vector<long> v)193 [](std::vector<long> v){ 194 printf("OnNext:"); 195 std::for_each(v.begin(), v.end(), [](long a){ 196 printf(" %ld", a); 197 }); 198 printf("\n"); 199 }, __anon4592fcc91f02()200 [](){printf("OnCompleted\n");}); 201 printf("//! [buffer period+count sample]\n"); 202 } 203