sui_core/epoch/
randomness.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::PeerId;
5use fastcrypto::encoding::{Encoding, Hex};
6use fastcrypto::error::{FastCryptoError, FastCryptoResult};
7use fastcrypto::groups::bls12381;
8use fastcrypto::serde_helpers::ToFromByteArray;
9use fastcrypto::traits::{KeyPair, ToFromBytes};
10use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
11use futures::StreamExt;
12use futures::stream::FuturesUnordered;
13use mysten_common::debug_fatal;
14use parking_lot::Mutex;
15use rand::SeedableRng;
16use rand::rngs::{OsRng, StdRng};
17use serde::{Deserialize, Serialize};
18use std::collections::{BTreeMap, HashMap};
19use std::sync::{Arc, Weak};
20use std::time::Instant;
21use sui_macros::fail_point_if;
22use sui_network::randomness;
23use sui_types::base_types::AuthorityName;
24use sui_types::committee::{Committee, EpochId, StakeUnit};
25use sui_types::crypto::{AuthorityKeyPair, RandomnessRound};
26use sui_types::error::{SuiErrorKind, SuiResult};
27use sui_types::messages_consensus::{
28    ConsensusTransaction, Round, TimestampMs, VersionedDkgConfirmation, VersionedDkgMessage,
29};
30use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
31use tokio::sync::OnceCell;
32use tokio::task::JoinHandle;
33use tracing::{debug, error, info, warn};
34use typed_store::Map;
35
36use crate::authority::authority_per_epoch_store::{
37    AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
38};
39use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
40use crate::consensus_adapter::SubmitToConsensus;
41use crate::randomness_round_receiver::RandomnessRoundReceiverHandle;
42
/// Group of the first generic parameter of the `dkg_v1` types (BLS12-381 G2).
type PkG = bls12381::G2Element;
/// Group of the participants' encryption keys (`nodes::Node<EncG>`), also BLS12-381 G2.
type EncG = bls12381::G2Element;

/// Key of the single row kept in the singleton randomness tables (e.g.
/// `randomness_next_round`, `dkg_output_v2`, `dkg_used_messages_v2`).
pub const SINGLETON_KEY: u64 = 0;
47
// Wrappers for DKG messages (to simplify upgrades).

/// Versioned wrapper around a DKG message after local processing (`dkg_v1::ProcessedMessage`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
// V1 is far larger than the empty V0 variant, which triggers this lint.
#[allow(clippy::large_enum_variant)]
pub enum VersionedProcessedMessage {
    /// Deprecated. NOTE(review): presumably retained so V1 keeps its serialized variant index.
    V0(), // deprecated
    /// Current format.
    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
}
56
57impl VersionedProcessedMessage {
58    pub fn sender(&self) -> PartyId {
59        match self {
60            VersionedProcessedMessage::V0() => {
61                panic!("BUG: invalid VersionedProcessedMessage version V0")
62            }
63            VersionedProcessedMessage::V1(msg) => msg.message.sender,
64        }
65    }
66
67    pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
68        if let VersionedProcessedMessage::V1(msg) = self {
69            msg
70        } else {
71            panic!("BUG: expected message version is 1")
72        }
73    }
74
75    pub fn as_v1(&self) -> Option<&dkg_v1::ProcessedMessage<PkG, EncG>> {
76        if let VersionedProcessedMessage::V1(msg) = self {
77            Some(msg)
78        } else {
79            None
80        }
81    }
82}
83
/// Versioned wrapper around the processed DKG messages that a successful merge actually used
/// (`dkg_v1::UsedProcessedMessages`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedUsedProcessedMessages {
    /// Deprecated. NOTE(review): presumably retained so V1 keeps its serialized variant index.
    V0(), // deprecated
    /// Current format.
    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
}
89
90impl VersionedUsedProcessedMessages {
91    pub fn as_v1(&self) -> Option<&dkg_v1::UsedProcessedMessages<PkG, EncG>> {
92        if let VersionedUsedProcessedMessages::V1(msg) = self {
93            Some(msg)
94        } else {
95            None
96        }
97    }
98}
99
/// Distinguishes between an active DKG participant (validator) and a read-only observer (fullnode).
enum DkgRole {
    /// Validator: creates DKG messages and confirmations and obtains private key shares.
    Party(dkg_v1::Party<PkG, EncG>),
    /// Fullnode: follows DKG to derive the shared public key but holds no shares.
    Observer(dkg_v1::Observer<PkG, EncG>),
}
105
106impl DkgRole {
107    /// Creates a new DkgRole. When `authority_key_pair` is Some, creates an active Party
108    /// (validator). When None, creates a read-only Observer (fullnode).
109    ///
110    /// * `authority_key_pair` - The validator's BLS key pair used to derive the DKG private key.
111    ///   When None, an Observer is created that can follow DKG but not produce shares.
112    /// * `nodes` - The reduced set of DKG participants with their public keys and weights.
113    /// * `t` - The threshold number of shares required to reconstruct randomness.
114    /// * `random_oracle` - Epoch-specific oracle used to derive deterministic challenges during DKG.
115    fn try_new(
116        authority_key_pair: Option<&AuthorityKeyPair>,
117        nodes: nodes::Nodes<EncG>,
118        t: u16,
119        random_oracle: fastcrypto_tbls::random_oracle::RandomOracle,
120    ) -> Option<Self> {
121        let total_weight = nodes.total_weight();
122        let num_nodes = nodes.num_nodes();
123
124        if let Some(authority_key_pair) = authority_key_pair {
125            let randomness_private_key = bls12381::Scalar::from_byte_array(
126                authority_key_pair
127                    .copy()
128                    .private()
129                    .as_bytes()
130                    .try_into()
131                    .expect("key length should match"),
132            )
133            .expect("should work to convert BLS key to Scalar");
134            let party = match dkg_v1::Party::<PkG, EncG>::new(
135                fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
136                    randomness_private_key,
137                ),
138                nodes,
139                t,
140                random_oracle,
141                &mut rand::thread_rng(),
142            ) {
143                Ok(party) => party,
144                Err(err) => {
145                    debug_fatal!("random beacon: error while initializing Party: {err:?}");
146                    return None;
147                }
148            };
149            let name: AuthorityName = authority_key_pair.public().into();
150            info!(
151                "random beacon: Party initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}",
152            );
153            Some(DkgRole::Party(party))
154        } else {
155            let observer = match dkg_v1::Observer::<PkG, EncG>::new(nodes, t, random_oracle) {
156                Ok(observer) => observer,
157                Err(err) => {
158                    debug_fatal!("random beacon: error while initializing Observer: {err:?}");
159                    return None;
160                }
161            };
162            info!(
163                "random beacon: Observer initialized with total_weight={total_weight}, t={t}, num_nodes={num_nodes}",
164            );
165            Some(DkgRole::Observer(observer))
166        }
167    }
168
169    fn is_party(&self) -> bool {
170        matches!(self, DkgRole::Party(_))
171    }
172
173    fn is_observer(&self) -> bool {
174        matches!(self, DkgRole::Observer(_))
175    }
176
177    /// Processes a received DKG message according to the role.
178    fn process_message(
179        &self,
180        message: VersionedDkgMessage,
181    ) -> FastCryptoResult<VersionedProcessedMessage> {
182        match self {
183            DkgRole::Party(party) => {
184                let processed =
185                    party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
186                Ok(VersionedProcessedMessage::V1(processed))
187            }
188            DkgRole::Observer(observer) => {
189                let raw_msg = message.unwrap_v1();
190                observer.process_message(raw_msg.clone())?;
191                Ok(VersionedProcessedMessage::V1(dkg_v1::ProcessedMessage {
192                    message: raw_msg,
193                    shares: vec![],
194                    complaint: None,
195                }))
196            }
197        }
198    }
199
200    /// Merges processed DKG messages. For Party, produces a confirmation and used messages.
201    /// For Observer, produces only used messages, confirmation is None, as observer nodes do
202    /// not have any voting rights.
203    fn merge_messages(
204        &self,
205        messages: Vec<VersionedProcessedMessage>,
206    ) -> FastCryptoResult<(
207        Option<VersionedDkgConfirmation>,
208        VersionedUsedProcessedMessages,
209    )> {
210        match self {
211            DkgRole::Party(party) => {
212                let (conf, msgs) = party.merge(
213                    &messages
214                        .into_iter()
215                        .map(|vm| vm.unwrap_v1())
216                        .collect::<Vec<_>>(),
217                )?;
218                Ok((
219                    Some(VersionedDkgConfirmation::V1(conf)),
220                    VersionedUsedProcessedMessages::V1(msgs),
221                ))
222            }
223            DkgRole::Observer(observer) => {
224                let raw_messages: Vec<_> = messages
225                    .into_iter()
226                    .map(|pm| pm.unwrap_v1().message)
227                    .collect();
228                let used = observer.merge(raw_messages)?;
229                Ok((
230                    None,
231                    VersionedUsedProcessedMessages::V1(dkg_v1::UsedProcessedMessages(
232                        used.into_iter()
233                            .map(|m| dkg_v1::ProcessedMessage {
234                                message: m,
235                                shares: vec![],
236                                complaint: None,
237                            })
238                            .collect(),
239                    )),
240                ))
241            }
242        }
243    }
244
245    /// Completes DKG from used messages and confirmations. The output contains the shared public key which can be used
246    /// from there after to validate the randomness round signatures. For the observer case the output will contain the public
247    /// key but no shares, as again the node does not participate in the voting process.
248    fn complete_dkg<'a>(
249        &self,
250        used_messages: &VersionedUsedProcessedMessages,
251        confirmations: impl Iterator<Item = &'a VersionedDkgConfirmation>,
252    ) -> FastCryptoResult<Output<PkG, EncG>> {
253        match self {
254            DkgRole::Party(party) => {
255                let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
256                let msg = used_messages
257                    .as_v1()
258                    .expect("expected V1 used processed messages");
259                party.complete(
260                    msg,
261                    &confirmations
262                        .map(|vm| vm.as_v1().expect("expected V1 confirmation"))
263                        .cloned()
264                        .collect::<Vec<_>>(),
265                    rng,
266                )
267            }
268            DkgRole::Observer(observer) => {
269                let raw_messages: Vec<_> = used_messages
270                    .as_v1()
271                    .expect("expected V1 used processed messages")
272                    .0
273                    .iter()
274                    .map(|pm| pm.message.clone())
275                    .collect();
276                let confirmations: Vec<_> = confirmations
277                    .map(|c| c.as_v1().expect("expected V1 confirmation").clone())
278                    .collect();
279                observer.complete(&raw_messages, &confirmations)
280            }
281        }
282    }
283}
284
/// State machine for randomness DKG and generation.
///
/// DKG protocol:
/// 1. This validator sends out a `Message` to all other validators.
/// 2. Once sufficient valid `Message`s are received from other validators via consensus and
///    processed, this validator sends out a `Confirmation` to all other validators.
/// 3. Once sufficient `Confirmation`s are received from other validators via consensus and
///    processed, they are combined to form a public VSS key and local private key shares.
/// 4. Randomness generation begins.
///
/// Randomness generation:
/// 1. For each new round, AuthorityPerEpochStore eventually calls `generate_randomness`.
/// 2. This kicks off a process in RandomnessEventLoop to send partial signatures for the new
///    round to all other validators.
/// 3. Once enough partial signatures for the round are collected, a RandomnessStateUpdate
///    transaction is generated and injected into the ExecutionScheduler.
/// 4. Once the RandomnessStateUpdate transaction is seen in a certified checkpoint,
///    `notify_randomness_in_checkpoint` is called to complete the round and stop sending
///    partial signatures for it.
///
/// A fullnode runs this as a read-only observer. It follows DKG to learn the shared public key
/// but does not send DKG messages, confirmations, or partial signatures.
pub struct RandomnessManager {
    /// Per-epoch store, held weakly so this manager does not keep it alive.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Epoch this manager was created for.
    epoch: EpochId,
    /// Used to submit DKG Messages and Confirmations to consensus.
    consensus_adapter: Box<dyn SubmitToConsensus>,
    /// Handle to the randomness network (epoch updates, partial signature sending).
    network_handle: randomness::Handle,
    /// Authority name -> (network peer id, DKG party id).
    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,

