1use std::collections::HashMap;
5use std::future::Future;
6use std::net::IpAddr;
7use std::ops::Deref;
8use std::sync::Arc;
9use std::sync::atomic::AtomicU64;
10use std::sync::atomic::Ordering;
11use std::time::Instant;
12
13use arc_swap::{ArcSwap, ArcSwapOption};
14use consensus_core::{BlockStatus, ConnectionStatus};
15use dashmap::DashMap;
16use dashmap::try_result::TryResult;
17use futures::FutureExt;
18use futures::future::{self, Either, select};
19use futures::stream::FuturesUnordered;
20use futures::{StreamExt, pin_mut};
21use itertools::Itertools;
22use mysten_common::debug_fatal;
23use mysten_metrics::{
24 GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
25};
26use parking_lot::RwLockReadGuard;
27use prometheus::Histogram;
28use prometheus::HistogramVec;
29use prometheus::IntCounterVec;
30use prometheus::IntGauge;
31use prometheus::IntGaugeVec;
32use prometheus::Registry;
33use prometheus::{
34 register_histogram_vec_with_registry, register_histogram_with_registry,
35 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
36 register_int_gauge_with_registry,
37};
38use sui_protocol_config::ProtocolConfig;
39use sui_simulator::anemo::PeerId;
40use sui_types::base_types::AuthorityName;
41use sui_types::base_types::TransactionDigest;
42use sui_types::committee::Committee;
43use sui_types::error::{SuiErrorKind, SuiResult};
44use sui_types::fp_ensure;
45use sui_types::messages_consensus::ConsensusPosition;
46use sui_types::messages_consensus::ConsensusTransactionKind;
47use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
48use sui_types::transaction::TransactionDataAPI;
49use tokio::sync::{Semaphore, SemaphorePermit, oneshot};
50use tokio::task::JoinHandle;
51use tokio::time::Duration;
52use tokio::time::{self};
53use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};
54
55use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
56use crate::checkpoints::CheckpointStore;
57use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
58use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
59use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
60use crate::metrics::LatencyObserver;
61
// Unit tests for this module; compiled only for test builds and loaded from
// `unit_tests/consensus_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;
65
/// Bucket boundaries for the position-style histograms registered by
/// `ConsensusAdapterMetrics::new` (authority position, positions moved,
/// preceding disconnected and amplification factor).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 50.0, 100.0, 150.0, 200.0,
];
69
/// Prometheus metric handles used by the consensus adapter.
///
/// The metric names, help texts and label sets are defined in
/// `ConsensusAdapterMetrics::new`.
pub struct ConsensusAdapterMetrics {
    /// Number of certificates the validator attempted to sequence, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced certificates, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of failed submissions (other than by timeout), labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Block status reported by consensus (sequenced or garbage collected),
    /// labelled by `tx_type` and `status`.
    pub sequencing_certificate_status: IntCounterVec,
    /// In-flight requests to sequence certificates, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency of the acknowledgement from consensus, labelled by `retry` bucket and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Overall latency for sequencing a certificate, labelled by `position`,
    /// `tx_type` and `processed_method`.
    pub sequencing_certificate_latency: HistogramVec,
    /// Position of this authority when it submitted a certificate to consensus.
    pub sequencing_certificate_authority_position: Histogram,
    /// Number of authorities ahead of ourselves that were filtered out on submission.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of authorities at an earlier position that were filtered out for being disconnected.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Number of certificates processed, labelled by `source` (consensus or a checkpoint path).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Amplification factor used when submitting to consensus.
    pub sequencing_certificate_amplification_factor: Histogram,
    /// Number of requests currently waiting for a submit permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Number of transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Consensus latency estimated by the adapter, in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission interval (delay step) used by the adapter, in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
    /// Number of times a best-effort submission timed out, labelled by `tx_type`.
    pub sequencing_best_effort_timeout: IntCounterVec,
}
90
91impl ConsensusAdapterMetrics {
92 pub fn new(registry: &Registry) -> Self {
93 Self {
94 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
95 "sequencing_certificate_attempt",
96 "Counts the number of certificates the validator attempts to sequence.",
97 &["tx_type"],
98 registry,
99 )
100 .unwrap(),
101 sequencing_certificate_success: register_int_counter_vec_with_registry!(
102 "sequencing_certificate_success",
103 "Counts the number of successfully sequenced certificates.",
104 &["tx_type"],
105 registry,
106 )
107 .unwrap(),
108 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
109 "sequencing_certificate_failures",
110 "Counts the number of sequenced certificates that failed other than by timeout.",
111 &["tx_type"],
112 registry,
113 )
114 .unwrap(),
115 sequencing_certificate_status: register_int_counter_vec_with_registry!(
116 "sequencing_certificate_status",
117 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
118 &["tx_type", "status"],
119 registry,
120 )
121 .unwrap(),
122 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
123 "sequencing_certificate_inflight",
124 "The inflight requests to sequence certificates.",
125 &["tx_type"],
126 registry,
127 )
128 .unwrap(),
129 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
130 "sequencing_acknowledge_latency",
131 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
132 &["retry", "tx_type"],
133 LATENCY_SEC_BUCKETS.to_vec(),
134 registry,
135 ).unwrap(),
136 sequencing_certificate_latency: register_histogram_vec_with_registry!(
137 "sequencing_certificate_latency",
138 "The latency for sequencing a certificate.",
139 &["position", "tx_type", "processed_method"],
140 LATENCY_SEC_BUCKETS.to_vec(),
141 registry,
142 ).unwrap(),
143 sequencing_certificate_authority_position: register_histogram_with_registry!(
144 "sequencing_certificate_authority_position",
145 "The position of the authority when submitted a certificate to consensus.",
146 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
147 registry,
148 ).unwrap(),
149 sequencing_certificate_positions_moved: register_histogram_with_registry!(
150 "sequencing_certificate_positions_moved",
151 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
152 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
153 registry,
154 ).unwrap(),
155 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
156 "sequencing_certificate_preceding_disconnected",
157 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
158 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
159 registry,
160 ).unwrap(),
161 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162 "sequencing_certificate_processed",
163 "The number of certificates that have been processed either by consensus or checkpoint.",
164 &["source"],
165 registry
166 ).unwrap(),
167 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
168 "sequencing_in_flight_semaphore_wait",
169 "How many requests are blocked on submit_permit.",
170 registry,
171 )
172 .unwrap(),
173 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
174 "sequencing_in_flight_submissions",
175 "Number of transactions submitted to local consensus instance and not yet sequenced",
176 registry,
177 )
178 .unwrap(),
179 sequencing_estimated_latency: register_int_gauge_with_registry!(
180 "sequencing_estimated_latency",
181 "Consensus latency estimated by consensus adapter in milliseconds",
182 registry,
183 )
184 .unwrap(),
185 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
186 "sequencing_resubmission_interval_ms",
187 "Resubmission interval used by consensus adapter in milliseconds",
188 registry,
189 )
190 .unwrap(),
191 sequencing_certificate_amplification_factor: register_histogram_with_registry!(
192 "sequencing_certificate_amplification_factor",
193 "The amplification factor used by consensus adapter to submit to consensus.",
194 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
195 registry,
196 ).unwrap(),
197 sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
198 "sequencing_best_effort_timeout",
199 "The number of times the best effort submission has timed out.",
200 &["tx_type"],
201 registry,
202 ).unwrap(),
203 }
204 }
205
206 pub fn new_test() -> Self {
207 Self::new(&Registry::default())
208 }
209}
210
/// Hook for asking whether consensus can currently accept more work.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    /// Returns a `SuiResult`.
    /// NOTE(review): presumably `Err` signals that consensus is overloaded — confirm against implementors.
    fn check_consensus_overload(&self) -> SuiResult;
}
215
/// One-shot receiver for the [`BlockStatus`] of a submission; it is returned by
/// [`ConsensusClient::submit`] together with the consensus positions.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
217
/// Synchronous entry points for handing transactions to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus in the context of `epoch_store`.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    /// Submits a single `transaction` with a caller-supplied `timeout`.
    /// NOTE(review): what happens when the timeout elapses is up to the implementor; not visible here.
    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}
