Comfy web services in Rust. [Saturday, 2020-04-18]

I think Rust is officially comfy as a language for backend and/or web service development, but upcoming uncomfyness looms ever-present.

Here’s a brief history of how I felt and what I thought writing backend services in Rust (both professionally and not) for the past 5 years.

2015-2016

Nice, Rust is finally stable, I don’t have to fix my code every other day, jolly good show!

Look, you got hyper, you can make HTTP speaking stuff, and if you really feel like you need a web framework you even got Iron for something Rusty and nickel if you prefer express.js style frameworks.

Just pick a postgres, some serde and off you go, write all the things!

But wait, everything is synchronous, someone should do something about that, we can’t be webscale without asynchronous I/O, right?

I heard about mio tho, maybe good things will happen.

2017

Oh okay, there’s this thing called tokio, it’s a little bit confusing even if I wrote reactors and async I/O libraries in the past, but I guess I’m just too stupid to understand it.

Wait, when is hyper going to support this? Hopefully soon, I bet I’m going to have to rewrite everything when that happens.

But what is this Rocket thing people have been raving about? Looks pretty neat, reminds me of Sinatra from Ruby, I like it, too bad my shit is already using Iron and running on stable, really wish I could use it.

All these other attempts at web frameworks don’t really spark any interest in me or push me to move over either.

Gotham is a resounding meh, the only good thing it has going for it is that it runs on stable, but it’s the same old stuff, and still synchronous.

rouille is kind of interesting and is coming from one of my favorite Rust peeps, but it’s still synchronous (on purpose, love you tomaka).

canteen is also nothing new, and I like neither Flask nor Python, so there’s that.

2018

Finally, hyper is asynchronous now, look at it go in the benchmarks! tokio reactor goes brrrrr.

What about Iron tho? It’s forever unmaintained, sad. Libraries are starting to use tokio, and now I have 3 different versions of hyper since I’m stuck with Iron and reqwest uses asynchronous hyper, great, I really wish I could move to something new, but what to?

Rocket is still synchronous and doesn’t seem like it’s going to become asynchronous any time soon (still hasn’t, lmao), and what’s the point of going to nightly and staying synchronous if I’m going to have to rewrite everything again anyway.

I heard about actix-web which works on stable, but at this point I might as well just be using Erlang or Elixir. I don’t really think the actor model is the best fit for a web framework anyway, and as an actor system actix seems to be missing the point entirely, where are my supervision trees? Where is let it fail? It just doesn’t map well to Rust in its current form.

There’s experimental nightly support for some form of async/await using macros, could give it a shot with tokio and hyper and see where we end up. I bet async/await is going to look very similar to this anyway, the rest will be implementation details.

Okay, now I’d like something fancier than postgres to do my SQL stuff, I heard many good things about Diesel, let me look into it.

Okay, it’s way too ORMy for me and the generated documentation is essentially unreadable, I’d rather be writing SQL, and I don’t like the way the migration tool works, and it’s not asynchronous, and I’m going to nightly, to use tokio and hyper with async/await macros, to then also be fudged, guess I’ll just use tokio-postgres.

As for migrations, I will just suffer and write a shitty Ruby script, suffering is good.

At this point warp and juniper catch my eye, the APIs are still kind of rough, but they show promise, my heart fills with hope as contingent dread quietly broods in the shadows.

2019

Everyone seems to be working hard on stabilizing async/await and the ecosystem is slowly moving towards using tokio and making all the things async, which is good.

The ecosystem is now fractured between libraries using the latest tokio and std::future stuff and the old tokio and futures stuff, which is bad.

Nothing much has happened in the realm of web frameworks, Rocket still isn’t async, actix-web is involved in a shitstorm with the community, all the web frameworks that have been popping up were targeting stable Rust and so had to be synchronous and be boring, providing nothing really new. In the meantime Diesel is also still synchronous.

The end of the year approaches when BAM! async/await hits Rust stable, everyone rejoices as the long and arduous thousands of hours communally spent on this endeavour come to fruition.

BAM! SQLx is a thing, compile-time checked SQL queries, async from the start, all the comfy query writing you ever needed.

BAM! warp is a thing, ultra-elegant async mini-web framework that really takes advantage of what Rust has to offer, just look at the glory of this and how it’s implemented.

BAM! juniper is a thing, streamlined GraphQL development, async ready, with integration with warp and all things good.

BAM! movine is a thing, easy-peasy migrations that actually make sense.

I finally had almost everything I wanted, compile-time checked SQL, type-safe HTTP stuffs, GraphQL candies, easy migrations, and everything async!

Then, everything changed when the async-std nation attacked.

2020

It felt like the async ecosystem was maturing at a quickly increasing pace now that async/await was on stable.

As such, alternatives to tokio have been popping up; the major one is async-std. I have to say I really like their approach of just taking the std APIs and mapping them to be asynchronous as well as possible, and I feel like async-std is better than tokio in many ways.

Despite how nice async-std is I still hold fear and hatred for it, because of the added friction and fracture they created in the async ecosystem; I would have rather had them massage the tokio guy enough to convince him to change things around (which would have probably been a useless endeavor, but alas).

As things stand right now to write a reasonable backend you might end up with 1 + N runtimes running at the same time, this is because async-std ends up spawning its own runtime in the background and tokio wants its runtime to be running, too.

Library authors end up having to make a decision on which async runtime to target, and you end up having to make libraries targeting different runtimes work at the same time.

Now aside from the runtime issue that adds overhead, the async-std runtime cannot run some futures generated by tokio (namely sockets); this is not async-std’s fault but tokio’s, as they use thread-local storage (TLS) contexts for performance reasons. tokio can still run futures coming from async-std tho.
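
A minimal sketch of the kind of thing that bites you (assuming tokio 0.2 and async-std, address made up):

// Driving a tokio 0.2 socket from a non-tokio executor blows up at
// runtime, because tokio's reactor isn't in this thread's context.
async_std::task::block_on(async {
  let _ = tokio::net::TcpStream::connect("127.0.0.1:80").await;
});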

In the meantime other people have been working on new runtimes, my favorite at the moment is smol.

The future holds potential for one of the best async experiences I can imagine having, especially if they do something like TCP-preserving closures, but I hope I won’t have to spend the next 2 years worrying if I’ll have to, once again, rewrite everything in Rust, but async, twice.

Time to get comfy

Here is how it looks dicking around with mutations on the GraphQL Playground (already provided by the service), docs and schemas and all the things!

[Image: Comfy GraphQL with Rust]

Here’s the list of materials and tools needed to get this comfy: tokio as the async runtime, warp for the HTTP and WebSocket stuff, juniper (plus juniper_warp and juniper_subscriptions) for GraphQL, SQLx for the database, movine for migrations, redis for session storage, biscuit for JWTs, bincode for serializing session data, and the usual suspects like clap, serde, anyhow, thiserror, tracing and dotenv.

Following are some simplified, cut up and commented excerpts from web services I’ve been working on.

Excerpts from main.

The main.rs is mostly a bunch of warp route setup and CLI management, nothing too fancy.

Here we can see all our comfy settings for our comfy web service, like database URLs and secrets; this stuff is taken from the .env or the environment if no arguments are passed.

Not sure what I’m gonna do if I need more complicated configuration settings, but probably just TOML.

#[derive(Clap, Debug)]
#[clap(name = "brrrrr",
  rename_all = "kebab-case",
  rename_all_env = "screaming-snake"
)]
struct Args {
  #[clap(short, long)]
  debug: bool,

  #[clap(required = true, short = "D", long, env)]
  database_url: String,
  #[clap(required = true, short = "R", long, env)]
  redis_url: String,

  #[clap(required = true, long, env)]
  jwt_secret: String,
  #[clap(required = true, long, env)]
  argon_secret: String,
  #[clap(long, env)]
  argon_iterations: Option<u32>,
  #[clap(long, env)]
  argon_memory_size: Option<u32>,
  #[clap(short, long, env)]
  session_lifetime: Option<i64>,

  #[clap(default_value = "127.0.0.1:3535")]
  host: SocketAddr,
}
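
For reference, a hypothetical .env to go with it (the names follow from rename_all_env = "screaming-snake", the values are made up):

DATABASE_URL=postgres://brrrrr@localhost/brrrrr
REDIS_URL=redis://127.0.0.1
JWT_SECRET=pls-no-steal
ARGON_SECRET=also-no-steal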

How cool is this tokio::main thing, and then returning an anyhow::Result, gone are the days when you had to set up runtimes manually and write your own exit helper.

#[tokio::main]
async fn main() -> anyhow::Result<()> {

Initialize stuff, I’d be using paw but dotenv doesn’t play well with it, so manual shit for now.

  tracing_subscriber::fmt::init();
  dotenv::dotenv()?;

Set up the Environment, here by environment I mean the set of common services the backend needs, for example password hashing, database access, email sending, external services, and whatever else might be needed.

I’m not gonna post a definition for it because it’s just boilerplate setting up and returning connection pools and clients and other garbage.

  let args = Args::parse();
  let env  = Environment::new(&args);
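
If you want a rough idea of its shape anyway, here’s a minimal sketch, not the real thing: the field types are assumed from how the Environment gets used below, and the argon/JWT helpers are elided.

#[derive(Clone)]
pub struct Environment {
  database: sqlx::PgPool,
  redis: redis::Client,
  session_lifetime: i64,
}

impl Environment {
  pub fn database(&self) -> &sqlx::PgPool {
    &self.database
  }

  // Hand out a multiplexed connection that can then be cloned around
  // cheaply (assuming redis 0.16+).
  pub async fn redis(&self) -> anyhow::Result<redis::aio::MultiplexedConnection> {
    Ok(self.redis.get_multiplexed_tokio_connection().await?)
  }

  pub fn session_lifetime(&self, requested: Option<i64>) -> i64 {
    requested.unwrap_or(self.session_lifetime)
  }
}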

The Environment is usually sent along warp::Filters, so wrap it in one.

  let env = warp::any().map(move || env.clone());

This sets up CORS for us, and we never have to worry about it again, and warp::with is just lovely.

  let cors = warp::cors()
    .allow_methods(vec!["GET", "POST"])
    .allow_header("content-type")
    .allow_header("authorization")
    .allow_any_origin()
    .build();

This makes all the requests get logged through the log crate; those logs end up getting eaten by tracing, which then prints them out.

  let log = warp::log("brrrrr::request");

This sets up the (so far) only non-GraphQL request, and it’s for authentication.

Just look at how cool this is, look at it! Environment passed along at every request, extract the body to JSON which gets directly deserialized (!) and the remote address for fingerprinting, because why not.

  let auth = warp::post()
    .and(warp::path("auth"))
    .and(env.clone())
    .and(warp::body::json())
    .and(warp::addr::remote())
    .and_then(|env, req, addr| async move {
      auth::filter(env, req, addr).await
        .map_err(problem::build)
    });

Some grouping for sanity, the way warp lets you compose things is just orgasmic to me.

  let graphql = {
     use juniper_warp::{*, subscriptions::*};
     use juniper_subscriptions::*;

Here I’m creating an auth filter that will resolve to an Option<(String, String)> and I can reuse it in any other filter.

Now if Rust had anonymous structs this would be even better.

     #[derive(Deserialize, Debug)]
     struct Query {
       csrf: Option<String>
     }

     let auth = warp::header::optional("authorization")
       .or(warp::cookie::optional("jwt"))
       .unify()
       .and(warp::query())
       .and_then(|jwt: Option<String>, query: Query| {
         if jwt.is_none() && query.csrf.is_none() {
           return future::ok(None);
         }

         if jwt.is_none() || query.csrf.is_none() {
           return future::err(problem::build(auth::AuthError::InvalidCredentials));
         }

         future::ok(Some((jwt.unwrap(), query.csrf.unwrap())))
       });

The glory of the compositional capabilities of warp is indescribable.

The Context is passed to every GraphQL resolver, and it just bundles the Environment with optionally an authenticated session.

     let context = warp::any()
       .and(env.clone())
       .and(auth)
       .and_then(|env, auth|
         graphql::Context::new(env, auth).map_err(problem::build))
       .boxed();

juniper already supports GraphQL subscriptions, so you don’t even need a separate WebSocket for fancier things.

     let coordinator = Arc::new(Coordinator::new(graphql::schema()));

     let query = warp::post()
       .and(warp::path("query"))
       .and(make_graphql_filter(graphql::schema(), context.clone()));

warp’s support for WebSockets is already quite neat, and juniper just takes advantage of it.

     let subscriptions = warp::path("subscriptions")
       .and(warp::ws())
       .and(context.clone())
       .and(warp::any().map(move || Arc::clone(&coordinator)))
       .map(|socket: warp::ws::Ws, context, coordinator|
         socket.on_upgrade(|socket|
           graphql_subscriptions(socket, coordinator, context)
             .map(|res|
               if let Err(err) = res {
                 tracing::error!("websocket error: {:?}", err);
               })
             .boxed()))
       .map(|reply|
         warp::reply::with_header(reply,
           "Sec-WebSocket-Protocol", "graphql-ws"));

juniper even wraps the GraphQL playground, so you get automagically generated schema description and documentation and a nice way to test queries out.

     let playground = warp::path("playground")
       .and(playground_filter("/graphql/query",
         Some("/graphql/subscriptions")));

     warp::path("graphql")
       .and(query.or(subscriptions).or(playground))
  };

Just start serving all this stuff, if the response is a rejection HttpApiProblem then it’s converted to the proper response, otherwise it’s forwarded through.

  warp::serve(auth.or(graphql)
    .recover(problem::unpack)
    .with(cors)
    .with(log))
    .run(&args.host).await;

  Ok(())
}

Excerpt from the problem module.

Here we turn anything that can turn into an anyhow::Error (which is anything implementing std::error::Error) into a warp::Rejection.

pub fn build<E: Into<anyhow::Error>>(err: E) -> Rejection {
  warp::reject::custom(pack(err.into()))
}

Here we can turn any internal errors into meaningful responses, or just let them through as internal server errors.

pub fn pack(err: anyhow::Error) -> Problem {
  let err = match err.downcast::<Problem>() {
    Ok(problem) =>
      return problem,

    Err(err) =>
      err,
  };

  if let Some(err) = err.downcast_ref::<auth::AuthError>() {
    match err {
      auth::AuthError::InvalidCredentials =>
        return Problem::new("Invalid credentials.")
          .set_status(StatusCode::BAD_REQUEST)
          .set_detail("The passed credentials were invalid."),

      auth::AuthError::ArgonError =>
        ()
    }
  }

  tracing::error!("internal error occurred: {:#}", err);
  Problem::with_title_and_type_from_status(StatusCode::INTERNAL_SERVER_ERROR)
}

Here we turn one of our rejections into the proper reply.

pub async fn unpack(rejection: Rejection) -> Result<impl Reply, Rejection> {
  if let Some(problem) = rejection.find::<Problem>() {
    let code = problem.status
      .unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR);

    let reply = warp::reply::json(problem);
    let reply = warp::reply::with_status(reply, code);
    let reply = warp::reply::with_header(
      reply,
      http::header::CONTENT_TYPE,
      PROBLEM_JSON_MEDIA_TYPE,
    );

    Ok(reply)
  }
  else {
    Err(rejection)
  }
}

Excerpt from the auth module.

Wrap the Session model into a newtype so all session actions must happen on an authenticated session and not on just any session loaded from PostgreSQL.

#[derive(Shrinkwrap, Clone, Debug)]
pub struct Session(model::Session);

This gets automatically deserialized from warp::body::json() when stuff is forwarded to filter.

#[derive(Serialize, Deserialize, Debug)]
pub struct Request {
  email: String,
  password: String,
  lifetime: Option<i64>,
}

This stuff is for biscuit to do its JWT thing, session stores the session key (or ID, or whatever you want to call it) and csrf is the CSRF token for the session.

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Claims {
  session: String,
  csrf: String,
}

Comfy error definition using thiserror.

#[derive(Error, Debug)]
pub enum AuthError {
  #[error("invalid credentials")]
  InvalidCredentials,

  #[error("could not hash password")]
  ArgonError,
}

This gets forwarded directly from warp, it just converts an authentication request into a warp::Reply, the JWT token is set as a cookie, and the JWT and CSRF tokens are returned in the response.

From then on the CSRF token has to be passed as a query parameter, while the JWT token can be passed as a cookie (usually automagically) or can be passed as an Authorization header.

pub async fn filter(env: Environment, req: Request, address: Option<SocketAddr>)
  -> anyhow::Result<impl Reply>
{
  let (jwt, csrf) = request(env, req, address).await?;

  let reply = warp::reply::json(&json!({ "jwt": jwt, "csrf": csrf }));
  let reply = warp::reply::with_status(reply, http::StatusCode::OK);

  let reply = warp::reply::with_header(reply,
    http::header::CONTENT_TYPE,
    http_api_problem::PROBLEM_JSON_MEDIA_TYPE);

  let reply = warp::reply::with_header(reply,
    http::header::SET_COOKIE,
    format!("jwt={}", jwt));

  Ok(reply)
}
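
And the client side of a follow-up request would look something like this (hypothetical reqwest sketch, where jwt and csrf are the two strings returned above):

let client = reqwest::Client::new();

let response = client
  .post(format!("http://127.0.0.1:3535/graphql/query?csrf={}", csrf))
  .header("authorization", jwt.as_str())
  .json(&serde_json::json!({ "query": "{ accounts { id email } }" }))
  .send()
  .await?;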

This actually handles the request for a session: it checks that the user exists and that the password matches, and if everything goes right it creates a new session.

pub async fn request(env: Environment, req: Request, addr: Option<SocketAddr>)
  -> anyhow::Result<(String, String)>
{
  let account = query!(r#"
    SELECT id, password
      FROM accounts
      WHERE email = $1
    "#, &req.email)
      .fetch_optional(env.database()).await?
      .ok_or(AuthError::InvalidCredentials)?;

  let is_valid = env.argon().verifier()
    .with_hash(&account.password)
    .with_password(&req.password)
    .verify()
    .or(Err(AuthError::ArgonError))?;

  if !is_valid {
    return Err(AuthError::InvalidCredentials.into());
  }

  let identity = Identity {
    fingerprint: None,
    ip: addr.map(|addr| addr.ip()),
  };

  let claims = Claims {
    session: rand::thread_rng().sample_iter(&Alphanumeric).take(64).collect(),
    csrf: rand::thread_rng().sample_iter(&Alphanumeric).take(64).collect(),
  };

  let csrf = claims.csrf.clone();
  let expiry = Utc::now() + Duration::seconds(env.session_lifetime(req.lifetime));

  query_unchecked!(r#"
    INSERT INTO sessions (key, csrf, account, identity, expiry)
      VALUES ($1, $2, $3, $4, $5)
  "#, &claims.session, &claims.csrf, account.id, Json(identity), expiry)
    .execute(env.database()).await?;

  Ok((env.jwt().encode(claims, expiry)?, csrf))
}

This gets an existing session from a JWT token and CSRF token; trying to get a session without both of them, or with a session that has expired or has been explicitly invalidated, fails. There is no other way to get a Session aside from passing through here.

pub async fn session(env: Environment, jwt: &str, csrf: &str) -> anyhow::Result<Session> {
  let claims: Claims = env.jwt().decode(jwt)?;

  if claims.csrf != csrf {
    Err(AuthError::InvalidCredentials)?
  }

  let session = query_as_unchecked!(Session, r#"
    SELECT *
      FROM sessions
      WHERE key = $1 AND csrf = $2 AND expiry > NOW() AND NOT invalidated
  "#, claims.session, &csrf)
    .fetch_optional(env.database()).await?;

  Ok(session.ok_or(AuthError::InvalidCredentials)?)
}

Excerpts from the session module.

This Session type is basically a glorified type-map backed by Redis, using bincode as the serialization format.

The data automagically expires too with the session expiration, breddy cool.

This essentially means API modules can just define the session type they need and use it transparently without having to worry about anything.

#[derive(Clone)]
pub struct Session {
  auth: auth::Session,
  env: Environment,

A redis::Client is not a connection pool, so it’s better to eagerly get a connection that can be cloned with the Session.

  redis: MultiplexedConnection,
}

impl Session {
  pub async fn new(env: Environment, auth: auth::Session) -> anyhow::Result<Self> {
    let redis = env.redis().await?;
    Ok(Self { env, auth, redis })
  }

  pub async fn account(&self) -> anyhow::Result<model::Account> {
    Ok(query_as_unchecked!(model::Account, r#"
      SELECT accounts.*
        FROM sessions
        INNER JOIN accounts
          ON sessions.account = accounts.id
        WHERE
          sessions.key = $1
    "#, self.auth.key)
      .fetch_one(self.env.database()).await?)
  }

  pub async fn set<T: Serialize>(&mut self, value: &T) -> anyhow::Result<()> {
    let expiry = self.auth.expiry.signed_duration_since(Utc::now());

    self.redis.set_ex(
      format!("session:{}:{}", self.auth.key, type_name::<T>()),
      bincode::serialize(value)?, expiry.num_seconds().try_into()?).await?;

    Ok(())
  }

  pub async fn get<T: DeserializeOwned>(&mut self) -> anyhow::Result<T> {
    let bytes: Vec<u8> = self.redis.get(
      format!("session:{}:{}", self.auth.key, type_name::<T>())).await?;

    Ok(bincode::deserialize(&bytes)?)
  }
}
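
To show what I mean by transparent, an API module could stash its own state along these lines (the Cart type is hypothetical, just to show the shape):

use serde::{Serialize, Deserialize};
use uuid::Uuid;

#[derive(Serialize, Deserialize, Default)]
struct Cart {
  items: Vec<Uuid>,
}

async fn add_item(session: &mut Session, item: Uuid) -> anyhow::Result<()> {
  // Each type gets its own Redis key, so modules never step on each
  // other; fall back to an empty cart if nothing is stored yet.
  let mut cart: Cart = session.get().await.unwrap_or_default();
  cart.items.push(item);
  session.set(&cart).await
}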

Excerpts from the graphql modules.

Bunch of juniper boilerplate.

pub type Schema = juniper::RootNode<'static, Query, Mutation, Subscription>;

pub fn schema() -> Schema {
  Schema::new(Query, Mutation, Subscription)
}

#[derive(Clone)]
pub struct Context {
  session: Option<Session>,
  env: Environment,
}

impl juniper::Context for Context { }

Here be all the queries, i.e. read-only things, they can be namespaced but no need in this case.

pub struct Query;

#[juniper::graphql_object(Context = Context)]
impl Query {

Now this isn’t a query that’s actually going to be in any system, but it’s just here as an example.

  pub async fn accounts(ctx: &Context) -> FieldResult<Vec<model::Account>> {
    Ok(query_as_unchecked!(model::Account, "SELECT * FROM accounts")
      .fetch_all(ctx.database())
      .await?)
  }
}

Here be all the mutations, i.e. read-write things, it’s always better to namespace these or the generated documentation/schema is impossible to navigate, and honestly it would make them much harder to use as well.

pub struct Mutation;

#[juniper::graphql_object(Context = Context)]
impl Mutation {
  fn account() -> AccountMutation {
    AccountMutation
  }
}

pub struct AccountMutation;

#[derive(juniper::GraphQLInputObject, Debug)]
pub struct AccountInput {
  email: Option<String>,
}

#[juniper::graphql_object(Context = Context)]
impl AccountMutation {
  async fn update(ctx: &Context, id: Uuid, input: AccountInput)
    -> FieldResult<model::Account>
  {
    let acc = ctx.session().ok_or(auth::AuthError::InvalidCredentials)?.account().await?;

Here if you add a permission/role system you could check for this bullshite.

    if acc.id != id {
      Err(auth::AuthError::InvalidCredentials)?
    }

    Ok(query_as_unchecked!(model::Account, r#"
      UPDATE accounts
        SET email = COALESCE($2, email)
        WHERE id = $1
        RETURNING *
    "#, id, input.email)
      .fetch_one(ctx.database()).await?)
  }
}

Here be the pub/sub thingy GraphQL provides, creating a stream is still going to be cumbersome until generators are a thing, but for now I can abuse channels and spawn a separate future to feed stuff to it.

pub struct Subscription;

type CallsStream = Pin<Box<dyn Stream<Item = FieldResult<i32>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl Subscription {
  pub async fn calls(ctx: &Context) -> CallsStream {
    let (mut tx, rx) = channel(16);

    // Spawn a separate future that feeds the channel, since we can't
    // write generators yet; counting up here is just a stand-in.
    tokio::spawn(async move {
      let mut value = 0;
      while tx.send(Ok(value)).await.is_ok() {
        value += 1;
      }
    });

    Box::pin(rx)
  }
}

Excerpt from the model module.

Just a simple account type, can be expanded to contain roles and permissions and such.

#[derive(Clone, Serialize, Deserialize, GraphQLObject, Debug)]
pub struct Account {
  pub id: Uuid,
  pub email: String,

  #[graphql(skip)]
  #[serde(skip_serializing)]
  pub password: Redacted<String>,

  pub created_at: DateTime<Utc>,
  pub updated_at: Option<DateTime<Utc>>,
}

This is the structure used by the PostgreSQL database; so far the only issues I’ve had with SQLx revolve around user-defined types, but they’re working on it, so it should be super comfy very soon.

#[derive(Clone, FromRow, Debug)]
pub struct Session {
  pub key: String,
  pub csrf: String,
  pub account: Uuid,
  pub identity: Json<session::Identity>,
  pub expiry: DateTime<Utc>,
  pub invalidated: bool,

  pub created_at: DateTime<Utc>,
  pub updated_at: Option<DateTime<Utc>>,
}

pub mod session {
  use std::net::IpAddr;
  use serde::{Serialize, Deserialize};

This is stored as a JSONB field in the database, SQLx is smart enough to be able to use custom types just by wrapping the type you want to use with a Json<T> marker.

  #[derive(Clone, Serialize, Deserialize, Default, Debug)]
  pub struct Identity {
    pub fingerprint: Option<String>,
    pub ip: Option<IpAddr>,
  }
}

Now this is a thingy to prevent secretsy things from leaking out via serialization or printing through logs and such.

When printed, it replaces every character with a █, making it look like it was redacted by CIA agents.

#[derive(Shrinkwrap, Deserialize, sqlx::Type, Clone, Eq, PartialEq, Ord)]
#[derive(PartialOrd, Hash, Default)]
#[sqlx(transparent)]
pub struct Redacted<T>(T);

impl<T> Redacted<T> {
  pub fn new(value: T) -> Self {
    Self(value)
  }
}

impl<T> Serialize for Redacted<T> {
  fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
    ser.serialize_none()
  }
}

impl<T: fmt::Debug> fmt::Debug for Redacted<T> {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{}", iter::repeat("█")
      .take(UnicodeWidthStr::width(format!("{:?}", self.0).as_str()))
      .collect::<String>())
  }
}

impl<T: fmt::Display> fmt::Display for Redacted<T> {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{}", iter::repeat("█")
      .take(UnicodeWidthStr::width(format!("{}", self.0).as_str()))
      .collect::<String>())
  }
}
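
A quick demo of the redaction:

let password = Redacted::new(String::from("hunter2"));
assert_eq!(password.to_string(), "███████");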

>me writing web services in Rust in 2020

[Image: Comfy Crab in Rusty Can]