sui_core/epoch/randomness.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::PeerId;
5use fastcrypto::encoding::{Encoding, Hex};
6use fastcrypto::error::{FastCryptoError, FastCryptoResult};
7use fastcrypto::groups::bls12381;
8use fastcrypto::serde_helpers::ToFromByteArray;
9use fastcrypto::traits::{KeyPair, ToFromBytes};
10use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
11use futures::StreamExt;
12use futures::stream::FuturesUnordered;
13use mysten_common::debug_fatal;
14use parking_lot::Mutex;
15use rand::SeedableRng;
16use rand::rngs::{OsRng, StdRng};
17use serde::{Deserialize, Serialize};
18use std::collections::{BTreeMap, HashMap};
19use std::sync::{Arc, Weak};
20use std::time::Instant;
21use sui_macros::fail_point_if;
22use sui_network::randomness;
23use sui_types::base_types::AuthorityName;
24use sui_types::committee::{Committee, EpochId, StakeUnit};
25use sui_types::crypto::{AuthorityKeyPair, RandomnessRound};
26use sui_types::error::{SuiErrorKind, SuiResult};
27use sui_types::messages_consensus::{
28    ConsensusTransaction, Round, TimestampMs, VersionedDkgConfirmation, VersionedDkgMessage,
29};
30use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
31use tokio::sync::OnceCell;
32use tokio::task::JoinHandle;
33use tracing::{debug, error, info, warn};
34use typed_store::Map;
35
36use crate::authority::authority_per_epoch_store::{
37    AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
38};
39use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
40use crate::consensus_adapter::SubmitToConsensus;
41
42type PkG = bls12381::G2Element;
43type EncG = bls12381::G2Element;
44
45pub const SINGLETON_KEY: u64 = 0;
46
47// Wrappers for DKG messages (to simplify upgrades).
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[allow(clippy::large_enum_variant)]
51pub enum VersionedProcessedMessage {
52    V0(), // deprecated
53    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
54}
55
56impl VersionedProcessedMessage {
57    pub fn sender(&self) -> PartyId {
58        match self {
59            VersionedProcessedMessage::V0() => {
60                panic!("BUG: invalid VersionedProcessedMessage version V0")
61            }
62            VersionedProcessedMessage::V1(msg) => msg.message.sender,
63        }
64    }
65
66    pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
67        if let VersionedProcessedMessage::V1(msg) = self {
68            msg
69        } else {
70            panic!("BUG: expected message version is 1")
71        }
72    }
73
74    pub fn process(
75        party: Arc<dkg_v1::Party<PkG, EncG>>,
76        message: VersionedDkgMessage,
77    ) -> FastCryptoResult<VersionedProcessedMessage> {
78        // All inputs are verified in add_message, so we can assume they are of the correct version.
79        let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
80        Ok(VersionedProcessedMessage::V1(processed))
81    }
82
83    pub fn merge(
84        party: Arc<dkg_v1::Party<PkG, EncG>>,
85        messages: Vec<Self>,
86    ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
87        // All inputs were created by this validator, so we can assume they are of the correct version.
88        let (conf, msgs) = party.merge(
89            &messages
90                .into_iter()
91                .map(|vm| vm.unwrap_v1())
92                .collect::<Vec<_>>(),
93        )?;
94        Ok((
95            VersionedDkgConfirmation::V1(conf),
96            VersionedUsedProcessedMessages::V1(msgs),
97        ))
98    }
99}
100
101#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
102pub enum VersionedUsedProcessedMessages {
103    V0(), // deprecated
104    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
105}
106
107impl VersionedUsedProcessedMessages {
108    fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
109        &self,
110        party: Arc<dkg_v1::Party<PkG, EncG>>,
111        confirmations: Iter,
112    ) -> FastCryptoResult<Output<PkG, EncG>> {
113        // All inputs are verified in add_confirmation, so we can assume they are of the correct version.
114        let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
115        let VersionedUsedProcessedMessages::V1(msg) = self else {
116            panic!("BUG: invalid VersionedUsedProcessedMessages version")
117        };
118        party.complete(
119            msg,
120            &confirmations
121                .map(|vm| vm.unwrap_v1())
122                .cloned()
123                .collect::<Vec<_>>(),
124            rng,
125        )
126    }
127}
128
129// State machine for randomness DKG and generation.
130//
131// DKG protocol:
132// 1. This validator sends out a `Message` to all other validators.
133// 2. Once sufficient valid `Message`s are received from other validators via consensus and
134//    processed, this validator sends out a `Confirmation` to all other validators.
135// 3. Once sufficient `Confirmation`s are received from other validators via consensus and
136//    processed, they are combined to form a public VSS key and local private key shares.
137// 4. Randomness generation begins.
138//
139// Randomness generation:
140// 1. For each new round, AuthorityPerEpochStore eventually calls `generate_randomness`.
141// 2. This kicks off a process in RandomnessEventLoop to send partial signatures for the new
142//    round to all other validators.
143// 3. Once enough partial signautres for the round are collected, a RandomnessStateUpdate
144//    transaction is generated and injected into the ExecutionScheduler.
145// 4. Once the RandomnessStateUpdate transaction is seen in a certified checkpoint,
146//    `notify_randomness_in_checkpoint` is called to complete the round and stop sending
147//    partial signatures for it.
148pub struct RandomnessManager {
149    epoch_store: Weak<AuthorityPerEpochStore>,
150    epoch: EpochId,
151    consensus_adapter: Box<dyn SubmitToConsensus>,
152    network_handle: randomness::Handle,
153    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
154
155    // State for DKG.
156    dkg_start_time: OnceCell<Instant>,
157    party: Arc<dkg_v1::Party<PkG, EncG>>,
158    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
159    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
160    used_messages: OnceCell<VersionedUsedProcessedMessages>,
161    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
162    dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
163
164    // State for randomness generation.
165    next_randomness_round: RandomnessRound,
166    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
167}
168
169impl RandomnessManager {
170    // Returns None in case of invalid input or other failure to initialize DKG.
171    pub async fn try_new(
172        epoch_store_weak: Weak<AuthorityPerEpochStore>,
173        consensus_adapter: Box<dyn SubmitToConsensus>,
174        network_handle: randomness::Handle,
175        authority_key_pair: &AuthorityKeyPair,
176    ) -> Option<Self> {
177        let epoch_store = match epoch_store_weak.upgrade() {
178            Some(epoch_store) => epoch_store,
179            None => {
180                error!(
181                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
182                );
183                return None;
184            }
185        };
186        let tables = match epoch_store.tables() {
187            Ok(tables) => tables,
188            Err(_) => {
189                error!(
190                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
191                );
192                return None;
193            }
194        };
195        let protocol_config = epoch_store.protocol_config();
196
197        let name: AuthorityName = authority_key_pair.public().into();
198        let committee = epoch_store.committee();
199        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
200        if tracing::enabled!(tracing::Level::DEBUG) {
201            // Log first few entries in DKG info for debugging.
202            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
203                let pk_bytes = pk.as_element().to_byte_array();
204                debug!(
205                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
206                );
207            }
208        }
209        let authority_ids: HashMap<_, _> =
210            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
211        let authority_peer_ids = epoch_store
212            .epoch_start_config()
213            .epoch_start_state()
214            .get_authority_names_to_peer_ids();
215        let authority_info = authority_ids
216            .into_iter()
217            .map(|(name, id)| {
218                let peer_id = *authority_peer_ids
219                    .get(&name)
220                    .expect("authority name should be in peer_ids");
221                (name, (peer_id, id))
222            })
223            .collect();
224        let nodes = info
225            .iter()
226            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
227                id: *id,
228                pk: pk.clone(),
229                weight: (*stake).try_into().expect("stake should fit in u16"),
230            })
231            .collect();
232        let (nodes, t) = match nodes::Nodes::new_reduced(
233            nodes,
234            committee
235                .validity_threshold()
236                .try_into()
237                .expect("validity threshold should fit in u16"),
238            protocol_config.random_beacon_reduction_allowed_delta(),
239            protocol_config
240                .random_beacon_reduction_lower_bound()
241                .try_into()
242                .expect("should fit u16"),
243        ) {
244            Ok((nodes, t)) => (nodes, t),
245            Err(err) => {
246                error!("random beacon: error while initializing Nodes: {err:?}");
247                return None;
248            }
249        };
250        let total_weight = nodes.total_weight();
251        let num_nodes = nodes.num_nodes();
252        let prefix_str = format!(
253            "dkg {} {}",
254            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
255            committee.epoch()
256        );
257        let randomness_private_key = bls12381::Scalar::from_byte_array(
258            authority_key_pair
259                .copy()
260                .private()
261                .as_bytes()
262                .try_into()
263                .expect("key length should match"),
264        )
265        .expect("should work to convert BLS key to Scalar");
266        let party = match dkg_v1::Party::<PkG, EncG>::new(
267            fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
268                randomness_private_key,
269            ),
270            nodes,
271            t,
272            fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
273            &mut rand::thread_rng(),
274        ) {
275            Ok(party) => party,
276            Err(err) => {
277                error!("random beacon: error while initializing Party: {err:?}");
278                return None;
279            }
280        };
281        info!(
282            "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
283        );
284
285        // Load existing data from store.
286        let highest_completed_round = tables
287            .randomness_highest_completed_round
288            .get(&SINGLETON_KEY)
289            .expect("typed_store should not fail");
290        let mut rm = RandomnessManager {
291            epoch_store: epoch_store_weak,
292            epoch: committee.epoch(),
293            consensus_adapter,
294            network_handle: network_handle.clone(),
295            authority_info,
296            dkg_start_time: OnceCell::new(),
297            party: Arc::new(party),
298            enqueued_messages: BTreeMap::new(),
299            processed_messages: BTreeMap::new(),
300            used_messages: OnceCell::new(),
301            confirmations: BTreeMap::new(),
302            dkg_output: OnceCell::new(),
303            next_randomness_round: RandomnessRound(0),
304            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
305        };
306        let dkg_output = tables
307            .dkg_output
308            .get(&SINGLETON_KEY)
309            .expect("typed_store should not fail");
310        if let Some(dkg_output) = dkg_output {
311            info!(
312                "random beacon: loaded existing DKG output for epoch {}",
313                committee.epoch()
314            );
315            epoch_store
316                .metrics
317                .epoch_random_beacon_dkg_num_shares
318                .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
319            rm.dkg_output
320                .set(Some(dkg_output.clone()))
321                .expect("setting new OnceCell should succeed");
322            network_handle.update_epoch(
323                committee.epoch(),
324                rm.authority_info.clone(),
325                dkg_output,
326                rm.party.t(),
327                highest_completed_round,
328            );
329        } else {
330            info!(
331                "random beacon: no existing DKG output found for epoch {}",
332                committee.epoch()
333            );
334
335            // Load intermediate data.
336            assert!(
337                epoch_store.protocol_config().dkg_version() > 0,
338                "BUG: DKG version 0 is deprecated"
339            );
340            rm.processed_messages.extend(
341                tables
342                    .dkg_processed_messages_v2
343                    .safe_iter()
344                    .map(|result| result.expect("typed_store should not fail")),
345            );
346            if let Some(used_messages) = tables
347                .dkg_used_messages_v2
348                .get(&SINGLETON_KEY)
349                .expect("typed_store should not fail")
350            {
351                rm.used_messages
352                    .set(used_messages.clone())
353                    .expect("setting new OnceCell should succeed");
354            }
355            rm.confirmations.extend(
356                tables
357                    .dkg_confirmations_v2
358                    .safe_iter()
359                    .map(|result| result.expect("typed_store should not fail")),
360            );
361        }
362
363        // Resume randomness generation from where we left off.
364        // This must be loaded regardless of whether DKG has finished yet, since the
365        // RandomnessEventLoop and commit-handling logic in AuthorityPerEpochStore both depend on
366        // this state.
367        rm.next_randomness_round = tables
368            .randomness_next_round
369            .get(&SINGLETON_KEY)
370            .expect("typed_store should not fail")
371            .unwrap_or(RandomnessRound(0));
372        info!(
373            "random beacon: starting from next_randomness_round={}",
374            rm.next_randomness_round.0
375        );
376        let first_incomplete_round = highest_completed_round
377            .map(|r| r + 1)
378            .unwrap_or(RandomnessRound(0));
379        if first_incomplete_round < rm.next_randomness_round {
380            info!(
381                "random beacon: resuming generation for randomness rounds from {} to {}",
382                first_incomplete_round,
383                rm.next_randomness_round - 1,
384            );
385            for r in first_incomplete_round.0..rm.next_randomness_round.0 {
386                network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
387            }
388        }
389
390        Some(rm)
391    }
392
393    /// Sends the initial dkg::Message to begin the randomness DKG protocol.
394    pub async fn start_dkg(&mut self) -> SuiResult {
395        if self.used_messages.initialized() || self.dkg_output.initialized() {
396            // DKG already started (or completed or failed).
397            return Ok(());
398        }
399
400        let _ = self.dkg_start_time.set(Instant::now());
401
402        let epoch_store = self.epoch_store()?;
403        let dkg_version = epoch_store.protocol_config().dkg_version();
404        info!("random beacon: starting DKG, version {dkg_version}");
405
406        let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
407            Ok(msg) => msg,
408            Err(FastCryptoError::IgnoredMessage) => {
409                info!(
410                    "random beacon: no DKG Message for party id={} (zero weight)",
411                    self.party.id
412                );
413                return Ok(());
414            }
415            Err(e) => {
416                error!("random beacon: error while creating a DKG Message: {e:?}");
417                return Ok(());
418            }
419        };
420
421        info!("random beacon: created {msg:?} with dkg version {dkg_version}");
422        let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
423
424        #[allow(unused_mut)]
425        let mut fail_point_skip_sending = false;
426        fail_point_if!("rb-dkg", || {
427            // maybe skip sending in simtests
428            fail_point_skip_sending = true;
429        });
430        if !fail_point_skip_sending {
431            self.consensus_adapter
432                .submit_to_consensus(&[transaction], &epoch_store)?;
433        }
434
435        epoch_store
436            .metrics
437            .epoch_random_beacon_dkg_message_time_ms
438            .set(
439                self.dkg_start_time
440                    .get()
441                    .unwrap() // already set above
442                    .elapsed()
443                    .as_millis() as i64,
444            );
445        Ok(())
446    }
447
448    /// Processes all received messages and advances the randomness DKG state machine when possible,
449    /// sending out a dkg::Confirmation and generating final output.
450    pub(crate) async fn advance_dkg(
451        &mut self,
452        consensus_output: &mut ConsensusCommitOutput,
453        round: Round,
454    ) -> SuiResult {
455        let epoch_store = self.epoch_store()?;
456
457        // Once we have enough Messages, send a Confirmation.
458        if !self.dkg_output.initialized() && !self.used_messages.initialized() {
459            // Process all enqueued messages.
460            let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
461                .into_values()
462                .collect();
463            while let Some(res) = handles.next().await {
464                if let Ok(Some(processed)) = res {
465                    self.processed_messages
466                        .insert(processed.sender(), processed.clone());
467                    consensus_output.insert_dkg_processed_message(processed);
468                }
469            }
470
471            // Attempt to generate the Confirmation.
472            match VersionedProcessedMessage::merge(
473                self.party.clone(),
474                self.processed_messages
475                    .values()
476                    .cloned()
477                    .collect::<Vec<_>>(),
478            ) {
479                Ok((conf, used_msgs)) => {
480                    info!(
481                        "random beacon: sending DKG Confirmation with {} complaints",
482                        conf.num_of_complaints()
483                    );
484                    if self.used_messages.set(used_msgs.clone()).is_err() {
485                        error!("BUG: used_messages should only ever be set once");
486                    }
487                    consensus_output.insert_dkg_used_messages(used_msgs);
488
489                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
490                        epoch_store.name,
491                        &conf,
492                    );
493
494                    #[allow(unused_mut)]
495                    let mut fail_point_skip_sending = false;
496                    fail_point_if!("rb-dkg", || {
497                        // maybe skip sending in simtests
498                        fail_point_skip_sending = true;
499                    });
500                    if !fail_point_skip_sending {
501                        self.consensus_adapter
502                            .submit_to_consensus(&[transaction], &epoch_store)?;
503                    }
504
505                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
506                    if let Some(elapsed) = elapsed {
507                        epoch_store
508                            .metrics
509                            .epoch_random_beacon_dkg_confirmation_time_ms
510                            .set(elapsed as i64);
511                    }
512                }
513                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
514                Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
515            }
516        }
517
518        // Once we have enough Confirmations, process them and update shares.
519        if !self.dkg_output.initialized() && self.used_messages.initialized() {
520            match self
521                .used_messages
522                .get()
523                .expect("checked above that `used_messages` is initialized")
524                .complete_dkg(self.party.clone(), self.confirmations.values())
525            {
526                Ok(output) => {
527                    let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
528                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
529                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
530                    info!(
531                        "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
532                    );
533                    epoch_store
534                        .metrics
535                        .epoch_random_beacon_dkg_num_shares
536                        .set(num_shares as i64);
537                    epoch_store
538                        .metrics
539                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
540                        .set(epoch_elapsed as i64);
541                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
542                    if let Some(elapsed) = elapsed {
543                        epoch_store
544                            .metrics
545                            .epoch_random_beacon_dkg_completion_time_ms
546                            .set(elapsed as i64);
547                    }
548                    self.dkg_output
549                        .set(Some(output.clone()))
550                        .expect("checked above that `dkg_output` is uninitialized");
551                    self.network_handle.update_epoch(
552                        epoch_store.committee().epoch(),
553                        self.authority_info.clone(),
554                        output.clone(),
555                        self.party.t(),
556                        None,
557                    );
558                    consensus_output.set_dkg_output(output);
559                }
560                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
561                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
562            }
563        }
564
565        // If we ran out of time, mark DKG as failed.
566        if !self.dkg_output.initialized()
567            && round
568                > epoch_store
569                    .protocol_config()
570                    .random_beacon_dkg_timeout_round()
571                    .into()
572        {
573            error!(
574                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
575            );
576            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
577            self.dkg_output
578                .set(None)
579                .expect("checked above that `dkg_output` is uninitialized");
580        }
581
582        Ok(())
583    }
584
585    /// Adds a received VersionedDkgMessage to the randomness DKG state machine.
586    pub fn add_message(
587        &mut self,
588        authority: &AuthorityName,
589        msg: VersionedDkgMessage,
590    ) -> SuiResult {
591        // message was received from other validators, so we need to ensure it uses a supported
592        // version before we call other functions that assume the version is correct
593        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
594        if !msg.is_valid_version(dkg_version) {
595            warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
596            return Ok(());
597        }
598
599        if self.used_messages.initialized() || self.dkg_output.initialized() {
600            // We've already sent a `Confirmation`, so we can't add any more messages.
601            return Ok(());
602        }
603        let Some((_, party_id)) = self.authority_info.get(authority) else {
604            debug_fatal!(
605                "random beacon: received DKG Message from unknown authority: {authority:?}"
606            );
607            return Ok(());
608        };
609        if *party_id != msg.sender() {
610            warn!(
611                "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
612            );
613            return Ok(());
614        }
615        if self.enqueued_messages.contains_key(&msg.sender())
616            || self.processed_messages.contains_key(&msg.sender())
617        {
618            info!("ignoring duplicate DKG Message from authority {authority:?}");
619            return Ok(());
620        }
621
622        let party = self.party.clone();
623        // TODO: Could save some CPU by not processing messages if we already have enough to merge.
624        self.enqueued_messages.insert(
625            msg.sender(),
626            tokio::task::spawn_blocking(move || {
627                match VersionedProcessedMessage::process(party, msg) {
628                    Ok(processed) => Some(processed),
629                    Err(err) => {
630                        debug!("random beacon: error while processing DKG Message: {err:?}");
631                        None
632                    }
633                }
634            }),
635        );
636        Ok(())
637    }
638
639    /// Adds a received dkg::Confirmation to the randomness DKG state machine.
640    pub(crate) fn add_confirmation(
641        &mut self,
642        output: &mut ConsensusCommitOutput,
643        authority: &AuthorityName,
644        conf: VersionedDkgConfirmation,
645    ) -> SuiResult {
646        // confirmation was received from other validators, so we need to ensure it uses a supported
647        // version before we call other functions that assume the version is correct
648        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
649        if !conf.is_valid_version(dkg_version) {
650            warn!(
651                "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
652            );
653            return Ok(());
654        }
655
656        if self.dkg_output.initialized() {
657            // Once we have completed DKG, no more `Confirmation`s are needed.
658            return Ok(());
659        }
660        let Some((_, party_id)) = self.authority_info.get(authority) else {
661            error!(
662                "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
663            );
664            return Ok(());
665        };
666        if *party_id != conf.sender() {
667            warn!(
668                "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
669            );
670            return Ok(());
671        }
672        self.confirmations.insert(conf.sender(), conf.clone());
673        output.insert_dkg_confirmation(conf);
674        Ok(())
675    }
676
677    /// Reserves the next available round number for randomness generation if enough time has
678    /// elapsed, or returns None if not yet ready (based on ProtocolConfig setting). Once the given
679    /// batch is written, `generate_randomness` must be called to start the process. On restart,
680    /// any reserved rounds for which the batch was written will automatically be resumed.
681    pub(crate) fn reserve_next_randomness(
682        &mut self,
683        commit_timestamp: TimestampMs,
684        output: &mut ConsensusCommitOutput,
685    ) -> SuiResult<Option<RandomnessRound>> {
686        let epoch_store = self.epoch_store()?;
687
688        let last_round_timestamp = epoch_store
689            .get_randomness_last_round_timestamp()
690            .expect("read should not fail");
691
692        if let Some(last_round_timestamp) = last_round_timestamp
693            && commit_timestamp - last_round_timestamp
694                < epoch_store
695                    .protocol_config()
696                    .random_beacon_min_round_interval_ms()
697        {
698            return Ok(None);
699        }
700
701        let randomness_round = self.next_randomness_round;
702        self.next_randomness_round = self
703            .next_randomness_round
704            .checked_add(1)
705            .expect("RandomnessRound should not overflow");
706
707        output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
708
709        Ok(Some(randomness_round))
710    }
711
712    /// Starts the process of generating the given RandomnessRound.
713    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
714        self.network_handle
715            .send_partial_signatures(epoch, randomness_round);
716    }
717
718    pub fn dkg_status(&self) -> DkgStatus {
719        match self.dkg_output.get() {
720            Some(Some(_)) => DkgStatus::Successful,
721            Some(None) => DkgStatus::Failed,
722            None => DkgStatus::Pending,
723        }
724    }
725
726    /// Generates a new RandomnessReporter for reporting observed rounds to this RandomnessManager.
727    pub fn reporter(&self) -> RandomnessReporter {
728        RandomnessReporter {
729            epoch_store: self.epoch_store.clone(),
730            epoch: self.epoch,
731            network_handle: self.network_handle.clone(),
732            highest_completed_round: self.highest_completed_round.clone(),
733        }
734    }
735
736    fn epoch_store(&self) -> SuiResult<Arc<AuthorityPerEpochStore>> {
737        self.epoch_store
738            .upgrade()
739            .ok_or(SuiErrorKind::EpochEnded(self.epoch).into())
740    }
741
742    fn randomness_dkg_info_from_committee(
743        committee: &Committee,
744    ) -> Vec<(
745        u16,
746        AuthorityName,
747        fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
748        StakeUnit,
749    )> {
750        committee
751            .members()
752            .map(|(name, stake)| {
753                let index: u16 = committee
754                    .authority_index(name)
755                    .expect("lookup of known committee member should succeed")
756                    .try_into()
757                    .expect("authority index should fit in u16");
758                let pk = bls12381::G2Element::from_byte_array(
759                    committee
760                        .public_key(name)
761                        .expect("lookup of known committee member should succeed")
762                        .as_bytes()
763                        .try_into()
764                        .expect("key length should match"),
765                )
766                .expect("should work to convert BLS key to G2Element");
767                (
768                    index,
769                    *name,
770                    fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
771                    *stake,
772                )
773            })
774            .collect()
775    }
776}
777
778// Used by other components to notify the randomness system of observed randomness.
779#[derive(Clone)]
780pub struct RandomnessReporter {
781    epoch_store: Weak<AuthorityPerEpochStore>,
782    epoch: EpochId,
783    network_handle: randomness::Handle,
784    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
785}
786
787impl RandomnessReporter {
788    /// Notifies the associated randomness manager that randomness for the given round has been
789    /// durably committed in a checkpoint. This completes the process of generating randomness for
790    /// the round.
791    pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> SuiResult {
792        let epoch_store = self
793            .epoch_store
794            .upgrade()
795            .ok_or(SuiErrorKind::EpochEnded(self.epoch))?;
796        let mut highest_completed_round = self.highest_completed_round.lock();
797        if Some(round) > *highest_completed_round {
798            *highest_completed_round = Some(round);
799            epoch_store
800                .tables()?
801                .randomness_highest_completed_round
802                .insert(&SINGLETON_KEY, &round)?;
803            self.network_handle
804                .complete_round(epoch_store.committee().epoch(), round);
805        }
806        Ok(())
807    }
808}
809
/// Outcome of the randomness DKG protocol for the current epoch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DkgStatus {
    /// DKG has not finished yet.
    Pending,
    /// DKG finished without producing an output.
    Failed,
    /// DKG finished and produced an output.
    Successful,
}
816
// Tests that drive `RandomnessManager` through DKG for a 4-validator committee. Each validator uses
// a mocked consensus client whose submissions are captured on a shared in-process channel, and the
// test itself plays consensus by feeding those submissions back to every manager.
#[cfg(test)]
mod tests {
    use crate::{
        authority::{
            authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStats},
            test_authority_builder::TestAuthorityBuilder,
        },
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
            MockConsensusClient,
        },
        epoch::randomness::*,
        mock_consensus::with_block_status,
    };
    use consensus_core::BlockStatus;
    use consensus_types::block::BlockRef;
    use std::num::NonZeroUsize;
    use sui_protocol_config::ProtocolConfig;
    use sui_protocol_config::{Chain, ProtocolVersion};
    use sui_types::messages_consensus::ConsensusTransactionKind;
    use tokio::sync::mpsc;

    /// Successful DKG with `random_beacon_dkg_version` set to 1.
    #[tokio::test]
    async fn test_dkg_v1() {
        test_dkg(1).await;
    }

    /// Runs a complete DKG (messages, then confirmations) among all validators using the given
    /// DKG version, and asserts that every manager ends in `DkgStatus::Successful`.
    async fn test_dkg(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        // Shared by all validators' mock clients. `try_send(..).unwrap()` below panics if more than
        // 100 batches are ever buffered.
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        for validator in network_config.validator_configs.iter() {
            // Send consensus messages to channel.
            // The `withf` matcher is used only for its side effect of forwarding the submitted batch;
            // it always returns true, and every submission is reported as sequenced.
            let mut mock_consensus_client = MockConsensusClient::new();
            let tx_consensus = tx_consensus.clone();
            mock_consensus_client
                .expect_submit()
                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
                    tx_consensus.try_send(transactions.to_vec()).unwrap();
                    true
                })
                .returning(|_, _| {
                    Ok((
                        Vec::new(),
                        with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
                    ))
                });

            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
                .build()
                .await;
            let consensus_adapter = Arc::new(ConsensusAdapter::new(
                Arc::new(mock_consensus_client),
                CheckpointStore::new_for_tests(),
                state.name,
                Arc::new(ConnectionMonitorStatusForTests {}),
                100_000,
                100_000,
                None,
                None,
                ConsensusAdapterMetrics::new_test(),
                state.epoch_store_for_testing().protocol_config().clone(),
            ));
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                sui_network::randomness::Handle::new_stub(),
                validator.protocol_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Generate and distribute Messages.
        // Each `start_dkg` call is expected to submit exactly one `RandomnessDkgMessage` batch.
        let mut dkg_messages = Vec::new();
        for randomness_manager in randomness_managers.iter_mut() {
            randomness_manager.start_dkg().await.unwrap();

            let mut dkg_message = rx_consensus.recv().await.unwrap();
            assert!(dkg_message.len() == 1);
            match dkg_message.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_messages.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Deliver every validator's message to every manager, then advance DKG and persist the
        // commit output. The second argument to `advance_dkg` is 0 here; the expiration test passes
        // `u64::MAX` instead and expects failure (presumably it is the current consensus round).
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 0,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Generate and distribute Confirmations.
        // The previous phase is expected to make each manager submit exactly one
        // `RandomnessDkgConfirmation` batch.
        let mut dkg_confirmations = Vec::new();
        for _ in 0..randomness_managers.len() {
            let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
            assert!(dkg_confirmation.len() == 1);
            match dkg_confirmation.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
                    let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_confirmations.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Deliver every confirmation to every manager, advance DKG again, and persist the output.
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 1,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Verify DKG completed.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
        }
    }

    /// DKG expiration with `random_beacon_dkg_version` set to 1.
    #[tokio::test]
    async fn test_dkg_expiration_v1() {
        test_dkg_expiration(1).await;
    }

    /// Distributes DKG messages, then advances DKG with `u64::MAX` as the second `advance_dkg`
    /// argument, and asserts that every manager ends in `DkgStatus::Failed` (presumably because
    /// that round is past the DKG deadline).
    async fn test_dkg_expiration(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        // Shared by all validators' mock clients; see `test_dkg` for the same setup.
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        for validator in network_config.validator_configs.iter() {
            // Send consensus messages to channel.
            let mut mock_consensus_client = MockConsensusClient::new();
            let tx_consensus = tx_consensus.clone();
            mock_consensus_client
                .expect_submit()
                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
                    tx_consensus.try_send(transactions.to_vec()).unwrap();
                    true
                })
                .returning(|_, _| {
                    Ok((
                        Vec::new(),
                        with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
                    ))
                });

            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
                .build()
                .await;
            let consensus_adapter = Arc::new(ConsensusAdapter::new(
                Arc::new(mock_consensus_client),
                CheckpointStore::new_for_tests(),
                state.name,
                Arc::new(ConnectionMonitorStatusForTests {}),
                100_000,
                100_000,
                None,
                None,
                ConsensusAdapterMetrics::new_test(),
                state.epoch_store_for_testing().protocol_config().clone(),
            ));
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                sui_network::randomness::Handle::new_stub(),
                validator.protocol_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Generate and distribute Messages.
        let mut dkg_messages = Vec::new();
        for randomness_manager in randomness_managers.iter_mut() {
            randomness_manager.start_dkg().await.unwrap();

            let mut dkg_message = rx_consensus.recv().await.unwrap();
            assert!(dkg_message.len() == 1);
            match dkg_message.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_messages.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Same as the message phase in `test_dkg`, except `advance_dkg` is given `u64::MAX`,
        // which this test expects to drive DKG into the `Failed` state.
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 0,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, u64::MAX)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Verify DKG failed.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
        }
    }
}