    // State for DKG.
    /// Set when this validator starts DKG; used for timing metrics.
    dkg_start_time: OnceCell<Instant>,
    /// Whether this node is a DKG Party (validator) or Observer (fullnode).
    role: Arc<DkgRole>,
    /// In-flight message-processing tasks keyed by party id; drained on the next merge attempt.
    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
    /// Successfully processed DKG messages, keyed by sender.
    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    /// Messages selected by a successful merge; set at most once.
    used_messages: OnceCell<VersionedUsedProcessedMessages>,
    /// Received DKG Confirmations, keyed by party id.
    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    /// Uninitialized while DKG is unresolved; `Some(output)` on success, `None` on failure/timeout.
    dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,

    // State for randomness generation.
    /// Next randomness round to generate (loaded from the `randomness_next_round` table).
    next_randomness_round: RandomnessRound,
    /// Highest completed randomness round, if any (shared behind a mutex).
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,

    /// Receives randomness rounds out of consensus; given the DKG public key once it is known.
    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
}
326
327impl RandomnessManager {
328    // Returns None in case of invalid input or other failure to initialize DKG.
329    pub async fn try_new(
330        epoch_store_weak: Weak<AuthorityPerEpochStore>,
331        consensus_adapter: Box<dyn SubmitToConsensus>,
332        network_handle: randomness::Handle,
333        authority_key_pair: Option<&AuthorityKeyPair>,
334        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
335    ) -> Option<Self> {
336        let epoch_store = match epoch_store_weak.upgrade() {
337            Some(epoch_store) => epoch_store,
338            None => {
339                error!(
340                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
341                );
342                return None;
343            }
344        };
345        let tables = match epoch_store.tables() {
346            Ok(tables) => tables,
347            Err(_) => {
348                error!(
349                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
350                );
351                return None;
352            }
353        };
354        let protocol_config = epoch_store.protocol_config();
355        epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
356        epoch_store
357            .metrics
358            .epoch_random_beacon_dkg_num_shares
359            .set(0);
360
361        let committee = epoch_store.committee();
362        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
363        if tracing::enabled!(tracing::Level::DEBUG) {
364            // Log first few entries in DKG info for debugging.
365            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
366                let pk_bytes = pk.as_element().to_byte_array();
367                debug!(
368                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
369                );
370            }
371        }
372        let authority_ids: HashMap<_, _> =
373            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
374        let authority_peer_ids = epoch_store
375            .epoch_start_config()
376            .epoch_start_state()
377            .get_authority_names_to_peer_ids();
378        let authority_info = authority_ids
379            .into_iter()
380            .map(|(name, id)| {
381                let peer_id = *authority_peer_ids
382                    .get(&name)
383                    .expect("authority name should be in peer_ids");
384                (name, (peer_id, id))
385            })
386            .collect();
387        let nodes = info
388            .iter()
389            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
390                id: *id,
391                pk: pk.clone(),
392                weight: (*stake).try_into().expect("stake should fit in u16"),
393            })
394            .collect();
395        let (nodes, t) = match nodes::Nodes::new_reduced(
396            nodes,
397            committee
398                .validity_threshold()
399                .try_into()
400                .expect("validity threshold should fit in u16"),
401            protocol_config.random_beacon_reduction_allowed_delta(),
402            protocol_config
403                .random_beacon_reduction_lower_bound()
404                .try_into()
405                .expect("should fit u16"),
406        ) {
407            Ok((nodes, t)) => (nodes, t),
408            Err(err) => {
409                error!("random beacon: error while initializing Nodes: {err:?}");
410                return None;
411            }
412        };
413        let random_oracle = fastcrypto_tbls::random_oracle::RandomOracle::new(&format!(
414            "dkg {} {}",
415            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
416            committee.epoch()
417        ));
418
419        let role = Arc::new(DkgRole::try_new(
420            authority_key_pair,
421            nodes,
422            t,
423            random_oracle,
424        )?);
425
426        // Load existing data from store.
427        let highest_completed_round = tables
428            .randomness_highest_completed_round
429            .get(&SINGLETON_KEY)
430            .expect("typed_store should not fail");
431        let mut rm = RandomnessManager {
432            epoch_store: epoch_store_weak,
433            epoch: committee.epoch(),
434            consensus_adapter,
435            network_handle: network_handle.clone(),
436            authority_info,
437            dkg_start_time: OnceCell::new(),
438            role,
439            enqueued_messages: BTreeMap::new(),
440            processed_messages: BTreeMap::new(),
441            used_messages: OnceCell::new(),
442            confirmations: BTreeMap::new(),
443            dkg_output: OnceCell::new(),
444            next_randomness_round: RandomnessRound(0),
445            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
446            randomness_receiver_handle,
447        };
448        let dkg_output = match tables
449            .dkg_output_v2
450            .get(&SINGLETON_KEY)
451            .expect("typed_store should not fail")
452        {
453            Some(dkg_output) => Some(dkg_output),
454            None => tables
455                .dkg_output
456                .get(&SINGLETON_KEY)
457                .expect("typed_store should not fail")
458                .map(Some),
459        };
460        match dkg_output {
461            Some(Some(dkg_output)) => {
462                info!(
463                    "random beacon: loaded existing DKG output for epoch {}",
464                    committee.epoch()
465                );
466                epoch_store
467                    .metrics
468                    .epoch_random_beacon_dkg_num_shares
469                    .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
470                rm.dkg_output
471                    .set(Some(dkg_output.clone()))
472                    .expect("setting new OnceCell should succeed");
473                // Update the randomness round receiver with the public key, so it can now
474                // verify randomness round signatures received out of consensus.
475                rm.randomness_receiver_handle
476                    .set_public_key(*dkg_output.vss_pk.c0());
477
478                if let DkgRole::Party(party) = rm.role.as_ref() {
479                    network_handle.update_epoch(
480                        committee.epoch(),
481                        rm.authority_info.clone(),
482                        dkg_output,
483                        party.t(),
484                        highest_completed_round,
485                    );
486                }
487            }
488            Some(None) => {
489                // DKG previously completed as a failure (recorded only in `dkg_output_v2`).
490                // Restore that terminal state so DKG isn't re-run and randomness stays disabled
491                // for the epoch.
492                error!(
493                    "random beacon: loaded failed DKG for epoch {}. Randomness disabled for this epoch. All randomness-using transactions will fail.",
494                    committee.epoch()
495                );
496                epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
497                rm.dkg_output
498                    .set(None)
499                    .expect("setting new OnceCell should succeed");
500            }
501            None => {
502                info!(
503                    "random beacon: no existing DKG output found for epoch {}",
504                    committee.epoch()
505                );
506
507                // Load intermediate data.
508                assert!(
509                    epoch_store.protocol_config().dkg_version() > 0,
510                    "BUG: DKG version 0 is deprecated"
511                );
512                rm.processed_messages.extend(
513                    tables
514                        .dkg_processed_messages_v2
515                        .safe_iter()
516                        .map(|result| result.expect("typed_store should not fail")),
517                );
518                if let Some(used_messages) = tables
519                    .dkg_used_messages_v2
520                    .get(&SINGLETON_KEY)
521                    .expect("typed_store should not fail")
522                {
523                    rm.used_messages
524                        .set(used_messages.clone())
525                        .expect("setting new OnceCell should succeed");
526                }
527                rm.confirmations.extend(
528                    tables
529                        .dkg_confirmations_v2
530                        .safe_iter()
531                        .map(|result| result.expect("typed_store should not fail")),
532                );
533            }
534        }
535
536        // Resume randomness generation from where we left off.
537        // This must be loaded regardless of whether DKG has finished yet, since the
538        // RandomnessEventLoop and commit-handling logic in AuthorityPerEpochStore both depend on
539        // this state.
540        rm.next_randomness_round = tables
541            .randomness_next_round
542            .get(&SINGLETON_KEY)
543            .expect("typed_store should not fail")
544            .unwrap_or(RandomnessRound(0));
545        info!(
546            "random beacon: starting from next_randomness_round={}",
547            rm.next_randomness_round.0
548        );
549
550        // Re-send partial signatures for incomplete rounds (validators only).
551        if rm.role.is_party() {
552            let first_incomplete_round = highest_completed_round
553                .map(|r| r + 1)
554                .unwrap_or(RandomnessRound(0));
555            if first_incomplete_round < rm.next_randomness_round {
556                info!(
557                    "random beacon: resuming generation for randomness rounds from {} to {}",
558                    first_incomplete_round,
559                    rm.next_randomness_round - 1,
560                );
561                for r in first_incomplete_round.0..rm.next_randomness_round.0 {
562                    network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
563                }
564            }
565        }
566
567        Some(rm)
568    }
569
570    /// Sends the initial dkg::Message to begin the randomness DKG protocol.
571    /// For observers, this is a no-op (observers don't send messages).
572    pub async fn start_dkg(&mut self) -> SuiResult {
573        let party = match self.role.as_ref() {
574            DkgRole::Observer(_) => {
575                info!("random beacon: observer started observing DKG");
576                return Ok(());
577            }
578            DkgRole::Party(party) => party,
579        };
580
581        if self.used_messages.initialized() || self.dkg_output.initialized() {
582            // DKG already started (or completed or failed).
583            return Ok(());
584        }
585
586        let _ = self.dkg_start_time.set(Instant::now());
587
588        let epoch_store = self.epoch_store()?;
589        let dkg_version = epoch_store.protocol_config().dkg_version();
590        info!("random beacon: starting DKG, version {dkg_version}");
591
592        let msg = match VersionedDkgMessage::create(dkg_version, party) {
593            Ok(msg) => msg,
594            Err(FastCryptoError::IgnoredMessage) => {
595                info!(
596                    "random beacon: no DKG Message for party id={} (zero weight)",
597                    party.id
598                );
599                return Ok(());
600            }
601            Err(e) => {
602                error!("random beacon: error while creating a DKG Message: {e:?}");
603                return Ok(());
604            }
605        };
606
607        info!("random beacon: created {msg:?} with dkg version {dkg_version}");
608        let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
609
610        #[allow(unused_mut)]
611        let mut fail_point_skip_sending = false;
612        fail_point_if!("rb-dkg", || {
613            // maybe skip sending in simtests
614            fail_point_skip_sending = true;
615        });
616        if !fail_point_skip_sending {
617            self.consensus_adapter
618                .submit_to_consensus(&[transaction], &epoch_store)?;
619        }
620
621        epoch_store
622            .metrics
623            .epoch_random_beacon_dkg_message_time_ms
624            .set(
625                self.dkg_start_time
626                    .get()
627                    .unwrap() // already set above
628                    .elapsed()
629                    .as_millis() as i64,
630            );
631        Ok(())
632    }
633
634    /// Processes all received messages and advances the randomness DKG state machine when possible,
635    /// sending out a dkg::Confirmation and generating final output.
636    pub(crate) async fn advance_dkg(
637        &mut self,
638        consensus_output: &mut ConsensusCommitOutput,
639        round: Round,
640    ) -> SuiResult {
641        let epoch_store = self.epoch_store()?;
642
643        self.try_merge_messages(consensus_output, &epoch_store)
644            .await?;
645        self.try_complete_dkg(consensus_output, round, &epoch_store)?;
646
647        // If we ran out of time, mark DKG as failed.
648        if !self.dkg_output.initialized()
649            && round
650                > epoch_store
651                    .protocol_config()
652                    .random_beacon_dkg_timeout_round()
653                    .into()
654        {
655            error!(
656                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
657            );
658            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
659            self.dkg_output
660                .set(None)
661                .expect("checked above that `dkg_output` is uninitialized");
662            consensus_output.set_dkg_output(None);
663        }
664
665        Ok(())
666    }
667
668    /// Drains enqueued messages and attempts to merge them. For validators, a successful merge
669    /// produces and broadcasts a DKG Confirmation. For observers, it just records the used messages.
670    async fn try_merge_messages(
671        &mut self,
672        consensus_output: &mut ConsensusCommitOutput,
673        epoch_store: &Arc<AuthorityPerEpochStore>,
674    ) -> SuiResult {
675        if self.dkg_output.initialized() || self.used_messages.initialized() {
676            return Ok(());
677        }
678
679        // Process all enqueued messages.
680        let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
681            .into_values()
682            .collect();
683        while let Some(res) = handles.next().await {
684            if let Ok(Some(processed)) = res {
685                self.processed_messages
686                    .insert(processed.sender(), processed.clone());
687                consensus_output.insert_dkg_processed_message(processed);
688            }
689        }
690
691        let messages: Vec<_> = self.processed_messages.values().cloned().collect();
692
693        match self.role.merge_messages(messages) {
694            Ok((conf, used_msgs)) => {
695                if let Some(conf) = &conf {
696                    info!(
697                        "random beacon: sending DKG Confirmation with {} complaints",
698                        conf.num_of_complaints()
699                    );
700                } else {
701                    info!(
702                        "random beacon: observer merged {} DKG messages",
703                        used_msgs
704                            .as_v1()
705                            .expect("expected V1 used processed messages")
706                            .0
707                            .len()
708                    );
709                }
710                if self.used_messages.set(used_msgs.clone()).is_err() {
711                    error!("BUG: used_messages should only ever be set once");
712                }
713                consensus_output.insert_dkg_used_messages(used_msgs);
714
715                if let Some(conf) = conf {
716                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
717                        epoch_store.name,
718                        &conf,
719                    );
720
721                    #[allow(unused_mut)]
722                    let mut fail_point_skip_sending = false;
723                    fail_point_if!("rb-dkg", || {
724                        // maybe skip sending in simtests
725                        fail_point_skip_sending = true;
726                    });
727                    if !fail_point_skip_sending {
728                        self.consensus_adapter
729                            .submit_to_consensus(&[transaction], epoch_store)?;
730                    }
731
732                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
733                    if let Some(elapsed) = elapsed {
734                        epoch_store
735                            .metrics
736                            .epoch_random_beacon_dkg_confirmation_time_ms
737                            .set(elapsed as i64);
738                    }
739                }
740            }
741            Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
742            Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
743        }
744
745        Ok(())
746    }
747
748    /// Attempts to complete DKG once enough Confirmations have been collected. For validators,
749    /// this produces the shared public key and private key shares. For observers, only the
750    /// shared public key is derived.
751    fn try_complete_dkg(
752        &mut self,
753        consensus_output: &mut ConsensusCommitOutput,
754        round: Round,
755        epoch_store: &Arc<AuthorityPerEpochStore>,
756    ) -> SuiResult {
757        if !self.dkg_output.initialized() && self.used_messages.initialized() {
758            let used_messages = self
759                .used_messages
760                .get()
761                .expect("checked above that `used_messages` is initialized");
762            let complete_result = self
763                .role
764                .complete_dkg(used_messages, self.confirmations.values());
765
766            let epoch = epoch_store.committee().epoch();
767            let num_confirmations = self.confirmations.len();
768            let num_messages = self.processed_messages.len();
769
770            match complete_result {
771                Ok(output) => {
772                    // Set the output now both internally and to consensus output
773                    self.dkg_output
774                        .set(Some(output.clone()))
775                        .expect("checked above that `dkg_output` is uninitialized");
776                    consensus_output.set_dkg_output(Some(output.clone()));
777
778                    self.randomness_receiver_handle
779                        .set_public_key(*output.vss_pk.c0());
780
781                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
782                    epoch_store
783                        .metrics
784                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
785                        .set(epoch_elapsed as i64);
786                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
787
788                    match self.role.as_ref() {
789                        DkgRole::Party(party) => {
790                            let num_shares =
791                                output.shares.as_ref().map_or(0, |shares| shares.len());
792                            let elapsed =
793                                self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
794                            info!(
795                                "random beacon: DKG complete for Party epoch={epoch} commit_round={round} \
796                                 num_messages={num_messages} num_confirmations={num_confirmations} \
797                                 num_shares={num_shares} epoch_elapsed_ms={epoch_elapsed} dkg_elapsed_ms={elapsed:?}"
798                            );
799                            epoch_store
800                                .metrics
801                                .epoch_random_beacon_dkg_num_shares
802                                .set(num_shares as i64);
803
804                            if let Some(elapsed) = elapsed {
805                                epoch_store
806                                    .metrics
807                                    .epoch_random_beacon_dkg_completion_time_ms
808                                    .set(elapsed as i64);
809                            }
810
811                            self.network_handle.update_epoch(
812                                epoch_store.committee().epoch(),
813                                self.authority_info.clone(),
814                                output,
815                                party.t(),
816                                None,
817                            );
818                        }
819                        DkgRole::Observer(_) => {
820                            info!(
821                                "random beacon: DKG complete for Observer epoch={epoch} commit_round={round} \
822                                 num_messages={num_messages} num_confirmations={num_confirmations} \
823                                 epoch_elapsed_ms={epoch_elapsed}"
824                            );
825                        }
826                    }
827                }
828                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
829                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
830            }
831        }
832
833        Ok(())
834    }
835
836    /// Adds a received VersionedDkgMessage to the randomness DKG state machine.
837    pub fn add_message(
838        &mut self,
839        authority: &AuthorityName,
840        msg: VersionedDkgMessage,
841    ) -> SuiResult {
842        // message was received from other validators, so we need to ensure it uses a supported
843        // version before we call other functions that assume the version is correct
844        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
845        if !msg.is_valid_version(dkg_version) {
846            warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
847            return Ok(());
848        }
849
850        if self.used_messages.initialized() || self.dkg_output.initialized() {
851            // We've already sent a `Confirmation`, so we can't add any more messages.
852            return Ok(());
853        }
854        let Some((_, party_id)) = self.authority_info.get(authority) else {
855            debug_fatal!(
856                "random beacon: received DKG Message from unknown authority: {authority:?}"
857            );
858            return Ok(());
859        };
860        if *party_id != msg.sender() {
861            warn!(
862                "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
863            );
864            return Ok(());
865        }
866        if self.enqueued_messages.contains_key(&msg.sender())
867            || self.processed_messages.contains_key(&msg.sender())
868        {
869            info!("ignoring duplicate DKG Message from authority {authority:?}");
870            return Ok(());
871        }
872
873        let sender = msg.sender();
874        let role = self.role.clone();
875        // TODO: Could save some CPU by not processing messages if we already have enough to merge.
876        let handle = tokio::task::spawn_blocking(move || match role.process_message(msg) {
877            Ok(processed) => Some(processed),
878            Err(err) => {
879                debug!("random beacon: error while processing DKG Message: {err:?}");
880                None
881            }
882        });
883        self.enqueued_messages.insert(sender, handle);
884        Ok(())
885    }
886
887    /// Adds a received dkg::Confirmation to the randomness DKG state machine.
888    pub(crate) fn add_confirmation(
889        &mut self,
890        output: &mut ConsensusCommitOutput,
891        authority: &AuthorityName,
892        conf: VersionedDkgConfirmation,
893    ) -> SuiResult {
894        // confirmation was received from other validators, so we need to ensure it uses a supported
895        // version before we call other functions that assume the version is correct
896        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
897        if !conf.is_valid_version(dkg_version) {
898            warn!(
899                "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
900            );
901            return Ok(());
902        }
903
904        if self.dkg_output.initialized() {
905            // Once we have completed DKG, no more `Confirmation`s are needed.
906            return Ok(());
907        }
908        let Some((_, party_id)) = self.authority_info.get(authority) else {
909            error!(
910                "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
911            );
912            return Ok(());
913        };
914        if *party_id != conf.sender() {
915            warn!(
916                "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
917            );
918            return Ok(());
919        }
920        self.confirmations.insert(conf.sender(), conf.clone());
921        output.insert_dkg_confirmation(conf);
922        Ok(())
923    }
924
925    /// Reserves the next available round number for randomness generation if enough time has
926    /// elapsed, or returns None if not yet ready (based on ProtocolConfig setting). Once the given
927    /// batch is written, `generate_randomness` must be called to start the process. On restart,
928    /// any reserved rounds for which the batch was written will automatically be resumed.
929    pub(crate) fn reserve_next_randomness(
930        &mut self,
931        commit_timestamp: TimestampMs,
932        output: &mut ConsensusCommitOutput,
933    ) -> SuiResult<Option<RandomnessRound>> {
934        let epoch_store = self.epoch_store()?;
935
936        let last_round_timestamp = epoch_store
937            .get_randomness_last_round_timestamp()
938            .expect("read should not fail");
939
940        if let Some(last_round_timestamp) = last_round_timestamp
941            && commit_timestamp - last_round_timestamp
942                < epoch_store
943                    .protocol_config()
944                    .random_beacon_min_round_interval_ms()
945        {
946            return Ok(None);
947        }
948
949        let randomness_round = self.next_randomness_round;
950        self.next_randomness_round = self
951            .next_randomness_round
952            .checked_add(1)
953            .expect("RandomnessRound should not overflow");
954
955        output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
956
957        Ok(Some(randomness_round))
958    }
959
960    /// Starts the process of generating the given RandomnessRound (validators only).
961    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
962        if self.role.is_party() {
963            self.network_handle
964                .send_partial_signatures(epoch, randomness_round);
965        }
966    }
967
968    pub fn dkg_status(&self) -> DkgStatus {
969        match self.dkg_output.get() {
970            Some(Some(_)) => DkgStatus::Successful,
971            Some(None) => DkgStatus::Failed,
972            None => DkgStatus::Pending,
973        }
974    }
975
976    /// Generates a new RandomnessReporter for reporting observed rounds to this RandomnessManager.
977    /// Returns None for observers (they don't generate partial signatures).
978    pub fn reporter(&self) -> Option<RandomnessReporter> {
979        if self.role.is_observer() {
980            return None;
981        }
982        Some(RandomnessReporter {
983            epoch_store: self.epoch_store.clone(),
984            epoch: self.epoch,
985            network_handle: self.network_handle.clone(),
986            highest_completed_round: self.highest_completed_round.clone(),
987        })
988    }
989
990    #[cfg(test)]
991    fn dkg_output(&self) -> Option<&dkg_v1::Output<PkG, EncG>> {
992        self.dkg_output.get().and_then(|o| o.as_ref())
993    }
994
995    fn epoch_store(&self) -> SuiResult<Arc<AuthorityPerEpochStore>> {
996        self.epoch_store
997            .upgrade()
998            .ok_or(SuiErrorKind::EpochEnded(self.epoch).into())
999    }
1000
1001    fn randomness_dkg_info_from_committee(
1002        committee: &Committee,
1003    ) -> Vec<(
1004        u16,
1005        AuthorityName,
1006        fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
1007        StakeUnit,
1008    )> {
1009        committee
1010            .members()
1011            .map(|(name, stake)| {
1012                let index: u16 = committee
1013                    .authority_index(name)
1014                    .expect("lookup of known committee member should succeed")
1015                    .try_into()
1016                    .expect("authority index should fit in u16");
1017                let pk = bls12381::G2Element::from_byte_array(
1018                    committee
1019                        .public_key(name)
1020                        .expect("lookup of known committee member should succeed")
1021                        .as_bytes()
1022                        .try_into()
1023                        .expect("key length should match"),
1024                )
1025                .expect("should work to convert BLS key to G2Element");
1026                (
1027                    index,
1028                    *name,
1029                    fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
1030                    *stake,
1031                )
1032            })
1033            .collect()
1034    }
1035}
1036
/// Used by other components to notify the randomness system of observed randomness.
///
/// Clones share the same `highest_completed_round` watermark through an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct RandomnessReporter {
    // Weak reference; upgrading fails (reported as `EpochEnded`) once the store is dropped.
    epoch_store: Weak<AuthorityPerEpochStore>,
    // Epoch this reporter was created for; used in the `EpochEnded` error.
    epoch: EpochId,
    // Network handle notified when a round completes.
    network_handle: randomness::Handle,
    // Highest round reported as checkpointed so far, shared with the owning manager.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
