1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7};
8
9use super::{
10 Handle, RandomnessEventLoop, RandomnessMessage, RandomnessServer, auth::AllowedPeersUpdatable,
11 metrics::Metrics, server::Server,
12};
13use anemo::codegen::InboundRequestLayer;
14use anemo_tower::{auth::RequireAuthorizationLayer, inflight_limit};
15use sui_config::p2p::RandomnessConfig;
16use sui_types::{base_types::AuthorityName, committee::EpochId, crypto::RandomnessRound};
17use tokio::sync::mpsc;
18
19pub struct Builder {
21 name: AuthorityName,
22 config: Option<RandomnessConfig>,
23 metrics: Option<Metrics>,
24 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
25}
26
27impl Builder {
28 pub fn new(
29 name: AuthorityName,
30 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
31 ) -> Self {
32 Self {
33 name,
34 config: None,
35 metrics: None,
36 randomness_tx,
37 }
38 }
39
40 pub fn config(mut self, config: RandomnessConfig) -> Self {
41 self.config = Some(config);
42 self
43 }
44
45 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
46 self.metrics = Some(Metrics::enabled(registry));
47 self
48 }
49
50 pub fn build(self) -> (UnstartedRandomness, anemo::Router) {
51 let Builder {
52 name,
53 config,
54 metrics,
55 randomness_tx,
56 } = self;
57 let config = config.unwrap_or_default();
58 let metrics = metrics.unwrap_or_else(Metrics::disabled);
59 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
60 let mailbox_sender = sender.downgrade();
61 let handle = Handle {
62 sender: sender.clone(),
63 };
64 let server = Server {
65 sender: sender.downgrade(),
66 };
67 let randomness_server = RandomnessServer::new(server).add_layer_for_send_signatures(
68 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
69 config.send_partial_signatures_inflight_limit(),
70 inflight_limit::WaitMode::ReturnError,
71 )),
72 );
73
74 let allowed_peers = AllowedPeersUpdatable::new(Arc::new(HashSet::new()));
75 let router = anemo::Router::new()
76 .route_layer(RequireAuthorizationLayer::new(allowed_peers.clone()))
77 .add_rpc_service(randomness_server);
78
79 (
80 UnstartedRandomness {
81 name,
82 config,
83 handle,
84 mailbox,
85 mailbox_sender,
86 allowed_peers,
87 metrics,
88 randomness_tx,
89 },
90 router,
91 )
92 }
93}
94
95pub struct UnstartedRandomness {
97 pub(super) name: AuthorityName,
98 pub(super) config: RandomnessConfig,
99 pub(super) handle: Handle,
100 pub(super) mailbox: mpsc::Receiver<RandomnessMessage>,
101 pub(super) mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
102 pub(super) allowed_peers: AllowedPeersUpdatable,
103 pub(super) metrics: Metrics,
104 pub(super) randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
105}
106
107impl UnstartedRandomness {
108 pub(super) fn build(self, network: anemo::Network) -> (RandomnessEventLoop, Handle) {
109 let Self {
110 name,
111 config,
112 handle,
113 mailbox,
114 mailbox_sender,
115 allowed_peers,
116 metrics,
117 randomness_tx,
118 } = self;
119 (
120 RandomnessEventLoop {
121 name,
122 config,
123 mailbox,
124 mailbox_sender,
125 network,
126 allowed_peers,
127 allowed_peers_set: HashSet::new(),
128 metrics,
129 randomness_tx,
130
131 epoch: 0,
132 authority_info: Arc::new(HashMap::new()),
133 peer_share_ids: None,
134 blocked_share_id_count: 0,
135 dkg_output: None,
136 aggregation_threshold: 0,
137 highest_requested_round: BTreeMap::new(),
138 send_tasks: BTreeMap::new(),
139 round_request_time: BTreeMap::new(),
140 future_epoch_partial_sigs: BTreeMap::new(),
141 received_partial_sigs: BTreeMap::new(),
142 completed_sigs: BTreeMap::new(),
143 highest_completed_round: BTreeMap::new(),
144 },
145 handle,
146 )
147 }
148
149 pub fn start(self, network: anemo::Network) -> Handle {
150 let (event_loop, handle) = self.build(network);
151 tokio::spawn(event_loop.start());
152
153 handle
154 }
155}