Monday, July 25, 2011

Receive Loops

The usual way to implement an actor is to receive messages in a loop and match each one against a list of patterns, where a pattern pairs an expected message format with a corresponding callback. A naive approach might look like this:

for (;;)
{
  receive
  (
    on<int>() >> [](int value) { ... },
    on<double>() >> [](double value) { ... },
    ...
  );
}

This is naive because the pattern is defined inside the loop: instead of re-using the pattern, you create new objects in each iteration.
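The per-iteration cost is easy to make visible without libcppa. The following is a minimal sketch in plain C++: handler_table is a hypothetical stand-in for a pattern (it is not a libcppa type) that bumps a counter whenever it is constructed, and the two functions build it inside and outside of a loop.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pattern: a list of callbacks that reports
// each construction through a caller-supplied counter.
struct handler_table
{
  std::vector<std::function<void(int)>> handlers;

  handler_table(int& construction_counter,
                std::vector<std::function<void(int)>> hs)
    : handlers(std::move(hs))
  {
    ++construction_counter;
  }
};

// Builds the table inside the loop, like the naive receive loop.
inline int constructions_inside_loop(int iterations)
{
  int constructed = 0;
  int sum = 0;
  for (int i = 0; i < iterations; ++i)
  {
    handler_table table(constructed, {[&](int v) { sum += v; }});
    table.handlers[0](i);
  }
  return constructed;
}

// Builds the table once and re-uses it in every iteration.
inline int constructions_outside_loop(int iterations)
{
  int constructed = 0;
  int sum = 0;
  handler_table table(constructed, {[&](int v) { sum += v; }});
  for (int i = 0; i < iterations; ++i)
  {
    table.handlers[0](i);
  }
  return constructed;
}
```

For five iterations the first function constructs five tables (each with its own std::function objects), the second only one.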

The next version re-uses the pattern, but the code is more verbose and breaks locality: the pattern is no longer part of the loop, and you have to scroll up to see where it is defined.

auto pattern =
(
  on<int>() >> [](int value) { ... },
  on<double>() >> [](double value) { ... },
  ...
);
for (;;)
{
  receive(pattern);
}

Note the syntax auto pattern = ( ... )! Your compiler won't accept the code if you leave out the parentheses, because it then parses the first comma as a separator between two declarators instead of as part of a single expression.
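Here is a minimal sketch of the parentheses issue outside of libcppa. The types rule and rule_list and the overloaded comma operators are hypothetical stand-ins for a pattern and a pattern list; libcppa's real types are different.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not libcppa types.
struct rule { std::string name; };
struct rule_list { std::vector<rule> rules; };

// Joins two rules into a list.
inline rule_list operator,(rule lhs, rule rhs)
{
  rule_list result;
  result.rules.push_back(std::move(lhs));
  result.rules.push_back(std::move(rhs));
  return result;
}

// Appends one more rule to an existing list.
inline rule_list operator,(rule_list lhs, rule rhs)
{
  lhs.rules.push_back(std::move(rhs));
  return lhs;
}

inline std::size_t count_rules()
{
  rule a{"int"};
  rule b{"double"};
  rule c{"string"};
  // Parenthesized: a single initializer expression, so the commas are
  // calls to the overloaded operators above.
  auto pattern = (a, b, c);
  // Without the parentheses, `auto pattern = a, b, c;` is parsed as a
  // declaration of three variables; b and c would be redeclared without
  // an initializer, so the line does not compile.
  assert(pattern.rules.front().name == "int");  // order is preserved
  return pattern.rules.size();
}
```

Calling count_rules() yields 3, one entry per rule joined by the comma operator.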

To provide a more elegant and less error-prone way to express such loops, libcppa comes with three "predefined" loops:


  1. receive_loop([PATTERN]);
     
  2. receive_while([LAMBDA -> bool])([PATTERN]);
     
  3. do_receive([PATTERN]).until([LAMBDA -> bool]);


The first one loops forever. The other two take a lambda expression as the loop condition. C++ doesn't have a nicer way to pass a block of code (or even a single statement) to a function, so the result is more verbose than the built-in while() loop (yes, Scala does a much better job here...).
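If you wonder how a syntax like receive_while(...)(...) or do_receive(...).until(...) can be built from lambdas, here is a minimal sketch. It is not libcppa's implementation: the mailbox is a plain std::deque<int> that gets passed in explicitly, a handler is a single callback instead of a pattern, and the loops stop on an empty mailbox where a real actor would block. All names besides receive_while, do_receive and until are made up for the illustration.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Toy mailbox and callback types (assumptions for this sketch).
using mailbox = std::deque<int>;
using handler = std::function<void(int)>;
using condition = std::function<bool()>;

// Takes the next message out of the mailbox and passes it to the handler.
inline void receive_one(mailbox& mb, const handler& on_message)
{
  assert(!mb.empty());
  int msg = mb.front();
  mb.pop_front();
  on_message(msg);
}

// receive_while(mb, cond)(handler): the first call stores the condition
// and returns a callable that takes the handler and runs the loop.
inline std::function<void(handler)> receive_while(mailbox& mb, condition cond)
{
  return [&mb, cond](handler on_message)
  {
    while (!mb.empty() && cond()) receive_one(mb, on_message);
  };
}

// do_receive(mb, handler).until(cond): the helper stores the handler;
// until() runs the body once before it tests the condition.
struct do_receive_helper
{
  mailbox& mb;
  handler on_message;

  void until(const condition& done)
  {
    if (mb.empty()) return;  // a real actor would block here instead
    do { receive_one(mb, on_message); } while (!mb.empty() && !done());
  }
};

inline do_receive_helper do_receive(mailbox& mb, handler on_message)
{
  return do_receive_helper{mb, std::move(on_message)};
}

// Adds up messages while the running total is below 10.
inline int sum_while_below_ten()
{
  mailbox mb{1, 2, 3, 4, 5, 6};
  int sum = 0;
  receive_while(mb, [&] { return sum < 10; })
  (
    [&](int value) { sum += value; }
  );
  return sum;
}

// Receives messages until three of them have been handled.
inline int count_until_three()
{
  mailbox mb{7, 8, 9, 10};
  int received = 0;
  do_receive(mb, [&](int) { ++received; }).until([&] { return received == 3; });
  return received;
}
```

With these inputs, sum_while_below_ten() returns 10 (it handles 1, 2, 3 and 4, then the condition fails) and count_until_three() returns 3. The trick is the same in both cases: the first call returns an object that remembers its argument, and the second call (operator() or until) runs the actual loop.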

Anyway, this is a short but complete example program that uses two of the receive loops:

#include <iostream>
#include "cppa/cppa.hpp"

using std::cout;
using std::endl;
using namespace cppa;

void ping()
{
  receive_loop
  (
    on<atom("Pong"), int>() >> [](int value)
    {
      cout << "ping received { Pong, " << value << " }" << endl;
      reply(atom("Ping"), value + 1);
    }
  );
}

void pong(actor_ptr ping_actor)
{
  link(ping_actor);
  send(ping_actor, atom("Pong"), 0);
  int last_ping = 0;
  receive_while([&]() { return last_ping < 9; })
  (
    on<atom("Ping"), int>() >> [&](int value)
    {
      cout << "pong received { Ping, " << value << " }" << endl;
      last_ping = value;
      reply(atom("Pong"), value + 1);
    }
  );
  // non-normal exit reason forces ping_actor to quit
  quit(exit_reason::user_defined);
}

int main()
{
  spawn(pong, spawn(ping));
  await_all_others_done();
  return 0;
}

And this is the output of the example program:

ping received { Pong, 0 }
pong received { Ping, 1 }
ping received { Pong, 2 }
pong received { Ping, 3 }
ping received { Pong, 4 }
pong received { Ping, 5 }
ping received { Pong, 6 }
pong received { Ping, 7 }
ping received { Pong, 8 }
pong received { Ping, 9 }
ping received { Pong, 10 }