1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-scan.hpp
6
7 \brief For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
8
9 \tparam Seed the type of the initial value for the accumulator.
10 \tparam Accumulator the type of the data accumulating function.
11
12 \param seed the initial value for the accumulator.
13 \param a an accumulator function to be invoked on each item emitted by the source observable, whose result will be emitted and used in the next accumulator call.
14
15 \return An observable that emits the results of each call to the accumulator function.
16
17 \sample
18 \snippet scan.cpp scan sample
19 \snippet output.txt scan sample
20 */
21
22 #if !defined(RXCPP_OPERATORS_RX_SCAN_HPP)
23 #define RXCPP_OPERATORS_RX_SCAN_HPP
24
25 #include "../rx-includes.hpp"
26
27 namespace rxcpp {
28
29 namespace operators {
30
31 namespace detail {
32
33 template<class... AN>
34 struct scan_invalid_arguments {};
35
36 template<class... AN>
37 struct scan_invalid : public rxo::operator_base<scan_invalid_arguments<AN...>> {
38 using type = observable<scan_invalid_arguments<AN...>, scan_invalid<AN...>>;
39 };
40 template<class... AN>
41 using scan_invalid_t = typename scan_invalid<AN...>::type;
42
43 template<class T, class Observable, class Accumulator, class Seed>
44 struct scan : public operator_base<rxu::decay_t<Seed>>
45 {
46 typedef rxu::decay_t<Observable> source_type;
47 typedef rxu::decay_t<Accumulator> accumulator_type;
48 typedef rxu::decay_t<Seed> seed_type;
49
50 struct scan_initial_type
51 {
scan_initial_typerxcpp::operators::detail::scan::scan_initial_type52 scan_initial_type(source_type o, accumulator_type a, seed_type s)
53 : source(std::move(o))
54 , accumulator(std::move(a))
55 , seed(s)
56 {
57 }
58 source_type source;
59 accumulator_type accumulator;
60 seed_type seed;
61 };
62 scan_initial_type initial;
63
scanrxcpp::operators::detail::scan64 scan(source_type o, accumulator_type a, seed_type s)
65 : initial(std::move(o), a, s)
66 {
67 }
68
69 template<class Subscriber>
on_subscriberxcpp::operators::detail::scan70 void on_subscribe(Subscriber o) const {
71 struct scan_state_type
72 : public scan_initial_type
73 , public std::enable_shared_from_this<scan_state_type>
74 {
75 scan_state_type(scan_initial_type i, Subscriber scrbr)
76 : scan_initial_type(i)
77 , result(scan_initial_type::seed)
78 , out(std::move(scrbr))
79 {
80 }
81 seed_type result;
82 Subscriber out;
83 };
84 auto state = std::make_shared<scan_state_type>(initial, std::move(o));
85 state->source.subscribe(
86 state->out,
87 // on_next
88 [state](T t) {
89 state->result = state->accumulator(state->result, t);
90 state->out.on_next(state->result);
91 },
92 // on_error
93 [state](rxu::error_ptr e) {
94 state->out.on_error(e);
95 },
96 // on_completed
97 [state]() {
98 state->out.on_completed();
99 }
100 );
101 }
102 };
103
104 }
105
106 /*! @copydoc rx-scan.hpp
107 */
108 template<class... AN>
scan(AN &&...an)109 auto scan(AN&&... an)
110 -> operator_factory<scan_tag, AN...> {
111 return operator_factory<scan_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
112 }
113
114 }
115
116 template<>
117 struct member_overload<scan_tag>
118 {
119 template<class Observable, class Seed, class Accumulator,
120 class Enabled = rxu::enable_if_all_true_type_t<
121 is_observable<Observable>,
122 is_accumulate_function_for<rxu::value_type_t<Observable>, rxu::decay_t<Seed>, rxu::decay_t<Accumulator>>>,
123 class SourceValue = rxu::value_type_t<Observable>,
124 class Scan = rxo::detail::scan<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<Seed>>,
125 class Value = rxu::value_type_t<Scan>,
126 class Result = observable<Value, Scan>>
memberrxcpp::member_overload127 static Result member(Observable&& o, Seed s, Accumulator&& a) {
128 return Result(Scan(std::forward<Observable>(o), std::forward<Accumulator>(a), s));
129 }
130
131 template<class... AN>
memberrxcpp::member_overload132 static operators::detail::scan_invalid_t<AN...> member(AN...) {
133 std::terminate();
134 return {};
135 static_assert(sizeof...(AN) == 10000, "scan takes (Seed, Accumulator); Accumulator must be a function with the signature Seed(Seed, T)");
136 }
137 };
138
139 }
140
141 #endif
142