1045
1046impl RandomnessReporter {
1047    /// Notifies the associated randomness manager that randomness for the given round has been
1048    /// durably committed in a checkpoint. This completes the process of generating randomness for
1049    /// the round.
1050    pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> SuiResult {
1051        let epoch_store = self
1052            .epoch_store
1053            .upgrade()
1054            .ok_or(SuiErrorKind::EpochEnded(self.epoch))?;
1055        let mut highest_completed_round = self.highest_completed_round.lock();
1056        if Some(round) > *highest_completed_round {
1057            *highest_completed_round = Some(round);
1058            epoch_store
1059                .tables()?
1060                .randomness_highest_completed_round
1061                .insert(&SINGLETON_KEY, &round)?;
1062            self.network_handle
1063                .complete_round(epoch_store.committee().epoch(), round);
1064        }
1065        Ok(())
1066    }
1067}
1068
/// Outcome of this epoch's randomness DKG, as reported by `RandomnessManager::dkg_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DkgStatus {
    /// No DKG outcome has been recorded yet.
    Pending,
    /// An outcome was recorded, but it contains no DKG output.
    Failed,
    /// DKG completed and produced an output.
    Successful,
}
1075
1076#[cfg(test)]
1077mod tests {
1078    use crate::{
1079        authority::{
1080            authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStatsV2},
1081            test_authority_builder::TestAuthorityBuilder,
1082        },
1083        checkpoints::CheckpointStore,
1084        consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, MockConsensusClient},
1085        epoch::randomness::*,
1086        mock_consensus::with_block_status,
1087        randomness_round_receiver::RandomnessRoundReceiverHandle,
1088    };
1089    use consensus_core::BlockStatus;
1090    use consensus_types::block::BlockRef;
1091    use fastcrypto::groups::bls12381;
1092    use fastcrypto::serde_helpers::ToFromByteArray;
1093    use fastcrypto_tbls::{mocked_dkg, nodes};
1094    use std::num::NonZeroUsize;
1095    use sui_protocol_config::ProtocolConfig;
1096    use sui_protocol_config::{Chain, ProtocolVersion};
1097    use sui_types::{base_types::AuthorityName, messages_consensus::ConsensusTransactionKind};
1098    use tokio::sync::mpsc;
1099    use typed_store::Map;
1100
1101    use arc_swap::Guard;
1102
    /// Test harness that sets up validators (and optionally an observer) with mock consensus,
    /// ready for DKG message exchange.
    struct DkgTestSetup {
        // Generated network config; supplies genesis and the validators' protocol key pairs.
        network_config: sui_swarm_config::network_config::NetworkConfig,
        // One entry per manager; the optional observer reuses validator 0's epoch store.
        epoch_stores: Vec<Guard<Arc<AuthorityPerEpochStore>>>,
        // Validators first (in `validator_configs` order), then the observer if requested.
        randomness_managers: Vec<RandomnessManager>,
        // Cloned into each validator's mock consensus client; submissions are forwarded here.
        tx_consensus: mpsc::Sender<Vec<ConsensusTransaction>>,
        // Receives every batch of transactions submitted to the mock consensus.
        rx_consensus: mpsc::Receiver<Vec<ConsensusTransaction>>,
        // Number of validators; does not count the observer.
        num_validators: usize,
    }
