mysten_network/
anemo_ext.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo::Request;
7use anemo::Response;
8use anemo::codegen::BoxError;
9use anemo::codegen::BoxFuture;
10use anemo::codegen::Service;
11use anemo::types::PeerEvent;
12use bytes::Bytes;
13use futures::FutureExt;
14use futures::future::OptionFuture;
15use std::time::Instant;
16
17pub trait NetworkExt {
18    fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
19}
20
21impl NetworkExt for Network {
22    fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer {
23        WaitingPeer::new(self.clone(), peer_id)
24    }
25}
26
27#[derive(Clone)]
28pub struct WaitingPeer {
29    peer_id: PeerId,
30    network: Network,
31}
32
33impl WaitingPeer {
34    pub fn new(network: Network, peer_id: PeerId) -> Self {
35        Self { peer_id, network }
36    }
37
38    async fn do_rpc(self, mut request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
39        use tokio::sync::broadcast::error::RecvError;
40
41        let start = Instant::now();
42        let (mut subscriber, _) = self.network.subscribe()?;
43
44        // If we're connected with the peer immediately make the request
45        if let Some(mut peer) = self.network.peer(self.peer_id) {
46            return peer.rpc(request).await.map_err(Into::into);
47        }
48
49        // If we're not connected we'll need to check to see if the Peer is a KnownPeer
50        let timeout = request.timeout();
51        let sleep: OptionFuture<_> = timeout.map(tokio::time::sleep).into();
52        tokio::pin!(sleep);
53        loop {
54            if self.network.known_peers().get(&self.peer_id).is_none() {
55                return Err(format!("peer {} is not a known peer", self.peer_id).into());
56            }
57
58            tokio::select! {
59                recv = subscriber.recv() => match recv {
60                    Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
61                        // We're now connected with the peer, lets try to make a network request
62                        if let Some(mut peer) = self.network.peer(self.peer_id) {
63                            if let Some(duration) = timeout {
64                                // Reduce timeout to account for time already spent waiting
65                                // for the peer.
66                                request.set_timeout(duration.saturating_sub(Instant::now().duration_since(start)));
67                            }
68                            return peer.rpc(request).await.map_err(Into::into);
69                        }
70                    }
71                    Err(RecvError::Closed) => return Err("network is closed".into()),
72                    Err(RecvError::Lagged(_)) => {
73                        subscriber = subscriber.resubscribe();
74
75                        // We lagged behind so we may have missed the connection event
76                        if let Some(mut peer) = self.network.peer(self.peer_id) {
77                            return peer.rpc(request).await.map_err(Into::into);
78                        }
79                    }
80                    // Just do another iteration
81                    _ => {}
82                },
83                Some(_) = &mut sleep => {
84                    return Err(format!("timed out waiting for peer {}", self.peer_id).into());
85                },
86            }
87        }
88    }
89}
90
91impl Service<Request<Bytes>> for WaitingPeer {
92    type Response = Response<Bytes>;
93    type Error = BoxError;
94    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
95
96    #[inline]
97    fn poll_ready(
98        &mut self,
99        _: &mut std::task::Context<'_>,
100    ) -> std::task::Poll<Result<(), Self::Error>> {
101        std::task::Poll::Ready(Ok(()))
102    }
103
104    #[inline]
105    fn call(&mut self, request: Request<Bytes>) -> Self::Future {
106        let peer = self.clone();
107        peer.do_rpc(request).boxed()
108    }
109}