layout: true --- class: middle .center[ # Adding Async Algorithms to std #### algorithms for values distributed in time ]
??? > * structure as interview (slide per question) with answer prompts in the notes --- # me .center[ ## .green[ I love code ] ] ??? > * team meeting .green[weekly number] good week/bad week --- # me .center[ ## .green[ I work at Microsoft ] ] ??? > * roommate in CS - .green[homework] was easy. > * took windows programming class for fun. classmate insisted that I send my resume. became intern then contractor then employee --- #what is the goal here? * ## .green[additive] to existing (stl, rangev3, parallel_stl) * ## .green[scan] dense material * ## .green[derive] requirements from selected algorithms ??? > * .green[additive] - space vs. time, minimal vs. maximal resources. > * .green[scan] - go fast through dense slides then questions at end. > * .green[derive] - find the requirements for the __iterator__ for async algorithms --- #What are examples of values distributed in time? ??? --- module: Context function: intervalsv3 types: arguments: output: rxv3output # ints distributed in time .split-60[ .column[ __code__ (rxcppv3) ```cpp auto threeeven = copy_if(even) | take(3) | `delay`(makeStrand, 1s); intervals(makeStrand, steady_clock::now(), 1s) | threeeven | as_interface
() | finally([](){cout << "caller stopped" << endl;}) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * C++ code is compiled with emscripten and running live in the slides, triggered by slide transitions > * rxcppv3 is a poc using C++14 > * scheduling time with strands --- ## delay
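a minimal standalone sketch of the idea (the `toy_strand` and `make_delay` names here are illustrative, not the rxcppv3 api): delay re-schedules each value onto a strand at a later time point and forwards it from there

```cpp
// standalone sketch only: a toy single-threaded "strand" that runs deferred
// work in time order, to show what delay(strand, d) has to do
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
using namespace std::chrono;

struct toy_strand {
  std::multimap<steady_clock::time_point, std::function<void()>> queue;
  void defer_at(steady_clock::time_point at, std::function<void()> f) {
    queue.emplace(at, std::move(f));
  }
  void run() {                       // drain in time order (no real waiting here)
    while (!queue.empty()) {
      auto it = queue.begin();
      auto f = std::move(it->second);
      queue.erase(it);
      f();
    }
  }
};

// delay: forward each value, re-scheduled d later on the strand
auto make_delay(toy_strand& s, milliseconds d, std::function<void(int)> next) {
  return [&s, d, next](int v) {
    s.defer_at(steady_clock::now() + d, [next, v] { next(v); });
  };
}

int main() {
  toy_strand strand;
  auto delayed = make_delay(strand, 1500ms, [](int v) { std::cout << v << "\n"; });
  for (int i = 0; i < 3; ++i) delayed(i);
  strand.run();                      // prints 0 1 2 in deferred time order
}
```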
??? > * needs to schedule a function for .green[later] > * live .green[marble] diagrams > * .green[top line] is input values in time > * .green[bottom line] is output values in time --- # ux events distributed in time __code__ (coroutine algorithms) ```cpp std::future<void>
AddVisuals(CoreWindow window, VisualCollection visuals) { for co_await (auto point : window.co_PointerPressed() | transform([](auto press) -> float2 {return press.args.CurrentPoint().Position(); }) | `filter`([](auto point) {return !VisualAndOffsetFromPoint(visuals, point).first; })) { AddVisual(visuals, point); } } ``` ??? > * co_algorithm is a poc using the proposed co_await feature > * co_PointerPressed is a poc addition to the moderncpp library --- ## copy_if
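a minimal standalone sketch (not the rxcppv3 lifter shown later): copy_if is just a handler that forwards a value to the next handler when the predicate passes

```cpp
// standalone sketch only: push-based copy_if as a handler adapter
#include <functional>
#include <iostream>

template <class Pred>
auto copy_if_handler(Pred pred, std::function<void(int)> next) {
  return [pred, next](int v) {
    if (pred(v)) next(v);            // selectively pass to the next handler
  };
}

int main() {
  auto even = [](int v) { return v % 2 == 0; };
  auto sink = copy_if_handler(even, [](int v) { std::cout << v << "\n"; });
  for (int i = 0; i < 10; ++i) sink(i);   // push values in; prints 0 2 4 6 8
}
```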
??? > needs to selectively pass to the next handler --- # ux events distributed in time __code__ (coroutine algorithms) ```cpp std::future<void>
MoveVisuals(CoreWindow window, VisualCollection visuals) { for co_await (auto move : `window.co_PointerPressed()` | transform([](auto press) -> float2 {return press.args.CurrentPoint().Position(); }) | transform([=](auto point) {return VisualAndOffsetFromPoint(visuals, point); }) | filter([](auto selected) {return !!selected.first; }) | transform([=](auto selected) { MoveToTop(visuals, selected.first); return `window.co_PointerMoved()` | transform([](auto move) -> float2 {return move.args.CurrentPoint().Position(); }) | transform([=](auto point) { auto to = float2{ point.x + selected.second.x, point.y + selected.second.y }; return std::make_pair(selected.first, to); }) | take_until(`window.co_PointerReleased()`); }) | `merge`()) { move.first.Offset({ move.second.x, move.second.y, 0.0f }); } } ``` ??? > * produces a new sequence of moves for a selected item that starts on each press and stops when released --- ## merge
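a minimal standalone sketch (the `merged_sink` name is illustrative, not the rxcppv3 algorithm): merge points several sources at one downstream handler and only signals complete after the last source completes

```cpp
// standalone sketch only: several sources share one downstream handler;
// completion must be counted so "done" fires only after the last source
#include <functional>
#include <iostream>

struct merged_sink {
  std::function<void(int)> next;
  std::function<void()> complete;
  int pending;                          // sources still running
  void on_next(int v) { next(v); }
  void on_complete() { if (--pending == 0) complete(); }
};

int main() {
  merged_sink sink{
      [](int v) { std::cout << v << "\n"; },
      [] { std::cout << "done\n"; },
      2};
  // two "sources" interleaving values into the same sink
  sink.on_next(1);  sink.on_next(10);
  sink.on_next(2);  sink.on_complete();    // first source completes
  sink.on_next(20); sink.on_complete();    // second completes -> "done"
}
```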
??? > needs to merge values from multiple sources --- function: rxmousedrags output: rxmousedragsoutput # ux events distributed in time .split-60[ .column[ __code__ (rxcpp) ```cpp auto down$ = mousedown$("#window"); auto up$ = mouseup$("#window"); auto move$ = mousemove$("#window"); down$ | flat_map([=](MouseEvent){ return move$ | `take_until`(up$) | map([](MouseEvent){return 1;}) | start_with(0) | sum(); }) | map( [](int c){ return to_string(c) + " moves while mouse down"; }) | subscribe(println(cout)); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * rxcpp is a complete library with many algorithms that is designed for .green[C++11] > * on each mouse up, .green[print] the number of mouse moves since mouse down. > * shows how you can .green[adapt] existing sequences to async algorithms > * the mouse event functions wrap .green[HTML DOM addEventListener] access provided by emscripten. > * .green[$] is used to indicate a source of values over time. Popularized by @staltz --- ## take_until
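a minimal standalone sketch (a shared flag stands in for real cancellation): take_until forwards values until the trigger fires, then the source has to be stopped

```cpp
// standalone sketch only: the trigger flips a flag that the source observes
#include <iostream>
#include <memory>

int main() {
  auto stopped = std::make_shared<bool>(false);
  auto next    = [stopped](int v) { if (!*stopped) std::cout << v << "\n"; };
  auto trigger = [stopped]        { *stopped = true; };  // e.g. mouse-up arrives

  for (int v = 0; v < 10; ++v) {     // the "source"
    if (*stopped) break;             // cancellation: source checks the flag
    next(v);
    if (v == 3) trigger();           // simulate the trigger firing mid-stream
  }                                  // prints 0 1 2 3 and stops early
}
```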
??? > * needs to stop the source early when the trigger value arrives --- function: rxlinesfrombytes types: number,number,number arguments: 1000, 4, 11 output: rxlinesfrombytesoutput # packets of bytes distributed in time .split-60[ .column[ __code__ (rxcpp) ```cpp asyncReadBytes() | tap(`printVectorOfBytes`) | concat_map(vectorOfStringsFromVectorOfBytes) | group_by(groupFromString) | flat_map(appendGroupStrings) | subscribe(`println(cout)`); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * a .green[packet] of bytes arrives once a second and is printed. > * each set of bytes is .green[split] on `\r` > * sets ending in '\r' are .green[grouped] into lines > * and then each group is .green[reduced] into a single line and then printed --- function: rxhttp types: string, number arguments: https://raw.githubusercontent.com/Reactive-Extensions/RxCpp/master/README.md, 80 output: rxhttplocalhostoutput # http requests distributed in time .split-60[ .column[ __code__ (rxcpp) ```cpp struct data { int size; string firstLine;}; struct model { map
store; }; httpGet("https://aka.ms/rxcppreadme") | flat_map([](response_t r) { return r.progress() | `combine_latest`( [=](progress_t p, vector
d){ return make_tuple(r.url(), p, d); }, r.load()) | `scan`( model{}, updateModelFromTuple); }) | subscribe(println(cout)); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * .green[combines] progress and load signals from many requests into a model. > * a common pattern is to .green[scan] into a model. the result is a source of model updates > * in the JS world, scan to a model is an implementation of the .green[`flux`] concept. > * httpGet wraps .green[`XMLHttpRequest`] access provided by emscripten. --- ## scan
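a minimal standalone sketch (`scan_handler` is illustrative, not the rxcppv3 algorithm): scan holds an accumulator, applies the updater to each arriving value, and emits the running result

```cpp
// standalone sketch only: scan as a stateful handler adapter
#include <iostream>

template <class State, class Updater, class Next>
auto scan_handler(State state, Updater update, Next next) {
  return [state, update, next](auto v) mutable {
    state = update(state, v);        // update the held accumulator
    next(state);                     // emit the current accumulation
  };
}

int main() {
  auto running_sum = scan_handler(0,
      [](int acc, int v) { return acc + v; },
      [](int acc) { std::cout << acc << "\n"; });
  for (int v = 1; v <= 5; ++v) running_sum(v);   // prints 1 3 6 10 15
}
```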
??? > * scan .green[holds] `x` as state. > * the .green[updater] function is called when each value arrives. > * `x` is assigned the .green[result] of the function. > * the current value of `x` is .green[emitted]. --- name: whyalgo #why algorithms? * ## .green[documented] * ## .green[stable] * ## .green[optimized] * ## .green[descriptive] ??? > * the .green[behavior], .green[complexity] and .green[tradeoffs] selected are documented and consistent across implementations > * the implementation is .green[well specified], .green[tested] and .green[actively maintained] > * the implementation is .green[minimal] and .green[efficient] > * the usage .green[describes] the steps clearly (transform and copy_if vs for loop and an if statement) > * after a while, looking at raw callbacks and promises becomes an exercise in .green[finding the set of algorithms] they re-implement --- ##what do algorithms operate on? ??? > * algorithms deal with sequences of .green[values] --- #What ways can a sequence be delivered? ??? --- #What ways can a sequence be delivered? .split-50[ .column[ ## in space * ## .green[vector] of mouse positions * ## .green[generator] of mouse positions ] .column[ `using mouseMoves = vector<std::tuple<int, int>
>;` `$$ \begin{array}{|c|c|c|c|c|} \hline 0,0 & 100,100 & 200,200 & 300,300 & 400,400\\ \hline \end{array} $$` ```cpp auto mouseMoves(int start, int end) -> std::generator<std::tuple<int, int>
> { for(;start != end; ++start){ auto position = start * 100; co_yield make_tuple(position, position); } } ``` ] ] ??? > * values distributed in .green[space] > * stl, rangev3 --- #What ways can a sequence be delivered? .split-50[ .column[ ## in time * ## mouse move .green[events] * ## network .green[packets] ] .column[ ```cpp auto window::mouseMoves() -> co_generator
> { for co_await(auto event : events()) { if (event.id == MOUSEMOVE) { co_yield mousePositionFrom(event); } } } auto socket::bytes() -> co_generator
> { vector
out; while (out = co_await read(. . .)) { co_yield out; } } ``` ] ] ??? > * values distributed in .green[time] > * rxcpp --- # ReactiveExtensions * ## .green[algorithms] for values distributed in time * ## .green[implementations] for many [languages](http://reactivex.io/languages.html) ??? > * .green[rxcpp] - not the only way to represent the requirements > * .green[rxcpp] - extensive community and prod usage (Netflix, Microsoft, etc.) > * .green[algorithms] - have you watched .green[sean parent]'s _avoid raw loops_ talk? > * .green[algorithms] - have you watched .green[eric niebler]'s _calendar printer_ talk using range-v3? --- # what algorithms are supported in rxcpp? __rxcpp__ uses cpp11 in vs2013, vs2015, clang and gcc .split-40[ .column[ * ### .green[Combining] * amb, buffer, combine_latest, concat, concat_map, flat_map, group_by, merge, switch_if_empty, switch_on_next, window, window_toggle, with_latest_from, zip * ### .green[Transforming] * delay, map, pairwise, on_error_resume_next, reduce, scan ] .column[ * ### .green[Filtering] * default_if_empty, distinct, distinct_until_changed, element_at, ignore_elements, take, take_last, take_until, skip, skip_last, skip_until, sample, debounce, filter * ### .green[Others] * all, contains, exists, observe_on, publish, repeat, replay, retry, sequence_equal, subscribe_on, tap, time_interval, timeout, timestamp ] ] ??? --- ## what algorithms will help define the concepts? ??? --- ## copy_if
??? > needs to selectively pass to the next handler --- ## last_or_default
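a minimal standalone sketch (toy `last_or_default_sink`): the handler has to be told about completion, because only then can the last (or default) value be emitted

```cpp
// standalone sketch only: next() remembers, complete() finally emits
#include <functional>
#include <iostream>

struct last_or_default_sink {
  int last;                           // starts as the default value
  std::function<void(int)> next;      // downstream handler
  void on_next(int v) { last = v; }   // remember the most recent value
  void on_complete() { next(last); }  // only now can "last" be emitted
};

int main() {
  last_or_default_sink sink{42, [](int v) { std::cout << v << "\n"; }};
  for (int v = 0; v < 10; ++v)
    if (v % 2 == 0) sink.on_next(v);  // evens: 0 2 4 6 8
  sink.on_complete();                 // prints 8 (42 if nothing had arrived)
}
```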
??? > needs to know when the source is complete --- ## take
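a minimal standalone sketch (toy `take_sink`; a bool stands in for a real cancellation signal): take counts values and must be able to stop the source once the count is reached

```cpp
// standalone sketch only: the new requirement is the stop signal back to the source
#include <functional>
#include <iostream>

struct take_sink {
  int remaining;
  std::function<void(int)> next;
  std::function<void()> stop;         // cancellation signal back to the source
  void on_next(int v) {
    next(v);
    if (--remaining == 0) stop();     // ask the source to stop early
  }
};

int main() {
  bool stopped = false;
  take_sink sink{3, [](int v) { std::cout << v << "\n"; },
                    [&] { stopped = true; }};
  for (int v = 0; !stopped && v < 1000000; ++v) sink.on_next(v);  // prints 0 1 2
}
```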
??? > take needs to be able to stop the source early. --- ## delay
??? > * needs to schedule a function to be called later --- ## resume_error ??? > needs to handle errors by switching to a different source --- ## what features are required to build the desired algorithms? ### .green[rxcppv3] is a proof-of-concept in cpp14 that will be used to explore the requirements ??? --- ## what are the minimum features? .split-40[ .column[ ####sequence concepts ```cpp struct observable { void bind(observer); }; struct observer { template<class T>
void next(T); }; struct lifter { observer lift(observer); }; ``` ] .column[ ####sequence implementations ```cpp const auto ints = [](auto first, auto last){ return make_observable([=](auto r){ for(auto i = first;; ++i){ r.next(i); if (i == last) break; } }); }; const auto copy_if = [](auto pred){ return make_lifter([=](auto r){ return make_observer(r, [=](auto& r, auto v){ if (pred(v)) r.next(v); }); }); }; ``` ] ] ??? > Minimum > * a .green[source] of values > * a .green[handler] for values > * algorithms that .green[lift handlers] --- module: Context function: pushv3 types: number,number arguments: 0, 9 output: pushv3output ## push sequence .split-60[ .column[ __code__ ```cpp ints(0, 9) | copy_if(even) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- ## what needs to change to support last_or_default?
??? > * needs to know when the source is .green[complete] > * also needs to store .green[state] - covered later. --- .split-40[ .column[ ####sequence concepts ```cpp struct observer { template<class T>
void next(T); void `complete`(); }; ``` ] .column[ ####sequence implementations ```cpp const auto last_or_default = [](auto def){ return make_lifter([=](auto scbr){ return make_subscriber([=](auto ctx){ auto r = scbr.create(ctx); using last_t = std::decay_t<decltype(def)>
; auto last = make_state<last_t>
(ctx.lifetime, def); return make_observer(r, r.lifetime, [last](auto& , auto v){ last.get() = v; }, [last](auto& r){ r.next(last.get()); r.`complete`(); }); }); }); }; ``` ] ] ??? > * .green[copies] each value as it arrives > * when complete, .green[sends] the copied value before sending complete --- module: Context function: lastv3 types: number,number,number arguments: 0, 100000, 42 output: lastv3output ## what needs to change to support last_or_default? .split-60[ .column[ __code__ ```cpp ints(0, 100000) | copy_if(even) | last_or_default(42) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * 100 thousand ints are filtered to 50 thousand even ints and then the last even int is printed. --- ## what needs to change to support take?
??? > * needs to be able to stop the source (.green[cancellation]). > * needs to be able to keep .green[state]. --- ## what features are needed to support asynchronous lifetime and cancellation? * ## .green[signal] for explicit cancellation of lifetime graph * ## .green[registration] for cancellation signal * ## .green[nested] lifetime graph * ## .green[make_shared
] equivalent for asynchronous lifetime ??? --- .split-50[ .column[ ####sequence concepts ```cpp struct subscription { bool is_stopped(); void stop(); // `signal` // `registration` void insert(function<void()>
stopper); // `nested` void insert(const subscription& s); void erase(const subscription& s); // `make_shared
` template<class T, class... ArgN>
state<T>
make_state( ArgN... argn); }; template<class Payload>
struct context { subscription lifetime; Payload& get(); }; ``` ] .column[ ####sequence concepts ```cpp struct starter { template<class Payload>
subscription start(context<Payload>
); }; struct subscriber { template<class Payload>
observer create(context<Payload>
); }; struct observable { starter bind(subscriber); }; struct lifter { subscriber lift(subscriber); }; struct adaptor { observable adapt(observable); }; ``` ] ] ??? > * subscription .green[owns] the lifetime of a subscribed expression. > * subscription lifetimes can be .green[nested]. > * .green[stop] will call stop on all nested subscriptions then call all registered stoppers in the .green[inverse] order they were inserted. > * .green[make_state] will allocate heap that is scoped by the subscription lifetime. all the state is destructed when the last copy of the subscription is destructed. > * .green[adaptor] and .green[lifter] are two different ways to implement algorithms > * .green[subscriber] allows late binding of the lifetime to an observer > * .green[starter] allows lazy start of a completed expression --- ####sequence implementations ```cpp const auto take = [](int n){ return make_adaptor([=](auto source){ return make_observable([=](auto scrb){ return source.bind( make_subscriber([=](auto ctx){ auto r = scrb.create(ctx); auto remaining = `make_state
`(r.lifetime, n); auto lifted = make_observer(r, r.lifetime, [remaining](auto& r, auto v){ r.next(v); if (--remaining.get() == 0) { r.`complete`(); } }); if (n == 0) { lifted.`complete`(); } return lifted; })); }); }); }; ``` ??? > * expanded .green[make_observer] so that it can pass observer calls through by default and connect the lifetime. > * use async state to store the remaining count. > * use complete to stop the lifetime early --- module: Context function: takev3 types: number,number,number arguments: 0, 9, 3 output: takev3output ## what needs to change to support take? .split-60[ .column[ __code__ ```cpp ints(0, 9) | copy_if(even) | take(3) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- ## what needs to change to support failure? ??? > * the exception must be .green[caught] > * the exception must be .green[sent] to the observer > * .green[stop] must be signaled to cancel the expression --- ####sequence concepts ```cpp struct observer { template<class T>
void next(T); template<class E>
void `error`(E); void complete(); }; ``` ??? > * stop is already working. > * add an error method on the observer > * ensure that calling error then calls stop --- module: Context function: errorv3 types: number,number,number arguments: 0, 9, 3 output: errorv3output ## what needs to change to support failure? .split-60[ .column[ __code__ ```cpp ints(0, 9) | copy_if(always_throw) | take(count) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- ## what needs to change to support delay?
??? > * needs to schedule a function to be called later --- ## what needs to change to support delay? .split-50[ .column[ ####defer concepts ```cpp template<class Clock>
struct `strand` { subscription lifetime; Clock::time_point now(); void defer_at(Clock::time_point, observer); }; template<class Clock, class Payload>
struct context { subscription lifetime; Clock::time_point now(); void defer_at(Clock::time_point, observer); Payload& get(); }; ``` ] .column[ ####sequence concepts ```cpp struct starter { template
subscription start(context
); }; struct subscriber { template
observer create(context
); }; ``` ] ] ??? > * a strand is a FIFO queue of .green[observers] > * a strand controls the value of .green[now] > * when a strand is .green[stopped] all pending observers are stopped. --- module: Context function: delayv3 types: number, number, number arguments: 1000, 1500, 3 output: delayv3output ## what needs to change to support delay? .split-60[ .column[ __code__ ```cpp intervals(makeStrand, steady_clock::now(), 1s) | printproduced(cout) | delay(makeStrand, 1500ms) | take(3) | printto(cout) | start
(subscription{}, destruction{}); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- ## what needs to change to support testing? ??? need to virtualize time --- ## what needs to change to support testing? .split-50[ .column[ ```cpp struct virtual_clock { static bool is_steady(); time_point `now`() const; void `now`(time_point at); }; ``` ] .column[ ```cpp struct test_loop { void call(item_type& next) const; void step(typename clock_type::duration d) const; void run() const; makeStrand make() const; }; ``` ] ] ??? --- ## what needs to change to support testing? .split-50[ .column[ ```cpp struct recorded { lifetime_record lifespan( time_point start = duration{200}, time_point stop = duration{1000}); marble_record next(time_point at, T v); marble_record error(time_point at, error_t e); marble_record complete(time_point at); observable hot(std::vector<marble_record>
m); observable cold(std::vector<marble_record>
m); //... }; ``` ] .column[ ```cpp //... struct test_result { time_point origin; lifetime_record lifespan; map<string, vector<marble_record>
> marbles; }; lifter record(string key) const; template
terminator test(test_loop
& tl, lifetime_record l = lifetime_record{}); }; ``` ] ] ??? --- module: TestContext function: testdelayv3 types: number, number, number arguments: 1000, 1500, 3 output: testdelayv3output ## what needs to change to support testing? .split-60[ .column[ __code__ ```cpp rx::test_loop<> loop; rx::recorded
on; auto tr = on.hot({ on.next(on.origin() + 1s * 1, 0), on.next(on.origin() + 1s * 2, 1), on.next(on.origin() + 1s * 3, 2), on.next(on.origin() + 1s * 4, 3), on.next(on.origin() + 1s * 5, 4) }) | printout(cout, "produced") | on.record("produced") | delay(loop.make(), 1500ms) | take(3) | printout(cout, "") | on.record("emitted") | on.test(loop); loop.run(); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- module: TestContext>Context function: testdelayv3>delayv3 types: number, number, number>number, number, number arguments: 1000, 1500, 3>1000, 1500, 3 output: testdelayv3rightoutput>delayv3leftoutput ## what needs to change to support testing? .split-50[ .column[ __output__ (emscripten) .emscripten-output[
] ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? --- ## what needs to change to support testing? .split-50[ .column[ #### record test result ```cpp rx::test_loop<> loop; rx::recorded
on; auto tr = on.hot({ on.next(on.origin() + 1s * 1, 0), on.next(on.origin() + 1s * 2, 1), on.next(on.origin() + 1s * 3, 2), on.next(on.origin() + 1s * 4, 3), on.next(on.origin() + 1s * 5, 4) }) | printout(cout, "produced") | on.record("produced") | delay(loop.make(), 1500ms) | take(3) | printout(cout, "") | on.record("emitted") | on.test(loop); loop.run(); ``` ] .column[ #### assert test succeeded ```cpp auto expected = on.expected({ on.next(on.origin() + 1s * 1 + 1500ms, 0), on.next(on.origin() + 1s * 2 + 1500ms, 1), on.next(on.origin() + 1s * 3 + 1500ms, 2), on.complete(on.origin() + 1s * 3 + 1500ms) }); if (tr.get().marbles["emitted"] == expected) { cout << "SUCCEEDED" << endl; } else { cout << "FAILED" << endl; cout << "Actual:" << endl; cout << tr.get().marbles["emitted"] << endl; cout << "Expected:" << endl; cout << expected << endl; } ``` ] ] ??? --- module: TestContext function: testdelayv3 types: number, number, number arguments: 1000, 1500, 1 output: testdelayv3failoutput ## what needs to change to support testing? .split-60[ .column[ __failing code__ ```cpp rx::test_loop<> loop; rx::recorded
on; auto tr = on.hot({ on.next(on.origin() + 1s * 1, 0), on.next(on.origin() + 1s * 2, 1), on.next(on.origin() + 1s * 3, 2), on.next(on.origin() + 1s * 4, 3), on.next(on.origin() + 1s * 5, 4) }) | printout(cout, "produced") | on.record("produced") | delay(loop.make(), 1500ms) | `take(1)` | printout(cout, "") | on.record("emitted") | on.test(loop); loop.run(); ``` ] .column[ __output__ (emscripten) .emscripten-output[
] ] ] ??? > * .green[fail] the test by using the wrong value for take. --- #Requirements * ## .green[next], .green[complete], .green[error]. * ## .green[lifetime], .green[allocation]. * ## .green[cancellation], .green[scheduling]. * ## .green[virtual-time], .green[testing] ??? --- ## What if these algorithms are implemented using the coroutine proposal? ```cpp co_value_generator
`transform`(Source source, Selector select) { for co_await (auto&& v : source) { co_yield select(v); } } ``` ```cpp co_value_generator
`concat`(Source source) { for co_await (auto&& s : source) { for co_await (auto&& v : s) { co_yield v; } } } ``` ??? > * some algorithms are easy to write > * transform - call the select function for each value > * concat - consume each nested source sequentially --- ## What if these algorithms are implemented using the coroutine proposal? ```cpp struct merge_value_promise : co_generator_promise
{ // >200 lines of code merge_source_awaiter push(Source s) const { auto& p = co_await merge_source_awaiter::get(); p.bind(this, canceled); for co_await (auto& v : s) { co_yield v; } } }; co_generator
> merge(Source source) { auto& p = co_await merge_value_promise
::get(); co_await p.caller_awaiter(nullptr); for co_await (auto&& s : source) { p.push(std::move(s)); } } ``` ??? > * some algorithms are harder > * merge - consume all nested sources concurrently and interleave their values > * coroutines express sequential processing of values distributed in time naturally. > * expressing concurrent programming in coroutines requires extensive code. --- ## What if these algorithms are implemented using the coroutine proposal? #### transform usage ```cpp future<void>
print0to9doubled() { for co_await(auto v : ints(0, 9) | transform([](int v){return v * 2;})) { cout << v << endl; } } ``` ??? > * usage composes like the other libraries > * consumption uses `for co_await()` on the expression --- ## What if these algorithms are implemented using the coroutine proposal? .split-50[ .column[ #### sequence concepts ```cpp template<class T>
struct co_generator { using iterator = co_iterator<T>
; co_iterator_awaiter<T>
begin() const; iterator end() const; co_generator_promise<T>
const * p; }; ``` ] .column[ #### sequence concepts ```cpp template<class T>
struct co_iterator : std::iterator<std::input_iterator_tag, T>
{ // end iterator co_iterator(nullptr_t) : m_p(nullptr) {} // iterator co_iterator(co_generator_promise<T>
const & p); co_inc_awaiter<T>
operator++(); bool operator==(co_iterator const &rhs) const; T &operator*(); T *operator->(); co_generator_promise<T>
const * m_p; }; ``` ] ] ??? --- ## What if these algorithms are implemented using the coroutine proposal? .split-50[ .column[ #### sequence concepts ```cpp template<class T>
struct co_iterator_awaiter { bool await_ready(); void await_suspend( const coroutine_handle<>& handle); co_iterator<T>
await_resume(); co_generator_promise<T>
const * m_p; }; ``` ] .column[ #### sequence concepts ```cpp template<class T>
struct co_inc_awaiter { bool await_ready(); void await_suspend( const coroutine_handle<>& handle); co_iterator<T>
& await_resume(); co_iterator<T>
* m_it; }; ``` ] ] ??? --- ##complete. questions?
??? > * concat some .green[questions]