Recreating concurrent futures combinators in smol

John Nunley · October 22, 2023

futures comes with many additional combinators that smol doesn’t have out of the box. We can rebuild them, better.

Whew, it’s been almost a month since my last blog post here. This was because I was spending time doing research and testing, not because I lost the PGP key that allows me to upload to this site. No sir, how could anyone be that irresponsible?

…or maybe I was just using the PGP key as an excuse not to write? It’s not like I’m being paid to psychoanalyze myself in front of you people.

It doesn’t matter, we’re back! Let’s talk about smol.

The Problem with futures

futures was originally released in 2016 to provide an implementation of asynchronous programming for Rust. In the time since, it’s accumulated a lot of baggage. Many of its combinators have been superseded by the async/await syntax, meaning that a large amount of its API surface isn’t relevant anymore.

For instance, take the Map combinator. It takes some Future and passes its output through a closure.

use futures::prelude::*;

let fut = async { 1 };
let mapped_fut = fut.map(|x| x * 2);

In the pre-2018-edition days, combinators like these were the only way to manipulate the value of a Future. Nowadays, we can just wrap the original future in an async block and treat it more like a normal expression.

let fut = async { 1 };
let mapped_fut = async { fut.await * 2 };

Therefore, in this brave new post-async world, many of these combinators became unnecessary. futures-lite, one of the core components of smol, was created to address this new reality.

futures-lite explicitly ignores all combinators that can be implemented using async/await or features that have already been moved into the standard library. This leaves behind a small, clean subset of the API that builds fast.

A semi-frequently asked question I see goes along the lines of: “I was porting my application over to smol, but I noticed that it doesn’t have for_each_concurrent or buffered. Is this API excluded purposely?”

This is a reasonable question. The short answer is “yes”, and the medium answer is “these concurrent functions have small but frustrating problems that futures-lite avoids by not implementing them”. This article is the longer answer.

Concurrency Conundrum

I would argue that the concurrent Stream combinators mentioned above are a code smell. Well-formed, production-ready code should not use for_each_concurrent or buffered. If I knew how compilers worked, I would suggest a Clippy lint that flags these functions with a warning.

The for_each_concurrent function is called like this:

let my_stream = /* ... */;
my_stream.for_each_concurrent(
    None, // Concurrency limit; `None` means unlimited. Not important for now.
    |x| async move { do_something(x).await } // Closure to run for each element.
).await;

To massively oversimplify, for_each_concurrent does this:

use futures::prelude::*;
use futures::stream::FuturesUnordered;

let mut my_stream = /* ... */;

// Create a list to store all of the futures.
let mut futures_list = FuturesUnordered::new();

// Get all of the values from our stream.
while let Some(x) = my_stream.next().await {
    // Push the future to the list.
    futures_list.push(async move { x + 5 });
}

// Wait for all of the futures to complete. FuturesUnordered polls its
// futures as they become ready and yields their outputs in completion order.
futures_list.for_each(|_| async {}).await;

FuturesUnordered is sort of an unholy combination of an executor and a Stream. It collects a bunch of Futures into a list, then maintains a queue of which of those Futures are ready to be polled. Once one of them completes, FuturesUnordered yields that Future’s output.

This means that every time you call for_each_concurrent, it creates an entirely new executor, runs the Stream to completion on it, then throws that executor away. buffered is implemented in a similar way.

This is bad for a couple of reasons. Most async runtimes already provide an executor: tokio provides one out of the box, and smol encourages you to create and optimize your own. By using for_each_concurrent or buffered, you are essentially ignoring your existing executor in order to spin up a short-lived temporary one.

In addition to the resources wasted on the new executor, it is often less efficient than the runtime’s own. tokio and smol have options that let you offload tasks onto other threads or handle contention more efficiently. In contrast, FuturesUnordered is a relatively naive executor that is completely unaware of its surrounding runtime.

Not to mention, FuturesUnordered comes with a few footguns that make it impractical for common use cases.
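One example of such a footgun: FuturesUnordered is lazy, so the futures inside it only make progress while the collection itself is being polled. A minimal sketch of the surprise this causes:

use futures::prelude::*;
use futures::stream::FuturesUnordered;

futures::executor::block_on(async {
    let mut set = FuturesUnordered::new();
    set.push(async { println!("this only runs once the set is polled") });

    // Nothing has executed yet. The future above is polled only while we
    // drain the set; if we dropped `set` here instead, it would never run.
    while let Some(()) = set.next().await {}
});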

Make your own, better combinator

In smol, you can emulate these use cases somewhat easily. First, you need to create an Executor and execute your futures in its context.

let ex = smol::Executor::new();

smol::block_on(ex.run(async {
    // The code written below will take place in this context.
}));
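If you want spawned tasks to actually run in parallel, a common pattern is to drive the same executor from several threads. Here’s a minimal sketch; the thread count is arbitrary, and a real program would want a shutdown signal instead of a future that never completes:

use std::sync::Arc;
use std::thread;

let ex = Arc::new(smol::Executor::new());

// Drive the executor from a few background threads. `future::pending`
// never completes, so each thread just keeps processing tasks.
for _ in 0..4 {
    let ex = Arc::clone(&ex);
    thread::spawn(move || smol::block_on(ex.run(smol::future::pending::<()>())));
}

smol::block_on(ex.run(async {
    // Tasks spawned on `ex` can now run on any of the five threads.
}));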

You can emulate for_each_concurrent by turning every future in the stream into a task, then awaiting all of those tasks. Here’s how it looks if you don’t have a task limit:

use smol::prelude::*;

let my_stream = smol::stream::iter(vec![1u32, 2, 3]);

// Spawn the set of futures on an executor.
let handles: Vec<smol::Task<()>> = my_stream
    .map(|item| {
        // Spawn the future on the executor.
        ex.spawn(do_something(item))
    }).collect().await;

// Wait for all of the handles to complete.
for handle in handles {
    handle.await;
}

Here, we spawn every future involved onto the executor, then collect all of the task handles. Since we are running inside the executor, all of these tasks run concurrently, and in parallel if the executor is driven by multiple threads. Finally, we just .await each handle to wait for all of the tasks to complete.

The best part is that the allocation, the Vec<smol::Task<()>>, isn’t even strictly necessary. It could be a single long-lived Vec that is just extended to hold the tasks and then drained, keeping its capacity for the next round.
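As a rough sketch of that idea, assuming some source of work batches (the batches iterator below is hypothetical) plus the ex and do_something from above:

// One long-lived buffer for the task handles.
let mut handles: Vec<smol::Task<()>> = Vec::new();

for batch in batches {
    // Extend the existing allocation instead of collecting a fresh Vec.
    for item in batch {
        handles.push(ex.spawn(do_something(item)));
    }

    // `drain` empties the Vec but keeps its capacity for the next batch.
    for handle in handles.drain(..) {
        handle.await;
    }
}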

Generally, it doesn’t matter how many tasks are spawned onto the global executor. In contrast to the mini-executor that for_each_concurrent spawns, the global executor is designed to handle large numbers of tasks. However, if you still want to impose a resource limit, you can use a Semaphore.

use smol::prelude::*;
use std::sync::Arc;

let my_stream = smol::stream::iter(vec![1u32, 2, 3]);
let my_limit = 5;

// Semaphore for limiting the number of tasks.
let semaphore = Arc::new(smol::lock::Semaphore::new(my_limit));

// Spawn the set of futures on an executor.
let handles: Vec<smol::Task<()>> = my_stream
    // We use `then` now, since we need to `.await` for the
    // semaphore to have a permit available.
    .then(|item| {
        // Borrow the semaphore and executor.
        let (ex, semaphore) = (&ex, &semaphore);
        async move {
            // Wait for a semaphore permit.
            let permit = semaphore.acquire_arc().await;

            // Spawn the future on the executor.
            ex.spawn(async move {
                // Run our future.
                do_something(item).await;

                // Drop the permit to let another task run.
                drop(permit);
            })
        }
    })
    .collect()
    .await;

// Wait for the remaining handles to complete.
for handle in handles {
    handle.await;
}

This works by having each task borrow a Semaphore permit. The semaphore is sort of like a Mutex that can be locked by multiple parties at once, up to a certain limit. Once it runs out of permits, this code doesn’t spawn a task until one of the tasks completes. The permit is moved into the task and is dropped once the computation completes.
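If you haven’t run into a semaphore before, here is the primitive in isolation; a minimal sketch using smol::lock::Semaphore directly:

use smol::lock::Semaphore;

smol::block_on(async {
    // Two permits, so at most two holders at a time.
    let semaphore = Semaphore::new(2);

    let a = semaphore.acquire().await;
    let _b = semaphore.acquire().await;

    // A third `acquire().await` here would sleep until a permit is freed.
    drop(a); // Releasing a permit lets the next waiter through.
    let _c = semaphore.acquire().await;
});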

The then-stream above is also practically equivalent to buffered. It yields tasks that can then be awaited to get their results.

// snip: semaphore and stream setup

// This time, we do something else that maps the value to another.
async fn do_something_else(x: u32) -> u32 { x + 1 }

// Get a `Stream` of tasks that can be `await`ed to get their value.
let buffered_stream = my_stream
    .then(|item| {
        // Borrow the semaphore and executor.
        let (ex, semaphore) = (&ex, &semaphore);
        async move {
            // Wait for a semaphore permit.
            let permit = semaphore.acquire_arc().await;

            // Spawn the future on the executor.
            ex.spawn(async move {
                // Run our future.
                // NEW: This now returns a value.
                let result = do_something_else(item).await;

                // Drop the permit to let another task run.
                drop(permit);

                // NEW: Return the result of the inner future.
                result
            })
        }
    });

// NEW: The `then` stream isn't `Unpin`, because the future produced by
// the async closure isn't, so we have to pin it before calling `next`.
smol::pin!(buffered_stream);

// NEW: We can now wait on this stream for its values.
while let Some(task) = buffered_stream.next().await {
    println!("Value: {}", task.await);
}

In practice, this is more efficient than buffered while giving you much greater control over how it runs.

Parting Shots

Unfortunately, the fact that for_each_concurrent and buffered spawn their own separate executors isn’t well documented. Raising awareness of how async code actually runs is powerful, in my opinion, as it unlocks a whole new world of computation for intermediately experienced Rustaceans. I hope this post makes it clearer what should be happening in well-formed code.
