Fijar
Cuando esperas un futuro, todas las variables locales (que normalmente se almacenarían en un marco de pila) se almacenan en el futuro del bloque asíncrono. Si tu futuro tiene punteros en datos de la pila, podrían invalidarse. Es una acción insegura.
Por lo tanto, debes asegurarte de que las direcciones a las que apunta el futuro no cambien. Por ese motivo debemos fijar (pin
) los futuros. Si se usa el mismo futuro varias veces en un select!
, se suelen producir problemas en los valores fijados.
use tokio::sync::{mpsc, oneshot}; use tokio::task::spawn; use tokio::time::{sleep, Duration}; // A work item. In this case, just sleep for the given time and respond // with a message on the `respond_on` channel. #[derive(Debug)] struct Work { input: u32, respond_on: oneshot::Sender<u32>, } // A worker which listens for work on a queue and performs it. async fn worker(mut work_queue: mpsc::Receiver<Work>) { let mut iterations = 0; loop { tokio::select! { Some(work) = work_queue.recv() => { sleep(Duration::from_millis(10)).await; // Pretend to work. work.respond_on .send(work.input * 1000) .expect("failed to send response"); iterations += 1; } // TODO: report number of iterations every 100ms } } } // A requester which requests work and waits for it to complete. async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 { let (tx, rx) = oneshot::channel(); work_queue .send(Work { input, respond_on: tx, }) .await .expect("failed to send on work queue"); rx.await.expect("failed waiting for response") } #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(10); spawn(worker(rx)); for i in 0..100 { let resp = do_work(&tx, i).await; println!("work result for iteration {i}: {resp}"); } }
-
Puede que reconozcas esto como un ejemplo del patrón actor. Los actores suelen llamar a
select!
en un bucle. -
Esta sección es un resumen de algunas de las lecciones anteriores, así que tómate tu tiempo .
-
Si añade un
_ = sleep(Duration::from_millis(100)) => { println!(..) }
aselect!
, nunca se ejecutará. ¿Por qué? -
En su lugar, añade un
timeout_fut
que contenga ese futuro fuera deloop
:#![allow(unused)] fn main() { let mut timeout_fut = sleep(Duration::from_millis(100)); loop { select! { .., _ = timeout_fut => { println!(..); }, } } }
-
Continuará sin funcionar. Sigue los errores del compilador y añade
&mut
atimeout_fut
enselect!
para ir despejando el problema. A continuación, usaBox::pin
:#![allow(unused)] fn main() { let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); loop { select! { .., _ = &mut timeout_fut => { println!(..); }, } } }
-
Se puede compilar, pero una vez que vence el tiempo de espera, aparece
Poll::Ready
en cada iteración (un futuro fusionado podría resultar útil). Actualiza para restablecertimeout_fut
cada vez que expire.
-
-
Box se asigna en el montículo. En algunos casos,
std::pin::pin!
(solo si se ha estabilizado recientemente, con código antiguo que suele utilizartokio::pin!
) también es una opción, pero difícil de utilizar en un futuro que se reasigna. -
Otra alternativa es no utilizar
pin
, sino generar otra tarea que se enviará a un canal deoneshot
cada 100 ms.