1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::CancelOnDropHandler;
use anyhow::Result;
use async_trait::async_trait;
use crypto::NetworkPublicKey;
use rand::prelude::{SliceRandom, SmallRng};
use tokio::task::JoinHandle;
use types::{Batch, BatchDigest};
pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
type Response: Clone + Send + Sync;
fn unreliable_send(
&mut self,
peer: NetworkPublicKey,
message: &Request,
) -> Result<JoinHandle<Result<anemo::Response<Self::Response>>>>;
fn unreliable_broadcast(
&mut self,
peers: Vec<NetworkPublicKey>,
message: &Request,
) -> Vec<Result<JoinHandle<Result<anemo::Response<Self::Response>>>>> {
let mut handlers = Vec::new();
for peer in peers {
let handle = { self.unreliable_send(peer, message) };
handlers.push(handle);
}
handlers
}
}
pub trait Lucky {
fn rng(&mut self) -> &mut SmallRng;
}
pub trait LuckyNetwork<Request> {
type Response: Clone + Send + Sync;
fn lucky_broadcast(
&mut self,
peers: Vec<NetworkPublicKey>,
message: &Request,
num_nodes: usize,
) -> Vec<Result<JoinHandle<anyhow::Result<anemo::Response<Self::Response>>>>>;
}
impl<T, M> LuckyNetwork<M> for T
where
M: Clone + Send + Sync,
T: UnreliableNetwork<M> + Send,
T: Lucky,
{
type Response = T::Response;
fn lucky_broadcast(
&mut self,
mut peers: Vec<NetworkPublicKey>,
message: &M,
nodes: usize,
) -> Vec<Result<JoinHandle<Result<anemo::Response<Self::Response>>>>> {
peers.shuffle(self.rng());
peers.truncate(nodes);
self.unreliable_broadcast(peers, message)
}
}
#[async_trait]
pub trait ReliableNetwork<Request: Clone + Send + Sync> {
type Response: Clone + Send + Sync;
async fn send(
&mut self,
peer: NetworkPublicKey,
message: &Request,
) -> CancelOnDropHandler<anyhow::Result<anemo::Response<Self::Response>>>;
async fn broadcast(
&mut self,
peers: Vec<NetworkPublicKey>,
message: &Request,
) -> Vec<CancelOnDropHandler<anyhow::Result<anemo::Response<Self::Response>>>> {
let mut handlers = Vec::new();
for peer in peers {
let handle = self.send(peer, message).await;
handlers.push(handle);
}
handlers
}
}
#[async_trait]
pub trait PrimaryToWorkerRpc {
async fn request_batch(
&self,
peer: &NetworkPublicKey,
batch: BatchDigest,
) -> Result<Option<Batch>>;
}