1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crypto::NetworkPublicKey;
use futures::{stream::FuturesUnordered, StreamExt};
use network::{P2pNetwork, ReliableNetwork};
use tokio::{sync::watch, task::JoinHandle};
use types::{metered_channel::Receiver, ReconfigureNotification, WorkerPrimaryMessage};
/// Upper bound on the number of network sends `PrimaryConnector` keeps pending at once.
/// When this many sends are still outstanding, newly received messages are logged and dropped.
pub const MAX_PENDING_DIGESTS: usize = 10_000;
/// Forwards `WorkerPrimaryMessage`s received on a channel to the primary over the network.
pub struct PrimaryConnector {
/// Network key handed to `primary_client.send` as the recipient of every message.
primary_name: NetworkPublicKey,
/// Watch channel of reconfiguration notifications; a `Shutdown` value stops the connector.
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
/// Input channel carrying the messages to forward to the primary.
rx_digest: Receiver<WorkerPrimaryMessage>,
/// Network handle used to send the messages.
primary_client: P2pNetwork,
}
impl PrimaryConnector {
    /// Spawns a Tokio task that assembles a `PrimaryConnector` from the given parts and
    /// drives its forwarding loop.
    ///
    /// * `primary_name` - network key used as the recipient of every send.
    /// * `rx_reconfigure` - watch channel of reconfiguration notifications.
    /// * `rx_digest` - channel of messages to forward.
    /// * `primary_client` - network handle performing the sends.
    ///
    /// The returned handle completes once the loop observes
    /// `ReconfigureNotification::Shutdown` on `rx_reconfigure`.
    ///
    /// # Panics
    ///
    /// The spawned task panics with "Committee channel dropped" if waiting for a change on
    /// `rx_reconfigure` returns an error.
    #[must_use]
    pub fn spawn(
        primary_name: NetworkPublicKey,
        rx_reconfigure: watch::Receiver<ReconfigureNotification>,
        rx_digest: Receiver<WorkerPrimaryMessage>,
        primary_client: P2pNetwork,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            Self {
                primary_name,
                rx_reconfigure,
                rx_digest,
                primary_client,
            }
            .run()
            .await;
        })
    }

    /// Main loop: forwards incoming messages to the primary, reaps finished sends and
    /// returns when a shutdown notification arrives.
    async fn run(&mut self) {
        // Handles of sends that were handed to the network and have not resolved yet.
        let mut pending = FuturesUnordered::new();
        loop {
            tokio::select! {
                // A new message to forward to the primary.
                Some(digest) = self.rx_digest.recv() => {
                    // Too many sends are still outstanding: drop the message rather than
                    // let the set of pending handles grow without bound.
                    if pending.len() >= MAX_PENDING_DIGESTS {
                        tracing::warn!("Primary unreachable: dropping {digest:?}");
                        continue;
                    }
                    let handle = self
                        .primary_client
                        .send(self.primary_name.to_owned(), &digest)
                        .await;
                    pending.push(handle);
                },
                // The reconfiguration channel published a new value.
                result = self.rx_reconfigure.changed() => {
                    result.expect("Committee channel dropped");
                    // Inspect the notification through the borrow instead of cloning it;
                    // the borrow guard is released at the end of this statement.
                    let shutdown = matches!(
                        *self.rx_reconfigure.borrow(),
                        ReconfigureNotification::Shutdown
                    );
                    if shutdown {
                        return;
                    }
                }
                // A pending send resolved; its outcome is intentionally ignored.
                Some(_result) = pending.next() => ()
            }
        }
    }
}