1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SUBSCRIBER_HPP)
6 #define RXCPP_RX_SUBSCRIBER_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 template<class T>
13 struct subscriber_base : public observer_base<T>, public subscription_base
14 {
15 typedef tag_subscriber subscriber_tag;
16 };
17
18 /*!
19 \brief binds an observer that consumes values with a composite_subscription that controls lifetime.
20
21 \ingroup group-core
22
23 */
24 template<class T, class Observer = observer<T>>
25 class subscriber : public subscriber_base<T>
26 {
27 static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
28 static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
29 typedef subscriber<T, Observer> this_type;
30 typedef rxu::decay_t<Observer> observer_type;
31
32 composite_subscription lifetime;
33 observer_type destination;
34 trace_id id;
35
36 struct nextdetacher
37 {
~nextdetacherrxcpp::subscriber::nextdetacher38 ~nextdetacher()
39 {
40 trace_activity().on_next_return(*that);
41 if (do_unsubscribe) {
42 that->unsubscribe();
43 }
44 }
nextdetacherrxcpp::subscriber::nextdetacher45 nextdetacher(const this_type* that)
46 : that(that)
47 , do_unsubscribe(true)
48 {
49 }
50 template<class U>
operator ()rxcpp::subscriber::nextdetacher51 void operator()(U u) {
52 trace_activity().on_next_enter(*that, u);
53 RXCPP_TRY {
54 that->destination.on_next(std::move(u));
55 do_unsubscribe = false;
56 } RXCPP_CATCH(...) {
57 auto ex = rxu::current_exception();
58 trace_activity().on_error_enter(*that, ex);
59 that->destination.on_error(std::move(ex));
60 trace_activity().on_error_return(*that);
61 }
62 }
63 const this_type* that;
64 volatile bool do_unsubscribe;
65 };
66
67 struct errordetacher
68 {
~errordetacherrxcpp::subscriber::errordetacher69 ~errordetacher()
70 {
71 trace_activity().on_error_return(*that);
72 that->unsubscribe();
73 }
errordetacherrxcpp::subscriber::errordetacher74 errordetacher(const this_type* that)
75 : that(that)
76 {
77 }
operator ()rxcpp::subscriber::errordetacher78 inline void operator()(rxu::error_ptr ex) {
79 trace_activity().on_error_enter(*that, ex);
80 that->destination.on_error(std::move(ex));
81 }
82 const this_type* that;
83 };
84
85 struct completeddetacher
86 {
~completeddetacherrxcpp::subscriber::completeddetacher87 ~completeddetacher()
88 {
89 trace_activity().on_completed_return(*that);
90 that->unsubscribe();
91 }
completeddetacherrxcpp::subscriber::completeddetacher92 completeddetacher(const this_type* that)
93 : that(that)
94 {
95 }
operator ()rxcpp::subscriber::completeddetacher96 inline void operator()() {
97 trace_activity().on_completed_enter(*that);
98 that->destination.on_completed();
99 }
100 const this_type* that;
101 };
102
103 subscriber();
104 public:
105 typedef typename composite_subscription::weak_subscription weak_subscription;
106
subscriber(const this_type & o)107 subscriber(const this_type& o)
108 : lifetime(o.lifetime)
109 , destination(o.destination)
110 , id(o.id)
111 {
112 }
subscriber(this_type && o)113 subscriber(this_type&& o)
114 : lifetime(std::move(o.lifetime))
115 , destination(std::move(o.destination))
116 , id(std::move(o.id))
117 {
118 }
119
120 template<class U, class O>
121 friend class subscriber;
122
123 template<class O>
subscriber(const subscriber<T,O> & o,typename std::enable_if<!std::is_same<O,observer<T>>::value && std::is_same<Observer,observer<T>>::value,void ** >::type=nullptr)124 subscriber(
125 const subscriber<T, O>& o,
126 typename std::enable_if<
127 !std::is_same<O, observer<T>>::value &&
128 std::is_same<Observer, observer<T>>::value, void**>::type = nullptr)
129 : lifetime(o.lifetime)
130 , destination(o.destination.as_dynamic())
131 , id(o.id)
132 {
133 }
134
135 template<class U>
subscriber(trace_id id,composite_subscription cs,U && o)136 subscriber(trace_id id, composite_subscription cs, U&& o)
137 : lifetime(std::move(cs))
138 , destination(std::forward<U>(o))
139 , id(std::move(id))
140 {
141 static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
142 static_assert(is_observer<U>::value, "must pass observer to subscriber");
143 trace_activity().create_subscriber(*this);
144 }
145
operator =(this_type o)146 this_type& operator=(this_type o) {
147 lifetime = std::move(o.lifetime);
148 destination = std::move(o.destination);
149 id = std::move(o.id);
150 return *this;
151 }
152
get_observer() const153 const observer_type& get_observer() const {
154 return destination;
155 }
get_observer()156 observer_type& get_observer() {
157 return destination;
158 }
get_subscription() const159 const composite_subscription& get_subscription() const {
160 return lifetime;
161 }
get_subscription()162 composite_subscription& get_subscription() {
163 return lifetime;
164 }
get_id() const165 trace_id get_id() const {
166 return id;
167 }
168
as_dynamic() const169 subscriber<T> as_dynamic() const {
170 return subscriber<T>(id, lifetime, destination.as_dynamic());
171 }
172
173 // observer
174 //
175 template<class V>
on_next(V && v) const176 void on_next(V&& v) const {
177 if (!is_subscribed()) {
178 return;
179 }
180 nextdetacher protect(this);
181 protect(std::forward<V>(v));
182 }
on_error(rxu::error_ptr e) const183 void on_error(rxu::error_ptr e) const {
184 if (!is_subscribed()) {
185 return;
186 }
187 errordetacher protect(this);
188 protect(std::move(e));
189 }
on_completed() const190 void on_completed() const {
191 if (!is_subscribed()) {
192 return;
193 }
194 completeddetacher protect(this);
195 protect();
196 }
197
198 // composite_subscription
199 //
is_subscribed() const200 bool is_subscribed() const {
201 return lifetime.is_subscribed();
202 }
add(subscription s) const203 weak_subscription add(subscription s) const {
204 return lifetime.add(std::move(s));
205 }
206 template<class F>
add(F f) const207 auto add(F f) const
208 -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
209 return lifetime.add(make_subscription(std::move(f)));
210 }
remove(weak_subscription w) const211 void remove(weak_subscription w) const {
212 return lifetime.remove(std::move(w));
213 }
clear() const214 void clear() const {
215 return lifetime.clear();
216 }
unsubscribe() const217 void unsubscribe() const {
218 return lifetime.unsubscribe();
219 }
220
221 };
222
223 template<class T, class Observer>
make_subscriber(subscriber<T,Observer> o)224 auto make_subscriber(
225 subscriber<T, Observer> o)
226 -> subscriber<T, Observer> {
227 return subscriber<T, Observer>(std::move(o));
228 }
229
230 // observer
231 //
232
233 template<class T>
make_subscriber()234 auto make_subscriber()
235 -> typename std::enable_if<
236 detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
237 subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>>::type {
238 return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
239 observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
240 }
241
242 template<class T, class I>
make_subscriber(const observer<T,I> & o)243 auto make_subscriber(
244 const observer<T, I>& o)
245 -> subscriber<T, observer<T, I>> {
246 return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
247 }
248 template<class T, class Observer>
make_subscriber(const Observer & o)249 auto make_subscriber(const Observer& o)
250 -> typename std::enable_if<
251 is_observer<Observer>::value &&
252 !is_subscriber<Observer>::value,
253 subscriber<T, Observer>>::type {
254 return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
255 }
256 template<class T, class Observer>
make_subscriber(const Observer & o)257 auto make_subscriber(const Observer& o)
258 -> typename std::enable_if<
259 !detail::is_on_next_of<T, Observer>::value &&
260 !is_subscriber<Observer>::value &&
261 !is_subscription<Observer>::value &&
262 !is_observer<Observer>::value,
263 subscriber<T, observer<T, Observer>>>::type {
264 return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
265 }
266 template<class T, class OnNext>
make_subscriber(const OnNext & on)267 auto make_subscriber(const OnNext& on)
268 -> typename std::enable_if<
269 detail::is_on_next_of<T, OnNext>::value,
270 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
271 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), composite_subscription(),
272 observer<T, detail::stateless_observer_tag, OnNext>(on));
273 }
274 template<class T, class OnNext, class OnError>
make_subscriber(const OnNext & on,const OnError & oe)275 auto make_subscriber(const OnNext& on, const OnError& oe)
276 -> typename std::enable_if<
277 detail::is_on_next_of<T, OnNext>::value &&
278 detail::is_on_error<OnError>::value,
279 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
280 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), composite_subscription(),
281 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
282 }
283 template<class T, class OnNext, class OnCompleted>
make_subscriber(const OnNext & on,const OnCompleted & oc)284 auto make_subscriber(const OnNext& on, const OnCompleted& oc)
285 -> typename std::enable_if<
286 detail::is_on_next_of<T, OnNext>::value &&
287 detail::is_on_completed<OnCompleted>::value,
288 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
289 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
290 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
291 }
292 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(const OnNext & on,const OnError & oe,const OnCompleted & oc)293 auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
294 -> typename std::enable_if<
295 detail::is_on_next_of<T, OnNext>::value &&
296 detail::is_on_error<OnError>::value &&
297 detail::is_on_completed<OnCompleted>::value,
298 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
299 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
300 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
301 }
302
303 // explicit lifetime
304 //
305
306 template<class T>
make_subscriber(const composite_subscription & cs)307 auto make_subscriber(const composite_subscription& cs)
308 -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
309 return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), cs,
310 observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
311 }
312
313 template<class T, class I>
make_subscriber(const composite_subscription & cs,const observer<T,I> & o)314 auto make_subscriber(const composite_subscription& cs,
315 const observer<T, I>& o)
316 -> subscriber<T, observer<T, I>> {
317 return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
318 }
319 template<class T, class I>
make_subscriber(const composite_subscription & cs,const subscriber<T,I> & s)320 auto make_subscriber(const composite_subscription& cs,
321 const subscriber<T, I>& s)
322 -> subscriber<T, I> {
323 return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
324 }
325 template<class T, class Observer>
make_subscriber(const composite_subscription & cs,const Observer & o)326 auto make_subscriber(const composite_subscription& cs, const Observer& o)
327 -> typename std::enable_if<
328 !is_subscriber<Observer>::value &&
329 is_observer<Observer>::value,
330 subscriber<T, Observer>>::type {
331 return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
332 }
333 template<class T, class Observer>
make_subscriber(const composite_subscription & cs,const Observer & o)334 auto make_subscriber(const composite_subscription& cs, const Observer& o)
335 -> typename std::enable_if<
336 !detail::is_on_next_of<T, Observer>::value &&
337 !is_subscriber<Observer>::value &&
338 !is_subscription<Observer>::value &&
339 !is_observer<Observer>::value,
340 subscriber<T, observer<T, Observer>>>::type {
341 return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o));
342 }
343 template<class T, class OnNext>
make_subscriber(const composite_subscription & cs,const OnNext & on)344 auto make_subscriber(const composite_subscription& cs, const OnNext& on)
345 -> typename std::enable_if<
346 detail::is_on_next_of<T, OnNext>::value,
347 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
348 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
349 observer<T, detail::stateless_observer_tag, OnNext>(on));
350 }
351 template<class T, class OnNext, class OnError>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnError & oe)352 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe)
353 -> typename std::enable_if<
354 detail::is_on_next_of<T, OnNext>::value &&
355 detail::is_on_error<OnError>::value,
356 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
357 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
358 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
359 }
360 template<class T, class OnNext, class OnCompleted>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)361 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
362 -> typename std::enable_if<
363 detail::is_on_next_of<T, OnNext>::value &&
364 detail::is_on_completed<OnCompleted>::value,
365 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
366 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
367 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
368 }
369 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)370 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
371 -> typename std::enable_if<
372 detail::is_on_next_of<T, OnNext>::value &&
373 detail::is_on_error<OnError>::value &&
374 detail::is_on_completed<OnCompleted>::value,
375 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
376 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
377 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
378 }
379
380 // explicit id
381 //
382
383 template<class T>
make_subscriber(trace_id id)384 auto make_subscriber(trace_id id)
385 -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
386 return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), composite_subscription(),
387 observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
388 }
389
390 template<class T>
make_subscriber(trace_id id,const composite_subscription & cs)391 auto make_subscriber(trace_id id, const composite_subscription& cs)
392 -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
393 return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), cs,
394 observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
395 }
396
397 template<class T, class I>
make_subscriber(trace_id id,const observer<T,I> & o)398 auto make_subscriber(trace_id id,
399 const observer<T, I>& o)
400 -> subscriber<T, observer<T, I>> {
401 return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o);
402 }
403 template<class T, class I>
make_subscriber(trace_id id,const composite_subscription & cs,const observer<T,I> & o)404 auto make_subscriber(trace_id id, const composite_subscription& cs,
405 const observer<T, I>& o)
406 -> subscriber<T, observer<T, I>> {
407 return subscriber<T, observer<T, I>>(std::move(id), cs, o);
408 }
409 template<class T, class Observer>
make_subscriber(trace_id id,const Observer & o)410 auto make_subscriber(trace_id id, const Observer& o)
411 -> typename std::enable_if<
412 is_observer<Observer>::value,
413 subscriber<T, Observer>>::type {
414 return subscriber<T, Observer>(std::move(id), composite_subscription(), o);
415 }
416 template<class T, class Observer>
make_subscriber(trace_id id,const composite_subscription & cs,const Observer & o)417 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
418 -> typename std::enable_if<
419 is_observer<Observer>::value,
420 subscriber<T, Observer>>::type {
421 return subscriber<T, Observer>(std::move(id), cs, o);
422 }
423 template<class T, class Observer>
make_subscriber(trace_id id,const Observer & o)424 auto make_subscriber(trace_id id, const Observer& o)
425 -> typename std::enable_if<
426 !detail::is_on_next_of<T, Observer>::value &&
427 !is_subscriber<Observer>::value &&
428 !is_subscription<Observer>::value &&
429 !is_observer<Observer>::value,
430 subscriber<T, observer<T, Observer>>>::type {
431 return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o);
432 }
433 template<class T, class Observer>
make_subscriber(trace_id id,const composite_subscription & cs,const Observer & o)434 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
435 -> typename std::enable_if<
436 !detail::is_on_next_of<T, Observer>::value &&
437 !is_subscriber<Observer>::value &&
438 !is_subscription<Observer>::value &&
439 !is_observer<Observer>::value,
440 subscriber<T, observer<T, Observer>>>::type {
441 return subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
442 }
443 template<class T, class OnNext>
make_subscriber(trace_id id,const OnNext & on)444 auto make_subscriber(trace_id id, const OnNext& on)
445 -> typename std::enable_if<
446 detail::is_on_next_of<T, OnNext>::value,
447 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
448 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), composite_subscription(),
449 observer<T, detail::stateless_observer_tag, OnNext>(on));
450 }
451 template<class T, class OnNext>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on)452 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
453 -> typename std::enable_if<
454 detail::is_on_next_of<T, OnNext>::value,
455 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
456 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
457 observer<T, detail::stateless_observer_tag, OnNext>(on));
458 }
459 template<class T, class OnNext, class OnError>
make_subscriber(trace_id id,const OnNext & on,const OnError & oe)460 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
461 -> typename std::enable_if<
462 detail::is_on_next_of<T, OnNext>::value &&
463 detail::is_on_error<OnError>::value,
464 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
465 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), composite_subscription(),
466 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
467 }
468 template<class T, class OnNext, class OnError>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe)469 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
470 -> typename std::enable_if<
471 detail::is_on_next_of<T, OnNext>::value &&
472 detail::is_on_error<OnError>::value,
473 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
474 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
475 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
476 }
477 template<class T, class OnNext, class OnCompleted>
make_subscriber(trace_id id,const OnNext & on,const OnCompleted & oc)478 auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
479 -> typename std::enable_if<
480 detail::is_on_next_of<T, OnNext>::value &&
481 detail::is_on_completed<OnCompleted>::value,
482 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
483 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), composite_subscription(),
484 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
485 }
486 template<class T, class OnNext, class OnCompleted>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)487 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
488 -> typename std::enable_if<
489 detail::is_on_next_of<T, OnNext>::value &&
490 detail::is_on_completed<OnCompleted>::value,
491 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
492 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
493 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
494 }
495 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(trace_id id,const OnNext & on,const OnError & oe,const OnCompleted & oc)496 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
497 -> typename std::enable_if<
498 detail::is_on_next_of<T, OnNext>::value &&
499 detail::is_on_error<OnError>::value &&
500 detail::is_on_completed<OnCompleted>::value,
501 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
502 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), composite_subscription(),
503 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
504 }
505 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)506 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
507 -> typename std::enable_if<
508 detail::is_on_next_of<T, OnNext>::value &&
509 detail::is_on_error<OnError>::value &&
510 detail::is_on_completed<OnCompleted>::value,
511 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
512 return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
513 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
514 }
515
516 // chain defaults from subscriber
517 //
518
519 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const observer<T,I> & o)520 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr,
521 const observer<T, I>& o)
522 -> subscriber<T, observer<T, I>> {
523 auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
524 trace_activity().connect(r, scbr);
525 return r;
526 }
527 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const observer<T,I> & o)528 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id,
529 const observer<T, I>& o)
530 -> subscriber<T, observer<T, I>> {
531 auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o);
532 trace_activity().connect(r, scbr);
533 return r;
534 }
535 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const Observer & o)536 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
537 -> typename std::enable_if<
538 is_observer<Observer>::value,
539 subscriber<T, Observer>>::type {
540 auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o);
541 trace_activity().connect(r, scbr);
542 return r;
543 }
544 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const Observer & o)545 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
546 -> typename std::enable_if<
547 !is_subscription<Observer>::value &&
548 is_observer<Observer>::value,
549 subscriber<T, Observer>>::type {
550 auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
551 trace_activity().connect(r, scbr);
552 return r;
553 }
554 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const Observer & o)555 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
556 -> typename std::enable_if<
557 !detail::is_on_next_of<T, Observer>::value &&
558 !is_subscriber<Observer>::value &&
559 !is_subscription<Observer>::value &&
560 !is_observer<Observer>::value,
561 subscriber<T, observer<T, Observer>>>::type {
562 auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o));
563 trace_activity().connect(r, scbr);
564 return r;
565 }
566 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const Observer & o)567 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
568 -> typename std::enable_if<
569 !detail::is_on_next_of<T, Observer>::value &&
570 !is_subscriber<Observer>::value &&
571 !is_subscription<Observer>::value &&
572 !is_observer<Observer>::value,
573 subscriber<T, observer<T, Observer>>>::type {
574 auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
575 trace_activity().connect(r, scbr);
576 return r;
577 }
578 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on)579 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
580 -> typename std::enable_if<
581 detail::is_on_next_of<T, OnNext>::value,
582 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
583 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
584 observer<T, detail::stateless_observer_tag, OnNext>(on));
585 trace_activity().connect(r, scbr);
586 return r;
587 }
588 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on)589 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
590 -> typename std::enable_if<
591 detail::is_on_next_of<T, OnNext>::value,
592 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
593 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), scbr.get_subscription(),
594 observer<T, detail::stateless_observer_tag, OnNext>(on));
595 trace_activity().connect(r, scbr);
596 return r;
597 }
598 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnError & oe)599 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
600 -> typename std::enable_if<
601 detail::is_on_next_of<T, OnNext>::value &&
602 detail::is_on_error<OnError>::value,
603 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
604 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
605 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
606 trace_activity().connect(r, scbr);
607 return r;
608 }
609 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnError & oe)610 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
611 -> typename std::enable_if<
612 detail::is_on_next_of<T, OnNext>::value &&
613 detail::is_on_error<OnError>::value,
614 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
615 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), scbr.get_subscription(),
616 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
617 trace_activity().connect(r, scbr);
618 return r;
619 }
620 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnCompleted & oc)621 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
622 -> typename std::enable_if<
623 detail::is_on_next_of<T, OnNext>::value &&
624 detail::is_on_completed<OnCompleted>::value,
625 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
626 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
627 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
628 trace_activity().connect(r, scbr);
629 return r;
630 }
631 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnCompleted & oc)632 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
633 -> typename std::enable_if<
634 detail::is_on_next_of<T, OnNext>::value &&
635 detail::is_on_completed<OnCompleted>::value,
636 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
637 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), scbr.get_subscription(),
638 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
639 trace_activity().connect(r, scbr);
640 return r;
641 }
642 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnError & oe,const OnCompleted & oc)643 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
644 -> typename std::enable_if<
645 detail::is_on_next_of<T, OnNext>::value &&
646 detail::is_on_error<OnError>::value &&
647 detail::is_on_completed<OnCompleted>::value,
648 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
649 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
650 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
651 trace_activity().connect(r, scbr);
652 return r;
653 }
654 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnError & oe,const OnCompleted & oc)655 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
656 -> typename std::enable_if<
657 detail::is_on_next_of<T, OnNext>::value &&
658 detail::is_on_error<OnError>::value &&
659 detail::is_on_completed<OnCompleted>::value,
660 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
661 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), scbr.get_subscription(),
662 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
663 trace_activity().connect(r, scbr);
664 return r;
665 }
666
667 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> &,const composite_subscription & cs,const observer<T,I> & o)668 auto make_subscriber(const subscriber<OtherT, OtherObserver>& , const composite_subscription& cs,
669 const observer<T, I>& o)
670 -> subscriber<T, observer<T, I>> {
671 return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
672 }
673 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> &,trace_id id,const composite_subscription & cs,const observer<T,I> & o)674 auto make_subscriber(const subscriber<OtherT, OtherObserver>&, trace_id id, const composite_subscription& cs,
675 const observer<T, I>& o)
676 -> subscriber<T, observer<T, I>> {
677 return subscriber<T, observer<T, I>>(std::move(id), cs, o);
678 }
679 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const Observer & o)680 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
681 -> typename std::enable_if<
682 is_observer<Observer>::value,
683 subscriber<T, Observer>>::type {
684 auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
685 trace_activity().connect(r, scbr);
686 return r;
687 }
688 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const Observer & o)689 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
690 -> typename std::enable_if<
691 is_observer<Observer>::value,
692 subscriber<T, Observer>>::type {
693 auto r = subscriber<T, Observer>(std::move(id), cs, o);
694 trace_activity().connect(r, scbr);
695 return r;
696 }
697 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const Observer & o)698 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
699 -> typename std::enable_if<
700 !detail::is_on_next_of<T, Observer>::value &&
701 !is_subscriber<Observer>::value &&
702 !is_subscription<Observer>::value &&
703 !is_observer<Observer>::value,
704 subscriber<T, observer<T, Observer>>>::type {
705 auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o);
706 trace_activity().connect(r, scbr);
707 return r;
708 }
709 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const Observer & o)710 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
711 -> typename std::enable_if<
712 !detail::is_on_next_of<T, Observer>::value &&
713 !is_subscriber<Observer>::value &&
714 !is_subscription<Observer>::value &&
715 !is_observer<Observer>::value,
716 subscriber<T, observer<T, Observer>>>::type {
717 auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
718 trace_activity().connect(r, scbr);
719 return r;
720 }
721 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on)722 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
723 -> typename std::enable_if<
724 detail::is_on_next_of<T, OnNext>::value,
725 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
726 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
727 observer<T, detail::stateless_observer_tag, OnNext>(on));
728 trace_activity().connect(r, scbr);
729 return r;
730 }
731 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on)732 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
733 -> typename std::enable_if<
734 detail::is_on_next_of<T, OnNext>::value,
735 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
736 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
737 observer<T, detail::stateless_observer_tag, OnNext>(on));
738 trace_activity().connect(r, scbr);
739 return r;
740 }
741 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnError & oe)742 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
743 -> typename std::enable_if<
744 detail::is_on_next_of<T, OnNext>::value &&
745 detail::is_on_error<OnError>::value,
746 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
747 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
748 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
749 trace_activity().connect(r, scbr);
750 return r;
751 }
752 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe)753 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
754 -> typename std::enable_if<
755 detail::is_on_next_of<T, OnNext>::value &&
756 detail::is_on_error<OnError>::value,
757 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
758 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
759 observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
760 trace_activity().connect(r, scbr);
761 return r;
762 }
763 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)764 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
765 -> typename std::enable_if<
766 detail::is_on_next_of<T, OnNext>::value &&
767 detail::is_on_completed<OnCompleted>::value,
768 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
769 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
770 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
771 trace_activity().connect(r, scbr);
772 return r;
773 }
774 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)775 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
776 -> typename std::enable_if<
777 detail::is_on_next_of<T, OnNext>::value &&
778 detail::is_on_completed<OnCompleted>::value,
779 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
780 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
781 observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
782 trace_activity().connect(r, scbr);
783 return r;
784 }
785 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)786 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
787 -> typename std::enable_if<
788 detail::is_on_next_of<T, OnNext>::value &&
789 detail::is_on_error<OnError>::value &&
790 detail::is_on_completed<OnCompleted>::value,
791 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
792 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
793 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
794 trace_activity().connect(r, scbr);
795 return r;
796 }
797 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)798 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
799 -> typename std::enable_if<
800 detail::is_on_next_of<T, OnNext>::value &&
801 detail::is_on_error<OnError>::value &&
802 detail::is_on_completed<OnCompleted>::value,
803 subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
804 auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
805 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
806 trace_activity().connect(r, scbr);
807 return r;
808 }
809
810 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,const composite_subscription & cs)811 auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs)
812 -> subscriber<T, Observer> {
813 auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer());
814 trace_activity().connect(r, scbr);
815 return r;
816 }
817 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,trace_id id,const composite_subscription & cs)818 auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs)
819 -> subscriber<T, Observer> {
820 auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer());
821 trace_activity().connect(r, scbr);
822 return r;
823 }
824
825 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,trace_id id)826 auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id)
827 -> subscriber<T, Observer> {
828 auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
829 trace_activity().connect(r, scbr);
830 return r;
831 }
832
833 }
834
835 #endif
836