async_generator<T> implements a new AsyncRange Concept. begin() and ++iterator return an awaitable type that produces an iterator later, while end() returns an iterator immediately. A new set of algorithms is needed and a new async-range-for has been added that inserts await into i = await begin() and await ++i. A function that returns async_generator<T> is allowed to use await and yield_value.

// asyncop.cpp : Defines the entry point for the console application.
//

#include <iostream>
#include <future>
#include <sstream>

#include <chrono>
namespace t = std::chrono;
using clk = t::system_clock;
using namespace std::chrono_literals;

#include <experimental/resumable>
#include <experimental/generator>
namespace ex = std::experimental;

#include "../async_generator.h"
#include "../async_schedulers.h"
namespace as = async::scheduler;
#include "../async_operators.h"
namespace ao = async::operators;

template<class Work, typename U = decltype(std::declval<Work>()(0))>
async::async_generator<U> schedule_periodically(
    clk::time_point initial,
    clk::duration period,
    Work work) {
    int64_t tick = 0;
    auto what = [&tick, &work]() {
        return work(tick);
    };
    for (;;) {
        auto when = initial + (period * tick);
        auto result = __await as::schedule(when, what);
        __yield_value result;
        ++tick;
    }
}

template<class... T>
void outln(T... t) {
    std::cout << std::this_thread::get_id();
    int seq[] = {(std::cout << t, 0)...};
    std::cout << std::endl;
}

std::future<void> async_test() {
    auto start = clk::now();
    for __await(auto&& rt :
        schedule_periodically(start + 1s, 1s,
            [](int64_t tick) {return tick; }) ) {
        outln(" for await - ", rt);
        if (rt == 5) break;
    }
}

int wmain() {
    try {
        outln(" wmain start");
        auto done = async_test();
        outln(" wmain wait..");
        done.get();
        outln(" wmain done");
    }
    catch (const std::exception& e) {
        outln(" exception ", e.what());
    }
}

schedule_periodically() uses the awaitable schedule() explored in await the future and await + yield_value to implement a stable periodic tick.

async_test() uses for await to respond to the ticks produced by schedule_periodically().

This simple code results in a monster diagram! A digression is in order.

digression

The github history has a lot of deleted code from all my attempts to get a working async_generator. I started with the simple await examples and made small modifications. Then I wrote a very complex implementation of schedule_periodically.

template<class Work>
auto schedule_periodically(std::chrono::system_clock::time_point initial, std::chrono::system_clock::duration period, Work work) {
    struct async_schedule_periodically
    {
        static void CALLBACK TimerCallback(PTP_CALLBACK_INSTANCE, void* Context, PTP_TIMER) {
            auto that = reinterpret_cast<async_schedule_periodically*>(Context);
            auto result = that->work(that->tick++);
            that->current_value = &result;
            that->waiter();
        }
        static void Next(async_schedule_periodically** ppParent, ex::resumable_handle<> cb) {
            auto that = *ppParent;

            that->waiter = cb;
            that->ppParent_ = ppParent;

            auto at = that->initial + (that->period * that->tick);
            auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(at - std::chrono::system_clock::now());
            int64_t relative_count = -(duration.count() / 100);

            that->timer = CreateThreadpoolTimer(TimerCallback, that, nullptr);
            if (that->timer == 0) throw std::system_error(GetLastError(), std::system_category());
            SetThreadpoolTimer(that->timer, (PFILETIME)&relative_count, 0, 0);
        }
        PTP_TIMER timer = nullptr;
        ex::resumable_handle<> waiter;
        int64_t tick = 0;
        async_schedule_periodically ** ppParent_;

        std::chrono::system_clock::time_point initial;
        std::chrono::system_clock::duration period;
        Work work;

        typedef decltype((*(Work*)nullptr)(int64_t{})) value_type;
        value_type const* current_value;

        struct async_iterator
        {
            async_schedule_periodically * parent_;

            auto operator++() {
                struct awaiter {
                    async_schedule_periodically ** parent_;
                    bool await_ready() {
                        return false;
                    }
                    void await_suspend(ex::resumable_handle<> cb) {
                        Next(parent_, cb);
                    }
                    void await_resume() {
                    }
                    ~awaiter(){
                        if ((*parent_)->timer) CloseThreadpoolTimer((*parent_)->timer);
                        (*parent_)->timer = 0;
                    }
                };
                return awaiter{ &parent_ };
            }

            async_iterator operator++(int) = delete;

            bool operator==(async_iterator const& _Right) const { return parent_ == _Right.parent_; }

            bool operator!=(async_iterator const& _Right) const { return !(*this == _Right); }

            value_type const& operator*() const {
                return *parent_->current_value;
            }

            value_type const* operator->() const { return std::addressof(operator*()); }

        };

        async_schedule_periodically(
            std::chrono::system_clock::time_point i,
            std::chrono::system_clock::duration p,
            Work w)
        : initial(i), period(p), work(std::move(w)), tick(0) {}

        auto begin() {
            struct awaiter {
                async_schedule_periodically * pChannel;

                bool await_ready() { return false; }

                void await_suspend(ex::resumable_handle<> cb) {
                    Next(&pChannel, cb);
                }
                auto await_resume() {
                    return async_iterator{ pChannel };
                }
                ~awaiter(){
                    if (pChannel->timer) CloseThreadpoolTimer(pChannel->timer);
                    pChannel->timer = 0;
                }
            };
            return awaiter{ this };
        }

        async_iterator end() { return{ nullptr }; }
    };

    return async_schedule_periodically{ initial, period, std::move(work) };
}