1113
1114    impl DkgTestSetup {
1115        async fn new(include_observer: bool) -> Self {
1116            let network_config =
1117                sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1118                    .committee_size(NonZeroUsize::new(4).unwrap())
1119                    .with_reference_gas_price(500)
1120                    .build();
1121
1122            let mut protocol_config =
1123                ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1124            protocol_config.set_random_beacon_dkg_version_for_testing(1);
1125
1126            let num_validators = network_config.validator_configs.len();
1127            let mut epoch_stores = Vec::new();
1128            let mut randomness_managers = Vec::new();
1129            let (tx_consensus, rx_consensus) = mpsc::channel(100);
1130
1131            for validator in network_config.validator_configs.iter() {
1132                let mut mock_consensus_client = MockConsensusClient::new();
1133                let tx_consensus = tx_consensus.clone();
1134                mock_consensus_client
1135                    .expect_submit()
1136                    .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1137                        tx_consensus.try_send(transactions.to_vec()).unwrap();
1138                        true
1139                    })
1140                    .returning(|_, _| {
1141                        Ok((
1142                            Vec::new(),
1143                            with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
1144                        ))
1145                    });
1146
1147                let state = TestAuthorityBuilder::new()
1148                    .with_protocol_config(protocol_config.clone())
1149                    .with_genesis_and_keypair(
1150                        &network_config.genesis,
1151                        validator.protocol_key_pair(),
1152                    )
1153                    .build()
1154                    .await;
1155                let consensus_adapter = Arc::new(ConsensusAdapter::new(
1156                    Arc::new(mock_consensus_client),
1157                    CheckpointStore::new_for_tests(),
1158                    state.name,
1159                    100_000,
1160                    100_000,
1161                    ConsensusAdapterMetrics::new_test(),
1162                    Arc::new(tokio::sync::Notify::new()),
1163                ));
1164                let epoch_store = state.epoch_store_for_testing();
1165                let randomness_manager = RandomnessManager::try_new(
1166                    Arc::downgrade(&epoch_store),
1167                    Box::new(consensus_adapter.clone()),
1168                    sui_network::randomness::Handle::new_stub(),
1169                    Some(validator.protocol_key_pair()),
1170                    RandomnessRoundReceiverHandle::new_for_testing(),
1171                )
1172                .await
1173                .unwrap();
1174
1175                epoch_stores.push(epoch_store);
1176                randomness_managers.push(randomness_manager);
1177            }
1178
1179            if include_observer {
1180                let observer_epoch_store = epoch_stores[0].clone();
1181                let mut mock_observer_consensus = MockConsensusClient::new();
1182                mock_observer_consensus
1183                    .expect_submit()
1184                    .returning(|_, _| panic!("observer should not submit to consensus"));
1185                let observer_adapter = Arc::new(ConsensusAdapter::new(
1186                    Arc::new(mock_observer_consensus),
1187                    CheckpointStore::new_for_tests(),
1188                    observer_epoch_store.name,
1189                    100_000,
1190                    100_000,
1191                    ConsensusAdapterMetrics::new_test(),
1192                    Arc::new(tokio::sync::Notify::new()),
1193                ));
1194                let observer_manager = RandomnessManager::try_new(
1195                    Arc::downgrade(&observer_epoch_store),
1196                    Box::new(observer_adapter),
1197                    sui_network::randomness::Handle::new_stub(),
1198                    None,
1199                    RandomnessRoundReceiverHandle::new_for_testing(),
1200                )
1201                .await
1202                .unwrap();
1203
1204                epoch_stores.push(observer_epoch_store.into());
1205                randomness_managers.push(observer_manager);
1206            }
1207
1208            Self {
1209                network_config,
1210                epoch_stores,
1211                randomness_managers,
1212                tx_consensus,
1213                rx_consensus,
1214                num_validators,
1215            }
1216        }
1217
1218        fn consensus_adapter(&self, authority: AuthorityName) -> Arc<ConsensusAdapter> {
1219            let mut mock_consensus_client = MockConsensusClient::new();
1220            let tx_consensus = self.tx_consensus.clone();
1221            mock_consensus_client
1222                .expect_submit()
1223                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1224                    tx_consensus.try_send(transactions.to_vec()).unwrap();
1225                    true
1226                })
1227                .returning(|_, _| {
1228                    Ok((
1229                        Vec::new(),
1230                        with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
1231                    ))
1232                });
1233
1234            Arc::new(ConsensusAdapter::new(
1235                Arc::new(mock_consensus_client),
1236                CheckpointStore::new_for_tests(),
1237                authority,
1238                100_000,
1239                100_000,
1240                ConsensusAdapterMetrics::new_test(),
1241                Arc::new(tokio::sync::Notify::new()),
1242            ))
1243        }
1244
1245        async fn recover_validator_randomness_managers(&mut self) -> usize {
1246            let mut recovered_randomness_managers = Vec::new();
1247            for (i, validator) in self
1248                .network_config
1249                .validator_configs
1250                .iter()
1251                .enumerate()
1252                .take(self.num_validators)
1253            {
1254                let consensus_adapter = self.consensus_adapter(self.epoch_stores[i].name);
1255                recovered_randomness_managers.push(
1256                    RandomnessManager::try_new(
1257                        Arc::downgrade(&self.epoch_stores[i]),
1258                        Box::new(consensus_adapter),
1259                        sui_network::randomness::Handle::new_stub(),
1260                        Some(validator.protocol_key_pair()),
1261                        RandomnessRoundReceiverHandle::new_for_testing(),
1262                    )
1263                    .await
1264                    .unwrap(),
1265                );
1266            }
1267            let recovered_count = recovered_randomness_managers.len();
1268            self.randomness_managers = recovered_randomness_managers;
1269            recovered_count
1270        }
1271
1272        /// Runs start_dkg on all managers and collects the DKG messages from validators.
1273        async fn start_dkg_and_collect_messages(&mut self) -> Vec<VersionedDkgMessage> {
1274            let mut dkg_messages = Vec::new();
1275            for randomness_manager in self.randomness_managers.iter_mut() {
1276                randomness_manager.start_dkg().await.unwrap();
1277            }
1278            for _ in 0..self.num_validators {
1279                let mut dkg_message = self.rx_consensus.recv().await.unwrap();
1280                assert!(dkg_message.len() == 1);
1281                match dkg_message.remove(0).kind {
1282                    ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1283                        let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1284                            .expect("DKG message deserialization should not fail");
1285                        dkg_messages.push(msg);
1286                    }
1287                    _ => panic!("wrong type of message sent"),
1288                }
1289            }
1290            dkg_messages
1291        }
1292
1293        /// Distributes DKG messages to all managers and advances DKG at the given round.
1294        async fn distribute_messages_and_advance(
1295            &mut self,
1296            dkg_messages: &[VersionedDkgMessage],
1297            advance_round: Round,
1298        ) {
1299            let indexed_messages: Vec<_> = dkg_messages.iter().cloned().enumerate().collect();
1300            self.distribute_indexed_messages_and_advance(&indexed_messages, advance_round)
1301                .await;
1302        }
1303
1304        async fn distribute_indexed_messages_and_advance(
1305            &mut self,
1306            dkg_messages: &[(usize, VersionedDkgMessage)],
1307            advance_round: Round,
1308        ) {
1309            for i in 0..self.randomness_managers.len() {
1310                let mut output = ConsensusCommitOutput::new(0);
1311                output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
1312                    index: ExecutionIndices {
1313                        last_committed_round: 0,
1314                        ..Default::default()
1315                    },
1316                    ..Default::default()
1317                });
1318                for (j, dkg_message) in dkg_messages.iter().cloned() {
1319                    self.randomness_managers[i]
1320                        .add_message(&self.epoch_stores[j].name, dkg_message)
1321                        .unwrap();
1322                }
1323                self.randomness_managers[i]
1324                    .advance_dkg(&mut output, advance_round)
1325                    .await
1326                    .unwrap();
1327                let mut batch = self.epoch_stores[i].db_batch_for_test();
1328                output
1329                    .write_to_batch(&self.epoch_stores[i], &mut batch)
1330                    .unwrap();
1331                batch.write().unwrap();
1332            }
1333        }
1334
1335        /// Collects DKG confirmations from validators and distributes them to all managers.
1336        async fn collect_and_distribute_confirmations(&mut self) {
1337            let mut dkg_confirmations = Vec::new();
1338            for _ in 0..self.num_validators {
1339                let mut dkg_confirmation = self.rx_consensus.recv().await.unwrap();
1340                assert!(dkg_confirmation.len() == 1);
1341                match dkg_confirmation.remove(0).kind {
1342                    ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
1343                        let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
1344                            .expect("DKG confirmation deserialization should not fail");
1345                        dkg_confirmations.push(msg);
1346                    }
1347                    _ => panic!("wrong type of message sent"),
1348                }
1349            }
1350            for i in 0..self.randomness_managers.len() {
1351                let mut output = ConsensusCommitOutput::new(0);
1352                output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
1353                    index: ExecutionIndices {
1354                        last_committed_round: 1,
1355                        ..Default::default()
1356                    },
1357                    ..Default::default()
1358                });
1359                for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
1360                    self.randomness_managers[i]
1361                        .add_confirmation(&mut output, &self.epoch_stores[j].name, dkg_confirmation)
1362                        .unwrap();
1363                }
1364                self.randomness_managers[i]
1365                    .advance_dkg(&mut output, 0)
1366                    .await
1367                    .unwrap();
1368                let mut batch = self.epoch_stores[i].db_batch_for_test();
1369                output
1370                    .write_to_batch(&self.epoch_stores[i], &mut batch)
1371                    .unwrap();
1372                batch.write().unwrap();
1373            }
1374        }
1375    }
1376
1377    #[tokio::test]
1378    async fn test_dkg() {
1379        telemetry_subscribers::init_for_testing();
1380
1381        let mut setup = DkgTestSetup::new(false).await;
1382        let dkg_messages = setup.start_dkg_and_collect_messages().await;
1383        setup
1384            .distribute_messages_and_advance(&dkg_messages, 0)
1385            .await;
1386        setup.collect_and_distribute_confirmations().await;
1387
1388        for rm in &setup.randomness_managers {
1389            assert_eq!(DkgStatus::Successful, rm.dkg_status());
1390        }
1391    }
1392
    /// Verifies that advancing DKG at a round past the DKG timeout marks DKG as failed, and that
    /// managers rebuilt from the same epoch stores still report the failure.
    #[tokio::test]
    async fn test_dkg_expiration() {
        telemetry_subscribers::init_for_testing();

        let mut setup = DkgTestSetup::new(false).await;
        let dkg_messages = setup.start_dkg_and_collect_messages().await;
        // Pass u64::MAX as round to trigger DKG timeout.
        setup
            .distribute_messages_and_advance(&dkg_messages, u64::MAX)
            .await;

        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Failed, rm.dkg_status());
        }

        // Simulate a restart of every validator.
        let recovered_count = setup.recover_validator_randomness_managers().await;
        assert_eq!(setup.num_validators, recovered_count);

        // Verify the failure is durably loaded on restart: RandomnessManagers reconstructed
        // from the same epoch stores must report DKG as failed (not pending), exercising the
        // `dkg_output_v2` failure-load path in `try_new`.
        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Failed, rm.dkg_status());
        }
    }
