• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SUBSCRIBER_HPP)
6 #define RXCPP_RX_SUBSCRIBER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 template<class T>
13 struct subscriber_base : public observer_base<T>, public subscription_base
14 {
15     typedef tag_subscriber subscriber_tag;
16 };
17 
18 /*!
19     \brief binds an observer that consumes values with a composite_subscription that controls lifetime.
20 
21     \ingroup group-core
22 
23 */
24 template<class T, class Observer = observer<T>>
25 class subscriber : public subscriber_base<T>
26 {
27     static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
28     static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
29     typedef subscriber<T, Observer> this_type;
30     typedef rxu::decay_t<Observer> observer_type;
31 
32     composite_subscription lifetime;
33     observer_type destination;
34     trace_id id;
35 
36     struct nextdetacher
37     {
~nextdetacherrxcpp::subscriber::nextdetacher38         ~nextdetacher()
39         {
40             trace_activity().on_next_return(*that);
41             if (do_unsubscribe) {
42                 that->unsubscribe();
43             }
44         }
nextdetacherrxcpp::subscriber::nextdetacher45         nextdetacher(const this_type* that)
46             : that(that)
47             , do_unsubscribe(true)
48         {
49         }
50         template<class U>
operator ()rxcpp::subscriber::nextdetacher51         void operator()(U u) {
52             trace_activity().on_next_enter(*that, u);
53             RXCPP_TRY {
54                 that->destination.on_next(std::move(u));
55                 do_unsubscribe = false;
56             } RXCPP_CATCH(...) {
57                 auto ex = rxu::current_exception();
58                 trace_activity().on_error_enter(*that, ex);
59                 that->destination.on_error(std::move(ex));
60                 trace_activity().on_error_return(*that);
61             }
62         }
63         const this_type* that;
64         volatile bool do_unsubscribe;
65     };
66 
67     struct errordetacher
68     {
~errordetacherrxcpp::subscriber::errordetacher69         ~errordetacher()
70         {
71             trace_activity().on_error_return(*that);
72             that->unsubscribe();
73         }
errordetacherrxcpp::subscriber::errordetacher74         errordetacher(const this_type* that)
75             : that(that)
76         {
77         }
operator ()rxcpp::subscriber::errordetacher78         inline void operator()(rxu::error_ptr ex) {
79             trace_activity().on_error_enter(*that, ex);
80             that->destination.on_error(std::move(ex));
81         }
82         const this_type* that;
83     };
84 
85     struct completeddetacher
86     {
~completeddetacherrxcpp::subscriber::completeddetacher87         ~completeddetacher()
88         {
89             trace_activity().on_completed_return(*that);
90             that->unsubscribe();
91         }
completeddetacherrxcpp::subscriber::completeddetacher92         completeddetacher(const this_type* that)
93             : that(that)
94         {
95         }
operator ()rxcpp::subscriber::completeddetacher96         inline void operator()() {
97             trace_activity().on_completed_enter(*that);
98             that->destination.on_completed();
99         }
100         const this_type* that;
101     };
102 
103     subscriber();
104 public:
105     typedef typename composite_subscription::weak_subscription weak_subscription;
106 
subscriber(const this_type & o)107     subscriber(const this_type& o)
108         : lifetime(o.lifetime)
109         , destination(o.destination)
110         , id(o.id)
111     {
112     }
subscriber(this_type && o)113     subscriber(this_type&& o)
114         : lifetime(std::move(o.lifetime))
115         , destination(std::move(o.destination))
116         , id(std::move(o.id))
117     {
118     }
119 
120     template<class U, class O>
121     friend class subscriber;
122 
123     template<class O>
subscriber(const subscriber<T,O> & o,typename std::enable_if<!std::is_same<O,observer<T>>::value && std::is_same<Observer,observer<T>>::value,void ** >::type=nullptr)124     subscriber(
125         const subscriber<T, O>& o,
126         typename std::enable_if<
127                !std::is_same<O, observer<T>>::value &&
128                std::is_same<Observer, observer<T>>::value, void**>::type = nullptr)
129         : lifetime(o.lifetime)
130         , destination(o.destination.as_dynamic())
131         , id(o.id)
132     {
133     }
134 
135     template<class U>
subscriber(trace_id id,composite_subscription cs,U && o)136     subscriber(trace_id id, composite_subscription cs, U&& o)
137         : lifetime(std::move(cs))
138         , destination(std::forward<U>(o))
139         , id(std::move(id))
140     {
141         static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
142         static_assert(is_observer<U>::value, "must pass observer to subscriber");
143         trace_activity().create_subscriber(*this);
144     }
145 
operator =(this_type o)146     this_type& operator=(this_type o) {
147         lifetime = std::move(o.lifetime);
148         destination = std::move(o.destination);
149         id = std::move(o.id);
150         return *this;
151     }
152 
get_observer() const153     const observer_type& get_observer() const {
154         return destination;
155     }
get_observer()156     observer_type& get_observer() {
157         return destination;
158     }
get_subscription() const159     const composite_subscription& get_subscription() const {
160         return lifetime;
161     }
get_subscription()162     composite_subscription& get_subscription() {
163         return lifetime;
164     }
get_id() const165     trace_id get_id() const {
166         return id;
167     }
168 
as_dynamic() const169     subscriber<T> as_dynamic() const {
170         return subscriber<T>(id, lifetime, destination.as_dynamic());
171     }
172 
173     // observer
174     //
175     template<class V>
on_next(V && v) const176     void on_next(V&& v) const {
177         if (!is_subscribed()) {
178             return;
179         }
180         nextdetacher protect(this);
181         protect(std::forward<V>(v));
182     }
on_error(rxu::error_ptr e) const183     void on_error(rxu::error_ptr e) const {
184         if (!is_subscribed()) {
185             return;
186         }
187         errordetacher protect(this);
188         protect(std::move(e));
189     }
on_completed() const190     void on_completed() const {
191         if (!is_subscribed()) {
192             return;
193         }
194         completeddetacher protect(this);
195         protect();
196     }
197 
198     // composite_subscription
199     //
is_subscribed() const200     bool is_subscribed() const {
201         return lifetime.is_subscribed();
202     }
add(subscription s) const203     weak_subscription add(subscription s) const {
204         return lifetime.add(std::move(s));
205     }
206     template<class F>
add(F f) const207     auto add(F f) const
208     -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
209         return lifetime.add(make_subscription(std::move(f)));
210     }
remove(weak_subscription w) const211     void remove(weak_subscription w) const {
212         return lifetime.remove(std::move(w));
213     }
clear() const214     void clear() const {
215         return lifetime.clear();
216     }
unsubscribe() const217     void unsubscribe() const {
218         return lifetime.unsubscribe();
219     }
220 
221 };
222 
223 template<class T, class Observer>
make_subscriber(subscriber<T,Observer> o)224 auto make_subscriber(
225             subscriber<T,   Observer> o)
226     ->      subscriber<T,   Observer> {
227     return  subscriber<T,   Observer>(std::move(o));
228 }
229 
230 // observer
231 //
232 
233 template<class T>
make_subscriber()234 auto make_subscriber()
235     -> typename std::enable_if<
236         detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
237             subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>>::type {
238     return  subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
239                             observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
240 }
241 
242 template<class T, class I>
make_subscriber(const observer<T,I> & o)243 auto make_subscriber(
244     const                   observer<T, I>& o)
245     ->      subscriber<T,   observer<T, I>> {
246     return  subscriber<T,   observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
247 }
248 template<class T, class Observer>
make_subscriber(const Observer & o)249 auto make_subscriber(const Observer& o)
250     -> typename std::enable_if<
251     is_observer<Observer>::value &&
252     !is_subscriber<Observer>::value,
253             subscriber<T,   Observer>>::type {
254     return  subscriber<T,   Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
255 }
256 template<class T, class Observer>
make_subscriber(const Observer & o)257 auto make_subscriber(const Observer& o)
258     -> typename std::enable_if<
259         !detail::is_on_next_of<T, Observer>::value &&
260         !is_subscriber<Observer>::value &&
261         !is_subscription<Observer>::value &&
262         !is_observer<Observer>::value,
263             subscriber<T,   observer<T, Observer>>>::type {
264     return  subscriber<T,   observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
265 }
266 template<class T, class OnNext>
make_subscriber(const OnNext & on)267 auto make_subscriber(const OnNext& on)
268     -> typename std::enable_if<
269         detail::is_on_next_of<T, OnNext>::value,
270             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
271     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), composite_subscription(),
272                             observer<T, detail::stateless_observer_tag, OnNext>(on));
273 }
274 template<class T, class OnNext, class OnError>
make_subscriber(const OnNext & on,const OnError & oe)275 auto make_subscriber(const OnNext& on, const OnError& oe)
276     -> typename std::enable_if<
277         detail::is_on_next_of<T, OnNext>::value &&
278         detail::is_on_error<OnError>::value,
279             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
280     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), composite_subscription(),
281                             observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
282 }
283 template<class T, class OnNext, class OnCompleted>
make_subscriber(const OnNext & on,const OnCompleted & oc)284 auto make_subscriber(const OnNext& on, const OnCompleted& oc)
285     -> typename std::enable_if<
286         detail::is_on_next_of<T, OnNext>::value &&
287         detail::is_on_completed<OnCompleted>::value,
288             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
289     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
290                             observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
291 }
292 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(const OnNext & on,const OnError & oe,const OnCompleted & oc)293 auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
294     -> typename std::enable_if<
295         detail::is_on_next_of<T, OnNext>::value &&
296         detail::is_on_error<OnError>::value &&
297         detail::is_on_completed<OnCompleted>::value,
298             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
299     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
300                             observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
301 }
302 
303 // explicit lifetime
304 //
305 
306 template<class T>
make_subscriber(const composite_subscription & cs)307 auto make_subscriber(const composite_subscription& cs)
308     ->      subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
309     return  subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), cs,
310                             observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
311 }
312 
313 template<class T, class I>
make_subscriber(const composite_subscription & cs,const observer<T,I> & o)314 auto make_subscriber(const composite_subscription& cs,
315     const                   observer<T, I>& o)
316     ->      subscriber<T,   observer<T, I>> {
317     return  subscriber<T,   observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
318 }
319 template<class T, class I>
make_subscriber(const composite_subscription & cs,const subscriber<T,I> & s)320 auto make_subscriber(const composite_subscription& cs,
321     const                   subscriber<T, I>& s)
322     ->      subscriber<T,   I> {
323     return  subscriber<T,   I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
324 }
325 template<class T, class Observer>
make_subscriber(const composite_subscription & cs,const Observer & o)326 auto make_subscriber(const composite_subscription& cs, const Observer& o)
327     -> typename std::enable_if<
328         !is_subscriber<Observer>::value &&
329         is_observer<Observer>::value,
330             subscriber<T,   Observer>>::type {
331     return  subscriber<T,   Observer>(trace_id::make_next_id_subscriber(), cs, o);
332 }
333 template<class T, class Observer>
make_subscriber(const composite_subscription & cs,const Observer & o)334 auto make_subscriber(const composite_subscription& cs, const Observer& o)
335     -> typename std::enable_if<
336         !detail::is_on_next_of<T, Observer>::value &&
337         !is_subscriber<Observer>::value &&
338         !is_subscription<Observer>::value &&
339         !is_observer<Observer>::value,
340             subscriber<T,   observer<T, Observer>>>::type {
341     return  subscriber<T,   observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o));
342 }
343 template<class T, class OnNext>
make_subscriber(const composite_subscription & cs,const OnNext & on)344 auto make_subscriber(const composite_subscription& cs, const OnNext& on)
345     -> typename std::enable_if<
346         detail::is_on_next_of<T, OnNext>::value,
347             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
348     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
349                             observer<T, detail::stateless_observer_tag, OnNext>(on));
350 }
351 template<class T, class OnNext, class OnError>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnError & oe)352 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe)
353     -> typename std::enable_if<
354         detail::is_on_next_of<T, OnNext>::value &&
355         detail::is_on_error<OnError>::value,
356             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
357     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
358                             observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
359 }
360 template<class T, class OnNext, class OnCompleted>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)361 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
362     -> typename std::enable_if<
363         detail::is_on_next_of<T, OnNext>::value &&
364         detail::is_on_completed<OnCompleted>::value,
365             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
366     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
367                             observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
368 }
369 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)370 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
371     -> typename std::enable_if<
372         detail::is_on_next_of<T, OnNext>::value &&
373         detail::is_on_error<OnError>::value &&
374         detail::is_on_completed<OnCompleted>::value,
375             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
376     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
377                             observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
378 }
379 
380 // explicit id
381 //
382 
383 template<class T>
make_subscriber(trace_id id)384 auto make_subscriber(trace_id id)
385     ->      subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
386     return  subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), composite_subscription(),
387                             observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
388 }
389 
390 template<class T>
make_subscriber(trace_id id,const composite_subscription & cs)391 auto make_subscriber(trace_id id, const composite_subscription& cs)
392     ->      subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
393     return  subscriber<T,   observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), cs,
394                             observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
395 }
396 
397 template<class T, class I>
make_subscriber(trace_id id,const observer<T,I> & o)398 auto make_subscriber(trace_id id,
399     const                   observer<T, I>& o)
400     ->      subscriber<T,   observer<T, I>> {
401     return  subscriber<T,   observer<T, I>>(std::move(id), composite_subscription(), o);
402 }
403 template<class T, class I>
make_subscriber(trace_id id,const composite_subscription & cs,const observer<T,I> & o)404 auto make_subscriber(trace_id id, const composite_subscription& cs,
405     const                   observer<T, I>& o)
406     ->      subscriber<T,   observer<T, I>> {
407     return  subscriber<T,   observer<T, I>>(std::move(id), cs, o);
408 }
409 template<class T, class Observer>
make_subscriber(trace_id id,const Observer & o)410 auto make_subscriber(trace_id id, const Observer& o)
411     -> typename std::enable_if<
412         is_observer<Observer>::value,
413             subscriber<T,   Observer>>::type {
414     return  subscriber<T,   Observer>(std::move(id), composite_subscription(), o);
415 }
416 template<class T, class Observer>
make_subscriber(trace_id id,const composite_subscription & cs,const Observer & o)417 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
418     -> typename std::enable_if<
419         is_observer<Observer>::value,
420             subscriber<T,   Observer>>::type {
421     return  subscriber<T,   Observer>(std::move(id), cs, o);
422 }
423 template<class T, class Observer>
make_subscriber(trace_id id,const Observer & o)424 auto make_subscriber(trace_id id, const Observer& o)
425     -> typename std::enable_if<
426         !detail::is_on_next_of<T, Observer>::value &&
427         !is_subscriber<Observer>::value &&
428         !is_subscription<Observer>::value &&
429         !is_observer<Observer>::value,
430             subscriber<T,   observer<T, Observer>>>::type {
431     return  subscriber<T,   observer<T, Observer>>(std::move(id), composite_subscription(), o);
432 }
433 template<class T, class Observer>
make_subscriber(trace_id id,const composite_subscription & cs,const Observer & o)434 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
435     -> typename std::enable_if<
436         !detail::is_on_next_of<T, Observer>::value &&
437         !is_subscriber<Observer>::value &&
438         !is_subscription<Observer>::value &&
439         !is_observer<Observer>::value,
440             subscriber<T,   observer<T, Observer>>>::type {
441     return  subscriber<T,   observer<T, Observer>>(std::move(id), cs, o);
442 }
443 template<class T, class OnNext>
make_subscriber(trace_id id,const OnNext & on)444 auto make_subscriber(trace_id id, const OnNext& on)
445     -> typename std::enable_if<
446         detail::is_on_next_of<T, OnNext>::value,
447             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
448     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), composite_subscription(),
449                             observer<T, detail::stateless_observer_tag, OnNext>(on));
450 }
451 template<class T, class OnNext>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on)452 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
453     -> typename std::enable_if<
454         detail::is_on_next_of<T, OnNext>::value,
455             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
456     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
457                             observer<T, detail::stateless_observer_tag, OnNext>(on));
458 }
459 template<class T, class OnNext, class OnError>
make_subscriber(trace_id id,const OnNext & on,const OnError & oe)460 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
461     -> typename std::enable_if<
462         detail::is_on_next_of<T, OnNext>::value &&
463         detail::is_on_error<OnError>::value,
464             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
465     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), composite_subscription(),
466                             observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
467 }
468 template<class T, class OnNext, class OnError>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe)469 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
470     -> typename std::enable_if<
471         detail::is_on_next_of<T, OnNext>::value &&
472         detail::is_on_error<OnError>::value,
473             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
474     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
475                             observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
476 }
477 template<class T, class OnNext, class OnCompleted>
make_subscriber(trace_id id,const OnNext & on,const OnCompleted & oc)478 auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
479     -> typename std::enable_if<
480         detail::is_on_next_of<T, OnNext>::value &&
481         detail::is_on_completed<OnCompleted>::value,
482             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
483     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), composite_subscription(),
484                             observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
485 }
486 template<class T, class OnNext, class OnCompleted>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)487 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
488     -> typename std::enable_if<
489         detail::is_on_next_of<T, OnNext>::value &&
490         detail::is_on_completed<OnCompleted>::value,
491             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
492     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
493                             observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
494 }
495 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(trace_id id,const OnNext & on,const OnError & oe,const OnCompleted & oc)496 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
497     -> typename std::enable_if<
498         detail::is_on_next_of<T, OnNext>::value &&
499         detail::is_on_error<OnError>::value &&
500         detail::is_on_completed<OnCompleted>::value,
501             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
502     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), composite_subscription(),
503                             observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
504 }
505 template<class T, class OnNext, class OnError, class OnCompleted>
make_subscriber(trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)506 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
507     -> typename std::enable_if<
508         detail::is_on_next_of<T, OnNext>::value &&
509         detail::is_on_error<OnError>::value &&
510         detail::is_on_completed<OnCompleted>::value,
511             subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
512     return  subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
513                             observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
514 }
515 
516 // chain defaults from subscriber
517 //
518 
519 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const observer<T,I> & o)520 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr,
521     const                   observer<T, I>& o)
522     ->       subscriber<T,   observer<T, I>> {
523     auto r = subscriber<T,   observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
524     trace_activity().connect(r, scbr);
525     return r;
526 }
527 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const observer<T,I> & o)528 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id,
529     const                   observer<T, I>& o)
530     ->       subscriber<T,   observer<T, I>> {
531     auto r = subscriber<T,   observer<T, I>>(std::move(id), scbr.get_subscription(), o);
532     trace_activity().connect(r, scbr);
533     return r;
534 }
535 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const Observer & o)536 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
537     -> typename std::enable_if<
538         is_observer<Observer>::value,
539              subscriber<T,   Observer>>::type {
540     auto r = subscriber<T,   Observer>(std::move(id), scbr.get_subscription(), o);
541     trace_activity().connect(r, scbr);
542     return r;
543 }
544 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const Observer & o)545 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
546     -> typename std::enable_if<
547         !is_subscription<Observer>::value &&
548         is_observer<Observer>::value,
549              subscriber<T,   Observer>>::type {
550     auto r = subscriber<T,   Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
551     trace_activity().connect(r, scbr);
552     return r;
553 }
554 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const Observer & o)555 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
556     -> typename std::enable_if<
557         !detail::is_on_next_of<T, Observer>::value &&
558         !is_subscriber<Observer>::value &&
559         !is_subscription<Observer>::value &&
560         !is_observer<Observer>::value,
561              subscriber<T,   observer<T, Observer>>>::type {
562     auto r = subscriber<T,   observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o));
563     trace_activity().connect(r, scbr);
564     return r;
565 }
566 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const Observer & o)567 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
568     -> typename std::enable_if<
569         !detail::is_on_next_of<T, Observer>::value &&
570         !is_subscriber<Observer>::value &&
571         !is_subscription<Observer>::value &&
572         !is_observer<Observer>::value,
573              subscriber<T,   observer<T, Observer>>>::type {
574     auto r = subscriber<T,   observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
575     trace_activity().connect(r, scbr);
576     return r;
577 }
578 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on)579 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
580     -> typename std::enable_if<
581         detail::is_on_next_of<T, OnNext>::value,
582              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
583     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
584                              observer<T, detail::stateless_observer_tag, OnNext>(on));
585     trace_activity().connect(r, scbr);
586     return r;
587 }
588 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on)589 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
590     -> typename std::enable_if<
591         detail::is_on_next_of<T, OnNext>::value,
592              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
593     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), scbr.get_subscription(),
594                              observer<T, detail::stateless_observer_tag, OnNext>(on));
595     trace_activity().connect(r, scbr);
596     return r;
597 }
598 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnError & oe)599 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
600     -> typename std::enable_if<
601         detail::is_on_next_of<T, OnNext>::value &&
602         detail::is_on_error<OnError>::value,
603              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
604     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
605                              observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
606     trace_activity().connect(r, scbr);
607     return r;
608 }
609 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnError & oe)610 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
611     -> typename std::enable_if<
612         detail::is_on_next_of<T, OnNext>::value &&
613         detail::is_on_error<OnError>::value,
614              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
615     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), scbr.get_subscription(),
616                              observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
617     trace_activity().connect(r, scbr);
618     return r;
619 }
620 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnCompleted & oc)621 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
622     -> typename std::enable_if<
623         detail::is_on_next_of<T, OnNext>::value &&
624         detail::is_on_completed<OnCompleted>::value,
625              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
626     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
627                              observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
628     trace_activity().connect(r, scbr);
629     return r;
630 }
631 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnCompleted & oc)632 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
633     -> typename std::enable_if<
634         detail::is_on_next_of<T, OnNext>::value &&
635         detail::is_on_completed<OnCompleted>::value,
636              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
637     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), scbr.get_subscription(),
638                              observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
639     trace_activity().connect(r, scbr);
640     return r;
641 }
642 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const OnNext & on,const OnError & oe,const OnCompleted & oc)643 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
644     -> typename std::enable_if<
645         detail::is_on_next_of<T, OnNext>::value &&
646         detail::is_on_error<OnError>::value &&
647         detail::is_on_completed<OnCompleted>::value,
648              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
649     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
650                              observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
651     trace_activity().connect(r, scbr);
652     return r;
653 }
654 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const OnNext & on,const OnError & oe,const OnCompleted & oc)655 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
656     -> typename std::enable_if<
657         detail::is_on_next_of<T, OnNext>::value &&
658         detail::is_on_error<OnError>::value &&
659         detail::is_on_completed<OnCompleted>::value,
660              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
661     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), scbr.get_subscription(),
662                              observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
663     trace_activity().connect(r, scbr);
664     return r;
665 }
666 
667 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> &,const composite_subscription & cs,const observer<T,I> & o)668 auto make_subscriber(const subscriber<OtherT, OtherObserver>& , const composite_subscription& cs,
669     const                   observer<T, I>& o)
670     ->      subscriber<T,   observer<T, I>> {
671     return  subscriber<T,   observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
672 }
673 template<class T, class OtherT, class OtherObserver, class I>
make_subscriber(const subscriber<OtherT,OtherObserver> &,trace_id id,const composite_subscription & cs,const observer<T,I> & o)674 auto make_subscriber(const subscriber<OtherT, OtherObserver>&, trace_id id, const composite_subscription& cs,
675     const                   observer<T, I>& o)
676     ->      subscriber<T,   observer<T, I>> {
677     return  subscriber<T,   observer<T, I>>(std::move(id), cs, o);
678 }
679 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const Observer & o)680 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
681     -> typename std::enable_if<
682         is_observer<Observer>::value,
683              subscriber<T,   Observer>>::type {
684     auto r = subscriber<T,   Observer>(trace_id::make_next_id_subscriber(), cs, o);
685     trace_activity().connect(r, scbr);
686     return r;
687 }
688 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const Observer & o)689 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
690     -> typename std::enable_if<
691         is_observer<Observer>::value,
692              subscriber<T,   Observer>>::type {
693     auto r = subscriber<T,   Observer>(std::move(id), cs, o);
694     trace_activity().connect(r, scbr);
695     return r;
696 }
697 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const Observer & o)698 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
699     -> typename std::enable_if<
700         !detail::is_on_next_of<T, Observer>::value &&
701         !is_subscriber<Observer>::value &&
702         !is_subscription<Observer>::value &&
703         !is_observer<Observer>::value,
704              subscriber<T,   observer<T, Observer>>>::type {
705     auto r = subscriber<T,   observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o);
706     trace_activity().connect(r, scbr);
707     return r;
708 }
709 template<class T, class OtherT, class OtherObserver, class Observer>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const Observer & o)710 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
711     -> typename std::enable_if<
712         !detail::is_on_next_of<T, Observer>::value &&
713         !is_subscriber<Observer>::value &&
714         !is_subscription<Observer>::value &&
715         !is_observer<Observer>::value,
716              subscriber<T,   observer<T, Observer>>>::type {
717     auto r = subscriber<T,   observer<T, Observer>>(std::move(id), cs, o);
718     trace_activity().connect(r, scbr);
719     return r;
720 }
721 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on)722 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
723     -> typename std::enable_if<
724         detail::is_on_next_of<T, OnNext>::value,
725              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
726     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
727                              observer<T, detail::stateless_observer_tag, OnNext>(on));
728     trace_activity().connect(r, scbr);
729     return r;
730 }
731 template<class T, class OtherT, class OtherObserver, class OnNext>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on)732 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
733     -> typename std::enable_if<
734         detail::is_on_next_of<T, OnNext>::value,
735              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>>::type {
736     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
737                              observer<T, detail::stateless_observer_tag, OnNext>(on));
738     trace_activity().connect(r, scbr);
739     return r;
740 }
741 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnError & oe)742 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
743     -> typename std::enable_if<
744         detail::is_on_next_of<T, OnNext>::value &&
745         detail::is_on_error<OnError>::value,
746              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
747     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
748                              observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
749     trace_activity().connect(r, scbr);
750     return r;
751 }
752 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe)753 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
754     -> typename std::enable_if<
755         detail::is_on_next_of<T, OnNext>::value &&
756         detail::is_on_error<OnError>::value,
757              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
758     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
759                              observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
760     trace_activity().connect(r, scbr);
761     return r;
762 }
763 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)764 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
765     -> typename std::enable_if<
766         detail::is_on_next_of<T, OnNext>::value &&
767         detail::is_on_completed<OnCompleted>::value,
768              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
769     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
770                              observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
771     trace_activity().connect(r, scbr);
772     return r;
773 }
774 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnCompleted & oc)775 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
776     -> typename std::enable_if<
777         detail::is_on_next_of<T, OnNext>::value &&
778         detail::is_on_completed<OnCompleted>::value,
779              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
780     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
781                              observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
782     trace_activity().connect(r, scbr);
783     return r;
784 }
785 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)786 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
787     -> typename std::enable_if<
788         detail::is_on_next_of<T, OnNext>::value &&
789         detail::is_on_error<OnError>::value &&
790         detail::is_on_completed<OnCompleted>::value,
791              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
792     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
793                              observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
794     trace_activity().connect(r, scbr);
795     return r;
796 }
797 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
make_subscriber(const subscriber<OtherT,OtherObserver> & scbr,trace_id id,const composite_subscription & cs,const OnNext & on,const OnError & oe,const OnCompleted & oc)798 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
799     -> typename std::enable_if<
800         detail::is_on_next_of<T, OnNext>::value &&
801         detail::is_on_error<OnError>::value &&
802         detail::is_on_completed<OnCompleted>::value,
803              subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
804     auto r = subscriber<T,   observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
805                              observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
806     trace_activity().connect(r, scbr);
807     return r;
808 }
809 
810 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,const composite_subscription & cs)811 auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs)
812     ->      subscriber<T,   Observer> {
813     auto r = subscriber<T,   Observer>(scbr.get_id(), cs, scbr.get_observer());
814     trace_activity().connect(r, scbr);
815     return r;
816 }
817 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,trace_id id,const composite_subscription & cs)818 auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs)
819     ->      subscriber<T,   Observer> {
820     auto r = subscriber<T,   Observer>(std::move(id), cs, scbr.get_observer());
821     trace_activity().connect(r, scbr);
822     return r;
823 }
824 
825 template<class T, class Observer>
make_subscriber(const subscriber<T,Observer> & scbr,trace_id id)826 auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id)
827     ->      subscriber<T,   Observer> {
828     auto r = subscriber<T,   Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
829     trace_activity().connect(r, scbr);
830     return r;
831 }
832 
833 }
834 
835 #endif
836