Getting Started with Tokio

2016-12-23

If you're interested in writing servers in Rust, Tokio is a project that should definitely be on your radar. It promises to leverage the exciting new work on futures in Rust to make it easy to build extremely efficient networked servers with natural, composable code.

It's also a project that can be hard to wrap your head around when you first encounter it. In this post, I'll attempt to illustrate how the various pieces of Tokio fit together by building a very simple server on top of a very simple protocol.

As a disclaimer before we get any further, I want to emphasize that I am very much a beginner when it comes to Tokio and much of what follows is, at best, woefully incomplete. Also, the project has not reached a stable version at the time of this writing, so it's possible that these APIs will change.

Introduction

With that out of the way, what is Tokio? The project's most direct inspiration was Scala's Finagle. Unlike Scala and the JVM, however, Rust didn't have mature asynchronous I/O libraries on which to build. As a result, Tokio ends up being more like Finagle + Netty than Finagle alone.

Coming from the Ruby world, I've found that Rack is also a useful point of comparison when trying to understand Tokio. Like Rack, Tokio provides two major benefits for application developers looking to write servers:

  1. A simple, standard interface for writing services
  2. A design encouraging the creation of generic, reusable middleware

As you might expect from projects in languages as different as Ruby and Rust, the similarities pretty much end there. The list of differences is long, but the two most important are that Tokio is not specific to HTTP and that it derives a great deal of power from the Rust type system in a way that Rack obviously cannot.

Before we dive into any code, I'll try to give a brief overview of how the Tokio project is organized and how the pieces fit together. Layering is central to Tokio's design, and the organization of the project reflects that.

To start, there are two projects outside of Tokio itself that help put it in context. The first is mio and the second is futures. I won't cover these in detail, but a short summary is that mio provides a thin wrapper over OS mechanisms for asynchronous I/O (e.g. epoll), and futures provides an efficient, composable interface for asynchronous tasks (think Result, but async).

The base layer of Tokio, tokio-core, has two primary functions. First, it's an integration of mio with futures. This sounds pretty obvious at a high level, but it makes a huge difference in usability when compared with mio itself. It gives you asynchronous versions of the familiar TcpListener and UdpSocket, as well as a reactor (i.e. event loop) on which to run them. Second, it gives you tools to take a raw byte stream from those elements and turn it into discrete "frames", or meaningful chunks of bytes. This means that instead of reasoning about an infinite stream of bytes, you can start thinking about things like an HTTP header frame or a WebSocket data frame.

Going up a layer, we find tokio-proto. The point of this crate is to help you turn those frames from tokio-core into full requests and responses for whatever network protocol you're using (e.g. HTTP). It has all kinds of tools to help protocol implementers handle pipelining, multiplexing, streaming, etc.

Finally, at the top of the stack and of the most interest to application developers, there is tokio-service. This is a very small crate and exists pretty much just to define the Service trait:

trait Service {
    type Request;
    type Response;
    type Error;
    type Future: Future<Item=Self::Response, Error=Self::Error>;
    fn call(&mut self, req: Self::Request) -> Self::Future;
}

In English, this means you implement a service by defining what you accept, what you return, how you can fail, and a function that contains the logic. If you get rid of the types, this should look a lot like Rack. Just like with Rack, it's easy to imagine a service that wraps another service, transforming the input and output arbitrarily.
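To make the wrapping idea a bit more concrete, here's a minimal sketch of one service layered on top of another. To keep it compilable with nothing but the standard library, it uses a simplified, synchronous stand-in for the real trait (a hypothetical SimpleService that returns a plain Result rather than a future). The names Upcase and Shout are made up for illustration and aren't part of Tokio.

```rust
// Simplified, synchronous stand-in for tokio-service's `Service` trait.
// Assumption: the real trait returns a future; a plain `Result` is used here
// so the sketch needs nothing beyond the standard library.
trait SimpleService {
    type Request;
    type Response;
    type Error;
    fn call(&mut self, req: Self::Request) -> Result<Self::Response, Self::Error>;
}

// A leaf service: upper-cases whatever it is given.
struct Upcase;

impl SimpleService for Upcase {
    type Request = String;
    type Response = String;
    type Error = String;

    fn call(&mut self, req: String) -> Result<String, String> {
        Ok(req.to_uppercase())
    }
}

// Hypothetical middleware: wraps any string-to-string service, trimming
// whitespace from the request and appending "!" to the response.
struct Shout<S> {
    inner: S,
}

impl<S> SimpleService for Shout<S>
where
    S: SimpleService<Request = String, Response = String>,
{
    type Request = String;
    type Response = String;
    type Error = S::Error;

    fn call(&mut self, req: String) -> Result<String, S::Error> {
        // Transform the input, delegate to the inner service, then
        // transform the output.
        let resp = self.inner.call(req.trim().to_string())?;
        Ok(format!("{}!", resp))
    }
}
```

With these definitions, calling Shout { inner: Upcase } with "  hello " yields Ok("HELLO!"). The wrapper never needs to know what the inner service does, only what types it accepts and returns, which is what makes this kind of middleware reusable.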

If you're an application developer, this is really the only layer you'll need to care about. The idea is that the community will provide high-quality implementations of common protocols and you'll be free to layer services on top while completely forgetting that tokio-core and tokio-proto even exist.

Building an example server

As a learning exercise, we'll now implement a server from the ground (i.e. tokio-core) up, based on a simple line protocol. When we're done, we should be able to connect with nc and exchange some simple requests and responses.

To get started, we'll mostly be following the example from the tokio-proto docs. The basic steps are:

  1. Implement a Codec for our protocol
  2. Implement a ServerProto for our codec
  3. Implement a Service that can run on top of our server
  4. Use the provided TcpServer to run our service

Implementing the codec

Since we don't need to keep track of any state, we'll start with a unit struct:

pub struct LineCodec;

Then we'll implement the Codec trait for our struct, which requires us to define the types of our input and output frames as well as functions for encoding and decoding them. In our case, we'll be dealing with newline-delimited strings going in each direction. Luckily, the tokio-line project was recently updated and has some good example code we can borrow:

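// Borrowed from tokio-line with slight modifications.
// Imports assume the tokio-core 0.1-era module layout.
use std::{io, str};
use tokio_core::io::{Codec, EasyBuf};

impl Codec for LineCodec {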
    type In = String;
    type Out = String;

    fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error> {
        // If our buffer contains a newline...
        if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
            // remove this line and the newline from the buffer.
            let line = buf.drain_to(n);
            buf.drain_to(1); // Also remove the '\n'.

            // Turn this data into a UTF-8 string and return it
            return match str::from_utf8(line.as_ref()) {
                Ok(s) => Ok(Some(s.to_string())),
                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
            }
        }

        // Otherwise, we don't have enough data for a full message yet
        Ok(None)
    }

    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
        // Append the message bytes, then the newline delimiter
        buf.extend_from_slice(msg.as_bytes());

        buf.push(b'\n');
        Ok(())
    }
}

That's a lot to digest, so don't worry if the details aren't perfectly clear. The important part is that we declare that we're dealing with strings going in and out, and we provide functions to read and write those strings with newline delimiters.
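If it helps, here's a rough way to see what decode does when data arrives in pieces. This is a std-only stand-in, not the real codec: it assumes a plain Vec<u8> in place of EasyBuf, and decode_line is a name I made up for the sketch.

```rust
use std::io;

// Std-only stand-in for `LineCodec::decode`, using a `Vec<u8>` where the
// real codec receives an `EasyBuf` (an assumption made so this compiles
// without tokio-core).
fn decode_line(buf: &mut Vec<u8>) -> io::Result<Option<String>> {
    // No newline yet: leave the bytes buffered and ask for more data.
    let n = match buf.iter().position(|b| *b == b'\n') {
        Some(n) => n,
        None => return Ok(None),
    };

    // Remove the line plus its trailing '\n' from the front of the buffer.
    let mut line: Vec<u8> = buf.drain(..=n).collect();
    line.pop(); // Drop the '\n'.

    String::from_utf8(line)
        .map(Some)
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid string"))
}
```

Feeding it b"hel" returns Ok(None) and leaves the buffer untouched. After appending b"lo\nwor", the next call returns Ok(Some("hello")) and leaves b"wor" behind for the next frame. That "not enough data yet" case is the reason the return type is an Option wrapped in a Result.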

Implementing the server protocol

Next, we define another unit struct for our ServerProto. There's no real logic here; we're just declaring how different pieces fit together:

struct LineServerProto;

impl<T: Io + 'static> ServerProto<T> for LineServerProto {
    type Request = String;
    type Response = String;
    type Error = io::Error;
    type Transport = Framed<T, LineCodec>;
    type BindTransport = Result<Self::Transport, io::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(LineCodec))
    }
}

The first thing to notice is that we're generic over any type T that implements the Io trait from tokio-core. This just means that our server can run on top of any I/O-like object (i.e. anything that lets it read and write bytes). This Io object is then combined with our codec into a Framed object, also from tokio-core. The idea here is that given a source of bytes (Io) and a way of chunking bytes into discrete messages (our LineCodec), the Framed object can transform the byte stream into a stream of incoming messages. In the other direction, it also has everything it needs to take outgoing messages and write them out as a byte stream.
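As a rough mental model of that pairing, here's a blocking, std-only sketch of a framed wrapper that is generic over its I/O object. It is not how tokio-core implements Framed (the real type is asynchronous and parameterized by a codec). LineFramed, next_frame, and send_frame are hypothetical names, and the newline framing is hard-coded to keep things short.

```rust
use std::io::{self, Read, Write};

// Hypothetical, blocking stand-in for tokio-core's `Framed`: it pairs any
// byte-oriented I/O object with newline framing.
struct LineFramed<T> {
    io: T,
    buf: Vec<u8>, // Bytes read from `io` that don't yet form a full line.
}

impl<T: Read + Write> LineFramed<T> {
    fn new(io: T) -> Self {
        LineFramed { io, buf: Vec::new() }
    }

    // Read bytes until a full line is buffered; `None` means end of input.
    fn next_frame(&mut self) -> io::Result<Option<String>> {
        loop {
            if let Some(n) = self.buf.iter().position(|b| *b == b'\n') {
                let mut line: Vec<u8> = self.buf.drain(..=n).collect();
                line.pop(); // Drop the '\n'.
                return String::from_utf8(line)
                    .map(Some)
                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid string"));
            }
            // No complete line buffered yet, so pull more bytes from `io`.
            let mut chunk = [0u8; 1024];
            let read = self.io.read(&mut chunk)?;
            if read == 0 {
                return Ok(None);
            }
            self.buf.extend_from_slice(&chunk[..read]);
        }
    }

    // Write one outgoing message, followed by the newline delimiter.
    fn send_frame(&mut self, msg: &str) -> io::Result<()> {
        self.io.write_all(msg.as_bytes())?;
        self.io.write_all(b"\n")
    }
}
```

Because the only requirement on T is Read + Write, the same wrapper works over a TcpStream or over an in-memory std::io::Cursor, which is handy for testing. The Io bound on our ServerProto plays the same role.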

Implementing the service

At this point, we have almost everything we need to instantiate a tokio-proto TcpServer and begin serving requests. The only thing missing is an actual Service to run on top of our protocol:

struct HelloWorldService;

impl Service for HelloWorldService {
    type Request = String;
    type Response = String;
    type Error = io::Error;
    type Future = FutureResult<Self::Response, Self::Error>;

    fn call(&mut self, req: Self::Request) -> Self::Future {
        if req.contains('\n') {
            futures::failed(io::Error::new(io::ErrorKind::InvalidInput, "message contained new line"))
        } else {
            let resp = match req.as_str() {
                "hello" => "world".to_string(),
                _ => "idk".to_string(),
            };
            futures::finished(resp)
        }
    }
}

The payoff here is that we get to write a relatively vanilla Rust function that operates on arbitrary Rust types, using futures but not caring at all about things like TCP or serialization formats.
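One consequence is that the request handling is easy to check in isolation. As a sketch, assuming we pull the body of call out into a hypothetical helper named respond (my name, not something from tokio-service) and drop the future wrapper, it becomes an ordinary function:

```rust
use std::io;

// Hypothetical helper: the body of `HelloWorldService::call`, minus the
// future wrapper, so the request handling can be checked in isolation.
fn respond(req: &str) -> Result<String, io::Error> {
    // Reject requests that would break the line-based framing.
    if req.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "message contained new line",
        ));
    }
    let resp = match req {
        "hello" => "world",
        _ => "idk",
    };
    Ok(resp.to_string())
}
```

Here respond("hello") gives Ok("world"), any other single-line input gives Ok("idk"), and input containing a newline gives an InvalidInput error. No sockets or event loops are needed to verify any of that.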

Running the server

To actually run this thing, all that's left is to feed it to a TcpServer along with our protocol and an address to bind to:

fn main() {
    let addr = "0.0.0.0:12345".parse().unwrap();
    TcpServer::new(LineServerProto, addr)
        .serve(|| Ok(HelloWorldService));
}

Then we can connect and take it for a spin:

$ nc localhost 12345
hello
world
what?
idk
^C

And it seems to work! You can get the full code here if you want to try it out yourself.

Hopefully that was an informative high-level tour of Tokio, and hopefully I didn't get anything too terribly wrong. You could take this example a lot further by layering middleware for timeouts or request parsing, exploring the client side of what tokio-proto provides, and so on, but I think this post is long enough as it is.