1418
    /// Verifies that DKG messages processed before a restart are reloaded by the rebuilt
    /// managers, and that DKG still completes once the remaining messages and Confirmations
    /// arrive after the restart.
    #[tokio::test]
    async fn test_dkg_recovers_processed_messages_after_restart() {
        telemetry_subscribers::init_for_testing();

        let mut setup = DkgTestSetup::new(false).await;
        let dkg_messages = setup.start_dkg_and_collect_messages().await;
        // Process only a strict, non-empty prefix of the messages before restarting.
        let processed_before_restart = 1;
        assert!(processed_before_restart > 0);
        assert!(processed_before_restart < setup.num_validators);
        // Keep each message's original index so it stays attributed to its real sender.
        let pre_restart_messages: Vec<_> = dkg_messages
            .iter()
            .cloned()
            .enumerate()
            .take(processed_before_restart)
            .collect();
        let post_restart_messages: Vec<_> = dkg_messages
            .iter()
            .cloned()
            .enumerate()
            .skip(processed_before_restart)
            .collect();

        setup
            .distribute_indexed_messages_and_advance(&pre_restart_messages, 0)
            .await;

        // The prefix was processed, but it is not enough to fix the used message set.
        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Pending, rm.dkg_status());
            assert_eq!(processed_before_restart, rm.processed_messages.len());
            assert!(!rm.used_messages.initialized());
        }

        // After the restart, the processed messages must have been reloaded from storage.
        let recovered_count = setup.recover_validator_randomness_managers().await;
        assert_eq!(setup.num_validators, recovered_count);
        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Pending, rm.dkg_status());
            assert_eq!(processed_before_restart, rm.processed_messages.len());
            assert!(!rm.used_messages.initialized());
        }

        setup
            .distribute_indexed_messages_and_advance(&post_restart_messages, 0)
            .await;

        // With every message processed, the used set is fixed; DKG awaits Confirmations.
        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Pending, rm.dkg_status());
            assert_eq!(setup.num_validators, rm.processed_messages.len());
            assert!(rm.used_messages.initialized());
        }

        setup.collect_and_distribute_confirmations().await;

        for rm in &setup.randomness_managers {
            assert_eq!(DkgStatus::Successful, rm.dkg_status());
        }
    }
