1use std::collections::HashMap;
5use std::future::Future;
6use std::net::IpAddr;
7use std::ops::Deref;
8use std::sync::Arc;
9use std::sync::atomic::AtomicU64;
10use std::sync::atomic::Ordering;
11use std::time::Instant;
12
13use arc_swap::{ArcSwap, ArcSwapOption};
14use consensus_core::BlockStatus;
15use dashmap::DashMap;
16use dashmap::try_result::TryResult;
17use futures::FutureExt;
18use futures::future::{self, Either, select};
19use futures::stream::FuturesUnordered;
20use futures::{StreamExt, pin_mut};
21use itertools::Itertools;
22use mysten_common::debug_fatal;
23use mysten_metrics::{
24 GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
25};
26use mysten_network::anemo_connection_monitor::ConnectionStatus;
27use parking_lot::RwLockReadGuard;
28use prometheus::Histogram;
29use prometheus::HistogramVec;
30use prometheus::IntCounterVec;
31use prometheus::IntGauge;
32use prometheus::IntGaugeVec;
33use prometheus::Registry;
34use prometheus::{
35 register_histogram_vec_with_registry, register_histogram_with_registry,
36 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
37 register_int_gauge_with_registry,
38};
39use sui_protocol_config::ProtocolConfig;
40use sui_simulator::anemo::PeerId;
41use sui_types::base_types::AuthorityName;
42use sui_types::base_types::TransactionDigest;
43use sui_types::committee::Committee;
44use sui_types::error::{SuiErrorKind, SuiResult};
45use sui_types::fp_ensure;
46use sui_types::messages_consensus::ConsensusPosition;
47use sui_types::messages_consensus::ConsensusTransactionKind;
48use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
49use sui_types::transaction::TransactionDataAPI;
50use tokio::sync::{Semaphore, SemaphorePermit, oneshot};
51use tokio::task::JoinHandle;
52use tokio::time::Duration;
53use tokio::time::{self};
54use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};
55
56use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
57use crate::checkpoints::CheckpointStore;
58use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
59use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
60use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
61use crate::metrics::LatencyObserver;
62
63#[cfg(test)]
64#[path = "unit_tests/consensus_tests.rs"]
65pub mod consensus_tests;
66
67const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
68 0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
69];
70
71pub struct ConsensusAdapterMetrics {
72 pub sequencing_certificate_attempt: IntCounterVec,
74 pub sequencing_certificate_success: IntCounterVec,
75 pub sequencing_certificate_failures: IntCounterVec,
76 pub sequencing_certificate_status: IntCounterVec,
77 pub sequencing_certificate_inflight: IntGaugeVec,
78 pub sequencing_acknowledge_latency: HistogramVec,
79 pub sequencing_certificate_latency: HistogramVec,
80 pub sequencing_certificate_authority_position: Histogram,
81 pub sequencing_certificate_positions_moved: Histogram,
82 pub sequencing_certificate_preceding_disconnected: Histogram,
83 pub sequencing_certificate_processed: IntCounterVec,
84 pub sequencing_certificate_amplification_factor: Histogram,
85 pub sequencing_in_flight_semaphore_wait: IntGauge,
86 pub sequencing_in_flight_submissions: IntGauge,
87 pub sequencing_estimated_latency: IntGauge,
88 pub sequencing_resubmission_interval_ms: IntGauge,
89 pub sequencing_best_effort_timeout: IntCounterVec,
90}
91
92impl ConsensusAdapterMetrics {
93 pub fn new(registry: &Registry) -> Self {
94 Self {
95 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
96 "sequencing_certificate_attempt",
97 "Counts the number of certificates the validator attempts to sequence.",
98 &["tx_type"],
99 registry,
100 )
101 .unwrap(),
102 sequencing_certificate_success: register_int_counter_vec_with_registry!(
103 "sequencing_certificate_success",
104 "Counts the number of successfully sequenced certificates.",
105 &["tx_type"],
106 registry,
107 )
108 .unwrap(),
109 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
110 "sequencing_certificate_failures",
111 "Counts the number of sequenced certificates that failed other than by timeout.",
112 &["tx_type"],
113 registry,
114 )
115 .unwrap(),
116 sequencing_certificate_status: register_int_counter_vec_with_registry!(
117 "sequencing_certificate_status",
118 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
119 &["tx_type", "status"],
120 registry,
121 )
122 .unwrap(),
123 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
124 "sequencing_certificate_inflight",
125 "The inflight requests to sequence certificates.",
126 &["tx_type"],
127 registry,
128 )
129 .unwrap(),
130 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
131 "sequencing_acknowledge_latency",
132 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
133 &["retry", "tx_type"],
134 LATENCY_SEC_BUCKETS.to_vec(),
135 registry,
136 ).unwrap(),
137 sequencing_certificate_latency: register_histogram_vec_with_registry!(
138 "sequencing_certificate_latency",
139 "The latency for sequencing a certificate.",
140 &["position", "tx_type", "processed_method"],
141 LATENCY_SEC_BUCKETS.to_vec(),
142 registry,
143 ).unwrap(),
144 sequencing_certificate_authority_position: register_histogram_with_registry!(
145 "sequencing_certificate_authority_position",
146 "The position of the authority when submitted a certificate to consensus.",
147 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
148 registry,
149 ).unwrap(),
150 sequencing_certificate_positions_moved: register_histogram_with_registry!(
151 "sequencing_certificate_positions_moved",
152 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
153 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
154 registry,
155 ).unwrap(),
156 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
157 "sequencing_certificate_preceding_disconnected",
158 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
159 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
160 registry,
161 ).unwrap(),
162 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
163 "sequencing_certificate_processed",
164 "The number of certificates that have been processed either by consensus or checkpoint.",
165 &["source"],
166 registry
167 ).unwrap(),
168 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169 "sequencing_in_flight_semaphore_wait",
170 "How many requests are blocked on submit_permit.",
171 registry,
172 )
173 .unwrap(),
174 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175 "sequencing_in_flight_submissions",
176 "Number of transactions submitted to local consensus instance and not yet sequenced",
177 registry,
178 )
179 .unwrap(),
180 sequencing_estimated_latency: register_int_gauge_with_registry!(
181 "sequencing_estimated_latency",
182 "Consensus latency estimated by consensus adapter in milliseconds",
183 registry,
184 )
185 .unwrap(),
186 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187 "sequencing_resubmission_interval_ms",
188 "Resubmission interval used by consensus adapter in milliseconds",
189 registry,
190 )
191 .unwrap(),
192 sequencing_certificate_amplification_factor: register_histogram_with_registry!(
193 "sequencing_certificate_amplification_factor",
194 "The amplification factor used by consensus adapter to submit to consensus.",
195 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
196 registry,
197 ).unwrap(),
198 sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
199 "sequencing_best_effort_timeout",
200 "The number of times the best effort submission has timed out.",
201 &["tx_type"],
202 registry,
203 ).unwrap(),
204 }
205 }
206
207 pub fn new_test() -> Self {
208 Self::new(&Registry::default())
209 }
210}
211
212pub trait ConsensusOverloadChecker: Sync + Send + 'static {
214 fn check_consensus_overload(&self) -> SuiResult;
215}
216
217pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
218
219#[mockall::automock]
220pub trait SubmitToConsensus: Sync + Send + 'static {
221 fn submit_to_consensus(
222 &self,
223 transactions: &[ConsensusTransaction],
224 epoch_store: &Arc<AuthorityPerEpochStore>,
225 ) -> SuiResult;
226
227 fn submit_best_effort(
228 &self,
229 transaction: &ConsensusTransaction,
230 epoch_store: &Arc<AuthorityPerEpochStore>,
231 timeout: Duration,
232 ) -> SuiResult;
233}
234
235#[mockall::automock]
236#[async_trait::async_trait]
237pub trait ConsensusClient: Sync + Send + 'static {
238 async fn submit(
239 &self,
240 transactions: &[ConsensusTransaction],
241 epoch_store: &Arc<AuthorityPerEpochStore>,
242 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
243}
244
245pub struct ConsensusAdapter {
247 consensus_client: Arc<dyn ConsensusClient>,
249 checkpoint_store: Arc<CheckpointStore>,
251 authority: AuthorityName,
253 max_pending_transactions: usize,
255 num_inflight_transactions: AtomicU64,
257 max_submit_position: Option<usize>,
260 submit_delay_step_override: Option<Duration>,
263 connection_monitor_status: Arc<dyn CheckConnection>,
265 low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
267 consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
269 metrics: ConsensusAdapterMetrics,
271 submit_semaphore: Arc<Semaphore>,
273 latency_observer: LatencyObserver,
274 protocol_config: ProtocolConfig,
275}
276
277pub trait CheckConnection: Send + Sync {
278 fn check_connection(
279 &self,
280 ourself: &AuthorityName,
281 authority: &AuthorityName,
282 ) -> Option<ConnectionStatus>;
283 fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
284}
285
286pub struct ConnectionMonitorStatus {
287 pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
289 pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
291}
292
293pub struct ConnectionMonitorStatusForTests {}
294
295impl ConsensusAdapter {
296 pub fn new(
298 consensus_client: Arc<dyn ConsensusClient>,
299 checkpoint_store: Arc<CheckpointStore>,
300 authority: AuthorityName,
301 connection_monitor_status: Arc<dyn CheckConnection>,
302 max_pending_transactions: usize,
303 max_pending_local_submissions: usize,
304 max_submit_position: Option<usize>,
305 submit_delay_step_override: Option<Duration>,
306 metrics: ConsensusAdapterMetrics,
307 protocol_config: ProtocolConfig,
308 ) -> Self {
309 let num_inflight_transactions = Default::default();
310 let low_scoring_authorities =
311 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
312 Self {
313 consensus_client,
314 checkpoint_store,
315 authority,
316 max_pending_transactions,
317 max_submit_position,
318 submit_delay_step_override,
319 num_inflight_transactions,
320 connection_monitor_status,
321 low_scoring_authorities,
322 metrics,
323 submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
324 latency_observer: LatencyObserver::new(),
325 consensus_throughput_profiler: ArcSwapOption::empty(),
326 protocol_config,
327 }
328 }
329
330 pub fn swap_low_scoring_authorities(
331 &self,
332 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
333 ) {
334 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
335 }
336
337 pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
338 self.consensus_throughput_profiler.store(Some(profiler))
339 }
340
341 pub fn num_inflight_transactions(&self) -> u64 {
343 self.num_inflight_transactions.load(Ordering::Relaxed)
344 }
345
346 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
347 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
354
355 #[allow(clippy::collapsible_if)] if epoch_store.should_send_end_of_publish() {
357 if !recovered
358 .iter()
359 .any(ConsensusTransaction::is_end_of_publish)
360 {
361 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
368 }
369 }
370 debug!(
371 "Submitting {:?} recovered pending consensus transactions to consensus",
372 recovered.len()
373 );
374 for transaction in recovered {
375 if transaction.is_end_of_publish() {
376 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
377 }
378 self.submit_unchecked(&[transaction], epoch_store, None, None);
379 }
380 }
381
382 fn await_submit_delay(
383 &self,
384 epoch_store: &Arc<AuthorityPerEpochStore>,
385 transactions: &[ConsensusTransaction],
386 ) -> (impl Future<Output = ()>, usize, usize, usize, usize) {
387 if transactions.iter().any(|tx| tx.is_mfp_transaction()) {
388 return (tokio::time::sleep(Duration::ZERO), 0, 0, 0, 0);
391 }
392
393 let min_digest_and_gas_price = transactions
395 .iter()
396 .filter_map(|tx| match &tx.kind {
397 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
398 Some((certificate.digest(), certificate.gas_price()))
399 }
400 ConsensusTransactionKind::UserTransaction(transaction) => Some((
401 transaction.digest(),
402 transaction.data().transaction_data().gas_price(),
403 )),
404 ConsensusTransactionKind::UserTransactionV2(transaction) => Some((
405 transaction.tx().digest(),
406 transaction.tx().data().transaction_data().gas_price(),
407 )),
408 _ => None,
409 })
410 .min();
411 let mut amplification_factor = 0;
412
413 let (duration, position, positions_moved, preceding_disconnected) =
414 match min_digest_and_gas_price {
415 Some((digest, gas_price)) => {
416 let k = epoch_store
417 .protocol_config()
418 .sip_45_consensus_amplification_threshold_as_option()
419 .unwrap_or(u64::MAX);
420 let multiplier =
421 gas_price / std::cmp::max(epoch_store.reference_gas_price(), 1);
422 amplification_factor = if multiplier >= k { multiplier } else { 0 };
423 self.await_submit_delay_user_transaction(
424 epoch_store.committee(),
425 digest,
426 amplification_factor as usize,
427 )
428 }
429 _ => (Duration::ZERO, 0, 0, 0),
430 };
431 (
432 tokio::time::sleep(duration),
433 position,
434 positions_moved,
435 preceding_disconnected,
436 amplification_factor as usize,
437 )
438 }
439
440 fn await_submit_delay_user_transaction(
441 &self,
442 committee: &Committee,
443 tx_digest: &TransactionDigest,
444 amplification_factor: usize,
445 ) -> (Duration, usize, usize, usize) {
446 let (mut position, positions_moved, preceding_disconnected) =
447 self.submission_position(committee, tx_digest);
448 if amplification_factor > 0 {
449 position = (position + 1).saturating_sub(amplification_factor);
450 }
451
452 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
454 const MAX_LATENCY: Duration = Duration::from_secs(3);
455
456 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
457 self.metrics
458 .sequencing_estimated_latency
459 .set(latency.as_millis() as i64);
460
461 let latency = std::cmp::max(latency, MIN_LATENCY);
462 let latency = std::cmp::min(latency, MAX_LATENCY);
463 let latency = latency * 2;
464 let latency = self.override_by_throughput_profiler(position, latency);
465 let (delay_step, position) =
466 self.override_by_max_submit_position_settings(latency, position);
467
468 self.metrics
469 .sequencing_resubmission_interval_ms
470 .set(delay_step.as_millis() as i64);
471
472 (
473 delay_step * position as u32,
474 position,
475 positions_moved,
476 preceding_disconnected,
477 )
478 }
479
480 fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
484 const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
485 const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
486 const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;
487
488 let p = self.consensus_throughput_profiler.load();
489
490 if let Some(profiler) = p.as_ref() {
491 let (level, _) = profiler.throughput_level();
492
493 if self.protocol_config.throughput_aware_consensus_submission() && position == 1 {
496 return match level {
497 Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
498 Level::Medium => {
499 Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
500 }
501 Level::High => {
502 let l = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);
503
504 if latency >= 2 * l { latency } else { l }
506 }
507 };
508 }
509 }
510 latency
511 }
512
513 fn override_by_max_submit_position_settings(
517 &self,
518 latency: Duration,
519 mut position: usize,
520 ) -> (Duration, usize) {
521 if let Some(max_submit_position) = self.max_submit_position {
523 position = std::cmp::min(position, max_submit_position);
524 }
525
526 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
527 (delay_step, position)
528 }
529
530 fn submission_position(
538 &self,
539 committee: &Committee,
540 tx_digest: &TransactionDigest,
541 ) -> (usize, usize, usize) {
542 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
543
544 self.check_submission_wrt_connectivity_and_scores(positions)
545 }
546
547 fn check_submission_wrt_connectivity_and_scores(
568 &self,
569 positions: Vec<AuthorityName>,
570 ) -> (usize, usize, usize) {
571 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
572 if low_scoring_authorities.get(&self.authority).is_some() {
573 return (positions.len(), 0, 0);
574 }
575 let initial_position = get_position_in_list(self.authority, positions.clone());
576 let mut preceding_disconnected = 0;
577 let mut before_our_position = true;
578
579 let filtered_positions: Vec<_> = positions
580 .into_iter()
581 .filter(|authority| {
582 let keep = self.authority == *authority; if keep {
584 before_our_position = false;
585 }
586
587 let connected = self
589 .connection_monitor_status
590 .check_connection(&self.authority, authority)
591 .unwrap_or(ConnectionStatus::Disconnected)
592 == ConnectionStatus::Connected;
593 if !connected && before_our_position {
594 preceding_disconnected += 1; }
596
597 let high_scoring = low_scoring_authorities.get(authority).is_none();
599
600 keep || (connected && high_scoring)
601 })
602 .collect();
603
604 let position = get_position_in_list(self.authority, filtered_positions);
605
606 (
607 position,
608 initial_position - position,
609 preceding_disconnected,
610 )
611 }
612
613 pub fn submit(
621 self: &Arc<Self>,
622 transaction: ConsensusTransaction,
623 lock: Option<&RwLockReadGuard<ReconfigState>>,
624 epoch_store: &Arc<AuthorityPerEpochStore>,
625 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
626 submitter_client_addr: Option<IpAddr>,
627 ) -> SuiResult<JoinHandle<()>> {
628 self.submit_batch(
629 &[transaction],
630 lock,
631 epoch_store,
632 tx_consensus_position,
633 submitter_client_addr,
634 )
635 }
636
637 pub fn submit_batch(
640 self: &Arc<Self>,
641 transactions: &[ConsensusTransaction],
642 lock: Option<&RwLockReadGuard<ReconfigState>>,
643 epoch_store: &Arc<AuthorityPerEpochStore>,
644 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
645 submitter_client_addr: Option<IpAddr>,
646 ) -> SuiResult<JoinHandle<()>> {
647 if transactions.len() > 1 {
648 let first_kind = &transactions[0].kind;
652 let is_user_tx_batch = matches!(
653 first_kind,
654 ConsensusTransactionKind::UserTransaction(_)
655 | ConsensusTransactionKind::UserTransactionV2(_)
656 );
657 let is_cert_batch = matches!(
658 first_kind,
659 ConsensusTransactionKind::CertifiedTransaction(_)
660 );
661
662 for transaction in &transactions[1..] {
663 if is_user_tx_batch {
664 fp_ensure!(
665 matches!(
666 transaction.kind,
667 ConsensusTransactionKind::UserTransaction(_)
668 | ConsensusTransactionKind::UserTransactionV2(_)
669 ),
670 SuiErrorKind::InvalidTxKindInSoftBundle.into()
671 );
672 } else if is_cert_batch {
673 fp_ensure!(
674 matches!(
675 transaction.kind,
676 ConsensusTransactionKind::CertifiedTransaction(_)
677 ),
678 SuiErrorKind::InvalidTxKindInSoftBundle.into()
679 );
680 } else {
681 return Err(SuiErrorKind::InvalidTxKindInSoftBundle.into());
683 }
684 }
685 }
686
687 if !transactions.is_empty() {
688 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
689 }
690
691 Ok(self.submit_unchecked(
692 transactions,
693 epoch_store,
694 tx_consensus_position,
695 submitter_client_addr,
696 ))
697 }
698
699 fn check_limits(&self) -> bool {
702 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
704 > self.max_pending_transactions
705 {
706 return false;
707 }
708 self.submit_semaphore.available_permits() > 0
710 }
711
712 fn submit_unchecked(
713 self: &Arc<Self>,
714 transactions: &[ConsensusTransaction],
715 epoch_store: &Arc<AuthorityPerEpochStore>,
716 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
717 submitter_client_addr: Option<IpAddr>,
718 ) -> JoinHandle<()> {
719 let async_stage = self
721 .clone()
722 .submit_and_wait(
723 transactions.to_vec(),
724 epoch_store.clone(),
725 tx_consensus_position,
726 submitter_client_addr,
727 )
728 .in_current_span();
729 let join_handle = spawn_monitored_task!(async_stage);
732 join_handle
733 }
734
735 async fn submit_and_wait(
736 self: Arc<Self>,
737 transactions: Vec<ConsensusTransaction>,
738 epoch_store: Arc<AuthorityPerEpochStore>,
739 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
740 submitter_client_addr: Option<IpAddr>,
741 ) {
742 epoch_store
756 .within_alive_epoch(self.submit_and_wait_inner(
757 transactions,
758 &epoch_store,
759 tx_consensus_position,
760 submitter_client_addr,
761 ))
762 .await
763 .ok(); }
765
766 #[allow(clippy::option_map_unit_fn)]
767 #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
768 async fn submit_and_wait_inner(
769 self: Arc<Self>,
770 transactions: Vec<ConsensusTransaction>,
771 epoch_store: &Arc<AuthorityPerEpochStore>,
772 mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
773 submitter_client_addr: Option<IpAddr>,
774 ) {
775 if transactions.is_empty() {
776 debug!(
779 "Performing a ping check, pinging consensus to get a consensus position in next block"
780 );
781 let (consensus_positions, _status_waiter) = self
782 .submit_inner(&transactions, epoch_store, &[], "ping")
783 .await;
784
785 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
786 let _ = tx_consensus_positions.send(consensus_positions);
787 } else {
788 debug_fatal!("Ping check must have a consensus position channel");
789 }
790 return;
791 }
792
793 if epoch_store.protocol_config().mysticeti_fastpath() {
795 for transaction in &transactions {
796 if let Some(tx) = transaction.kind.as_user_transaction() {
797 let amplification_factor = (tx.data().transaction_data().gas_price()
798 / epoch_store.reference_gas_price().max(1))
799 .max(1);
800 epoch_store.submitted_transaction_cache.record_submitted_tx(
801 tx.digest(),
802 amplification_factor as u32,
803 submitter_client_addr,
804 );
805 }
806 }
807 }
808
809 let skip_processed_checks = tx_consensus_positions.is_some();
813
814 let is_soft_bundle = transactions.len() > 1;
819
820 let mut transaction_keys = Vec::new();
821 let mut tx_consensus_positions = tx_consensus_positions;
822
823 for transaction in &transactions {
824 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
825 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
826 epoch_store.record_epoch_pending_certs_process_time_metric();
827 }
828
829 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
830 transaction_keys.push(transaction_key);
831 }
832 let tx_type = if is_soft_bundle {
833 "soft_bundle"
834 } else {
835 classify(&transactions[0])
836 };
837 tracing::Span::current().record("tx_type", tx_type);
838 tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
839
840 let mut guard = InflightDropGuard::acquire(&self, tx_type);
841
842 let (await_submit, position, positions_moved, preceding_disconnected, amplification_factor) =
844 self.await_submit_delay(epoch_store, &transactions[..]);
845
846 let processed_via_consensus_or_checkpoint = if skip_processed_checks {
847 future::pending().boxed()
850 } else {
851 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store)
852 .boxed()
853 };
854 pin_mut!(processed_via_consensus_or_checkpoint);
855
856 let processed_waiter = tokio::select! {
857 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
859
860 _ = epoch_store.user_certs_closed_notify() => {
862 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
863 Some(processed_via_consensus_or_checkpoint)
864 }
865
866 _ = &mut processed_via_consensus_or_checkpoint => {
868 None
869 }
870 };
871
872 let _monitor = if matches!(
874 transactions[0].kind,
875 ConsensusTransactionKind::EndOfPublish(_)
876 | ConsensusTransactionKind::CapabilityNotification(_)
877 | ConsensusTransactionKind::CapabilityNotificationV2(_)
878 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
879 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
880 ) {
881 assert!(
882 !is_soft_bundle,
883 "System transactions should have been submitted individually"
884 );
885 let transaction_keys = transaction_keys.clone();
886 Some(CancelOnDrop(spawn_monitored_task!(async {
887 let mut i = 0u64;
888 loop {
889 i += 1;
890 const WARN_DELAY_S: u64 = 30;
891 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
892 let total_wait = i * WARN_DELAY_S;
893 warn!(
894 "Still waiting {} seconds for transactions {:?} to commit in consensus",
895 total_wait, transaction_keys
896 );
897 }
898 })))
899 } else {
900 None
901 };
902
903 if let Some(processed_waiter) = processed_waiter {
904 debug!("Submitting {:?} to consensus", transaction_keys);
905
906 guard.position = Some(position);
909 guard.positions_moved = Some(positions_moved);
910 guard.preceding_disconnected = Some(preceding_disconnected);
911 guard.amplification_factor = Some(amplification_factor);
912
913 let _permit: SemaphorePermit = self
914 .submit_semaphore
915 .acquire()
916 .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
917 .await
918 .expect("Consensus adapter does not close semaphore");
919 let _in_flight_submission_guard =
920 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
921
922 let submit_inner = async {
925 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
926
927 loop {
928 let (consensus_positions, status_waiter) = self
930 .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
931 .await;
932
933 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
934 tracing::Span::current().record(
935 "consensus_positions",
936 tracing::field::debug(&consensus_positions),
937 );
938 let _ = tx_consensus_positions.send(consensus_positions);
944 }
945
946 match status_waiter.await {
947 Ok(status @ BlockStatus::Sequenced(_)) => {
948 tracing::Span::current()
949 .record("status", tracing::field::debug(&status));
950 self.metrics
951 .sequencing_certificate_status
952 .with_label_values(&[tx_type, "sequenced"])
953 .inc();
954 trace!(
956 "Transaction {transaction_keys:?} has been sequenced by consensus."
957 );
958 break;
959 }
960 Ok(status @ BlockStatus::GarbageCollected(_)) => {
961 tracing::Span::current()
962 .record("status", tracing::field::debug(&status));
963 self.metrics
964 .sequencing_certificate_status
965 .with_label_values(&[tx_type, "garbage_collected"])
966 .inc();
967 debug!(
971 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
972 );
973 time::sleep(RETRY_DELAY_STEP).await;
974 continue;
975 }
976 Err(err) => {
977 warn!(
978 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
979 err
980 );
981 time::sleep(RETRY_DELAY_STEP).await;
982 continue;
983 }
984 }
985 }
986 };
987
988 guard.processed_method = if skip_processed_checks {
989 submit_inner.await;
991 ProcessedMethod::Consensus
992 } else {
993 match select(processed_waiter, submit_inner.boxed()).await {
994 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
995 Either::Right(((), processed_waiter)) => {
996 debug!("Submitted {transaction_keys:?} to consensus");
997 processed_waiter.await
998 }
999 }
1000 };
1001 }
1002 debug!("{transaction_keys:?} processed by consensus");
1003
1004 let consensus_keys: Vec<_> = transactions
1005 .iter()
1006 .filter_map(|t| {
1007 if t.is_mfp_transaction() {
1008 None
1012 } else {
1013 Some(t.key())
1014 }
1015 })
1016 .collect();
1017 epoch_store
1018 .remove_pending_consensus_transactions(&consensus_keys)
1019 .expect("Storage error when removing consensus transaction");
1020
1021 let is_user_tx = is_soft_bundle
1022 || matches!(
1023 transactions[0].kind,
1024 ConsensusTransactionKind::CertifiedTransaction(_)
1025 | ConsensusTransactionKind::UserTransaction(_)
1026 | ConsensusTransactionKind::UserTransactionV2(_)
1027 );
1028 if is_user_tx && epoch_store.should_send_end_of_publish() {
1029 if let Err(err) = self.submit(
1031 ConsensusTransaction::new_end_of_publish(self.authority),
1032 None,
1033 epoch_store,
1034 None,
1035 None,
1036 ) {
1037 warn!("Error when sending end of publish message: {:?}", err);
1038 } else {
1039 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1040 }
1041 }
1042 self.metrics
1043 .sequencing_certificate_success
1044 .with_label_values(&[tx_type])
1045 .inc();
1046 }
1047
1048 #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
1049 async fn submit_inner(
1050 self: &Arc<Self>,
1051 transactions: &[ConsensusTransaction],
1052 epoch_store: &Arc<AuthorityPerEpochStore>,
1053 transaction_keys: &[SequencedConsensusTransactionKey],
1054 tx_type: &str,
1055 ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
1056 let ack_start = Instant::now();
1057 let mut retries: u32 = 0;
1058 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
1059 Duration::from_millis(100),
1060 Duration::from_secs(10),
1061 );
1062
1063 let (consensus_positions, status_waiter) = loop {
1064 let span = debug_span!("client_submit");
1065 match self
1066 .consensus_client
1067 .submit(transactions, epoch_store)
1068 .instrument(span)
1069 .await
1070 {
1071 Err(err) => {
1072 if cfg!(msim) || retries > 3 {
1074 warn!(
1075 "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
1076 );
1077 }
1078 self.metrics
1079 .sequencing_certificate_failures
1080 .with_label_values(&[tx_type])
1081 .inc();
1082 retries += 1;
1083
1084 time::sleep(backoff.next().unwrap()).await;
1085 }
1086 Ok((consensus_positions, status_waiter)) => {
1087 break (consensus_positions, status_waiter);
1088 }
1089 }
1090 };
1091
1092 let bucket = match retries {
1096 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
1098 21..=50 => "between_20_and_50".to_string(),
1099 51..=100 => "between_50_and_100".to_string(),
1100 _ => "over_100".to_string(),
1101 };
1102
1103 self.metrics
1104 .sequencing_acknowledge_latency
1105 .with_label_values(&[&bucket, tx_type])
1106 .observe(ack_start.elapsed().as_secs_f64());
1107
1108 (consensus_positions, status_waiter)
1109 }
1110
1111 async fn await_consensus_or_checkpoint(
1114 self: &Arc<Self>,
1115 transaction_keys: Vec<SequencedConsensusTransactionKey>,
1116 epoch_store: &Arc<AuthorityPerEpochStore>,
1117 ) -> ProcessedMethod {
1118 let notifications = FuturesUnordered::new();
1119 for transaction_key in transaction_keys {
1120 let transaction_digests = match transaction_key {
1121 SequencedConsensusTransactionKey::External(
1122 ConsensusTransactionKey::Certificate(digest),
1123 ) => vec![digest],
1124 _ => vec![],
1125 };
1126
1127 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
1128 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
1129 | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
1130 ) = transaction_key
1131 {
1132 Either::Left(
1135 self.checkpoint_store
1136 .notify_read_synced_checkpoint(checkpoint_sequence_number),
1137 )
1138 } else {
1139 Either::Right(future::pending())
1140 };
1141
1142 notifications.push(async move {
1146 tokio::select! {
1147 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
1148 processed.expect("Storage error when waiting for consensus message processed");
1149 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
1150 return ProcessedMethod::Consensus;
1151 },
1152 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1153 processed.expect("Storage error when waiting for transaction executed in checkpoint");
1154 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1155 }
1156 _ = checkpoint_synced_future => {
1157 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1158 }
1159 }
1160 ProcessedMethod::Checkpoint
1161 });
1162 }
1163
1164 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
1165 for method in processed_methods {
1166 if method == ProcessedMethod::Checkpoint {
1167 return ProcessedMethod::Checkpoint;
1168 }
1169 }
1170 ProcessedMethod::Consensus
1171 }
1172}
1173
1174impl CheckConnection for ConnectionMonitorStatus {
1175 fn check_connection(
1176 &self,
1177 ourself: &AuthorityName,
1178 authority: &AuthorityName,
1179 ) -> Option<ConnectionStatus> {
1180 if ourself == authority {
1181 return Some(ConnectionStatus::Connected);
1182 }
1183
1184 let mapping = self.authority_names_to_peer_ids.load_full();
1185 let peer_id = match mapping.get(authority) {
1186 Some(p) => p,
1187 None => {
1188 warn!(
1189 "failed to find peer {:?} in connection monitor listener",
1190 authority
1191 );
1192 return None;
1193 }
1194 };
1195
1196 match self.connection_statuses.try_get(peer_id) {
1197 TryResult::Present(c) => Some(c.value().clone()),
1198 TryResult::Absent => None,
1199 TryResult::Locked => {
1200 Some(ConnectionStatus::Disconnected)
1202 }
1203 }
1204 }
1205 fn update_mapping_for_epoch(
1206 &self,
1207 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1208 ) {
1209 self.authority_names_to_peer_ids
1210 .swap(Arc::new(authority_names_to_peer_ids));
1211 }
1212}
1213
1214impl CheckConnection for ConnectionMonitorStatusForTests {
1215 fn check_connection(
1216 &self,
1217 _ourself: &AuthorityName,
1218 _authority: &AuthorityName,
1219 ) -> Option<ConnectionStatus> {
1220 Some(ConnectionStatus::Connected)
1221 }
1222 fn update_mapping_for_epoch(
1223 &self,
1224 _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1225 ) {
1226 }
1227}
1228
1229pub fn get_position_in_list(
1230 search_authority: AuthorityName,
1231 positions: Vec<AuthorityName>,
1232) -> usize {
1233 positions
1234 .into_iter()
1235 .find_position(|authority| *authority == search_authority)
1236 .expect("Couldn't find ourselves in shuffled committee")
1237 .0
1238}
1239
1240impl ConsensusOverloadChecker for ConsensusAdapter {
1241 fn check_consensus_overload(&self) -> SuiResult {
1242 fp_ensure!(
1243 self.check_limits(),
1244 SuiErrorKind::TooManyTransactionsPendingConsensus.into()
1245 );
1246 Ok(())
1247 }
1248}
1249
1250pub struct NoopConsensusOverloadChecker {}
1251
1252impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
1253 fn check_consensus_overload(&self) -> SuiResult {
1254 Ok(())
1255 }
1256}
1257
1258impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1259 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1263 {
1264 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1265 if !reconfig_guard.should_accept_user_certs() {
1266 return;
1268 }
1269 epoch_store.close_user_certs(reconfig_guard);
1270 }
1271 if epoch_store.should_send_end_of_publish() {
1272 if let Err(err) = self.submit(
1273 ConsensusTransaction::new_end_of_publish(self.authority),
1274 None,
1275 epoch_store,
1276 None,
1277 None,
1278 ) {
1279 warn!("Error when sending end of publish message: {:?}", err);
1280 } else {
1281 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1282 }
1283 }
1284 }
1285}
1286
1287struct CancelOnDrop<T>(JoinHandle<T>);
1288
1289impl<T> Deref for CancelOnDrop<T> {
1290 type Target = JoinHandle<T>;
1291
1292 fn deref(&self) -> &Self::Target {
1293 &self.0
1294 }
1295}
1296
1297impl<T> Drop for CancelOnDrop<T> {
1298 fn drop(&mut self) {
1299 self.0.abort();
1300 }
1301}
1302
1303struct InflightDropGuard<'a> {
1305 adapter: &'a ConsensusAdapter,
1306 start: Instant,
1307 position: Option<usize>,
1308 positions_moved: Option<usize>,
1309 preceding_disconnected: Option<usize>,
1310 amplification_factor: Option<usize>,
1311 tx_type: &'static str,
1312 processed_method: ProcessedMethod,
1313}
1314
1315#[derive(PartialEq, Eq)]
1316enum ProcessedMethod {
1317 Consensus,
1318 Checkpoint,
1319}
1320
1321impl<'a> InflightDropGuard<'a> {
1322 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1323 adapter
1324 .num_inflight_transactions
1325 .fetch_add(1, Ordering::SeqCst);
1326 adapter
1327 .metrics
1328 .sequencing_certificate_inflight
1329 .with_label_values(&[tx_type])
1330 .inc();
1331 adapter
1332 .metrics
1333 .sequencing_certificate_attempt
1334 .with_label_values(&[tx_type])
1335 .inc();
1336 Self {
1337 adapter,
1338 start: Instant::now(),
1339 position: None,
1340 positions_moved: None,
1341 preceding_disconnected: None,
1342 amplification_factor: None,
1343 tx_type,
1344 processed_method: ProcessedMethod::Consensus,
1345 }
1346 }
1347}
1348
1349impl Drop for InflightDropGuard<'_> {
1350 fn drop(&mut self) {
1351 self.adapter
1352 .num_inflight_transactions
1353 .fetch_sub(1, Ordering::SeqCst);
1354 self.adapter
1355 .metrics
1356 .sequencing_certificate_inflight
1357 .with_label_values(&[self.tx_type])
1358 .dec();
1359
1360 let position = if let Some(position) = self.position {
1361 self.adapter
1362 .metrics
1363 .sequencing_certificate_authority_position
1364 .observe(position as f64);
1365 position.to_string()
1366 } else {
1367 "not_submitted".to_string()
1368 };
1369
1370 if let Some(positions_moved) = self.positions_moved {
1371 self.adapter
1372 .metrics
1373 .sequencing_certificate_positions_moved
1374 .observe(positions_moved as f64);
1375 };
1376
1377 if let Some(preceding_disconnected) = self.preceding_disconnected {
1378 self.adapter
1379 .metrics
1380 .sequencing_certificate_preceding_disconnected
1381 .observe(preceding_disconnected as f64);
1382 };
1383
1384 if let Some(amplification_factor) = self.amplification_factor {
1385 self.adapter
1386 .metrics
1387 .sequencing_certificate_amplification_factor
1388 .observe(amplification_factor as f64);
1389 };
1390
1391 let latency = self.start.elapsed();
1392 let processed_method = match self.processed_method {
1393 ProcessedMethod::Consensus => "processed_via_consensus",
1394 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1395 };
1396
1397 self.adapter
1398 .metrics
1399 .sequencing_certificate_latency
1400 .with_label_values(&[&position, self.tx_type, processed_method])
1401 .observe(latency.as_secs_f64());
1402
1403 if self.position == Some(0) {
1408 let sampled = matches!(
1411 self.tx_type,
1412 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1413 );
1414 if sampled && self.processed_method == ProcessedMethod::Consensus {
1416 self.adapter.latency_observer.report(latency);
1417 }
1418 }
1419 }
1420}
1421
1422impl SubmitToConsensus for Arc<ConsensusAdapter> {
1423 fn submit_to_consensus(
1424 &self,
1425 transactions: &[ConsensusTransaction],
1426 epoch_store: &Arc<AuthorityPerEpochStore>,
1427 ) -> SuiResult {
1428 self.submit_batch(transactions, None, epoch_store, None, None)
1429 .map(|_| ())
1430 }
1431
1432 fn submit_best_effort(
1433 &self,
1434 transaction: &ConsensusTransaction,
1435 epoch_store: &Arc<AuthorityPerEpochStore>,
1436 timeout: Duration,
1438 ) -> SuiResult {
1439 let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1440 Ok(permit) => permit,
1441 Err(_) => {
1442 return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
1443 }
1444 };
1445
1446 let _in_flight_submission_guard =
1447 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1448
1449 let key = SequencedConsensusTransactionKey::External(transaction.key());
1450 let tx_type = classify(transaction);
1451
1452 let async_stage = {
1453 let transaction = transaction.clone();
1454 let epoch_store = epoch_store.clone();
1455 let this = self.clone();
1456
1457 async move {
1458 let _permit = permit; let result = tokio::time::timeout(
1461 timeout,
1462 this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1463 )
1464 .await;
1465
1466 if let Err(e) = result {
1467 warn!("Consensus submission timed out: {e:?}");
1468 this.metrics
1469 .sequencing_best_effort_timeout
1470 .with_label_values(&[tx_type])
1471 .inc();
1472 }
1473 }
1474 };
1475
1476 let epoch_store = epoch_store.clone();
1477 spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1478 Ok(())
1479 }
1480}
1481
1482pub fn position_submit_certificate(
1483 committee: &Committee,
1484 ourselves: &AuthorityName,
1485 tx_digest: &TransactionDigest,
1486) -> usize {
1487 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1488 get_position_in_list(*ourselves, validators)
1489}
1490
1491#[cfg(test)]
1492mod adapter_tests {
1493 use super::position_submit_certificate;
1494 use crate::checkpoints::CheckpointStore;
1495 use crate::consensus_adapter::{
1496 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
1497 };
1498 use crate::mysticeti_adapter::LazyMysticetiClient;
1499 use fastcrypto::traits::KeyPair;
1500 use rand::Rng;
1501 use rand::{SeedableRng, rngs::StdRng};
1502 use std::sync::Arc;
1503 use std::time::Duration;
1504 use sui_types::{
1505 base_types::TransactionDigest,
1506 committee::Committee,
1507 crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
1508 };
1509
1510 fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
1511 let authorities = (0..size)
1512 .map(|_k| {
1513 (
1514 AuthorityPublicKeyBytes::from(
1515 get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
1516 ),
1517 rng.gen_range(0u64..10u64),
1518 )
1519 })
1520 .collect::<Vec<_>>();
1521 Committee::new_for_testing_with_normalized_voting_power(
1522 0,
1523 authorities.iter().cloned().collect(),
1524 )
1525 }
1526
1527 #[tokio::test]
1528 async fn test_await_submit_delay_user_transaction() {
1529 let mut rng = StdRng::from_seed([0; 32]);
1531 let committee = test_committee(&mut rng, 10);
1532
1533 let consensus_adapter = ConsensusAdapter::new(
1535 Arc::new(LazyMysticetiClient::new()),
1536 CheckpointStore::new_for_tests(),
1537 *committee.authority_by_index(0).unwrap(),
1538 Arc::new(ConnectionMonitorStatusForTests {}),
1539 100_000,
1540 100_000,
1541 Some(1),
1542 Some(Duration::from_secs(2)),
1543 ConsensusAdapterMetrics::new_test(),
1544 sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
1545 );
1546
1547 let tx_digest = TransactionDigest::generate(&mut rng);
1549
1550 let (position, positions_moved, _) =
1552 consensus_adapter.submission_position(&committee, &tx_digest);
1553 assert_eq!(position, 7);
1554 assert!(!positions_moved > 0);
1555
1556 let (delay_step, position, positions_moved, _) =
1558 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);
1559
1560 assert_eq!(position, 1);
1561 assert_eq!(delay_step, Duration::from_secs(2));
1562 assert!(!positions_moved > 0);
1563
1564 let consensus_adapter = ConsensusAdapter::new(
1566 Arc::new(LazyMysticetiClient::new()),
1567 CheckpointStore::new_for_tests(),
1568 *committee.authority_by_index(0).unwrap(),
1569 Arc::new(ConnectionMonitorStatusForTests {}),
1570 100_000,
1571 100_000,
1572 None,
1573 None,
1574 ConsensusAdapterMetrics::new_test(),
1575 sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
1576 );
1577
1578 let (delay_step, position, positions_moved, _) =
1579 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);
1580
1581 assert_eq!(position, 7);
1582
1583 assert_eq!(delay_step, Duration::from_secs(14));
1585 assert!(!positions_moved > 0);
1586
1587 let (delay_step, position, _, _) =
1589 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 7);
1590 assert_eq!(position, 1);
1591 assert_eq!(delay_step, Duration::from_secs(2));
1592
1593 let (delay_step, position, _, _) =
1595 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 8);
1596 assert_eq!(position, 0);
1597 assert_eq!(delay_step, Duration::ZERO);
1598 }
1599
1600 #[test]
1601 fn test_position_submit_certificate() {
1602 let mut rng = StdRng::from_seed([0; 32]);
1604 let committee = test_committee(&mut rng, 10);
1605
1606 const NUM_TEST_TRANSACTIONS: usize = 1000;
1608
1609 for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
1610 let tx_digest = TransactionDigest::generate(&mut rng);
1611
1612 let mut zero_found = false;
1613 for (name, _) in committee.members() {
1614 let f = position_submit_certificate(&committee, name, &tx_digest);
1615 assert!(f < committee.num_members());
1616 if f == 0 {
1617 assert!(!zero_found);
1619 zero_found = true;
1620 }
1621 }
1622 assert!(zero_found);
1623 }
1624 }
1625}