1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-merge_delay_error.hpp> 4 #include <rxcpp/operators/rx-observe_on.hpp> 5 6 //merge_delay_error must work the very same way as `merge()` except the error handling 7 8 SCENARIO("merge_delay_error completes", "[merge][join][operators]"){ 9 GIVEN("1 hot observable with 3 cold observables of ints."){ 10 auto sc = rxsc::make_test(); 11 auto w = sc.create_worker(); 12 const rxsc::test::messages<int> on; 13 const rxsc::test::messages<rx::observable<int>> o_on; 14 15 auto ys1 = sc.make_cold_observable({ 16 on.next(10, 101), 17 on.next(20, 102), 18 on.next(110, 103), 19 on.next(120, 104), 20 on.next(210, 105), 21 on.next(220, 106), 22 on.completed(230) 23 }); 24 25 auto ys2 = sc.make_cold_observable({ 26 on.next(10, 201), 27 on.next(20, 202), 28 on.next(30, 203), 29 on.next(40, 204), 30 on.completed(50) 31 }); 32 33 auto ys3 = sc.make_cold_observable({ 34 on.next(10, 301), 35 on.next(20, 302), 36 on.next(30, 303), 37 on.next(40, 304), 38 on.next(120, 305), 39 on.completed(150) 40 }); 41 42 auto xs = sc.make_hot_observable({ 43 o_on.next(300, ys1), 44 o_on.next(400, ys2), 45 o_on.next(500, ys3), 46 o_on.completed(600) 47 }); 48 49 WHEN("each int is merged"){ 50 51 auto res = w.start( __anon3d92c6da0102() 52 [&]() { 53 return xs 54 | rxo::merge_delay_error() 55 // forget type to workaround lambda deduction bug on msvc 2013 56 | rxo::as_dynamic(); 57 } 58 ); 59 60 THEN("the output contains merged ints"){ 61 auto required = rxu::to_vector({ 62 on.next(310, 101), 63 on.next(320, 102), 64 on.next(410, 103), 65 on.next(410, 201), 66 on.next(420, 104), 67 on.next(420, 202), 68 on.next(430, 203), 69 on.next(440, 204), 70 on.next(510, 105), 71 on.next(510, 301), 72 on.next(520, 106), 73 on.next(520, 302), 74 on.next(530, 303), 75 on.next(540, 304), 76 on.next(620, 305), 77 on.completed(650) 78 }); 79 auto actual = res.get_observer().messages(); 80 REQUIRE(required == actual); 81 } 82 83 THEN("there was one subscription and one unsubscription to the xs"){ 84 auto required = rxu::to_vector({ 85 on.subscribe(200, 600) 86 }); 87 auto actual = xs.subscriptions(); 88 REQUIRE(required == actual); 89 } 90 91 THEN("there was one subscription and one unsubscription to the ys1"){ 92 auto required = rxu::to_vector({ 93 on.subscribe(300, 530) 94 }); 95 auto actual = ys1.subscriptions(); 96 REQUIRE(required == actual); 97 } 98 99 THEN("there was one subscription and one unsubscription to the ys2"){ 100 auto required = rxu::to_vector({ 101 on.subscribe(400, 450) 102 }); 103 auto actual = ys2.subscriptions(); 104 REQUIRE(required == actual); 105 } 106 107 THEN("there was one subscription and one unsubscription to the ys3"){ 108 auto required = rxu::to_vector({ 109 on.subscribe(500, 650) 110 }); 111 auto actual = ys3.subscriptions(); 112 REQUIRE(required == actual); 113 } 114 } 115 } 116 } 117 118 SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){ 119 GIVEN("1 hot observable with 3 cold observables of ints."){ 120 auto sc = rxsc::make_test(); 121 auto w = sc.create_worker(); 122 const rxsc::test::messages<int> on; 123 const rxsc::test::messages<rx::observable<int>> o_on; 124 125 auto ys1 = sc.make_cold_observable({ 126 on.next(10, 101), 127 on.next(20, 102), 128 on.next(110, 103), 129 on.next(120, 104), 130 on.next(210, 105), 131 on.next(230, 107), 132 on.completed(240) 133 }); 134 135 auto ys2 = sc.make_cold_observable({ 136 on.next(10, 201), 137 on.next(20, 202), 138 on.next(30, 203), 139 on.error(40, std::runtime_error("merge_delay_error on_error from ys2")), 140 on.next(50, 205), 141 on.completed(60) 142 }); 143 144 auto ys3 = sc.make_cold_observable({ 145 on.next(10, 301), 146 on.next(20, 302), 147 on.next(30, 303), 148 on.next(40, 304), 149 on.next(120, 305), 150 on.completed(150) 151 }); 152 153 WHEN("each int is merged"){ 154 155 auto res = w.start( __anon3d92c6da0202() 156 [&]() { 157 return ys1 158 .merge_delay_error(ys2, ys3); 159 } 160 ); 161 162 rx::composite_exception ex; 163 THEN("the output contains merged ints"){ 164 auto required = rxu::to_vector({ 165 on.next(210, 101), 166 on.next(210, 201), 167 on.next(210, 301), 168 on.next(220, 102), 169 on.next(220, 202), 170 on.next(220, 302), 171 on.next(230, 203), 172 on.next(230, 303), 173 on.next(240, 304), 174 on.next(310, 103), 175 on.next(320, 104), 176 on.next(320, 305), 177 on.next(410, 105), 178 on.next(430, 107), 179 on.error(440, ex) 180 }); 181 auto actual = res.get_observer().messages(); 182 REQUIRE(required == actual); 183 } 184 185 THEN("there was one subscription and one unsubscription to the ys1"){ 186 auto required = rxu::to_vector({ 187 on.subscribe(200, 440) 188 }); 189 auto actual = ys1.subscriptions(); 190 REQUIRE(required == actual); 191 } 192 193 THEN("there was one subscription and one unsubscription to the ys2"){ 194 auto required = rxu::to_vector({ 195 on.subscribe(200, 240) 196 }); 197 auto actual = ys2.subscriptions(); 198 REQUIRE(required == actual); 199 } 200 201 THEN("there was one subscription and one unsubscription to the ys3"){ 202 auto required = rxu::to_vector({ 203 on.subscribe(200, 350) 204 }); 205 auto actual = ys3.subscriptions(); 206 REQUIRE(required == actual); 207 } 208 } 209 } 210 } 211 212 SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){ 213 GIVEN("1 hot observable with 3 cold observables of ints."){ 214 auto sc = rxsc::make_test(); 215 auto w = sc.create_worker(); 216 const rxsc::test::messages<int> on; 217 const rxsc::test::messages<rx::observable<int>> o_on; 218 219 auto ys1 = sc.make_cold_observable({ 220 on.next(10, 101), 221 on.next(20, 102), 222 on.next(110, 103), 223 on.next(120, 104), 224 on.next(210, 105), 225 on.error(220, std::runtime_error("merge_delay_error on_error from ys1")), 226 on.next(230, 107), 227 on.completed(240) 228 }); 229 230 auto ys2 = sc.make_cold_observable({ 231 on.next(10, 201), 232 on.next(20, 202), 233 on.next(30, 203), 234 on.error(40, std::runtime_error("merge_delay_error on_error from ys2")), 235 on.next(50, 205), 236 on.completed(60) 237 }); 238 239 auto ys3 = sc.make_cold_observable({ 240 on.next(10, 301), 241 on.next(20, 302), 242 on.next(30, 303), 243 on.next(40, 304), 244 on.next(120, 305), 245 on.completed(150) 246 }); 247 248 WHEN("each int is merged"){ 249 250 auto res = w.start( __anon3d92c6da0302() 251 [&]() { 252 return ys1 253 .merge_delay_error(ys2, ys3); 254 } 255 ); 256 257 rx::composite_exception ex; 258 THEN("the output contains merged ints"){ 259 auto required = rxu::to_vector({ 260 on.next(210, 101), 261 on.next(210, 201), 262 on.next(210, 301), 263 on.next(220, 102), 264 on.next(220, 202), 265 on.next(220, 302), 266 on.next(230, 203), 267 on.next(230, 303), 268 on.next(240, 304), 269 on.next(310, 103), 270 on.next(320, 104), 271 on.next(320, 305), 272 on.next(410, 105), 273 on.error(420, ex) 274 }); 275 auto actual = res.get_observer().messages(); 276 REQUIRE(required == actual); 277 } 278 279 THEN("there was one subscription and one unsubscription to the ys1"){ 280 auto required = rxu::to_vector({ 281 on.subscribe(200, 420) 282 }); 283 auto actual = ys1.subscriptions(); 284 REQUIRE(required == actual); 285 } 286 287 THEN("there was one subscription and one unsubscription to the ys2"){ 288 auto required = rxu::to_vector({ 289 on.subscribe(200, 240) 290 }); 291 auto actual = ys2.subscriptions(); 292 REQUIRE(required == actual); 293 } 294 295 THEN("there was one subscription and one unsubscription to the ys3"){ 296 auto required = rxu::to_vector({ 297 on.subscribe(200, 350) 298 }); 299 auto actual = ys3.subscriptions(); 300 REQUIRE(required == actual); 301 } 302 } 303 } 304 } 305