1use anemo::PeerId;
5use fastcrypto::encoding::{Encoding, Hex};
6use fastcrypto::error::{FastCryptoError, FastCryptoResult};
7use fastcrypto::groups::bls12381;
8use fastcrypto::serde_helpers::ToFromByteArray;
9use fastcrypto::traits::{KeyPair, ToFromBytes};
10use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
11use futures::StreamExt;
12use futures::stream::FuturesUnordered;
13use mysten_common::debug_fatal;
14use parking_lot::Mutex;
15use rand::SeedableRng;
16use rand::rngs::{OsRng, StdRng};
17use serde::{Deserialize, Serialize};
18use std::collections::{BTreeMap, HashMap};
19use std::sync::{Arc, Weak};
20use std::time::Instant;
21use sui_macros::fail_point_if;
22use sui_network::randomness;
23use sui_types::base_types::AuthorityName;
24use sui_types::committee::{Committee, EpochId, StakeUnit};
25use sui_types::crypto::{AuthorityKeyPair, RandomnessRound};
26use sui_types::error::{SuiErrorKind, SuiResult};
27use sui_types::messages_consensus::{
28 ConsensusTransaction, Round, TimestampMs, VersionedDkgConfirmation, VersionedDkgMessage,
29};
30use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
31use tokio::sync::OnceCell;
32use tokio::task::JoinHandle;
33use tracing::{debug, error, info, warn};
34use typed_store::Map;
35
36use crate::authority::authority_per_epoch_store::{
37 AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
38};
39use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
40use crate::consensus_adapter::SubmitToConsensus;
41
42type PkG = bls12381::G2Element;
43type EncG = bls12381::G2Element;
44
45pub const SINGLETON_KEY: u64 = 0;
46
47#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[allow(clippy::large_enum_variant)]
51pub enum VersionedProcessedMessage {
52 V0(), V1(dkg_v1::ProcessedMessage<PkG, EncG>),
54}
55
56impl VersionedProcessedMessage {
57 pub fn sender(&self) -> PartyId {
58 match self {
59 VersionedProcessedMessage::V0() => {
60 panic!("BUG: invalid VersionedProcessedMessage version V0")
61 }
62 VersionedProcessedMessage::V1(msg) => msg.message.sender,
63 }
64 }
65
66 pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
67 if let VersionedProcessedMessage::V1(msg) = self {
68 msg
69 } else {
70 panic!("BUG: expected message version is 1")
71 }
72 }
73
74 pub fn process(
75 party: Arc<dkg_v1::Party<PkG, EncG>>,
76 message: VersionedDkgMessage,
77 ) -> FastCryptoResult<VersionedProcessedMessage> {
78 let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
80 Ok(VersionedProcessedMessage::V1(processed))
81 }
82
83 pub fn merge(
84 party: Arc<dkg_v1::Party<PkG, EncG>>,
85 messages: Vec<Self>,
86 ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
87 let (conf, msgs) = party.merge(
89 &messages
90 .into_iter()
91 .map(|vm| vm.unwrap_v1())
92 .collect::<Vec<_>>(),
93 )?;
94 Ok((
95 VersionedDkgConfirmation::V1(conf),
96 VersionedUsedProcessedMessages::V1(msgs),
97 ))
98 }
99}
100
101#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
102pub enum VersionedUsedProcessedMessages {
103 V0(), V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
105}
106
107impl VersionedUsedProcessedMessages {
108 fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
109 &self,
110 party: Arc<dkg_v1::Party<PkG, EncG>>,
111 confirmations: Iter,
112 ) -> FastCryptoResult<Output<PkG, EncG>> {
113 let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
115 let VersionedUsedProcessedMessages::V1(msg) = self else {
116 panic!("BUG: invalid VersionedUsedProcessedMessages version")
117 };
118 party.complete(
119 msg,
120 &confirmations
121 .map(|vm| vm.unwrap_v1())
122 .cloned()
123 .collect::<Vec<_>>(),
124 rng,
125 )
126 }
127}
128
129pub struct RandomnessManager {
149 epoch_store: Weak<AuthorityPerEpochStore>,
150 epoch: EpochId,
151 consensus_adapter: Box<dyn SubmitToConsensus>,
152 network_handle: randomness::Handle,
153 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
154
155 dkg_start_time: OnceCell<Instant>,
157 party: Arc<dkg_v1::Party<PkG, EncG>>,
158 enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
159 processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
160 used_messages: OnceCell<VersionedUsedProcessedMessages>,
161 confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
162 dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
163
164 next_randomness_round: RandomnessRound,
166 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
167}
168
169impl RandomnessManager {
170 pub async fn try_new(
172 epoch_store_weak: Weak<AuthorityPerEpochStore>,
173 consensus_adapter: Box<dyn SubmitToConsensus>,
174 network_handle: randomness::Handle,
175 authority_key_pair: &AuthorityKeyPair,
176 ) -> Option<Self> {
177 let epoch_store = match epoch_store_weak.upgrade() {
178 Some(epoch_store) => epoch_store,
179 None => {
180 error!(
181 "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
182 );
183 return None;
184 }
185 };
186 let tables = match epoch_store.tables() {
187 Ok(tables) => tables,
188 Err(_) => {
189 error!(
190 "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
191 );
192 return None;
193 }
194 };
195 let protocol_config = epoch_store.protocol_config();
196
197 let name: AuthorityName = authority_key_pair.public().into();
198 let committee = epoch_store.committee();
199 let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
200 if tracing::enabled!(tracing::Level::DEBUG) {
201 for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
203 let pk_bytes = pk.as_element().to_byte_array();
204 debug!(
205 "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
206 );
207 }
208 }
209 let authority_ids: HashMap<_, _> =
210 info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
211 let authority_peer_ids = epoch_store
212 .epoch_start_config()
213 .epoch_start_state()
214 .get_authority_names_to_peer_ids();
215 let authority_info = authority_ids
216 .into_iter()
217 .map(|(name, id)| {
218 let peer_id = *authority_peer_ids
219 .get(&name)
220 .expect("authority name should be in peer_ids");
221 (name, (peer_id, id))
222 })
223 .collect();
224 let nodes = info
225 .iter()
226 .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
227 id: *id,
228 pk: pk.clone(),
229 weight: (*stake).try_into().expect("stake should fit in u16"),
230 })
231 .collect();
232 let (nodes, t) = match nodes::Nodes::new_reduced(
233 nodes,
234 committee
235 .validity_threshold()
236 .try_into()
237 .expect("validity threshold should fit in u16"),
238 protocol_config.random_beacon_reduction_allowed_delta(),
239 protocol_config
240 .random_beacon_reduction_lower_bound()
241 .try_into()
242 .expect("should fit u16"),
243 ) {
244 Ok((nodes, t)) => (nodes, t),
245 Err(err) => {
246 error!("random beacon: error while initializing Nodes: {err:?}");
247 return None;
248 }
249 };
250 let total_weight = nodes.total_weight();
251 let num_nodes = nodes.num_nodes();
252 let prefix_str = format!(
253 "dkg {} {}",
254 Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
255 committee.epoch()
256 );
257 let randomness_private_key = bls12381::Scalar::from_byte_array(
258 authority_key_pair
259 .copy()
260 .private()
261 .as_bytes()
262 .try_into()
263 .expect("key length should match"),
264 )
265 .expect("should work to convert BLS key to Scalar");
266 let party = match dkg_v1::Party::<PkG, EncG>::new(
267 fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
268 randomness_private_key,
269 ),
270 nodes,
271 t,
272 fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
273 &mut rand::thread_rng(),
274 ) {
275 Ok(party) => party,
276 Err(err) => {
277 error!("random beacon: error while initializing Party: {err:?}");
278 return None;
279 }
280 };
281 info!(
282 "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
283 );
284
285 let highest_completed_round = tables
287 .randomness_highest_completed_round
288 .get(&SINGLETON_KEY)
289 .expect("typed_store should not fail");
290 let mut rm = RandomnessManager {
291 epoch_store: epoch_store_weak,
292 epoch: committee.epoch(),
293 consensus_adapter,
294 network_handle: network_handle.clone(),
295 authority_info,
296 dkg_start_time: OnceCell::new(),
297 party: Arc::new(party),
298 enqueued_messages: BTreeMap::new(),
299 processed_messages: BTreeMap::new(),
300 used_messages: OnceCell::new(),
301 confirmations: BTreeMap::new(),
302 dkg_output: OnceCell::new(),
303 next_randomness_round: RandomnessRound(0),
304 highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
305 };
306 let dkg_output = tables
307 .dkg_output
308 .get(&SINGLETON_KEY)
309 .expect("typed_store should not fail");
310 if let Some(dkg_output) = dkg_output {
311 info!(
312 "random beacon: loaded existing DKG output for epoch {}",
313 committee.epoch()
314 );
315 epoch_store
316 .metrics
317 .epoch_random_beacon_dkg_num_shares
318 .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
319 rm.dkg_output
320 .set(Some(dkg_output.clone()))
321 .expect("setting new OnceCell should succeed");
322 network_handle.update_epoch(
323 committee.epoch(),
324 rm.authority_info.clone(),
325 dkg_output,
326 rm.party.t(),
327 highest_completed_round,
328 );
329 } else {
330 info!(
331 "random beacon: no existing DKG output found for epoch {}",
332 committee.epoch()
333 );
334
335 assert!(
337 epoch_store.protocol_config().dkg_version() > 0,
338 "BUG: DKG version 0 is deprecated"
339 );
340 rm.processed_messages.extend(
341 tables
342 .dkg_processed_messages_v2
343 .safe_iter()
344 .map(|result| result.expect("typed_store should not fail")),
345 );
346 if let Some(used_messages) = tables
347 .dkg_used_messages_v2
348 .get(&SINGLETON_KEY)
349 .expect("typed_store should not fail")
350 {
351 rm.used_messages
352 .set(used_messages.clone())
353 .expect("setting new OnceCell should succeed");
354 }
355 rm.confirmations.extend(
356 tables
357 .dkg_confirmations_v2
358 .safe_iter()
359 .map(|result| result.expect("typed_store should not fail")),
360 );
361 }
362
363 rm.next_randomness_round = tables
368 .randomness_next_round
369 .get(&SINGLETON_KEY)
370 .expect("typed_store should not fail")
371 .unwrap_or(RandomnessRound(0));
372 info!(
373 "random beacon: starting from next_randomness_round={}",
374 rm.next_randomness_round.0
375 );
376 let first_incomplete_round = highest_completed_round
377 .map(|r| r + 1)
378 .unwrap_or(RandomnessRound(0));
379 if first_incomplete_round < rm.next_randomness_round {
380 info!(
381 "random beacon: resuming generation for randomness rounds from {} to {}",
382 first_incomplete_round,
383 rm.next_randomness_round - 1,
384 );
385 for r in first_incomplete_round.0..rm.next_randomness_round.0 {
386 network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
387 }
388 }
389
390 Some(rm)
391 }
392
393 pub async fn start_dkg(&mut self) -> SuiResult {
395 if self.used_messages.initialized() || self.dkg_output.initialized() {
396 return Ok(());
398 }
399
400 let _ = self.dkg_start_time.set(Instant::now());
401
402 let epoch_store = self.epoch_store()?;
403 let dkg_version = epoch_store.protocol_config().dkg_version();
404 info!("random beacon: starting DKG, version {dkg_version}");
405
406 let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
407 Ok(msg) => msg,
408 Err(FastCryptoError::IgnoredMessage) => {
409 info!(
410 "random beacon: no DKG Message for party id={} (zero weight)",
411 self.party.id
412 );
413 return Ok(());
414 }
415 Err(e) => {
416 error!("random beacon: error while creating a DKG Message: {e:?}");
417 return Ok(());
418 }
419 };
420
421 info!("random beacon: created {msg:?} with dkg version {dkg_version}");
422 let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
423
424 #[allow(unused_mut)]
425 let mut fail_point_skip_sending = false;
426 fail_point_if!("rb-dkg", || {
427 fail_point_skip_sending = true;
429 });
430 if !fail_point_skip_sending {
431 self.consensus_adapter
432 .submit_to_consensus(&[transaction], &epoch_store)?;
433 }
434
435 epoch_store
436 .metrics
437 .epoch_random_beacon_dkg_message_time_ms
438 .set(
439 self.dkg_start_time
440 .get()
441 .unwrap() .elapsed()
443 .as_millis() as i64,
444 );
445 Ok(())
446 }
447
448 pub(crate) async fn advance_dkg(
451 &mut self,
452 consensus_output: &mut ConsensusCommitOutput,
453 round: Round,
454 ) -> SuiResult {
455 let epoch_store = self.epoch_store()?;
456
457 if !self.dkg_output.initialized() && !self.used_messages.initialized() {
459 let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
461 .into_values()
462 .collect();
463 while let Some(res) = handles.next().await {
464 if let Ok(Some(processed)) = res {
465 self.processed_messages
466 .insert(processed.sender(), processed.clone());
467 consensus_output.insert_dkg_processed_message(processed);
468 }
469 }
470
471 match VersionedProcessedMessage::merge(
473 self.party.clone(),
474 self.processed_messages
475 .values()
476 .cloned()
477 .collect::<Vec<_>>(),
478 ) {
479 Ok((conf, used_msgs)) => {
480 info!(
481 "random beacon: sending DKG Confirmation with {} complaints",
482 conf.num_of_complaints()
483 );
484 if self.used_messages.set(used_msgs.clone()).is_err() {
485 error!("BUG: used_messages should only ever be set once");
486 }
487 consensus_output.insert_dkg_used_messages(used_msgs);
488
489 let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
490 epoch_store.name,
491 &conf,
492 );
493
494 #[allow(unused_mut)]
495 let mut fail_point_skip_sending = false;
496 fail_point_if!("rb-dkg", || {
497 fail_point_skip_sending = true;
499 });
500 if !fail_point_skip_sending {
501 self.consensus_adapter
502 .submit_to_consensus(&[transaction], &epoch_store)?;
503 }
504
505 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
506 if let Some(elapsed) = elapsed {
507 epoch_store
508 .metrics
509 .epoch_random_beacon_dkg_confirmation_time_ms
510 .set(elapsed as i64);
511 }
512 }
513 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
515 }
516 }
517
518 if !self.dkg_output.initialized() && self.used_messages.initialized() {
520 match self
521 .used_messages
522 .get()
523 .expect("checked above that `used_messages` is initialized")
524 .complete_dkg(self.party.clone(), self.confirmations.values())
525 {
526 Ok(output) => {
527 let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
528 let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
529 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
530 info!(
531 "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
532 );
533 epoch_store
534 .metrics
535 .epoch_random_beacon_dkg_num_shares
536 .set(num_shares as i64);
537 epoch_store
538 .metrics
539 .epoch_random_beacon_dkg_epoch_start_completion_time_ms
540 .set(epoch_elapsed as i64);
541 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
542 if let Some(elapsed) = elapsed {
543 epoch_store
544 .metrics
545 .epoch_random_beacon_dkg_completion_time_ms
546 .set(elapsed as i64);
547 }
548 self.dkg_output
549 .set(Some(output.clone()))
550 .expect("checked above that `dkg_output` is uninitialized");
551 self.network_handle.update_epoch(
552 epoch_store.committee().epoch(),
553 self.authority_info.clone(),
554 output.clone(),
555 self.party.t(),
556 None,
557 );
558 consensus_output.set_dkg_output(output);
559 }
560 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
562 }
563 }
564
565 if !self.dkg_output.initialized()
567 && round
568 > epoch_store
569 .protocol_config()
570 .random_beacon_dkg_timeout_round()
571 .into()
572 {
573 error!(
574 "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
575 );
576 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
577 self.dkg_output
578 .set(None)
579 .expect("checked above that `dkg_output` is uninitialized");
580 }
581
582 Ok(())
583 }
584
585 pub fn add_message(
587 &mut self,
588 authority: &AuthorityName,
589 msg: VersionedDkgMessage,
590 ) -> SuiResult {
591 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
594 if !msg.is_valid_version(dkg_version) {
595 warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
596 return Ok(());
597 }
598
599 if self.used_messages.initialized() || self.dkg_output.initialized() {
600 return Ok(());
602 }
603 let Some((_, party_id)) = self.authority_info.get(authority) else {
604 debug_fatal!(
605 "random beacon: received DKG Message from unknown authority: {authority:?}"
606 );
607 return Ok(());
608 };
609 if *party_id != msg.sender() {
610 warn!(
611 "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
612 );
613 return Ok(());
614 }
615 if self.enqueued_messages.contains_key(&msg.sender())
616 || self.processed_messages.contains_key(&msg.sender())
617 {
618 info!("ignoring duplicate DKG Message from authority {authority:?}");
619 return Ok(());
620 }
621
622 let party = self.party.clone();
623 self.enqueued_messages.insert(
625 msg.sender(),
626 tokio::task::spawn_blocking(move || {
627 match VersionedProcessedMessage::process(party, msg) {
628 Ok(processed) => Some(processed),
629 Err(err) => {
630 debug!("random beacon: error while processing DKG Message: {err:?}");
631 None
632 }
633 }
634 }),
635 );
636 Ok(())
637 }
638
639 pub(crate) fn add_confirmation(
641 &mut self,
642 output: &mut ConsensusCommitOutput,
643 authority: &AuthorityName,
644 conf: VersionedDkgConfirmation,
645 ) -> SuiResult {
646 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
649 if !conf.is_valid_version(dkg_version) {
650 warn!(
651 "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
652 );
653 return Ok(());
654 }
655
656 if self.dkg_output.initialized() {
657 return Ok(());
659 }
660 let Some((_, party_id)) = self.authority_info.get(authority) else {
661 error!(
662 "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
663 );
664 return Ok(());
665 };
666 if *party_id != conf.sender() {
667 warn!(
668 "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
669 );
670 return Ok(());
671 }
672 self.confirmations.insert(conf.sender(), conf.clone());
673 output.insert_dkg_confirmation(conf);
674 Ok(())
675 }
676
677 pub(crate) fn reserve_next_randomness(
682 &mut self,
683 commit_timestamp: TimestampMs,
684 output: &mut ConsensusCommitOutput,
685 ) -> SuiResult<Option<RandomnessRound>> {
686 let epoch_store = self.epoch_store()?;
687
688 let last_round_timestamp = epoch_store
689 .get_randomness_last_round_timestamp()
690 .expect("read should not fail");
691
692 if let Some(last_round_timestamp) = last_round_timestamp
693 && commit_timestamp - last_round_timestamp
694 < epoch_store
695 .protocol_config()
696 .random_beacon_min_round_interval_ms()
697 {
698 return Ok(None);
699 }
700
701 let randomness_round = self.next_randomness_round;
702 self.next_randomness_round = self
703 .next_randomness_round
704 .checked_add(1)
705 .expect("RandomnessRound should not overflow");
706
707 output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
708
709 Ok(Some(randomness_round))
710 }
711
712 pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
714 self.network_handle
715 .send_partial_signatures(epoch, randomness_round);
716 }
717
718 pub fn dkg_status(&self) -> DkgStatus {
719 match self.dkg_output.get() {
720 Some(Some(_)) => DkgStatus::Successful,
721 Some(None) => DkgStatus::Failed,
722 None => DkgStatus::Pending,
723 }
724 }
725
726 pub fn reporter(&self) -> RandomnessReporter {
728 RandomnessReporter {
729 epoch_store: self.epoch_store.clone(),
730 epoch: self.epoch,
731 network_handle: self.network_handle.clone(),
732 highest_completed_round: self.highest_completed_round.clone(),
733 }
734 }
735
736 fn epoch_store(&self) -> SuiResult<Arc<AuthorityPerEpochStore>> {
737 self.epoch_store
738 .upgrade()
739 .ok_or(SuiErrorKind::EpochEnded(self.epoch).into())
740 }
741
742 fn randomness_dkg_info_from_committee(
743 committee: &Committee,
744 ) -> Vec<(
745 u16,
746 AuthorityName,
747 fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
748 StakeUnit,
749 )> {
750 committee
751 .members()
752 .map(|(name, stake)| {
753 let index: u16 = committee
754 .authority_index(name)
755 .expect("lookup of known committee member should succeed")
756 .try_into()
757 .expect("authority index should fit in u16");
758 let pk = bls12381::G2Element::from_byte_array(
759 committee
760 .public_key(name)
761 .expect("lookup of known committee member should succeed")
762 .as_bytes()
763 .try_into()
764 .expect("key length should match"),
765 )
766 .expect("should work to convert BLS key to G2Element");
767 (
768 index,
769 *name,
770 fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
771 *stake,
772 )
773 })
774 .collect()
775 }
776}
777
778#[derive(Clone)]
780pub struct RandomnessReporter {
781 epoch_store: Weak<AuthorityPerEpochStore>,
782 epoch: EpochId,
783 network_handle: randomness::Handle,
784 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
785}
786
787impl RandomnessReporter {
788 pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> SuiResult {
792 let epoch_store = self
793 .epoch_store
794 .upgrade()
795 .ok_or(SuiErrorKind::EpochEnded(self.epoch))?;
796 let mut highest_completed_round = self.highest_completed_round.lock();
797 if Some(round) > *highest_completed_round {
798 *highest_completed_round = Some(round);
799 epoch_store
800 .tables()?
801 .randomness_highest_completed_round
802 .insert(&SINGLETON_KEY, &round)?;
803 self.network_handle
804 .complete_round(epoch_store.committee().epoch(), round);
805 }
806 Ok(())
807 }
808}
809
810#[derive(Debug, Clone, Copy, PartialEq, Eq)]
811pub enum DkgStatus {
812 Pending,
813 Failed,
814 Successful,
815}
816
817#[cfg(test)]
818mod tests {
819 use crate::{
820 authority::{
821 authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStats},
822 test_authority_builder::TestAuthorityBuilder,
823 },
824 checkpoints::CheckpointStore,
825 consensus_adapter::{
826 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
827 MockConsensusClient,
828 },
829 epoch::randomness::*,
830 mock_consensus::with_block_status,
831 };
832 use consensus_core::BlockStatus;
833 use consensus_types::block::BlockRef;
834 use std::num::NonZeroUsize;
835 use sui_protocol_config::ProtocolConfig;
836 use sui_protocol_config::{Chain, ProtocolVersion};
837 use sui_types::messages_consensus::ConsensusTransactionKind;
838 use tokio::sync::mpsc;
839
840 #[tokio::test]
841 async fn test_dkg_v1() {
842 test_dkg(1).await;
843 }
844
845 async fn test_dkg(version: u64) {
846 telemetry_subscribers::init_for_testing();
847
848 let network_config =
849 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
850 .committee_size(NonZeroUsize::new(4).unwrap())
851 .with_reference_gas_price(500)
852 .build();
853
854 let mut protocol_config =
855 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
856 protocol_config.set_random_beacon_dkg_version_for_testing(version);
857
858 let mut epoch_stores = Vec::new();
859 let mut randomness_managers = Vec::new();
860 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
861
862 for validator in network_config.validator_configs.iter() {
863 let mut mock_consensus_client = MockConsensusClient::new();
865 let tx_consensus = tx_consensus.clone();
866 mock_consensus_client
867 .expect_submit()
868 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
869 tx_consensus.try_send(transactions.to_vec()).unwrap();
870 true
871 })
872 .returning(|_, _| {
873 Ok((
874 Vec::new(),
875 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
876 ))
877 });
878
879 let state = TestAuthorityBuilder::new()
880 .with_protocol_config(protocol_config.clone())
881 .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
882 .build()
883 .await;
884 let consensus_adapter = Arc::new(ConsensusAdapter::new(
885 Arc::new(mock_consensus_client),
886 CheckpointStore::new_for_tests(),
887 state.name,
888 Arc::new(ConnectionMonitorStatusForTests {}),
889 100_000,
890 100_000,
891 None,
892 None,
893 ConsensusAdapterMetrics::new_test(),
894 state.epoch_store_for_testing().protocol_config().clone(),
895 ));
896 let epoch_store = state.epoch_store_for_testing();
897 let randomness_manager = RandomnessManager::try_new(
898 Arc::downgrade(&epoch_store),
899 Box::new(consensus_adapter.clone()),
900 sui_network::randomness::Handle::new_stub(),
901 validator.protocol_key_pair(),
902 )
903 .await
904 .unwrap();
905
906 epoch_stores.push(epoch_store);
907 randomness_managers.push(randomness_manager);
908 }
909
910 let mut dkg_messages = Vec::new();
912 for randomness_manager in randomness_managers.iter_mut() {
913 randomness_manager.start_dkg().await.unwrap();
914
915 let mut dkg_message = rx_consensus.recv().await.unwrap();
916 assert!(dkg_message.len() == 1);
917 match dkg_message.remove(0).kind {
918 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
919 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
920 .expect("DKG message deserialization should not fail");
921 dkg_messages.push(msg);
922 }
923 _ => panic!("wrong type of message sent"),
924 }
925 }
926 for i in 0..randomness_managers.len() {
927 let mut output = ConsensusCommitOutput::new(0);
928 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
929 index: ExecutionIndices {
930 last_committed_round: 0,
931 ..Default::default()
932 },
933 ..Default::default()
934 });
935 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
936 randomness_managers[i]
937 .add_message(&epoch_stores[j].name, dkg_message)
938 .unwrap();
939 }
940 randomness_managers[i]
941 .advance_dkg(&mut output, 0)
942 .await
943 .unwrap();
944 let mut batch = epoch_stores[i].db_batch_for_test();
945 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
946 batch.write().unwrap();
947 }
948
949 let mut dkg_confirmations = Vec::new();
951 for _ in 0..randomness_managers.len() {
952 let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
953 assert!(dkg_confirmation.len() == 1);
954 match dkg_confirmation.remove(0).kind {
955 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
956 let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
957 .expect("DKG message deserialization should not fail");
958 dkg_confirmations.push(msg);
959 }
960 _ => panic!("wrong type of message sent"),
961 }
962 }
963 for i in 0..randomness_managers.len() {
964 let mut output = ConsensusCommitOutput::new(0);
965 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
966 index: ExecutionIndices {
967 last_committed_round: 1,
968 ..Default::default()
969 },
970 ..Default::default()
971 });
972 for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
973 randomness_managers[i]
974 .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
975 .unwrap();
976 }
977 randomness_managers[i]
978 .advance_dkg(&mut output, 0)
979 .await
980 .unwrap();
981 let mut batch = epoch_stores[i].db_batch_for_test();
982 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
983 batch.write().unwrap();
984 }
985
986 for randomness_manager in &randomness_managers {
988 assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
989 }
990 }
991
992 #[tokio::test]
993 async fn test_dkg_expiration_v1() {
994 test_dkg_expiration(1).await;
995 }
996
997 async fn test_dkg_expiration(version: u64) {
998 telemetry_subscribers::init_for_testing();
999
1000 let network_config =
1001 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1002 .committee_size(NonZeroUsize::new(4).unwrap())
1003 .with_reference_gas_price(500)
1004 .build();
1005
1006 let mut epoch_stores = Vec::new();
1007 let mut randomness_managers = Vec::new();
1008 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
1009
1010 let mut protocol_config =
1011 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1012 protocol_config.set_random_beacon_dkg_version_for_testing(version);
1013
1014 for validator in network_config.validator_configs.iter() {
1015 let mut mock_consensus_client = MockConsensusClient::new();
1017 let tx_consensus = tx_consensus.clone();
1018 mock_consensus_client
1019 .expect_submit()
1020 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1021 tx_consensus.try_send(transactions.to_vec()).unwrap();
1022 true
1023 })
1024 .returning(|_, _| {
1025 Ok((
1026 Vec::new(),
1027 with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
1028 ))
1029 });
1030
1031 let state = TestAuthorityBuilder::new()
1032 .with_protocol_config(protocol_config.clone())
1033 .with_genesis_and_keypair(&network_config.genesis, validator.protocol_key_pair())
1034 .build()
1035 .await;
1036 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1037 Arc::new(mock_consensus_client),
1038 CheckpointStore::new_for_tests(),
1039 state.name,
1040 Arc::new(ConnectionMonitorStatusForTests {}),
1041 100_000,
1042 100_000,
1043 None,
1044 None,
1045 ConsensusAdapterMetrics::new_test(),
1046 state.epoch_store_for_testing().protocol_config().clone(),
1047 ));
1048 let epoch_store = state.epoch_store_for_testing();
1049 let randomness_manager = RandomnessManager::try_new(
1050 Arc::downgrade(&epoch_store),
1051 Box::new(consensus_adapter.clone()),
1052 sui_network::randomness::Handle::new_stub(),
1053 validator.protocol_key_pair(),
1054 )
1055 .await
1056 .unwrap();
1057
1058 epoch_stores.push(epoch_store);
1059 randomness_managers.push(randomness_manager);
1060 }
1061
1062 let mut dkg_messages = Vec::new();
1064 for randomness_manager in randomness_managers.iter_mut() {
1065 randomness_manager.start_dkg().await.unwrap();
1066
1067 let mut dkg_message = rx_consensus.recv().await.unwrap();
1068 assert!(dkg_message.len() == 1);
1069 match dkg_message.remove(0).kind {
1070 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1071 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1072 .expect("DKG message deserialization should not fail");
1073 dkg_messages.push(msg);
1074 }
1075 _ => panic!("wrong type of message sent"),
1076 }
1077 }
1078 for i in 0..randomness_managers.len() {
1079 let mut output = ConsensusCommitOutput::new(0);
1080 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1081 index: ExecutionIndices {
1082 last_committed_round: 0,
1083 ..Default::default()
1084 },
1085 ..Default::default()
1086 });
1087 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1088 randomness_managers[i]
1089 .add_message(&epoch_stores[j].name, dkg_message)
1090 .unwrap();
1091 }
1092 randomness_managers[i]
1093 .advance_dkg(&mut output, u64::MAX)
1094 .await
1095 .unwrap();
1096 let mut batch = epoch_stores[i].db_batch_for_test();
1097 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1098 batch.write().unwrap();
1099 }
1100
1101 for randomness_manager in &randomness_managers {
1103 assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
1104 }
1105 }
1106}