1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use fastcrypto::groups::bls12381;
11use fastcrypto_tbls::{tbls::ThresholdBls, types::ThresholdBls12381MinSig};
12use mysten_common::debug_fatal_no_invariant;
13use mysten_common::fatal;
14use mysten_metrics::spawn_monitored_task;
15use serde::{Deserialize, Serialize};
16use sui_macros::fail_point_async;
17use sui_types::committee::EpochId;
18use sui_types::crypto::RandomnessRound;
19use sui_types::crypto::RandomnessSignature;
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::executable_transaction::VerifiedExecutableTransaction;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::transaction::{TransactionKey, VerifiedTransaction};
24use tokio::sync::{broadcast, mpsc, watch};
25use tokio::task::JoinHandle;
26use tracing::{debug, info, instrument, warn};
27
28use crate::authority::AuthorityState;
29use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
30
31#[derive(Debug, Serialize, Deserialize, Clone)]
33pub struct RandomnessSignatureMessage {
34 pub epoch: EpochId,
35 pub round: RandomnessRound,
36 pub signature_bytes: Vec<u8>,
37}
38
39const SIGNATURES_BROADCAST_CAPACITY: usize = 1000;
40const CONSENSUS_SIGNATURES_CHANNEL_SIZE: usize = 1000;
41const EXECUTED_ROUNDS_CACHE_CAPACITY: usize = 1000;
42
43pub struct RandomnessRoundReceiverHandle {
44 consensus_signatures_tx: mysten_metrics::monitored_mpsc::Sender<bytes::Bytes>,
45 vss_pk_tx: watch::Sender<Option<bls12381::G2Element>>,
46 signatures_broadcast: broadcast::Sender<bytes::Bytes>,
47 #[cfg(test)]
48 executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
49 _task_handle: JoinHandle<()>,
50}
51
52impl RandomnessRoundReceiverHandle {
53 pub fn set_public_key(&self, vss_pk: bls12381::G2Element) {
56 self.vss_pk_tx.send(Some(vss_pk)).ok();
57 }
58
59 pub fn clear_public_key(&self) {
63 self.vss_pk_tx.send(None).ok();
64 }
65}
66
67impl RandomnessRoundReceiverHandle {
68 pub fn new_for_testing() -> Arc<Self> {
69 let (consensus_signatures_tx, _) =
70 mysten_metrics::monitored_mpsc::channel("test_auxiliary", 1);
71 let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
72 let (signatures_broadcast, _) = broadcast::channel(1);
73 Arc::new(Self {
74 consensus_signatures_tx,
75 vss_pk_tx,
76 signatures_broadcast,
77 #[cfg(test)]
78 executed_consensus_rounds: Arc::new(Mutex::new(BTreeSet::new())),
79 _task_handle: tokio::spawn(async move {
80 let _vss_pk_rx = vss_pk_rx;
81 futures::future::pending::<()>().await;
82 }),
83 })
84 }
85
86 #[cfg(test)]
87 pub(crate) fn public_key_for_testing(&self) -> Option<bls12381::G2Element> {
88 let rx = self.vss_pk_tx.subscribe();
89 *rx.borrow()
90 }
91
92 #[cfg(test)]
93 pub(crate) fn mark_round_executed(&self, epoch: EpochId, round: RandomnessRound) {
94 self.executed_consensus_rounds.lock().insert((epoch, round));
95 }
96}
97
98impl consensus_core::RandomnessSignatureHandler for RandomnessRoundReceiverHandle {
99 fn handle_randomness_signature(&self, data: bytes::Bytes) {
100 if let Err(e) = self.consensus_signatures_tx.try_send(data) {
101 warn!(
102 "RandomnessRoundReceiverHandle: failed to forward randomness round signature: {e}"
103 );
104 }
105 }
106
107 fn subscribe_randomness_signatures(&self) -> broadcast::Receiver<bytes::Bytes> {
108 self.signatures_broadcast.subscribe()
109 }
110}
111
112pub struct RandomnessRoundReceiver {
113 authority_state: Arc<AuthorityState>,
114 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
115 consensus_signatures_rx: mysten_metrics::monitored_mpsc::Receiver<bytes::Bytes>,
116 vss_pk_rx: watch::Receiver<Option<bls12381::G2Element>>,
117 signatures_broadcast: broadcast::Sender<bytes::Bytes>,
121 executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
125}
126
127impl RandomnessRoundReceiver {
128 pub fn spawn(
130 authority_state: Arc<AuthorityState>,
131 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
132 ) -> Arc<RandomnessRoundReceiverHandle> {
133 let (signatures_broadcast, _) =
134 broadcast::channel::<bytes::Bytes>(SIGNATURES_BROADCAST_CAPACITY);
135
136 let (consensus_signatures_tx, consensus_signatures_rx) =
137 mysten_metrics::monitored_mpsc::channel(
138 "consensus_randomness_round_signatures",
139 CONSENSUS_SIGNATURES_CHANNEL_SIZE,
140 );
141 let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
142 let executed_consensus_rounds = Arc::new(Mutex::new(BTreeSet::new()));
143
144 let rrr = RandomnessRoundReceiver {
145 authority_state,
146 randomness_rx,
147 consensus_signatures_rx,
148 vss_pk_rx,
149 signatures_broadcast: signatures_broadcast.clone(),
150 executed_consensus_rounds: executed_consensus_rounds.clone(),
151 };
152 let task_handle = spawn_monitored_task!(rrr.run());
153
154 Arc::new(RandomnessRoundReceiverHandle {
155 consensus_signatures_tx,
156 vss_pk_tx,
157 signatures_broadcast,
158 #[cfg(test)]
159 executed_consensus_rounds,
160 _task_handle: task_handle,
161 })
162 }
163
164 async fn run(mut self) {
165 info!("RandomnessRoundReceiver event loop started");
166
167 loop {
168 let vss_pk = *self.vss_pk_rx.borrow_and_update();
169 tokio::select! {
170 maybe_recv = self.randomness_rx.recv() => {
171 if let Some((epoch, round, bytes)) = maybe_recv {
172 self.handle_new_randomness(epoch, round, bytes).await;
173 } else {
174 break;
175 }
176 },
177 Some(data) = self.consensus_signatures_rx.recv(), if vss_pk.is_some() => {
178 self.handle_consensus_randomness_signature(vss_pk.unwrap(), &data).await;
179 },
180 _ = self.vss_pk_rx.changed() => {},
182 }
183 }
184
185 info!("RandomnessRoundReceiver event loop ended");
186 }
187
188 async fn handle_consensus_randomness_signature(
191 &mut self,
192 vss_pk: bls12381::G2Element,
193 data: &bytes::Bytes,
194 ) {
195 let msg: RandomnessSignatureMessage = match bcs::from_bytes(data) {
196 Ok(msg) => msg,
197 Err(e) => {
198 warn!("RandomnessRoundReceiver: failed to deserialize round signature: {e}");
199 return;
200 }
201 };
202
203 let sig: RandomnessSignature = match bcs::from_bytes(&msg.signature_bytes) {
204 Ok(sig) => sig,
205 Err(e) => {
206 warn!(
207 "RandomnessRoundReceiver: failed to deserialize signature \
208 for epoch {} round {}: {e}",
209 msg.epoch, msg.round
210 );
211 return;
212 }
213 };
214
215 if let Err(e) =
216 ThresholdBls12381MinSig::verify(&vss_pk, &msg.round.signature_message(), &sig)
217 {
218 warn!(
219 "RandomnessRoundReceiver: invalid auxiliary signature \
220 for epoch {} round {}: {e}",
221 msg.epoch, msg.round
222 );
223 return;
224 }
225
226 if self
227 .executed_consensus_rounds
228 .lock()
229 .contains(&(msg.epoch, msg.round))
230 {
231 info!(
232 "RandomnessRoundReceiver: dropping already-executed auxiliary signature for epoch {} round {}",
233 msg.epoch, msg.round
234 );
235 return;
236 }
237
238 debug!(
239 "RandomnessRoundReceiver: verified auxiliary signature for epoch {} round {}",
240 msg.epoch, msg.round
241 );
242
243 self.handle_new_randomness(msg.epoch, msg.round, msg.signature_bytes)
244 .await;
245 }
246
247 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
248 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
249 fail_point_async!("randomness-delay");
250
251 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
252 if epoch_store.epoch() != epoch {
253 warn!(
254 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
255 epoch_store.epoch()
256 );
257 return;
258 }
259 let msg = RandomnessSignatureMessage {
262 epoch,
263 round,
264 signature_bytes: bytes.clone(),
265 };
266 match bcs::to_bytes(&msg) {
267 Ok(encoded) => {
268 let _ = self.signatures_broadcast.send(bytes::Bytes::from(encoded));
269 }
270 Err(err) => {
271 warn!("serialisation of RandomnessSignatureMessage failed: {err}");
272 }
273 }
274
275 let key = TransactionKey::RandomnessRound(epoch, round);
276 let transaction = VerifiedTransaction::new_randomness_state_update(
277 epoch,
278 round,
279 bytes,
280 epoch_store
281 .epoch_start_config()
282 .randomness_obj_initial_shared_version()
283 .expect("randomness state obj must exist"),
284 );
285 debug!(
286 "created randomness state update transaction with digest: {:?}",
287 transaction.digest()
288 );
289 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
290 let digest = *transaction.digest();
291
292 self.authority_state
297 .get_cache_commit()
298 .persist_transaction(&transaction);
299
300 if epoch_store.insert_tx_key(key, digest).is_err() {
302 warn!("epoch ended while handling new randomness");
303 }
304
305 let authority_state = self.authority_state.clone();
306 let executed_consensus_rounds = self.executed_consensus_rounds.clone();
307 spawn_monitored_task!(async move {
308 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
315 let result = tokio::time::timeout(
316 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
317 authority_state
318 .get_transaction_cache_reader()
319 .notify_read_executed_effects(
320 "RandomnessRoundReceiver::notify_read_executed_effects_first",
321 &[digest],
322 ),
323 )
324 .await;
325 let mut effects = match result {
326 Ok(result) => result,
327 Err(_) => {
328 debug_fatal_no_invariant!(
330 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
331 );
332 authority_state
334 .get_transaction_cache_reader()
335 .notify_read_executed_effects(
336 "RandomnessRoundReceiver::notify_read_executed_effects_second",
337 &[digest],
338 )
339 .await
340 }
341 };
342
343 let effects = effects.pop().expect("should return effects");
344 if *effects.status() != ExecutionStatus::Success {
345 fatal!(
346 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
347 );
348 }
349 debug!(
350 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
351 );
352
353 let cache = &mut *executed_consensus_rounds.lock();
354 cache.insert((epoch, round));
355 while cache.len() > EXECUTED_ROUNDS_CACHE_CAPACITY {
356 cache.pop_first();
357 }
358 });
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365 use crate::authority::test_authority_builder::TestAuthorityBuilder;
366 use consensus_core::RandomnessSignatureHandler;
367 use fastcrypto::groups::{GroupElement, HashToGroupElement, bls12381};
368
369 fn generate_keypair() -> (bls12381::Scalar, bls12381::G2Element) {
370 let sk = bls12381::Scalar::generator();
371 let pk = bls12381::G2Element::generator() * sk;
372 (sk, pk)
373 }
374
375 fn sign(sk: &bls12381::Scalar, msg: &[u8]) -> RandomnessSignature {
376 bls12381::G1Element::hash_to_group_element(msg) * sk
377 }
378
379 fn build_message(
380 epoch: EpochId,
381 round: RandomnessRound,
382 sig: &RandomnessSignature,
383 ) -> bytes::Bytes {
384 let msg = RandomnessSignatureMessage {
385 epoch,
386 round,
387 signature_bytes: bcs::to_bytes(sig).unwrap(),
388 };
389 bytes::Bytes::from(bcs::to_bytes(&msg).unwrap())
390 }
391
392 #[tokio::test]
393 async fn test_invalid_signature_not_broadcast() {
394 let (_sk, pk) = generate_keypair();
395 let state = TestAuthorityBuilder::new().build().await;
396 let epoch = state.epoch_store_for_testing().epoch();
397
398 let (_randomness_tx, randomness_rx) = mpsc::channel(1);
399 let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
400 let mut sig_rx = handle.subscribe_randomness_signatures();
401
402 handle.set_public_key(pk);
403 tokio::task::yield_now().await;
404
405 let wrong_sk = bls12381::Scalar::generator() + bls12381::Scalar::generator();
406 let round = RandomnessRound(1);
407 let bad_sig = sign(&wrong_sk, &round.signature_message());
408 handle.handle_randomness_signature(build_message(epoch, round, &bad_sig));
409
410 let result =
411 tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
412 assert!(result.is_err(), "should have timed out (no broadcast)");
413 }
414
415 #[tokio::test]
416 async fn test_malformed_data_not_broadcast() {
417 let (_sk, pk) = generate_keypair();
418 let state = TestAuthorityBuilder::new().build().await;
419
420 let (_randomness_tx, randomness_rx) = mpsc::channel(1);
421 let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
422 let mut sig_rx = handle.subscribe_randomness_signatures();
423
424 handle.set_public_key(pk);
425 tokio::task::yield_now().await;
426
427 handle.handle_randomness_signature(bytes::Bytes::from(vec![0u8; 32]));
428
429 let result =
430 tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
431 assert!(result.is_err(), "should have timed out (no broadcast)");
432 }
433
434 #[tokio::test]
435 async fn test_signatures_buffer_until_dkg_completes() {
436 let (sk, pk) = generate_keypair();
437 let state = TestAuthorityBuilder::new().build().await;
438 let epoch = state.epoch_store_for_testing().epoch();
439
440 let (_randomness_tx, randomness_rx) = mpsc::channel(1);
441 let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
442 let mut sig_rx = handle.subscribe_randomness_signatures();
443
444 let round = RandomnessRound(1);
446 let sig = sign(&sk, &round.signature_message());
447 handle.handle_randomness_signature(build_message(epoch, round, &sig));
448
449 let result =
450 tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
451 assert!(result.is_err(), "should have timed out (DKG not complete)");
452
453 handle.set_public_key(pk);
455
456 let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
457 .await
458 .expect("timed out")
459 .expect("channel closed");
460
461 let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
462 assert_eq!(decoded.epoch, epoch);
463 assert_eq!(decoded.round, round);
464 }
465
466 #[tokio::test]
467 async fn test_executed_consensus_signature_not_rebroadcast() {
468 let (sk, pk) = generate_keypair();
469 let state = TestAuthorityBuilder::new().build().await;
470 let epoch = state.epoch_store_for_testing().epoch();
471
472 let (_randomness_tx, randomness_rx) = mpsc::channel(1);
473 let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
474 let mut sig_rx = handle.subscribe_randomness_signatures();
475
476 handle.set_public_key(pk);
477 tokio::task::yield_now().await;
478
479 let round = RandomnessRound(1);
480 let sig = sign(&sk, &round.signature_message());
481 let msg = build_message(epoch, round, &sig);
482
483 handle.mark_round_executed(epoch, round);
485
486 handle.handle_randomness_signature(msg);
487
488 let result =
489 tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
490 assert!(
491 result.is_err(),
492 "already-executed round should not be rebroadcast"
493 );
494 }
495
496 #[tokio::test]
497 async fn test_clear_public_key_pauses_processing() {
498 let (sk, pk) = generate_keypair();
499 let state = TestAuthorityBuilder::new().build().await;
500 let epoch = state.epoch_store_for_testing().epoch();
501
502 let (_randomness_tx, randomness_rx) = mpsc::channel(1);
503 let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
504 let mut sig_rx = handle.subscribe_randomness_signatures();
505
506 handle.set_public_key(pk);
508 tokio::task::yield_now().await;
509
510 let round1 = RandomnessRound(1);
511 let sig1 = sign(&sk, &round1.signature_message());
512 handle.handle_randomness_signature(build_message(epoch, round1, &sig1));
513
514 tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
515 .await
516 .expect("timed out")
517 .expect("channel closed");
518
519 handle.clear_public_key();
521 for _ in 0..5 {
523 tokio::task::yield_now().await;
524 }
525
526 let round2 = RandomnessRound(2);
527 let sig2 = sign(&sk, &round2.signature_message());
528 handle.handle_randomness_signature(build_message(epoch, round2, &sig2));
529
530 let result =
531 tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
532 assert!(result.is_err(), "should have timed out (key cleared)");
533
534 handle.set_public_key(pk);
536
537 let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
538 .await
539 .expect("timed out")
540 .expect("channel closed");
541
542 let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
543 assert_eq!(decoded.round, round2);
544 }
545}