Created by Kirk Shoop / @kirkshoop / source
Photo Credit: Olgierd Pstrykotwórca cc
Photo Credit: Rob Grambau cc
Photo Credit: Carlos Caicedo cc
Movie Credit: Office Space
Photo Credit: Rik Hermans cc
Credit: RxJava Wiki
Credit: RxJava Wiki
Credit: RxJava Wiki
Credit: RxJava Wiki
Many interactive diagrams can be found at RxMarbles
Idea credit: David Sankel's cppnow2014 presentation
auto orbit_points = orbitPointsFromTimeInPeriod( timeInPeriodFromMilliseconds( updates. milliseconds())); auto location_points = mouse. moves(). map(pointFromMouse); location_points. combine_latest(std::plus<>(), orbit_points). subscribe( [=](ofPoint c){ // update the point that the draw() call will use center = c; });
/// Converts a stream of millisecond ticks into the position within the
/// current orbit period.
///
/// @param timeInMilliseconds  stream of tick values, in milliseconds
/// @return stream of ofMap results: the tick, reduced modulo the period
///         (orbit_period * 1000, truncated to int), mapped from
///         [0, period] onto [0.0, 1.0]
rxcpp::observable<float> ofxCircle::timeInPeriodFromMilliseconds(
    rxcpp::observable<unsigned long long> timeInMilliseconds){
    return timeInMilliseconds.
        map(
            [this](unsigned long long tick){
                // orbit_period is read on every tick (as before), but the
                // conversion to milliseconds is now done once, not twice.
                const int period_ms = int(orbit_period * 1000);
                // map the tick into the range 0.0-1.0
                return ofMap(tick % period_ms, 0, period_ms, 0.0, 1.0);
            });
}

/// Converts a stream of period positions into points on a circle of radius
/// orbit_radius centred on the origin.
///
/// @param timeInPeriod  stream of positions within the period; one full
///                      period (0.0 to 1.0) is one full turn
/// @return stream of ofPoint(radius * cos(angle), radius * sin(angle))
rxcpp::observable<ofPoint> ofxCircle::orbitPointsFromTimeInPeriod(
    rxcpp::observable<float> timeInPeriod){
    return timeInPeriod.
        map(
            [this](float t){
                // A full-precision 2*pi makes t == 1.0 land on the same
                // point as t == 0.0; the previous 2 * 3.14 left a small gap
                // in the orbit at every wrap-around.
                const double two_pi = 2.0 * 3.14159265358979323846;
                // map the time value to a point on a circle
                const double angle = t * two_pi;
                return ofPoint(orbit_radius * std::cos(angle),
                               orbit_radius * std::sin(angle));
            });
}
IEnumerable
// C# List<string> fruits = new List<string> { "apple", "passionfruit", "banana", "mango", "orange", "blueberry", "grape", "strawberry" }; IEnumerable<string> query = fruits.Where(fruit => fruit.Length < 6); IEnumerable<int> squares = Enumerable.Range(1, 10).Select(x => x * x);
LINQ (Pull)
// C# // GetEnumerator starts an independent in-order traversal of the source IEnumerator<T> IEnumerable<T>::GetEnumerator(); void IEnumerator<T>::IDisposable::Dispose(); // cancelation bool IEnumerator<T>::MoveNext(); // false for complete T IEnumerator<T>::Current; // throws for error
Rx (Push)
// C# // Subscribe inverts GetEnumerator and MoveNext IDisposable IObservable<T>::Subscribe(Observer<T>); void IDisposable::Dispose(); // cancelation Observer<T>::OnComplete(); // inverts MoveNext return value Observer<T>::OnNext(T); // inverts Current Observer<T>::OnError(Exception); // inverts throw
// C# DateTimeOffset Scheduler::Now; // current time IDisposable Scheduler::Schedule(DateTimeOffset dueTime, Action action); void IDisposable::Dispose(); // cancelation void Action::Action();
Credit: RxJava Wiki
Time in Rx is Discrete, not Continuous
subscription
// lifetime bool subscription::is_subscribed(); void subscription::unsubscribe(); // nested lifetimes weak_subscription composite_subscription::add(Subscription /*void()*/); void composite_subscription::remove(weak_subscription);
observable<>
static observable<T> observable<>::create<T>( OnSubscribe /*void(subscriber<T> out)*/); // sources static observable<T0> observable<>::from(T0, TN...); static observable<T> observable<>::iterate(Collection<T>); static observable<T> observable<>::range( T first, T last, difference_type step); static observable<long> observable<>::interval( rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period); static observable<T> observable<>::never<T>(); static observable<T> observable<>::empty<T>(); static observable<T> observable<>::error<T>(Exception); // . . .
observable<T>
composite_subscription observable<T>::subscribe( composite_subscription lifetime, OnNext /*void(T)*/, OnError /*void(std::exception_ptr)*/, OnCompleted /*void()*/); // operators observable<T> observable<T>::filter(Predicate /*bool(T)*/); observable<U> observable<T>::map(Transform /*U(T)*/); observable<V> observable<T>::flat_map( Extract /*observable<U>(T)*/, Transform /*V(T, U)*/); observable<U> observable<T0>::combine_latest( Transform /*U(T0, TN...)*/, observable<TN>...); observable<T> observable<T>::merge(observable<T>...); observable<T> observable<T>::concat(observable<T>...); // . . .
Coordination
// Default - not thread safe // noop for when all observables are using the same thread auto noop_immediate = identity_immediate(); auto noop_trampoline = identity_current_thread(); // Opt-in - thread safe // Uses a mutex to serialize calls from multiple threads auto serialize_with_pool = serialize_event_loop(); auto serialize_with_new = serialize_new_thread(); // Uses a queue to shift all calls to a new thread auto observe_on_pool = observe_on_event_loop(); auto observe_on_new = observe_on_new_thread();
// Test fragment for timeInPeriodFromMilliseconds, driven by the RxCpp test scheduler.
// Input: a hot observable of millisecond ticks 250, 500, 750, 1000 followed by completion.
// With orbit_period = 1.0 the period is 1000 ms, so the expected outputs are
// 0.25, 0.5, 0.75 and then 0.0 (1000 % 1000 == 0 wraps back to the start).
// `required` holds the expected messages and `actual` the recorded ones; the
// comparison between them is not part of this fragment.
auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<unsigned long long> m_on; const rxsc::test::messages<float> p_on; auto xs = sc.make_hot_observable({ m_on.next(300, 250), m_on.next(400, 500), m_on.next(500, 750), m_on.next(600, 1000), m_on.completed(700) }); orbit_offset = 0; orbit_period = 1.0; auto res = w.start( [&]() { return timeInPeriodFromMilliseconds(xs); }); auto required = rxu::to_vector({ p_on.next(300, 0.25), p_on.next(400, 0.5), p_on.next(500, 0.75), p_on.next(600, 0.0), p_on.completed(700) }); auto actual = res.get_observer().messages();
// Test fragment for orbitPointsFromTimeInPeriod. `sc` and `w` are used here but
// declared outside this fragment.
// Input: period positions 0.25, 0.5, 0.75, 0.0 followed by completion.
// Each output point is passed through roundedPt (std::round on x and y) before it
// is recorded, so the expected values are whole-number points.
// With orbit_radius = 50 the expected points are (0, 50), (-50, 0), (0, -50), (50, 0).
// `required` holds the expected messages and `actual` the recorded ones; the
// comparison between them is not part of this fragment.
const rxsc::test::messages<float> p_on; const rxsc::test::messages<ofPoint> pt_on; auto roundedPt = [](ofPoint pt){ return ofPoint(std::round(pt.x), std::round(pt.y)); }; auto xs = sc.make_hot_observable({ p_on.next(300, 0.25), p_on.next(400, 0.5), p_on.next(500, 0.75), p_on.next(600, 0.0), p_on.completed(700) }); orbit_radius = 50; auto res = w.start( [&]() { return orbitPointsFromTimeInPeriod(xs).map(roundedPt); }); auto required = rxu::to_vector({ pt_on.next(300, ofPoint(0, 50)), pt_on.next(400, ofPoint(-50, 0)), pt_on.next(500, ofPoint(0, -50)), pt_on.next(600, ofPoint( 50, 0)), pt_on.completed(700) }); auto actual = res.get_observer().messages();
Library Credit: philsquared/Catch
// Catch BDD-style scenario. Its title and the [fails] tag mark it as a
// failing example: the expectation below describes taking the first two
// values, while the code under test calls skip(2).
SCENARIO("take 2 - fails", "[take][fails][operators]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        // Message factory for the int-valued source; the element type and
        // the variable name `on` are required by the on.next(...) calls below.
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(220, 3),
            on.next(230, 4),
            on.next(240, 5),
            on.completed(250)
        });

        WHEN("2 values are taken"){
            auto res = w.start(
                [xs]() {
                    // NOTE(review): skip(2) rather than take(2) looks
                    // deliberate given the "fails" title - confirm before
                    // changing it.
                    return xs.skip(2).as_dynamic();
                });

            THEN("the output only contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(210, 2),
                    on.next(220, 3),
                    on.completed(220)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            // . . .
        }
    }
}
nexts.setup(next). map(send_urls). merge(). group_by( [=](const next_url& nu) { // round-robin requests across multiple threads return int(nu.first % thread_count); }, [](const next_url& nu) {return nu;}). map( [=](const rxcpp::grouped_observable<int, next_url>& urls){ auto producerthread = rxcpp::observe_on_one_worker( rxcpp::observe_on_new_thread(). create_coordinator(). get_scheduler()); return http_get_image( producerthread, urls.get_key(), urls, halts); }). merge(). subscribe();
rxcpp::observable<http_response_image> http_get_image( rxcpp::observe_on_one_worker producer, int key, const rxcpp::observable<next_url>& urls, const rxcpp::observable<int> stops){ return urls. map( [=](const next_url& url){ return make_http_request(producer, key, url, stops); }). #if 0 // abort old request and start new request immediately switch_on_next(). #else // hold on to new requests until the previous have finished. concat(). #endif map( [=](http_response_image progress){ return update_ui(key, progress); }); } }
// Builds the observable for a single HTTP image request. The parameter list is
// elided on the slide; the body uses `producer`, `key`, `url` and `stops`.
// Steps: bump the `queued` counter and turn tracing off, issue http.get on
// url.second subscribed on `producer`, map each response through
// http_progress_image and merge, move delivery to ofxRx::observe_on_update(),
// route through error_display, then retry and stop once `stops` emits.
// The finally action decrements `queued`, turns tracing back on when it reaches
// zero, and averages progress_labels[key].first into avg[key].
// NOTE(review): finally(...) sits upstream of retry(), so it may run once per
// attempt while ++queued runs once per call - confirm the counter stays balanced
// when a request is retried.
rxcpp::observable<http_response_image> make_http_request(. . .){ ++queued; // ofx tracing hud does not support multiple threads yet trace_off(); return http.get(url.second). subscribe_on(producer). map(http_progress_image). merge(). observe_on(ofxRx::observe_on_update()). lift<http_response_image>( [=](rxcpp::subscriber<http_response_image> out){ return error_display(key, out); }). finally( [=](){ if (--queued == 0) {trace_on();} avg[key] = (progress_labels[key].first + avg[key]) / 2; }). retry(). take_until(stops); }
/// Collects the body chunks of a response into one buffer and converts the
/// completed buffer with image_from_buffer.
///
/// @param hp  progress object whose body() yields the chunks
/// @return the result of image_from_buffer applied to the final buffer
rxcpp::observable<std::shared_ptr<ofPixels>> http_image(const ofxRx::HttpProgress& hp) {
    // Appends one chunk to the accumulated buffer and passes the same buffer on.
    auto append_chunk = [](std::shared_ptr<ofBuffer> acc, ofxRx::BufferRef<char> b){
        acc->append(b.begin(), b.size());
        return acc;
    };

    // Only the last accumulation, i.e. the buffer holding every chunk, is kept.
    auto whole_body = hp.
        body().
        scan(std::make_shared<ofBuffer>(), append_chunk).
        last();

    // got all the data, do heavy lifting on the background thread
    return whole_body.map(image_from_buffer);
}
/// Produces the urls to download; the int argument is ignored.
///
/// @return 20 next_url values, each pairing the next value of a function-wide
///         counter with the image url
rxcpp::observable<next_url> send_urls(int) {
    // Shared by every call, so indices keep increasing across calls.
    static int count = 0;

    // one url is added every 200 milliseconds
    auto ticks = rxcpp::observable<>::interval(
        ofxRx::observe_on_update().now(),
        std::chrono::milliseconds(200),
        ofxRx::observe_on_update());

    // adds the image url multiple times (20)
    return ticks.
        take(20).
        map(
            [=](long){
                return next_url(
                    count++,
                    "http://. . ./poster_rodents_small.jpg");
            });
}
/// Pairs the progress stream of a response with its image stream.
///
/// @param hp  progress object for one response
/// @return combine_latest of http_progress(hp) with the image stream
rxcpp::observable<http_response_image> http_progress_image(const ofxRx::HttpProgress& hp) {
    auto progress = http_progress(hp);

    // The image stream starts with an empty pointer, so there is an image
    // value to combine with before the real image has been produced.
    auto images = http_image(hp).
        start_with(std::shared_ptr<ofPixels>());

    return progress.combine_latest(images);
}
/// Reports the download progress of a response as integers.
///
/// @param hp  progress object whose response() yields progress updates
/// @return getProgress() * 100 truncated to int, with consecutive repeats
///         removed
rxcpp::observable<int> http_progress(const ofxRx::HttpProgress& hp) {
    // presumably getProgress() is a 0.0-1.0 fraction, making this a percentage - confirm
    auto to_scaled_int = [](ofx::HTTP::ClientResponseProgressArgs rp){
        return int(rp.getProgress() * 100);
    };

    return hp.
        response().
        map(http_status_to_error).
        map(to_scaled_int).
        distinct_until_changed();
}
subscriber<T>
subscriber<T> make_subscriber<T>( composite_subscription lifetime, OnNext /*void(T)*/, OnError /*void(std::exception_ptr)*/, OnCompleted /*void()*/); // observer<T> void subscriber<T>::on_next(T); void subscriber<T>::on_error(std::exception_ptr); void subscriber<T>::on_completed(); // composite_subscription bool subscriber<T>::is_subscribed(); void subscriber<T>::unsubscribe(); weak_subscription subscriber<T>::add( Subscription /*void()*/); void subscriber<T>::remove(weak_subscription);
Coordination
rxsc::scheduler::clock_type::time_point identity_one_worker::now(); coordinator<Coordinator> identity_one_worker::create_coordinator(composite_subscription);
coordinator<Coordinator>
rxsc::scheduler::clock_type::time_point coordinator<Coordinator>::now();
scheduler and worker
// scheduler std::chrono::steady_clock scheduler::clock_type; clock_type::time_point scheduler::now(); worker scheduler::create_worker( composite_subscription lifetime /*cancel all actions*/); // worker clock_type::time_point worker::now(); void worker::schedule( time_point when, composite_subscription lifetime, // cancel Action Action /*void()*/); void worker::schedule_periodically( time_point first, duration interval, composite_subscription lifetime, // cancel Action Action /*void()*/);
returns the current time
each worker represents the
Representation: [type] value;
Algorithms:
operators like +, -, *, /, &, &&, |, ||, ~, ?:
and statements like switch, if
Representation: iterator begin, iterator end
Algorithms: std::copy_if, std::transform, std::accumulate, std::rotate, std::sort, std::find, std::search
These do not compose well because they use two values to represent a sequence. Thus...
Ranges are under active development. There is more than one representation.
Representation: range values;
Algorithms: std::copy_if, std::transform, std::accumulate, std::rotate, std::sort, std::find, std::search
Representation: [integral] start, [integral] end
Algorithms: parallel_for, parallel_transform, parallel_reduce, parallel_sort
Representation:
std::promise<T> produces;
std::future<T> consumes;
Proposed Algorithms: then, when_all, when_any, map, bind, catch_error
This is under active development. This representation is the one that this presentation will cover.
Representation:
rxcpp::observable<T> produces;
rxcpp::subscriber<T> consumes;
Algorithms: filter, map, reduce, group_by, flat_map, concat, merge, skip, take, buffer, window, combine_latest
async does not block the calling stack
take and skip are the same, while map and filter are different