1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crypto::NetworkPublicKey;
use futures::{stream::FuturesUnordered, StreamExt};
use network::{P2pNetwork, ReliableNetwork};
use tokio::{sync::watch, task::JoinHandle};
use types::{metered_channel::Receiver, ReconfigureNotification, WorkerPrimaryMessage};
/// Upper bound on the number of sends to the primary that may be pending at once.
///
/// When this many sends are already outstanding, `PrimaryConnector` drops newly
/// received messages (logging a warning) instead of queueing them.
pub const MAX_PENDING_DIGESTS: usize = 10_000;
/// Forwards `WorkerPrimaryMessage`s received on a channel to the primary over the network.
pub struct PrimaryConnector {
    /// Network public key passed as the destination of every send to the primary.
    primary_name: NetworkPublicKey,
    /// Watch channel carrying reconfiguration notifications; a `Shutdown` value stops the task.
    rx_reconfigure: watch::Receiver<ReconfigureNotification>,
    /// Input channel delivering the messages that must be forwarded to the primary.
    rx_digest: Receiver<WorkerPrimaryMessage>,
    /// Network handle used to send the messages to the primary.
    primary_client: P2pNetwork,
}
impl PrimaryConnector {
    /// Spawns a tokio task that forwards every message received on `rx_digest`
    /// to the primary identified by `primary_name`, using `primary_client`.
    ///
    /// The task runs until a `ReconfigureNotification::Shutdown` is observed on
    /// `rx_reconfigure`; the returned handle resolves at that point.
    ///
    /// # Panics
    ///
    /// The spawned task panics if the sending half of `rx_reconfigure` is dropped.
    #[must_use]
    pub fn spawn(
        primary_name: NetworkPublicKey,
        rx_reconfigure: watch::Receiver<ReconfigureNotification>,
        rx_digest: Receiver<WorkerPrimaryMessage>,
        primary_client: P2pNetwork,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            Self {
                primary_name,
                rx_reconfigure,
                rx_digest,
                primary_client,
            }
            .run()
            .await;
        })
    }

    /// Main loop: multiplexes incoming messages, reconfiguration signals and
    /// the completion of previously issued sends.
    async fn run(&mut self) {
        // Handles of sends that have been issued but have not completed yet.
        let mut futures = FuturesUnordered::new();
        loop {
            tokio::select! {
                // A new message to forward to the primary.
                Some(digest) = self.rx_digest.recv() => {
                    // Bound memory usage: once too many sends are outstanding,
                    // drop the new message rather than queueing it.
                    if futures.len() >= MAX_PENDING_DIGESTS {
                        tracing::warn!("Primary unreachable: dropping {digest:?}");
                        continue;
                    }
                    let handle = self
                        .primary_client
                        .send(self.primary_name.clone(), &digest)
                        .await;
                    futures.push(handle);
                },

                // The reconfiguration value changed.
                result = self.rx_reconfigure.changed() => {
                    result.expect("Committee channel dropped");
                    // Inspect the current value in place; cloning the whole
                    // notification just to compare it is unnecessary.
                    if matches!(
                        *self.rx_reconfigure.borrow(),
                        ReconfigureNotification::Shutdown
                    ) {
                        // Returning drops `futures` and any send handles still in it.
                        return;
                    }
                }

                // Drain completed sends so they stop counting against the cap.
                Some(_result) = futures.next() => ()
            }
        }
    }
}