1475
1476    /// Verifies that an Observer completes DKG alongside validators and derives the same
1477    /// shared public key (vss_pk), but without receiving any private key shares.
1478    #[tokio::test]
1479    async fn test_dkg_observer() {
1480        telemetry_subscribers::init_for_testing();
1481
1482        let mut setup = DkgTestSetup::new(true).await;
1483        let observer_idx = setup.randomness_managers.len() - 1;
1484
1485        let dkg_messages = setup.start_dkg_and_collect_messages().await;
1486        setup
1487            .distribute_messages_and_advance(&dkg_messages, 0)
1488            .await;
1489
1490        for rm in &setup.randomness_managers {
1491            assert_eq!(DkgStatus::Pending, rm.dkg_status());
1492        }
1493
1494        setup.collect_and_distribute_confirmations().await;
1495
1496        for rm in &setup.randomness_managers {
1497            assert_eq!(DkgStatus::Successful, rm.dkg_status());
1498        }
1499
1500        // Verify the observer derived the same vss_pk as validators, but without shares.
1501        let observer_output = setup.randomness_managers[observer_idx]
1502            .dkg_output()
1503            .expect("observer should have DKG output");
1504        let validator_output = setup.randomness_managers[0]
1505            .dkg_output()
1506            .expect("validator should have DKG output");
1507        assert_eq!(observer_output.vss_pk, validator_output.vss_pk);
1508        assert!(observer_output.shares.is_none());
1509
1510        for rm in &setup.randomness_managers[..setup.num_validators] {
1511            let output = rm.dkg_output().expect("validator should have DKG output");
1512            assert!(output.shares.is_some());
1513        }
1514    }
1515
1516    /// Builds a minimal set of DKG Nodes from a network config's validator key pairs.
1517    fn build_dkg_nodes(
1518        network_config: &sui_swarm_config::network_config::NetworkConfig,
1519    ) -> (nodes::Nodes<EncG>, u16) {
1520        let dkg_nodes: Vec<_> = network_config
1521            .validator_configs
1522            .iter()
1523            .enumerate()
1524            .map(|(i, v)| {
1525                let pk = bls12381::G2Element::from_byte_array(
1526                    v.protocol_key_pair()
1527                        .public()
1528                        .as_bytes()
1529                        .try_into()
1530                        .expect("key length should match"),
1531                )
1532                .expect("should work to convert BLS key to G2Element");
1533                nodes::Node::<EncG> {
1534                    id: i as u16,
1535                    pk: fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
1536                    weight: 1,
1537                }
1538            })
1539            .collect();
1540        let num_nodes = dkg_nodes.len();
1541        // threshold = ceil(num_nodes / 3) works for a minimal committee
1542        let t = num_nodes.div_ceil(3) as u16;
1543        nodes::Nodes::new(dkg_nodes).map(|n| (n, t)).unwrap()
1544    }
1545
1546    #[test]
1547    fn test_dkg_role_try_new_party() {
1548        let network_config =
1549            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1550                .committee_size(NonZeroUsize::new(4).unwrap())
1551                .with_reference_gas_price(500)
1552                .build();
1553
1554        let (nodes, t) = build_dkg_nodes(&network_config);
1555        let random_oracle =
1556            fastcrypto_tbls::random_oracle::RandomOracle::new("test_dkg_role_party");
1557
1558        let role = DkgRole::try_new(
1559            Some(network_config.validator_configs[0].protocol_key_pair()),
1560            nodes,
1561            t,
1562            random_oracle,
1563        );
1564        assert!(role.is_some());
1565        assert!(role.unwrap().is_party());
1566    }
1567
1568    #[tokio::test]
1569    async fn test_randomness_manager_crash_recovery_v1() {
1570        telemetry_subscribers::init_for_testing();
1571
1572        let network_config =
1573            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1574                .committee_size(NonZeroUsize::new(4).unwrap())
1575                .with_reference_gas_price(500)
1576                .build();
1577
1578        let mut protocol_config =
1579            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1580        protocol_config.set_random_beacon_dkg_version_for_testing(1);
1581
1582        let validator = &network_config.validator_configs[0];
1583        let state = TestAuthorityBuilder::new()
1584            .with_protocol_config(protocol_config.clone())
1585            .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
1586            .build()
1587            .await;
1588        let epoch_store = state.epoch_store_for_testing();
1589
1590        let dkg_nodes = nodes::Nodes::new(
1591            RandomnessManager::randomness_dkg_info_from_committee(epoch_store.committee())
1592                .into_iter()
1593                .map(|(id, _, pk, stake)| nodes::Node::<bls12381::G2Element> {
1594                    id,
1595                    pk,
1596                    weight: stake.try_into().unwrap(),
1597                })
1598                .collect(),
1599        )
1600        .unwrap();
1601        let threshold = epoch_store
1602            .committee()
1603            .validity_threshold()
1604            .try_into()
1605            .unwrap();
1606        let party_id = epoch_store
1607            .committee()
1608            .authority_index(&epoch_store.name)
1609            .unwrap()
1610            .try_into()
1611            .unwrap();
1612        let expected_dkg_output = mocked_dkg::generate_mocked_output::<
1613            bls12381::G2Element,
1614            bls12381::G2Element,
1615        >(dkg_nodes, threshold, 0, party_id);
1616        let expected_dkg_output_bytes =
1617            bcs::to_bytes(&expected_dkg_output).expect("DKG output serialization should not fail");
1618        let expected_public_key = *expected_dkg_output.vss_pk.c0();
1619
1620        let tables = epoch_store.tables().unwrap();
1621        tables
1622            .dkg_output
1623            .insert(&SINGLETON_KEY, &expected_dkg_output)
1624            .unwrap();
1625        tables
1626            .randomness_next_round
1627            .insert(&SINGLETON_KEY, &RandomnessRound(3))
1628            .unwrap();
1629        tables
1630            .randomness_highest_completed_round
1631            .insert(&SINGLETON_KEY, &RandomnessRound(1))
1632            .unwrap();
1633
1634        let consensus_adapter = Arc::new(ConsensusAdapter::new(
1635            Arc::new(MockConsensusClient::new()),
1636            CheckpointStore::new_for_tests(),
1637            epoch_store.name,
1638            100_000,
1639            100_000,
1640            ConsensusAdapterMetrics::new_test(),
1641            Arc::new(tokio::sync::Notify::new()),
1642        ));
1643        let recovered_receiver_handle = RandomnessRoundReceiverHandle::new_for_testing();
1644        assert!(recovered_receiver_handle.public_key_for_testing().is_none());
1645
1646        let recovered_randomness_manager = RandomnessManager::try_new(
1647            Arc::downgrade(&epoch_store),
1648            Box::new(consensus_adapter.clone()),
1649            sui_network::randomness::Handle::new_stub(),
1650            Some(validator.protocol_key_pair()),
1651            recovered_receiver_handle.clone(),
1652        )
1653        .await
1654        .unwrap();
1655
1656        assert_eq!(
1657            DkgStatus::Successful,
1658            recovered_randomness_manager.dkg_status()
1659        );
1660        let recovered_dkg_output = recovered_randomness_manager
1661            .dkg_output
1662            .get()
1663            .expect("recovered DKG output should be initialized")
1664            .as_ref()
1665            .expect("recovered DKG should be successful");
1666        assert_eq!(
1667            expected_dkg_output_bytes,
1668            bcs::to_bytes(recovered_dkg_output).expect("DKG output serialization should not fail")
1669        );
1670        assert_eq!(
1671            RandomnessRound(3),
1672            recovered_randomness_manager.next_randomness_round
1673        );
1674        assert_eq!(
1675            Some(RandomnessRound(1)),
1676            *recovered_randomness_manager.highest_completed_round.lock()
1677        );
1678        let recovered_public_key = recovered_receiver_handle
1679            .public_key_for_testing()
1680            .expect("public key should be restored on recovery");
1681        assert_eq!(
1682            expected_public_key.to_byte_array(),
1683            recovered_public_key.to_byte_array()
1684        );
1685    }
1686
1687    #[test]
1688    fn test_dkg_role_try_new_observer() {
1689        let network_config =
1690            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1691                .committee_size(NonZeroUsize::new(4).unwrap())
1692                .with_reference_gas_price(500)
1693                .build();
1694
1695        let (nodes, t) = build_dkg_nodes(&network_config);
1696        let random_oracle =
1697            fastcrypto_tbls::random_oracle::RandomOracle::new("test_dkg_role_observer");
1698
1699        let role = DkgRole::try_new(None, nodes, t, random_oracle);
1700        assert!(role.is_some());
1701        assert!(role.unwrap().is_observer());
1702    }
1703}