233
/// Asynchronous client through which the adapter submits transactions to consensus.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` and, on success, returns their consensus positions
    /// together with a receiver on which the block status is reported later.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
243
/// Submits transactions to consensus on behalf of this authority.
pub struct ConsensusAdapter {
    /// Client used by `submit_inner` to hand transactions to consensus.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Checkpoint store, used to learn that a checkpoint has been synced while
    /// waiting on checkpoint-signature transactions.
    checkpoint_store: Arc<CheckpointStore>,
    /// Name of this authority.
    authority: AuthorityName,
    /// Upper bound on in-flight transactions checked by `check_limits`.
    max_pending_transactions: usize,
    /// Number of transactions currently in flight at this node.
    num_inflight_transactions: AtomicU64,
    /// Optional cap on the submission position: a higher elected position is
    /// reduced to this value.
    max_submit_position: Option<usize>,
    /// When set, replaces the latency-derived delay step.
    submit_delay_step_override: Option<Duration>,
    /// Source of peer connection statuses used when computing the submission position.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Authorities currently considered low scoring; they are filtered out when
    /// computing the submission position.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Optional throughput profiler consulted by `override_by_throughput_profiler`.
    consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
    /// Metric handles.
    metrics: ConsensusAdapterMetrics,
    /// Semaphore bounding concurrent local submissions; sized from
    /// `max_pending_local_submissions` in `new`.
    submit_semaphore: Arc<Semaphore>,
    /// Source of the latency estimate used to derive the submission delay.
    latency_observer: LatencyObserver,
    /// Protocol configuration, read for `throughput_aware_consensus_submission`.
    protocol_config: ProtocolConfig,
}
275
/// Lookup of peer connection status by authority name.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status of `authority` as seen from `ourself`, if known.
    /// The adapter treats `None` as disconnected.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Supplies the authority-name to peer-id mapping for an epoch.
    /// NOTE(review): presumably replaces the previous mapping — confirm against implementors.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
