1 #include "../test.h" 2 #include <rxcpp/operators/rx-map.hpp> 3 #include <rxcpp/operators/rx-merge.hpp> 4 #include <rxcpp/operators/rx-window_toggle.hpp> 5 6 SCENARIO("window toggle, basic", "[window_toggle][operators]"){ 7 GIVEN("1 hot observable of ints and hot observable of opens."){ 8 auto sc = rxsc::make_test(); 9 auto so = rx::synchronize_in_one_worker(sc); 10 auto w = sc.create_worker(); 11 const rxsc::test::messages<int> on; 12 const rxsc::test::messages<std::string> o_on; 13 14 auto xs = sc.make_hot_observable({ 15 on.next(90, 1), 16 on.next(180, 2), 17 on.next(250, 3), 18 on.next(260, 4), 19 on.next(310, 5), 20 on.next(340, 6), 21 on.next(410, 7), 22 on.next(420, 8), 23 on.next(470, 9), 24 on.next(550, 10), 25 on.completed(590) 26 }); 27 28 auto ys = sc.make_hot_observable({ 29 on.next(255, 50), 30 on.next(330, 100), 31 on.next(350, 50), 32 on.next(400, 90), 33 on.completed(900) 34 }); 35 36 WHEN("ints are split into windows"){ 37 using namespace std::chrono; 38 39 int wi = 0; 40 41 auto res = w.start( __anonff7e3ccc0102() 42 [&]() { 43 return xs 44 | rxo::window_toggle(ys, [&](int y){ 45 return rx::observable<>::timer(milliseconds(y), so); 46 }, so) 47 | rxo::map([wi](rxcpp::observable<int> w) mutable { 48 auto ti = wi++; 49 return w 50 | rxo::map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) 51 // forget type to workaround lambda deduction bug on msvc 2013 52 | rxo::as_dynamic(); 53 }) 54 | rxo::merge() 55 // forget type to workaround lambda deduction bug on msvc 2013 56 | rxo::as_dynamic(); 57 } 58 ); 59 60 THEN("the output contains ints assigned to windows"){ 61 auto required = rxu::to_vector({ 62 o_on.next(261, "0 4"), 63 o_on.next(341, "1 6"), 64 o_on.next(411, "1 7"), 65 o_on.next(411, "3 7"), 66 o_on.next(421, "1 8"), 67 o_on.next(421, "3 8"), 68 o_on.next(471, "3 9"), 69 o_on.completed(591) 70 }); 71 auto actual = res.get_observer().messages(); 72 REQUIRE(required == actual); 73 } 74 75 THEN("there was one subscription and one unsubscription to the observable"){ 76 auto required = rxu::to_vector({ 77 o_on.subscribe(200, 590) 78 }); 79 auto actual = xs.subscriptions(); 80 REQUIRE(required == actual); 81 } 82 } 83 } 84 } 85 86 SCENARIO("window toggle, basic same", "[window_toggle][operators]"){ 87 GIVEN("1 hot observable of ints and hot observable of opens."){ 88 auto sc = rxsc::make_test(); 89 auto so = rx::synchronize_in_one_worker(sc); 90 auto w = sc.create_worker(); 91 const rxsc::test::messages<int> on; 92 const rxsc::test::messages<std::string> o_on; 93 94 auto xs = sc.make_hot_observable({ 95 on.next(90, 1), 96 on.next(180, 2), 97 on.next(250, 3), 98 on.next(260, 4), 99 on.next(310, 5), 100 on.next(340, 6), 101 on.next(410, 7), 102 on.next(420, 8), 103 on.next(470, 9), 104 on.next(550, 10), 105 on.completed(590) 106 }); 107 108 auto ys = sc.make_hot_observable({ 109 on.next(255, 50), 110 on.next(330, 100), 111 on.next(350, 50), 112 on.next(400, 90), 113 on.completed(900) 114 }); 115 116 WHEN("ints are split into windows"){ 117 using namespace std::chrono; 118 119 int wi = 0; 120 121 auto res = w.start( __anonff7e3ccc0502() 122 [&]() { 123 return xs 124 .window_toggle(ys, [&](int){ 125 return ys; 126 }, so) 127 .map([wi](rxcpp::observable<int> w) mutable { 128 auto ti = wi++; 129 return w 130 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) 131 // forget type to workaround lambda deduction bug on msvc 2013 132 .as_dynamic(); 133 }) 134 .merge() 135 // forget type to workaround lambda deduction bug on msvc 2013 136 .as_dynamic(); 137 } 138 ); 139 140 THEN("the output contains ints assigned to windows"){ 141 auto required = rxu::to_vector({ 142 o_on.next(261, "0 4"), 143 o_on.next(311, "0 5"), 144 o_on.next(341, "1 6"), 145 o_on.next(411, "3 7"), 146 o_on.next(421, "3 8"), 147 o_on.next(471, "3 9"), 148 o_on.next(551, "3 10"), 149 o_on.completed(591) 150 }); 151 auto actual = res.get_observer().messages(); 152 REQUIRE(required == actual); 153 } 154 155 THEN("there was one subscription and one unsubscription to the observable"){ 156 auto required = rxu::to_vector({ 157 o_on.subscribe(200, 590) 158 }); 159 auto actual = xs.subscriptions(); 160 REQUIRE(required == actual); 161 } 162 } 163 } 164 } 165 166 SCENARIO("window toggle, error", "[window_toggle][operators]"){ 167 GIVEN("1 hot observable of ints and hot observable of opens."){ 168 auto sc = rxsc::make_test(); 169 auto so = rx::synchronize_in_one_worker(sc); 170 auto w = sc.create_worker(); 171 const rxsc::test::messages<int> on; 172 const rxsc::test::messages<std::string> o_on; 173 174 std::runtime_error ex("window_toggle on_error from source"); 175 176 auto xs = sc.make_hot_observable({ 177 on.next(90, 1), 178 on.next(180, 2), 179 on.next(250, 3), 180 on.next(260, 4), 181 on.next(310, 5), 182 on.next(340, 6), 183 on.next(410, 7), 184 on.error(420, ex), 185 on.next(470, 9), 186 on.next(550, 10), 187 on.completed(590) 188 }); 189 190 auto ys = sc.make_hot_observable({ 191 on.next(255, 50), 192 on.next(330, 100), 193 on.next(350, 50), 194 on.next(400, 90), 195 on.completed(900) 196 }); 197 198 WHEN("ints are split into windows"){ 199 using namespace std::chrono; 200 201 int wi = 0; 202 203 auto res = w.start( __anonff7e3ccc0902() 204 [&]() { 205 return xs 206 .window_toggle(ys, [&](int){ 207 return ys; 208 }, so) 209 .map([wi](rxcpp::observable<int> w) mutable { 210 auto ti = wi++; 211 return w 212 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) 213 // forget type to workaround lambda deduction bug on msvc 2013 214 .as_dynamic(); 215 }) 216 .merge() 217 // forget type to workaround lambda deduction bug on msvc 2013 218 .as_dynamic(); 219 } 220 ); 221 222 THEN("the output contains ints assigned to windows"){ 223 auto required = rxu::to_vector({ 224 o_on.next(261, "0 4"), 225 o_on.next(311, "0 5"), 226 o_on.next(341, "1 6"), 227 o_on.next(411, "3 7"), 228 o_on.error(421, ex) 229 }); 230 auto actual = res.get_observer().messages(); 231 REQUIRE(required == actual); 232 } 233 234 THEN("there was one subscription and one unsubscription to the observable"){ 235 auto required = rxu::to_vector({ 236 o_on.subscribe(200, 420) 237 }); 238 auto actual = xs.subscriptions(); 239 REQUIRE(required == actual); 240 } 241 } 242 } 243 } 244 245 SCENARIO("window toggle, disposed", "[window_toggle][operators]"){ 246 GIVEN("1 hot observable of ints and hot observable of opens."){ 247 auto sc = rxsc::make_test(); 248 auto so = rx::synchronize_in_one_worker(sc); 249 auto w = sc.create_worker(); 250 const rxsc::test::messages<int> on; 251 const rxsc::test::messages<std::string> o_on; 252 253 auto xs = sc.make_hot_observable({ 254 on.next(90, 1), 255 on.next(180, 2), 256 on.next(250, 3), 257 on.next(260, 4), 258 on.next(310, 5), 259 on.next(340, 6), 260 on.next(410, 7), 261 on.next(420, 8), 262 on.next(470, 9), 263 on.next(550, 10), 264 on.completed(590) 265 }); 266 267 auto ys = sc.make_hot_observable({ 268 on.next(255, 50), 269 on.next(330, 100), 270 on.next(350, 50), 271 on.next(400, 90), 272 on.completed(900) 273 }); 274 275 WHEN("ints are split into windows"){ 276 using namespace std::chrono; 277 278 int wi = 0; 279 280 auto res = w.start( __anonff7e3ccc0d02() 281 [&]() { 282 return xs 283 .window_toggle(ys, [&](int){ 284 return ys; 285 }, so) 286 .map([wi](rxcpp::observable<int> w) mutable { 287 auto ti = wi++; 288 return w 289 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) 290 // forget type to workaround lambda deduction bug on msvc 2013 291 .as_dynamic(); 292 }) 293 .merge() 294 // forget type to workaround lambda deduction bug on msvc 2013 295 .as_dynamic(); 296 }, 297 420 298 ); 299 300 THEN("the output contains ints assigned to windows"){ 301 auto required = rxu::to_vector({ 302 o_on.next(261, "0 4"), 303 o_on.next(311, "0 5"), 304 o_on.next(341, "1 6"), 305 o_on.next(411, "3 7") 306 }); 307 auto actual = res.get_observer().messages(); 308 REQUIRE(required == actual); 309 } 310 311 THEN("there was one subscription and one unsubscription to the observable"){ 312 auto required = rxu::to_vector({ 313 o_on.subscribe(200, 420) 314 }); 315 auto actual = xs.subscriptions(); 316 REQUIRE(required == actual); 317 } 318 } 319 } 320 } 321 322