sui_core/
randomness_round_receiver.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use fastcrypto::groups::bls12381;
11use fastcrypto_tbls::{tbls::ThresholdBls, types::ThresholdBls12381MinSig};
12use mysten_common::debug_fatal_no_invariant;
13use mysten_common::fatal;
14use mysten_metrics::spawn_monitored_task;
15use serde::{Deserialize, Serialize};
16use sui_macros::fail_point_async;
17use sui_types::committee::EpochId;
18use sui_types::crypto::RandomnessRound;
19use sui_types::crypto::RandomnessSignature;
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::executable_transaction::VerifiedExecutableTransaction;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::transaction::{TransactionKey, VerifiedTransaction};
24use tokio::sync::{broadcast, mpsc, watch};
25use tokio::task::JoinHandle;
26use tracing::{debug, info, instrument, warn};
27
28use crate::authority::AuthorityState;
29
/// A randomness signature for a specific epoch and round, broadcast to observer peers.
///
/// Within this file the message is encoded with `bcs::to_bytes` before being broadcast and
/// decoded with `bcs::from_bytes` when it arrives from consensus.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RandomnessSignatureMessage {
    /// Epoch the signature belongs to.
    pub epoch: EpochId,
    /// Randomness round the signature belongs to.
    pub round: RandomnessRound,
    /// Signature payload. The consensus path decodes it as a BCS-encoded
    /// `RandomnessSignature` before verifying it.
    pub signature_bytes: Vec<u8>,
}
37
/// Capacity of the broadcast channel that relays signatures to subscribers.
const SIGNATURES_BROADCAST_CAPACITY: usize = 1_000;
/// Capacity of the bounded channel that buffers signatures arriving from consensus.
const CONSENSUS_SIGNATURES_CHANNEL_SIZE: usize = 1_000;
/// Maximum number of `(epoch, round)` entries kept in the executed-rounds cache.
const EXECUTED_ROUNDS_CACHE_CAPACITY: usize = 1_000;
41
/// Shared handle to a running `RandomnessRoundReceiver` task.
///
/// It forwards signatures received from consensus into the receiver loop, publishes the
/// VSS public key used for verification, and hands out subscriptions to the broadcast of
/// relayed signatures.
pub struct RandomnessRoundReceiverHandle {
    /// Sending side of the bounded channel that carries raw signature payloads from
    /// consensus into the receiver loop.
    consensus_signatures_tx: mysten_metrics::monitored_mpsc::Sender<bytes::Bytes>,
    /// Publishes the current VSS public key; `None` means no key is currently set.
    vss_pk_tx: watch::Sender<Option<bls12381::G2Element>>,
    /// Broadcast channel from which `subscribe_randomness_signatures` creates receivers.
    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
    /// Test-only access to the executed-rounds cache shared with the receiver task.
    #[cfg(test)]
    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
    /// Join handle of the spawned task; stored but never read in this file.
    _task_handle: JoinHandle<()>,
}
50
51impl RandomnessRoundReceiverHandle {
52    /// Sets the VSS public key for signature verification. Called by
53    /// `RandomnessManager::advance_dkg` when DKG completes.
54    pub fn set_public_key(&self, vss_pk: bls12381::G2Element) {
55        self.vss_pk_tx.send(Some(vss_pk)).ok();
56    }
57
58    /// Clears the VSS public key. Called at epoch start before the new DKG
59    /// completes. While the key is `None`, incoming randomness round signatues from consensus
60    /// accumulate in the bounded channel giving the opportunity to buffer them until DKG completes.
61    pub fn clear_public_key(&self) {
62        self.vss_pk_tx.send(None).ok();
63    }
64}
65
66impl RandomnessRoundReceiverHandle {
67    pub fn new_for_testing() -> Arc<Self> {
68        let (consensus_signatures_tx, _) =
69            mysten_metrics::monitored_mpsc::channel("test_auxiliary", 1);
70        let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
71        let (signatures_broadcast, _) = broadcast::channel(1);
72        Arc::new(Self {
73            consensus_signatures_tx,
74            vss_pk_tx,
75            signatures_broadcast,
76            #[cfg(test)]
77            executed_consensus_rounds: Arc::new(Mutex::new(BTreeSet::new())),
78            _task_handle: tokio::spawn(async move {
79                let _vss_pk_rx = vss_pk_rx;
80                futures::future::pending::<()>().await;
81            }),
82        })
83    }
84
85    #[cfg(test)]
86    pub(crate) fn public_key_for_testing(&self) -> Option<bls12381::G2Element> {
87        let rx = self.vss_pk_tx.subscribe();
88        *rx.borrow()
89    }
90
91    #[cfg(test)]
92    pub(crate) fn mark_round_executed(&self, epoch: EpochId, round: RandomnessRound) {
93        self.executed_consensus_rounds.lock().insert((epoch, round));
94    }
95}
96
97impl consensus_core::RandomnessSignatureHandler for RandomnessRoundReceiverHandle {
98    fn handle_randomness_signature(&self, data: bytes::Bytes) {
99        if let Err(e) = self.consensus_signatures_tx.try_send(data) {
100            warn!(
101                "RandomnessRoundReceiverHandle: failed to forward randomness round signature: {e}"
102            );
103        }
104    }
105
106    fn subscribe_randomness_signatures(&self) -> broadcast::Receiver<bytes::Bytes> {
107        self.signatures_broadcast.subscribe()
108    }
109}
110
/// State owned by the receiver event loop: it turns incoming randomness signatures into
/// randomness state update transactions and relays them to subscribers.
pub struct RandomnessRoundReceiver {
    /// Authority state used to load the epoch store, persist transactions and read
    /// executed effects.
    authority_state: Arc<AuthorityState>,
    /// `(epoch, round, signature bytes)` tuples handed to `spawn` by the caller. These are
    /// processed without signature verification in this file.
    /// NOTE(review): presumably produced by the local randomness manager — confirm.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    /// Raw signature payloads forwarded from consensus through the handle.
    consensus_signatures_rx: mysten_metrics::monitored_mpsc::Receiver<bytes::Bytes>,
    /// Current VSS public key; consensus signatures are only consumed while it is `Some`.
    vss_pk_rx: watch::Receiver<Option<bls12381::G2Element>>,
    /// Best-effort broadcast of signatures. Primarily used to propagate signatures via
    /// consensus to non-committee peers (observers) that sync their state through consensus
    /// in a read-only capacity.
    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
    /// Rounds whose randomness transaction executed successfully. Entries are added by
    /// `handle_new_randomness` once execution succeeds, and consulted on the consensus relay
    /// path to drop signatures for rounds that are already done. This prevents rebroadcast
    /// loops in observer-to-observer topologies (not anticipated in practice) while rounds
    /// that have not executed yet can still be processed again.
    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
}
125
126impl RandomnessRoundReceiver {
127    /// Spawns the receiver loop and returns the shared handle.
128    pub fn spawn(
129        authority_state: Arc<AuthorityState>,
130        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
131    ) -> Arc<RandomnessRoundReceiverHandle> {
132        let (signatures_broadcast, _) =
133            broadcast::channel::<bytes::Bytes>(SIGNATURES_BROADCAST_CAPACITY);
134
135        let (consensus_signatures_tx, consensus_signatures_rx) =
136            mysten_metrics::monitored_mpsc::channel(
137                "consensus_randomness_round_signatures",
138                CONSENSUS_SIGNATURES_CHANNEL_SIZE,
139            );
140        let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
141        let executed_consensus_rounds = Arc::new(Mutex::new(BTreeSet::new()));
142
143        let rrr = RandomnessRoundReceiver {
144            authority_state,
145            randomness_rx,
146            consensus_signatures_rx,
147            vss_pk_rx,
148            signatures_broadcast: signatures_broadcast.clone(),
149            executed_consensus_rounds: executed_consensus_rounds.clone(),
150        };
151        let task_handle = spawn_monitored_task!(rrr.run());
152
153        Arc::new(RandomnessRoundReceiverHandle {
154            consensus_signatures_tx,
155            vss_pk_tx,
156            signatures_broadcast,
157            #[cfg(test)]
158            executed_consensus_rounds,
159            _task_handle: task_handle,
160        })
161    }
162
163    async fn run(mut self) {
164        info!("RandomnessRoundReceiver event loop started");
165
166        loop {
167            let vss_pk = *self.vss_pk_rx.borrow_and_update();
168            tokio::select! {
169                maybe_recv = self.randomness_rx.recv() => {
170                    if let Some((epoch, round, bytes)) = maybe_recv {
171                        self.handle_new_randomness(epoch, round, bytes).await;
172                    } else {
173                        break;
174                    }
175                },
176                Some(data) = self.consensus_signatures_rx.recv(), if vss_pk.is_some() => {
177                    self.handle_consensus_randomness_signature(vss_pk.unwrap(), &data).await;
178                },
179                // Wake up when the key changes so we re-evaluate the select guard.
180                _ = self.vss_pk_rx.changed() => {},
181            }
182        }
183
184        info!("RandomnessRoundReceiver event loop ended");
185    }
186
187    /// Signatures that are propagated via consensus state sync. This is meant to be used from nodes that are not part of the committee
188    /// and are using consensus sync additionally to sync state.
189    async fn handle_consensus_randomness_signature(
190        &mut self,
191        vss_pk: bls12381::G2Element,
192        data: &bytes::Bytes,
193    ) {
194        let msg: RandomnessSignatureMessage = match bcs::from_bytes(data) {
195            Ok(msg) => msg,
196            Err(e) => {
197                warn!("RandomnessRoundReceiver: failed to deserialize round signature: {e}");
198                return;
199            }
200        };
201
202        let sig: RandomnessSignature = match bcs::from_bytes(&msg.signature_bytes) {
203            Ok(sig) => sig,
204            Err(e) => {
205                warn!(
206                    "RandomnessRoundReceiver: failed to deserialize signature \
207                     for epoch {} round {}: {e}",
208                    msg.epoch, msg.round
209                );
210                return;
211            }
212        };
213
214        if let Err(e) =
215            ThresholdBls12381MinSig::verify(&vss_pk, &msg.round.signature_message(), &sig)
216        {
217            warn!(
218                "RandomnessRoundReceiver: invalid auxiliary signature \
219                 for epoch {} round {}: {e}",
220                msg.epoch, msg.round
221            );
222            return;
223        }
224
225        if self
226            .executed_consensus_rounds
227            .lock()
228            .contains(&(msg.epoch, msg.round))
229        {
230            info!(
231                "RandomnessRoundReceiver: dropping already-executed auxiliary signature for epoch {} round {}",
232                msg.epoch, msg.round
233            );
234            return;
235        }
236
237        debug!(
238            "RandomnessRoundReceiver: verified auxiliary signature for epoch {} round {}",
239            msg.epoch, msg.round
240        );
241
242        self.handle_new_randomness(msg.epoch, msg.round, msg.signature_bytes)
243            .await;
244    }
245
246    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
247    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
248        fail_point_async!("randomness-delay");
249
250        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
251        if epoch_store.epoch() != epoch {
252            warn!(
253                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
254                epoch_store.epoch()
255            );
256            return;
257        }
258        // Broadcast signature to connected observer peers so they can create the same
259        // transaction. This enables observer-to-observer relay chains.
260        let msg = RandomnessSignatureMessage {
261            epoch,
262            round,
263            signature_bytes: bytes.clone(),
264        };
265        match bcs::to_bytes(&msg) {
266            Ok(encoded) => {
267                let _ = self.signatures_broadcast.send(bytes::Bytes::from(encoded));
268            }
269            Err(err) => {
270                warn!("serialisation of RandomnessSignatureMessage failed: {err}");
271            }
272        }
273
274        let key = TransactionKey::RandomnessRound(epoch, round);
275        let transaction = VerifiedTransaction::new_randomness_state_update(
276            epoch,
277            round,
278            bytes,
279            epoch_store
280                .epoch_start_config()
281                .randomness_obj_initial_shared_version()
282                .expect("randomness state obj must exist"),
283        );
284        debug!(
285            "created randomness state update transaction with digest: {:?}",
286            transaction.digest()
287        );
288        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
289        let digest = *transaction.digest();
290
291        // Randomness state updates contain the full bls signature for the random round,
292        // which cannot necessarily be reconstructed again later. Therefore we must immediately
293        // persist this transaction. If we crash before its outputs are committed, this
294        // ensures we will be able to re-execute it.
295        self.authority_state
296            .get_cache_commit()
297            .persist_transaction(&transaction);
298
299        // Notify the scheduler that the transaction key now has a known digest
300        if epoch_store.insert_tx_key(key, digest).is_err() {
301            warn!("epoch ended while handling new randomness");
302        }
303
304        let authority_state = self.authority_state.clone();
305        let executed_consensus_rounds = self.executed_consensus_rounds.clone();
306        spawn_monitored_task!(async move {
307            // Wait for transaction execution in a separate task, to avoid deadlock in case of
308            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
309            // output of the RandomnessStateUpdate from the previous round.)
310            //
311            // We set a very long timeout so that in case this gets stuck for some reason, the
312            // validator will eventually crash rather than continuing in a zombie mode.
313            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
314            let result = tokio::time::timeout(
315                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
316                authority_state
317                    .get_transaction_cache_reader()
318                    .notify_read_executed_effects(
319                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
320                        &[digest],
321                    ),
322            )
323            .await;
324            let mut effects = match result {
325                Ok(result) => result,
326                Err(_) => {
327                    // Crash on randomness update execution timeout in debug builds.
328                    debug_fatal_no_invariant!(
329                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
330                    );
331                    // Continue waiting as long as necessary in non-debug builds.
332                    authority_state
333                        .get_transaction_cache_reader()
334                        .notify_read_executed_effects(
335                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
336                            &[digest],
337                        )
338                        .await
339                }
340            };
341
342            let effects = effects.pop().expect("should return effects");
343            if *effects.status() != ExecutionStatus::Success {
344                fatal!(
345                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
346                );
347            }
348            debug!(
349                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
350            );
351
352            let cache = &mut *executed_consensus_rounds.lock();
353            cache.insert((epoch, round));
354            while cache.len() > EXECUTED_ROUNDS_CACHE_CAPACITY {
355                cache.pop_first();
356            }
357        });
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use super::*;
364    use crate::authority::test_authority_builder::TestAuthorityBuilder;
365    use consensus_core::RandomnessSignatureHandler;
366    use fastcrypto::groups::{GroupElement, HashToGroupElement, bls12381};
367
368    fn generate_keypair() -> (bls12381::Scalar, bls12381::G2Element) {
369        let sk = bls12381::Scalar::generator();
370        let pk = bls12381::G2Element::generator() * sk;
371        (sk, pk)
372    }
373
374    fn sign(sk: &bls12381::Scalar, msg: &[u8]) -> RandomnessSignature {
375        bls12381::G1Element::hash_to_group_element(msg) * sk
376    }
377
378    fn build_message(
379        epoch: EpochId,
380        round: RandomnessRound,
381        sig: &RandomnessSignature,
382    ) -> bytes::Bytes {
383        let msg = RandomnessSignatureMessage {
384            epoch,
385            round,
386            signature_bytes: bcs::to_bytes(sig).unwrap(),
387        };
388        bytes::Bytes::from(bcs::to_bytes(&msg).unwrap())
389    }
390
391    #[tokio::test]
392    async fn test_invalid_signature_not_broadcast() {
393        let (_sk, pk) = generate_keypair();
394        let state = TestAuthorityBuilder::new().build().await;
395        let epoch = state.epoch_store_for_testing().epoch();
396
397        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
398        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
399        let mut sig_rx = handle.subscribe_randomness_signatures();
400
401        handle.set_public_key(pk);
402        tokio::task::yield_now().await;
403
404        let wrong_sk = bls12381::Scalar::generator() + bls12381::Scalar::generator();
405        let round = RandomnessRound(1);
406        let bad_sig = sign(&wrong_sk, &round.signature_message());
407        handle.handle_randomness_signature(build_message(epoch, round, &bad_sig));
408
409        let result =
410            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
411        assert!(result.is_err(), "should have timed out (no broadcast)");
412    }
413
414    #[tokio::test]
415    async fn test_malformed_data_not_broadcast() {
416        let (_sk, pk) = generate_keypair();
417        let state = TestAuthorityBuilder::new().build().await;
418
419        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
420        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
421        let mut sig_rx = handle.subscribe_randomness_signatures();
422
423        handle.set_public_key(pk);
424        tokio::task::yield_now().await;
425
426        handle.handle_randomness_signature(bytes::Bytes::from(vec![0u8; 32]));
427
428        let result =
429            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
430        assert!(result.is_err(), "should have timed out (no broadcast)");
431    }
432
433    #[tokio::test]
434    async fn test_signatures_buffer_until_dkg_completes() {
435        let (sk, pk) = generate_keypair();
436        let state = TestAuthorityBuilder::new().build().await;
437        let epoch = state.epoch_store_for_testing().epoch();
438
439        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
440        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
441        let mut sig_rx = handle.subscribe_randomness_signatures();
442
443        // Send before DKG completes — should buffer.
444        let round = RandomnessRound(1);
445        let sig = sign(&sk, &round.signature_message());
446        handle.handle_randomness_signature(build_message(epoch, round, &sig));
447
448        let result =
449            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
450        assert!(result.is_err(), "should have timed out (DKG not complete)");
451
452        // Now complete DKG — buffered message should be processed.
453        handle.set_public_key(pk);
454
455        let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
456            .await
457            .expect("timed out")
458            .expect("channel closed");
459
460        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
461        assert_eq!(decoded.epoch, epoch);
462        assert_eq!(decoded.round, round);
463    }
464
465    #[tokio::test]
466    async fn test_executed_consensus_signature_not_rebroadcast() {
467        let (sk, pk) = generate_keypair();
468        let state = TestAuthorityBuilder::new().build().await;
469        let epoch = state.epoch_store_for_testing().epoch();
470
471        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
472        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
473        let mut sig_rx = handle.subscribe_randomness_signatures();
474
475        handle.set_public_key(pk);
476        tokio::task::yield_now().await;
477
478        let round = RandomnessRound(1);
479        let sig = sign(&sk, &round.signature_message());
480        let msg = build_message(epoch, round, &sig);
481
482        // Simulate that round 1 has already been executed successfully.
483        handle.mark_round_executed(epoch, round);
484
485        handle.handle_randomness_signature(msg);
486
487        let result =
488            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
489        assert!(
490            result.is_err(),
491            "already-executed round should not be rebroadcast"
492        );
493    }
494
495    #[tokio::test]
496    async fn test_clear_public_key_pauses_processing() {
497        let (sk, pk) = generate_keypair();
498        let state = TestAuthorityBuilder::new().build().await;
499        let epoch = state.epoch_store_for_testing().epoch();
500
501        let (_randomness_tx, randomness_rx) = mpsc::channel(1);
502        let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
503        let mut sig_rx = handle.subscribe_randomness_signatures();
504
505        // Set key, verify a signature flows through.
506        handle.set_public_key(pk);
507        tokio::task::yield_now().await;
508
509        let round1 = RandomnessRound(1);
510        let sig1 = sign(&sk, &round1.signature_message());
511        handle.handle_randomness_signature(build_message(epoch, round1, &sig1));
512
513        tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
514            .await
515            .expect("timed out")
516            .expect("channel closed");
517
518        // Clear key — should pause.
519        handle.clear_public_key();
520        // Yield a few times so the receiver loop picks up the cleared key.
521        for _ in 0..5 {
522            tokio::task::yield_now().await;
523        }
524
525        let round2 = RandomnessRound(2);
526        let sig2 = sign(&sk, &round2.signature_message());
527        handle.handle_randomness_signature(build_message(epoch, round2, &sig2));
528
529        let result =
530            tokio::time::timeout(std::time::Duration::from_millis(200), sig_rx.recv()).await;
531        assert!(result.is_err(), "should have timed out (key cleared)");
532
533        // Re-set key — buffered message should be processed.
534        handle.set_public_key(pk);
535
536        let received = tokio::time::timeout(std::time::Duration::from_secs(5), sig_rx.recv())
537            .await
538            .expect("timed out")
539            .expect("channel closed");
540
541        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
542        assert_eq!(decoded.round, round2);
543    }
544}