284
/// Connection statuses keyed by peer id, plus the mapping needed to look them up by authority name.
pub struct ConnectionMonitorStatus {
    /// Connection status per peer id.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Mapping from authority name to peer id.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
291
/// Field-less stand-in for a connection monitor status, intended for tests (per its name).
pub struct ConnectionMonitorStatusForTests {}
293
294impl ConsensusAdapter {
295 pub fn new(
297 consensus_client: Arc<dyn ConsensusClient>,
298 checkpoint_store: Arc<CheckpointStore>,
299 authority: AuthorityName,
300 connection_monitor_status: Arc<dyn CheckConnection>,
301 max_pending_transactions: usize,
302 max_pending_local_submissions: usize,
303 max_submit_position: Option<usize>,
304 submit_delay_step_override: Option<Duration>,
305 metrics: ConsensusAdapterMetrics,
306 protocol_config: ProtocolConfig,
307 ) -> Self {
308 let num_inflight_transactions = Default::default();
309 let low_scoring_authorities =
310 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
311 Self {
312 consensus_client,
313 checkpoint_store,
314 authority,
315 max_pending_transactions,
316 max_submit_position,
317 submit_delay_step_override,
318 num_inflight_transactions,
319 connection_monitor_status,
320 low_scoring_authorities,
321 metrics,
322 submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
323 latency_observer: LatencyObserver::new(),
324 consensus_throughput_profiler: ArcSwapOption::empty(),
325 protocol_config,
326 }
327 }
328
329 pub fn swap_low_scoring_authorities(
330 &self,
331 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
332 ) {
333 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
334 }
335
336 pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
337 self.consensus_throughput_profiler.store(Some(profiler))
338 }
339
340 pub fn num_inflight_transactions(&self) -> u64 {
342 self.num_inflight_transactions.load(Ordering::Relaxed)
343 }
344
345 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
346 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
353
354 #[allow(clippy::collapsible_if)] if epoch_store.should_send_end_of_publish() {
356 if !recovered
357 .iter()
358 .any(ConsensusTransaction::is_end_of_publish)
359 {
360 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
367 }
368 }
369 debug!(
370 "Submitting {:?} recovered pending consensus transactions to consensus",
371 recovered.len()
372 );
373 for transaction in recovered {
374 if transaction.is_end_of_publish() {
375 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
376 }
377 self.submit_unchecked(&[transaction], epoch_store, None, None);
378 }
379 }
380
381 fn await_submit_delay(
382 &self,
383 epoch_store: &Arc<AuthorityPerEpochStore>,
384 transactions: &[ConsensusTransaction],
385 ) -> (impl Future<Output = ()>, usize, usize, usize, usize) {
386 if transactions.iter().any(|tx| tx.is_mfp_transaction()) {
387 return (tokio::time::sleep(Duration::ZERO), 0, 0, 0, 0);
390 }
391
392 let min_digest_and_gas_price = transactions
394 .iter()
395 .filter_map(|tx| match &tx.kind {
396 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
397 Some((certificate.digest(), certificate.gas_price()))
398 }
399 ConsensusTransactionKind::UserTransaction(transaction) => Some((
400 transaction.digest(),
401 transaction.data().transaction_data().gas_price(),
402 )),
403 _ => None,
404 })
405 .min();
406 let mut amplification_factor = 0;
407
408 let (duration, position, positions_moved, preceding_disconnected) =
409 match min_digest_and_gas_price {
410 Some((digest, gas_price)) => {
411 let k = epoch_store
412 .protocol_config()
413 .sip_45_consensus_amplification_threshold_as_option()
414 .unwrap_or(u64::MAX);
415 let multiplier =
416 gas_price / std::cmp::max(epoch_store.reference_gas_price(), 1);
417 amplification_factor = if multiplier >= k { multiplier } else { 0 };
418 self.await_submit_delay_user_transaction(
419 epoch_store.committee(),
420 digest,
421 amplification_factor as usize,
422 )
423 }
424 _ => (Duration::ZERO, 0, 0, 0),
425 };
426 (
427 tokio::time::sleep(duration),
428 position,
429 positions_moved,
430 preceding_disconnected,
431 amplification_factor as usize,
432 )
433 }
434
435 fn await_submit_delay_user_transaction(
436 &self,
437 committee: &Committee,
438 tx_digest: &TransactionDigest,
439 amplification_factor: usize,
440 ) -> (Duration, usize, usize, usize) {
441 let (mut position, positions_moved, preceding_disconnected) =
442 self.submission_position(committee, tx_digest);
443 if amplification_factor > 0 {
444 position = (position + 1).saturating_sub(amplification_factor);
445 }
446
447 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
449 const MAX_LATENCY: Duration = Duration::from_secs(3);
450
451 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
452 self.metrics
453 .sequencing_estimated_latency
454 .set(latency.as_millis() as i64);
455
456 let latency = std::cmp::max(latency, MIN_LATENCY);
457 let latency = std::cmp::min(latency, MAX_LATENCY);
458 let latency = latency * 2;
459 let latency = self.override_by_throughput_profiler(position, latency);
460 let (delay_step, position) =
461 self.override_by_max_submit_position_settings(latency, position);
462
463 self.metrics
464 .sequencing_resubmission_interval_ms
465 .set(delay_step.as_millis() as i64);
466
467 (
468 delay_step * position as u32,
469 position,
470 positions_moved,
471 preceding_disconnected,
472 )
473 }
474
475 fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
479 const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
480 const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
481 const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;
482
483 let p = self.consensus_throughput_profiler.load();
484
485 if let Some(profiler) = p.as_ref() {
486 let (level, _) = profiler.throughput_level();
487
488 if self.protocol_config.throughput_aware_consensus_submission() && position == 1 {
491 return match level {
492 Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
493 Level::Medium => {
494 Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
495 }
496 Level::High => {
497 let l = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);
498
499 if latency >= 2 * l { latency } else { l }
501 }
502 };
503 }
504 }
505 latency
506 }
507
508 fn override_by_max_submit_position_settings(
512 &self,
513 latency: Duration,
514 mut position: usize,
515 ) -> (Duration, usize) {
516 if let Some(max_submit_position) = self.max_submit_position {
518 position = std::cmp::min(position, max_submit_position);
519 }
520
521 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
522 (delay_step, position)
523 }
524
525 fn submission_position(
533 &self,
534 committee: &Committee,
535 tx_digest: &TransactionDigest,
536 ) -> (usize, usize, usize) {
537 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
538
539 self.check_submission_wrt_connectivity_and_scores(positions)
540 }
541
542 fn check_submission_wrt_connectivity_and_scores(
563 &self,
564 positions: Vec<AuthorityName>,
565 ) -> (usize, usize, usize) {
566 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
567 if low_scoring_authorities.get(&self.authority).is_some() {
568 return (positions.len(), 0, 0);
569 }
570 let initial_position = get_position_in_list(self.authority, positions.clone());
571 let mut preceding_disconnected = 0;
572 let mut before_our_position = true;
573
574 let filtered_positions: Vec<_> = positions
575 .into_iter()
576 .filter(|authority| {
577 let keep = self.authority == *authority; if keep {
579 before_our_position = false;
580 }
581
582 let connected = self
584 .connection_monitor_status
585 .check_connection(&self.authority, authority)
586 .unwrap_or(ConnectionStatus::Disconnected)
587 == ConnectionStatus::Connected;
588 if !connected && before_our_position {
589 preceding_disconnected += 1; }
591
592 let high_scoring = low_scoring_authorities.get(authority).is_none();
594
595 keep || (connected && high_scoring)
596 })
597 .collect();
598
599 let position = get_position_in_list(self.authority, filtered_positions);
600
601 (
602 position,
603 initial_position - position,
604 preceding_disconnected,
605 )
606 }
607
608 pub fn submit(
616 self: &Arc<Self>,
617 transaction: ConsensusTransaction,
618 lock: Option<&RwLockReadGuard<ReconfigState>>,
619 epoch_store: &Arc<AuthorityPerEpochStore>,
620 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
621 submitter_client_addr: Option<IpAddr>,
622 ) -> SuiResult<JoinHandle<()>> {
623 self.submit_batch(
624 &[transaction],
625 lock,
626 epoch_store,
627 tx_consensus_position,
628 submitter_client_addr,
629 )
630 }
631
632 pub fn submit_batch(
635 self: &Arc<Self>,
636 transactions: &[ConsensusTransaction],
637 lock: Option<&RwLockReadGuard<ReconfigState>>,
638 epoch_store: &Arc<AuthorityPerEpochStore>,
639 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
640 submitter_client_addr: Option<IpAddr>,
641 ) -> SuiResult<JoinHandle<()>> {
642 if transactions.len() > 1 {
643 let first_kind = &transactions[0].kind;
647 let is_user_tx_batch =
648 matches!(first_kind, ConsensusTransactionKind::UserTransaction(_));
649 let is_cert_batch = matches!(
650 first_kind,
651 ConsensusTransactionKind::CertifiedTransaction(_)
652 );
653
654 for transaction in &transactions[1..] {
655 if is_user_tx_batch {
656 fp_ensure!(
657 matches!(
658 transaction.kind,
659 ConsensusTransactionKind::UserTransaction(_)
660 ),
661 SuiErrorKind::InvalidTxKindInSoftBundle.into()
662 );
663 } else if is_cert_batch {
664 fp_ensure!(
665 matches!(
666 transaction.kind,
667 ConsensusTransactionKind::CertifiedTransaction(_)
668 ),
669 SuiErrorKind::InvalidTxKindInSoftBundle.into()
670 );
671 } else {
672 return Err(SuiErrorKind::InvalidTxKindInSoftBundle.into());
674 }
675 }
676 }
677
678 if !transactions.is_empty() {
679 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
680 }
681
682 Ok(self.submit_unchecked(
683 transactions,
684 epoch_store,
685 tx_consensus_position,
686 submitter_client_addr,
687 ))
688 }
689
690 fn check_limits(&self) -> bool {
693 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
695 > self.max_pending_transactions
696 {
697 return false;
698 }
699 self.submit_semaphore.available_permits() > 0
701 }
702
703 fn submit_unchecked(
704 self: &Arc<Self>,
705 transactions: &[ConsensusTransaction],
706 epoch_store: &Arc<AuthorityPerEpochStore>,
707 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
708 submitter_client_addr: Option<IpAddr>,
709 ) -> JoinHandle<()> {
710 let async_stage = self
712 .clone()
713 .submit_and_wait(
714 transactions.to_vec(),
715 epoch_store.clone(),
716 tx_consensus_position,
717 submitter_client_addr,
718 )
719 .in_current_span();
720 let join_handle = spawn_monitored_task!(async_stage);
723 join_handle
724 }
725
726 async fn submit_and_wait(
727 self: Arc<Self>,
728 transactions: Vec<ConsensusTransaction>,
729 epoch_store: Arc<AuthorityPerEpochStore>,
730 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
731 submitter_client_addr: Option<IpAddr>,
732 ) {
733 epoch_store
745 .within_alive_epoch(self.submit_and_wait_inner(
746 transactions,
747 &epoch_store,
748 tx_consensus_position,
749 submitter_client_addr,
750 ))
751 .await
752 .ok(); }
754
755 #[allow(clippy::option_map_unit_fn)]
756 #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
757 async fn submit_and_wait_inner(
758 self: Arc<Self>,
759 transactions: Vec<ConsensusTransaction>,
760 epoch_store: &Arc<AuthorityPerEpochStore>,
761 mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
762 submitter_client_addr: Option<IpAddr>,
763 ) {
764 if transactions.is_empty() {
765 debug!(
768 "Performing a ping check, pinging consensus to get a consensus position in next block"
769 );
770 let (consensus_positions, _status_waiter) = self
771 .submit_inner(&transactions, epoch_store, &[], "ping", false)
772 .await;
773
774 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
775 let _ = tx_consensus_positions.send(consensus_positions);
776 } else {
777 debug_fatal!("Ping check must have a consensus position channel");
778 }
779 return;
780 }
781
782 if epoch_store.protocol_config().mysticeti_fastpath() {
784 for transaction in &transactions {
785 if let ConsensusTransactionKind::UserTransaction(tx) = &transaction.kind {
786 let amplification_factor = (tx.data().transaction_data().gas_price()
787 / epoch_store.reference_gas_price().max(1))
788 .max(1);
789 epoch_store.submitted_transaction_cache.record_submitted_tx(
790 tx.digest(),
791 amplification_factor as u32,
792 submitter_client_addr,
793 );
794 }
795 }
796 }
797
798 let skip_processed_checks = tx_consensus_positions.is_some();
802
803 let is_soft_bundle = transactions.len() > 1;
808
809 let mut transaction_keys = Vec::new();
810 let mut tx_consensus_positions = tx_consensus_positions;
811
812 for transaction in &transactions {
813 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
814 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
815 epoch_store.record_epoch_pending_certs_process_time_metric();
816 }
817
818 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
819 transaction_keys.push(transaction_key);
820 }
821 let tx_type = if is_soft_bundle {
822 "soft_bundle"
823 } else {
824 classify(&transactions[0])
825 };
826 tracing::Span::current().record("tx_type", tx_type);
827 tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
828
829 let mut guard = InflightDropGuard::acquire(&self, tx_type);
830
831 let (await_submit, position, positions_moved, preceding_disconnected, amplification_factor) =
833 self.await_submit_delay(epoch_store, &transactions[..]);
834
835 let processed_via_consensus_or_checkpoint = if skip_processed_checks {
836 future::pending().boxed()
839 } else {
840 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store)
841 .boxed()
842 };
843 pin_mut!(processed_via_consensus_or_checkpoint);
844
845 let processed_waiter = tokio::select! {
846 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
848
849 _ = epoch_store.user_certs_closed_notify() => {
851 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
852 Some(processed_via_consensus_or_checkpoint)
853 }
854
855 _ = &mut processed_via_consensus_or_checkpoint => {
857 None
858 }
859 };
860
861 let _monitor = if matches!(
863 transactions[0].kind,
864 ConsensusTransactionKind::EndOfPublish(_)
865 | ConsensusTransactionKind::CapabilityNotification(_)
866 | ConsensusTransactionKind::CapabilityNotificationV2(_)
867 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
868 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
869 ) {
870 assert!(
871 !is_soft_bundle,
872 "System transactions should have been submitted individually"
873 );
874 let transaction_keys = transaction_keys.clone();
875 Some(CancelOnDrop(spawn_monitored_task!(async {
876 let mut i = 0u64;
877 loop {
878 i += 1;
879 const WARN_DELAY_S: u64 = 30;
880 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
881 let total_wait = i * WARN_DELAY_S;
882 warn!(
883 "Still waiting {} seconds for transactions {:?} to commit in consensus",
884 total_wait, transaction_keys
885 );
886 }
887 })))
888 } else {
889 None
890 };
891
892 if let Some(processed_waiter) = processed_waiter {
893 debug!("Submitting {:?} to consensus", transaction_keys);
894
895 guard.position = Some(position);
898 guard.positions_moved = Some(positions_moved);
899 guard.preceding_disconnected = Some(preceding_disconnected);
900 guard.amplification_factor = Some(amplification_factor);
901
902 let _permit: SemaphorePermit = self
903 .submit_semaphore
904 .acquire()
905 .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
906 .await
907 .expect("Consensus adapter does not close semaphore");
908 let _in_flight_submission_guard =
909 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
910
911 let submit_inner = async {
914 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
915
916 loop {
917 let (consensus_positions, status_waiter) = self
919 .submit_inner(
920 &transactions,
921 epoch_store,
922 &transaction_keys,
923 tx_type,
924 is_soft_bundle,
925 )
926 .await;
927
928 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
929 tracing::Span::current().record(
930 "consensus_positions",
931 tracing::field::debug(&consensus_positions),
932 );
933 let _ = tx_consensus_positions.send(consensus_positions);
939 }
940
941 match status_waiter.await {
942 Ok(status @ BlockStatus::Sequenced(_)) => {
943 tracing::Span::current()
944 .record("status", tracing::field::debug(&status));
945 self.metrics
946 .sequencing_certificate_status
947 .with_label_values(&[tx_type, "sequenced"])
948 .inc();
949 trace!(
951 "Transaction {transaction_keys:?} has been sequenced by consensus."
952 );
953 break;
954 }
955 Ok(status @ BlockStatus::GarbageCollected(_)) => {
956 tracing::Span::current()
957 .record("status", tracing::field::debug(&status));
958 self.metrics
959 .sequencing_certificate_status
960 .with_label_values(&[tx_type, "garbage_collected"])
961 .inc();
962 debug!(
966 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
967 );
968 time::sleep(RETRY_DELAY_STEP).await;
969 continue;
970 }
971 Err(err) => {
972 warn!(
973 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
974 err
975 );
976 time::sleep(RETRY_DELAY_STEP).await;
977 continue;
978 }
979 }
980 }
981 };
982
983 guard.processed_method = if skip_processed_checks {
984 submit_inner.await;
986 ProcessedMethod::Consensus
987 } else {
988 match select(processed_waiter, submit_inner.boxed()).await {
989 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
990 Either::Right(((), processed_waiter)) => {
991 debug!("Submitted {transaction_keys:?} to consensus");
992 processed_waiter.await
993 }
994 }
995 };
996 }
997 debug!("{transaction_keys:?} processed by consensus");
998
999 let consensus_keys: Vec<_> = transactions
1000 .iter()
1001 .filter_map(|t| {
1002 if t.is_mfp_transaction() {
1003 None
1007 } else {
1008 Some(t.key())
1009 }
1010 })
1011 .collect();
1012 epoch_store
1013 .remove_pending_consensus_transactions(&consensus_keys)
1014 .expect("Storage error when removing consensus transaction");
1015
1016 let is_user_tx = is_soft_bundle
1017 || matches!(
1018 transactions[0].kind,
1019 ConsensusTransactionKind::CertifiedTransaction(_)
1020 )
1021 || matches!(
1022 transactions[0].kind,
1023 ConsensusTransactionKind::UserTransaction(_)
1024 );
1025 if is_user_tx && epoch_store.should_send_end_of_publish() {
1026 if let Err(err) = self.submit(
1028 ConsensusTransaction::new_end_of_publish(self.authority),
1029 None,
1030 epoch_store,
1031 None,
1032 None,
1033 ) {
1034 warn!("Error when sending end of publish message: {:?}", err);
1035 } else {
1036 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1037 }
1038 }
1039 self.metrics
1040 .sequencing_certificate_success
1041 .with_label_values(&[tx_type])
1042 .inc();
1043 }
1044
1045 #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
1046 async fn submit_inner(
1047 self: &Arc<Self>,
1048 transactions: &[ConsensusTransaction],
1049 epoch_store: &Arc<AuthorityPerEpochStore>,
1050 transaction_keys: &[SequencedConsensusTransactionKey],
1051 tx_type: &str,
1052 is_soft_bundle: bool,
1053 ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
1054 let ack_start = Instant::now();
1055 let mut retries: u32 = 0;
1056 let is_dkg = !transactions.is_empty() && transactions[0].is_dkg();
1057
1058 let (consensus_positions, status_waiter) = loop {
1059 let span = debug_span!("client_submit");
1060 match self
1061 .consensus_client
1062 .submit(transactions, epoch_store)
1063 .instrument(span)
1064 .await
1065 {
1066 Err(err) => {
1067 if retries > 30 || (retries > 3 && (is_soft_bundle || !is_dkg)) {
1070 warn!(
1071 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
1072 );
1073 }
1074 self.metrics
1075 .sequencing_certificate_failures
1076 .with_label_values(&[tx_type])
1077 .inc();
1078 retries += 1;
1079
1080 if is_dkg {
1081 time::sleep(Duration::from_millis(100)).await;
1084 } else {
1085 time::sleep(Duration::from_secs(10)).await;
1086 };
1087 }
1088 Ok((consensus_positions, status_waiter)) => {
1089 break (consensus_positions, status_waiter);
1090 }
1091 }
1092 };
1093
1094 let bucket = match retries {
1098 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
1100 21..=50 => "between_20_and_50".to_string(),
1101 51..=100 => "between_50_and_100".to_string(),
1102 _ => "over_100".to_string(),
1103 };
1104
1105 self.metrics
1106 .sequencing_acknowledge_latency
1107 .with_label_values(&[&bucket, tx_type])
1108 .observe(ack_start.elapsed().as_secs_f64());
1109
1110 (consensus_positions, status_waiter)
1111 }
1112
1113 async fn await_consensus_or_checkpoint(
1116 self: &Arc<Self>,
1117 transaction_keys: Vec<SequencedConsensusTransactionKey>,
1118 epoch_store: &Arc<AuthorityPerEpochStore>,
1119 ) -> ProcessedMethod {
1120 let notifications = FuturesUnordered::new();
1121 for transaction_key in transaction_keys {
1122 let transaction_digests = match transaction_key {
1123 SequencedConsensusTransactionKey::External(
1124 ConsensusTransactionKey::Certificate(digest),
1125 ) => vec![digest],
1126 _ => vec![],
1127 };
1128
1129 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
1130 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
1131 | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
1132 ) = transaction_key
1133 {
1134 Either::Left(
1137 self.checkpoint_store
1138 .notify_read_synced_checkpoint(checkpoint_sequence_number),
1139 )
1140 } else {
1141 Either::Right(future::pending())
1142 };
1143
1144 notifications.push(async move {
1148 tokio::select! {
1149 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
1150 processed.expect("Storage error when waiting for consensus message processed");
1151 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
1152 return ProcessedMethod::Consensus;
1153 },
1154 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1155 processed.expect("Storage error when waiting for transaction executed in checkpoint");
1156 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1157 }
1158 _ = checkpoint_synced_future => {
1159 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1160 }
1161 }
1162 ProcessedMethod::Checkpoint
1163 });
1164 }
1165
1166 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
1167 for method in processed_methods {
1168 if method == ProcessedMethod::Checkpoint {
1169 return ProcessedMethod::Checkpoint;
1170 }
1171 }
1172 ProcessedMethod::Consensus
1173 }
1174}
1175
1176impl CheckConnection for ConnectionMonitorStatus {
1177 fn check_connection(
1178 &self,
1179 ourself: &AuthorityName,
1180 authority: &AuthorityName,
1181 ) -> Option<ConnectionStatus> {
1182 if ourself == authority {
1183 return Some(ConnectionStatus::Connected);
1184 }
1185
1186 let mapping = self.authority_names_to_peer_ids.load_full();
1187 let peer_id = match mapping.get(authority) {
1188 Some(p) => p,
1189 None => {
1190 warn!(
1191 "failed to find peer {:?} in connection monitor listener",
1192 authority
1193 );
1194 return None;
1195 }
1196 };
1197
1198 match self.connection_statuses.try_get(peer_id) {
1199 TryResult::Present(c) => Some(c.value().clone()),
1200 TryResult::Absent => None,
1201 TryResult::Locked => {
1202 Some(ConnectionStatus::Disconnected)
1204 }
1205 }
1206 }
1207 fn update_mapping_for_epoch(
1208 &self,
1209 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1210 ) {
1211 self.authority_names_to_peer_ids
1212 .swap(Arc::new(authority_names_to_peer_ids));
1213 }
1214}
1215
/// Stub [`CheckConnection`] implementation (intended for tests, per its name) that treats
/// every authority as connected.
impl CheckConnection for ConnectionMonitorStatusForTests {
    /// Always returns `Some(ConnectionStatus::Connected)`; both arguments are ignored.
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    /// No-op: the supplied mapping is discarded.
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}
1230
1231pub fn get_position_in_list(
1232 search_authority: AuthorityName,
1233 positions: Vec<AuthorityName>,
1234) -> usize {
1235 positions
1236 .into_iter()
1237 .find_position(|authority| *authority == search_authority)
1238 .expect("Couldn't find ourselves in shuffled committee")
1239 .0
1240}
1241
1242impl ConsensusOverloadChecker for ConsensusAdapter {
1243 fn check_consensus_overload(&self) -> SuiResult {
1244 fp_ensure!(
1245 self.check_limits(),
1246 SuiErrorKind::TooManyTransactionsPendingConsensus.into()
1247 );
1248 Ok(())
1249 }
1250}
1251
/// [`ConsensusOverloadChecker`] that never reports overload.
pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    /// Always returns `Ok(())`.
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}
1259
1260impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1261 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1265 {
1266 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1267 if !reconfig_guard.should_accept_user_certs() {
1268 return;
1270 }
1271 epoch_store.close_user_certs(reconfig_guard);
1272 }
1273 if epoch_store.should_send_end_of_publish() {
1274 if let Err(err) = self.submit(
1275 ConsensusTransaction::new_end_of_publish(self.authority),
1276 None,
1277 epoch_store,
1278 None,
1279 None,
1280 ) {
1281 warn!("Error when sending end of publish message: {:?}", err);
1282 } else {
1283 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1284 }
1285 }
1286 }
1287}
1288
1289struct CancelOnDrop<T>(JoinHandle<T>);
1290
1291impl<T> Deref for CancelOnDrop<T> {
1292 type Target = JoinHandle<T>;
1293
1294 fn deref(&self) -> &Self::Target {
1295 &self.0
1296 }
1297}
1298
1299impl<T> Drop for CancelOnDrop<T> {
1300 fn drop(&mut self) {
1301 self.0.abort();
1302 }
1303}
1304
/// Guard that tracks one in-flight consensus submission: it bumps the adapter's in-flight
/// counter and metrics when acquired, and reverses them and reports latency when dropped.
struct InflightDropGuard<'a> {
    // Adapter whose in-flight counter, metrics and latency observer are updated.
    adapter: &'a ConsensusAdapter,
    // Time at which the guard was acquired; latency is measured from here on drop.
    start: Instant,
    // Submission position; observed on drop and used as a latency label
    // ("not_submitted" when unset).
    position: Option<usize>,
    // Observed into `sequencing_certificate_positions_moved` on drop when set.
    positions_moved: Option<usize>,
    // Observed into `sequencing_certificate_preceding_disconnected` on drop when set.
    preceding_disconnected: Option<usize>,
    // Observed into `sequencing_certificate_amplification_factor` on drop when set.
    amplification_factor: Option<usize>,
    // Metric label identifying the kind of transaction being submitted.
    tx_type: &'static str,
    // How the transaction ended up processed; starts out as `Consensus`.
    processed_method: ProcessedMethod,
}
1316
/// How a submitted transaction was observed to be processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Reported as processed through consensus.
    Consensus,
    /// Resolved through a checkpoint (executed in, or covered by, a synced checkpoint).
    Checkpoint,
}
1322
1323impl<'a> InflightDropGuard<'a> {
1324 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1325 adapter
1326 .num_inflight_transactions
1327 .fetch_add(1, Ordering::SeqCst);
1328 adapter
1329 .metrics
1330 .sequencing_certificate_inflight
1331 .with_label_values(&[tx_type])
1332 .inc();
1333 adapter
1334 .metrics
1335 .sequencing_certificate_attempt
1336 .with_label_values(&[tx_type])
1337 .inc();
1338 Self {
1339 adapter,
1340 start: Instant::now(),
1341 position: None,
1342 positions_moved: None,
1343 preceding_disconnected: None,
1344 amplification_factor: None,
1345 tx_type,
1346 processed_method: ProcessedMethod::Consensus,
1347 }
1348 }
1349}
1350
1351impl Drop for InflightDropGuard<'_> {
1352 fn drop(&mut self) {
1353 self.adapter
1354 .num_inflight_transactions
1355 .fetch_sub(1, Ordering::SeqCst);
1356 self.adapter
1357 .metrics
1358 .sequencing_certificate_inflight
1359 .with_label_values(&[self.tx_type])
1360 .dec();
1361
1362 let position = if let Some(position) = self.position {
1363 self.adapter
1364 .metrics
1365 .sequencing_certificate_authority_position
1366 .observe(position as f64);
1367 position.to_string()
1368 } else {
1369 "not_submitted".to_string()
1370 };
1371
1372 if let Some(positions_moved) = self.positions_moved {
1373 self.adapter
1374 .metrics
1375 .sequencing_certificate_positions_moved
1376 .observe(positions_moved as f64);
1377 };
1378
1379 if let Some(preceding_disconnected) = self.preceding_disconnected {
1380 self.adapter
1381 .metrics
1382 .sequencing_certificate_preceding_disconnected
1383 .observe(preceding_disconnected as f64);
1384 };
1385
1386 if let Some(amplification_factor) = self.amplification_factor {
1387 self.adapter
1388 .metrics
1389 .sequencing_certificate_amplification_factor
1390 .observe(amplification_factor as f64);
1391 };
1392
1393 let latency = self.start.elapsed();
1394 let processed_method = match self.processed_method {
1395 ProcessedMethod::Consensus => "processed_via_consensus",
1396 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1397 };
1398
1399 self.adapter
1400 .metrics
1401 .sequencing_certificate_latency
1402 .with_label_values(&[&position, self.tx_type, processed_method])
1403 .observe(latency.as_secs_f64());
1404
1405 if self.position == Some(0) {
1410 let sampled = matches!(
1413 self.tx_type,
1414 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1415 );
1416 if sampled && self.processed_method == ProcessedMethod::Consensus {
1418 self.adapter.latency_observer.report(latency);
1419 }
1420 }
1421 }
1422}
1423
1424impl SubmitToConsensus for Arc<ConsensusAdapter> {
1425 fn submit_to_consensus(
1426 &self,
1427 transactions: &[ConsensusTransaction],
1428 epoch_store: &Arc<AuthorityPerEpochStore>,
1429 ) -> SuiResult {
1430 self.submit_batch(transactions, None, epoch_store, None, None)
1431 .map(|_| ())
1432 }
1433
1434 fn submit_best_effort(
1435 &self,
1436 transaction: &ConsensusTransaction,
1437 epoch_store: &Arc<AuthorityPerEpochStore>,
1438 timeout: Duration,
1440 ) -> SuiResult {
1441 let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1442 Ok(permit) => permit,
1443 Err(_) => {
1444 return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
1445 }
1446 };
1447
1448 let _in_flight_submission_guard =
1449 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1450
1451 let key = SequencedConsensusTransactionKey::External(transaction.key());
1452 let tx_type = classify(transaction);
1453
1454 let async_stage = {
1455 let transaction = transaction.clone();
1456 let epoch_store = epoch_store.clone();
1457 let this = self.clone();
1458
1459 async move {
1460 let _permit = permit; let result = tokio::time::timeout(
1463 timeout,
1464 this.submit_inner(&[transaction], &epoch_store, &[key], tx_type, false),
1465 )
1466 .await;
1467
1468 if let Err(e) = result {
1469 warn!("Consensus submission timed out: {e:?}");
1470 this.metrics
1471 .sequencing_best_effort_timeout
1472 .with_label_values(&[tx_type])
1473 .inc();
1474 }
1475 }
1476 };
1477
1478 let epoch_store = epoch_store.clone();
1479 spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1480 Ok(())
1481 }
1482}
1483
1484pub fn position_submit_certificate(
1485 committee: &Committee,
1486 ourselves: &AuthorityName,
1487 tx_digest: &TransactionDigest,
1488) -> usize {
1489 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1490 get_position_in_list(*ourselves, validators)
1491}
1492
#[cfg(test)]
mod adapter_tests {
    use super::position_submit_certificate;
    use crate::checkpoints::CheckpointStore;
    use crate::consensus_adapter::{
        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
    };
    use crate::mysticeti_adapter::LazyMysticetiClient;
    use fastcrypto::traits::KeyPair;
    use rand::Rng;
    use rand::{SeedableRng, rngs::StdRng};
    use std::sync::Arc;
    use std::time::Duration;
    use sui_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };

    /// Builds a committee of `size` freshly generated authorities, each with a stake drawn
    /// from `rng`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    /// Checks the submission position and delay computed for a user transaction, with and
    /// without a maximum submit position, and for several amplification factors.
    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // Fixed seed keeps the committee and digest, and therefore the positions, stable.
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // Adapter with a maximum submit position and a fixed submit delay step.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // The test connection monitor reports every authority as connected, so the
        // position must not move. The previous `assert!(!positions_moved > 0)` applied a
        // bitwise NOT to the integer and therefore passed for almost any value.
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Adapter without a maximum submit position or explicit delay step.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 7);

        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);

        // An amplification factor of 7 brings the position down to 1.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 7);
        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));

        // An amplification factor of 8 brings the position down to 0, with no delay.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 8);
        assert_eq!(position, 0);
        assert_eq!(delay_step, Duration::ZERO);
    }

    /// For many random digests, every member gets a valid position and exactly one member
    /// gets position 0.
    #[test]
    fn test_position_submit_certificate() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // At most one member may hold position 0.
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            // At least one member must hold position 0.
            assert!(zero_found);
        }
    }
}