1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-reduce.hpp
6
7 \brief For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
8
9 \tparam Seed the type of the initial value for the accumulator
10 \tparam Accumulator the type of the data accumulating function
11 \tparam ResultSelector the type of the result producing function
12
13 \param seed the initial value for the accumulator
14 \param a an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
15 \param rs a result producing function that makes the final value from the last accumulator call result
16
17 \return An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.
18
19 Some basic reduce-type operators have already been implemented:
20 - rxcpp::operators::first
21 - rxcpp::operators::last
22 - rxcpp::operators::count
23 - rxcpp::operators::sum
24 - rxcpp::operators::average
25 - rxcpp::operators::min
26 - rxcpp::operators::max
27
28 \sample
29 Geometric mean of source values:
30 \snippet reduce.cpp reduce sample
31 \snippet output.txt reduce sample
32
33 If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
34 \snippet reduce.cpp reduce empty sample
35 \snippet output.txt reduce empty sample
36
37 If the accumulator raises an exception, it is returned by the resulting observable in on_error:
38 \snippet reduce.cpp reduce exception from accumulator sample
39 \snippet output.txt reduce exception from accumulator sample
40
41 The same for exceptions raised by the result selector:
42 \snippet reduce.cpp reduce exception from result selector sample
43 \snippet output.txt reduce exception from result selector sample
44 */
45
46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP)
47 #define RXCPP_OPERATORS_RX_REDUCE_HPP
48
49 #include "../rx-includes.hpp"
50
51 namespace rxcpp {
52
53 namespace operators {
54
55 namespace detail {
56
// Empty placeholder type: it is the value type of the observable named by
// reduce_invalid, which the catch-all member() overloads use as their return
// type. AN... records the argument types that were rejected.
template<typename... AN>
struct reduce_invalid_arguments
{
};
59
60 template<class... AN>
61 struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> {
62 using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>;
63 };
64 template<class... AN>
65 using reduce_invalid_t = typename reduce_invalid<AN...>::type;
66
67 template<class Seed, class ResultSelector>
68 struct is_result_function_for {
69
70 typedef rxu::decay_t<ResultSelector> result_selector_type;
71 typedef rxu::decay_t<Seed> seed_type;
72
73 struct tag_not_valid {};
74
75 template<class CS, class CRS>
76 static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr));
77 template<class CS, class CRS>
78 static tag_not_valid check(...);
79
80 typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
81 static const bool value = !std::is_same<type, tag_not_valid>::value;
82 };
83
84 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
85 struct reduce_traits
86 {
87 typedef rxu::decay_t<Observable> source_type;
88 typedef rxu::decay_t<Accumulator> accumulator_type;
89 typedef rxu::decay_t<ResultSelector> result_selector_type;
90 typedef rxu::decay_t<Seed> seed_type;
91
92 typedef T source_value_type;
93
94 typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
95 };
96
97 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
98 struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>>
99 {
100 typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type;
101 typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits;
102
103 typedef typename traits::source_type source_type;
104 typedef typename traits::accumulator_type accumulator_type;
105 typedef typename traits::result_selector_type result_selector_type;
106 typedef typename traits::seed_type seed_type;
107
108 typedef typename traits::source_value_type source_value_type;
109 typedef typename traits::value_type value_type;
110
111 struct reduce_initial_type
112 {
~reduce_initial_typerxcpp::operators::detail::reduce::reduce_initial_type113 ~reduce_initial_type()
114 {
115 }
reduce_initial_typerxcpp::operators::detail::reduce::reduce_initial_type116 reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
117 : source(std::move(o))
118 , accumulator(std::move(a))
119 , result_selector(std::move(rs))
120 , seed(std::move(s))
121 {
122 }
123 source_type source;
124 accumulator_type accumulator;
125 result_selector_type result_selector;
126 seed_type seed;
127
128 private:
129 reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
130 };
131 reduce_initial_type initial;
132
~reducerxcpp::operators::detail::reduce133 ~reduce()
134 {
135 }
reducerxcpp::operators::detail::reduce136 reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
137 : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
138 {
139 }
140 template<class Subscriber>
on_subscriberxcpp::operators::detail::reduce141 void on_subscribe(Subscriber o) const {
142 struct reduce_state_type
143 : public reduce_initial_type
144 , public std::enable_shared_from_this<reduce_state_type>
145 {
146 reduce_state_type(reduce_initial_type i, Subscriber scrbr)
147 : reduce_initial_type(i)
148 , source(i.source)
149 , current(reduce_initial_type::seed)
150 , out(std::move(scrbr))
151 {
152 }
153 source_type source;
154 seed_type current;
155 Subscriber out;
156
157 private:
158 reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
159 };
160 auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
161 state->source.subscribe(
162 state->out,
163 // on_next
164 [state](T t) {
165 seed_type next = state->accumulator(std::move(state->current), std::move(t));
166 state->current = std::move(next);
167 },
168 // on_error
169 [state](rxu::error_ptr e) {
170 state->out.on_error(e);
171 },
172 // on_completed
173 [state]() {
174 auto result = on_exception(
175 [&](){return state->result_selector(std::move(state->current));},
176 state->out);
177 if (result.empty()) {
178 return;
179 }
180 state->out.on_next(std::move(result.get()));
181 state->out.on_completed();
182 }
183 );
184 }
185 private:
186 reduce& operator=(reduce o) RXCPP_DELETE;
187 };
188
// Seed provider whose seed is simply a value-initialized T.
template<typename T>
struct initialize_seeder
{
    using seed_type = T;

    // Returns a value-initialized seed (zero for arithmetic types).
    static seed_type seed()
    {
        return seed_type{};
    }
};
196
197 template<class T>
198 struct average {
199 struct seed_type
200 {
seed_typerxcpp::operators::detail::average::seed_type201 seed_type()
202 : value()
203 , count(0)
204 {
205 }
206 rxu::maybe<T> value;
207 int count;
208 rxu::detail::maybe<double> stage;
209 };
seedrxcpp::operators::detail::average210 static seed_type seed() {
211 return seed_type{};
212 }
213 template<class U>
operator ()rxcpp::operators::detail::average214 seed_type operator()(seed_type a, U&& v) {
215 if (a.count != 0 &&
216 (a.count == std::numeric_limits<int>::max() ||
217 ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) ||
218 ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) {
219 // would overflow, calc existing and reset for next batch
220 // this will add error to the final result, but the alternative
221 // is to fail on overflow
222 double avg = static_cast<double>(*(a.value)) / a.count;
223 if (!a.stage.empty()) {
224 a.stage.reset((*a.stage + avg) / 2);
225 } else {
226 a.stage.reset(avg);
227 }
228 a.value.reset(std::forward<U>(v));
229 a.count = 1;
230 } else if (a.value.empty()) {
231 a.value.reset(std::forward<U>(v));
232 a.count = 1;
233 } else {
234 *(a.value) += v;
235 ++a.count;
236 }
237 return a;
238 }
operator ()rxcpp::operators::detail::average239 double operator()(seed_type a) {
240 if (!a.value.empty()) {
241 double avg = static_cast<double>(*(a.value)) / a.count;
242 if (!a.stage.empty()) {
243 avg = (*a.stage + avg) / 2;
244 }
245 return avg;
246 }
247 rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value"));
248 }
249 };
250
251 template<class T>
252 struct sum {
253 typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::sum254 static seed_type seed() {
255 return seed_type();
256 }
257 template<class U>
operator ()rxcpp::operators::detail::sum258 seed_type operator()(seed_type a, U&& v) const {
259 if (a.empty())
260 a.reset(std::forward<U>(v));
261 else
262 *a = *a + v;
263 return a;
264 }
operator ()rxcpp::operators::detail::sum265 T operator()(seed_type a) const {
266 if (a.empty())
267 rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value"));
268 return *a;
269 }
270 };
271
272 template<class T>
273 struct max {
274 typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::max275 static seed_type seed() {
276 return seed_type();
277 }
278 template<class U>
operator ()rxcpp::operators::detail::max279 seed_type operator()(seed_type a, U&& v) {
280 if (a.empty() || *a < v)
281 a.reset(std::forward<U>(v));
282 return a;
283 }
operator ()rxcpp::operators::detail::max284 T operator()(seed_type a) {
285 if (a.empty())
286 rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value"));
287 return *a;
288 }
289 };
290
291 template<class T>
292 struct min {
293 typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::min294 static seed_type seed() {
295 return seed_type();
296 }
297 template<class U>
operator ()rxcpp::operators::detail::min298 seed_type operator()(seed_type a, U&& v) {
299 if (a.empty() || v < *a)
300 a.reset(std::forward<U>(v));
301 return a;
302 }
operator ()rxcpp::operators::detail::min303 T operator()(seed_type a) {
304 if (a.empty())
305 rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value"));
306 return *a;
307 }
308 };
309
310 template<class T>
311 struct first {
312 using seed_type = rxu::maybe<T>;
seedrxcpp::operators::detail::first313 static seed_type seed() {
314 return seed_type();
315 }
316 template<class U>
operator ()rxcpp::operators::detail::first317 seed_type operator()(seed_type a, U&& v) {
318 a.reset(std::forward<U>(v));
319 return a;
320 }
operator ()rxcpp::operators::detail::first321 T operator()(seed_type a) {
322 if (a.empty()) {
323 rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
324 }
325 return *a;
326 }
327 };
328
329 template<class T>
330 struct last {
331 using seed_type = rxu::maybe<T>;
seedrxcpp::operators::detail::last332 static seed_type seed() {
333 return seed_type();
334 }
335 template<class U>
operator ()rxcpp::operators::detail::last336 seed_type operator()(seed_type a, U&& v) {
337 a.reset(std::forward<U>(v));
338 return a;
339 }
operator ()rxcpp::operators::detail::last340 T operator()(seed_type a) {
341 if (a.empty()) {
342 rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
343 }
344 return *a;
345 }
346 };
347
348 }
349
350 /*! @copydoc rx-reduce.hpp
351 */
352 template<class... AN>
reduce(AN &&...an)353 auto reduce(AN&&... an)
354 -> operator_factory<reduce_tag, AN...> {
355 return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
356 }
357
358 /*! @copydoc rx-reduce.hpp
359 */
360 template<class... AN>
accumulate(AN &&...an)361 auto accumulate(AN&&... an)
362 -> operator_factory<reduce_tag, AN...> {
363 return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
364 }
365
366 /*! \brief For each item from this observable reduce it by sending only the first item.
367
368 \return An observable that emits only the very first item emitted by the source observable.
369
370 \sample
371 \snippet math.cpp first sample
372 \snippet output.txt first sample
373
When the source observable completes without emitting any items:
375 \snippet math.cpp first empty sample
376 \snippet output.txt first empty sample
377 */
first()378 inline auto first()
379 -> operator_factory<first_tag> {
380 return operator_factory<first_tag>(std::tuple<>{});
381 }
382
383 /*! \brief For each item from this observable reduce it by sending only the last item.
384
385 \return An observable that emits only the very last item emitted by the source observable.
386
387 \sample
388 \snippet math.cpp last sample
389 \snippet output.txt last sample
390
When the source observable completes without emitting any items:
392 \snippet math.cpp last empty sample
393 \snippet output.txt last empty sample
394 */
last()395 inline auto last()
396 -> operator_factory<last_tag> {
397 return operator_factory<last_tag>(std::tuple<>{});
398 }
399
400 /*! \brief For each item from this observable reduce it by incrementing a count.
401
402 \return An observable that emits a single item: the number of elements emitted by the source observable.
403
404 \sample
405 \snippet math.cpp count sample
406 \snippet output.txt count sample
407
408 When the source observable calls on_error:
409 \snippet math.cpp count error sample
410 \snippet output.txt count error sample
411 */
count()412 inline auto count()
413 -> operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>> {
414 return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>()));
415 }
416
417 /*! \brief For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
418
419 \return An observable that emits a single item: the average of elements emitted by the source observable.
420
421 \sample
422 \snippet math.cpp average sample
423 \snippet output.txt average sample
424
425 When the source observable completes without emitting any items:
426 \snippet math.cpp average empty sample
427 \snippet output.txt average empty sample
428
429 When the source observable calls on_error:
430 \snippet math.cpp average error sample
431 \snippet output.txt average error sample
432 */
average()433 inline auto average()
434 -> operator_factory<average_tag> {
435 return operator_factory<average_tag>(std::tuple<>{});
436 }
437
438 /*! \brief For each item from this observable reduce it by adding to the previous items.
439
440 \return An observable that emits a single item: the sum of elements emitted by the source observable.
441
442 \sample
443 \snippet math.cpp sum sample
444 \snippet output.txt sum sample
445
446 When the source observable completes without emitting any items:
447 \snippet math.cpp sum empty sample
448 \snippet output.txt sum empty sample
449
450 When the source observable calls on_error:
451 \snippet math.cpp sum error sample
452 \snippet output.txt sum error sample
453 */
sum()454 inline auto sum()
455 -> operator_factory<sum_tag> {
456 return operator_factory<sum_tag>(std::tuple<>{});
457 }
458
459 /*! \brief For each item from this observable reduce it by taking the min value of the previous items.
460
461 \return An observable that emits a single item: the min of elements emitted by the source observable.
462
463 \sample
464 \snippet math.cpp min sample
465 \snippet output.txt min sample
466
467 When the source observable completes without emitting any items:
468 \snippet math.cpp min empty sample
469 \snippet output.txt min empty sample
470
471 When the source observable calls on_error:
472 \snippet math.cpp min error sample
473 \snippet output.txt min error sample
474 */
min()475 inline auto min()
476 -> operator_factory<min_tag> {
477 return operator_factory<min_tag>(std::tuple<>{});
478 }
479
480 /*! \brief For each item from this observable reduce it by taking the max value of the previous items.
481
482 \return An observable that emits a single item: the max of elements emitted by the source observable.
483
484 \sample
485 \snippet math.cpp max sample
486 \snippet output.txt max sample
487
488 When the source observable completes without emitting any items:
489 \snippet math.cpp max empty sample
490 \snippet output.txt max empty sample
491
492 When the source observable calls on_error:
493 \snippet math.cpp max error sample
494 \snippet output.txt max error sample
495 */
max()496 inline auto max()
497 -> operator_factory<max_tag> {
498 return operator_factory<max_tag>(std::tuple<>{});
499 }
500
501 }
502
503 template<>
504 struct member_overload<reduce_tag>
505 {
506
507 template<class Observable, class Seed, class Accumulator, class ResultSelector,
508 class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
509 class Value = rxu::value_type_t<Reduce>,
510 class Result = observable<Value, Reduce>>
memberrxcpp::member_overload511 static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r)
512 {
513 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s)));
514 }
515
516 template<class Observable, class Seed, class Accumulator,
517 class ResultSelector=rxu::detail::take_at<0>,
518 class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
519 class Value = rxu::value_type_t<Reduce>,
520 class Result = observable<Value, Reduce>>
memberrxcpp::member_overload521 static Result member(Observable&& o, Seed&& s, Accumulator&& a)
522 {
523 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s)));
524 }
525
526 template<class... AN>
memberrxcpp::member_overload527 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
528 std::terminate();
529 return {};
530 static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue");
531 }
532 };
533
534 template<>
535 struct member_overload<first_tag>
536 {
537 template<class Observable,
538 class SValue = rxu::value_type_t<Observable>,
539 class Operation = operators::detail::first<SValue>,
540 class Seed = decltype(Operation::seed()),
541 class Accumulator = Operation,
542 class ResultSelector = Operation,
543 class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)),
544 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
545 class RValue = rxu::value_type_t<Reduce>,
546 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload547 static Result member(Observable&& o)
548 {
549 return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed()));
550 }
551
552 template<class... AN>
memberrxcpp::member_overload553 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
554 std::terminate();
555 return {};
556 static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type");
557 }
558 };
559
560 template<>
561 struct member_overload<last_tag>
562 {
563 template<class Observable,
564 class SValue = rxu::value_type_t<Observable>,
565 class Operation = operators::detail::last<SValue>,
566 class Seed = decltype(Operation::seed()),
567 class Accumulator = Operation,
568 class ResultSelector = Operation,
569 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
570 class RValue = rxu::value_type_t<Reduce>,
571 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload572 static Result member(Observable&& o)
573 {
574 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
575 }
576
577 template<class... AN>
memberrxcpp::member_overload578 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
579 std::terminate();
580 return {};
581 static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type");
582 }
583 };
584
585 template<>
586 struct member_overload<sum_tag>
587 {
588 template<class Observable,
589 class SValue = rxu::value_type_t<Observable>,
590 class Operation = operators::detail::sum<SValue>,
591 class Seed = decltype(Operation::seed()),
592 class Accumulator = Operation,
593 class ResultSelector = Operation,
594 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
595 class RValue = rxu::value_type_t<Reduce>,
596 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload597 static Result member(Observable&& o)
598 {
599 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
600 }
601
602 template<class... AN>
memberrxcpp::member_overload603 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
604 std::terminate();
605 return {};
606 static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type");
607 }
608 };
609
610 template<>
611 struct member_overload<average_tag>
612 {
613 template<class Observable,
614 class SValue = rxu::value_type_t<Observable>,
615 class Operation = operators::detail::average<SValue>,
616 class Seed = decltype(Operation::seed()),
617 class Accumulator = Operation,
618 class ResultSelector = Operation,
619 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
620 class RValue = rxu::value_type_t<Reduce>,
621 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload622 static Result member(Observable&& o)
623 {
624 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
625 }
626
627 template<class... AN>
memberrxcpp::member_overload628 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
629 std::terminate();
630 return {};
631 static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type");
632 }
633 };
634
635 template<>
636 struct member_overload<max_tag>
637 {
638 template<class Observable,
639 class SValue = rxu::value_type_t<Observable>,
640 class Operation = operators::detail::max<SValue>,
641 class Seed = decltype(Operation::seed()),
642 class Accumulator = Operation,
643 class ResultSelector = Operation,
644 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
645 class RValue = rxu::value_type_t<Reduce>,
646 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload647 static Result member(Observable&& o)
648 {
649 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
650 }
651
652 template<class... AN>
memberrxcpp::member_overload653 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
654 std::terminate();
655 return {};
656 static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type");
657 }
658 };
659
660 template<>
661 struct member_overload<min_tag>
662 {
663 template<class Observable,
664 class SValue = rxu::value_type_t<Observable>,
665 class Operation = operators::detail::min<SValue>,
666 class Seed = decltype(Operation::seed()),
667 class Accumulator = Operation,
668 class ResultSelector = Operation,
669 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
670 class RValue = rxu::value_type_t<Reduce>,
671 class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload672 static Result member(Observable&& o)
673 {
674 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
675 }
676
677 template<class... AN>
memberrxcpp::member_overload678 static operators::detail::reduce_invalid_t<AN...> member(AN...) {
679 std::terminate();
680 return {};
681 static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type");
682 }
683 };
684
685 }
686
687 #endif
688