What is the best approach to encapsulate blocking I/O in futures-rs?

“Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.”

Yes, this is the recommended approach for asynchronous execution. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!

Futures crate

The ThreadPool type was created for this (see note 1).

In this case, you spawn work to run in the pool. The pool runs the work on its own threads and hands back a handle that implements the Future trait; awaiting that handle yields the result once the work has completed.

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

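async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    // Run the blocking call on a pool thread; `move` captures `seconds`
    // so each task sleeps for the requested duration.
    pool.spawn_with_handle(async move {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })?
    .await;
    Ok(seconds)
}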

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}

You can see that the total time is only 3 seconds, because both delays run concurrently on the pool:

% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

Note 1: There’s some discussion that the current implementation may not be the best for blocking operations, but it suffices for now.

Tokio

Here, we use task::spawn_blocking:

use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

See also CPU-bound tasks and blocking code in the Tokio documentation.

Additional points

Note that this is not an efficient way of sleeping; it’s just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio::time::sleep. See “Why does Future::select choose the future with a longer sleep period first?” for more details.

“neither solution is optimal and don’t get the full advantage of the green-threading model”

That’s correct – because you don’t have something that is asynchronous! You are trying to combine two different methodologies and there has to be an ugly bit somewhere to translate between them.

“second don’t pass through the executor provided by reactor framework”

I’m not sure what you mean here. There’s an executor implicitly created by block_on or tokio::main. The thread pool has some internal logic that checks to see if a thread is done, but that should only be triggered when the user’s executor polls it.
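
To make the relationship between the worker thread and the executor more concrete, here is a rough, std-only sketch of the general idea. It is not how futures or Tokio implement their pools. All names (Shared, BlockingHandle, spawn_blocking_sketch, ThreadWaker, demo, and this toy block_on) are hypothetical, and it assumes one fresh thread per task instead of a real pool. The worker stores its result and wakes the task; the check for completion only runs when the executor polls the future.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Wake, Waker},
    thread,
    time::Duration,
};

/// State shared between the worker thread and the future (hypothetical).
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves once the worker thread has stored its result.
struct BlockingHandle<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for BlockingHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.shared.lock().unwrap();
        match shared.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not done yet: remember how to wake this task, then yield.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Runs `work` on a new thread and returns a future for its result.
/// Assumption: one thread per call, where a real pool would reuse threads.
fn spawn_blocking_sketch<T, F>(work: F) -> BlockingHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let worker_shared = Arc::clone(&shared);
    thread::spawn(move || {
        let value = work(); // the blocking call happens off the executor thread
        let waker = {
            let mut shared = worker_shared.lock().unwrap();
            shared.result = Some(value);
            shared.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake(); // ask the executor to poll the future again
        }
    });
    BlockingHandle { shared }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor: poll, then park until woken.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Both workers start immediately, so the two sleeps overlap.
fn demo() -> u64 {
    let a = spawn_blocking_sketch(|| {
        thread::sleep(Duration::from_millis(30));
        3u64
    });
    let b = spawn_blocking_sketch(|| {
        thread::sleep(Duration::from_millis(10));
        1u64
    });
    block_on(async { a.await + b.await })
}
```

Calling demo() from main returns 4. The “ugly bit” that translates between the blocking and asynchronous worlds is the shared state behind the mutex. The executor never blocks on the work itself, only parks until the worker wakes it.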
