1use anemo::PeerId;
5use fastcrypto::encoding::{Encoding, Hex};
6use fastcrypto::error::{FastCryptoError, FastCryptoResult};
7use fastcrypto::groups::bls12381;
8use fastcrypto::serde_helpers::ToFromByteArray;
9use fastcrypto::traits::{KeyPair, ToFromBytes};
10use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
11use futures::StreamExt;
12use futures::stream::FuturesUnordered;
13use mysten_common::debug_fatal;
14use parking_lot::Mutex;
15use rand::SeedableRng;
16use rand::rngs::{OsRng, StdRng};
17use serde::{Deserialize, Serialize};
18use std::collections::{BTreeMap, HashMap};
19use std::sync::{Arc, Weak};
20use std::time::Instant;
21use sui_macros::fail_point_if;
22use sui_network::randomness;
23use sui_types::base_types::AuthorityName;
24use sui_types::committee::{Committee, EpochId, StakeUnit};
25use sui_types::crypto::{AuthorityKeyPair, RandomnessRound};
26use sui_types::error::{SuiErrorKind, SuiResult};
27use sui_types::messages_consensus::{
28 ConsensusTransaction, Round, TimestampMs, VersionedDkgConfirmation, VersionedDkgMessage,
29};
30use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
31use tokio::sync::OnceCell;
32use tokio::task::JoinHandle;
33use tracing::{debug, error, info, warn};
34use typed_store::Map;
35
36use crate::authority::authority_per_epoch_store::{
37 AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
38};
39use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
40use crate::consensus_adapter::SubmitToConsensus;
41use crate::randomness_round_receiver::RandomnessRoundReceiverHandle;
42
43type PkG = bls12381::G2Element;
44type EncG = bls12381::G2Element;
45
46pub const SINGLETON_KEY: u64 = 0;
47
48#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
51#[allow(clippy::large_enum_variant)]
52pub enum VersionedProcessedMessage {
53 V0(), V1(dkg_v1::ProcessedMessage<PkG, EncG>),
55}
56
57impl VersionedProcessedMessage {
58 pub fn sender(&self) -> PartyId {
59 match self {
60 VersionedProcessedMessage::V0() => {
61 panic!("BUG: invalid VersionedProcessedMessage version V0")
62 }
63 VersionedProcessedMessage::V1(msg) => msg.message.sender,
64 }
65 }
66
67 pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
68 if let VersionedProcessedMessage::V1(msg) = self {
69 msg
70 } else {
71 panic!("BUG: expected message version is 1")
72 }
73 }
74
75 pub fn as_v1(&self) -> Option<&dkg_v1::ProcessedMessage<PkG, EncG>> {
76 if let VersionedProcessedMessage::V1(msg) = self {
77 Some(msg)
78 } else {
79 None
80 }
81 }
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85pub enum VersionedUsedProcessedMessages {
86 V0(), V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
88}
89
90impl VersionedUsedProcessedMessages {
91 pub fn as_v1(&self) -> Option<&dkg_v1::UsedProcessedMessages<PkG, EncG>> {
92 if let VersionedUsedProcessedMessages::V1(msg) = self {
93 Some(msg)
94 } else {
95 None
96 }
97 }
98}
99
100enum DkgRole {
102 Party(dkg_v1::Party<PkG, EncG>),
103 Observer(dkg_v1::Observer<PkG, EncG>),
104}
105
106impl DkgRole {
107 fn try_new(
116 authority_key_pair: Option<&AuthorityKeyPair>,
117 nodes: nodes::Nodes<EncG>,
118 t: u16,
119 random_oracle: fastcrypto_tbls::random_oracle::RandomOracle,
120 ) -> Option<Self> {
121 let total_weight = nodes.total_weight();
122 let num_nodes = nodes.num_nodes();
123
124 if let Some(authority_key_pair) = authority_key_pair {
125 let randomness_private_key = bls12381::Scalar::from_byte_array(
126 authority_key_pair
127 .copy()
128 .private()
129 .as_bytes()
130 .try_into()
131 .expect("key length should match"),
132 )
133 .expect("should work to convert BLS key to Scalar");
134 let party = match dkg_v1::Party::<PkG, EncG>::new(
135 fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
136 randomness_private_key,
137 ),
138 nodes,
139 t,
140 random_oracle,
141 &mut rand::thread_rng(),
142 ) {
143 Ok(party) => party,
144 Err(err) => {
145 debug_fatal!("random beacon: error while initializing Party: {err:?}");
146 return None;
147 }
148 };
149 let name: AuthorityName = authority_key_pair.public().into();
150 info!(
151 "random beacon: Party initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}",
152 );
153 Some(DkgRole::Party(party))
154 } else {
155 let observer = match dkg_v1::Observer::<PkG, EncG>::new(nodes, t, random_oracle) {
156 Ok(observer) => observer,
157 Err(err) => {
158 debug_fatal!("random beacon: error while initializing Observer: {err:?}");
159 return None;
160 }
161 };
162 info!(
163 "random beacon: Observer initialized with total_weight={total_weight}, t={t}, num_nodes={num_nodes}",
164 );
165 Some(DkgRole::Observer(observer))
166 }
167 }
168
169 fn is_party(&self) -> bool {
170 matches!(self, DkgRole::Party(_))
171 }
172
173 fn is_observer(&self) -> bool {
174 matches!(self, DkgRole::Observer(_))
175 }
176
177 fn process_message(
179 &self,
180 message: VersionedDkgMessage,
181 ) -> FastCryptoResult<VersionedProcessedMessage> {
182 match self {
183 DkgRole::Party(party) => {
184 let processed =
185 party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
186 Ok(VersionedProcessedMessage::V1(processed))
187 }
188 DkgRole::Observer(observer) => {
189 let raw_msg = message.unwrap_v1();
190 observer.process_message(raw_msg.clone())?;
191 Ok(VersionedProcessedMessage::V1(dkg_v1::ProcessedMessage {
192 message: raw_msg,
193 shares: vec![],
194 complaint: None,
195 }))
196 }
197 }
198 }
199
200 fn merge_messages(
204 &self,
205 messages: Vec<VersionedProcessedMessage>,
206 ) -> FastCryptoResult<(
207 Option<VersionedDkgConfirmation>,
208 VersionedUsedProcessedMessages,
209 )> {
210 match self {
211 DkgRole::Party(party) => {
212 let (conf, msgs) = party.merge(
213 &messages
214 .into_iter()
215 .map(|vm| vm.unwrap_v1())
216 .collect::<Vec<_>>(),
217 )?;
218 Ok((
219 Some(VersionedDkgConfirmation::V1(conf)),
220 VersionedUsedProcessedMessages::V1(msgs),
221 ))
222 }
223 DkgRole::Observer(observer) => {
224 let raw_messages: Vec<_> = messages
225 .into_iter()
226 .map(|pm| pm.unwrap_v1().message)
227 .collect();
228 let used = observer.merge(raw_messages)?;
229 Ok((
230 None,
231 VersionedUsedProcessedMessages::V1(dkg_v1::UsedProcessedMessages(
232 used.into_iter()
233 .map(|m| dkg_v1::ProcessedMessage {
234 message: m,
235 shares: vec![],
236 complaint: None,
237 })
238 .collect(),
239 )),
240 ))
241 }
242 }
243 }
244
245 fn complete_dkg<'a>(
249 &self,
250 used_messages: &VersionedUsedProcessedMessages,
251 confirmations: impl Iterator<Item = &'a VersionedDkgConfirmation>,
252 ) -> FastCryptoResult<Output<PkG, EncG>> {
253 match self {
254 DkgRole::Party(party) => {
255 let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
256 let msg = used_messages
257 .as_v1()
258 .expect("expected V1 used processed messages");
259 party.complete(
260 msg,
261 &confirmations
262 .map(|vm| vm.as_v1().expect("expected V1 confirmation"))
263 .cloned()
264 .collect::<Vec<_>>(),
265 rng,
266 )
267 }
268 DkgRole::Observer(observer) => {
269 let raw_messages: Vec<_> = used_messages
270 .as_v1()
271 .expect("expected V1 used processed messages")
272 .0
273 .iter()
274 .map(|pm| pm.message.clone())
275 .collect();
276 let confirmations: Vec<_> = confirmations
277 .map(|c| c.as_v1().expect("expected V1 confirmation").clone())
278 .collect();
279 observer.complete(&raw_messages, &confirmations)
280 }
281 }
282 }
283}
284
285pub struct RandomnessManager {
305 epoch_store: Weak<AuthorityPerEpochStore>,
306 epoch: EpochId,
307 consensus_adapter: Box<dyn SubmitToConsensus>,
308 network_handle: randomness::Handle,
309 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
310
311 dkg_start_time: OnceCell<Instant>,
313 role: Arc<DkgRole>,
314 enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
315 processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
316 used_messages: OnceCell<VersionedUsedProcessedMessages>,
317 confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
318 dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
319
320 next_randomness_round: RandomnessRound,
322 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
323
324 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
325}
326
327impl RandomnessManager {
328 pub async fn try_new(
330 epoch_store_weak: Weak<AuthorityPerEpochStore>,
331 consensus_adapter: Box<dyn SubmitToConsensus>,
332 network_handle: randomness::Handle,
333 authority_key_pair: Option<&AuthorityKeyPair>,
334 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
335 ) -> Option<Self> {
336 let epoch_store = match epoch_store_weak.upgrade() {
337 Some(epoch_store) => epoch_store,
338 None => {
339 error!(
340 "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
341 );
342 return None;
343 }
344 };
345 let tables = match epoch_store.tables() {
346 Ok(tables) => tables,
347 Err(_) => {
348 error!(
349 "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
350 );
351 return None;
352 }
353 };
354 let protocol_config = epoch_store.protocol_config();
355 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
356 epoch_store
357 .metrics
358 .epoch_random_beacon_dkg_num_shares
359 .set(0);
360
361 let committee = epoch_store.committee();
362 let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
363 if tracing::enabled!(tracing::Level::DEBUG) {
364 for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
366 let pk_bytes = pk.as_element().to_byte_array();
367 debug!(
368 "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
369 );
370 }
371 }
372 let authority_ids: HashMap<_, _> =
373 info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
374 let authority_peer_ids = epoch_store
375 .epoch_start_config()
376 .epoch_start_state()
377 .get_authority_names_to_peer_ids();
378 let authority_info = authority_ids
379 .into_iter()
380 .map(|(name, id)| {
381 let peer_id = *authority_peer_ids
382 .get(&name)
383 .expect("authority name should be in peer_ids");
384 (name, (peer_id, id))
385 })
386 .collect();
387 let nodes = info
388 .iter()
389 .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
390 id: *id,
391 pk: pk.clone(),
392 weight: (*stake).try_into().expect("stake should fit in u16"),
393 })
394 .collect();
395 let (nodes, t) = match nodes::Nodes::new_reduced(
396 nodes,
397 committee
398 .validity_threshold()
399 .try_into()
400 .expect("validity threshold should fit in u16"),
401 protocol_config.random_beacon_reduction_allowed_delta(),
402 protocol_config
403 .random_beacon_reduction_lower_bound()
404 .try_into()
405 .expect("should fit u16"),
406 ) {
407 Ok((nodes, t)) => (nodes, t),
408 Err(err) => {
409 error!("random beacon: error while initializing Nodes: {err:?}");
410 return None;
411 }
412 };
413 let random_oracle = fastcrypto_tbls::random_oracle::RandomOracle::new(&format!(
414 "dkg {} {}",
415 Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
416 committee.epoch()
417 ));
418
419 let role = Arc::new(DkgRole::try_new(
420 authority_key_pair,
421 nodes,
422 t,
423 random_oracle,
424 )?);
425
426 let highest_completed_round = tables
428 .randomness_highest_completed_round
429 .get(&SINGLETON_KEY)
430 .expect("typed_store should not fail");
431 let mut rm = RandomnessManager {
432 epoch_store: epoch_store_weak,
433 epoch: committee.epoch(),
434 consensus_adapter,
435 network_handle: network_handle.clone(),
436 authority_info,
437 dkg_start_time: OnceCell::new(),
438 role,
439 enqueued_messages: BTreeMap::new(),
440 processed_messages: BTreeMap::new(),
441 used_messages: OnceCell::new(),
442 confirmations: BTreeMap::new(),
443 dkg_output: OnceCell::new(),
444 next_randomness_round: RandomnessRound(0),
445 highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
446 randomness_receiver_handle,
447 };
448 let dkg_output = match tables
449 .dkg_output_v2
450 .get(&SINGLETON_KEY)
451 .expect("typed_store should not fail")
452 {
453 Some(dkg_output) => Some(dkg_output),
454 None => tables
455 .dkg_output
456 .get(&SINGLETON_KEY)
457 .expect("typed_store should not fail")
458 .map(Some),
459 };
460 match dkg_output {
461 Some(Some(dkg_output)) => {
462 info!(
463 "random beacon: loaded existing DKG output for epoch {}",
464 committee.epoch()
465 );
466 epoch_store
467 .metrics
468 .epoch_random_beacon_dkg_num_shares
469 .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
470 rm.dkg_output
471 .set(Some(dkg_output.clone()))
472 .expect("setting new OnceCell should succeed");
473 rm.randomness_receiver_handle
476 .set_public_key(*dkg_output.vss_pk.c0());
477
478 if let DkgRole::Party(party) = rm.role.as_ref() {
479 network_handle.update_epoch(
480 committee.epoch(),
481 rm.authority_info.clone(),
482 dkg_output,
483 party.t(),
484 highest_completed_round,
485 );
486 }
487 }
488 Some(None) => {
489 error!(
493 "random beacon: loaded failed DKG for epoch {}. Randomness disabled for this epoch. All randomness-using transactions will fail.",
494 committee.epoch()
495 );
496 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
497 rm.dkg_output
498 .set(None)
499 .expect("setting new OnceCell should succeed");
500 }
501 None => {
502 info!(
503 "random beacon: no existing DKG output found for epoch {}",
504 committee.epoch()
505 );
506
507 assert!(
509 epoch_store.protocol_config().dkg_version() > 0,
510 "BUG: DKG version 0 is deprecated"
511 );
512 rm.processed_messages.extend(
513 tables
514 .dkg_processed_messages_v2
515 .safe_iter()
516 .map(|result| result.expect("typed_store should not fail")),
517 );
518 if let Some(used_messages) = tables
519 .dkg_used_messages_v2
520 .get(&SINGLETON_KEY)
521 .expect("typed_store should not fail")
522 {
523 rm.used_messages
524 .set(used_messages.clone())
525 .expect("setting new OnceCell should succeed");
526 }
527 rm.confirmations.extend(
528 tables
529 .dkg_confirmations_v2
530 .safe_iter()
531 .map(|result| result.expect("typed_store should not fail")),
532 );
533 }
534 }
535
536 rm.next_randomness_round = tables
541 .randomness_next_round
542 .get(&SINGLETON_KEY)
543 .expect("typed_store should not fail")
544 .unwrap_or(RandomnessRound(0));
545 info!(
546 "random beacon: starting from next_randomness_round={}",
547 rm.next_randomness_round.0
548 );
549
550 if rm.role.is_party() {
552 let first_incomplete_round = highest_completed_round
553 .map(|r| r + 1)
554 .unwrap_or(RandomnessRound(0));
555 if first_incomplete_round < rm.next_randomness_round {
556 info!(
557 "random beacon: resuming generation for randomness rounds from {} to {}",
558 first_incomplete_round,
559 rm.next_randomness_round - 1,
560 );
561 for r in first_incomplete_round.0..rm.next_randomness_round.0 {
562 network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
563 }
564 }
565 }
566
567 Some(rm)
568 }
569
570 pub async fn start_dkg(&mut self) -> SuiResult {
573 let party = match self.role.as_ref() {
574 DkgRole::Observer(_) => {
575 info!("random beacon: observer started observing DKG");
576 return Ok(());
577 }
578 DkgRole::Party(party) => party,
579 };
580
581 if self.used_messages.initialized() || self.dkg_output.initialized() {
582 return Ok(());
584 }
585
586 let _ = self.dkg_start_time.set(Instant::now());
587
588 let epoch_store = self.epoch_store()?;
589 let dkg_version = epoch_store.protocol_config().dkg_version();
590 info!("random beacon: starting DKG, version {dkg_version}");
591
592 let msg = match VersionedDkgMessage::create(dkg_version, party) {
593 Ok(msg) => msg,
594 Err(FastCryptoError::IgnoredMessage) => {
595 info!(
596 "random beacon: no DKG Message for party id={} (zero weight)",
597 party.id
598 );
599 return Ok(());
600 }
601 Err(e) => {
602 error!("random beacon: error while creating a DKG Message: {e:?}");
603 return Ok(());
604 }
605 };
606
607 info!("random beacon: created {msg:?} with dkg version {dkg_version}");
608 let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
609
610 #[allow(unused_mut)]
611 let mut fail_point_skip_sending = false;
612 fail_point_if!("rb-dkg", || {
613 fail_point_skip_sending = true;
615 });
616 if !fail_point_skip_sending {
617 self.consensus_adapter
618 .submit_to_consensus(&[transaction], &epoch_store)?;
619 }
620
621 epoch_store
622 .metrics
623 .epoch_random_beacon_dkg_message_time_ms
624 .set(
625 self.dkg_start_time
626 .get()
627 .unwrap() .elapsed()
629 .as_millis() as i64,
630 );
631 Ok(())
632 }
633
634 pub(crate) async fn advance_dkg(
637 &mut self,
638 consensus_output: &mut ConsensusCommitOutput,
639 round: Round,
640 ) -> SuiResult {
641 let epoch_store = self.epoch_store()?;
642
643 self.try_merge_messages(consensus_output, &epoch_store)
644 .await?;
645 self.try_complete_dkg(consensus_output, round, &epoch_store)?;
646
647 if !self.dkg_output.initialized()
649 && round
650 > epoch_store
651 .protocol_config()
652 .random_beacon_dkg_timeout_round()
653 .into()
654 {
655 error!(
656 "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
657 );
658 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
659 self.dkg_output
660 .set(None)
661 .expect("checked above that `dkg_output` is uninitialized");
662 consensus_output.set_dkg_output(None);
663 }
664
665 Ok(())
666 }
667
668 async fn try_merge_messages(
671 &mut self,
672 consensus_output: &mut ConsensusCommitOutput,
673 epoch_store: &Arc<AuthorityPerEpochStore>,
674 ) -> SuiResult {
675 if self.dkg_output.initialized() || self.used_messages.initialized() {
676 return Ok(());
677 }
678
679 let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
681 .into_values()
682 .collect();
683 while let Some(res) = handles.next().await {
684 if let Ok(Some(processed)) = res {
685 self.processed_messages
686 .insert(processed.sender(), processed.clone());
687 consensus_output.insert_dkg_processed_message(processed);
688 }
689 }
690
691 let messages: Vec<_> = self.processed_messages.values().cloned().collect();
692
693 match self.role.merge_messages(messages) {
694 Ok((conf, used_msgs)) => {
695 if let Some(conf) = &conf {
696 info!(
697 "random beacon: sending DKG Confirmation with {} complaints",
698 conf.num_of_complaints()
699 );
700 } else {
701 info!(
702 "random beacon: observer merged {} DKG messages",
703 used_msgs
704 .as_v1()
705 .expect("expected V1 used processed messages")
706 .0
707 .len()
708 );
709 }
710 if self.used_messages.set(used_msgs.clone()).is_err() {
711 error!("BUG: used_messages should only ever be set once");
712 }
713 consensus_output.insert_dkg_used_messages(used_msgs);
714
715 if let Some(conf) = conf {
716 let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
717 epoch_store.name,
718 &conf,
719 );
720
721 #[allow(unused_mut)]
722 let mut fail_point_skip_sending = false;
723 fail_point_if!("rb-dkg", || {
724 fail_point_skip_sending = true;
726 });
727 if !fail_point_skip_sending {
728 self.consensus_adapter
729 .submit_to_consensus(&[transaction], epoch_store)?;
730 }
731
732 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
733 if let Some(elapsed) = elapsed {
734 epoch_store
735 .metrics
736 .epoch_random_beacon_dkg_confirmation_time_ms
737 .set(elapsed as i64);
738 }
739 }
740 }
741 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
743 }
744
745 Ok(())
746 }
747
748 fn try_complete_dkg(
752 &mut self,
753 consensus_output: &mut ConsensusCommitOutput,
754 round: Round,
755 epoch_store: &Arc<AuthorityPerEpochStore>,
756 ) -> SuiResult {
757 if !self.dkg_output.initialized() && self.used_messages.initialized() {
758 let used_messages = self
759 .used_messages
760 .get()
761 .expect("checked above that `used_messages` is initialized");
762 let complete_result = self
763 .role
764 .complete_dkg(used_messages, self.confirmations.values());
765
766 let epoch = epoch_store.committee().epoch();
767 let num_confirmations = self.confirmations.len();
768 let num_messages = self.processed_messages.len();
769
770 match complete_result {
771 Ok(output) => {
772 self.dkg_output
774 .set(Some(output.clone()))
775 .expect("checked above that `dkg_output` is uninitialized");
776 consensus_output.set_dkg_output(Some(output.clone()));
777
778 self.randomness_receiver_handle
779 .set_public_key(*output.vss_pk.c0());
780
781 let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
782 epoch_store
783 .metrics
784 .epoch_random_beacon_dkg_epoch_start_completion_time_ms
785 .set(epoch_elapsed as i64);
786 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
787
788 match self.role.as_ref() {
789 DkgRole::Party(party) => {
790 let num_shares =
791 output.shares.as_ref().map_or(0, |shares| shares.len());
792 let elapsed =
793 self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
794 info!(
795 "random beacon: DKG complete for Party epoch={epoch} commit_round={round} \
796 num_messages={num_messages} num_confirmations={num_confirmations} \
797 num_shares={num_shares} epoch_elapsed_ms={epoch_elapsed} dkg_elapsed_ms={elapsed:?}"
798 );
799 epoch_store
800 .metrics
801 .epoch_random_beacon_dkg_num_shares
802 .set(num_shares as i64);
803
804 if let Some(elapsed) = elapsed {
805 epoch_store
806 .metrics
807 .epoch_random_beacon_dkg_completion_time_ms
808 .set(elapsed as i64);
809 }
810
811 self.network_handle.update_epoch(
812 epoch_store.committee().epoch(),
813 self.authority_info.clone(),
814 output,
815 party.t(),
816 None,
817 );
818 }
819 DkgRole::Observer(_) => {
820 info!(
821 "random beacon: DKG complete for Observer epoch={epoch} commit_round={round} \
822 num_messages={num_messages} num_confirmations={num_confirmations} \
823 epoch_elapsed_ms={epoch_elapsed}"
824 );
825 }
826 }
827 }
828 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
830 }
831 }
832
833 Ok(())
834 }
835
836 pub fn add_message(
838 &mut self,
839 authority: &AuthorityName,
840 msg: VersionedDkgMessage,
841 ) -> SuiResult {
842 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
845 if !msg.is_valid_version(dkg_version) {
846 warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
847 return Ok(());
848 }
849
850 if self.used_messages.initialized() || self.dkg_output.initialized() {
851 return Ok(());
853 }
854 let Some((_, party_id)) = self.authority_info.get(authority) else {
855 debug_fatal!(
856 "random beacon: received DKG Message from unknown authority: {authority:?}"
857 );
858 return Ok(());
859 };
860 if *party_id != msg.sender() {
861 warn!(
862 "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
863 );
864 return Ok(());
865 }
866 if self.enqueued_messages.contains_key(&msg.sender())
867 || self.processed_messages.contains_key(&msg.sender())
868 {
869 info!("ignoring duplicate DKG Message from authority {authority:?}");
870 return Ok(());
871 }
872
873 let sender = msg.sender();
874 let role = self.role.clone();
875 let handle = tokio::task::spawn_blocking(move || match role.process_message(msg) {
877 Ok(processed) => Some(processed),
878 Err(err) => {
879 debug!("random beacon: error while processing DKG Message: {err:?}");
880 None
881 }
882 });
883 self.enqueued_messages.insert(sender, handle);
884 Ok(())
885 }
886
887 pub(crate) fn add_confirmation(
889 &mut self,
890 output: &mut ConsensusCommitOutput,
891 authority: &AuthorityName,
892 conf: VersionedDkgConfirmation,
893 ) -> SuiResult {
894 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
897 if !conf.is_valid_version(dkg_version) {
898 warn!(
899 "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
900 );
901 return Ok(());
902 }
903
904 if self.dkg_output.initialized() {
905 return Ok(());
907 }
908 let Some((_, party_id)) = self.authority_info.get(authority) else {
909 error!(
910 "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
911 );
912 return Ok(());
913 };
914 if *party_id != conf.sender() {
915 warn!(
916 "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
917 );
918 return Ok(());
919 }
920 self.confirmations.insert(conf.sender(), conf.clone());
921 output.insert_dkg_confirmation(conf);
922 Ok(())
923 }
924
925 pub(crate) fn reserve_next_randomness(
930 &mut self,
931 commit_timestamp: TimestampMs,
932 output: &mut ConsensusCommitOutput,
933 ) -> SuiResult<Option<RandomnessRound>> {
934 let epoch_store = self.epoch_store()?;
935
936 let last_round_timestamp = epoch_store
937 .get_randomness_last_round_timestamp()
938 .expect("read should not fail");
939
940 if let Some(last_round_timestamp) = last_round_timestamp
941 && commit_timestamp - last_round_timestamp
942 < epoch_store
943 .protocol_config()
944 .random_beacon_min_round_interval_ms()
945 {
946 return Ok(None);
947 }
948
949 let randomness_round = self.next_randomness_round;
950 self.next_randomness_round = self
951 .next_randomness_round
952 .checked_add(1)
953 .expect("RandomnessRound should not overflow");
954
955 output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
956
957 Ok(Some(randomness_round))
958 }
959
960 pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
962 if self.role.is_party() {
963 self.network_handle
964 .send_partial_signatures(epoch, randomness_round);
965 }
966 }
967
968 pub fn dkg_status(&self) -> DkgStatus {
969 match self.dkg_output.get() {
970 Some(Some(_)) => DkgStatus::Successful,
971 Some(None) => DkgStatus::Failed,
972 None => DkgStatus::Pending,
973 }
974 }
975
976 pub fn reporter(&self) -> Option<RandomnessReporter> {
979 if self.role.is_observer() {
980 return None;
981 }
982 Some(RandomnessReporter {
983 epoch_store: self.epoch_store.clone(),
984 epoch: self.epoch,
985 network_handle: self.network_handle.clone(),
986 highest_completed_round: self.highest_completed_round.clone(),
987 })
988 }
989
990 #[cfg(test)]
991 fn dkg_output(&self) -> Option<&dkg_v1::Output<PkG, EncG>> {
992 self.dkg_output.get().and_then(|o| o.as_ref())
993 }
994
995 fn epoch_store(&self) -> SuiResult<Arc<AuthorityPerEpochStore>> {
996 self.epoch_store
997 .upgrade()
998 .ok_or(SuiErrorKind::EpochEnded(self.epoch).into())
999 }
1000
1001 fn randomness_dkg_info_from_committee(
1002 committee: &Committee,
1003 ) -> Vec<(
1004 u16,
1005 AuthorityName,
1006 fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
1007 StakeUnit,
1008 )> {
1009 committee
1010 .members()
1011 .map(|(name, stake)| {
1012 let index: u16 = committee
1013 .authority_index(name)
1014 .expect("lookup of known committee member should succeed")
1015 .try_into()
1016 .expect("authority index should fit in u16");
1017 let pk = bls12381::G2Element::from_byte_array(
1018 committee
1019 .public_key(name)
1020 .expect("lookup of known committee member should succeed")
1021 .as_bytes()
1022 .try_into()
1023 .expect("key length should match"),
1024 )
1025 .expect("should work to convert BLS key to G2Element");
1026 (
1027 index,
1028 *name,
1029 fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
1030 *stake,
1031 )
1032 })
1033 .collect()
1034 }
1035}
1036
1037#[derive(Clone)]
1039pub struct RandomnessReporter {
1040 epoch_store: Weak<AuthorityPerEpochStore>,
1041 epoch: EpochId,
1042 network_handle: randomness::Handle,
1043 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
1044}
1045
1046impl RandomnessReporter {
1047 pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> SuiResult {
1051 let epoch_store = self
1052 .epoch_store
1053 .upgrade()
1054 .ok_or(SuiErrorKind::EpochEnded(self.epoch))?;
1055 let mut highest_completed_round = self.highest_completed_round.lock();
1056 if Some(round) > *highest_completed_round {
1057 *highest_completed_round = Some(round);
1058 epoch_store
1059 .tables()?
1060 .randomness_highest_completed_round
1061 .insert(&SINGLETON_KEY, &round)?;
1062 self.network_handle
1063 .complete_round(epoch_store.committee().epoch(), round);
1064 }
1065 Ok(())
1066 }
1067}
1068
1069#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1070pub enum DkgStatus {
1071 Pending,
1072 Failed,
1073 Successful,
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use crate::{
1079 authority::{
1080 authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStatsV2},
1081 test_authority_builder::TestAuthorityBuilder,
1082 },
1083 checkpoints::CheckpointStore,
1084 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, MockConsensusClient},
1085 epoch::randomness::*,
1086 mock_consensus::with_block_status,
1087 randomness_round_receiver::RandomnessRoundReceiverHandle,
1088 };
1089 use consensus_core::BlockStatus;
1090 use consensus_types::block::BlockRef;
1091 use fastcrypto::groups::bls12381;
1092 use fastcrypto::serde_helpers::ToFromByteArray;
1093 use fastcrypto_tbls::{mocked_dkg, nodes};
1094 use std::num::NonZeroUsize;
1095 use sui_protocol_config::ProtocolConfig;
1096 use sui_protocol_config::{Chain, ProtocolVersion};
1097 use sui_types::{base_types::AuthorityName, messages_consensus::ConsensusTransactionKind};
1098 use tokio::sync::mpsc;
1099 use typed_store::Map;
1100
1101 use arc_swap::Guard;
1102
1103 struct DkgTestSetup {
1106 network_config: sui_swarm_config::network_config::NetworkConfig,
1107 epoch_stores: Vec<Guard<Arc<AuthorityPerEpochStore>>>,
1108 randomness_managers: Vec<RandomnessManager>,
1109 tx_consensus: mpsc::Sender<Vec<ConsensusTransaction>>,
1110 rx_consensus: mpsc::Receiver<Vec<ConsensusTransaction>>,
1111 num_validators: usize,
1112 }
1113
1114 impl DkgTestSetup {
1115 async fn new(include_observer: bool) -> Self {
1116 let network_config =
1117 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1118 .committee_size(NonZeroUsize::new(4).unwrap())
1119 .with_reference_gas_price(500)
1120 .build();
1121
1122 let mut protocol_config =
1123 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1124 protocol_config.set_random_beacon_dkg_version_for_testing(1);
1125
1126 let num_validators = network_config.validator_configs.len();
1127 let mut epoch_stores = Vec::new();
1128 let mut randomness_managers = Vec::new();
1129 let (tx_consensus, rx_consensus) = mpsc::channel(100);
1130
1131 for validator in network_config.validator_configs.iter() {
1132 let mut mock_consensus_client = MockConsensusClient::new();
1133 let tx_consensus = tx_consensus.clone();
1134 mock_consensus_client
1135 .expect_submit()
1136 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1137 tx_consensus.try_send(transactions.to_vec()).unwrap();
1138 true
1139 })
1140 .returning(|_, _| {
1141 Ok((
1142 Vec::new(),
1143 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
1144 ))
1145 });
1146
1147 let state = TestAuthorityBuilder::new()
1148 .with_protocol_config(protocol_config.clone())
1149 .with_genesis_and_keypair(
1150 &network_config.genesis,
1151 validator.protocol_key_pair(),
1152 )
1153 .build()
1154 .await;
1155 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1156 Arc::new(mock_consensus_client),
1157 CheckpointStore::new_for_tests(),
1158 state.name,
1159 100_000,
1160 100_000,
1161 ConsensusAdapterMetrics::new_test(),
1162 Arc::new(tokio::sync::Notify::new()),
1163 ));
1164 let epoch_store = state.epoch_store_for_testing();
1165 let randomness_manager = RandomnessManager::try_new(
1166 Arc::downgrade(&epoch_store),
1167 Box::new(consensus_adapter.clone()),
1168 sui_network::randomness::Handle::new_stub(),
1169 Some(validator.protocol_key_pair()),
1170 RandomnessRoundReceiverHandle::new_for_testing(),
1171 )
1172 .await
1173 .unwrap();
1174
1175 epoch_stores.push(epoch_store);
1176 randomness_managers.push(randomness_manager);
1177 }
1178
1179 if include_observer {
1180 let observer_epoch_store = epoch_stores[0].clone();
1181 let mut mock_observer_consensus = MockConsensusClient::new();
1182 mock_observer_consensus
1183 .expect_submit()
1184 .returning(|_, _| panic!("observer should not submit to consensus"));
1185 let observer_adapter = Arc::new(ConsensusAdapter::new(
1186 Arc::new(mock_observer_consensus),
1187 CheckpointStore::new_for_tests(),
1188 observer_epoch_store.name,
1189 100_000,
1190 100_000,
1191 ConsensusAdapterMetrics::new_test(),
1192 Arc::new(tokio::sync::Notify::new()),
1193 ));
1194 let observer_manager = RandomnessManager::try_new(
1195 Arc::downgrade(&observer_epoch_store),
1196 Box::new(observer_adapter),
1197 sui_network::randomness::Handle::new_stub(),
1198 None,
1199 RandomnessRoundReceiverHandle::new_for_testing(),
1200 )
1201 .await
1202 .unwrap();
1203
1204 epoch_stores.push(observer_epoch_store.into());
1205 randomness_managers.push(observer_manager);
1206 }
1207
1208 Self {
1209 network_config,
1210 epoch_stores,
1211 randomness_managers,
1212 tx_consensus,
1213 rx_consensus,
1214 num_validators,
1215 }
1216 }
1217
1218 fn consensus_adapter(&self, authority: AuthorityName) -> Arc<ConsensusAdapter> {
1219 let mut mock_consensus_client = MockConsensusClient::new();
1220 let tx_consensus = self.tx_consensus.clone();
1221 mock_consensus_client
1222 .expect_submit()
1223 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1224 tx_consensus.try_send(transactions.to_vec()).unwrap();
1225 true
1226 })
1227 .returning(|_, _| {
1228 Ok((
1229 Vec::new(),
1230 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
1231 ))
1232 });
1233
1234 Arc::new(ConsensusAdapter::new(
1235 Arc::new(mock_consensus_client),
1236 CheckpointStore::new_for_tests(),
1237 authority,
1238 100_000,
1239 100_000,
1240 ConsensusAdapterMetrics::new_test(),
1241 Arc::new(tokio::sync::Notify::new()),
1242 ))
1243 }
1244
1245 async fn recover_validator_randomness_managers(&mut self) -> usize {
1246 let mut recovered_randomness_managers = Vec::new();
1247 for (i, validator) in self
1248 .network_config
1249 .validator_configs
1250 .iter()
1251 .enumerate()
1252 .take(self.num_validators)
1253 {
1254 let consensus_adapter = self.consensus_adapter(self.epoch_stores[i].name);
1255 recovered_randomness_managers.push(
1256 RandomnessManager::try_new(
1257 Arc::downgrade(&self.epoch_stores[i]),
1258 Box::new(consensus_adapter),
1259 sui_network::randomness::Handle::new_stub(),
1260 Some(validator.protocol_key_pair()),
1261 RandomnessRoundReceiverHandle::new_for_testing(),
1262 )
1263 .await
1264 .unwrap(),
1265 );
1266 }
1267 let recovered_count = recovered_randomness_managers.len();
1268 self.randomness_managers = recovered_randomness_managers;
1269 recovered_count
1270 }
1271
1272 async fn start_dkg_and_collect_messages(&mut self) -> Vec<VersionedDkgMessage> {
1274 let mut dkg_messages = Vec::new();
1275 for randomness_manager in self.randomness_managers.iter_mut() {
1276 randomness_manager.start_dkg().await.unwrap();
1277 }
1278 for _ in 0..self.num_validators {
1279 let mut dkg_message = self.rx_consensus.recv().await.unwrap();
1280 assert!(dkg_message.len() == 1);
1281 match dkg_message.remove(0).kind {
1282 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1283 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1284 .expect("DKG message deserialization should not fail");
1285 dkg_messages.push(msg);
1286 }
1287 _ => panic!("wrong type of message sent"),
1288 }
1289 }
1290 dkg_messages
1291 }
1292
1293 async fn distribute_messages_and_advance(
1295 &mut self,
1296 dkg_messages: &[VersionedDkgMessage],
1297 advance_round: Round,
1298 ) {
1299 let indexed_messages: Vec<_> = dkg_messages.iter().cloned().enumerate().collect();
1300 self.distribute_indexed_messages_and_advance(&indexed_messages, advance_round)
1301 .await;
1302 }
1303
1304 async fn distribute_indexed_messages_and_advance(
1305 &mut self,
1306 dkg_messages: &[(usize, VersionedDkgMessage)],
1307 advance_round: Round,
1308 ) {
1309 for i in 0..self.randomness_managers.len() {
1310 let mut output = ConsensusCommitOutput::new(0);
1311 output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
1312 index: ExecutionIndices {
1313 last_committed_round: 0,
1314 ..Default::default()
1315 },
1316 ..Default::default()
1317 });
1318 for (j, dkg_message) in dkg_messages.iter().cloned() {
1319 self.randomness_managers[i]
1320 .add_message(&self.epoch_stores[j].name, dkg_message)
1321 .unwrap();
1322 }
1323 self.randomness_managers[i]
1324 .advance_dkg(&mut output, advance_round)
1325 .await
1326 .unwrap();
1327 let mut batch = self.epoch_stores[i].db_batch_for_test();
1328 output
1329 .write_to_batch(&self.epoch_stores[i], &mut batch)
1330 .unwrap();
1331 batch.write().unwrap();
1332 }
1333 }
1334
1335 async fn collect_and_distribute_confirmations(&mut self) {
1337 let mut dkg_confirmations = Vec::new();
1338 for _ in 0..self.num_validators {
1339 let mut dkg_confirmation = self.rx_consensus.recv().await.unwrap();
1340 assert!(dkg_confirmation.len() == 1);
1341 match dkg_confirmation.remove(0).kind {
1342 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
1343 let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
1344 .expect("DKG confirmation deserialization should not fail");
1345 dkg_confirmations.push(msg);
1346 }
1347 _ => panic!("wrong type of message sent"),
1348 }
1349 }
1350 for i in 0..self.randomness_managers.len() {
1351 let mut output = ConsensusCommitOutput::new(0);
1352 output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
1353 index: ExecutionIndices {
1354 last_committed_round: 1,
1355 ..Default::default()
1356 },
1357 ..Default::default()
1358 });
1359 for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
1360 self.randomness_managers[i]
1361 .add_confirmation(&mut output, &self.epoch_stores[j].name, dkg_confirmation)
1362 .unwrap();
1363 }
1364 self.randomness_managers[i]
1365 .advance_dkg(&mut output, 0)
1366 .await
1367 .unwrap();
1368 let mut batch = self.epoch_stores[i].db_batch_for_test();
1369 output
1370 .write_to_batch(&self.epoch_stores[i], &mut batch)
1371 .unwrap();
1372 batch.write().unwrap();
1373 }
1374 }
1375 }
1376
1377 #[tokio::test]
1378 async fn test_dkg() {
1379 telemetry_subscribers::init_for_testing();
1380
1381 let mut setup = DkgTestSetup::new(false).await;
1382 let dkg_messages = setup.start_dkg_and_collect_messages().await;
1383 setup
1384 .distribute_messages_and_advance(&dkg_messages, 0)
1385 .await;
1386 setup.collect_and_distribute_confirmations().await;
1387
1388 for rm in &setup.randomness_managers {
1389 assert_eq!(DkgStatus::Successful, rm.dkg_status());
1390 }
1391 }
1392
1393 #[tokio::test]
1394 async fn test_dkg_expiration() {
1395 telemetry_subscribers::init_for_testing();
1396
1397 let mut setup = DkgTestSetup::new(false).await;
1398 let dkg_messages = setup.start_dkg_and_collect_messages().await;
1399 setup
1401 .distribute_messages_and_advance(&dkg_messages, u64::MAX)
1402 .await;
1403
1404 for rm in &setup.randomness_managers {
1405 assert_eq!(DkgStatus::Failed, rm.dkg_status());
1406 }
1407
1408 let recovered_count = setup.recover_validator_randomness_managers().await;
1409 assert_eq!(setup.num_validators, recovered_count);
1410
1411 for rm in &setup.randomness_managers {
1415 assert_eq!(DkgStatus::Failed, rm.dkg_status());
1416 }
1417 }
1418
1419 #[tokio::test]
1420 async fn test_dkg_recovers_processed_messages_after_restart() {
1421 telemetry_subscribers::init_for_testing();
1422
1423 let mut setup = DkgTestSetup::new(false).await;
1424 let dkg_messages = setup.start_dkg_and_collect_messages().await;
1425 let processed_before_restart = 1;
1426 assert!(processed_before_restart > 0);
1427 assert!(processed_before_restart < setup.num_validators);
1428 let pre_restart_messages: Vec<_> = dkg_messages
1429 .iter()
1430 .cloned()
1431 .enumerate()
1432 .take(processed_before_restart)
1433 .collect();
1434 let post_restart_messages: Vec<_> = dkg_messages
1435 .iter()
1436 .cloned()
1437 .enumerate()
1438 .skip(processed_before_restart)
1439 .collect();
1440
1441 setup
1442 .distribute_indexed_messages_and_advance(&pre_restart_messages, 0)
1443 .await;
1444
1445 for rm in &setup.randomness_managers {
1446 assert_eq!(DkgStatus::Pending, rm.dkg_status());
1447 assert_eq!(processed_before_restart, rm.processed_messages.len());
1448 assert!(!rm.used_messages.initialized());
1449 }
1450
1451 let recovered_count = setup.recover_validator_randomness_managers().await;
1452 assert_eq!(setup.num_validators, recovered_count);
1453 for rm in &setup.randomness_managers {
1454 assert_eq!(DkgStatus::Pending, rm.dkg_status());
1455 assert_eq!(processed_before_restart, rm.processed_messages.len());
1456 assert!(!rm.used_messages.initialized());
1457 }
1458
1459 setup
1460 .distribute_indexed_messages_and_advance(&post_restart_messages, 0)
1461 .await;
1462
1463 for rm in &setup.randomness_managers {
1464 assert_eq!(DkgStatus::Pending, rm.dkg_status());
1465 assert_eq!(setup.num_validators, rm.processed_messages.len());
1466 assert!(rm.used_messages.initialized());
1467 }
1468
1469 setup.collect_and_distribute_confirmations().await;
1470
1471 for rm in &setup.randomness_managers {
1472 assert_eq!(DkgStatus::Successful, rm.dkg_status());
1473 }
1474 }
1475
1476 #[tokio::test]
1479 async fn test_dkg_observer() {
1480 telemetry_subscribers::init_for_testing();
1481
1482 let mut setup = DkgTestSetup::new(true).await;
1483 let observer_idx = setup.randomness_managers.len() - 1;
1484
1485 let dkg_messages = setup.start_dkg_and_collect_messages().await;
1486 setup
1487 .distribute_messages_and_advance(&dkg_messages, 0)
1488 .await;
1489
1490 for rm in &setup.randomness_managers {
1491 assert_eq!(DkgStatus::Pending, rm.dkg_status());
1492 }
1493
1494 setup.collect_and_distribute_confirmations().await;
1495
1496 for rm in &setup.randomness_managers {
1497 assert_eq!(DkgStatus::Successful, rm.dkg_status());
1498 }
1499
1500 let observer_output = setup.randomness_managers[observer_idx]
1502 .dkg_output()
1503 .expect("observer should have DKG output");
1504 let validator_output = setup.randomness_managers[0]
1505 .dkg_output()
1506 .expect("validator should have DKG output");
1507 assert_eq!(observer_output.vss_pk, validator_output.vss_pk);
1508 assert!(observer_output.shares.is_none());
1509
1510 for rm in &setup.randomness_managers[..setup.num_validators] {
1511 let output = rm.dkg_output().expect("validator should have DKG output");
1512 assert!(output.shares.is_some());
1513 }
1514 }
1515
1516 fn build_dkg_nodes(
1518 network_config: &sui_swarm_config::network_config::NetworkConfig,
1519 ) -> (nodes::Nodes<EncG>, u16) {
1520 let dkg_nodes: Vec<_> = network_config
1521 .validator_configs
1522 .iter()
1523 .enumerate()
1524 .map(|(i, v)| {
1525 let pk = bls12381::G2Element::from_byte_array(
1526 v.protocol_key_pair()
1527 .public()
1528 .as_bytes()
1529 .try_into()
1530 .expect("key length should match"),
1531 )
1532 .expect("should work to convert BLS key to G2Element");
1533 nodes::Node::<EncG> {
1534 id: i as u16,
1535 pk: fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
1536 weight: 1,
1537 }
1538 })
1539 .collect();
1540 let num_nodes = dkg_nodes.len();
1541 let t = num_nodes.div_ceil(3) as u16;
1543 nodes::Nodes::new(dkg_nodes).map(|n| (n, t)).unwrap()
1544 }
1545
1546 #[test]
1547 fn test_dkg_role_try_new_party() {
1548 let network_config =
1549 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1550 .committee_size(NonZeroUsize::new(4).unwrap())
1551 .with_reference_gas_price(500)
1552 .build();
1553
1554 let (nodes, t) = build_dkg_nodes(&network_config);
1555 let random_oracle =
1556 fastcrypto_tbls::random_oracle::RandomOracle::new("test_dkg_role_party");
1557
1558 let role = DkgRole::try_new(
1559 Some(network_config.validator_configs[0].protocol_key_pair()),
1560 nodes,
1561 t,
1562 random_oracle,
1563 );
1564 assert!(role.is_some());
1565 assert!(role.unwrap().is_party());
1566 }
1567
1568 #[tokio::test]
1569 async fn test_randomness_manager_crash_recovery_v1() {
1570 telemetry_subscribers::init_for_testing();
1571
1572 let network_config =
1573 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1574 .committee_size(NonZeroUsize::new(4).unwrap())
1575 .with_reference_gas_price(500)
1576 .build();
1577
1578 let mut protocol_config =
1579 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1580 protocol_config.set_random_beacon_dkg_version_for_testing(1);
1581
1582 let validator = &network_config.validator_configs[0];
1583 let state = TestAuthorityBuilder::new()
1584 .with_protocol_config(protocol_config.clone())
1585 .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
1586 .build()
1587 .await;
1588 let epoch_store = state.epoch_store_for_testing();
1589
1590 let dkg_nodes = nodes::Nodes::new(
1591 RandomnessManager::randomness_dkg_info_from_committee(epoch_store.committee())
1592 .into_iter()
1593 .map(|(id, _, pk, stake)| nodes::Node::<bls12381::G2Element> {
1594 id,
1595 pk,
1596 weight: stake.try_into().unwrap(),
1597 })
1598 .collect(),
1599 )
1600 .unwrap();
1601 let threshold = epoch_store
1602 .committee()
1603 .validity_threshold()
1604 .try_into()
1605 .unwrap();
1606 let party_id = epoch_store
1607 .committee()
1608 .authority_index(&epoch_store.name)
1609 .unwrap()
1610 .try_into()
1611 .unwrap();
1612 let expected_dkg_output = mocked_dkg::generate_mocked_output::<
1613 bls12381::G2Element,
1614 bls12381::G2Element,
1615 >(dkg_nodes, threshold, 0, party_id);
1616 let expected_dkg_output_bytes =
1617 bcs::to_bytes(&expected_dkg_output).expect("DKG output serialization should not fail");
1618 let expected_public_key = *expected_dkg_output.vss_pk.c0();
1619
1620 let tables = epoch_store.tables().unwrap();
1621 tables
1622 .dkg_output
1623 .insert(&SINGLETON_KEY, &expected_dkg_output)
1624 .unwrap();
1625 tables
1626 .randomness_next_round
1627 .insert(&SINGLETON_KEY, &RandomnessRound(3))
1628 .unwrap();
1629 tables
1630 .randomness_highest_completed_round
1631 .insert(&SINGLETON_KEY, &RandomnessRound(1))
1632 .unwrap();
1633
1634 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1635 Arc::new(MockConsensusClient::new()),
1636 CheckpointStore::new_for_tests(),
1637 epoch_store.name,
1638 100_000,
1639 100_000,
1640 ConsensusAdapterMetrics::new_test(),
1641 Arc::new(tokio::sync::Notify::new()),
1642 ));
1643 let recovered_receiver_handle = RandomnessRoundReceiverHandle::new_for_testing();
1644 assert!(recovered_receiver_handle.public_key_for_testing().is_none());
1645
1646 let recovered_randomness_manager = RandomnessManager::try_new(
1647 Arc::downgrade(&epoch_store),
1648 Box::new(consensus_adapter.clone()),
1649 sui_network::randomness::Handle::new_stub(),
1650 Some(validator.protocol_key_pair()),
1651 recovered_receiver_handle.clone(),
1652 )
1653 .await
1654 .unwrap();
1655
1656 assert_eq!(
1657 DkgStatus::Successful,
1658 recovered_randomness_manager.dkg_status()
1659 );
1660 let recovered_dkg_output = recovered_randomness_manager
1661 .dkg_output
1662 .get()
1663 .expect("recovered DKG output should be initialized")
1664 .as_ref()
1665 .expect("recovered DKG should be successful");
1666 assert_eq!(
1667 expected_dkg_output_bytes,
1668 bcs::to_bytes(recovered_dkg_output).expect("DKG output serialization should not fail")
1669 );
1670 assert_eq!(
1671 RandomnessRound(3),
1672 recovered_randomness_manager.next_randomness_round
1673 );
1674 assert_eq!(
1675 Some(RandomnessRound(1)),
1676 *recovered_randomness_manager.highest_completed_round.lock()
1677 );
1678 let recovered_public_key = recovered_receiver_handle
1679 .public_key_for_testing()
1680 .expect("public key should be restored on recovery");
1681 assert_eq!(
1682 expected_public_key.to_byte_array(),
1683 recovered_public_key.to_byte_array()
1684 );
1685 }
1686
1687 #[test]
1688 fn test_dkg_role_try_new_observer() {
1689 let network_config =
1690 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1691 .committee_size(NonZeroUsize::new(4).unwrap())
1692 .with_reference_gas_price(500)
1693 .build();
1694
1695 let (nodes, t) = build_dkg_nodes(&network_config);
1696 let random_oracle =
1697 fastcrypto_tbls::random_oracle::RandomOracle::new("test_dkg_role_observer");
1698
1699 let role = DkgRole::try_new(None, nodes, t, random_oracle);
1700 assert!(role.is_some());
1701 assert!(role.unwrap().is_observer());
1702 }
1703}