March 29, 2017

Building Streaming APIs in Rust

I’ve been working recently on Backtalk, a little toy Rust framework for building streaming APIs. Let’s say you’re building a hot new tech startup — Facebook for Cats. You’re going to need a web server to track all these cats. Let’s build one quickly using Backtalk. If you want to follow along, just clone Backtalk with git clone https://github.com/lord/backtalk.git, and run cargo run --example <example number>. Examples are numbered 1–7. You can alternatively add

[dependencies]
backtalk = "0.1.0"

to your Cargo.toml if you’re working in your own project.

Getting a resource set up

extern crate backtalk;
use backtalk::*;

fn main() {
  let mut server = Server::new();
  // code will be added here
  server.listen("127.0.0.1:3000");
}

If we visit 127.0.0.1:3000, we’ll see a 404 error page. We haven’t added any resources to our API! Let’s add one now.

server.resource("/cats", move |req: Request| {
  Error::server_error("we haven't implemented this yet!")
});

We’ve just mounted a resource at /cats. A resource is just a function that converts a Request object into either a Reply (on success) or an Error object. If we visit 127.0.0.1:3000/cats, we can see our error appear! Exciting. You’ll find that visiting 127.0.0.1:3000/cats/123 shows our error too. The server.resource function mounts our resource at a number of HTTP routes:

  • GET /cats produces a List request
  • GET /cats/123 produces a Get request
  • POST /cats produces a Post request
  • PATCH /cats/123 produces a Patch request
  • POST /cats/123/meow produces an Action("meow") request
  • DELETE /cats/123 produces a Delete request

Any of these routes will call our resource function. You can try them out with curl: for instance, running curl -X DELETE http://localhost:3000/cats/123 should print our familiar “we haven’t implemented this yet” error message.
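Since every one of these routes funnels into the same function, a resource can branch on the request’s method to treat them differently. Here’s a rough, untested sketch of that idea (Method::Delete appears later in this post; I’m assuming the other request types have matching variants):

server.resource("/cats", move |req: Request| {
  match req.method() {
    // pretend deletes are special-cased; everything else still errors
    Method::Delete => Error::server_error("no deleting cats yet!"),
    _ => Error::server_error("we haven't implemented this yet!"),
  }
});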

Database adapters

Unfortunately, servers that just return errors are not very useful. Let’s hook up our resource to an actual database.

let database = memory::MemoryAdapter::new();
server.resource("/cats", move |req: Request| {
  database.handle(req)
});

Welp, that was pretty easy. Our database (in this case, an in-memory store for development) has a handle function that converts a Request into a Response, so all we need to do is return the resulting response and be done. We can test out our resource with curl:

$ curl -X POST -d '{"name": "Fluffums"}' http://localhost:3000/cats
{"id":"1","name":"Fluffums"}
$ curl http://localhost:3000/cats/1
{"id":"1","name":"Fluffums"}
$ curl -X PATCH -d '{"name": "Fluffums Jr."}' http://localhost:3000/cats/1
{"id":"1","name":"Fluffums Jr."}
$ curl -X DELETE http://localhost:3000/cats/1
{"id":"1"}

Oh wait, it’s asynchronous

So I lied earlier when I said database.handle(req) returns a Response object. If we actually look at the type signature of the handle function, it looks like this:

fn handle(&self, req: Request) -> BoxFuture<Reply, Error>

We accept a Request, but instead of a simple Result<Reply, Error>, we return this crazy thing called a BoxFuture. This means our server is actually asynchronous: instead of database.handle blocking until a response from the database comes back, it returns a BoxFuture immediately, which is a sort of placeholder for the response that will exist at some later point. If you’ve used promises in JavaScript, futures are the same idea. The server recognizes these future responses and knows not to send the HTTP response until the BoxFuture resolves with the information from the database. While it waits, unlike a synchronous server, it can take in and process more requests, which makes it very efficient for web servers that spend a lot of their time waiting on databases or other third-party servers to reply.
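To make that shape concrete, here’s a tiny standalone sketch of the idea, assuming BoxFuture here is the futures crate’s BoxFuture (which is what the signature above suggests):

extern crate futures;
use futures::{future, Future, BoxFuture};

fn answer_eventually() -> BoxFuture<u32, String> {
  // future::ok is a future that has already resolved; a real database
  // adapter would instead hand back a future that only resolves once
  // the query finishes.
  future::ok(42).boxed()
}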

However, let’s say we want to do some additional processing on a future value. Since the response from handle doesn’t actually have the data yet, we can’t just peek inside the future to see it. You may be familiar with Result’s and_then function: if the Result is Ok, it maps the Ok value to a new value with a closure; if the Result is Err, it leaves it alone. A Future has the same and_then function, but since the value isn’t known yet, the closure runs at some point in the future instead of immediately.
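To see what that means in plain std Rust, here’s and_then on a Result, no futures involved:

// the closure only runs on Ok; an Err passes through untouched
let ok: Result<i32, &str> = Ok(2);
assert_eq!(ok.and_then(|n| Ok(n * 10)), Ok(20));

let err: Result<i32, &str> = Err("nope");
assert_eq!(err.and_then(|n| Ok(n * 10)), Err("nope"));

A Future’s and_then reads exactly the same way; the only difference is when the closure runs.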

We can use this and_then function on our database to transform the resulting data. If the database returns data, our closure will add the “example” key to the resulting JSON, and then return the modified reply. If the database returns an error, and_then won’t even call our closure.

server.resource("/cats", move |req: Request| {
  database.handle(req).and_then(|mut reply| {
    {
        let mut data = reply.data_mut().unwrap();
        data.insert("example".to_string(), json!("data"));
    }
    reply
  })
});

This closure runs after the database handler has already responded, so the actual database never sees this example data.

Sometimes you need an Arc

Request objects in Backtalk have a convenience function and_then that works just like the and_then on a future. This lets you take code that looks like this:

do_something(request)
  .and_then(|req| db.handle(req))
  .and_then(|reply| do_something_else(reply))

and convert it to something that looks like this:

request
  .and_then(|req| do_something(req))
  .and_then(|req| db.handle(req))
  .and_then(|reply| do_something_else(reply))

which is much prettier and more intuitive, if you ask me. Let’s convert our existing code, both to show the syntax and to prove a point about futures.

let database = memory::MemoryAdapter::new();
server.resource("/cats", move |req: Request| {
  req
    .and_then(|req| {
      database.handle(req)
    })
});

If you try to build the code above, you’ll get an error.

  --> examples/5.rs:14:9
   |
13 |       .and_then(|req| {
   |                 ----- capture occurs here
14 |         database.handle(req)
   |         ^^^^^^^^ does not live long enough

Rust’s error messages for closures can still be a bit confusing, but the underlying issue is pretty intuitive. The closure we pass to resource is a move closure, so the database value is owned by that closure. However, the closure we pass to and_then will run at some indeterminate time in the future, and is owned by the Future. It wouldn’t happen in our specific example, but in an application that creates and deletes servers, the Future, and therefore the and_then closure, could outlive the server! That would mean that by the time our closure runs, database might no longer exist, which would be pretty bad. So how do we make sure our adapter lives long enough? With an Arc!

use std::sync::Arc;
let database = Arc::new(memory::MemoryAdapter::new());
server.resource("/cats", move |req: Request| {
  let database1 = database.clone();
  req
    .and_then(move |req| {
      database1.handle(req)
    })
});

Arc uses reference counting to delete the adapter only when all references to it are gone. When you clone an Arc, you’re creating a new reference to the same object rather than a new copy of the object. We can use an Arc the same way as our normal adapter, calling .handle directly on it. You’ll find that this code compiles and works perfectly!
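If Arc is new to you, here’s the same idea in plain std Rust, separate from Backtalk entirely:

use std::sync::Arc;

let a = Arc::new(String::from("meow"));
let b = a.clone(); // a second handle to the same string, not a copy of it
assert_eq!(Arc::strong_count(&a), 2); // both handles are alive
drop(a); // dropping one handle doesn't free the string...
assert_eq!(*b, "meow"); // ...it lives until the last Arc is gone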

Authentication

This server is pretty insecure so far. Right now (if we hosted our server publicly) anybody in the entire world could delete precious kittens from our database. We want to make sure anyone can access and create cats, but only authorized administrators with a super-secure admin password can delete cats. Let’s add an and_then clause before the database handler.

server.resource("/cats", move |req: Request| {
  let database1 = database.clone();
  req
    .and_then(move |req| {
      if req.method() == Method::Delete {
        if let &JsonValue::String(ref password) = req.param("password") {
          if password != "meow" {
            return Error::unauthorized("incorrect password");
          }
        } else {
          return Error::unauthorized("please provide a password");
        }
      }
      req.boxed()
    })
    .and_then(move |req| {
      database1.handle(req)
    })
});

Hopefully this code is fairly straightforward. Before we pass the request to the database, we check for the correct password and return an error if the password wasn’t provided or was incorrect. Since the database is called in an and_then, it gets skipped if something earlier in the chain returns an Error. We return req.boxed() instead of just req: boxed() converts the request into a BoxFuture, which is compatible with the BoxFuture returned by Error::unauthorized. We can test out our super secure system with curl.

$ curl -X DELETE http://localhost:3000/cats/1
{"error":{"message":"please provide a password","type":"authorization"}}
$ curl -X DELETE "http://localhost:3000/cats/1?password=foo"
{"error":{"message":"incorrect password","type":"authorization"}}
$ curl -X DELETE "http://localhost:3000/cats/1?password=meow"
{"id":"1"}

@thegrugq would be proud.

Realtime updates

We now have a pretty good understanding of how to build standard APIs. But what if we wanted to build a realtime stream of newly created cats? No problem. We’re going to use a Channel, which is similar to an Adapter, but instead of producing a static reply from a database, it produces a streaming reply.

let database = Arc::new(memory::MemoryAdapter::new());
let chan = Arc::new(memory::MemoryChannel::new());
server.resource("/cats", move |req: Request| {
  let database1 = database.clone();
  let chan1 = chan.clone();
  req
    .and_then(move |req| {
      match req.method() {
        Method::Listen => chan1.handle(req),
        _ => database1.handle(req),
      }
    })
});

We can run the above code, then use curl to request the stream:

$ curl http://127.0.0.1:3000/cats -H 'Accept: text/event-stream'

You’ll see…nothing. curl just hangs. We’re listening for events from our channel, but we aren’t sending any! Let’s fix that.

use std::ops::Deref; // new!!
let database = Arc::new(memory::MemoryAdapter::new());
let chan = Arc::new(memory::MemoryChannel::new());
server.resource("/cats", move |req: Request| {
  let database1 = database.clone();
  let chan1 = chan.clone();
  let chan2 = chan.clone(); // new!!
  req
    .and_then(move |req| {
      match req.method() {
        Method::Listen => chan1.handle(req),
        _ => database1.handle(req),
      }
    })
    // \/ new! \/
    .and_then(move |reply| {
      util::send_from_reply(reply, chan2.deref())
    })
});

send_from_reply automatically sends creations, deletions, and updates as messages down the channel. We could just as easily have implemented our own sending code by replacing that line with a call like chan2.send("event name", JsonObject::new()) and then returning the reply.
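Spelled out, that hand-rolled version of the final and_then closure might look something like this (an untested sketch; the event name and the empty JsonObject are just placeholders):

.and_then(move |reply| {
  // manually broadcast an event to everyone listening on the channel
  chan2.send("event name", JsonObject::new());
  reply
})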

Anyway, if we now run our curl command:

$ curl http://127.0.0.1:3000/cats -H 'Accept: text/event-stream'

Still hanging! We need to create a cat to generate an event. With that curl command still running, let’s create one in another terminal window:

$ curl -X POST -d '{"name": "Fluffums"}' http://localhost:3000/cats

If we run back to the other window, we can see the event come through on our subscription:

$ curl http://127.0.0.1:3000/cats -H 'Accept: text/event-stream'
event:post
data:{"id":"1","name":"Fluffums"}

Success! If we were in a browser, we could subscribe to this stream with EventSource.

Next steps

  • Right now we’re broadcasting all events to all listeners. If we wanted to broadcast events to only specific clients, say, only updating clients for cats they’ve previously created, we’d need to implement our own Channel — the built-in MemoryChannel only supports broadcasting to all clients. If we were running our website across multiple servers with load balancing, we’d also need to implement a custom Channel that could send messages to other servers.
  • Our database is MemoryAdapter, which is a very simple in-memory database that doesn’t save to disk and is useful just for development. The bad news is that if you wanted to connect to a proper database, you’d have to implement your own Adapter. The good news is that you should only have to do this once for each kind of database, since Adapters can easily be shared in third-party crates! There’s a rough sketch of what a custom adapter might look like after this list.
  • This library is still rough around the edges, and you should probably expect to see the API change a bit. That said, I think the examples above show that it’s starting to become usable!
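For the curious, here’s a very rough, untested sketch of what a custom adapter might look like. It assumes Adapter is a trait whose only required method is the handle signature we saw earlier; the real trait may well ask for more, and PostgresAdapter is just an invented example name.

struct PostgresAdapter {
  // a connection pool and any other configuration would live here
}

impl Adapter for PostgresAdapter {
  fn handle(&self, req: Request) -> BoxFuture<Reply, Error> {
    // inspect req.method() and the request data, run the matching query
    // asynchronously, and resolve the returned future with a Reply once
    // the database answers
    unimplemented!()
  }
}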