Batch

A background job library written in Rust.

Batch allows you to defer jobs to worker processes by sending messages to a broker. It is a type-safe library that favors safety over performance in order to minimize risk and avoid mistakes. It leverages the futures & tokio crates to provide asynchronous operations to the user.

Batch doesn't tie you to a particular message broker implementation: a set of adapters is bundled in the library, but you are free to write your own to accommodate your requirements. As of today, Batch provides an official adapter for RabbitMQ.

Examples are available on GitHub or you can continue and read the Getting Started guide for the message broker of your choice.

Concepts

If you're not familiar with Rust or background job libraries, some aspects of batch can feel opaque. Hopefully this page will help clear this up and let you enjoy your projects to the fullest.

Job

A job is a unit of work that you want to defer to a Worker process. For example, imagine you're building a shiny new web application and you want to send the user an email when they sign up. You could send the email synchronously, but this often leads to a poor user experience: the email might take a few seconds to send, making the application feel unresponsive, or it might fail because of a spurious network error, or your email provider might be down for maintenance. Instead, to guarantee an optimal user experience, you send the job to a message broker, which then hands it to a Worker process that executes it. If the execution fails, the job can be retried until it either succeeds or reaches its retry limit.

More generally, there are two kinds of work you'll want to defer:

  • Work that depends on an external service, external meaning "one over which you have no control, especially in regard to downtime" (e.g: sending emails).
  • Work that might take more than a few seconds to complete (e.g: compressing uploaded image files, re-encoding video files).

At-least once delivery & idempotency

One thing you must keep in mind when writing a job is that it can be executed multiple times, even if it previously succeeded. Writing software is hard, but writing distributed software is even harder: because multiple message broker servers may not synchronize fast enough, or because a connection to a worker gets lost, the same job can be handed to two different workers. The best way to protect yourself is to make your jobs idempotent: a job should always produce the same outcome regardless of how many times it is performed.
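As a plain-Rust sketch (independent of Batch's APIs, with hypothetical names), the difference between an idempotent and a non-idempotent operation can be illustrated like this:

```rust
// Sketch: idempotency with plain Rust (no Batch APIs involved).
// `mark_verified` is idempotent: running it twice leaves the same state.
// `append_welcome_email` is not: a redelivered job would email the user twice.

#[derive(Default)]
struct User {
    verified: bool,
    emails_sent: u32,
}

fn mark_verified(user: &mut User) {
    user.verified = true; // setting a flag: safe to repeat
}

fn append_welcome_email(user: &mut User) {
    user.emails_sent += 1; // counting sends: NOT safe to repeat
}

fn main() {
    let mut user = User::default();

    mark_verified(&mut user);
    mark_verified(&mut user); // duplicate delivery: no harm done
    assert!(user.verified);

    append_welcome_email(&mut user);
    append_welcome_email(&mut user); // duplicate delivery: user emailed twice!
    assert_eq!(user.emails_sent, 2);
}
```

A common way to make the second operation idempotent is to record, in the same transaction, that the email for this sign-up was already sent, and skip the send if it was.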

Serialization

In order to send your jobs to message brokers, Batch uses a technique known as serialization: it transforms your in-memory structures, whose representations are unstable, into a stable representation from which a structure identical to the original can be reconstructed. Because of this, there are some points you should keep in mind when using Batch and declaring jobs:

  • Send the minimum amount of information. Serialization is a linear process: to serialize a structure you have to walk all of its contents, so the fewer and smaller the elements, the faster it is. For example, instead of sending your User struct to your job, send the user's ID and fetch the rest from the database inside the job.
  • Do not send sensitive information. There is no guarantee of privacy or secrecy on message brokers, so you really don't want to put API credentials or user passwords in your job payloads. Instead, consider using Batch's inject feature for these cases.
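To make the first point concrete, here is a plain-Rust sketch (the repository and job types are hypothetical stand-ins, not Batch APIs) contrasting a payload that would embed the whole user with one that carries only the ID and looks the user up at execution time:

```rust
use std::collections::HashMap;

// Sketch only: a stand-in for your database.
struct UserRepository {
    users: HashMap<i32, String>,
}

impl UserRepository {
    fn find(&self, id: i32) -> Option<&String> {
        self.users.get(&id)
    }
}

// Instead of serializing a whole `User` into the job payload,
// the payload carries only the ID...
struct SendWelcomeEmail {
    user_id: i32,
}

impl SendWelcomeEmail {
    // ...and the job fetches the rest when it runs on the worker.
    fn perform(&self, repository: &UserRepository) -> Option<String> {
        let name = repository.find(self.user_id)?;
        Some(format!("Welcome, {}!", name))
    }
}

fn main() {
    let mut users = HashMap::new();
    users.insert(42, "Ferris".to_string());
    let repository = UserRepository { users };

    let job = SendWelcomeEmail { user_id: 42 }; // tiny, stable payload
    assert_eq!(job.perform(&repository), Some("Welcome, Ferris!".to_string()));
}
```

The payload stays small and stable even if the `User` representation changes, and the job always sees the freshest data when it runs.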

Queue

A queue is the source of jobs for worker processes. It is represented as a never-ending stream of incoming deliveries stating which job should be executed and the environment it should be executed in. In order to consume from a queue, you have to explicitly declare it to your message broker. Instead of using external configuration files, Batch leverages Rust's powerful macro system to ensure your code complies with your expectations (e.g: you shouldn't be able to set a priority on your job if your message broker doesn't support it; with Batch this becomes a compile-time error).

Worker

A worker is the name given to the process that subscribes to queues and executes the associated code. It is a long-running process that should not crash.

Jobs

Declaring a job

The simplest way to declare a job is to create a function, and then annotate it with Batch's job procedural macro:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

#[job(name = "batch-example.say-hello")]
fn say_hello(name: String) {
    println!("Hello {}!", name);
}
#}

Return value of a job

As of today, the job macro supports only two kinds of return values:

  • the function returns the unit type (());
  • the function returns a value that implements the IntoFuture trait (amongst others, it is implemented for both Future and Result).

The macro is not capable of determining whether the function complies with these requirements, so if you make a mistake the compiler will raise an error, and the message might not be the clearest. Keep that in mind if you encounter a compile error on one of your jobs.
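Stripped of the macro itself, the two accepted shapes look like the plain functions below. In the futures 0.1 ecosystem that Batch builds on, Result implements IntoFuture, which is why a fallible job can simply return one:

```rust
// Sketch: the two return shapes the `job` macro accepts,
// shown as plain functions without the attribute.

// 1. A job returning the unit type `()`.
fn say_hello(name: String) {
    println!("Hello {}!", name);
}

// 2. A job returning a value that implements `IntoFuture`.
//    `Result<T, E>` is one such value in the futures 0.1 ecosystem.
fn send_hello(name: String) -> Result<(), String> {
    if name.is_empty() {
        return Err("cannot greet an empty name".to_string());
    }
    println!("Hello {}!", name);
    Ok(())
}

fn main() {
    say_hello("Ferris".to_string());
    assert!(send_hello("Ferris".to_string()).is_ok());
    assert!(send_hello(String::new()).is_err());
}
```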

Injecting external values

More often than not, your job will need values that can't or shouldn't be transferred in the message payload. For example, you might need a connection handle to your database, a mail API client instance, or the contents of a configuration file. To solve this problem, Batch allows you to mark arguments as "injected". This is done by using the inject parameter of the job attribute:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;
#
# struct UserRepository;
# struct Mailer;

#[job(name = "batch-example.send-hello", inject = [ repository, mailer ])]
fn send_hello(user_id: i32, repository: UserRepository, mailer: Mailer) {
    # drop(user_id);
    # drop(repository);
    # drop(mailer);
    // ...
}
#}

In the example above, Batch will only put the user_id parameter in the underlying structure, and will fetch the values for repository and mailer when performing the job. These values are registered on the executor of the job, which more often than not will be Batch's own Worker.

Configuring the job

Changing the number of retries

By default, a job will be tried 25 times before being declared failed and dropped into a dead-letter queue. This gives you plenty of time to fix your job's implementation. If you wish to change this number, you can do so by using the retries parameter of the job procedural macro:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

#[job(name = "batch-example.send-hello", retries = 10)]
fn send_hello(to: String) {
    println!("Hello {}", to);
}

#[job(name = "batch-example.send-goodbye", retries = 50)]
fn send_goodbye(to: String) {
    println!("Goodbye {}", to);
}
#}

Changing the timeout for the job's execution

By default, a job is given 30 minutes to complete. If you wish to change this number, you can do so by using the timeout parameter of the job procedural macro with the number of seconds the job should be allowed to run:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

// This job will have 1 minute to complete.
#[job(name = "batch-example.send-hello", timeout = "1minute")]
fn send_hello(to: String) {
    println!("Hello {}", to);
}

// This job will have 3 hours and 30 minutes to complete.
#[job(name = "batch-example.send-goodbye", timeout = "3hours 30mins")]
fn send_goodbye(to: String) {
    println!("Goodbye {}", to);
}
#}

The timeout parser supports the given suffixes:

  • nsec, ns -- nanoseconds
  • usec, us -- microseconds
  • msec, ms -- milliseconds
  • seconds, second, sec, s
  • minutes, minute, min, m
  • hours, hour, hr, h
  • days, day, d
  • weeks, week, w
  • months, month, M -- defined as 30.44 days
  • years, year, y -- defined as 365.25 days

Changing the job priority

By default, a job is assigned the Normal priority. If you wish to change this, you can use the priority parameter of the job procedural macro with one of trivial, low, normal, high, critical:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

// This job will be assigned the `low` priority.
#[job(name = "batch-example.send-hello", priority = low)]
fn send_hello(to: String) {
    println!("Hello {}", to);
}

// This job will be assigned the `critical` priority.
#[job(name = "batch-example.send-goodbye", priority = critical)]
fn send_goodbye(to: String) {
    println!("Goodbye {}", to);
}
#}

Under the hood

Annotating a function with the job procedural macro will do two things:

  • Create a new structure deriving Serde's Serialize & Deserialize traits, that stores all of the non-provided (see below) function arguments. This implies that all of the arguments must also implement Serde's Serialize & Deserialize traits. More importantly, this new structure also implements Batch's Job trait.
  • Change the annotated function to return an instance of the newly defined structure (see previous bullet point), allowing for easy scheduling of the job.

Get or set the underlying struct of a job

When invoked, the job procedural macro generates a new structure declaration containing the arguments for the job and implementing the Job trait. The name of this structure is the same as that of the function it comes from. This is made possible by the fact that in Rust, structures and functions don't share the same namespace:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

#[job(name = "batch-example.say-hello")]
fn say_hello(name: String) {
    // ...
}

// Would generate code roughly equivalent to:

# mod generated_do_not_copy_paste_this_please {
struct say_hello {
    name: String
}
# }
#}

Hopefully, this simple naming scheme will not conflict with existing code. If you ever find yourself in that situation, Batch allows you to set the name of the generated structure by using the wrapper parameter:


# #![allow(unused_variables)]
#fn main() {
extern crate batch;

use batch::job;

#[job(name = "batch-example.say-hello", wrapper = MySuperAwesomeJob)]
fn say_hello(name: String) {
    // ...
}

// Would generate code roughly equivalent to:

# mod generated_do_not_copy_paste_this_please {
struct MySuperAwesomeJob {
    name: String
}
# }
#}

RabbitMQ

RabbitMQ is a popular message broker implementing the AMQP 0.9.1 protocol, often used in enterprise settings thanks to its replication & clustering capabilities.

Batch provides an official adapter for RabbitMQ; enable it with the rabbitmq feature in your Cargo.toml:

[dependencies]
batch = { version = "0.2", features = ["rabbitmq"] }

Batch only supports versions of RabbitMQ that are officially supported by Pivotal, which means RabbitMQ 3.7+. If you encounter a problem using Batch with an older version of RabbitMQ, we won't be able to fix it. You are free to submit a bug fix, but we reserve the right to refuse it if we think it would require too much work to maintain.

Getting started

The first thing you'll want to do once you've installed batch is connect to a message broker. Batch provides a few adapters for popular choices amongst message brokers, but you can also write your own adapter if you want to. In this guide we'll use the RabbitMQ adapter (don't forget to enable the rabbitmq feature when installing batch).

Let's begin by connecting to a broker:

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use tokio::prelude::*;

fn main() {
    let f = batch_rabbitmq::Connection::open("amqp://guest:guest@localhost:5672/%2f")
        .map(|conn| {
#           drop(conn);
            println!("We're connected to RabbitMQ!");
        })
        .map_err(|e| eprintln!("An error occured while connecting to RabbitMQ: {}", e));

# if false {
    tokio::run(f);
# }
}

Now that we've acquired a connection to our RabbitMQ server, we'll write our first job. There are two ways of defining a job with Batch: the high-level one consists of writing a function and annotating it with the job attribute; the low-level one consists of declaring a structure and implementing batch::Job manually. For now, we'll use the high-level way:

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use batch::job;
use tokio::prelude::*;

#[job(name = "batch-example.say-hello")]
fn say_hello(name: String) {
    println!("Hello {}!", name);
}

fn main() {
    let f = batch_rabbitmq::Connection::open("amqp://guest:guest@localhost:5672/%2f")
        .map(|conn| {
#           drop(conn);
            println!("We're connected to RabbitMQ!");
        })
        .map_err(|e| eprintln!("An error occured while connecting to RabbitMQ: {}", e));

# if false {
    tokio::run(f);
# }
}

Note: the job procedural macro generates a structure that derives Serde's Serialize & Deserialize traits. That means the arguments of your function must implement these traits.

Note: the string given to the job procedural macro as a parameter is the name of the job. You should strive for unique job names, ideally structured by domain (e.g: prefix all jobs related to media file compression with "media-compress.").

Now that we have our job, we want to send it to our RabbitMQ server. To do that we need to declare a queue, using the queues! macro:

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use batch::job;
use batch_rabbitmq::queues;
use tokio::prelude::*;

queues! {
    Example {
        name = "batch-example.exchange",
        bindings = [
            say_hello,
        ],
    }
}

#[job(name = "batch-example.say-hello")]
fn say_hello(name: String) {
    println!("Hello {}!", name);
}

fn main() {
    let f = batch_rabbitmq::Connection::build("amqp://guest:guest@localhost:5672/%2f")
        .declare(Example)
        .connect()
        .and_then(|mut client| {
            let job = say_hello("Ferris".to_string());
            Example(job).dispatch(&mut client)
        })
        .map_err(|e| eprintln!("An error occured while connecting to RabbitMQ: {}", e));

# if false {
    tokio::run(f);
# }
}

Note how we declare the queue by passing its type as a value and not by using the infamous turbofish syntax.

Now that our job has been published to our broker, we need to fetch it and execute the associated function. To do this, we'll create a new program: a worker.

Exchanges

In RabbitMQ parlance you don't publish messages to a queue but to an exchange. Batch abstracts this behavior away behind a unified Queue trait. When you declare your queue, an exchange with the same name is implicitly declared. But sometimes you want control over which exchange is declared and how. To do that, you can use the exchange property of the queues! macro:

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use batch_rabbitmq::queues;
use tokio::prelude::*;

queues! {
    Example {
        name = "batch.example",
        exchange = "batch.example-exchange",
    }
}

fn main() {
    let fut = batch_rabbitmq::Connection::build("amqp://guest:guest@localhost:5672/%2f")
        .declare(Example)
        .connect()
        .and_then(|client| {
#           drop(client);
            /* The `batch.example-exchange` exchange is now declared. */
            Ok(())
        })
        .map_err(|e| eprintln!("An error occured while declaring the queue: {}", e));

# if false {
    tokio::run(fut);
# }
}

Queues

Worker

As explained in the Concepts chapter, a worker consumes & executes jobs delivered by a message broker. Batch comes with a worker implementation whose semantics are heavily inspired by the Resque project. Like Resque, it assumes chaos: eventually your jobs will crash, get stuck computing a value, or be unable to contact an external service; in any case, the Worker process shouldn't be affected by the execution of the jobs it is responsible for, and should be as resilient as possible against failure.

Maintaining such guarantees means that this Worker implementation isn't the most performant one, but it is one of the safest if you're not sure your jobs are infallible. For example, because this implementation supports job timeouts, it has to execute each job in a new process, which is an expensive operation compared to spawning a thread. Due to this behavior (spawning a new process for each job execution), we will refer to this implementation as a "Forking Worker".
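The underlying idea — run the job in an isolated unit of execution and give up once a deadline passes — can be sketched with the standard library alone. This sketch supervises a thread rather than a forked process (the real Forking Worker spawns a process per job), so it only illustrates the timeout mechanics:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Sketch: timeout enforcement, using a thread instead of a forked
// process. The supervisor waits on a channel with a deadline; if the
// job hasn't reported completion by then, it is considered timed out.
fn run_with_timeout<F>(job: F, timeout: Duration) -> Result<(), &'static str>
where
    F: FnOnce() + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        job();
        let _ = tx.send(()); // ignore the error if the supervisor gave up
    });
    rx.recv_timeout(timeout).map_err(|_| "job timed out")
}

fn main() {
    // A fast job completes within its deadline...
    let ok = run_with_timeout(|| println!("quick job done"), Duration::from_secs(1));
    assert!(ok.is_ok());

    // ...while a stuck job is abandoned once the deadline passes.
    let stuck = run_with_timeout(
        || thread::sleep(Duration::from_secs(5)),
        Duration::from_millis(50),
    );
    assert!(stuck.is_err());
}
```

Note that abandoning a thread doesn't actually stop it, which is one reason the real implementation pays the cost of a separate process: a process can be killed outright when it exceeds its timeout.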

Adding a worker to your project

The easiest way to integrate the forking worker is to create a new binary (e.g: src/bin/worker.rs). This makes sure that your main command-line interface will not conflict with the worker, and vice versa. The Worker struct is built from a Client instance, which makes it possible to use the same Worker implementation with any message broker adapter. In this example, we will be using the RabbitMQ adapter:

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use batch::job;
use batch_rabbitmq::queues;
use batch::Worker;
use tokio::prelude::*;

queues! {
    Example {
        name = "example",
        bindings = [
            say_hello
        ]
    }
}

#[job(name = "batch-example.say-hello")]
fn say_hello(name: String) {
    println!("Hello {}!", name);
}

fn main() {
    // First, we configure the connection to our message broker
    let f = batch_rabbitmq::Connection::build("amqp://guest:guest@localhost:5672/%2f")
        // We declare our queue & exchange against RabbitMQ
        .declare(Example)
        // We establish the connection
        .connect()
        // Then, we create our worker instance & register the queue we will consume from
        .map(|client| Worker::new(client).queue(Example))
        // And finally, we consume incoming jobs
        .and_then(|worker| worker.work())
        .map_err(|e| eprintln!("An error occured: {}", e));

# if false {
    tokio::run(f);
# }
}

Worker-provided values

Some of your jobs will undoubtedly depend on values that can't be serialized (e.g: a connection to a database or credentials for your third-party services). On one hand, you can't easily serialize them; on the other hand, you don't want to re-instantiate them every time you need them. Batch gives you a solution to this problem: you provide your worker with a callback returning an instance of a resource, and the worker will use it to fill in values marked as "injected" on your jobs.

extern crate batch;
extern crate batch_rabbitmq;
extern crate tokio;

use batch::job;
use batch_rabbitmq::queues;
use tokio::prelude::*;
#
# mod diesel {
#   pub struct PgConn;
# }

queues! {
    Maintenance {
        name = "maintenance",
        bindings = [
            count_active_users
        ]
    }
}

#[job(name = "batch-example.count-active-users", inject = [ db ])]
fn count_active_users(db: diesel::PgConn) {
    # drop(db);
    // ...
}

fn init_database_conn() -> diesel::PgConn {
    // ...
#     diesel::PgConn
}

fn main() {
    let f = batch_rabbitmq::Connection::build("amqp://guest:guest@localhost:5672/%2f")
        .declare(Maintenance)
        .connect()
        .map(|conn|
            batch::Worker::new(conn)
                .provide(init_database_conn)
                .queue(Maintenance)
        )
        .and_then(|worker| worker.work())
        .map_err(|e| eprintln!("An error occured while executing the worker: {}", e));

# if false {
    tokio::run(f);
# }
}