1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-element_at.hpp
6
7 \brief Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.
8
9 \param index the index of the element to return.
10
\return An observable that emits the item located at the specified index location.
12
13 \sample
14 \snippet element_at.cpp element_at sample
15 \snippet output.txt element_at sample
16 */
17
18 #if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP)
19 #define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP
20
21 #include "../rx-includes.hpp"
22
23 namespace rxcpp {
24
25 namespace operators {
26
27 namespace detail {
28
// Empty tag type parameterized on an argument pack; it carries no data and is
// only used as a type-level marker by element_at_invalid.
template<typename... AN>
struct element_at_invalid_arguments
{
};
31
32 template<class... AN>
33 struct element_at_invalid : public rxo::operator_base<element_at_invalid_arguments<AN...>> {
34 using type = observable<element_at_invalid_arguments<AN...>, element_at_invalid<AN...>>;
35 };
36 template<class... AN>
37 using element_at_invalid_t = typename element_at_invalid<AN...>::type;
38
39 template<class T>
40 struct element_at {
41 typedef rxu::decay_t<T> source_value_type;
42
43 struct element_at_values {
element_at_valuesrxcpp::operators::detail::element_at::element_at_values44 element_at_values(int i)
45 : index(i)
46 {
47 }
48 int index;
49 };
50
51 element_at_values initial;
52
element_atrxcpp::operators::detail::element_at53 element_at(int i)
54 : initial(i)
55 {
56 }
57
58 template<class Subscriber>
59 struct element_at_observer : public element_at_values
60 {
61 typedef element_at_observer<Subscriber> this_type;
62 typedef source_value_type value_type;
63 typedef rxu::decay_t<Subscriber> dest_type;
64 typedef observer<value_type, this_type> observer_type;
65 dest_type dest;
66 mutable int current;
67
element_at_observerrxcpp::operators::detail::element_at::element_at_observer68 element_at_observer(dest_type d, element_at_values v)
69 : element_at_values(v),
70 dest(d),
71 current(0)
72 {
73 }
on_nextrxcpp::operators::detail::element_at::element_at_observer74 void on_next(source_value_type v) const {
75 if (current++ == this->index) {
76 dest.on_next(v);
77 dest.on_completed();
78 }
79 }
on_errorrxcpp::operators::detail::element_at::element_at_observer80 void on_error(rxu::error_ptr e) const {
81 dest.on_error(e);
82 }
on_completedrxcpp::operators::detail::element_at::element_at_observer83 void on_completed() const {
84 if(current <= this->index) {
85 dest.on_error(rxu::make_error_ptr(std::range_error("index is out of bounds")));
86 }
87 }
88
makerxcpp::operators::detail::element_at::element_at_observer89 static subscriber<value_type, observer_type> make(dest_type d, element_at_values v) {
90 return make_subscriber<value_type>(d, this_type(d, v));
91 }
92 };
93
94 template<class Subscriber>
operator ()rxcpp::operators::detail::element_at95 auto operator()(Subscriber dest) const
96 -> decltype(element_at_observer<Subscriber>::make(std::move(dest), initial)) {
97 return element_at_observer<Subscriber>::make(std::move(dest), initial);
98 }
99 };
100
101 }
102
103 /*! @copydoc rx-element_at.hpp
104 */
105 template<class... AN>
element_at(AN &&...an)106 auto element_at(AN&&... an)
107 -> operator_factory<element_at_tag, AN...> {
108 return operator_factory<element_at_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110
111 }
112
113 template<>
114 struct member_overload<element_at_tag>
115 {
116 template<class Observable,
117 class Enabled = rxu::enable_if_all_true_type_t<
118 is_observable<Observable>
119 >,
120 class SourceValue = rxu::value_type_t<Observable>,
121 class element_at = rxo::detail::element_at<SourceValue>>
memberrxcpp::member_overload122 static auto member(Observable&& o, int index)
123 -> decltype(o.template lift<SourceValue>(element_at(index))) {
124 return o.template lift<SourceValue>(element_at(index));
125 }
126
127 template<class... AN>
memberrxcpp::member_overload128 static operators::detail::element_at_invalid_t<AN...> member(const AN...) {
129 std::terminate();
130 return {};
131 static_assert(sizeof...(AN) == 10000, "element_at takes (required int)");
132 }
133 };
134
135 }
136
137 #endif
138