1 #include "../test.h" 2 3 SCENARIO("defer stops on completion", "[defer][sources]"){ 4 GIVEN("a test cold observable of ints"){ 5 auto sc = rxsc::make_test(); 6 auto w = sc.create_worker(); 7 const rxsc::test::messages<long> on; 8 9 long invoked = 0; 10 11 rxu::detail::maybe<rx::test::testable_observable<long>> xs; 12 13 WHEN("deferred"){ 14 15 auto empty = rx::observable<>::empty<long>(); 16 auto just = rx::observable<>::just(42); 17 auto one = rx::observable<>::from(42); 18 auto error = rx::observable<>::error<long>(rxu::error_ptr()); 19 auto runtimeerror = rx::observable<>::error<long>(std::runtime_error("runtime")); 20 21 auto res = w.start( __anonf4110b5e0102() 22 [&]() { 23 return rx::observable<>::defer( 24 [&](){ 25 invoked++; 26 xs.reset(sc.make_cold_observable({ 27 on.next(100, sc.clock()), 28 on.completed(200) 29 })); 30 return xs.get(); 31 }) 32 // forget type to workaround lambda deduction bug on msvc 2013 33 .as_dynamic(); 34 } 35 ); 36 37 THEN("the output stops on completion"){ 38 auto required = rxu::to_vector({ 39 on.next(300, 200L), 40 on.completed(400) 41 }); 42 auto actual = res.get_observer().messages(); 43 REQUIRE(required == actual); 44 } 45 46 THEN("there was one subscription and one unsubscription"){ 47 auto required = rxu::to_vector({ 48 on.subscribe(200, 400) 49 }); 50 auto actual = xs.get().subscriptions(); 51 REQUIRE(required == actual); 52 } 53 54 THEN("defer was called until completed"){ 55 REQUIRE(1 == invoked); 56 } 57 } 58 } 59 } 60