« Back to Index

Rust: Tokio Spawn and Retry

View original Gist on GitHub

Tags: #rust #async #retry

main.rs

use std::any::type_name;
use tokio_retry::strategy::{jitter, FixedInterval};
use tokio_retry::Retry;

/// Example payload returned by [`action`].
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` so that values can be
/// duplicated and compared directly (e.g. in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
struct ImageId {
    /// Identifier of the image, or `None` when no identifier is available.
    id: Option<String>,
}

/// Stand-in for a unit of asynchronous work.
///
/// Prints a trace line and then always succeeds, yielding an `ImageId` whose `id` is
/// `Some("some id")`. It never returns `Err`.
async fn action() -> Result<ImageId, ()> {
    println!("action(): doing stuff async");

    let image = ImageId {
        id: Some("some id".to_string()),
    };
    Ok(image)
}

/// Trivial async function that always succeeds with the value `999`.
async fn something() -> Result<u64, ()> {
    let value: u64 = 999;
    Ok(value)
}

/// Trivial async function that echoes its argument back as `Ok(n)`.
///
/// Used to show how an argument can be supplied to a retried action via a closure.
async fn something_with_an_arg(n: u8) -> Result<u8, ()> {
    let echoed = n;
    Ok(echoed)
}

/// Trivial async function that always fails with `Err(())`.
///
/// Despite its name it takes no argument; it exists so the retry path can be exercised with
/// an action that never succeeds.
async fn something_with_an_arg_that_errors() -> Result<u8, ()> {
    let failure: Result<u8, ()> = Err(());
    failure
}

/// Prints the compiler's name for the type behind the given reference.
///
/// The value itself is ignored; only its static type `T` is used. `T` may be unsized, so
/// references such as `&str` or `&[u8]` are accepted as well as references to sized values.
fn type_of<T: ?Sized>(_: &T) {
    println!("type_of(): {}", type_name::<T>());
}

/// Walks through several ways of combining `tokio::spawn` with `tokio_retry::Retry::spawn`:
/// a plain spawned task, retrying a bare async function, retrying a closure, and retrying from
/// inside a spawned task (with and without arguments, and with an action that always fails).
#[tokio::main]
async fn main() -> Result<(), ()> {
    let handle = tokio::spawn(async move { action().await });

    type_of(&handle);

    // Awaiting the handle returns a Result wrapping whatever the spawned `async` block
    // produced. Here that is itself a Result (the one returned by `action()`), so the value is a
    // nested Result whose innermost payload is the ImageId struct.
    let task = handle.await;
    println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") }))
    let image = task
        .expect("spawned task should not panic or be cancelled")
        .expect("action() always returns Ok");
    println!(
        "task unwrapped: {:?}",
        image.id.expect("action() always sets an id")
    ); // "some id"

    let retry_strategy = FixedInterval::from_millis(1000)
        .map(jitter) // add jitter to delays
        .take(10); // limit to 10 retries

    let result = Retry::spawn(retry_strategy.clone(), action).await?;
    println!("retry result.id from action(): {:#?}", result.id);

    // Retry::spawn gives us no way to pass arguments to the 'action', so instead we hand it a
    // closure that returns an async block, and call the real async function from inside it.
    //
    // NOTE: a `?` written inside the async block would return from that block (i.e. it only
    // affects the block's own output), not from `main`. The block here simply evaluates to the
    // Result of `something()`, so no `?` is needed inside it; the `.await?` after Retry::spawn
    // is what propagates a final failure out of `main`.
    let result = Retry::spawn(retry_strategy.clone(), || async {
        println!("doing stuff async in a closure");
        something().await
    })
    .await?;
    println!("retry result from closure: {}", result);

    // An `async move` block takes ownership of every variable it mentions. Writing
    // `retry_strategy.clone()` *inside* such a block would therefore move `retry_strategy`
    // itself into the block (the clone would merely happen in there), leaving it unusable for
    // the spawns further down. Cloning up front and moving only the clone keeps the original
    // available.
    //
    // A reference might also work, but this mirrors the real project this example was written
    // for.
    let clone_strategy = retry_strategy.clone();

    // Demonstrating that the async block can return any type of data, and also that we're able
    // to do a retry operation within a tokio spawned task.
    let handle = tokio::spawn(async move {
        (
            "some random key",
            Retry::spawn(clone_strategy, action).await,
        )
    });
    let task = handle.await;
    println!("task: {:?}", task); // Ok(("some random key", Ok(ImageId { id: Some("some id") })))

    // Demonstrating the same as above, but additionally the use of a closure to allow us to pass
    // arguments to the Retry::spawn 'action'.
    //
    // NOTE: If the argument value 123 passed to something_with_an_arg was a complex type (e.g. a
    // type that doesn't implement Copy), then this would cause an error related to the closure
    // implementing FnOnce and not the required FnMut (that Retry::spawn expects). See
    // https://stackoverflow.com/a/30232500/14849316 for an explanation of FnOnce, FnMut and Fn.
    // In essence we might be moving a value out of the closure's environment into
    // something_with_an_arg, which would make the closure FnOnce. The solution would be to
    // change something_with_an_arg to accept a reference, so the closure no longer moves any
    // captured variable and is FnMut again.
    let clone_strategy = retry_strategy.clone();
    let handle = tokio::spawn(async move {
        (
            "some random key",
            Retry::spawn(clone_strategy, || async {
                something_with_an_arg(123).await
            })
            .await,
        )
    });
    let task = handle.await;
    println!("task: {:?}", task); // Ok(("some random key", Ok(123)))

    // This is the last use of `retry_strategy`, so it is moved into the async block and handed
    // to Retry::spawn directly; no clone is needed. It can't be used again after this point.
    //
    // The action always fails, so every retry allowed by the strategy is consumed (each after a
    // jittered delay) before the final Err is reported.
    let handle = tokio::spawn(async move {
        (
            "some random key",
            Retry::spawn(retry_strategy, || async {
                println!("trying a function that errors");
                something_with_an_arg_that_errors().await
            })
            .await,
        )
    });
    let task = handle.await;
    println!("task: {:?}", task); // Ok(("some random key", Err(())))

    Ok(())
}