1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-buffer_count.hpp
6
\brief Return an observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
8 If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.
9
\param count the maximum size of each buffer before it is emitted.
\param skip how many items need to be skipped before starting a new buffer (optional).
12
13 \return Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
14 If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.
15
16 \sample
17 \snippet buffer.cpp buffer count sample
18 \snippet output.txt buffer count sample
19
20 \sample
21 \snippet buffer.cpp buffer count+skip sample
22 \snippet output.txt buffer count+skip sample
23 */
24
25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP
27
28 #include "../rx-includes.hpp"
29
30 namespace rxcpp {
31
32 namespace operators {
33
34 namespace detail {
35
36 template<class... AN>
37 struct buffer_count_invalid_arguments {};
38
39 template<class... AN>
40 struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
41 using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>;
42 };
43 template<class... AN>
44 using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type;
45
46 template<class T>
47 struct buffer_count
48 {
49 typedef rxu::decay_t<T> source_value_type;
50 typedef std::vector<source_value_type> value_type;
51
52 struct buffer_count_values
53 {
buffer_count_valuesrxcpp::operators::detail::buffer_count::buffer_count_values54 buffer_count_values(int c, int s)
55 : count(c)
56 , skip(s)
57 {
58 }
59 int count;
60 int skip;
61 };
62
63 buffer_count_values initial;
64
buffer_countrxcpp::operators::detail::buffer_count65 buffer_count(int count, int skip)
66 : initial(count, skip)
67 {
68 }
69
70 template<class Subscriber>
71 struct buffer_count_observer : public buffer_count_values
72 {
73 typedef buffer_count_observer<Subscriber> this_type;
74 typedef std::vector<T> value_type;
75 typedef rxu::decay_t<Subscriber> dest_type;
76 typedef observer<value_type, this_type> observer_type;
77 dest_type dest;
78 mutable int cursor;
79 mutable std::deque<value_type> chunks;
80
buffer_count_observerrxcpp::operators::detail::buffer_count::buffer_count_observer81 buffer_count_observer(dest_type d, buffer_count_values v)
82 : buffer_count_values(v)
83 , dest(std::move(d))
84 , cursor(0)
85 {
86 }
on_nextrxcpp::operators::detail::buffer_count::buffer_count_observer87 void on_next(T v) const {
88 if (cursor++ % this->skip == 0) {
89 chunks.emplace_back();
90 }
91 for(auto& chunk : chunks) {
92 chunk.push_back(v);
93 }
94 while (!chunks.empty() && int(chunks.front().size()) == this->count) {
95 dest.on_next(std::move(chunks.front()));
96 chunks.pop_front();
97 }
98 }
on_errorrxcpp::operators::detail::buffer_count::buffer_count_observer99 void on_error(rxu::error_ptr e) const {
100 dest.on_error(e);
101 }
on_completedrxcpp::operators::detail::buffer_count::buffer_count_observer102 void on_completed() const {
103 auto done = on_exception(
104 [&](){
105 while (!chunks.empty()) {
106 dest.on_next(std::move(chunks.front()));
107 chunks.pop_front();
108 }
109 return true;
110 },
111 dest);
112 if (done.empty()) {
113 return;
114 }
115 dest.on_completed();
116 }
117
makerxcpp::operators::detail::buffer_count::buffer_count_observer118 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
119 auto cs = d.get_subscription();
120 return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
121 }
122 };
123
124 template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_count125 auto operator()(Subscriber dest) const
126 -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
127 return buffer_count_observer<Subscriber>::make(std::move(dest), initial);
128 }
129 };
130
131 }
132
133 /*! @copydoc rx-buffer_count.hpp
134 */
135 template<class... AN>
buffer(AN &&...an)136 auto buffer(AN&&... an)
137 -> operator_factory<buffer_count_tag, AN...> {
138 return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
139 }
140
141 }
142
143 template<>
144 struct member_overload<buffer_count_tag>
145 {
146 template<class Observable,
147 class Enabled = rxu::enable_if_all_true_type_t<
148 is_observable<Observable>>,
149 class SourceValue = rxu::value_type_t<Observable>,
150 class BufferCount = rxo::detail::buffer_count<SourceValue>,
151 class Value = rxu::value_type_t<BufferCount>>
memberrxcpp::member_overload152 static auto member(Observable&& o, int count, int skip)
153 -> decltype(o.template lift<Value>(BufferCount(count, skip))) {
154 return o.template lift<Value>(BufferCount(count, skip));
155 }
156
157 template<class Observable,
158 class Enabled = rxu::enable_if_all_true_type_t<
159 is_observable<Observable>>,
160 class SourceValue = rxu::value_type_t<Observable>,
161 class BufferCount = rxo::detail::buffer_count<SourceValue>,
162 class Value = rxu::value_type_t<BufferCount>>
memberrxcpp::member_overload163 static auto member(Observable&& o, int count)
164 -> decltype(o.template lift<Value>(BufferCount(count, count))) {
165 return o.template lift<Value>(BufferCount(count, count));
166 }
167
168 template<class... AN>
memberrxcpp::member_overload169 static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
170 std::terminate();
171 return {};
172 static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)");
173 }
174 };
175
176 }
177
178 #endif
179