mysten_network/
anemo_ext.rs1use anemo::Network;
5use anemo::PeerId;
6use anemo::Request;
7use anemo::Response;
8use anemo::codegen::BoxError;
9use anemo::codegen::BoxFuture;
10use anemo::codegen::Service;
11use anemo::types::PeerEvent;
12use bytes::Bytes;
13use futures::FutureExt;
14use futures::future::OptionFuture;
15use std::time::Instant;
16
17pub trait NetworkExt {
18 fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
19}
20
21impl NetworkExt for Network {
22 fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer {
23 WaitingPeer::new(self.clone(), peer_id)
24 }
25}
26
27#[derive(Clone)]
28pub struct WaitingPeer {
29 peer_id: PeerId,
30 network: Network,
31}
32
33impl WaitingPeer {
34 pub fn new(network: Network, peer_id: PeerId) -> Self {
35 Self { peer_id, network }
36 }
37
38 async fn do_rpc(self, mut request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
39 use tokio::sync::broadcast::error::RecvError;
40
41 let start = Instant::now();
42 let (mut subscriber, _) = self.network.subscribe()?;
43
44 if let Some(mut peer) = self.network.peer(self.peer_id) {
46 return peer.rpc(request).await.map_err(Into::into);
47 }
48
49 let timeout = request.timeout();
51 let sleep: OptionFuture<_> = timeout.map(tokio::time::sleep).into();
52 tokio::pin!(sleep);
53 loop {
54 if self.network.known_peers().get(&self.peer_id).is_none() {
55 return Err(format!("peer {} is not a known peer", self.peer_id).into());
56 }
57
58 tokio::select! {
59 recv = subscriber.recv() => match recv {
60 Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
61 if let Some(mut peer) = self.network.peer(self.peer_id) {
63 if let Some(duration) = timeout {
64 request.set_timeout(duration.saturating_sub(Instant::now().duration_since(start)));
67 }
68 return peer.rpc(request).await.map_err(Into::into);
69 }
70 }
71 Err(RecvError::Closed) => return Err("network is closed".into()),
72 Err(RecvError::Lagged(_)) => {
73 subscriber = subscriber.resubscribe();
74
75 if let Some(mut peer) = self.network.peer(self.peer_id) {
77 return peer.rpc(request).await.map_err(Into::into);
78 }
79 }
80 _ => {}
82 },
83 Some(_) = &mut sleep => {
84 return Err(format!("timed out waiting for peer {}", self.peer_id).into());
85 },
86 }
87 }
88 }
89}
90
91impl Service<Request<Bytes>> for WaitingPeer {
92 type Response = Response<Bytes>;
93 type Error = BoxError;
94 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
95
96 #[inline]
97 fn poll_ready(
98 &mut self,
99 _: &mut std::task::Context<'_>,
100 ) -> std::task::Poll<Result<(), Self::Error>> {
101 std::task::Poll::Ready(Ok(()))
102 }
103
104 #[inline]
105 fn call(&mut self, request: Request<Bytes>) -> Self::Future {
106 let peer = self.clone();
107 peer.do_rpc(request).boxed()
108 }
109}