1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::CancelOnDropHandler;
use anyhow::Result;
use async_trait::async_trait;
use crypto::NetworkPublicKey;
use rand::prelude::{SliceRandom, SmallRng};
use tokio::task::JoinHandle;
use types::{Batch, BatchDigest};

/// Best-effort delivery of `Request` messages to peers.
///
/// Every send attempt hands back a `JoinHandle` that resolves to the peer's response
/// or to an error.
pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
    /// Payload type wrapped in the `anemo::Response` a peer replies with.
    type Response: Clone + Send + Sync;

    /// Attempts to send `message` to `peer`.
    ///
    /// The outer `Result` reports a failure to produce a handle at all; the handle itself
    /// yields the outcome of the request.
    fn unreliable_send(
        &mut self,
        peer: NetworkPublicKey,
        message: &Request,
    ) -> Result<JoinHandle<Result<anemo::Response<Self::Response>>>>;

    /// Broadcasts a message to all `peers` passed as an argument.
    /// The attempts to send individual messages are best effort and will not be retried.
    ///
    /// The returned vector holds one entry per peer, in the same order as `peers`.
    fn unreliable_broadcast(
        &mut self,
        peers: Vec<NetworkPublicKey>,
        message: &Request,
    ) -> Vec<Result<JoinHandle<Result<anemo::Response<Self::Response>>>>> {
        // One `unreliable_send` call per peer, issued in input order.
        peers
            .into_iter()
            .map(|peer| self.unreliable_send(peer, message))
            .collect()
    }
}

/// Gives access to a random number generator owned by the implementor.
///
/// The blanket `LuckyNetwork` implementation in this file uses it to shuffle the peer list.
pub trait Lucky {
    /// Returns a mutable reference to the implementor's `SmallRng`.
    fn rng(&mut self) -> &mut SmallRng;
}

/// Best-effort broadcast of a `Request` to a randomly chosen subset of peers.
pub trait LuckyNetwork<Request> {
    /// Payload type wrapped in the `anemo::Response` a peer replies with.
    type Response: Clone + Send + Sync;
    /// Pick a few peers at random (how many is specified by `num_nodes`) and try (best-effort)
    /// to send the message only to them. This is useful to pick nodes with whom to sync.
    ///
    /// Returns one entry per send attempt; each entry is either an error or a `JoinHandle`
    /// resolving to the outcome of that request.
    fn lucky_broadcast(
        &mut self,
        peers: Vec<NetworkPublicKey>,
        message: &Request,
        num_nodes: usize,
    ) -> Vec<Result<JoinHandle<anyhow::Result<anemo::Response<Self::Response>>>>>;
}

/// Blanket implementation: every `UnreliableNetwork` that also exposes an RNG through
/// `Lucky` can broadcast to a random subset of peers.
impl<T, M> LuckyNetwork<M> for T
where
    T: UnreliableNetwork<M> + Lucky + Send,
    M: Clone + Send + Sync,
{
    type Response = T::Response;

    /// Shuffles `peers` with the implementor's RNG, keeps at most the first `nodes` of them,
    /// and sends `message` to those peers via `unreliable_broadcast`.
    fn lucky_broadcast(
        &mut self,
        mut peers: Vec<NetworkPublicKey>,
        message: &M,
        nodes: usize,
    ) -> Vec<Result<JoinHandle<Result<anemo::Response<Self::Response>>>>> {
        // Randomize the order first so that truncating keeps a random selection.
        let rng = self.rng();
        peers.shuffle(rng);
        // `truncate` is a no-op when fewer than `nodes` peers were supplied.
        peers.truncate(nodes);
        self.unreliable_broadcast(peers, message)
    }
}

/// Sending of `Request` messages where each send yields a `CancelOnDropHandler`
/// wrapping the eventual response or error.
// NOTE(review): the handler presumably cancels the in-flight send when dropped — confirm
// against the definition of `CancelOnDropHandler`.
#[async_trait]
pub trait ReliableNetwork<Request: Clone + Send + Sync> {
    /// Payload type wrapped in the `anemo::Response` a peer replies with.
    type Response: Clone + Send + Sync;

    /// Sends `message` to `peer` and returns the handler for that request.
    async fn send(
        &mut self,
        peer: NetworkPublicKey,
        message: &Request,
    ) -> CancelOnDropHandler<anyhow::Result<anemo::Response<Self::Response>>>;

    /// Sends `message` to every peer in `peers`.
    ///
    /// Each `send` call is awaited before the next one is issued. The returned handlers
    /// are in the same order as `peers`.
    async fn broadcast(
        &mut self,
        peers: Vec<NetworkPublicKey>,
        message: &Request,
    ) -> Vec<CancelOnDropHandler<anyhow::Result<anemo::Response<Self::Response>>>> {
        // Exactly one handler is produced per peer, so the final size is known up front.
        let mut handlers = Vec::with_capacity(peers.len());
        for peer in peers {
            handlers.push(self.send(peer, message).await);
        }
        handlers
    }
}

/// RPC interface for requesting a batch from a peer.
// NOTE(review): going by the trait name, the caller is a primary and `peer` is a worker —
// confirm against the implementors.
#[async_trait]
pub trait PrimaryToWorkerRpc {
    /// Requests the batch identified by the digest `batch` from `peer`.
    ///
    /// Returns `Ok(Some(batch))` or `Ok(None)` on a completed request, and `Err` otherwise.
    // NOTE(review): `Ok(None)` presumably means the peer does not hold the batch — verify.
    async fn request_batch(
        &self,
        peer: &NetworkPublicKey,
        batch: BatchDigest,
    ) -> Result<Option<Batch>>;
}