sui_core/
randomness_round_receiver.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use fastcrypto::groups::bls12381;
11use fastcrypto_tbls::{tbls::ThresholdBls, types::ThresholdBls12381MinSig};
12use mysten_common::debug_fatal_no_invariant;
13use mysten_common::fatal;
14use mysten_metrics::spawn_monitored_task;
15use serde::{Deserialize, Serialize};
16use sui_macros::fail_point_async;
17use sui_types::committee::EpochId;
18use sui_types::crypto::RandomnessRound;
19use sui_types::crypto::RandomnessSignature;
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::executable_transaction::VerifiedExecutableTransaction;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::transaction::{TransactionKey, VerifiedTransaction};
24use tokio::sync::{broadcast, mpsc, watch};
25use tokio::task::JoinHandle;
26use tracing::{debug, info, instrument, warn};
27
28use crate::authority::AuthorityState;
29use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
30
31/// A randomness signature for a specific epoch and round, broadcast to observer peers.
32#[derive(Debug, Serialize, Deserialize, Clone)]
33pub struct RandomnessSignatureMessage {
34    pub epoch: EpochId,
35    pub round: RandomnessRound,
36    pub signature_bytes: Vec<u8>,
37}
38
/// Capacity of the broadcast channel that fans signature messages out to subscribers.
const SIGNATURES_BROADCAST_CAPACITY: usize = 1_000;
/// Capacity of the bounded channel that queues signatures relayed through consensus
/// (they accumulate here while no VSS public key is set).
const CONSENSUS_SIGNATURES_CHANNEL_SIZE: usize = 1_000;
/// Maximum number of `(epoch, round)` entries kept in the executed-rounds cache.
const EXECUTED_ROUNDS_CACHE_CAPACITY: usize = 1_000;
42
43pub struct RandomnessRoundReceiverHandle {
44    consensus_signatures_tx: mysten_metrics::monitored_mpsc::Sender<bytes::Bytes>,
45    vss_pk_tx: watch::Sender<Option<bls12381::G2Element>>,
46    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
47    #[cfg(test)]
48    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
49    _task_handle: JoinHandle<()>,
50}
51
52impl RandomnessRoundReceiverHandle {
53    /// Sets the VSS public key for signature verification. Called by
54    /// `RandomnessManager::advance_dkg` when DKG completes.
55    pub fn set_public_key(&self, vss_pk: bls12381::G2Element) {
56        self.vss_pk_tx.send(Some(vss_pk)).ok();
57    }
58
59    /// Clears the VSS public key. Called at epoch start before the new DKG
60    /// completes. While the key is `None`, incoming randomness round signatues from consensus
61    /// accumulate in the bounded channel giving the opportunity to buffer them until DKG completes.
62    pub fn clear_public_key(&self) {
63        self.vss_pk_tx.send(None).ok();
64    }
65}
66
67impl RandomnessRoundReceiverHandle {
68    pub fn new_for_testing() -> Arc<Self> {
69        let (consensus_signatures_tx, _) =
70            mysten_metrics::monitored_mpsc::channel("test_auxiliary", 1);
71        let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
72        let (signatures_broadcast, _) = broadcast::channel(1);
73        Arc::new(Self {
74            consensus_signatures_tx,
75            vss_pk_tx,
76            signatures_broadcast,
77            #[cfg(test)]
78            executed_consensus_rounds: Arc::new(Mutex::new(BTreeSet::new())),
79            _task_handle: tokio::spawn(async move {
80                let _vss_pk_rx = vss_pk_rx;
81                futures::future::pending::<()>().await;
82            }),
83        })
84    }
85
86    #[cfg(test)]
87    pub(crate) fn public_key_for_testing(&self) -> Option<bls12381::G2Element> {
88        let rx = self.vss_pk_tx.subscribe();
89        *rx.borrow()
90    }
91
92    #[cfg(test)]
93    pub(crate) fn mark_round_executed(&self, epoch: EpochId, round: RandomnessRound) {
94        self.executed_consensus_rounds.lock().insert((epoch, round));
95    }
96}
97
98impl consensus_core::RandomnessSignatureHandler for RandomnessRoundReceiverHandle {
99    fn handle_randomness_signature(&self, data: bytes::Bytes) {
100        if let Err(e) = self.consensus_signatures_tx.try_send(data) {
101            warn!(
102                "RandomnessRoundReceiverHandle: failed to forward randomness round signature: {e}"
103            );
104        }
105    }
106
107    fn subscribe_randomness_signatures(&self) -> broadcast::Receiver<bytes::Bytes> {
108        self.signatures_broadcast.subscribe()
109    }
110}
111
112pub struct RandomnessRoundReceiver {
113    authority_state: Arc<AuthorityState>,
114    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
115    consensus_signatures_rx: mysten_metrics::monitored_mpsc::Receiver<bytes::Bytes>,
116    vss_pk_rx: watch::Receiver<Option<bls12381::G2Element>>,
117    /// Best-effort broadcast of verified signatures. Primarily used for propagating the
118    /// signatures via consensus to non-committee peers (observers) syncing their state via consensus
119    /// from a read-only capacity.
120    signatures_broadcast: broadcast::Sender<bytes::Bytes>,
121    /// Tracks rounds whose randomness transaction executed successfully via the consensus relay
122    /// path. Prevents rebroadcast loops in observer-to-observer topologies (although in practice is not anticipated having such)
123    /// while still allowing retries for rounds that failed execution.
124    executed_consensus_rounds: Arc<Mutex<BTreeSet<(EpochId, RandomnessRound)>>>,
125}
126
127impl RandomnessRoundReceiver {
128    /// Spawns the receiver loop and returns the shared handle.
129    pub fn spawn(
130        authority_state: Arc<AuthorityState>,
131        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
132    ) -> Arc<RandomnessRoundReceiverHandle> {
133        let (signatures_broadcast, _) =
134            broadcast::channel::<bytes::Bytes>(SIGNATURES_BROADCAST_CAPACITY);
135
136        let (consensus_signatures_tx, consensus_signatures_rx) =
137            mysten_metrics::monitored_mpsc::channel(
138                "consensus_randomness_round_signatures",
139                CONSENSUS_SIGNATURES_CHANNEL_SIZE,
140            );
141        let (vss_pk_tx, vss_pk_rx) = watch::channel(None);
142        let executed_consensus_rounds = Arc::new(Mutex::new(BTreeSet::new()));
143
144        let rrr = RandomnessRoundReceiver {
145            authority_state,
146            randomness_rx,
147            consensus_signatures_rx,
148            vss_pk_rx,
149            signatures_broadcast: signatures_broadcast.clone(),
150            executed_consensus_rounds: executed_consensus_rounds.clone(),
151        };
152        let task_handle = spawn_monitored_task!(rrr.run());
153
154        Arc::new(RandomnessRoundReceiverHandle {
155            consensus_signatures_tx,
156            vss_pk_tx,
157            signatures_broadcast,
158            #[cfg(test)]
159            executed_consensus_rounds,
160            _task_handle: task_handle,
161        })
162    }
163
164    async fn run(mut self) {
165        info!("RandomnessRoundReceiver event loop started");
166
167        loop {
168            let vss_pk = *self.vss_pk_rx.borrow_and_update();
169            tokio::select! {
170                maybe_recv = self.randomness_rx.recv() => {
171                    if let Some((epoch, round, bytes)) = maybe_recv {
172                        self.handle_new_randomness(epoch, round, bytes).await;
173                    } else {
174                        break;
175                    }
176                },
177                Some(data) = self.consensus_signatures_rx.recv(), if vss_pk.is_some() => {
178                    self.handle_consensus_randomness_signature(vss_pk.unwrap(), &data).await;
179                },
180                // Wake up when the key changes so we re-evaluate the select guard.
181                _ = self.vss_pk_rx.changed() => {},
182            }
183        }
184
185        info!("RandomnessRoundReceiver event loop ended");
186    }
187
188    /// Signatures that are propagated via consensus state sync. This is meant to be used from nodes that are not part of the committee
189    /// and are using consensus sync additionally to sync state.
190    async fn handle_consensus_randomness_signature(
191        &mut self,
192        vss_pk: bls12381::G2Element,
193        data: &bytes::Bytes,
194    ) {
195        let msg: RandomnessSignatureMessage = match bcs::from_bytes(data) {
196            Ok(msg) => msg,
197            Err(e) => {
198                warn!("RandomnessRoundReceiver: failed to deserialize round signature: {e}");
199                return;
200            }
201        };
202
203        let sig: RandomnessSignature = match bcs::from_bytes(&msg.signature_bytes) {
204            Ok(sig) => sig,
205            Err(e) => {
206                warn!(
207                    "RandomnessRoundReceiver: failed to deserialize signature \
208                     for epoch {} round {}: {e}",
209                    msg.epoch, msg.round
210                );
211                return;
212            }
213        };
214
215        if let Err(e) =
216            ThresholdBls12381MinSig::verify(&vss_pk, &msg.round.signature_message(), &sig)
217        {
218            warn!(
219                "RandomnessRoundReceiver: invalid auxiliary signature \
220                 for epoch {} round {}: {e}",
221                msg.epoch, msg.round
222            );
223            return;
224        }
225
226        if self
227            .executed_consensus_rounds
228            .lock()
229            .contains(&(msg.epoch, msg.round))
230        {
231            info!(
232                "RandomnessRoundReceiver: dropping already-executed auxiliary signature for epoch {} round {}",
233                msg.epoch, msg.round
234            );
235            return;
236        }
237
238        debug!(
239            "RandomnessRoundReceiver: verified auxiliary signature for epoch {} round {}",
240            msg.epoch, msg.round
241        );
242
243        self.handle_new_randomness(msg.epoch, msg.round, msg.signature_bytes)
244            .await;
245    }
246
247    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
248    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
249        fail_point_async!("randomness-delay");
250
251        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
252        if epoch_store.epoch() != epoch {
253            warn!(
254                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
255                epoch_store.epoch()
256            );
257            return;
258        }
259        // Broadcast signature to connected observer peers so they can create the same
260        // transaction. This enables observer-to-observer relay chains.
261        let msg = RandomnessSignatureMessage {
262            epoch,
263            round,
264            signature_bytes: bytes.clone(),
265        };
266        match bcs::to_bytes(&msg) {
267            Ok(encoded) => {
268                let _ = self.signatures_broadcast.send(bytes::Bytes::from(encoded));
269            }
270            Err(err) => {
271                warn!("serialisation of RandomnessSignatureMessage failed: {err}");
272            }
273        }
274
275        let key = TransactionKey::RandomnessRound(epoch, round);
276        let transaction = VerifiedTransaction::new_randomness_state_update(
277            epoch,
278            round,
279            bytes,
280            epoch_store
281                .epoch_start_config()
282                .randomness_obj_initial_shared_version()
283                .expect("randomness state obj must exist"),
284        );
285        debug!(
286            "created randomness state update transaction with digest: {:?}",
287            transaction.digest()
288        );
289        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
290        let digest = *transaction.digest();
291
292        // Randomness state updates contain the full bls signature for the random round,
293        // which cannot necessarily be reconstructed again later. Therefore we must immediately
294        // persist this transaction. If we crash before its outputs are committed, this
295        // ensures we will be able to re-execute it.
296        self.authority_state
297            .get_cache_commit()
298            .persist_transaction(&transaction);
299
300        // Notify the scheduler that the transaction key now has a known digest
301        if epoch_store.insert_tx_key(key, digest).is_err() {
302            warn!("epoch ended while handling new randomness");
303        }
304
305        let authority_state = self.authority_state.clone();
306        let executed_consensus_rounds = self.executed_consensus_rounds.clone();
307        spawn_monitored_task!(async move {
308            // Wait for transaction execution in a separate task, to avoid deadlock in case of
309            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
310            // output of the RandomnessStateUpdate from the previous round.)
311            //
312            // We set a very long timeout so that in case this gets stuck for some reason, the
313            // validator will eventually crash rather than continuing in a zombie mode.
314            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
315            let result = tokio::time::timeout(
316                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
317                authority_state
318                    .get_transaction_cache_reader()
319                    .notify_read_executed_effects(
320                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
321                        &[digest],
322                    ),
323            )
324            .await;
325            let mut effects = match result {
326                Ok(result) => result,
327                Err(_) => {
328                    // Crash on randomness update execution timeout in debug builds.
329                    debug_fatal_no_invariant!(
330                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
331                    );
332                    // Continue waiting as long as necessary in non-debug builds.
333                    authority_state
334                        .get_transaction_cache_reader()
335                        .notify_read_executed_effects(
336                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
337                            &[digest],
338                        )
339                        .await
340                }
341            };
342
343            let effects = effects.pop().expect("should return effects");
344            if *effects.status() != ExecutionStatus::Success {
345                fatal!(
346                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
347                );
348            }
349            debug!(
350                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
351            );
352
353            let cache = &mut *executed_consensus_rounds.lock();
354            cache.insert((epoch, round));
355            while cache.len() > EXECUTED_ROUNDS_CACHE_CAPACITY {
356                cache.pop_first();
357            }
358        });
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use consensus_core::RandomnessSignatureHandler;
    use fastcrypto::groups::{GroupElement, HashToGroupElement, bls12381};

    /// How long a test waits before concluding that nothing was broadcast.
    const SILENCE_WINDOW: std::time::Duration = std::time::Duration::from_millis(200);
    /// Upper bound on how long a test waits for a broadcast it expects.
    const DELIVERY_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);

    /// A freshly spawned receiver plus what the tests need to drive and observe it.
    struct Harness {
        epoch: EpochId,
        handle: Arc<RandomnessRoundReceiverHandle>,
        sig_rx: broadcast::Receiver<bytes::Bytes>,
        // Never read, but must stay alive: dropping it closes `randomness_rx`, which ends the
        // receiver loop.
        _randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
    }

    impl Harness {
        /// Builds a test authority, spawns a receiver for it, and subscribes to broadcasts.
        async fn start() -> Self {
            let state = TestAuthorityBuilder::new().build().await;
            let epoch = state.epoch_store_for_testing().epoch();
            let (randomness_tx, randomness_rx) = mpsc::channel(1);
            let handle = RandomnessRoundReceiver::spawn(state, randomness_rx);
            let sig_rx = handle.subscribe_randomness_signatures();
            Self {
                epoch,
                handle,
                sig_rx,
                _randomness_tx: randomness_tx,
            }
        }

        /// Publishes `pk` and yields once so the receiver loop can observe it.
        async fn install_key(&self, pk: bls12381::G2Element) {
            self.handle.set_public_key(pk);
            tokio::task::yield_now().await;
        }

        /// Asserts that nothing is broadcast within `SILENCE_WINDOW`.
        async fn expect_silence(&mut self, why: &str) {
            let outcome = tokio::time::timeout(SILENCE_WINDOW, self.sig_rx.recv()).await;
            assert!(outcome.is_err(), "{why}");
        }

        /// Waits up to `DELIVERY_DEADLINE` for the next broadcast and returns its raw bytes.
        async fn expect_broadcast(&mut self) -> bytes::Bytes {
            tokio::time::timeout(DELIVERY_DEADLINE, self.sig_rx.recv())
                .await
                .expect("timed out")
                .expect("channel closed")
        }
    }

    /// Deterministic test keypair derived from the scalar generator.
    fn generate_keypair() -> (bls12381::Scalar, bls12381::G2Element) {
        let sk = bls12381::Scalar::generator();
        (sk, bls12381::G2Element::generator() * sk)
    }

    /// Signs `msg` under `sk` as a G1 (min-sig) signature.
    fn sign(sk: &bls12381::Scalar, msg: &[u8]) -> RandomnessSignature {
        bls12381::G1Element::hash_to_group_element(msg) * sk
    }

    /// Encodes `sig` for `(epoch, round)` the same way peers do on the wire.
    fn build_message(
        epoch: EpochId,
        round: RandomnessRound,
        sig: &RandomnessSignature,
    ) -> bytes::Bytes {
        let signature_bytes = bcs::to_bytes(sig).unwrap();
        let wire = RandomnessSignatureMessage {
            epoch,
            round,
            signature_bytes,
        };
        bytes::Bytes::from(bcs::to_bytes(&wire).unwrap())
    }

    #[tokio::test]
    async fn test_invalid_signature_not_broadcast() {
        let (_, pk) = generate_keypair();
        let mut h = Harness::start().await;
        h.install_key(pk).await;

        // A signature produced with a different secret key must fail verification.
        let wrong_sk = bls12381::Scalar::generator() + bls12381::Scalar::generator();
        let round = RandomnessRound(1);
        let forged = sign(&wrong_sk, &round.signature_message());
        h.handle
            .handle_randomness_signature(build_message(h.epoch, round, &forged));

        h.expect_silence("should have timed out (no broadcast)").await;
    }

    #[tokio::test]
    async fn test_malformed_data_not_broadcast() {
        let (_, pk) = generate_keypair();
        let mut h = Harness::start().await;
        h.install_key(pk).await;

        // Arbitrary zero bytes are not a valid signature message.
        h.handle
            .handle_randomness_signature(bytes::Bytes::from(vec![0u8; 32]));

        h.expect_silence("should have timed out (no broadcast)").await;
    }

    #[tokio::test]
    async fn test_signatures_buffer_until_dkg_completes() {
        let (sk, pk) = generate_keypair();
        let mut h = Harness::start().await;

        // Deliver a valid signature before any key is published: it must be held back.
        let round = RandomnessRound(1);
        let sig = sign(&sk, &round.signature_message());
        h.handle
            .handle_randomness_signature(build_message(h.epoch, round, &sig));
        h.expect_silence("should have timed out (DKG not complete)").await;

        // Publishing the key releases the buffered signature.
        h.handle.set_public_key(pk);
        let received = h.expect_broadcast().await;

        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
        assert_eq!(decoded.epoch, h.epoch);
        assert_eq!(decoded.round, round);
    }

    #[tokio::test]
    async fn test_executed_consensus_signature_not_rebroadcast() {
        let (sk, pk) = generate_keypair();
        let mut h = Harness::start().await;
        h.install_key(pk).await;

        let round = RandomnessRound(1);
        let sig = sign(&sk, &round.signature_message());
        let msg = build_message(h.epoch, round, &sig);

        // Pretend round 1 has already executed successfully.
        h.handle.mark_round_executed(h.epoch, round);
        h.handle.handle_randomness_signature(msg);

        h.expect_silence("already-executed round should not be rebroadcast")
            .await;
    }

    #[tokio::test]
    async fn test_clear_public_key_pauses_processing() {
        let (sk, pk) = generate_keypair();
        let mut h = Harness::start().await;

        // With a key published, a valid signature flows straight through.
        h.install_key(pk).await;
        let first = RandomnessRound(1);
        let first_sig = sign(&sk, &first.signature_message());
        h.handle
            .handle_randomness_signature(build_message(h.epoch, first, &first_sig));
        let _first_broadcast = h.expect_broadcast().await;

        // Clearing the key pauses processing; yield a few times so the loop observes it.
        h.handle.clear_public_key();
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        let second = RandomnessRound(2);
        let second_sig = sign(&sk, &second.signature_message());
        h.handle
            .handle_randomness_signature(build_message(h.epoch, second, &second_sig));
        h.expect_silence("should have timed out (key cleared)").await;

        // Publishing the key again drains the buffered signature.
        h.handle.set_public_key(pk);
        let received = h.expect_broadcast().await;

        let decoded: RandomnessSignatureMessage = bcs::from_bytes(&received).unwrap();
        assert_eq!(decoded.round, second);
    }
}