If you're interested in writing servers in Rust, Tokio is a project that should definitely be on your radar. It promises to leverage the exciting new work on futures in Rust to make it easy to build extremely efficient networked servers with natural, composable code.
It's also a project that can be hard to wrap your head around when you first encounter it. In this post, I'll attempt to illustrate how the various pieces of Tokio fit together by building a very simple server on top of a very simple protocol.
As a disclaimer before we get any further, I want to emphasize that I am very much a beginner when it comes to Tokio and much of what follows is, at best, woefully incomplete. Also, the project has not reached a stable version at the time of this writing, so it's possible that these APIs will change.
Update: In the time since I originally wrote this, the 0.1 releases of all the tokio crates have been published and the APIs have stabilized a bit. I've updated the code here to match the released APIs and added a section on the new tokio-io crate.
With that out of the way, what is Tokio? The project's most direct inspiration was Scala's Finagle. Unlike Scala and the JVM, however, Rust didn't have mature asynchronous I/O libraries on which to build. As a result, Tokio ends up being more like Finagle + Netty than Finagle alone.
Coming from the Ruby world, I've found that Rack is also a useful point of comparison when trying to understand Tokio. Like Rack, Tokio provides two major benefits for application developers looking to write servers: it abstracts away the low-level details of the underlying protocol, and it defines a common interface that lets reusable middleware be layered around an application.
As you might expect from projects in languages as different as Ruby and Rust, the similarities pretty much end there. The list of differences is long, but the two most important are that Tokio is not specific to HTTP and that it derives a great deal of power from the Rust type system in a way that Rack obviously cannot.
Before we dive into any code, I'll try to give a brief overview of how the Tokio project is organized and how the pieces fit together. Layering is a very important concept to Tokio and that's represented in the organization of the project.
To start, there are two projects outside of Tokio itself that help put it in context. The first is mio and the second is futures. I won't cover these in detail, but a short summary is that mio provides a thin wrapper over OS mechanisms for asynchronous I/O (e.g. epoll), and futures provides an efficient, composable interface for asynchronous tasks (think `Result`, but async).
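To get a feel for the "`Result`, but async" framing: futures 0.1 provides combinators like `map` and `and_then` that mirror the ones on `Result`. As a rough, synchronous sketch using plain `Result` (no Tokio types involved, and the `parse` helper is invented for illustration):

```rust
// A fallible step, standing in for an async operation.
fn parse(input: &str) -> Result<i32, String> {
    input.trim().parse::<i32>().map_err(|e| e.to_string())
}

fn main() {
    // Chain fallible steps, much like futures chain async steps.
    let doubled = parse("21")
        .map(|n| n * 2) // transform the success value
        .and_then(|n| if n < 100 { Ok(n) } else { Err("too big".to_string()) });
    println!("{:?}", doubled); // prints Ok(42)
}
```

Futures add scheduling and non-blocking execution on top of this shape, but the compositional style is the same.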
The base layer of Tokio, tokio-core, is an integration of mio with futures. This sounds pretty obvious at a high level, but it makes a huge difference in usability when compared with mio itself. It gives you things like your normal `TcpListener` and `UdpSocket`, but asynchronous, as well as a reactor (i.e. event loop) on which to run them.
Next, there's tokio-io. This crate was introduced as an extraction from tokio-core and includes some fundamental traits encapsulating async I/O. These traits express the semantics of raw byte streams and the logic of turning them into discrete "frames", or meaningful chunks of bytes. An implementation would allow you to start dealing with things like HTTP header frames or WebSocket data frames instead of a stream of raw bytes. It also allows implementers of these traits to avoid a hard dependency on a specific runtime like tokio-core.
Going up a layer, we find tokio-proto. The point of this crate is to help you turn those frames from tokio-io into full requests and responses for whatever network protocol you're using (e.g. HTTP). It has all kinds of tools to help protocol implementers handle pipelining, multiplexing, streaming, etc.
Finally, at the top of the stack and of the most interest to application developers, there is tokio-service. This is a very small crate and exists pretty much just to define the `Service` trait:

```rust
trait Service {
    type Request;
    type Response;
    type Error;
    type Future: Future<Item = Self::Response, Error = Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future;
}
```
In English, this means you implement a service by defining what you accept, what you return, how you can fail, and a function that contains the logic. If you get rid of the types, this should look a lot like Rack. Just like with Rack, it's easy to imagine a service that wraps another service, transforming the input and output arbitrarily.
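To make the wrapping idea concrete, here's a deliberately simplified, synchronous stand-in for the trait (the names `SimpleService`, `Upcase`, and `LogService` are invented for illustration), with a logging middleware layered around an inner service:

```rust
// A synchronous stand-in for tokio-service's Service trait.
trait SimpleService {
    fn call(&self, req: String) -> Result<String, String>;
}

// An "application" service.
struct Upcase;

impl SimpleService for Upcase {
    fn call(&self, req: String) -> Result<String, String> {
        Ok(req.to_uppercase())
    }
}

// A middleware service that wraps any other service.
struct LogService<S: SimpleService> {
    inner: S,
}

impl<S: SimpleService> SimpleService for LogService<S> {
    fn call(&self, req: String) -> Result<String, String> {
        println!("request: {}", req);
        let resp = self.inner.call(req);
        println!("response: {:?}", resp);
        resp
    }
}

fn main() {
    let svc = LogService { inner: Upcase };
    println!("{:?}", svc.call("hello".to_string())); // prints Ok("HELLO")
}
```

The real trait returns a future instead of a `Result`, but the layering works the same way: a wrapper implements `Service` by delegating to another `Service`.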
If you're an application developer, this is really the only layer you'll need to care about. The idea is that the community will provide high-quality implementations of common protocols and you'll be free to layer services on top while completely forgetting that tokio-core, tokio-io, and tokio-proto even exist.
As a learning exercise, we'll now implement a server from the ground (i.e. tokio-core) up, based on a simple line protocol. When we're done, we should be able to connect with `nc` and exchange some simple requests and responses.
To get started, we'll mostly be following the example from the tokio-proto docs. The basic steps are:

1. Implement a codec for our frames
2. Implement `ServerProto` for our codec
3. Implement a `Service` that can run on top of our server
4. Use `TcpServer` to run our service

Since we don't need to keep track of any state, we'll start with a unit struct:
```rust
pub struct LineCodec;
```
Then we'll implement the `Encoder` and `Decoder` traits for our struct, which require us to define the types of our input and output frames as well as functions for encoding and decoding them. In our case, we'll be dealing with newline-delimited strings going in each direction. Luckily, the tokio-line project was recently updated and has some good example code we can borrow:
```rust
// Borrowed from tokio-line with slight modifications
impl Decoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
        // If our buffer contains a newline...
        if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
            // remove this line and the newline from the buffer.
            let line = buf.split_to(n);
            buf.split_to(1); // Also remove the '\n'.

            // Turn this data into a UTF-8 string and return it
            return match str::from_utf8(line.as_ref()) {
                Ok(s) => Ok(Some(s.to_string())),
                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
            };
        }

        // Otherwise, we don't have enough data for a full message yet
        Ok(None)
    }
}

impl Encoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
        for byte in msg.as_bytes() {
            buf.put_u8(*byte);
        }
        buf.put_u8(b'\n');
        Ok(())
    }
}
```
That's a lot to digest, so don't worry if the details aren't perfectly clear. The important part is that we declare that we're dealing with strings going in and out, and we provide functions to read and write those strings with newline delimiters.
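If it helps, the core splitting logic can be sketched with plain std types. This hypothetical version operates on a `Vec<u8>` instead of `BytesMut` and drops the error handling, but the framing idea is the same:

```rust
// Pop one newline-terminated frame off the front of the buffer, if present.
fn decode_line(buf: &mut Vec<u8>) -> Option<String> {
    let n = buf.iter().position(|b| *b == b'\n')?;
    let line: Vec<u8> = buf.drain(..n).collect(); // take the line...
    buf.drain(..1);                               // ...and drop the '\n'
    String::from_utf8(line).ok()
}

fn main() {
    let mut buf = b"hello\nwor".to_vec();
    // One complete frame is available.
    println!("{:?}", decode_line(&mut buf)); // prints Some("hello")
    // "wor" is an incomplete frame, so we get nothing back yet.
    println!("{:?}", decode_line(&mut buf)); // prints None
}
```

The real codec does the same thing, except that `BytesMut` lets it split the buffer without copying.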
Next, we define another unit struct for our `ServerProto`. There's no real logic here; we're just declaring how different pieces fit together:
```rust
struct LineServerProto;

impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for LineServerProto {
    type Request = String;
    type Response = String;
    type Transport = Framed<T, LineCodec>;
    type BindTransport = Result<Self::Transport, io::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(LineCodec))
    }
}
```
The first thing to notice is that we're generic over any type `T` that implements the `AsyncRead` and `AsyncWrite` traits from tokio-io. This just means that our server can run on top of any I/O-like object (i.e. anything that lets it read and write bytes). This object is then combined with our codec into a `Framed` object, also from tokio-io.
The idea here is that the `decode` function of our codec knows how to "chunk" the bytes from our I/O object into a stream of discrete messages (i.e. frames). In the other direction, `encode` takes outgoing messages and writes them out as a stream of bytes. This allows our higher-level services to deal only with full messages.
At this point, we have almost everything we need to instantiate a tokio-proto `TcpServer` and begin serving requests. The only thing missing is an actual `Service` to run on top of our protocol:
```rust
struct HelloWorldService;

impl Service for HelloWorldService {
    type Request = String;
    type Response = String;
    type Error = io::Error;
    type Future = FutureResult<Self::Response, Self::Error>;

    fn call(&self, req: String) -> Self::Future {
        if req.contains('\n') {
            futures::failed(io::Error::new(io::ErrorKind::InvalidInput, "message contained new line"))
        } else {
            let resp = match req.as_str() {
                "hello" => "world".to_string(),
                _ => "idk".to_string(),
            };
            futures::finished(resp)
        }
    }
}
```
The payoff here is that we get to write a relatively vanilla Rust function that operates on arbitrary Rust types, using futures but not caring at all about things like TCP or serialization formats.
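Stripped of the futures machinery, the logic the service runs is just an ordinary function over `String`s (this extraction is hypothetical, not part of the original code):

```rust
// The request-to-response mapping from the service, as a plain function.
fn respond(req: &str) -> String {
    match req {
        "hello" => "world".to_string(),
        _ => "idk".to_string(),
    }
}

fn main() {
    println!("{}", respond("hello")); // prints world
    println!("{}", respond("what?")); // prints idk
}
```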
To actually run this thing, all that's left is to feed it to a `TcpServer` along with our protocol and an address to bind to:
```rust
fn main() {
    let addr = "0.0.0.0:12345".parse().unwrap();
    TcpServer::new(LineServerProto, addr)
        .serve(|| Ok(HelloWorldService));
}
```
Then we can connect and take it for a spin:
```
$ nc localhost 12345
hello
world
what?
idk
^C
```
And it seems to work! You can get the full code here if you want to try it out yourself.
Hopefully that was an informative high-level tour of Tokio, and hopefully I didn't get anything too terribly wrong. You could take this example a lot further, layering middleware for timeouts or request parsing, exploring the client side of what tokio-proto provides, etc, etc, but I think this post is long enough as it is.