layout: true --- class: middle .center[ # No raw std::thread! .accent[ ### Live Tweet Analysis in C++ ] ]
??? > * --- #### DEMO .center[ .image-80[ ![twitter application](content/twitter_with_word_sentiment.gif) ] ] ??? > * For 10 years I searched for the right abstraction to write responsive apps without sync primitives > * I once built a rudimentary Sender/Receiver pattern to stream xml+zip documents in Windows > * A co-worker on that project later pointed me to Rx > * In Rx I saw the pattern completed and used to build algorithms > * It was like the first time I saw the STL or the first time I read _Modern C++ Design_ > * I built Rx in c++ because I wanted to build responsive apps using Rx > * twitter stream api > * sample of live tweets. > * sentiment analysis > * words (wanna, love) --- # topics .split-60[ .column[ .accent[ #### pick the pattern] - .accent[ ### primitives are too primitive] - .accent[ ### handling Tweets] - .accent[ ### values distributed in time] - .accent[ ### write an algorithm] ] .column[ .accent[ #### use the pattern] - .accent[ ### virtuous procrastination] - .accent[ ### opt-in thread-safety] - .accent[ ### adapt existing sources] - .accent[ ### algorithmic trancendence] ] ] ??? > * --- # primitives are too primitive .center[ .image-60[ ![primitive technology](content/primitivetechnology.gif) ] .accent[primitive technology ] [https://www.youtube.com/channel/UCAL3JXZSzSm8AlZyD3nQdBA](https://www.youtube.com/channel/UCAL3JXZSzSm8AlZyD3nQdBA) ] ??? > * these videos are mesmerizing to watch and I yearn to build some of these things > * however, the time and cuts and blisters and pain mean that this is not the right way to build a house commercially. > * I expect that we all yearn to write an OS or a language or a lock-free queue > * However, I wrote Rx using async primitives because I do not want to write code using async primitives > * there is a reason to write C++ and use a linker to produce the machine code > * there is a reason to use STL and algorithms and libraries written by others > * assembly & intrinsics are the wrong abstractions. > * atomics, thread, async, mutex, condition_variable, promise, future --- # primitives are too primitive .center[ .image-60[ ![Sean Parent](content/seanparent.png) ] .accent[C++ Seasoning ] [https://channel9.msdn.com/Events/GoingNative/2013/Cpp-Seasoning](https://channel9.msdn.com/Events/GoingNative/2013/Cpp-Seasoning) ] ??? > * In 2013, I attended Sean Parent's C++ Seasoning talk at Going Native where he advocated for having no raw for loops in code. > * One of the things Sean said was that when reviewing code he would try to find the algorithms that each for loop was reimplementing - often poorly - and ask for the loop to be replaced by the algorithms. > * Later in the talk Sean added another goal - no raw syncronization primitives. --- .split-60[ .column[ ![](content/primitives are too primitive-COMPLETED-green.svg) - .accent[ ## avoid reimplementing algorithms] - .accent[ ## avoid using synchronization primitives] ] .column[ .right[ ![](content/primitivetechnology.gif)
### .accent[__next >>__] handling Tweets ] ] ] ??? > * --- # handling Tweets .center[ .image-70[ ![robin hood](content/robinhood.gif) ] ] ??? > * --- # handling Tweets .accent[__callback per Tweet__ ] .center[ .image-50[ ![converters](content/converters.jpg) ] ] ??? > * awkward to chain together > * no consistent handling of errors. cancellation. completion. > * easy to lose signals in each connection --- # handling Tweets .accent[__promise per Tweet__ ] .center[ .image-60[ ![egg relay](content/Egg_Relay_Race_2015.gif) ] .accent[ ] [https://www.youtube.com/watch?v=JyVSmP-MKgY](https://www.youtube.com/watch?v=JyVSmP-MKgY) ] ??? > * awkward syncronization to transfer each value from each future to the next promise. --- # handling Tweets .accent[__promise per Tweet__ ] .split-50[ .column[ ``` auto onnext = [&c](int){++c;}; for (int i = 0; i < 10000000; i++) { std::promise
ready; ready.set_value(unit()); auto isready = ready.get_future(); if (isready.wait_for(0ms) == timeout) { isready.wait(); } onnext(0); } ``` ] .column[ .accent[ ] - 10,000,000 on_next calls - 3018ms elapsed - .accent[3,313,450] ops/sec ] ] ??? --- # handling Tweets .accent[__subscription to all Tweets__ ] .center[ .image-70[ ![robin hood](content/robinhood.gif) ] .accent[ ] ] --- # handling Tweets .accent[__subscription to all Tweets__ ] .split-50[ .column[ ``` auto o = rx::make_subscriber
( [&c](int){++c;}, [](std::exception_ptr){abort();}); for (int i = 0; i < 10000000; i++) { o.on_next(i); } o.on_completed(); ``` ] .column[ .accent[ ] - 10,000,000 on_next calls - 17ms elapsed - .accent[588,235,000] ops/sec ] ] ??? > * subscriber does contract enforcement --- # handling Tweets .accent[raw callbacks ] ``` void parseline(const string& line, auto& handlers) { try { auto text = tweettext(json::parse(line)); auto words = splitwords(text); // publish tweets - multicast for(auto& f : handlers) { `f(text, words)`; } } catch (const exception& ex){ cerr << ex.what() << endl; } } ``` ??? > * each tweet is parsed and passed to all the handlers > * callbacks are harder to compose > * callback missing lifetime and procrastination --- # handling Tweets .accent[raw promises ] .split-50[ .column[ w/o coroutines ``` `each`(objProc.begin(), objProc.end(), [&](It
c, It
e) { // split chunks and group into tweets string chunk = partial + *c; partial.clear(); string line; for (auto& fragment : split(chunk, "\r\n")){ if (!isEndOfTweet(fragment)) { partial = line = fragment; continue; } partial.clear(); `parseline`(line, handlers); } }); ``` ] .column[ coroutines ``` `for await` (auto& c : objProc) { // split chunks and group into tweets string chunk = partial + *c; partial.clear(); string line; for (auto& fragment : split(chunk, "\r\n")){ if (!isEndOfTweet(fragment)) { partial = line = fragment; continue; } partial.clear(); `parseline`(line, handlers); } } ``` ] ] ??? > * each will map to for-await > * for-await will over-serialize > * new promise created for each value > * composition requires additional concepts and code --- # handling Tweets .accent[subscription ] ``` auto tweets = defer([=](){ auto url = oauth2SignUrl("https://stream.twitter..."); return http.create(http_request{url, method, {}, {}}) | map([](http_response r){ return r.body.chunks; }) | merge(tweetthread); }) | twitter_stream_reconnection(tweetthread) | parsetweets(poolthread, tweetthread) | publish() | ref_count(); ``` ??? > * good cmoposition > * good efficiency > * more descriptive code --- .split-60[ .column[ ![](content/handling Tweets-COMPLETED-green.svg) - .accent[ ## mentioned callback composition] - .accent[ ## showed promise and subscription costs] - .accent[ ## showed examples of each] ] .column[ .right[ ![](content/robinhood.gif)
### .accent[__next >>__] values distributed in time ] ] ] ??? > * --- # values distributed in time .accent[ ### my son says: "space vs time is explained by playing cards"] .center[ .image-60[ ![poker game](content/poker-gremlin.gif) ] ] ??? > * kiran's idea --- # values distributed in time .accent[ ### each player has 0 or more cards] .center[ .image-40[ ![poker hand](content/poker-game-264599_1280.jpg) ] ] ??? > * cards in the hand can be processed now > * array and list are __values distributed in space__ --- # values distributed in time .accent[ ### the dealer distributes cards in time] .center[ .image-50[ ![poker deck](content/poker-deck-875295_1280.jpg) ] ] ??? > * cards in the deck cannot be processed now > * user input and IO are __values distributed in time__ --- # values distributed in time .accent[ ### marble diagrams are used to describe values distributed in time.]
??? > * --- # flow of a subscription .accent[ ### how not to get wet! ] .center[ .image-60[ ![water balloons](content/water_balloons.gif) ] ] ??? > * --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` ] .column[ - .accent[demonstrate the contract of a subscription] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end alt failure observable --x subscriber: on_error() else completion observable --x subscriber: on_completed() end deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] - .accent[### on_error is the last call that a subscriber will receive] - .accent[### on_completed is the last call that a subscriber will receive] ] ] --- .split-60[ .column[ ``` observable | subscribe(subscriber); ``` .center[ .mermaid[ sequenceDiagram participant observable participant subscriber participant App App->> observable: subscribe(subscriber) activate observable loop value observable -->> subscriber: on_next() end alt failure observable --x subscriber: on_error() else completion observable --x subscriber: on_completed() end subscriber -->> subscriber: unsubscribe() deactivate observable ] ] ] .column[ - .accent[demonstrate the contract of a subscription] - .accent[### observables defer work] - .accent[### calls to the subscriber will never overlap in time] - .accent[### on_error is the last call that a subscriber will receive] - .accent[### on_completed is the last call that a subscriber will receive] - .accent[### unsubscribe is the destructor for the subscription lifetime] ] ] ??? > * --- .split-70[ .column[ .image-80[ ![](content/values distributed in time-COMPLETED-green.svg) ] - .accent[ ## demonstrated values distributed in time] - .accent[ ## described flow of a subscription] - .accent[ ## subcriptions are useful to..] - .accent[ ### defer work] - .accent[ ### provide intermediate results] - .accent[ ### combine values from multiple sources] ] .column[ .right[ ![poker game](content/poker-gremlin.gif) ![water balloons](content/water_balloons.gif)
### .accent[__next >>__] write an algorithm ] ] ] ??? > * --- # write an algorithm .center[ ![honda rube goldberg](content/honda rube.gif) ] ??? > * each step is an algorithm for values distributed in time > * a value from the previous step arrives and is transformed into a new value > * the new value travels in time to the next algorithm --- # write an algorithm - transform .accent[ `transform` calls a function with each value that arrives and passes out the result of the function. ]
> `map` is a common alias for `transform`. ??? > * --- # write an algorithm - transform .split-60[ .column[ ``` auto map = [](auto selector){ ``` ] .column[ ] ] -- .split-60[ .column[ ``` return [=](auto in){ return create([=](auto out){ ``` ] .column[ - .accent[an operator is a function that takes an observable and returns an observable] ] ] -- .split-60[ .column[ ``` return in | subscribe( out.get_subscription(), [](auto v){ out.on_next(`selector(v)`); }, ``` ] .column[ - .accent[values from `in` are transformed by `selector` and the result passed to `out`] ] ] -- .split-60[ .column[ ``` [](exception_ptr ep){out.on_error(ep);}, []() {out.on_completed();} ); }); }; }; ``` ] .column[ - .accent[`on_error` and `on_completed` are passed to `out` unchanged] ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` -- .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable deactivate map observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() App subscriber -->> App subscriber: unsubscribe() deactivate map observable deactivate range observable ] ] ??? > * --- ``` range(2, 2) | map([](long l){return to_string(l);}) | subscribe(); ``` .center[ .mermaid[ sequenceDiagram participant range observable participant map subscriber participant map observable participant App subscriber participant App App ->> map observable: subscribe(App subscriber) activate map observable map observable ->> range observable: subscribe(map subscriber) activate range observable range observable -->> map subscriber: on_next(2) map subscriber -->> App subscriber: on_next(to_string(2)) range observable --x map subscriber: on_completed() map subscriber --x App subscriber: on_completed() App subscriber -->> App subscriber: unsubscribe() deactivate map observable map subscriber -->> map subscriber: unsubscribe() deactivate range observable ] ] ??? > * --- .split-60[ .column[ ![](content/write an algorithm-COMPLETED-green.svg) - .accent[ ## created transform algorithm] - .accent[ ## used transform algorithm to change values from long to string] - .accent[ ## showed subscription flow through algorithm] ] .column[ .right[ .image-80[ ![honda rube goldberg](content/honda rube.gif) ]
### .accent[__next >>__] virtuous procrastination ] ] ] ??? > * --- # virtuous procrastination .center[ .image-70[ ![](content/lemur on photographer.gif) ] ] ??? > * defer is virtuous procrastination as a survival trait > * deferral is used for observable and scheduler --- # how to get the json from stream.twitter.com -- .split-60[ .column[ ``` auto requesttwitterstream = `defer`([=](){ auto url = oauth2SignUrl("https://stream.twitter..."); ``` ] .column[ - .accent[`defer` is used to call `oauth2SignUrl` each time the request is repeated] ] ] ??? > * `defer` is used to call `oauth2SignUrl` each time the request is repeated -- .split-60[ .column[ ``` return http.create(http_request{url, method, {}, {}}) | ``` ] .column[ - .accent[`create` an observable that will start a request when `subscribe` is called] ] ] -- .split-60[ .column[ ``` map([](http_response r){ return r.body.chunks; }) | ``` ] .column[ - .accent[`chunks` is an `observable
` that emits parts of the body as they arrive] ] ] ??? > * only keep the strings from the request body -- .split-60[ .column[ ``` merge(tweetthread); }) | twitter_stream_reconnection(tweetthread); ``` ] .column[ - .accent[emit all tweets on a dedicated thread] - .accent[`twitter_stream_reconnection` implements the twitter reconnect protocol for errors] ] ] ??? > * emit all the tweets on the specified thread > * twitter_stream_reconnection - new operator to retry the request on failure --- # how to handle twitter retry protocol -- .split-60[ .column[ ``` auto twitter_stream_reconnection = [](auto tweetthread){ return [=](observable
chunks){ return chunks | ``` ] .column[ - .accent[an operator is a function that takes an observable and returns an observable] ] ] ??? > * an operator is a function that takes an observable and returns an observable -- .split-60[ .column[ ``` `timeout`(90s, tweetthread) | ``` ] .column[ - .accent[first rule is to reconnect if nothing has arrived for 90 seconds] ] ] ??? > * error when no string has been appended to the body in 90 seconds -- .split-60[ .column[ ``` on_error_resume_next([=](exception_ptr ep) { try {rethrow_exception(ep); } catch (const http_exception& ex) { return twitterRetryAfterHttp(ex); } catch (const timeout_error& ex) { return `empty
()`; } return error
(ep, tweetthread); }) | ``` ] .column[ - .accent[`twitterRetryAfterHttp` returns an observable that completes after a time (based on the rules)] - .accent[`timeout_error` should reconnect now]
- .accent[unhandled errors are re-thrown] ] ] ??? > * catches errors and returns a new observable to use > * the returned observable might complete instantly > * the returned observable might complete after a delay -- .split-60[ .column[ ``` repeat(); }; }; ``` ] .column[ - .accent[when the stream completes, repeat the request] ] ] ??? > * when the chunks observable is completed, make a new http request. --- .split-60[ .column[ ![](content/virtuous procrastination-COMPLETED-green.svg) .accent[ ## __defer__ work] .accent[ ## so that work can be..] * .accent[ ## __repeated__] * .accent[ ## __retried__] * .accent[ ## __shared__] ] .column[ .right[ .image-80[ ![](content/lemur on photographer.gif) ]
### .accent[__next >>__] opt-in thread-safety ] ] ] ??? --- # opt-in thread-safety .center[ .image-60[ ![](content/snl conductor.gif) ] ] ??? > * --- # how to write a twitter app -- .split-60[ .column[ ``` auto tweets = twitterrequest(`tweetthread`, http) | parsetweets(`poolthread`, `tweetthread`) | publish() | ref_count(); // share ``` ] .column[ * .accent[ request and parse tweets] * .accent[ share parsed tweets] ] ] -- .split-60[ .column[ ``` auto models = iterate(actions /*, `currentthread`*/) | merge(`mainthread`)| scan(Model{}, [=](Model& m, auto f){ auto r = f(m); return r; }) | ``` ] .column[ * .accent[ actions - process tweets into model updates] * .accent[ run actions on mainthread] ] ] -- .split-60[ .column[ ``` sample_with_time(200ms, `mainthread`) | publish() | ref_count(); // share ``` ] .column[ * .accent[ update to the latest model every 200ms and share] ] ] -- .split-60[ .column[ ``` iterate(renderers /*, `currentthread`*/) | merge(/*`currentthread`*/) | subscribe
(); ``` ] .column[ * .accent[ renderers - process the latest model onto the screen] * .accent[ subscribe starts the app] ] ] ??? > * --- # how to batch calls to sentiment web service -- .split-60[ .column[ ``` auto sentimentaction = tweets | buffer_with_time(500ms, `tweetthread`) | ``` ] .column[ - .accent[ buffer tweets into a vector and emit the vector every 500ms] ] ] -- .split-60[ .column[ ``` filter([](vector
v){ return !v.empty(); }) | map([=](const vector
& buffy) { ``` ] .column[ - .accent[ignore empty vectors] ] ] -- .split-60[ .column[ ``` vector
text = buffy | view::transform(tweettext); ``` ] .column[ - .accent[ __range-v3__ is used to extract a vector of strings from the json] ] ] -- .split-60[ .column[ ``` return sentimentrequest(poolthread, http, text) | map([=](const string& body){ ``` ] .column[ - .accent[send the vector to get a vector of the sentiment of each] ] ] -- .split-60[ .column[ ``` auto sentiments = json::parse(body); auto combined = view::zip(sentiments, buffy); // . . . }); }); ``` ] .column[ - .accent[parse the sentiment vector from the json] - .accent[ __range-v3__ zips the tweet and sentiment vectors to match the tweet with the sentiment] ] ] ??? > * --- # Azure Machine Learning .center[ .image-60[ ![](content/dashboard.png)] ] ??? > * --- # Azure Machine Learning .accent[ __Request__] ```http POST /subscriptions/
/services/
/execute HTTP/1.1 Host: ussouthcentral.services.azureml.net Connection: keep-alive Content-Length: 148 Authorization: Bearer
{"Inputs":{"input1":[{"tweet_text":"..."}]},"GlobalParameters":{}} ``` .accent[ __Response__] ```http HTTP/1.1 200 OK Content-Length: 77 Content-Type: application/json; format=swagger; charset=utf-8 {"Results":{"output1":[{"Sentiment":"positive","Score":"0.87877231836319"}]}} ``` ??? > * --- # Azure Machine Learning .center[ .image-60[ ![](content/experiment.png)] ] ??? > * --- .split-60[ .column[ ![](content/opt--in thread--safety-COMPLETED-green.svg) - .accent[ ## described non-thread-safe scheduler default] - .accent[ ## specified thread-safe schedulers to coordinate __multiple__ streams] - .accent[ ## specified thread-safe schedulers to coordinate __time__ with streams] ] .column[ .right[ ![](content/snl conductor.gif)
### .accent[__next >>__] adapt asyc sources ] ] ] ??? > * --- background-image: url(content/drop%20cam.gif) # adapt async sources ??? > * --- # adapt async sources - http requests using libcurl -- - .accent[ ### `curl_multi_perform` supports multiplexing requests on a thread] -- - .accent[ ### all the calls to curl must be made from that thread] -- - .accent[ ### completion and results must be delivered to the matching request] ??? > * --- # adapt async sources - http requests using libcurl ``` auto worker = create
([](subscriber
out){ while(out.is_subscribed()) { `curl_multi_perform`(curlm, /*. . .*/); for(;;) { CURLMsg *message = nullptr; message = `curl_multi_info_read`(curlm, /*. . .*/); out.on_next(message); if (!!message /*. . .*/) { continue; } break; } int handlecount = 0; `curl_multi_wait`(curlm, nullptr, 0, 500, &handlecount); } out.on_completed(); }) | subscribe_on(`httpthread`) | publish() | connect_forever(); // share ``` ??? > * --- # adapt async sources - http requests using libcurl ``` auto worker = create
([](subscriber
out){ while(out.`is_subscribed`()) { curl_multi_perform(curlm, /*. . .*/); for(;;) { CURLMsg *message = nullptr; message = curl_multi_info_read(curlm, /*. . .*/); out.`on_next`(message); if (!!message /*. . .*/) { continue; } break; } int handlecount = 0; curl_multi_wait(curlm, nullptr, 0, 500, &handlecount); } out.`on_completed`(); }) | subscribe_on(httpthread) | publish() | connect_forever(); // share ``` ??? > * --- # adapt async sources - http requests using libcurl to create an http request, use `worker` observable to run the curl api calls on the httpthread. .split-50[ .column[ .accent[ subscribe to http request] ``` worker | take(1) | tap([] (CURLMsg*){ auto curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); // . . . curl_multi_add_handle(curlm, curl); // }) | subscribe(); ``` ] .column[ .accent[ unsubscribe http request (cancel)] ``` worker | take(1) | tap([] (CURLMsg*){ // // // // curl_multi_remove_handle(curlm, curl); curl_easy_cleanup(curl); }) | subscribe(); ``` ] ] ??? > * --- .split-60[ .column[ ![](content/adapt async sources-COMPLETED-green.svg) - .accent[ ## adapted libcurl to rxcpp] - .accent[ ## built __polling loop__ to adapt libcurl to rxcpp] - .accent[ ## __callback__, __future__, __completion port__, etc.. patterns can also be adapted to rxcpp] ] .column[ .right[ .image-50[ ![](content/drop cam.gif) ]
### .accent[__next >>__] algorithmic trancendence ] ] ] ??? > * --- # algorithmic trancendence .center[ .image-40[ ![](content/trancendence.gif) ] ] ??? > * .accent[Algorithms] transcend .accent[Languages] --- ## sample of the algorithms available [http://reactive-extensions.github.io/RxCpp/namespacerxcpp_1_1operators.html](http://reactive-extensions.github.io/RxCpp/namespacerxcpp_1_1operators.html) .split-60[ .column[ * .accent[__buffer__] - [reactivex.io](http://reactivex.io/documentation/operators/buffer.html) * combine_latest - [rxmarbles.com](http://rxmarbles.com/#combineLastest) * concat - [rxmarbles.com](http://rxmarbles.com/#concat) * concat_map * .accent[__debounce__] - [rxmarbles.com](http://rxmarbles.com/#debounce) * .accent[__delay__] - [rxmarbles.com](http://rxmarbles.com/#delay) * distinct - [rxmarbles.com](http://rxmarbles.com/#distinct) * .accent[__distinct_until_changed__] - [rxmarbles.com](http://rxmarbles.com/#distinctUntilChanged) * element_at - [rxmarbles.com](http://rxmarbles.com/#elementAt) * .accent[__filter__] - [rxmarbles.com](http://rxmarbles.com/#filter) * .accent[__finally__] * flat_map - [reactivex.io](http://reactivex.io/documentation/operators/flatmap.html) ] .column[ * .accent[__group_by__] - [reactivex.io](http://reactivex.io/documentation/operators/groupby.html) * .accent[__ignore_elements__] - [reactivex.io](http://reactivex.io/documentation/operators/ignoreelements.html) * .accent[__map__] - [rxmarbles.com](http://rxmarbles.com/#map) * .accent[__merge__] - [rxmarbles.com](http://rxmarbles.com/#merge) * observe_on - [reactivex.io](http://reactivex.io/documentation/operators/observeon.html) * .accent[__on_error_resume_next__] - [reactivex.io](http://reactivex.io/documentation/operators/catch.html) * pairwise * .accent[__publish__] - [reactivex.io](http://reactivex.io/documentation/operators/publish.html) * reduce - [rxmarbles.com](http://rxmarbles.com/#reduce) * .accent[__repeat__] * replay - [reactivex.io](http://reactivex.io/documentation/operators/replay.html) * .accent[__retry__] - [reactivex.io](http://reactivex.io/documentation/operators/retry.html) ] ] ``` ``` > .accent[__algorithms__] __used in the twitter app__ --- ## sample of the algorithms available [http://reactive-extensions.github.io/RxCpp/namespacerxcpp_1_1operators.html](http://reactive-extensions.github.io/RxCpp/namespacerxcpp_1_1operators.html) .split-60[ .column[ * .accent[__sample__] - [rxmarbles.com](http://rxmarbles.com/#sample) * .accent[__scan__] - [rxmarbles.com](http://rxmarbles.com/#scan) * sequence_equal - [reactivex.io](http://reactivex.io/documentation/operators/sequenceequal.html) * skip - [rxmarbles.com](http://rxmarbles.com/#skip) * skip_last - [rxmarbles.com](http://rxmarbles.com/#skipLast) * skip_until - [rxmarbles.com](http://rxmarbles.com/#skipUntil) * .accent[__start_with__] - [rxmarbles.com](http://rxmarbles.com/#startWith) * .accent[__subscribe_on__] - [reactivex.io](http://reactivex.io/documentation/operators/subscribeon.html) * switch_if_empty - [reactivex.io](http://reactivex.io/documentation/operators/switch.html) * .accent[__switch_on_next__] - [reactivex.io](http://reactivex.io/documentation/operators/switch.html) ] .column[ * take - [rxmarbles.com](http://rxmarbles.com/#take) * take_last - [rxmarbles.com](http://rxmarbles.com/#takeLast) * .accent[__take_until__] - [rxmarbles.com](http://rxmarbles.com/#takeUntil) * take_while - [reactivex.io](http://reactivex.io/documentation/operators/takewhile.html) * time_interval - [reactivex.io](http://reactivex.io/documentation/operators/timeinterval.html) * .accent[__timeout__] - [reactivex.io](http://reactivex.io/documentation/operators/timeout.html) * timestamp - [reactivex.io](http://reactivex.io/documentation/operators/timestamp.html) * .accent[__window__] - [reactivex.io](http://reactivex.io/documentation/operators/window.html) * .accent[__with_latest_from__] - [rxmarbles.com](http://rxmarbles.com/#withLatestFrom) * zip - [rxmarbles.com](http://rxmarbles.com/#zip) ] ] ``` ```
> .accent[__algorithms__] __used in the twitter app__ --- ## learn algorithms once, use them in any Language .split-50[ .column[ * .accent[Java]: [https://github.com/ReactiveX/RxJava](https://github.com/ReactiveX/RxJava) * .accent[JavaScript]: [https://github.com/Reactive-Extensions/RxJS](https://github.com/Reactive-Extensions/RxJS), [https://github.com/ReactiveX/RxJS](https://github.com/ReactiveX/RxJS) * .accent[C#]: [https://github.com/Reactive-Extensions/Rx.NET](https://github.com/Reactive-Extensions/Rx.NET) * .accent[C#(Unity)]: [https://github.com/neuecc/UniRx](https://github.com/neuecc/UniRx) * .accent[Scala]: [https://github.com/ReactiveX/RxScala](https://github.com/ReactiveX/RxScala) * .accent[Clojure]: [https://github.com/ReactiveX/RxClojure](https://github.com/ReactiveX/RxClojure) * .accent[C++]: [https://github.com/Reactive-Extensions/RxCpp](https://github.com/Reactive-Extensions/RxCpp) * .accent[Lua]: [https://github.com/bjornbytes/RxLua](https://github.com/bjornbytes/RxLua) * .accent[Ruby]: [https://github.com/Reactive-Extensions/Rx.rb](https://github.com/Reactive-Extensions/Rx.rb) ] .column[ * .accent[Python]: [https://github.com/ReactiveX/RxPY](https://github.com/ReactiveX/RxPY) * .accent[Go]: [https://github.com/ReactiveX/RxGo](https://github.com/ReactiveX/RxGo) * .accent[Groovy]: [https://github.com/ReactiveX/RxGroovy](https://github.com/ReactiveX/RxGroovy) * .accent[JRuby]: [https://github.com/ReactiveX/RxJRuby](https://github.com/ReactiveX/RxJRuby) * .accent[Kotlin]: [https://github.com/ReactiveX/RxKotlin](https://github.com/ReactiveX/RxKotlin) * .accent[Swift]: [https://github.com/kzaher/RxSwift](https://github.com/kzaher/RxSwift) * .accent[PHP]: [https://github.com/ReactiveX/RxPHP](https://github.com/ReactiveX/RxPHP) * .accent[Elixir]: [https://github.com/alfert/reaxive](https://github.com/alfert/reaxive) * .accent[Dart]: [https://github.com/ReactiveX/rxdart](https://github.com/ReactiveX/rxdart) ] ] ??? > * --- # use algorithms to solve a problem once, reuse in any Language -- .accent[__parsing messages out of chunks of characters__] [http://stackoverflow.com/questions/31208418/split-iobservablebyte-to-characters-then-to-line](http://stackoverflow.com/questions/31208418/split-iobservablebyte-to-characters-then-to-line) > I receive events with a `byte[]`, this array might contains part of a line, multiple lines or one line. What I want is find a way to have an `IObservable` of Line so `IObservable
`, where each element of the sequence will be a line. -- [https://dev.twitter.com/streaming/overview/processing](https://dev.twitter.com/streaming/overview/processing) > The body of a streaming API response consists of a series of newline-delimited messages, where “newline” is considered to be `\r\n` (in hex, `0x0D 0x0A`) and “message” is a JSON encoded data structure or a blank line.
> Note that Tweet content may sometimes contain linefeed `\n` characters, but will not contain carriage returns `\r`. Therefore, to make sure you get whole message payloads, break out each message on `\r\n` boundaries, as `\n` may occur in the middle of a message. ??? > * --- .split-50[ .column[ .accent[__C#__] from the StackOverflow answer ```cs var strings = bytes. Select(arr => (Regex.Split( Encoding.Default. GetString(arr, 0, arr.Length - 1), "(\r)")). Where(s=> s.Length != 0). ToObservable()). Concat(). Publish(). RefCount(); var closes = strings. Where(s => s.EndsWith("\r")); var linewindows = strings.Window(closes); var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r)); ``` ] .column[ .accent[__C++__] code to extract tweets in the twitter app ```cpp auto strings = chunks | concat_map([](const string& s){ auto splits = split(s, "\r\n"); return iterate(move(splits)); }) | filter([](const string& s){ return !s.empty(); }) | publish() | ref_count(); auto closes = strings | filter(isEndOfTweet); auto linewindows = strings | window_toggle(closes | start_with(0), [=](int){return closes;}); auto lines = linewindows | flat_map([](const observable
& w) { return w | start_with("") | sum(); }); ``` ] ] ??? > * --- .split-60[ .column[ ![](content/algorithmic trancendence-COMPLETED-green.svg) - .accent[ ## sampled available algorithms] - .accent[ ## showed __languages__ with ReactiveX implementations] - .accent[ ## built __message parsing__ in C# and C++] ] .column[ .right[ ![](content/trancendence.gif)
### .accent[__next >>__] recap ] ] ] ??? > * --- # recap ??? > * --- # recap .accent[ ## primitives are too primitive] .center[ .image-60[ ![](content/primitivetechnology.gif) ] ] ??? > * --- # recap .accent[ ## handling Tweets] .center[ .image-60[ ![](content/robinhood.gif) ] ] ??? > * --- # recap .accent[ ## values distributed in time] .center[ .image-60[ ![](content/water_balloons.gif) ] ] ??? > * --- # recap .accent[ ## write an algorithm] .center[ .image-60[ ![](content/honda rube.gif) ] ] ??? > * --- # recap .accent[ ## virtuous procrastination] .center[ .image-60[ ![](content/lemur on photographer.gif) ] ] ??? > * --- # recap .accent[ ## opt-in thread-safety] .center[ .image-60[ ![](content/snl conductor.gif) ] ] ??? > * --- background-image: url(content/drop%20cam.gif) # recap .accent[ ## adapt existing sources] ??? > * --- # recap .accent[ ## algorithmic trancendence] .center[ .image-30[ ![](content/trancendence.gif) ] ] ??? > * --- # things I desire - .accent[ REST service library] - .accent[ http service library] - .accent[ bindings for ux libraries, like [https://github.com/tetsurom/rxqt](https://github.com/tetsurom/rxqt)] - .accent[ bindings for asio, like [https://github.com/pudae/example](https://github.com/pudae/example)] - .accent[ rxcpp v3 [https://github.com/kirkshoop/rxcppv3](https://github.com/kirkshoop/rxcppv3)] - .accent[ standardization] - .accent[ native algorithm support in libraries (asio, boost, poco, etc..)] ??? > * --- .split-40[ .column[ ## credits .accent[__Eric Mittelette__] shaped this presentation from start to finish. I am deeply grateful for all his time and effort. .accent[__Niall Connaughton__] presented the `RxJS` twitter analisys app that inspired me to build one with `rxcpp` .accent[__Aaron Lahman__] made the first prototype of `rxcpp`. .accent[__Grigoriy Chudnov__, __Valery Kopylov__] and many other `rxcpp` contributors.. ] .column[ ## resources - .accent[ [https://github.com/kirkshoop/twitter](https://github.com/kirkshoop/twitter)] - .accent[ [https://github.com/Reactive-Extensions/RxCpp](https://github.com/Reactive-Extensions/RxCpp)] - .accent[ [http://reactive-extensions.github.io/RxCpp/](http://reactive-extensions.github.io/RxCpp/)] - .accent[ [https://github.com/kirkshoop/rxcppv3](https://github.com/kirkshoop/rxcppv3)] - .accent[ [http://rxmarbles.com/](http://rxmarbles.com/)] - .accent[ [http://reactivex.io/intro.html](http://reactivex.io/intro.html)] - .accent[ [http://reactivex.io/learnrx/](http://reactivex.io/learnrx/)] ] ] ??? > * --- # Lightning talk slides .accent[ ## errors - forgotten, but not gone] [https://kirkshoop.github.io/norawthread/errors.html](https://kirkshoop.github.io/norawthread/errors.html) .accent[ ## Networking TS w/Algorithms] [https://kirkshoop.github.io/norawthread/rxnetts.html](https://kirkshoop.github.io/norawthread/rxnetts.html) ??? > * --- ##complete. questions?
??? > * --- # appendix ??? > * --- # rxcpp architecture .mermaid[ classDiagram subscription --o subscriber : 1 observer --o subscriber : 1 subscription : bool is_subscribed() subscription : void unsubscribe() observer : void on_next(T) observer : void on_error(exception_ptr) observer : void on_completed() subscriber : subscription get_subscription() subscriber : observer get_observer() subscriber : void on_next(T) subscriber : void on_error(exception_ptr) subscriber : void on_completed() subscriber : bool is_subscribed() subscriber : void unsubscribe() observable o-- subscriber : 0..n observable : subscription subscribe(subscriber) ] ??? > * --- # rxcpp scheduler architecture .mermaid[ classDiagram action --o schedulable : 1 subscription --o worker : 1 subscription --o schedulable : 1 subscription : bool is_subscribed() subscription : void unsubscribe() action : void operator()() schedulable : subscription get_subscription() schedulable : action get_schedulable() scheduler -- worker worker o-- schedulable : 0..n scheduler : time_point now() scheduler : worker create_worker(subscription) worker : scheduler get_scheduler() worker : time_point now() worker : subscription schedule(time_point at, schedulable) ] ??? > * --- # how to call sentiment web service -- ``` auto requestsentiment = defer([=]() { ``` -- ``` std::map
headers; headers["Content-Type"] = "application/json"; headers["Authorization"] = "Bearer " + key; ``` -- ``` auto body = json::parse( R"({"Inputs":{"input1":[{"tweet_text": "Hi!"}]},"GlobalParameters":{}})" ); ``` -- ``` return http.create(http_request{url, "POST", headers, body.dump()}) | map([](http_response r){ return r.body.complete; }) | `merge(poolthread)`; }); ``` ??? > * --- # delegate task execution .center[ .image-70[ ![](content/fetch newspaper.gif) ] ] ??? > * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service) -- ``` class io_service : public scheduler_interface { asio::io_service& io_service_; class strand_worker : public worker_interface; public: explicit io_service(asio::io_service& io_service); ``` -- ``` virtual clock_type::time_point now() const { return `clock_type::now`(); } ``` -- ``` virtual worker create_worker(composite_subscription cs) const { return worker(move(cs), make_shared<`strand_worker`>(io_service_)); } }; ``` ??? > * factory for strand_worker > * use real time of underlying clock --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service) .split-60[ .column[ ``` class strand_worker : public worker_interface { mutable asio::strand strand_; public: explicit strand_worker(asio::io_service& io_service); virtual clock_type::time_point now() const { return `clock_type::now`(); } virtual void schedule(const schedulable& scbl) const; virtual void schedule(clock_type::time_point when, const schedulable& scbl) const; }; ``` ] .column[ - .accent[ ### a worker must ensure that only one schedulable is called at a time] - .accent[ ### this worker uses asio::strand to order the calls] - .accent[ ### this worker uses real time for now()] ] ] ??? > * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service) ``` virtual void schedule(const schedulable& scbl) const { if (!scbl.is_subscribed()) return; strand_.`post`([scbl] { if (scbl.`is_subscribed`()) { `scbl`(); } }); } ``` ??? > * --- # delegate task execution - asio `io_service` [https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service](https://github.com/pudae/example/blob/master/rx_test/rxasio/io_service) ``` virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (!scbl.is_subscribed()) return; auto diff_ms = duration_cast
(when - now()); auto timer = make_shared
(strand_.get_io_service()); timer->expires_from_now(diff_ms); timer->`async_wait`(strand_.wrap([timer, scbl](const system::error_code& ec) { if (!ec && scbl.`is_subscribed`()) { `scbl`(); } })); } ``` ??? > * --- # delegate task execution - .accent[ ## delegate tasks to; thread pool, event loop, actor, channel, etc..] - .accent[ ## implement a scheduler to delegate tasks] ``` auto now() -> time_point; ``` ``` auto schedule(schedulable what) -> void; ``` ``` auto schedule(time_point at, schedulable what) -> void; ``` ??? > * --- .split-60[ .column[ ![](content/delegate task execution-COMPLETED-green.svg) .accent[ ### described `scheduler` and `worker`] .accent[ ### implemented `scheduler` and `worker` to delegate tasks to `asio::io_service`] ] .column[ .right[ ![](content/fetch newspaper.gif)
### .accent[__next >>__] adapt async sources ] ] ] ??? > * --- # recap .accent[ ## delegate task execution] .center[ .image-50[ ![](content/fetch newspaper.gif) ] ] ??? > * --- .split-50[ .column[ .center[ ## .accent[Raw Loop] ![raw loop](content/rawloop.png) ] ] .column[ .center[ ## .accent[Algorithms] ![algorithms](content/norawloop.png) ] ] ] ??? > * One of the things Sean said was that when reviewing code he would try to find the algorithms that each for loop was reimplementing - often poorly - and ask for the loop to be replaced by the algorithms. --- .center[ .image-70[ ![no raw async primitives](content/norawasyncprimitives.png) ] ] ??? > * Later in the talk Sean added another goal - no raw syncronization primitives. --- ## sending telemetry from client page ```ts var telemetry = new Rx.Subject
(); ``` -- ```ts export function writeEntry(data: ITelemetryData) { telemetry.onNext(new TelemetryData(data)); } ``` ??? > * --- ## sending telemetry from client page ```ts var pending: { bodysize: number, entries: TelemetryData[]; } = { bodysize: 0, entries: [] }; ``` -- ```ts var maxTimeTrigger = Rx.Observable.interval(60 * 1000); ``` -- ```ts var maxCountTrigger = telemetry.filter(e => pending.entries.length >= 50).map(e => -1); ``` -- ```ts var maxSizeTrigger = telemetry.filter(e => pending.bodysize >= 20000).map(e => -2); ``` -- ```ts var boundaries = Rx.Observable.merge(maxTimeTrigger, maxCountTrigger, maxSizeTrigger).share(); ``` ??? > * --- ```ts export var telemetryUpdate$ = telemetry. window(boundaries.startWith(-1), () => boundaries). ``` -- ```ts flatMap(w => w.reduce((data, entry) => { data.bodysize += JSON.stringify(entry).length; data.entries.push(entry); return data; }, pending)). filter(data => data.entries.length !== 0). ``` -- ```ts flatMap(data => Rx.Observable. fromPromise(flushTelemetry(FlushRequest.Async)). ``` -- ```ts // exponential backoff on failure retryWhen(errors => errors. scan((c, e) => c + 1, 0). flatMap(i => Rx.Observable.timer(Math.pow(5, i) * 1000))). ``` -- ```ts // give up timeout(10 * 60 * 1000, Rx.Observable.empty())). ``` -- ```ts retry(); var subscription = telemetryUpdate$.subscribe(); ``` ??? > *