1 #include "../test.h" 2 3 SCENARIO("create stops on completion", "[create][sources]"){ 4 GIVEN("a test cold observable of ints"){ 5 auto sc = rxsc::make_test(); 6 auto w = sc.create_worker(); 7 const rxsc::test::messages<int> on; 8 9 long invoked = 0; 10 11 WHEN("created"){ 12 13 auto res = w.start( __anon44bf292c0102() 14 [&]() { 15 return rx::observable<>::create<int>( 16 [&](const rx::subscriber<int>& s){ 17 invoked++; 18 s.on_next(1); 19 s.on_next(2); 20 }) 21 // forget type to workaround lambda deduction bug on msvc 2013 22 .as_dynamic(); 23 } 24 ); 25 26 THEN("the output contains all items"){ 27 auto required = rxu::to_vector({ 28 on.next(200, 1), 29 on.next(200, 2) 30 }); 31 auto actual = res.get_observer().messages(); 32 REQUIRE(required == actual); 33 } 34 35 THEN("create was called until completed"){ 36 REQUIRE(1 == invoked); 37 } 38 } 39 } 40 } 41 42 SCENARIO("when observer::on_next is overridden", "[create][observer][sources]"){ 43 GIVEN("a test cold observable of ints"){ 44 auto sc = rxsc::make_test(); 45 auto w = sc.create_worker(); 46 const rxsc::test::messages<int> on; 47 48 long invoked = 0; 49 50 WHEN("created"){ 51 52 auto res = w.start( __anon44bf292c0302() 53 [&]() { 54 return rx::observable<>::create<int>( 55 [&](const rx::subscriber<int>& so){ 56 invoked++; 57 auto sn = rx::make_subscriber<int>(so, 58 rx::make_observer<int>(so.get_observer(), 59 [](rx::observer<int>& o, int v){ 60 o.on_next(v + 1); 61 })); 62 sn.on_next(1); 63 sn.on_next(2); 64 }) 65 // forget type to workaround lambda deduction bug on msvc 2013 66 .as_dynamic(); 67 } 68 ); 69 70 THEN("the output contains all items incremented by 1"){ 71 auto required = rxu::to_vector({ 72 on.next(200, 2), 73 on.next(200, 3) 74 }); 75 auto actual = res.get_observer().messages(); 76 REQUIRE(required == actual); 77 } 78 79 THEN("create was called until completed"){ 80 REQUIRE(1 == invoked); 81 } 82 } 83 } 84 } 85