1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use fastcrypto::groups::bls12381;
11use fastcrypto_tbls::{tbls::ThresholdBls, types::ThresholdBls12381MinSig};
12use mysten_common::debug_fatal_no_invariant;
13use mysten_common::fatal;
14use mysten_metrics::spawn_monitored_task;
15use serde::{Deserialize, Serialize};
16use sui_macros::fail_point_async;
17use sui_types::committee::EpochId;
18use sui_types::crypto::RandomnessRound;
19use sui_types::crypto::RandomnessSignature;
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::executable_transaction::VerifiedExecutableTransaction;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::transaction::{TransactionKey, VerifiedTransaction};
24use tokio::sync::{broadcast, mpsc, watch};
25use tokio::task::JoinHandle;
26use tracing::{debug, info, instrument, warn};
27
28use crate::authority::AuthorityState;
29
/// Message pairing a randomness round signature with the epoch and round it belongs to.
///
/// In this file the message is BCS-encoded before it is published on the signatures
/// broadcast channel, and BCS-decoded when it arrives from consensus.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RandomnessSignatureMessage {
    /// Epoch the signature applies to; compared against the current epoch before use.
    pub epoch: EpochId,
    /// Randomness round the signature covers.
    pub round: RandomnessRound,
    /// Serialized signature. The receiver BCS-decodes these bytes into a
    /// `RandomnessSignature` before verifying them.
    pub signature_bytes: Vec<u8>,
}
37
/// Capacity passed to the broadcast channel on which accepted signature messages are published.
const SIGNATURES_BROADCAST_CAPACITY: usize = 1000;
/// Size passed to the monitored mpsc channel that carries raw signature bytes from consensus
/// to the receiver task.
const CONSENSUS_SIGNATURES_CHANNEL_SIZE: usize = 1000;
/// Maximum number of `(epoch, round)` entries kept in the executed-rounds cache; once exceeded,
/// the smallest entries are removed.
const EXECUTED_ROUNDS_CACHE_CAPACITY: usize = 1000;
41
/// Handle used to talk to a `RandomnessRoundReceiver` task.
///
/// It forwards signature bytes received via consensus, sets or clears the public key used to
/// verify them, and hands out subscriptions to the broadcast of signature messages.
pub struct RandomnessRoundReceiverHandle {
    /// Sending side of the channel that feeds raw signature bytes to the receiver task.
    consensus_signatures_tx: mysten_metrics::monitored_mpsc::Sender<bytes::Bytes>,
    /// Publishes the current verification key; `None` means no key is set.
    vss_pk_tx: watch::Sender<Option<bls12381::G2Element>>,
    /// Broadcast sender from which new subscriptions are created.
    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
    /// Test-only access to the set of `(epoch, round)` pairs treated as already executed.
    #[cfg(test)]
    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
    /// Join handle of the task spawned together with this handle; stored but never read here.
    _task_handle: JoinHandle<()>,
}
50
51impl RandomnessRoundReceiverHandle {
52 pub fn set_public_key(&self, vss_pk: bls12381::G2Element) {
55 self.vss_pk_tx.send(Some(vss_pk)).ok();
56 }
57
58 pub fn clear_public_key(&self) {
62 self.vss_pk_tx.send(None).ok();
63 }
64}
65
66impl RandomnessRoundReceiverHandle {
67 pub fn new_for_testing() -> Arc<Self> {
68 let (consensus_signatures_tx, _) =
69 mysten_metrics::monitored_mpsc::channel("test_auxiliary", 1);
70 let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
71 let (signatures_broadcast, _) = broadcast::channel(1);
72 Arc::new(Self {
73 consensus_signatures_tx,
74 vss_pk_tx,
75 signatures_broadcast,
76 #[cfg(test)]
77 executed_consensus_rounds: Arc::new(Mutex::new(BTreeSet::new())),
78 _task_handle: tokio::spawn(async move {
79 let _vss_pk_rx = vss_pk_rx;
80 futures::future::pending::<()>().await;
81 }),
82 })
83 }
84
85 #[cfg(test)]
86 pub(crate) fn public_key_for_testing(&self) -> Option<bls12381::G2Element> {
87 let rx = self.vss_pk_tx.subscribe();
88 *rx.borrow()
89 }
90
91 #[cfg(test)]
92 pub(crate) fn mark_round_executed(&self, epoch: EpochId, round: RandomnessRound) {
93 self.executed_consensus_rounds.lock().insert((epoch, round));
94 }
95}
96
97impl consensus_core::RandomnessSignatureHandler for RandomnessRoundReceiverHandle {
98 fn handle_randomness_signature(&self, data: bytes::Bytes) {
99 if let Err(e) = self.consensus_signatures_tx.try_send(data) {
100 warn!(
101 "RandomnessRoundReceiverHandle: failed to forward randomness round signature: {e}"
102 );
103 }
104 }
105
106 fn subscribe_randomness_signatures(&self) -> broadcast::Receiver<bytes::Bytes> {
107 self.signatures_broadcast.subscribe()
108 }
109}
110
/// State of the task that receives randomness round signatures and turns them into randomness
/// state update transactions.
pub struct RandomnessRoundReceiver {
    /// Used to load the epoch store, persist the created transaction and wait for its effects.
    authority_state: Arc<AuthorityState>,
    /// Source of `(epoch, round, signature bytes)` tuples; these are handled without any
    /// signature verification in this file.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    /// Raw signature messages forwarded through the handle; they are decoded and verified
    /// before being handled.
    consensus_signatures_rx: mysten_metrics::monitored_mpsc::Receiver<bytes::Bytes>,
    /// Current verification key; consensus signatures are only consumed while it is `Some`.
    vss_pk_rx: watch::Receiver<Option<bls12381::G2Element>>,
    /// Outgoing broadcast of BCS-encoded `RandomnessSignatureMessage`s.
    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
    /// `(epoch, round)` pairs whose state update transaction executed successfully, trimmed to
    /// `EXECUTED_ROUNDS_CACHE_CAPACITY` entries.
    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
}
125
126impl RandomnessRoundReceiver {
127 pub fn spawn(
129 authority_state: Arc<AuthorityState>,
130 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
131 ) -> Arc<RandomnessRoundReceiverHandle> {
132 let (signatures_broadcast, _) =
133 broadcast::channel::<bytes::Bytes>(SIGNATURES_BROADCAST_CAPACITY);
134
135 let (consensus_signatures_tx, consensus_signatures_rx) =
136 mysten_metrics::monitored_mpsc::channel(
137 "consensus_randomness_round_signatures",
138 CONSENSUS_SIGNATURES_CHANNEL_SIZE,
139 );
140 let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
141 let executed_consensus_rounds = Arc::new(Mutex::new(BTreeSet::new()));
142
143 let rrr = RandomnessRoundReceiver {
144 authority_state,
145 randomness_rx,
146 consensus_signatures_rx,
147 vss_pk_rx,
148 signatures_broadcast: signatures_broadcast.clone(),
149 executed_consensus_rounds: executed_consensus_rounds.clone(),
150 };
151 let task_handle = spawn_monitored_task!(rrr.run());
152
153 Arc::new(RandomnessRoundReceiverHandle {
154 consensus_signatures_tx,
155 vss_pk_tx,
156 signatures_broadcast,
157 #[cfg(test)]
158 executed_consensus_rounds,
159 _task_handle: task_handle,
160 })
161 }
162
163 async fn run(mut self) {
164 info!("RandomnessRoundReceiver event loop started");
165
166 loop {
167 let vss_pk = *self.vss_pk_rx.borrow_and_update();
168 tokio::select! {
169 maybe_recv = self.randomness_rx.recv() => {
170 if let Some((epoch, round, bytes)) = maybe_recv {
171 self.handle_new_randomness(epoch, round, bytes).await;
172 } else {
173 break;
174 }
175 },
176 Some(data) = self.consensus_signatures_rx.recv(), if vss_pk.is_some() => {
177 self.handle_consensus_randomness_signature(vss_pk.unwrap(), &data).await;
178 },
179 _ = self.vss_pk_rx.changed() => {},
181 }
182 }
183
184 info!("RandomnessRoundReceiver event loop ended");
185 }
186
187 async fn handle_consensus_randomness_signature(
190 &mut self,
191 vss_pk: bls12381::G2Element,
192 data: &bytes::Bytes,
193 ) {
194 let msg: RandomnessSignatureMessage = match bcs::from_bytes(data) {
195 Ok(msg) => msg,
196 Err(e) => {
197 warn!("RandomnessRoundReceiver: failed to deserialize round signature: {e}");
198 return;
199 }
200 };
201
202 let sig: RandomnessSignature = match bcs::from_bytes(&msg.signature_bytes) {
203 Ok(sig) => sig,
204 Err(e) => {
205 warn!(
206 "RandomnessRoundReceiver: failed to deserialize signature \
207 for epoch {} round {}: {e}",
208 msg.epoch, msg.round
209 );
210 return;
211 }
212 };
213
214 if let Err(e) =
215 ThresholdBls12381MinSig::verify(&vss_pk, &msg.round.signature_message(), &sig)
216 {
217 warn!(
218 "RandomnessRoundReceiver: invalid auxiliary signature \
219 for epoch {} round {}: {e}",
220 msg.epoch, msg.round
221 );
222 return;
223 }
224
225 if self
226 .executed_consensus_rounds
227 .lock()
228 .contains(&(msg.epoch, msg.round))
229 {
230 info!(
231 "RandomnessRoundReceiver: dropping already-executed auxiliary signature for epoch {} round {}",
232 msg.epoch, msg.round
233 );
234 return;
235 }
236
237 debug!(
238 "RandomnessRoundReceiver: verified auxiliary signature for epoch {} round {}",
239 msg.epoch, msg.round
240 );
241
242 self.handle_new_randomness(msg.epoch, msg.round, msg.signature_bytes)
243 .await;
244 }
245
246 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
247 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
248 fail_point_async!("randomness-delay");
249
250 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
251 if epoch_store.epoch() != epoch {
252 warn!(
253 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
254 epoch_store.epoch()
255 );
256 return;
257 }
258 let msg = RandomnessSignatureMessage {
261 epoch,
262 round,
263 signature_bytes: bytes.clone(),
264 };
265 match bcs::to_bytes(&msg) {
266 Ok(encoded) => {
267 let _ = self.signatures_broadcast.send(bytes::Bytes::from(encoded));
268 }
269 Err(err) => {
270 warn!("serialisation of RandomnessSignatureMessage failed: {err}");
271 }
272 }
273
274 let key = TransactionKey::RandomnessRound(epoch, round);
275 let transaction = VerifiedTransaction::new_randomness_state_update(
276 epoch,
277 round,
278 bytes,
279 epoch_store
280 .epoch_start_config()
281 .randomness_obj_initial_shared_version()
282 .expect("randomness state obj must exist"),
283 );
284 debug!(
285 "created randomness state update transaction with digest: {:?}",
286 transaction.digest()
287 );
288 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
289 let digest = *transaction.digest();
290
291 self.authority_state
296 .get_cache_commit()
297 .persist_transaction(&transaction);
298
299 if epoch_store.insert_tx_key(key, digest).is_err() {
301 warn!("epoch ended while handling new randomness");
302 }
303
304 let authority_state = self.authority_state.clone();
305 let executed_consensus_rounds = self.executed_consensus_rounds.clone();
306 spawn_monitored_task!(async move {
307 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
314 let result = tokio::time::timeout(
315 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
316 authority_state
317 .get_transaction_cache_reader()
318 .notify_read_executed_effects(
319 "RandomnessRoundReceiver::notify_read_executed_effects_first",
320 &[digest],
321 ),
322 )
323 .await;
324 let mut effects = match result {
325 Ok(result) => result,
326 Err(_) => {
327 debug_fatal_no_invariant!(
329 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
330 );
331 authority_state
333 .get_transaction_cache_reader()
334 .notify_read_executed_effects(
335 "RandomnessRoundReceiver::notify_read_executed_effects_second",
336 &[digest],
337 )
338 .await
339 }
340 };
341
342 let effects = effects.pop().expect("should return effects");
343 if *effects.status() != ExecutionStatus::Success {
344 fatal!(
345 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
346 );
347 }
348 debug!(
349 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
350 );
351
352 let cache = &mut *executed_consensus_rounds.lock();
353 cache.insert((epoch, round));
354 while cache.len() > EXECUTED_ROUNDS_CACHE_CAPACITY {
355 cache.pop_first();
356 }
357 });
358 }
359}
360
/// Tests for the consensus-signature path of `RandomnessRoundReceiver`.
///
/// Each test spawns a real receiver on top of a test authority, subscribes to the signatures
/// broadcast, and checks whether a message shows up there. "Nothing was broadcast" is
/// asserted by letting `sig_rx.recv()` time out after 200 ms.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use consensus_core::RandomnessSignatureHandler;
    use fastcrypto::groups::{GroupElement, HashToGroupElement, bls12381};

    /// Returns a fixed key pair: the secret key is the scalar generator and the public key is
    /// the G2 generator multiplied by it. No randomness is involved.
    fn generate_keypair() -> (bls12381::Scalar, bls12381::G2Element) {
        let sk = bls12381::Scalar::generator();
        let pk = bls12381::G2Element::generator() * sk;
        (sk, pk)
    }

    /// Signs `msg` by hashing it to a G1 element and multiplying by `sk`.
    fn sign(sk: &bls12381::Scalar, msg: &[u8]) -> RandomnessSignature {
        bls12381::G1Element::hash_to_group_element(msg) * sk
    }

    /// Builds the bytes of a `RandomnessSignatureMessage`: the signature is BCS-encoded into
    /// `signature_bytes`, then the whole message is BCS-encoded.
    fn build_message(
        epoch: EpochId,
        round: RandomnessRound,
        sig: &RandomnessSignature,
    ) -> bytes::Bytes {
        let msg = RandomnessSignatureMessage {
            epoch,
            round,
            signature_bytes: bcs::to_bytes(sig).unwrap(),
        };
        bytes::Bytes::from(bcs::to_bytes(&msg).unwrap())
    }

    /// A signature made with a different secret key than the published public key must not be
    /// broadcast.
    #[tokio::test]
    async fn test_invalid_signature_not_broadcast() {
        let (_sk, pk) = generate_keypair();
        let state = TestAuthorityBuilder::new().build().await;
        let epoch = state.epoch_store_for_testing().epoch();

        // The sender half is kept alive (but unused) for the duration of the test.
        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
        let mut sig_rx = handle.subscribe_randomness_signatures();

        handle.set_public_key(pk);
        // Presumably gives the receiver task a chance to observe the key.
        tokio::task::yield_now().await;

        // Sign with generator + generator, which differs from the key used for `pk`.
        let wrong_sk = bls12381::Scalar::generator() + bls12381::Scalar::generator();
        let round = RandomnessRound(1);
        let bad_sig = sign(&wrong_sk, &round.signature_message());
        handle.handle_randomness_signature(build_message(epoch, round, &bad_sig));

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
        assert!(result.is_err(), "should have timed out (no broadcast)");
    }

    /// Bytes that are not a valid encoded message (32 zero bytes) must not be broadcast.
    #[tokio::test]
    async fn test_malformed_data_not_broadcast() {
        let (_sk, pk) = generate_keypair();
        let state = TestAuthorityBuilder::new().build().await;

        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
        let mut sig_rx = handle.subscribe_randomness_signatures();

        handle.set_public_key(pk);
        // Presumably gives the receiver task a chance to observe the key.
        tokio::task::yield_now().await;

        handle.handle_randomness_signature(bytes::Bytes::from(vec![0u8; 32]));

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
        assert!(result.is_err(), "should have timed out (no broadcast)");
    }

    /// A signature submitted before any public key is set is held back, and is broadcast once
    /// the key is set.
    #[tokio::test]
    async fn test_signatures_buffer_until_dkg_completes() {
        let (sk, pk) = generate_keypair();
        let state = TestAuthorityBuilder::new().build().await;
        let epoch = state.epoch_store_for_testing().epoch();

        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
        let mut sig_rx = handle.subscribe_randomness_signatures();

        let round = RandomnessRound(1);
        // Submit a valid signature while no key has been published yet.
        let sig = sign(&sk, &round.signature_message());
        handle.handle_randomness_signature(build_message(epoch, round, &sig));

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
        assert!(result.is_err(), "should have timed out (DKG not complete)");

        // Publishing the key should release the queued signature.
        handle.set_public_key(pk);

        let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
        assert_eq!(decoded.epoch, epoch);
        assert_eq!(decoded.round, round);
    }

    /// A valid signature for a round already marked as executed must not be broadcast again.
    #[tokio::test]
    async fn test_executed_consensus_signature_not_rebroadcast() {
        let (sk, pk) = generate_keypair();
        let state = TestAuthorityBuilder::new().build().await;
        let epoch = state.epoch_store_for_testing().epoch();

        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
        let mut sig_rx = handle.subscribe_randomness_signatures();

        handle.set_public_key(pk);
        // Presumably gives the receiver task a chance to observe the key.
        tokio::task::yield_now().await;

        let round = RandomnessRound(1);
        let sig = sign(&sk, &round.signature_message());
        let msg = build_message(epoch, round, &sig);

        // Put the round into the executed set before the signature is submitted.
        handle.mark_round_executed(epoch, round);

        handle.handle_randomness_signature(msg);

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
        assert!(
            result.is_err(),
            "already-executed round should not be rebroadcast"
        );
    }

    /// Clearing the public key stops consensus signatures from being processed until a key is
    /// set again.
    #[tokio::test]
    async fn test_clear_public_key_pauses_processing() {
        let (sk, pk) = generate_keypair();
        let state = TestAuthorityBuilder::new().build().await;
        let epoch = state.epoch_store_for_testing().epoch();

        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
        let mut sig_rx = handle.subscribe_randomness_signatures();

        handle.set_public_key(pk);
        // Presumably gives the receiver task a chance to observe the key.
        tokio::task::yield_now().await;

        // Round 1 is submitted while the key is set and is expected to be broadcast.
        let round1 = RandomnessRound(1);
        let sig1 = sign(&sk, &round1.signature_message());
        handle.handle_randomness_signature(build_message(epoch, round1, &sig1));

        tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        handle.clear_public_key();
        // Yield several times; presumably so the receiver task observes the cleared key
        // before round 2 is submitted.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        // Round 2 is submitted while no key is set and must not be broadcast yet.
        let round2 = RandomnessRound(2);
        let sig2 = sign(&sk, &round2.signature_message());
        handle.handle_randomness_signature(build_message(epoch, round2, &sig2));

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
        assert!(result.is_err(), "should have timed out (key cleared)");

        // Setting the key again should release the queued round 2 signature.
        handle.set_public_key(pk);

        let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
        assert_eq!(decoded.round, round2);
    }
}