This worked but the code at the beginning of this post hides so much complexity and, as I will show in a future post, async_generator will allow much more interesting functions than schedule_periodically. It took a long time to merge the generator and this complex schedule_periodically to arrive at async_generator. The key was splitting out async_iterator, yield_to and yield_from. It was much easier to assemble those pieces together than the monolithic code above. Along the way I ran into codegen bugs and lifetime bugs. Gor Nishanov was very kind and after much toil and tribulation I saw the simple code above start working.

But enough degression, on to the diagram.

diagram

I spent a lot of time rearranging and refining this diagram because it contains a lot of information. In the next section I will try to build a story out of this diagram.

Whew! I wrote an instrumented app to generate this diagram so it is pretty accurate. In the process it found a few bugs in the async_generator that are fixed now!

breakdown the diagram

First of all, the diagram is divided into;

  • the left side, which is driven by the compiler
  • the right side, which shows the function calls from the source at the top of this post.

Right in the middle is the promise. The promise provided by the async_generator is the rendezvous point that is used to coordinate the left and right sides. The yield_to and yield_from awaitables are used to ping-pong the execution between the consumer and the producer.

async_iterator, async_generator and promise_type can be reduced to these core functions.

template<typename T, typename Promise>
struct async_iterator : public std::iterator<std::input_iterator_tag, T>
{
    async_iterator(Promise* p) : p(p) {}
    yield_to<T, Promise> operator++() {return{ p };}
. . .
    T const& operator*() const
    {
        if (p->Error) {std::rethrow_exception(p->Error);}
        return *p->CurrentValue;
    }
};

template<typename T, typename Alloc = std::allocator<char>>
struct async_generator
{
    struct promise_type {
        ex::resumable_handle<> To{ nullptr };
        ex::resumable_handle<> From{ nullptr };
    . . .
        promise_type& get_return_object() {return *this;}
        ex::suspend_always initial_suspend() {return{};}
        ex::suspend_always final_suspend() {return{};}
        bool cancellation_requested() const {return done;}
        void set_result() {done = true;}
        void set_exception(std::exception_ptr Exc) {Error = std::move(Exc);}
        yield_from<T, promise_type> yield_value(T Value)
        {
            CurrentValue = std::addressof(Value);
            return{ this };
        }
    };
. . .
    yield_to<T, promise_type> begin() {
        Coro();
        return{ std::addressof(Coro.promise()) };
    }
};

yield_to is returned by async_generator::begin() and async_iterator::operator++(). When suspended yield_to will save the consumer’s resumption point in the promise and resume the producer if it has been saved in the promise. When resumed yield_to will return a new async_iterator for the produced value.

template<typename T, typename Promise>
struct yield_to
{
    yield_to(Promise* p) : p(p) {}
    bool await_ready() noexcept {return false;}
    void await_suspend(ex::resumable_handle<> r) noexcept
    {
        p->To = r;
        if (p->From) {
            ex::resumable_handle<> coro{ p->From };
            p->From = nullptr;
            coro();
        }
    }
    async_iterator<T, Promise> await_resume() noexcept {return{ p };}
    Promise* p = nullptr;
};

yield_from is returned by promise::yield_value() (this is called by the compiler when the producer uses the ‘__yield_value expression’). When suspended yield_from will save the producer’s resumption point in the promise and resume the consumer if it has been saved in the promise.

template<typename T, typename Promise>
struct yield_from
{
    yield_from(Promise* p) : p(p) {}
    bool await_ready() noexcept {return false;}
    void await_suspend(ex::resumable_handle<> r) noexcept
    {
        p->From = r;
        if (p->To) {
            ex::resumable_handle<> coro{ p->To };
            p->To = nullptr;
            coro();
        }
    }
    void await_resume() noexcept {}
    Promise* p = nullptr;
};

the blue and green arrows in the diagram show where each saved resumption point is called to achieve the interleaved execution of the consumer and producer.

read the diagram

Time goes from top to bottom.

Start in the top-right side at the caller column. This is wmain calling into async_test. Reading the right side from top to bottom shows the progression through the consumer loop over the async_generator that is found in the async_test function and the producer loop that is found in schedule_periodically.

The ‘wait for completion’ box on the right side calls out where async_test returns (as soon as the os timer is scheduled) the future and the caller waits on it.

On the left side the vertical lines display lifetimes of the objects and show all the work that the compiler is doing to satisfy the async_test consumer. Starting from the top again, there are boxes for the ‘initial’ setup of async_test, ‘iteration 0’ and ‘iteration 1..N’ and then ‘final’ where async_test completes and exits.

output

3316 wmain start
3316 wmain wait..
8240 for await - 0
8240 for await - 1
8240 for await - 2
8240 for await - 3
8240 for await - 4
8240 for await - 5
3316 wmain done

teaser

just a little something from the future..

// also known as copy_if
template<typename T, typename P>
async_generator<T> filter(async_generator<T> s, P p) {
    for __await(auto&& v : s) {
        if (p(v)) {
            __yield_value v;
        }
    }
}

try it out

After installing Visual Studio 2015 Preview clone await and open the await solution. This code is in the asyncop project in the solution.