layout: true --- class: middle .center[ # Introduction to ReactiveX in C++ (rxcpp) .accent[ ### algorithms for values distributed in time ] ]
??? > * --- #### DEMO .center[ .image-80[ ![twitter application](content/twitter_with_word_sentiment.gif) ] ] ??? > * twitter stream api > * sample of live tweets. > * sentiment analysis > * words (wanna, love) --- # topics - .accent[### values distributed in time] - .accent[### write an algorithm] - .accent[### virtuous procrastination] - .accent[### opt-in thread-safety] - .accent[### delegate task execution] - .accent[### adapt existing sources] ??? > * --- # values distributed in time - .accent[ ## time vs. space] - .accent[ ## flow of a subscription] - .accent[ ## when to apply] ??? > * --- # my son says: "space vs time is explained by playing cards" .center[ .image-70[ ![poker game](content/poker-gremlin.gif) ] ] ??? > * kiran's idea --- # each player has 0 or more cards .center[ .image-50[ ![poker hand](content/poker-game-264599_1280.jpg) ] cards in the hand can be processed now .accent[ array and list are __values distributed in space__ ] ] ??? > * --- # the dealer distributes cards in time .center[ .image-60[ ![poker deck](content/poker-deck-875295_1280.jpg) ] cards in the deck cannot be processed now .accent[ user input and IO are __values distributed in time__ ] ] ??? > * --- # values distributed in time .accent[ ### marble diagrams are used to describe values distributed in time.]
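As a rough text-only illustration (not an rxcpp feature), a marble diagram can be drawn as a timeline with one slot per tick: `-` means nothing arrived, a character marks a value, and `|` marks completion. The helper name `toy_marble` and the one-character-per-value encoding are assumptions made for this sketch.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: renders a text marble diagram.
// Each arrival is (tick, value); completed_at is the tick at which the
// sequence completes. Arrivals outside [0, completed_at) are ignored.
inline std::string toy_marble(const std::vector<std::pair<int, char>>& arrivals,
                              int completed_at) {
    if (completed_at < 0) completed_at = 0;
    // Start with a timeline on which nothing has arrived.
    std::string line(static_cast<std::size_t>(completed_at), '-');
    for (const auto& a : arrivals) {
        if (a.first >= 0 && a.first < completed_at) {
            line[static_cast<std::size_t>(a.first)] = a.second;
        }
    }
    line.push_back('|');  // completion marker
    return line;
}
```

For example, `toy_marble({{2, 'a'}, {5, 'b'}}, 8)` yields `--a--b--|`.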
??? > * --- # flow of a subscription .accent[ ### how not to get wet! ] .center[ .image-60[ ![water balloons](content/water_balloons.gif) ] ] ??? > * --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` ] .column[ - .accent[demonstrate the contract of a subscription] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end alt failure observable --x subscriber: on_error() else completion observable --x subscriber: on_completed() end deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] - .accent[### on_error is the last call that a subscriber will receive] - .accent[### on_completed is the last call that a subscriber will receive] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant 
subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end alt failure observable --x subscriber: on_error() else completion observable --x subscriber: on_completed() end subscriber -->> subscriber: unsubscribe() deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] - .accent[### on_error is the last call that a subscriber will receive] - .accent[### on_completed is the last call that a subscriber will receive] - .accent[### unsubscribe is the destructor for the subscription lifetime] ] ] ??? > * --- # values distributed in time .accent[ ## subscriptions are useful to..] - .accent[ ### defer work] - .accent[ ### collect values and process periodically] - .accent[ ### provide intermediate results] - .accent[ ### combine values from multiple sources] ??? > * --- .split-70[ .column[ .image-80[ ![](content/values distributed in time-COMPLETED-green.svg) ] - .accent[ ## demonstrated values distributed in time] - .accent[ ## described flow of a subscription] - .accent[ ## described when to use a subscription] ] .column[ .right[ ![poker game](content/poker-gremlin.gif) ![water balloons](content/water_balloons.gif)
### .accent[__next >>__] write an algorithm ] ] ] ??? > * --- # write an algorithm .center[ ![honda rube goldberg](content/honda rube.gif) ] ??? > * each step is an algorithm for values distributed in time > * a value from the previous step arrives and is transformed into a new value > * the new value travels in time to the next algorithm --- # write an algorithm - transform .accent[ `transform` calls a function with each value that arrives and emits the result of the function. ]
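A minimal sketch of the same idea, written against a toy push model rather than the rxcpp types: an observable is modeled as a function that pushes values into a callback. The names `toy_observer`, `toy_observable`, `toy_range`, `toy_transform` and `collect_strings` are assumptions for illustration; error and completion signals are left out to keep it short.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy model (not rxcpp): an observer receives each value,
// an observable pushes its values into the observer it is given.
template <class T>
using toy_observer = std::function<void(const T&)>;

template <class T>
using toy_observable = std::function<void(toy_observer<T>)>;

// Pushes first..last inclusive, but only when it is called (work is deferred).
inline toy_observable<long> toy_range(long first, long last) {
    return [=](toy_observer<long> out) {
        for (long v = first; v <= last; ++v) out(v);
    };
}

// transform: subscribe to `in`, call `selector` with each value,
// and pass the result to `out`.
template <class In, class Out>
toy_observable<Out> toy_transform(toy_observable<In> in,
                                  std::function<Out(const In&)> selector) {
    return [=](toy_observer<Out> out) {
        in([=](const In& v) { out(selector(v)); });
    };
}

// Usage: change values from long to string and collect them.
inline std::vector<std::string> collect_strings(long first, long last) {
    std::vector<std::string> seen;
    auto strings = toy_transform<long, std::string>(
        toy_range(first, last),
        [](const long& l) { return std::to_string(l); });
    strings([&](const std::string& s) { seen.push_back(s); });
    return seen;
}
```

Nothing runs until `strings(...)` is called, which mirrors the deferral that `subscribe` provides.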
> `map` is a common alias for `transform`. ??? > * --- # write an algorithm - transform .split-60[ .column[ ``` auto map = [](auto selector){ ``` ] .column[ ] ] -- .split-60[ .column[ ``` return [=](auto in){ return create([=](auto out){ ``` ] .column[ - .accent[an operator is a function that takes an observable and returns an observable] ] ] -- .split-60[ .column[ ``` return in | subscribe( out.get_subscription(), [=](auto v){ out.on_next(`selector(v)`); }, ``` ] .column[ - .accent[values from `in` are transformed by `selector` and the result is passed to `out`] ] ] -- .split-60[ .column[ ``` [=](exception_ptr ep){out.on_error(ep);}, [=]() {out.on_completed();} ); }); }; }; ``` ] .column[ - .accent[`on_error` and `on_completed` are passed to `out` unchanged] ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` -- .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable deactivate map observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable deactivate map observable deactivate range observable ] ] ??? 
> * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() deactivate map observable deactivate range observable ] ] ??? 
> * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() App subscriber -->> App subscriber: unsubscribe() deactivate map observable deactivate range observable ] ] ??? 
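The ordering in the diagram (values, then one terminal call, then `unsubscribe`) can be sketched with a toy subscriber that records what it receives. This is an illustration under stated assumptions, not the rxcpp `subscriber`: the class `toy_subscriber`, the function `toy_trace`, and the choice to silently drop calls that arrive after the terminal call are all made up for this sketch.

```cpp
#include <string>
#include <vector>

// Hypothetical subscriber that logs the calls it accepts.
class toy_subscriber {
    std::vector<std::string>& log_;
    bool subscribed_ = true;

public:
    explicit toy_subscriber(std::vector<std::string>& log) : log_(log) {}

    bool is_subscribed() const { return subscribed_; }

    void on_next(const std::string& v) {
        if (!subscribed_) return;  // assumption: late values are dropped
        log_.push_back("on_next(" + v + ")");
    }

    void on_completed() {
        if (!subscribed_) return;
        log_.push_back("on_completed()");
        unsubscribe();  // the terminal call ends the subscription lifetime
    }

    void unsubscribe() {
        if (!subscribed_) return;
        subscribed_ = false;
        log_.push_back("unsubscribe()");
    }
};

// Replays the flow of range(2, 2) | map(to_string) by hand:
// one value, then completion, then an ignored late value.
inline std::vector<std::string> toy_trace() {
    std::vector<std::string> log;
    toy_subscriber app(log);
    for (long v = 2; v <= 2 && app.is_subscribed(); ++v) {
        app.on_next(std::to_string(v));
    }
    app.on_completed();
    app.on_next("late");  // nothing is recorded after the terminal call
    return log;
}
```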
> * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() App subscriber -->> App subscriber: unsubscribe() deactivate map observable map subscriber -->> map subscriber: unsubscribe() deactivate range observable ] ] ??? > * --- .split-60[ .column[ ![](content/write an algorithm-COMPLETED-green.svg) - .accent[ ## created transform algorithm] - .accent[ ## used transform algorithm to change values from long to string] - .accent[ ## showed subscription flow through algorithm] ] .column[ .right[ .image-80[ ![honda rube goldberg](content/honda rube.gif) ]
### .accent[__next >>__] virtuous procrastination ] ] ] ??? > * --- # virtuous procrastination .center[ .image-70[ ![](content/lemur on photographer.gif) ] ] ??? > * defer is virtuous procrastination as a survival trait > * deferral is used for observable and scheduler --- # how to get the json from stream.twitter.com -- .split-60[ .column[ ``` auto requesttwitterstream = `defer`([=](){ auto url = oauth2SignUrl("https://stream.twitter..."); ``` ] .column[ - .accent[`defer` is used to call `oauth2SignUrl` each time the request is repeated] ] ] ??? > * `defer` is used to call `oauth2SignUrl` each time the request is repeated -- .split-60[ .column[ ``` return http.create(http_request{url, method, {}, {}}) | ``` ] .column[ - .accent[`create` an observable that will start a request when `subscribe` is called] ] ] -- .split-60[ .column[ ``` map([](http_response r){ return r.body.chunks; }) | ``` ] .column[ - .accent[`chunks` is an `observable
` that emits parts of the body as they arrive] ] ] ??? > * only keep the strings from the request body -- .split-60[ .column[ ``` merge(tweetthread); }) | twitter_stream_reconnection(tweetthread); ``` ] .column[ - .accent[emit all tweets on a dedicated thread] - .accent[`twitter_stream_reconnection` implements the twitter reconnect protocol for errors] ] ] ??? > * emit all the tweets on the specified thread > * twitter_stream_reconnection - new operator to retry the request on failure --- # how to handle twitter retry protocol -- .split-60[ .column[ ``` auto twitter_stream_reconnection = [](auto tweetthread){ return [=](observable
chunks){ return chunks | ``` ] .column[ - .accent[an operator is a function that takes an observable and returns an observable] ] ] ??? > * an operator is a function that takes an observable and returns an observable -- .split-60[ .column[ ``` `timeout`(90s, tweetthread) | ``` ] .column[ - .accent[first rule is to reconnect if nothing has arrived for 90 seconds] ] ] ??? > * error when no string has been appended to the body in 90 seconds -- .split-60[ .column[ ``` on_error_resume_next([=](exception_ptr ep) { try {rethrow_exception(ep); } catch (const http_exception& ex) { return twitterRetryAfterHttp(ex); } catch (const timeout_error& ex) { return `empty
()`; } return error
(ep, tweetthread); }) | ``` ] .column[ - .accent[`twitterRetryAfterHttp` returns an observable that completes after a time (based on the rules)] - .accent[`timeout_error` should reconnect now]
- .accent[unhandled errors are re-thrown] ] ] ??? > * catches errors and returns a new observable to use > * the returned observable might complete instantly > * the returned observable might complete after a delay -- .split-60[ .column[ ``` repeat(); }; }; ``` ] .column[ - .accent[when the stream completes, repeat the request] ] ] ??? > * when the chunks observable is completed, make a new http request. --- .split-60[ .column[ ![](content/virtuous procrastination-COMPLETED-green.svg) .accent[ ## __defer__ work] .accent[ ## so that work can be..] * .accent[ ## __repeated__] * .accent[ ## __retried__] * .accent[ ## __shared__] ] .column[ .right[ .image-80[ ![](content/lemur on photographer.gif) ]
### .accent[__next >>__] opt-in thread-safety ] ] ] ??? --- # opt-in thread-safety .center[ .image-60[ ![](content/snl conductor.gif) ] ] ??? > * --- # how to write a twitter app -- .split-60[ .column[ ``` auto tweets = twitterrequest(`tweetthread`, http) | parsetweets(`poolthread`, `tweetthread`) | publish() | ref_count(); // share ``` ] .column[ * .accent[ request and parse tweets] * .accent[ share parsed tweets] ] ] -- .split-60[ .column[ ``` auto models = iterate(actions /*, `currentthread`*/) | merge(`mainthread`)| scan(Model{}, [=](Model& m, auto f){ auto r = f(m); return r; }) | ``` ] .column[ * .accent[ actions - process tweets into model updates] * .accent[ run actions on mainthread] ] ] -- .split-60[ .column[ ``` sample_with_time(200ms, `mainthread`) | publish() | ref_count(); // share ``` ] .column[ * .accent[ update to the latest model every 200ms and share] ] ] -- .split-60[ .column[ ``` iterate(renderers /*, `currentthread`*/) | merge(/*`currentthread`*/) | subscribe
(); ``` ] .column[ * .accent[ renderers - process the latest model onto the screen] * .accent[ subscribe starts the app] ] ] ??? > * --- # how to batch calls to sentiment web service -- .split-60[ .column[ ``` auto sentimentaction = tweets | buffer_with_time(500ms, `tweetthread`) | ``` ] .column[ - .accent[ buffer tweets into a vector and emit the vector every 500ms] ] ] -- .split-60[ .column[ ``` filter([](vector
v){ return !v.empty(); }) | map([=](const vector
& buffy) { ``` ] .column[ - .accent[ignore empty vectors] ] ] -- .split-60[ .column[ ``` vector
text = buffy | view::transform(tweettext); ``` ] .column[ - .accent[ __range-v3__ is used to extract a vector of strings from the json] ] ] -- .split-60[ .column[ ``` return sentimentrequest(poolthread, http, text) | map([=](const string& body){ ``` ] .column[ - .accent[send the vector to get a vector of the sentiment of each] ] ] -- .split-60[ .column[ ``` auto sentiments = json::parse(body); auto combined = view::zip(sentiments, buffy); // . . . }); }); ``` ] .column[ - .accent[parse the sentiment vector from the json] - .accent[ __range-v3__ zips the tweet and sentiment vectors to match the tweet with the sentiment] ] ] ??? > * --- .split-60[ .column[ ![](content/opt--in thread--safety-COMPLETED-green.svg) - .accent[ ## described non-thread-safe scheduler default] - .accent[ ## specified thread-safe schedulers to coordinate __multiple__ streams] - .accent[ ## specified thread-safe schedulers to coordinate __time__ with streams] ] .column[ .right[ ![](content/snl conductor.gif)
### .accent[__next >>__] delegate task execution ] ] ] ??? > * --- # delegate task execution .center[ .image-70[ ![](content/fetch newspaper.gif) ] ] ??? > * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp) -- ``` class io_service : public scheduler_interface { asio::io_service& io_service_; class strand_worker : public worker_interface; public: explicit io_service(asio::io_service& io_service); ``` -- ``` virtual clock_type::time_point now() const { return `clock_type::now`(); } ``` -- ``` virtual worker create_worker(composite_subscription cs) const { return worker(move(cs), make_shared<`strand_worker`>(io_service_)); } }; ``` ??? > * factory for strand_worker > * use real time of underlying clock --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp) .split-60[ .column[ ``` class strand_worker : public worker_interface { mutable asio::strand strand_; public: explicit strand_worker(asio::io_service& io_service); virtual clock_type::time_point now() const { return `clock_type::now`(); } virtual void schedule(const schedulable& scbl) const; virtual void schedule(clock_type::time_point when, const schedulable& scbl) const; }; ``` ] .column[ - .accent[ ### a worker must ensure that only one schedulable is called at a time] - .accent[ ### this worker uses asio::strand to order the calls] - .accent[ ### this worker uses real time for now()] ] ] ??? 
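The rule that a worker runs only one schedulable at a time can be illustrated without asio. The sketch below is single-threaded and only shows the ordering rule: a task scheduled from inside a running task is queued and runs afterwards instead of re-entrantly. `asio::strand` provides the equivalent guarantee across threads; the names `toy_serial_worker` and `toy_serial_order` are assumptions, and tasks are assumed not to throw.

```cpp
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical worker: runs queued tasks one at a time, in order.
class toy_serial_worker {
    std::deque<std::function<void()>> queue_;
    bool draining_ = false;

public:
    void schedule(std::function<void()> task) {
        queue_.push_back(std::move(task));
        if (draining_) return;  // a task is running; the loop below will reach this one
        draining_ = true;
        while (!queue_.empty()) {
            auto next = std::move(queue_.front());
            queue_.pop_front();
            next();  // tasks never overlap: the next one starts after this returns
        }
        draining_ = false;
    }
};

// Usage: the nested task is deferred until the outer task has finished.
inline std::vector<int> toy_serial_order() {
    std::vector<int> order;
    toy_serial_worker w;
    w.schedule([&] {
        order.push_back(1);
        w.schedule([&] { order.push_back(3); });
        order.push_back(2);
    });
    return order;
}
```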
> * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp) ``` virtual void schedule(const schedulable& scbl) const { if (!scbl.is_subscribed()) return; strand_.`post`([scbl] { if (scbl.`is_subscribed`()) { `scbl`(); } }); } ``` ??? > * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service.hpp) ``` virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (!scbl.is_subscribed()) return; auto diff_ms = duration_cast
(when - now()); auto timer = make_shared
(strand_.get_io_service()); timer->expires_from_now(diff_ms); timer->`async_wait`(strand_.wrap([timer, scbl](const system::error_code& ec) { if (!ec && scbl.`is_subscribed`()) { `scbl`(); } })); } ``` ??? > * --- # delegate task execution - .accent[ ## delegate tasks to: thread pool, event loop, actor, channel, etc..] - .accent[ ## implement a scheduler to delegate tasks] ``` auto now() -> time_point; ``` ``` auto schedule(schedulable what) -> void; ``` ``` auto schedule(time_point at, schedulable what) -> void; ``` ??? > * --- .split-60[ .column[ ![](content/delegate task execution-COMPLETED-green.svg) .accent[ ### described `scheduler` and `worker`] .accent[ ### implemented `scheduler` and `worker` to delegate tasks to `asio::io_service`] ] .column[ .right[ ![](content/fetch newspaper.gif)
### .accent[__next >>__] adapt async sources ] ] ] ??? > * --- background-image: url(content/drop%20cam.gif) # adapt async sources ??? > * --- # adapt async sources - http requests using libcurl -- - .accent[ ### `curl_multi_perform` supports multiplexing requests on a thread] -- - .accent[ ### all the calls to curl must be made from that thread] -- - .accent[ ### completion and results must be delivered to the matching request] ??? > * --- # adapt async sources - http requests using libcurl ``` auto worker = create
([](subscriber
out){ while(out.is_subscribed()) { `curl_multi_perform`(curlm, /*. . .*/); for(;;) { CURLMsg *message = nullptr; message = `curl_multi_info_read`(curlm, /*. . .*/); out.on_next(message); if (!!message /*. . .*/) { continue; } break; } int handlecount = 0; `curl_multi_wait`(curlm, nullptr, 0, 500, &handlecount); } out.on_completed(); }) | subscribe_on(`httpthread`) | publish() | connect_forever(); // share ``` ??? > * --- # adapt async sources - http requests using libcurl ``` auto worker = create
([](subscriber
out){ while(out.`is_subscribed`()) { curl_multi_perform(curlm, /*. . .*/); for(;;) { CURLMsg *message = nullptr; message = curl_multi_info_read(curlm, /*. . .*/); out.`on_next`(message); if (!!message /*. . .*/) { continue; } break; } int handlecount = 0; curl_multi_wait(curlm, nullptr, 0, 500, &handlecount); } out.`on_completed`(); }) | subscribe_on(httpthread) | publish() | connect_forever(); // share ``` ??? > * --- # adapt async sources - http requests using libcurl to create an http request, use `worker` observable to run the curl api calls on the httpthread. .split-50[ .column[ .accent[ subscribe to http request] ``` worker | take(1) | tap([] (CURLMsg*){ auto curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); // . . . curl_multi_add_handle(curlm, curl); // }) | subscribe(); ``` ] .column[ .accent[ unsubscribe http request (cancel)] ``` worker | take(1) | tap([] (CURLMsg*){ // // // // curl_multi_remove_handle(curlm, curl); curl_easy_cleanup(curl); }) | subscribe(); ``` ] ] ??? > * --- .split-60[ .column[ ![](content/adapt async sources-COMPLETED-green.svg) - .accent[ ## adapted libcurl to rxcpp] - .accent[ ## built __polling loop__ to adapt libcurl to rxcpp] - .accent[ ## __callback__, __future__, __completion port__, etc.. patterns can also be adapted to rxcpp] ] .column[ .right[ .image-50[ ![](content/drop cam.gif) ]
### .accent[__next >>__] recap ] ] ] ??? > * --- # recap ??? > * --- # recap .accent[ ## values distributed in time] .center[ .image-60[ ![](content/water_balloons.gif) ] ] ??? > * --- # recap .accent[ ## write an algorithm] .center[ .image-60[ ![](content/honda rube.gif) ] ] ??? > * --- # recap .accent[ ## virtuous procrastination] .center[ .image-60[ ![](content/lemur on photographer.gif) ] ] ??? > * --- # recap .accent[ ## opt-in thread-safety] .center[ .image-60[ ![](content/snl conductor.gif) ] ] ??? > * --- # recap .accent[ ## delegate task execution] .center[ .image-50[ ![](content/fetch newspaper.gif) ] ] ??? > * --- background-image: url(content/drop%20cam.gif) # recap .accent[ ## adapt existing sources] ??? > * --- # things I desire - .accent[ REST service library] - .accent[ http service library] - .accent[ bindings for ux libraries, like [https://github.com/tetsurom/rxqt](https://github.com/tetsurom/rxqt)] - .accent[ bindings for asio, like [https://github.com/pudae/example](https://github.com/pudae/example)] - .accent[ rxcpp v3 [https://github.com/kirkshoop/rxcppv3](https://github.com/kirkshoop/rxcppv3)] - .accent[ standardization] - .accent[ native algorithm support in libraries (asio, boost, poco, etc..)] ??? > * --- .split-40[ .column[ ## credits .accent[__Gor Nishanov__] introduced me to .accent[__Axel Naumann__] who arranged my visit. .accent[__Eric Mittelette__] shaped this presentation from start to finish. I am deeply grateful for all his time and effort. .accent[__Niall Connaughton__] presented the `RxJS` twitter analysis app that inspired me to build one in `rxcpp`. .accent[__Aaron Lahman__] made the first prototype of rxcpp. .accent[__Grigoriy Chudnov__, __Valery Kopylov__] and many other rxcpp contributors.. 
] .column[ ## resources - .accent[ [https://github.com/kirkshoop/twitter](https://github.com/kirkshoop/twitter)] - .accent[ [https://github.com/Reactive-Extensions/RxCpp](https://github.com/Reactive-Extensions/RxCpp)] - .accent[ [http://reactive-extensions.github.io/RxCpp/](http://reactive-extensions.github.io/RxCpp/)] - .accent[ [https://github.com/kirkshoop/rxcppv3](https://github.com/kirkshoop/rxcppv3)] - .accent[ [http://rxmarbles.com/](http://rxmarbles.com/)] - .accent[ [http://reactivex.io/intro.html](http://reactivex.io/intro.html)] - .accent[ [http://reactivex.io/learnrx/](http://reactivex.io/learnrx/)] ] ] ??? > * --- ## complete. questions?
??? > * --- # appendix ??? > * --- # rxcpp architecture .mermaid[ classDiagram subscription --o subscriber : 1 observer --o subscriber : 1 subscription : bool is_subscribed() subscription : void unsubscribe() observer : void on_next(T) observer : void on_error(exception_ptr) observer : void on_completed() subscriber : subscription get_subscription() subscriber : observer get_observer() subscriber : void on_next(T) subscriber : void on_error(exception_ptr) subscriber : void on_completed() subscriber : bool is_subscribed() subscriber : void unsubscribe() observable o-- subscriber : 0..n observable : subscription subscribe(subscriber) ] ??? > * --- # rxcpp scheduler architecture .mermaid[ classDiagram action --o schedulable : 1 subscription --o worker : 1 subscription --o schedulable : 1 subscription : bool is_subscribed() subscription : void unsubscribe() action : void operator()() schedulable : subscription get_subscription() schedulable : action get_schedulable() scheduler -- worker worker o-- schedulable : 0..n scheduler : time_point now() scheduler : worker create_worker(subscription) worker : scheduler get_scheduler() worker : time_point now() worker : subscription schedule(time_point at, schedulable) ] ??? > * --- # how to call sentiment web service -- ``` auto requestsentiment = defer([=]() { ``` -- ``` std::map
headers; headers["Content-Type"] = "application/json"; headers["Authorization"] = "Bearer " + key; ``` -- ``` auto body = json::parse( R"({"Inputs":{"input1":[{"tweet_text": "Hi!"}]},"GlobalParameters":{}})" ); ``` -- ``` return http.create(http_request{url, "POST", headers, body.dump()}) | map([](http_response r){ return r.body.complete; }) | `merge(poolthread)`; }); ``` ??? > *