1 #include "../test.h" 2 #include <rxcpp/operators/rx-map.hpp> 3 4 SCENARIO("map stops on completion", "[map][operators]") { 5 GIVEN("a test hot observable of ints") { 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 long invoked = 0; 10 11 auto xs = sc.make_hot_observable({ 12 on.next(180, 1), 13 on.next(210, 2), 14 on.next(240, 3), 15 on.next(290, 4), 16 on.next(350, 5), 17 on.completed(400), 18 on.next(410, -1), 19 on.completed(420), 20 on.error(430, std::runtime_error("error on unsubscribed stream")) 21 }); 22 23 WHEN("mapped to ints that are one larger") { 24 25 auto res = w.start( __anonc803e6d10102() 26 [xs, &invoked]() { 27 return xs 28 .map([&invoked](int x) { 29 invoked++; 30 return x + 1; 31 }) 32 // forget type to workaround lambda deduction bug on msvc 2013 33 .as_dynamic(); 34 } 35 ); 36 37 THEN("the output stops on completion") { 38 auto required = rxu::to_vector({ 39 on.next(210, 3), 40 on.next(240, 4), 41 on.next(290, 5), 42 on.next(350, 6), 43 on.completed(400) 44 }); 45 auto actual = res.get_observer().messages(); 46 REQUIRE(required == actual); 47 } 48 49 THEN("there was one subscription and one unsubscription") { 50 auto required = rxu::to_vector({ 51 on.subscribe(200, 400) 52 }); 53 auto actual = xs.subscriptions(); 54 REQUIRE(required == actual); 55 } 56 57 THEN("map was called until completed") { 58 REQUIRE(4 == invoked); 59 } 60 } 61 } 62 } 63 64 SCENARIO("map - never", "[map][operators]") { 65 GIVEN("a source") { 66 auto sc = rxsc::make_test(); 67 auto w = sc.create_worker(); 68 const rxsc::test::messages<int> on; 69 70 auto xs = sc.make_hot_observable({ 71 on.next(150, 1) 72 }); 73 74 WHEN("values are mapped") { 75 76 auto res = w.start( __anonc803e6d10302() 77 [xs]() { 78 return xs 79 | rxo::map([](int x) { 80 return x + 1; 81 }) 82 // forget type to workaround lambda deduction bug on msvc 2013 83 | rxo::as_dynamic(); 84 } 85 ); 86 87 THEN("the output is empty") { 88 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 89 auto actual = res.get_observer().messages(); 90 REQUIRE(required == actual); 91 } 92 93 THEN("there was 1 subscription/unsubscription to the source") { 94 auto required = rxu::to_vector({ 95 on.subscribe(200, 1000) 96 }); 97 auto actual = xs.subscriptions(); 98 REQUIRE(required == actual); 99 } 100 } 101 } 102 } 103 104 SCENARIO("map - empty", "[map][operators]") { 105 GIVEN("a source") { 106 auto sc = rxsc::make_test(); 107 auto w = sc.create_worker(); 108 const rxsc::test::messages<int> on; 109 110 auto xs = sc.make_hot_observable({ 111 on.next(150, 1), 112 on.completed(250) 113 }); 114 115 WHEN("values are mapped") { 116 117 auto res = w.start( __anonc803e6d10502() 118 [xs]() { 119 return xs 120 .map([](int x) { 121 return x + 1; 122 }) 123 // forget type to workaround lambda deduction bug on msvc 2013 124 .as_dynamic(); 125 } 126 ); 127 128 THEN("the output only contains complete message") { 129 auto required = rxu::to_vector({ 130 on.completed(250) 131 }); 132 auto actual = res.get_observer().messages(); 133 REQUIRE(required == actual); 134 } 135 136 THEN("there was 1 subscription/unsubscription to the source") { 137 auto required = rxu::to_vector({ 138 on.subscribe(200, 250) 139 }); 140 auto actual = xs.subscriptions(); 141 REQUIRE(required == actual); 142 } 143 144 } 145 } 146 } 147 148 SCENARIO("map - items emitted", "[map][operators]") { 149 GIVEN("a source") { 150 auto sc = rxsc::make_test(); 151 auto w = sc.create_worker(); 152 const rxsc::test::messages<int> on; 153 154 auto xs = sc.make_hot_observable({ 155 on.next(150, 1), 156 on.next(210, 2), 157 on.next(240, 3), 158 on.completed(300) 159 }); 160 161 WHEN("values are mapped") { 162 163 auto res = w.start( __anonc803e6d10702() 164 [xs]() { 165 return xs 166 .map([](int x) { 167 return x + 1; 168 }) 169 // forget type to workaround lambda deduction bug on msvc 2013 170 .as_dynamic(); 171 } 172 ); 173 174 THEN("the output only contains items sent while subscribed") { 175 auto required = rxu::to_vector({ 176 on.next(210, 3), 177 on.next(240, 4), 178 on.completed(300) 179 }); 180 auto actual = res.get_observer().messages(); 181 REQUIRE(required == actual); 182 } 183 184 THEN("there was 1 subscription/unsubscription to the source") { 185 auto required = rxu::to_vector({ 186 on.subscribe(200, 300) 187 }); 188 auto actual = xs.subscriptions(); 189 REQUIRE(required == actual); 190 } 191 192 } 193 } 194 } 195 196 SCENARIO("map - throw", "[map][operators]") { 197 GIVEN("a source") { 198 auto sc = rxsc::make_test(); 199 auto w = sc.create_worker(); 200 const rxsc::test::messages<int> on; 201 202 std::runtime_error ex("map on_error from source"); 203 204 auto xs = sc.make_hot_observable({ 205 on.next(150, 1), 206 on.error(250, ex) 207 }); 208 209 WHEN("values are mapped") { 210 211 auto res = w.start( __anonc803e6d10902() 212 [xs]() { 213 return xs 214 .map([](int x) { 215 return x + 1; 216 }) 217 // forget type to workaround lambda deduction bug on msvc 2013 218 .as_dynamic(); 219 } 220 ); 221 222 THEN("the output only contains only error") { 223 auto required = rxu::to_vector({ 224 on.error(250, ex) 225 }); 226 auto actual = res.get_observer().messages(); 227 REQUIRE(required == actual); 228 } 229 230 THEN("there was 1 subscription/unsubscription to the source") { 231 auto required = rxu::to_vector({ 232 on.subscribe(200, 250) 233 }); 234 auto actual = xs.subscriptions(); 235 REQUIRE(required == actual); 236 } 237 238 } 239 } 240 } 241