Fijar

Cuando esperas un futuro, todas las variables locales (que normalmente se almacenarían en un marco de pila) se almacenan en el futuro del bloque asíncrono. Si tu futuro tiene punteros en datos de la pila, podrían invalidarse. Es una acción insegura.

Por lo tanto, debes asegurarte de que las direcciones a las que apunta el futuro no cambien. Por ese motivo debemos fijar (pin) los futuros. Si se usa el mismo futuro varias veces en un select!, se suelen producir problemas en los valores fijados.

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// A work item. In this case, just sleep for the given time and respond
// with a message on the `respond_on` channel.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// A worker which listens for work on a queue and performs it.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Pretend to work.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: report number of iterations every 100ms
        }
    }
}

// A requester which requests work and waits for it to complete.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work {
            input,
            respond_on: tx,
        })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
  • Puede que reconozcas esto como un ejemplo del patrón actor. Los actores suelen llamar a select! en un bucle.

  • Esta sección es un resumen de algunas de las lecciones anteriores, así que tómate tu tiempo .

    • Si añade un _ = sleep(Duration::from_millis(100)) => { println!(..) } a select!, nunca se ejecutará. ¿Por qué?

    • En su lugar, añade un timeout_fut que contenga ese futuro fuera de loop:

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • Continuará sin funcionar. Sigue los errores del compilador y añade &mut a timeout_fut en select! para ir despejando el problema. A continuación, usa Box::pin:

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • Se puede compilar, pero una vez que vence el tiempo de espera, aparece Poll::Ready en cada iteración (un futuro fusionado podría resultar útil). Actualiza para restablecer timeout_fut cada vez que expire.

  • Box se asigna en el montículo. En algunos casos, std::pin::pin! (solo si se ha estabilizado recientemente, con código antiguo que suele utilizar tokio::pin!) también es una opción, pero difícil de utilizar en un futuro que se reasigna.

  • Otra alternativa es no utilizar pin, sino generar otra tarea que se enviará a un canal de oneshot cada 100 ms.