Name | Date | Size | #Lines | LOC
---- | ---- | ---- | ------ | ---
Ix/CPP/ | 03-May-2024 | - | 4,281 | 3,418
Rx/v2/ | 03-May-2024 | - | 60,240 | 47,574
projects/ | 03-May-2024 | - | 2,950 | 2,320
.editorconfig | 03-May-2024 | 148 | 10 | 8
.gitattributes | 03-May-2024 | 2.5 KiB | 64 | 59
.gitignore | 03-May-2024 | 1.8 KiB | 141 | 114
.travis.yml | 03-May-2024 | 7.8 KiB | 220 | 168
AUTHORS.txt | 03-May-2024 | 539 | 42 | 37
Android.bp | 03-May-2024 | 1.5 KiB | 50 | 45
CMakeLists.txt | 03-May-2024 | 111 | 6 | 3
DeveloperManual.md | 03-May-2024 | 3.2 KiB | 33 | 20
LICENSE | 03-May-2024 | 11.1 KiB | 202 | 169
METADATA | 03-May-2024 | 443 | 19 | 17
MODULE_LICENSE_APACHE2 | 03-May-2024 | 0 | |
NOTICE | 03-May-2024 | 11.1 KiB | 202 | 169
OWNERS | 03-May-2024 | 135 | 4 | 3
README.md | 03-May-2024 | 9.3 KiB | 239 | 188
Readme.html | 03-May-2024 | 30.8 KiB | 309 | 252
appveyor.yml | 03-May-2024 | 1 KiB | 60 | 47
license.md | 03-May-2024 | 11.1 KiB | 202 | 169

README.md

1The Reactive Extensions for C++ (__RxCpp__) is a library of algorithms for values-distributed-in-time. The [__Range-v3__](https://github.com/ericniebler/range-v3) library does the same for values-distributed-in-space.
2
3Platform    | Status |
4----------- | :------------ |
5Windows | [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
6Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCpp.svg?style=flat-square)](https://travis-ci.org/ReactiveX/RxCpp)
7
8Source        | Badges |
9------------- | :--------------- |
10Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) <br/> [![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases) <br/> [![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/4.1.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
11Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
12Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp)
13Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp) <br/> [![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/)
14
15# Usage
16
17__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the [Catch](https://github.com/philsquared/Catch) library.
18
19# Example
20Add `Rx/v2/src` to the include paths
21
22[![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)
23
```cpp
#include "rxcpp/rx.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;

#include <algorithm>
#include <iostream>
#include <iterator>
#include <random>
#include <regex>
#include <string>
#include <vector>
using namespace std;
using namespace std::chrono;

int main()
{
    random_device rd;   // non-deterministic generator
    mt19937 gen(rd());
    uniform_int_distribution<> dist(4, 18);

    // for testing purposes, produce a byte stream from lines of text
    auto bytes = range(0, 10) |
        flat_map([&](int i){
            auto body = from((uint8_t)('A' + i)) |
                repeat(dist(gen)) |
                as_dynamic();
            auto delim = from((uint8_t)'\r');
            return from(body, delim) | concat();
        }) |
        window(17) |
        flat_map([](observable<uint8_t> w){
            return w |
                reduce(
                    vector<uint8_t>(),
                    [](vector<uint8_t> v, uint8_t b){
                        v.push_back(b);
                        return v;
                    }) |
                as_dynamic();
        }) |
        tap([](vector<uint8_t>& v){
            // print input packet of bytes
            copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
            cout << endl;
        });

    //
    // recover lines of text from byte stream
    //

    auto removespaces = [](string s){
        s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
        return s;
    };

    // create strings split on \r
    auto strings = bytes |
        concat_map([](vector<uint8_t> v){
            string s(v.begin(), v.end());
            regex delim(R"/(\r)/");
            cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
            cregex_token_iterator end;
            vector<string> splits(cursor, end);
            return iterate(move(splits));
        }) |
        filter([](const string& s){
            return !s.empty();
        }) |
        publish() |
        ref_count();

    // filter to last string in each line
    auto closes = strings |
        filter(
            [](const string& s){
                return s.back() == '\r';
            }) |
        Rx::map([](const string&){return 0;});

    // group strings by line
    auto linewindows = strings |
        window_toggle(closes | start_with(0), [=](int){return closes;});

    // reduce the strings for a line into one string
    auto lines = linewindows |
        flat_map([&](observable<string> w) {
            return w | start_with<string>("") | sum() | Rx::map(removespaces);
        });

    // print result
    lines |
        subscribe<string>(println(cout));

    return 0;
}
```
121
122# Reactive Extensions
123
124>The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.
125
126Credit [ReactiveX.io](http://reactivex.io/intro.html)
127
128### Other language implementations
129
130* Java: [RxJava](https://github.com/ReactiveX/RxJava)
131* JavaScript: [rxjs](https://github.com/ReactiveX/rxjs)
132* C#: [Rx.NET](https://github.com/Reactive-Extensions/Rx.NET)
133* [More..](http://reactivex.io/languages.html)
134
135### Resources
136
137* [Intro](http://reactivex.io/intro.html)
138* [Tutorials](http://reactivex.io/tutorials.html)
139* [Marble Diagrams](http://rxmarbles.com/)
140* [twitter stream analysis app](https://github.com/kirkshoop/twitter)
141  * [![baldwin pass to wilson](https://img.youtube.com/vi/QkvCzShHyVU/0.jpg)](https://www.youtube.com/watch?v=QkvCzShHyVU)
142* _Algorithm Design For Values Distributed In Time_
143  * [![C++ Russia 2016](https://img.youtube.com/vi/Re6DS5Ff0uE/0.jpg)](https://www.youtube.com/watch?v=Re6DS5Ff0uE)
144  * [![CppCon 2016](https://img.youtube.com/vi/FcQURwM806o/0.jpg)](https://www.youtube.com/watch?v=FcQURwM806o)
145
# Cloning RxCpp

RxCpp uses a git submodule (in `ext/catch`) for the excellent [Catch](https://github.com/philsquared/Catch) library. The easiest way to ensure that the submodules are included in the clone is to add `--recursive` in the clone command.

```shell
git clone --recursive https://github.com/ReactiveX/RxCpp.git
cd RxCpp
```
154
# Building RxCpp Unit Tests

* RxCpp is regularly tested on OSX and Windows.
* RxCpp is regularly built with Clang, GCC and VC.
* RxCpp depends on the latest compiler releases.

RxCpp uses CMake to create build files for several platforms and IDEs.

### IDE builds

#### XCode
```shell
mkdir projects/build
cd projects/build
cmake -G"Xcode" ../CMake -B.
```

#### Visual Studio 2017
```batch
mkdir projects\build
cd projects\build
cmake -G "Visual Studio 15" ..\CMake\
msbuild Project.sln
```

### Makefile builds

#### OSX
```shell
mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make
```

#### Linux --- Clang
```shell
mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXE_LINKER_FLAGS="-stdlib=libc++" -B. ../CMake
make
```

#### Linux --- GCC
```shell
mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make
```

#### Windows
```batch
mkdir projects\build
cd projects\build
cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ..\CMake
nmake
```

The build only produces test and example binaries.
215
# Running tests

* You can use the CMake test runner `ctest`.
* You can run the test binaries directly: `rxcpp_test_*`.
* Tests can be selected by name or tag.

Example of selecting by tag:

`rxcpp_test_subscription [perf]`
224
# Documentation

RxCpp uses Doxygen to generate project [documentation](http://reactivex.github.io/RxCpp).

When Doxygen+Graphviz is installed, CMake creates a special build task named `doc`. It generates the documentation and puts it in the `projects/doxygen/html/` folder, which can be published to the `gh-pages` branch. Each merged pull request will build the docs and publish them.

[Developers Material](DeveloperManual.md)
232
233# Contributing Code
234
Before submitting a feature or substantial code contribution, please discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source.
236
237# Microsoft Open Source Code of Conduct
238This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
239

Readme.html

1<h1>Reactive Extensions</h1>
2
3<p>Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.</p>
4<p>Data sequences can take many forms, such as a stream of data from a file or web service, web services requests, system notifications, or a series of events such as user input. </p>
5<p>Reactive Extensions represents all these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrives. The Rx library is available for application development in C++, .NET, Ruby, Python, Silverlight, Windows Phone 7 and JavaScript. For more information on these different platforms, see Differences Between Versions of Rx topic.</p>
6
7<h2>Pulling vs. Pushing Data</h2>
<p>In interactive programming, the application actively polls a data source for more information by pulling data from a sequence that represents the source. An iterator lets the application read the current item and find out whether there are more items to iterate (by calling some on_next method). </p>
<p>The application is active in the data retrieval process, controlling the pace of the retrieval by calling on_next at its own convenience. This pattern is synchronous, which means that the application might be blocked while polling the data source. This pull pattern is similar to visiting your library and checking out a book. After you are done with the book, you pay another visit to check out another one.</p>
10<p>On the other hand, in reactive programming, the application is offered more information by subscribing to a data stream (called observable sequence in Rx), and any update is handed to it from the source. The application is passive in the data retrieval process: apart from subscribing to the observable source, it does not actively poll the source, but merely reacts to the data being pushed to it. When the stream has no more data to offer, or when it errs, the source will send a notice to the subscriber. In this way, the application will not be blocked by waiting for the source to update. </p>
11<p>This is the push pattern employed by Reactive Extensions. It is similar to joining a book club in which you register your interest in a particular genre, and books that match your interest are automatically sent to you as they are published. You do not need to stand in line to acquire something that you want. Employing a push pattern is helpful in many scenarios, especially in a UI-heavy environment in which the UI thread cannot be blocked while the application is waiting for some events. In summary, by using Rx, you can make your application more responsive.</p>
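<p>The difference between the two patterns can be sketched with the standard library alone. This is a minimal illustration, not Rx code: <code>sum_by_pulling</code>, <code>push_all</code> and <code>sum_by_reacting</code> are hypothetical names, and a plain <code>std::vector</code> stands in for the data source.</p>

```cpp
#include <functional>
#include <vector>

// Pull: the consumer drives the loop and asks the source for each item.
inline int sum_by_pulling(const std::vector<int>& source) {
    int total = 0;
    for (auto it = source.begin(); it != source.end(); ++it) {
        total += *it;  // the consumer decides when to fetch the next value
    }
    return total;
}

// Push: the source drives the loop and hands each item to a callback.
// `push_all` is a hypothetical stand-in for an observable sequence.
inline void push_all(const std::vector<int>& source,
                     const std::function<void(int)>& on_next,
                     const std::function<void()>& on_completed) {
    for (int v : source) {
        on_next(v);   // the source decides when the consumer sees a value
    }
    on_completed();   // the source signals that no more values will arrive
}

// The consumer only reacts; it never asks for the next value itself.
inline int sum_by_reacting(const std::vector<int>& source) {
    int total = 0;
    push_all(source, [&](int v) { total += v; }, [] {});
    return total;
}
```

<p>Both functions compute the same sum; what differs is which side controls the pace of delivery.</p>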
<p>The push model implemented by Rx is represented by the observable pattern of Rx.Observable/Observer. The Rx.Observable will notify all the observers automatically of any state changes. To register an interest through a subscription, you use the subscribe method of Rx.Observable, which takes an Observer and returns a disposable. This gives you the ability to track and dispose of the subscription. In addition, Rx’s LINQ implementation over observable sequences allows developers to compose complex event processing queries over push-based sequences such as events, APM-based (“AsyncResult”) computations, Task-based computations, and asynchronous workflows. For more information on the Observable/Observer classes, see Exploring The Major Classes in Rx. For tutorials on using the different features in Rx, see Using Rx.</p>
13
14<h1>Getting Started with Rx</h1>
15<p>This section describes in general what Reactive Extensions (Rx) is, and how it can benefit programmers who are creating asynchronous applications.</p>
16
17<h3>In This Section</h3>
18<p>1.	When Will You Use Rx<br />
192.	Installing Rx<br />
203.	Differences Between Versions of Rx</p>
21
22<h3>Related Sections</h3>
23<p>Using Rx<br />
24Reactive Extensions on MSDN Developer Center</p>
25
26<h1>When Will You Use Rx</h1>
27<p>This topic describes the advantage of using Rx for users who are currently using the .NET event model for asynchronous programming.</p>
28
29<h2>Advantages of using Rx</h2>
<p>Whether you are authoring a traditional desktop or web-based application, you have to deal with asynchronous programming from time to time. Desktop applications have I/O or UI operations that might take a long time to complete and potentially block all other active threads. However, a user of the modern asynchronous programming model has to manage exceptions and cancellation of events manually. To compose or filter events, they have to write custom code that is hard to decipher and maintain.</p>
31<p>In addition, if your application interacts with multiple sources of data, the conventional way to manage all of these interactions is to implement separate methods as event handlers for each of these data streams. For example, as soon as a user types a character, a keydown event is pushed to your keydown event handler method. Inside this keydown event handler, you have to provide code to react to this event, or to coordinate between all of the different data streams and process this data into a useable form.</p>
<p>Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quotes, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer class. The Observable class maintains a list of dependent Observers and notifies them automatically of any state changes. You can query observable sequences using standard LINQ query operators implemented by the Rx.Observable type. Thus you can filter, aggregate, and compose on multiple events easily by using these LINQ operators. Cancellation and exceptions can also be handled gracefully by using extension methods provided by Rx.</p>
33<p>The following example shows how easy it is to implement an observable in C++.</p>
<pre><code>
    auto ints = rxcpp::observable&lt;>::create&lt;int>(
        [](rxcpp::subscriber&lt;int> s){
            s.on_next(1);
            s.on_next(2);
            s.on_completed();
    });

    ints.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});
</code></pre>
47<p>You can also use schedulers to control when the subscription starts, and when notifications are pushed to subscribers. For more information on this, see Using Schedulers for Concurrency Control.</p>
48
49<h2>Filtering</h2>
50<p>One drawback of the C++ event model is that your event handler is always called every time an event is raised, and events arrive exactly as they were sent out by the source. To filter out events that you are not interested in, or transform data before your handler is called, you have to add custom filter logic to your handler.</p>
51<p>Take an application that detects mouse-down as an example. In the current event programming model, the application can react to the event raised by displaying a message. In Rx, such mouse-down events are treated as a stream of information about clicks. Whenever you click the mouse, information (e.g., cursor position) about this click appears in a stream, ready to be processed. In this paradigm, events (or event streams) are very similar to lists or other collections. This means that we can use techniques for working with collections to process events.  For example, you can filter out those clicks that appear outside a specific area, and only display a message when the user clicks inside an area. Or you can wait a specific period of time, and inform the user the number of “valid” clicks during this period. Similarly, you can capture a stream of stock ticks and only respond to those ticks that have changed for a specific range during a particular time window. All these can be done easily by using LINQ-query style operators provided by Rx. </p>
52<p>In this way, a function can take an event, process it, and then pass out the processed stream to an application. This gives you flexibility that is not available in the current programming model. Moreover, as Rx is performing all the plumbing work at the background for filtering, synchronizing, and transforming the data, your handler can just react to the data it receives and do something with it. This results in cleaner code that is easier to read and maintain. For more information on filtering, see Querying Observable Collections using LINQ Operators.</p>
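<p>The idea of filtering a stream before the handler sees it can be sketched without Rx. In this minimal sketch, <code>Click</code>, <code>ClickSource</code>, <code>filter_clicks</code>, <code>clicks_from</code> and <code>count_clicks_inside</code> are hypothetical names chosen for illustration, and a fixed list of clicks stands in for a live mouse-down stream.</p>

```cpp
#include <functional>
#include <vector>

struct Click { int x; int y; };

using ClickHandler = std::function<void(const Click&)>;
// A source pushes every click it has to the handler it is given.
using ClickSource = std::function<void(const ClickHandler&)>;

// Wraps `source` so that only clicks accepted by `keep` reach the handler.
inline ClickSource filter_clicks(ClickSource source,
                                 std::function<bool(const Click&)> keep) {
    return [source, keep](const ClickHandler& handler) {
        source([&](const Click& c) {
            if (keep(c)) handler(c);
        });
    };
}

// A fixed list of clicks standing in for a live mouse-down stream.
inline ClickSource clicks_from(std::vector<Click> clicks) {
    return [clicks](const ClickHandler& handler) {
        for (const Click& c : clicks) handler(c);
    };
}

// Counts the clicks that land inside the rectangle [0, width) x [0, height).
inline int count_clicks_inside(const ClickSource& source, int width, int height) {
    int count = 0;
    auto inside = filter_clicks(source, [=](const Click& c) {
        return c.x >= 0 && c.x < width && c.y >= 0 && c.y < height;
    });
    inside([&](const Click&) { ++count; });  // the handler contains no filter logic
    return count;
}
```

<p>The handler passed to <code>inside</code> only counts; the decision about which clicks are valid lives in the filter, which is the separation the paragraph above describes.</p>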
53
54<h2>Manipulating Events</h2>
<p>Rx represents events as a collection of objects: e.g., an OnMouseMove event contains a collection of Point values. Due to the first-class object nature of observables, they can be passed around as function parameters and returns, or stored in a variable.</p>
56
57<h1>Installing Rx</h1>
58<p>This topic describes where you can download the Reactive Extensions (Rx) SDK.</p>
59
60<h2>To download Rx</h2>
<p>Reactive Extensions is available for different platforms such as C++, JavaScript, .NET Framework 3.5, 4.0, 4.5, Silverlight 3 and 4, as well as Windows Phone 7 &amp; 8. You can download the libraries, as well as learn about their prerequisites at the <a href="http://msdn.microsoft.com/en-us/data/gg577609" target="_blank">Rx MSDN Developer Center.</a></p>
62
63<h1>Differences Between Versions of Rx</h1>
64<p>The following topic describes the various platforms for which you can develop solutions using Reactive Extensions.</p>
65<p>To get the latest release of Rx, as well as learn about its prerequisites, please visit the <a href="http://msdn.microsoft.com/en-us/data/gg577609" target="_blank">Rx MSDN Developer Center</a>. </p>
66
67<h2>C++</h2>
68<p>The Reactive Extensions for C++ (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in C++.</p>
69
70<h2>Javascript</h2>
71<p>Rx for Javascript (RxJS) allows you to use LINQ operators in JavaScript. It provides easy-to-use conversions from existing DOM, XmlHttpRequest (AJAX), and jQuery events to push-based observable collections, allowing users to seamlessly integrate Rx into their existing JavaScript-based websites. </p>
72<p>RxJS brings similar capabilities to client script and integrates with jQuery events (Rx.Observable.FromJQueryEvent). It also supports Script#.</p>
73
74<h2>Ruby</h2>
<p>Rx for Ruby (Rx.rb) allows you to use LINQ operators to create push-based observable collections in Ruby.</p>
76
77<h2>Python</h2>
<p>Rx for Python (Rx.py) allows you to use LINQ operators in Python. Rx.py allows you to implement push-based observable collections, so that users can seamlessly integrate Rx into their existing Python applications.</p>
79
80<h2>.NET Framework</h2>
<p>The core Rx interfaces, IObservable&lt;T> and IObserver&lt;T>, ship as part of .NET Framework 4. If you are running on .NET Framework 3.5 SP1, or if you want to take advantage of the LINQ operators implemented in Observable type, as well as many other features such as schedulers, you can download the Rx header-only library in the <a href="http://msdn.microsoft.com/en-us/data/gg577609" target="_blank">Rx MSDN Developer Center</a>.</p>
82
83<h2>Silverlight </h2>
84<p>Silverlight disallows you from making cross-threading calls, thus you cannot use a background thread to update the UI. Instead of writing verbose code using the Dispatcher.BeginInvoke call to explicitly execute code on the main UI thread, you can use the factory Observable.Start method provided by the Rx header-only library to invoke an action asynchronously. Cross-threading is taken care of transparently by Rx under the hood.</p>
85<p>You can also use the various Observable operator overloads that take in a Scheduler, and specify the System.Reactive.Concurrency.DispatcherScheduler to be used.</p>
86
87<h2>Windows Phone</h2>
88<p>Windows Phone 7 ships with a version of the Reactive Extensions baked into the ROM of the device. For more information, see <a href="http://msdn.microsoft.com/en-us/library/ff431792(VS.92).aspx">Reactive Extensions for .NET Overview for Windows Phone</a>. Documentation for this version of the Reactive Extensions can be found in Windows Phone API library at <a href="http://msdn.microsoft.com/en-us/library/ff707857(v=VS.92).aspx">Microsoft.Phone.Reactive Namespace</a>. </p>
89<p>The <a href="http://msdn.microsoft.com/en-us/data/gg577609" target="_blank">Rx MSDN Developer Center</a> also contains an updated version of Rx for WP7, which has new definitions in the System.Reactive.Linq namespace. Note that the new APIs will not clash with the library built in to the phone (nor do they replace the version in the ROM). For more information on the differences of these 2 versions, see this <a href="http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-for-windows-phone-7.aspx">Rx team blog post</a>.</p>
90<p>Rx is available for Windows Phone 8 as well as Windows Phone 7. A .NET portable library is available using Nuget that is useful for developing libraries that work on Windows Phone, Windows Store apps, and in classic Windows desktop or server applications.</p>
91
92<h1>Using Rx</h1>
93<p>This section includes topics that explain how you use Rx to create and subscribe to sequences, bridge existing events and existing asynchronous patterns, as well as using schedulers. It also describes more advanced tasks such as implementing your own operators.</p>
94
95<h3>In This Section</h3>
<p>1. Exploring The Major Interfaces in Rx<br />
2. Creating and Querying Observable Sequences<br />
3. Subjects<br />
4. Implementing your own operators for IObservable<br />
5. Using Observable Providers</p>
101
102<h1>Exploring The Major Interfaces in Rx</h1>
103<p>This topic describes the major Reactive Extensions (Rx) interfaces used to represent observable sequences and subscribe to them. </p>
104
105<h2>Observable/Observer</h2>
106<p>Rx exposes asynchronous and event-based data sources as push-based, observable sequences. This Observable class represents a data source that can be observed, meaning that it can send data to anyone who is interested. It maintains a list of dependent Observer implementations representing such interested listeners, and notifies them automatically of any state changes.</p>
107<p>As described in What is Rx, the other half of the push model is represented by the Observer class, which represents an observer who registers an interest through a subscription. Items are subsequently handed to the observer from the observable sequence to which it subscribes. </p>
108<p>In order to receive notifications from an observable collection, you use the subscribe method of Observable to hand it an Observer object. In return for this observer, the subscribe method returns a disposable object that acts as a handle for the subscription. This allows you to clean up the subscription after you are done. Calling dispose on this object detaches the observer from the source so that notifications are no longer delivered. As you can infer, in Rx you do not need to explicitly unsubscribe from an event. </p>
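<p>The subscribe-then-dispose lifecycle described above can be sketched with a hand-written subject. This is a minimal sketch under stated assumptions: <code>IntSubject</code>, its integer <code>Handle</code> and <code>values_seen_before_unsubscribe</code> are hypothetical and only mimic the shape of the idea; RxCpp's own types are listed further down.</p>

```cpp
#include <functional>
#include <map>
#include <utility>

// Hypothetical minimal subject: keeps a list of observers and pushes to them.
class IntSubject {
public:
    using Handle = int;

    // Registers an observer and returns a handle for the subscription.
    Handle subscribe(std::function<void(int)> on_next) {
        observers_[next_handle_] = std::move(on_next);
        return next_handle_++;
    }

    // Detaches the observer; later values are no longer delivered to it.
    void unsubscribe(Handle h) { observers_.erase(h); }

    // Pushes one value to every observer that is still subscribed.
    void on_next(int value) {
        for (auto& entry : observers_) entry.second(value);
    }

private:
    std::map<Handle, std::function<void(int)>> observers_;
    Handle next_handle_ = 0;
};

// Subscribes, receives two values, detaches, and ignores the third.
inline int values_seen_before_unsubscribe() {
    IntSubject subject;
    int seen = 0;
    auto handle = subject.subscribe([&](int) { ++seen; });
    subject.on_next(1);
    subject.on_next(2);
    subject.unsubscribe(handle);
    subject.on_next(3);  // not delivered: the observer was detached
    return seen;
}
```

<p>The handle plays the role of the disposable: holding it is what lets the caller end the subscription early.</p>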
109<p>Observers support three publication events, reflected by the interface’s methods. OnNext can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events can send out a Point object every time the mouse has moved. The other two methods are used to indicate completion or errors.</p>
110<p>The following lists the Observable/Observer definitions.</p>
111<pre><code>
112namespace rxcpp {
113    template &lt;class T>
114    struct subscriber
115    {
116        // observer&lt;T>
117        void on_next(T);
118        void on_error(std::exception_ptr);
119        void on_completed();
120
121        // composite_subscription
122        bool is_subscribed();
123        void unsubscribe();
124    };
125
126    template &lt;class T>
127    struct observable
128    {
129        composite_subscription subscribe(subscriber&lt;T> out);
130    };
131}
132</code></pre>
<p>Note that the OnError event carries its error as a std::exception_ptr, as the on_error(std::exception_ptr) declaration above shows. The error is passed to a handler function rather than thrown at the subscriber.</p>
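<p>A handler that receives a <code>std::exception_ptr</code> usually needs to turn it back into something readable. The following sketch uses only the standard library; <code>describe_error</code> is a hypothetical helper name, not part of RxCpp.</p>

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Turns the exception_ptr delivered to an error handler into a message.
inline std::string describe_error(std::exception_ptr ep) {
    if (!ep) return "no error";
    try {
        std::rethrow_exception(ep);  // rethrow so the catch clauses can inspect it
    } catch (const std::exception& e) {
        return e.what();
    } catch (...) {
        return "unknown error";      // thrown object is not a std::exception
    }
}
```

<p>An error handler could call <code>describe_error</code> on its argument and log the result, assuming the handler is given a <code>std::exception_ptr</code> as in the listing above.</p>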
134<p>You can treat the observable sequence (such as a sequence of mouse-over events) as if it were a normal collection. Thus you can write LINQ queries over the collection to do things like filtering, grouping, composing, etc. To make observable sequences more useful, the Rx header-only library provides many factory LINQ operators so that you do not need to implement any of these on your own. This will be covered in the Querying Observable Collections using LINQ Operators topic.</p>
135
136<h3>See Also</h3>
137Creating and Subscribing to Simple Observable Sequences
138Querying Observable Collections using LINQ Operators
139
140<h1>Creating and Querying Observable Sequences</h1>
141<p>This section describes how you can create and subscribe to an observable sequence, convert an existing C++ event into a sequence and query it. </p>
142
143<h3>In This Section</h3>
144<p>Creating and Subscribing to Simple Observable Sequences<br />
145Querying Observable Collections using LINQ Operators</p>
146
147<h1>Creating and Subscribing to Simple Observable Sequences</h1>
<p>You do not need to implement the Observable interface manually to create an observable sequence. Similarly, you do not need to implement Observer either to subscribe to a sequence. By installing the Reactive Extensions header-only library, you can take advantage of the Observable type which provides many LINQ operators for you to create a simple sequence with zero, one or more elements. In addition, Rx provides Subscribe methods that take various combinations of OnNext, OnError and OnCompleted handlers in terms of delegates.</p>
149
150<h2>Creating and subscribing to a simple sequence</h2>
<p>The following sample uses the range operator of the Observable type to create a simple observable collection of numbers. The observer subscribes to this collection using the subscribe method of the Observable class, and provides lambdas that handle OnNext and OnCompleted. </p>
<p>The range operator has several overloads. In our example, range(1, 5) creates a sequence of integers that starts at 1 and ends at 5, inclusive.</p>
153<p>As soon as the subscription happens, the values are sent to the observer. The OnNext delegate then prints out the values.</p>
154<pre><code>
155    auto values1 = rxcpp::observable&lt;>::range(1, 5);
156    values1.
157        subscribe(
158            [](int v){printf("OnNext: %d\n", v);},
159            [](){printf("OnCompleted\n");});
160</code></pre>
161<p>When an observer subscribes to an observable sequence, the thread calling the subscribe method can be different from the thread in which the sequence runs till completion. Therefore, the subscribe call is asynchronous in that the caller is not blocked until the observation of the sequence completes. This will be covered in more details in the Using Schedulers topic.</p>
<p>Notice that the subscribe method returns a Disposable, so that you can unsubscribe from a sequence and dispose of it easily. When you invoke the Dispose method on the returned Disposable, the observer will stop listening to the observable for data. Normally, you do not need to explicitly call Dispose unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the usage of a finalizer. When the Disposable instance is collected by the garbage collector, Rx does not automatically dispose of the subscription. However, note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e., when an OnCompleted or OnError message is published). </p>
163<p>In addition to creating an observable sequence from scratch, you can convert existing enumerators, C++ events and asynchronous patterns into observable sequences. The other topics in this section will show you how to do this. </p>
164<p>Notice that this topic only shows you a few operators that can create an observable sequence from scratch. To learn more about other LINQ operators, see Query Observable Collections using LINQ Operators.</p>
165
166<h2>Converting an Enumerable Collection to an Observable Sequence</h2>
<p>Using the iterate operator, you can convert an array collection to an observable sequence and subscribe to it. </p>
168<pre><code>
169    std::array&lt; int, 3 > a={{1, 2, 3}};
170    auto values1 = rxcpp::observable&lt;>::iterate(a);
171    values1.
172        subscribe(
173            [](int v){printf("OnNext: %d\n", v);},
174            [](){printf("OnCompleted\n");});
175</code></pre>

<h3>See Also</h3>
<p>Querying Observable Sequences using LINQ Operators</p>

<h1>Querying Observable Sequences using LINQ Operators</h1>
<p>We have converted existing C++ events into observable sequences to subscribe to them. In this topic, we will look at the first-class nature of observable sequences as Observable objects, in which generic LINQ operators are supplied by the Rx header-only library to manipulate these objects. Most operators take an observable sequence, perform some logic on it and output another observable sequence. In addition, as you can see from our code samples, you can chain multiple operators on a source sequence to tweak the resulting sequence to your exact requirement. </p>

<h2>Using Different Operators</h2>
<p>We have already used the Create and Generate operators in previous topics to create and return simple sequences. In this topic, we will use other LINQ operators of the Observable type so that you can filter, group and transform data. Such operators take observable sequence(s) as input, and produce observable sequence(s) as output.</p>

<h2>Combining different sequences</h2>
<p>In this section, we will examine some of the operators that combine various observable sequences into a single observable sequence. Notice that data are not transformed when we combine sequences. </p>
<p>In the following sample, we use the concat operator to combine two sequences into a single sequence and subscribe to it. </p>
<pre><code>
    auto values = rxcpp::observable&lt;>::range(1); // infinite (until overflow) stream of integers

    // Tag the first three values with "1:".
    auto s1 = values.
        take(3).
        map([](int v) { return std::make_tuple("1:", v);});

    // Tag the first three values with "2:".
    auto s2 = values.
        take(3).
        map([](int v) { return std::make_tuple("2:", v);});

    // s2 is subscribed only after s1 has completed.
    s1.
        concat(s2).
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                printf("%s %d\n", s, p);
            }));
</code></pre>
<p>Notice that the resultant sequence is </p>
<pre>
1: 1
1: 2
1: 3
2: 1
2: 2
2: 3
</pre>
<p>This is because when you use the concat operator, the second sequence (s2) is not active until the first sequence (s1) has finished pushing all its values. Only after s1 has completed does s2 start to push values to the resultant sequence. The subscriber then gets all the values from the resultant sequence. </p>
<p>Compare this with the merge operator. If you run the following sample code, you will get </p>
<pre>
1: 1
2: 1
1: 2
2: 2
1: 3
2: 3
</pre>
<p>This is because the two sequences are active at the same time and values are pushed out as they occur in the sources. The resultant sequence only completes when the last source sequence has finished pushing values. </p>
<p>Notice that for Merge to work, all the source observable sequences need to be of the same type of Observable. The resultant sequence will be of the type Observable. If s1 produces an OnError in the middle of the sequence, then the resultant sequence will complete immediately.</p>
<pre><code>
    auto values = rxcpp::observable&lt;>::range(1); // infinite (until overflow) stream of integers

    // Both sources are infinite; each tags its values with a label.
    auto s1 = values.
        map([](int v) { return std::make_tuple("1:", v);});

    auto s2 = values.
        map([](int v) { return std::make_tuple("2:", v);});

    // Merge keeps both sources active; take(6) ends the merged sequence.
    s1.
        merge(s2).
        take(6).
        as_blocking().
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                printf("%s %d\n", s, p);
            }));
</code></pre>
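<p>The difference between the two orderings shown above can be reproduced with a small model in standard C++. This is a sketch only and does not use RxCpp: <code>concat_model</code> and <code>merge_model</code> are hypothetical helpers, and the merge model assumes that the two sources emit alternately. In a real merge, the order depends on when each source actually emits.</p>

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

using Sequence = std::vector<std::string>;

// concat: the second sequence starts only after the first has completed.
inline Sequence concat_model(const Sequence& first, const Sequence& second) {
    Sequence out(first);
    out.insert(out.end(), second.begin(), second.end());
    assert(out.size() == first.size() + second.size());
    return out;
}

// merge: both sequences are active at once. Assumption: the sources emit
// alternately, so values arrive round-robin; real ordering depends on timing.
inline Sequence merge_model(const Sequence& a, const Sequence& b) {
    Sequence out;
    const std::size_t n = a.size() > b.size() ? a.size() : b.size();
    for (std::size_t i = 0; i < n; ++i) {
        if (i < a.size()) out.push_back(a[i]);
        if (i < b.size()) out.push_back(b[i]);
    }
    assert(out.size() == a.size() + b.size());   // nothing lost or duplicated
    return out;
}
```

<p>In both models the values themselves are unchanged; only the order in which they reach the subscriber differs.</p>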

<h1>Subjects</h1>
<p>This section describes the Subject type implemented by Reactive Extensions. It also describes various implementations of Subject which serve different purposes. </p>
<h3>In This Section</h3>
<p>1.	Using Subjects</p>

<h1>Using Subjects</h1>
<p>The Subject type implements both Observable and Observer, in the sense that it is both an observer and an observable. You can use a subject to subscribe all the observers, and then subscribe the subject to a backend data source. In this way, the subject can act as a proxy for a group of subscribers and a source. You can use subjects to implement a custom observable with caching, buffering and time shifting. In addition, you can use subjects to broadcast data to multiple subscribers. </p>
<p>By default, subjects do not perform any synchronization across threads. They do not take a scheduler but rather assume that all serialization and grammatical correctness are handled by the caller of the subject.  A subject simply broadcasts to all subscribed observers in the thread-safe list of subscribers. Doing so has the advantage of reducing overhead and improving performance. If, however, you want to synchronize outgoing calls to observers using a scheduler, you can use the Synchronize method to do so.</p>
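<p>The dual role described above can be sketched in standard C++. The class below is a hypothetical model, not the RxCpp subject: <code>MiniSubject</code>, <code>subscribe</code> and <code>on_next</code> are names chosen for this illustration. It has an observable side, where observers register, and an observer side, where a source pushes values in. It does no locking, on the assumption that the caller serializes calls.</p>

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical minimal subject: observers subscribe to it (observable side)
// and a source pushes values into it (observer side).
template <typename T>
class MiniSubject {
public:
    using Observer = std::function<void(const T&)>;

    // Observable side: register an observer for future values.
    void subscribe(Observer observer) {
        assert(observer && "observer must be callable");
        observers_.push_back(std::move(observer));
    }

    // Observer side: broadcast one value to every current observer.
    // No locking here: the caller is assumed to serialize calls.
    void on_next(const T& value) const {
        for (const auto& observer : observers_) observer(value);
    }

private:
    std::vector<Observer> observers_;
};
```

<p>An observer that subscribes late sees only the values pushed after it subscribed, because this model stores nothing.</p>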

<h2>Different types of Subjects</h2>
<p>The Subject type in the Rx library is a basic implementation of the Subject interface (you can also implement the Subject interface to create your own subject types). There are other implementations of Subject that offer different functionalities. All of these types store some (or all) of the values pushed to them via OnNext, and broadcast them back to their observers. This means that if you subscribe to any of these more than once (i.e. subscribe -> unsubscribe -> subscribe again), you will see at least one of the same values again. </p>

<h1>Scheduling and Concurrency</h1>
<p>This section describes how you can use a scheduler to control when to start a sequence or subscribe to an event. </p>

<h1>Scheduler Types</h1>
<p>The various Scheduler types provided by Rx are:</p>
<p>ImmediateScheduler: Default scheduler, pushes notifications as they are received.</p>
<p>EventLoopScheduler: Used when creating a separate thread for Rx sequences.</p>

<h1>Using Schedulers</h1>
<p>A scheduler controls when a subscription starts and when notifications are published. It consists of three components. It is first a data structure. When you schedule tasks to be completed, they are put into the scheduler and queued based on priority or other criteria. It also offers an execution context which denotes where the task is executed (e.g., in the thread pool, current thread, or in another app domain). Lastly, it has a clock which provides a notion of time for itself (by accessing the Now property of a scheduler). Tasks being scheduled on a particular scheduler will adhere to the time denoted by that clock only.</p>
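<p>The three components named above can be made concrete with a toy scheduler written in standard C++. It is a sketch under stated assumptions, not the RxCpp scheduler: <code>ToyScheduler</code> and its members are hypothetical, the clock counts virtual ticks instead of real time, and the execution context is simply whichever thread calls <code>run()</code>.</p>

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical toy scheduler with a virtual clock, for illustration only.
class ToyScheduler {
public:
    using Action = std::function<void()>;

    // Clock: the scheduler's own notion of time (virtual ticks).
    long now() const { return now_; }

    // Data structure: queue the action, ordered by its due time.
    void schedule(long delay, Action action) {
        assert(delay >= 0 && "cannot schedule in the past");
        queue_.push(Item{now_ + delay, next_id_++, std::move(action)});
    }

    // Execution context: actions run on whichever thread calls run().
    void run() {
        while (!queue_.empty()) {
            Item item = queue_.top();
            queue_.pop();
            now_ = item.due;           // advance the clock to the due time
            item.action();
        }
    }

private:
    struct Item {
        long due;
        long id;                       // keeps equal due times in FIFO order
        Action action;
    };
    struct Later {
        bool operator()(const Item& a, const Item& b) const {
            return a.due != b.due ? a.due > b.due : a.id > b.id;
        }
    };
    std::priority_queue<Item, std::vector<Item>, Later> queue_;
    long now_ = 0;
    long next_id_ = 0;
};
```

<p>Actions run in order of due time regardless of the order in which they were scheduled, and each action observes the time reported by this scheduler's clock only.</p>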

<h2>Using Schedulers</h2>
<p>You may have already used schedulers in your Rx code without explicitly stating the type of schedulers to be used. This is because all Observable operators that deal with concurrency have multiple overloads. If you do not use the overload which takes a scheduler as an argument, Rx will pick a default scheduler by using the principle of least concurrency. This means that the scheduler which introduces the least amount of concurrency that satisfies the needs of the operator is chosen.  For example, for operators returning an observable with a finite and small number of messages, Rx uses ImmediateScheduler.  For operators returning a potentially large or infinite number of messages, CurrentThread is used.</p>
<p>In the following example, the source observable sequences are each running in their own threads using EventLoopScheduler.</p>
<pre><code>
    auto threads = rxcpp::observe_on_event_loop();

    auto values = rxcpp::observable&lt;>::range(1); // infinite (until overflow) stream of integers

    // Each source is subscribed on the event loop and yields between values.
    auto s1 = values.
        subscribe_on(threads).
        map([](int v) { std::this_thread::yield(); return std::make_tuple("1:", v);});

    auto s2 = values.
        subscribe_on(threads).
        map([](int v) { std::this_thread::yield(); return std::make_tuple("2:", v);});

    // Deliver the first six merged values to the observer via the event loop.
    s1.
        merge(s2).
        take(6).
        observe_on(threads).
        as_blocking().
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                printf("%s %d\n", s, p);
            }));
</code></pre>
<p>This will queue up on the observer quickly. This code uses the observe_on operator, which allows you to specify the context that you want to use to send pushed notifications (OnNext) to observers. By default, the observe_on operator ensures that OnNext will be called as many times as possible on the current thread. You can use its overloads and redirect the OnNext outputs to a different context. In addition, you can use the subscribe_on operator to return a proxy observable that delegates actions to a specific scheduler. For example, for a UI-intensive application, you can delegate all background operations to be performed on a scheduler running in the background by using subscribe_on and passing to it a Concurrency.EventLoopScheduler. </p>
<p>You should also note that by using the observe_on operator, an action is scheduled for each message that comes through the original observable sequence. This potentially changes timing information and puts additional stress on the system. If you have a query that composes various observable sequences running on many different execution contexts, and you are doing filtering in the query, it is best to place observe_on later in the query. This is because a query will potentially filter out a lot of messages, and placing the observe_on operator earlier in the query would do extra work on messages that would be filtered out anyway. Calling the observe_on operator at the end of the query will create the least performance impact.</p>
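<p>The cost argument above can be checked with a simple count. The following standard C++ sketch is a hypothetical cost model, not RxCpp code: <code>run_query</code> and <code>QueryCost</code> are invented names, and the model assumes that the observe_on step schedules exactly one action for each message that reaches it.</p>

```cpp
#include <cassert>
#include <functional>
#include <vector>

struct QueryCost {
    int scheduled_actions;   // work done by the modelled observe_on step
    int delivered;           // messages that reach the observer
};

// Hypothetical cost model: observe_on schedules one action per message it sees.
inline QueryCost run_query(const std::vector<int>& source,
                           const std::function<bool(int)>& keep,
                           bool observe_on_first) {
    assert(keep && "predicate must be callable");
    QueryCost cost{0, 0};
    for (int v : source) {
        if (observe_on_first) {
            ++cost.scheduled_actions;       // scheduled before filtering
            if (keep(v)) ++cost.delivered;
        } else if (keep(v)) {
            ++cost.scheduled_actions;       // scheduled only for survivors
            ++cost.delivered;
        }
    }
    return cost;
}
```

<p>Both orderings deliver the same messages, but with the filter first the modelled observe_on step schedules an action only for the messages that pass the filter.</p>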

<h1>Implementing Your Own Operators for Observable</h1>
<p>You can extend Rx by adding new operators for operations that are not provided by the LINQ library, or by creating your own implementation of standard query operators to improve readability and performance. Writing a customized version of a standard LINQ operator is useful when you want to operate with in-memory objects and when the intended customization does not require a comprehensive view of the query.</p>

<h2>Creating New Operators</h2>
<p>LINQ offers a full set of operators that cover most of the possible operations on a set of entities. However, you might need an operator to add a particular semantic meaning to your query, especially if you can reuse that same operator several times in your code. </p>
<p>By reusing existing LINQ operators when you build a new one, you can take advantage of the existing performance or exception handling capabilities implemented in the Rx libraries.</p>
<p>When writing a custom operator, it is good practice not to leave any disposables unused; otherwise, you may find that resources are leaked and cancellation does not work correctly.</p>

<h2>Customizing Existing Operators</h2>
<p>Adding new operators to LINQ is a way to extend its capabilities. However, you can also improve code readability by wrapping existing operators into more specialized and meaningful ones.</p>
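<p>Wrapping existing operators into a more meaningful one can be sketched with plain functions over vectors. This standard C++ example is an illustration only and does not use RxCpp: <code>filter_op</code> and <code>map_op</code> are hypothetical stand-ins for existing operators, and <code>squares_of_evens</code> is the specialized operator built from them.</p>

```cpp
#include <cassert>
#include <functional>
#include <vector>

using Seq = std::vector<int>;

// Stand-in for an existing filter operator.
inline Seq filter_op(const Seq& in, const std::function<bool(int)>& keep) {
    assert(keep && "predicate must be callable");
    Seq out;
    for (int v : in) {
        if (keep(v)) out.push_back(v);
    }
    return out;
}

// Stand-in for an existing map operator.
inline Seq map_op(const Seq& in, const std::function<int(int)>& fn) {
    assert(fn && "transform must be callable");
    Seq out;
    out.reserve(in.size());
    for (int v : in) out.push_back(fn(v));
    return out;
}

// Custom operator: one meaningful name for a combination used repeatedly.
inline Seq squares_of_evens(const Seq& in) {
    return map_op(filter_op(in, [](int v) { return v % 2 == 0; }),
                  [](int v) { return v * v; });
}
```

<p>Call sites then read as <code>squares_of_evens(values)</code> instead of repeating the filter and map pair, and the wrapper inherits whatever behavior the underlying operators provide.</p>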