Event-Driven Flows


This post is in response to two claims about coroutines:

  1. Their reference function parameters may become dangling too easily.
  2. They are indistinguishable from regular functions from the declaration alone.

But rather than talking about coroutines, we will look at event-driven control flows in general. Modelling them is one of the primary motivations (alongside “generators”) for having coroutines. By understanding the need we will be better equipped to understand the tool.

Event-driven flows

A canonical example of an event-driven flow is the handling of signals in C. Signals are raised at unpredictable points in time, so rather than actively checking for them, we define a callback and associate it with the signal of interest:

signal(SIGINT, on_interrupt);
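For a slightly fuller picture, here is a minimal sketch of that registration in C++. The flag `interrupted` and the helper `install_and_trigger` are names made up for illustration, and `std::raise` is only a stand-in for a signal arriving from outside (for example, the user pressing Ctrl+C):

```cpp
#include <csignal>

namespace {
// A handler may safely do very little; setting a flag of this type is allowed.
volatile std::sig_atomic_t interrupted = 0;
}

// The callback: invoked by the implementation, never called by us directly.
extern "C" void on_interrupt(int) { interrupted = 1; }

// Hypothetical helper: registers the callback, then simulates the event.
bool install_and_trigger() {
    std::signal(SIGINT, on_interrupt);  // associate the callback with the signal
    // ... the program would move on to other work here ...
    std::raise(SIGINT);                 // stand-in for an externally delivered SIGINT
    return interrupted == 1;            // true once the handler has run
}
```

Note that `std::raise` invokes the handler before it returns, which a real asynchronous signal does not do; the sketch only shows who registers the callback and who calls it.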

After having performed this association, we move on to doing other things. It is the implementation (the system, or parts of the program we do not write) that will make sure to invoke the callback when the signal is raised (if at all). We can illustrate it with a diagram:

A signal is a special case of an event: something that occurs asynchronously and needs to be handled with a callback (or “handler”, or “delegate”). The same pattern of handling asynchronous events is used in efficient implementations of servers that need to process external requests. A server awaiting requests can be modeled with event handling:

Now, the session itself requires reading the data from a port and then writing data to a port, both of which are performed by the system and finish asynchronously. These also can be modeled by events, rendering the following diagram:

All these functions — on_req, on_read, on_write, finish — constitute what the programmer considers a session. They will be evaluated in order: finish no sooner than on_write, on_write no sooner than on_read, so in a way they are a “sequence” even though they are callbacks in response to asynchronous events. But what is more important, all these operations will have to share state, such as the socket or buffers. So, someone will have to create this state in the first function from the sequence — on_req — and make sure that the state is recycled when the last function in the sequence is finished: either finish or earlier in case of an error.

Lifetime management

As the above diagrams illustrate, stack-based automatic variables cannot be used to manage the session state, because that state spans several scopes. We will need to manage it manually, and although it is not strictly necessary, we will likely end up using shared_ptrs.

An implementation of a server transaction like the one above, using a third-party library for executing event-driven tasks, would look something like this:

    void session(Socket sock)
    {
      auto s = make_shared<State>(move(sock));
      exec.async_read(s->sock, s->buffer,
        [s](error_code ec, int len) {
          if (!ec)
            exec.async_write(s->sock, {s->buffer, len},
              [s](error_code ec, int) {
                if (!ec) finish(s);
              });
        });
    }

The above example is based on the Boost.ASIO library, particularly on one demo example. The key things to observe:

  1. Function session() when invoked does almost nothing: it creates the shared state and associates a callback (lambda) with an event “on finished read”. Then it immediately returns to the caller. No reading or writing is performed during this function’s execution!
  2. Nested lambdas indicate that we are dealing with a sequence of callbacks.
  3. We are capturing the shared_ptr to the session state by value, to make sure that it lives as long as the last callback lives.
  4. Function parameter sock is immediately moved to the shared state, so that it can live longer than function session().

If this looks complicated, that’s because it is. Event-driven flows are not an easy thing. The complexity in the code reflects the complexity of the reality.
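To make the first and third observations tangible, here is a self-contained sketch that swaps the real library for a toy event queue. `ToyExecutor`, its `post` and `run` members, and `demo_session` are invented for this illustration and are not Boost.ASIO APIs; `State` holds a string instead of a socket and buffer.

```cpp
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for an event loop: callbacks wait in a queue
// until run() dispatches them.
struct ToyExecutor {
    std::queue<std::function<void()>> pending;
    void post(std::function<void()> cb) { pending.push(std::move(cb)); }
    void run() {
        while (!pending.empty()) {
            auto cb = std::move(pending.front());
            pending.pop();
            cb();
        }
    }
};

struct State {
    std::string log;  // stands in for the socket and the buffer
};

// Returns a weak_ptr only so that the caller can observe the state's lifetime.
// exec and trace are caller-owned and must outlive exec.run().
std::weak_ptr<State> session(ToyExecutor& exec, std::string& trace) {
    auto s = std::make_shared<State>();
    exec.post([s, &exec, &trace] {        // plays the role of on_read
        s->log += "read;";
        exec.post([s, &trace] {           // plays the role of on_write
            s->log += "write;";
            trace = s->log;
        });
    });
    return s;  // the local shared_ptr dies here; the queued lambda keeps State alive
}

std::string demo_session() {
    ToyExecutor exec;
    std::string trace;
    std::weak_ptr<State> watch = session(exec, trace);
    // session() has returned, yet nothing has been "read" or "written".
    std::string result = (trace.empty() && !watch.expired()) ? "pending;" : "unexpected;";
    exec.run();                            // now the callbacks fire, in order
    result += trace;
    result += watch.expired() ? "released" : "leaked";
    return result;
}
```

In this sketch `demo_session()` yields "pending;read;write;released": the state outlives `session()` and is released once the last callback that captured it has been destroyed.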

The contract

Now, let’s think for a moment about how the above function session() is presented to its users: what do they need to know to be able to use the function correctly? It will likely be something like this:

    /** Spawns an event-driven session that will perform:
     *   1. request and respond to a read event,
     *   2. request and respond to a write event,
     *  on the provided socket `sock`.
     *
     *  Postcondition: A read task has been registered in the
     *  execution framework.
     */
    void session(Socket sock);

The point here is that a user who wants to use the function correctly already knows that when they call this function and get control back, no reading or writing has been performed yet. The user will move on and destroy all the function argument objects, keeping in mind that the necessary reads and writes will be performed in the future. We could call it an “asynchronous function”, meaning that the point of returning control to the caller is not tied to the point of having executed the entire “session” to the end.

Implications

The fact that being an “asynchronous function” is part of the contract has implications for what the implementer of the function can do. First, the reference and “view” function parameters: their semantics are, “I do not need a new object, I will use the object from the caller’s scope”. If you reference them from any of the callbacks, you will almost certainly get an invalid object access: the callback will be called in response to an event that will likely happen long after the caller has finished. In our above example with lambdas, because we are using no default captures, this would be caught by the compiler: a lambda cannot use a function parameter that it has not captured. The only safe place to use the reference function parameters is in the initial part, before any callback.
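A small sketch of this rule, using a plain vector of callbacks as a stand-in for the event framework. `Callback`, `greet_later` and `demo_greet` are hypothetical names, not part of any library discussed here:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

using Callback = std::function<std::string()>;

// `name` refers to an object in the caller's scope. It is safe to read it in
// the initial part, but the callback must own a copy of whatever it needs.
void greet_later(std::vector<Callback>& pending, const std::string& name) {
    std::size_t length_now = name.size();  // fine: the caller is still alive here
    pending.push_back([name, length_now] { // [name] copies the string into the lambda
        return "hello " + name + " (" + std::to_string(length_now) + ")";
    });
    // An empty capture list [] would not compile if the body used `name`.
    // Capturing [&name] would compile, but the callback would read a destroyed
    // string whenever the caller's argument does not outlive it.
}

std::string demo_greet() {
    std::vector<Callback> pending;
    {
        std::string who = "Ann";
        greet_later(pending, who);
    }  // `who` is destroyed before the callback runs
    return pending.front()();
}
```

Here `demo_greet()` returns "hello Ann (3)" even though the caller's string is gone by the time the callback runs, because the callback carries its own copy.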

Second, one could be tempted to use a more object-oriented design to implement our asynchronous function. That is, have a class Session and member functions read(), write() and finish().

    class Session
    {
      State _s;
    public:
      explicit Session(Socket sock);
      void operator()() {
        exec.async_read(_s.sock, _s.buffer,
          bind(&Session::on_read, this, _1, _2));
      }
      void on_read(error_code ec, int len) {
        if (!ec)
          exec.async_write(_s.sock, {_s.buffer, len},
            bind(&Session::on_write, this, _1, _2));
      }
      void on_write(error_code ec, int len) {
        if (!ec) finish();
      }
      void finish();
    };

If you do not like std::bind, you could use lambdas instead:

    void Session::operator()() {
      exec.async_read(_s.sock, _s.buffer,
        [this](error_code ec, int len) { on_read(ec, len); });
    }

    void Session::on_read(error_code ec, int len) {
      if (!ec)
        exec.async_write(_s.sock, {_s.buffer, len},
          [this](error_code ec, int len) { on_write(ec, len); });
    }

    void Session::on_write(error_code ec, int len) {
      if (!ec) finish();
    }

In either case we have a bug in the form of a dangling pointer: we are passing the this pointer to the callback, a pointer to an object that will no longer exist when the callback is invoked. Capturing *this instead would not compile, because it would make a copy of the Session object, which is not copyable.

So unless we want to go back to storing the shared State separately, as in the initial example, we can make the entire object shared by design. This is what std::enable_shared_from_this is for:

    class Session : public enable_shared_from_this<Session>
    {
      State _s;
    public:
      explicit Session(Socket sock);
      void operator()() {
        // init-capture: the callback stores its own shared_ptr to this object
        exec.async_read(_s.sock, _s.buffer,
          [self = shared_from_this()](error_code ec, int len) {
            self->on_read(ec, len);
          });
      }
      void on_read(error_code ec, int len) {
        if (!ec)
          exec.async_write(_s.sock, {_s.buffer, len},
            [self = shared_from_this()](error_code ec, int len) {
              self->on_write(ec, len);
            });
      }
      void on_write(error_code ec, int len);
      void finish();
    };

The point here is that even for objects created inside the “asynchronous function” you can easily run into lifetime issues, and the sane bet may be to just go shared_ptr all the way.
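The effect of going shared all the way can be observed in a stripped-down, self-contained sketch. The `Session` below is a toy with a counter instead of a socket, the vector of callbacks stands in for the event framework, and `demo_shared_session` is a hypothetical name:

```cpp
#include <functional>
#include <memory>
#include <vector>

using Callback = std::function<void()>;

class Session : public std::enable_shared_from_this<Session> {
    int _reads = 0;
public:
    // Registers the callback. The init-capture stores a shared_ptr copy inside
    // the lambda, so the Session lives at least as long as the callback does.
    void start(std::vector<Callback>& pending) {
        pending.push_back([self = shared_from_this()] { self->on_read(); });
    }
    void on_read() { ++_reads; }
    int reads() const { return _reads; }
};

int demo_shared_session() {
    std::vector<Callback> pending;
    std::weak_ptr<Session> watch;
    {
        // shared_from_this() requires that the object is already owned by a shared_ptr.
        auto session = std::make_shared<Session>();
        watch = session;
        session->start(pending);
    }  // the only named owner is gone; the queued callback still owns the Session
    if (watch.expired()) return -1;  // would mean the callback did not keep it alive
    for (auto& cb : pending) cb();   // the "event" fires
    return watch.lock()->reads();
}
```

With a `[this]` capture instead, the same test would invoke `on_read()` on a destroyed object.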

In summary, implementing “asynchronous functions” is difficult, and full of lifetime management traps. Whoever needs to write such code is already aware of these pitfalls, and is very cautious about the lifetime of any involved object. And we haven’t said a word about coroutines yet.

Enter the coroutines

Coroutines are a tool for writing “asynchronous functions”. They take care of most of the lifetime management issues. Our session implemented with coroutines would look as follows:

    task<void> session(Socket sock)
    {
      Buffer buffer;
      int len = co_await exec.co_read(sock, buffer); // throws on failure
      co_await exec.co_write(sock, {buffer, len});   // throws on failure
      finish();
    }

The keyword co_await is just a different, clever way of specifying what is the “current” code and what is the callback. This is not sequential code: we are still associating future events with callbacks. The only thing that changes is that we now have a new scope that starts when the function is invoked and ends when the last callback is finished. Coroutines are all about this new kind of scope, with normal automatic object lifetime and exception propagation properties. (But if you cannot use exceptions, coroutines can offer different means of handling errors.)

(If you prefer learning from videos, I also have a talk with a gentle introduction to coroutines.)

The coroutine mechanism allocates memory to store the non-reference function parameters and all the automatic objects, until the last callback has finished. It moves the function parameters into that storage, as the original parameter object will be destroyed as soon as control is returned to the caller (remember: at that point the coroutine still has a lot of its body to execute).

But other than that, the same rules for “asynchronous functions” apply: if, for some reason, you take an argument by reference, this still means “I do not need a new object, I need to refer to the original in the caller’s scope”. This is a very risky thing to do for an “asynchronous function”, regardless of whether it is implemented via coroutines or via the manual registration of callbacks.

How does this look from the caller’s perspective? Strictly speaking, to the caller a call to a coroutine is almost indistinguishable from a call to a normal function. But a responsible caller must distinguish regular functions from “asynchronous functions”: those that guarantee their results not when they return to the caller but much later.

Thus, the practically useful statement is: coroutines are almost indistinguishable from other “asynchronous functions”. And for these two kinds, practically the same constraints and perils apply.

Given that you have to implement “asynchronous functions” one way or another, are coroutines worse in any capacity relative to the alternatives (in spite of many obvious advantages)? There are a couple of nuances.

  1. We said that for non-coroutine implementations you can safely use the reference parameters before the first event-callback association. This is not necessarily the case for coroutines: due to initial_suspend, the coroutine may be configured to return control to the caller before the first instruction in the function body is executed. This means that even the first instruction in the function body may be executed as a callback, when the caller is already dead.
  2. Coroutines may give you a false sense of security. Because they take care of so many things, you may get a false impression that it is impossible for you to run into any lifetime management issues. In the manual lambdas implementation, at least the lambda captures prevented you from accesses through dangling references. This guardrail is gone for coroutines.

In summary, when you consider that coroutines are for implementing “asynchronous functions”, you will conclude that all the gotchas discussed apply to “asynchronous functions” due to their nature. Coroutines do not really add significantly new gotchas. Lifetime management is hard for functions that do not obey the natural scoping rules. Coroutines make this task manageable, but they do not relieve you of the responsibility for doing it right.

Of course, coroutines are not only for implementing “asynchronous functions”, but implementing generators is not that different in nature. In that case a coroutine is still a sequence of callbacks seen as one lexical scope, and here the same issues with reference function parameters apply.

(P.S. Note that Boost.ASIO offers the coroutine interface, as can be seen in this demo example.)
