sui_core/consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::future::Future;
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;

use arc_swap::{ArcSwap, ArcSwapOption};
use consensus_core::BlockStatus;
use dashmap::DashMap;
use dashmap::try_result::TryResult;
use futures::FutureExt;
use futures::future::{self, Either, select};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, pin_mut};
use itertools::Itertools;
use mysten_common::debug_fatal;
use mysten_metrics::{
    GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
};
use mysten_network::anemo_connection_monitor::ConnectionStatus;
use parking_lot::RwLockReadGuard;
use prometheus::Histogram;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::IntGaugeVec;
use prometheus::Registry;
use prometheus::{
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use sui_protocol_config::ProtocolConfig;
use sui_simulator::anemo::PeerId;
use sui_types::base_types::AuthorityName;
use sui_types::base_types::TransactionDigest;
use sui_types::committee::Committee;
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::fp_ensure;
use sui_types::messages_consensus::ConsensusPosition;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
use sui_types::transaction::TransactionDataAPI;
use tokio::sync::{Semaphore, SemaphorePermit, oneshot};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::{self};
use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointStore;
use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
use crate::metrics::LatencyObserver;

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_authority_position: Histogram,
    pub sequencing_certificate_positions_moved: Histogram,
    pub sequencing_certificate_preceding_disconnected: Histogram,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_certificate_amplification_factor: Histogram,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_estimated_latency: IntGauge,
    pub sequencing_resubmission_interval_ms: IntGauge,
    pub sequencing_best_effort_timeout: IntCounterVec,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitted a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_certificate_amplification_factor: register_histogram_with_registry!(
                "sequencing_certificate_amplification_factor",
                "The amplification factor used by consensus adapter to submit to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
                "sequencing_best_effort_timeout",
                "The number of times the best effort submission has timed out.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}

/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    fn check_consensus_overload(&self) -> SuiResult;
}

pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
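
// Illustrative sketch (not part of the original file): because `ConsensusClient` is annotated
// with `#[mockall::automock]`, tests get a generated `MockConsensusClient` whose `submit` can be
// stubbed, roughly like this (the stubbed return values below are hypothetical):
//
//     let mut client = MockConsensusClient::new();
//     client.expect_submit().returning(|_, _| {
//         let (status_tx, status_rx) = oneshot::channel();
//         // A test could send a BlockStatus on status_tx here.
//         drop(status_tx);
//         Ok((vec![], status_rx))
//     });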

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator.
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit on the number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this authority will submit to consensus. Even if
    /// it is elected to submit from a higher position than this, it will "reset" to the
    /// max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided, overrides the default backoff logic and uses this value as the delay step.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection Monitor Listener.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// The throughput profiler to be used when making decisions to submit to consensus.
    consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
    /// A structure to register metrics.
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus.
    submit_semaphore: Arc<Semaphore>,
    latency_observer: LatencyObserver,
    protocol_config: ProtocolConfig,
}

pub trait CheckConnection: Send + Sync {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

pub struct ConnectionMonitorStatusForTests {}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
        authority: AuthorityName,
        connection_monitor_status: Arc<dyn CheckConnection>,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        max_submit_position: Option<usize>,
        submit_delay_step_override: Option<Duration>,
        metrics: ConsensusAdapterMetrics,
        protocol_config: ProtocolConfig,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        let low_scoring_authorities =
            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
        Self {
            consensus_client,
            checkpoint_store,
            authority,
            max_pending_transactions,
            max_submit_position,
            submit_delay_step_override,
            num_inflight_transactions,
            connection_monitor_status,
            low_scoring_authorities,
            metrics,
            submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
            latency_observer: LatencyObserver::new(),
            consensus_throughput_profiler: ArcSwapOption::empty(),
            protocol_config,
        }
    }

    pub fn swap_low_scoring_authorities(
        &self,
        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    ) {
        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
    }

    pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
        self.consensus_throughput_profiler.store(Some(profiler))
    }

    /// Get the current number of in-flight transactions.
    pub fn num_inflight_transactions(&self) -> u64 {
        self.num_inflight_transactions.load(Ordering::Relaxed)
    }

    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Transactions sent to consensus can be dropped on crash before they are included in a
        // proposed block. System transactions have no clients to retry them, so they need to be
        // resubmitted to consensus on restart.
        // Note that get_all_pending_consensus_transactions() returns both system and certified
        // transactions.
        //
        // TODO: get_all_pending_consensus_transactions is called twice, when initializing
        // AuthorityPerEpochStore and here. Not a big deal, but it can be optimized.
        let mut recovered = epoch_store.get_all_pending_consensus_transactions();

        #[allow(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
        if epoch_store.should_send_end_of_publish() {
            if !recovered
                .iter()
                .any(ConsensusTransaction::is_end_of_publish)
            {
                // There are two cases when this is needed:
                // (1) We send the EndOfPublish message after removing pending certificates in
                // submit_and_wait_inner. The node may crash between those two steps, in which
                // case we might need to re-introduce the EndOfPublish message on restart.
                // (2) The node crashed inside ConsensusAdapter::close_epoch, after the reconfig
                // lock state was written to the DB and before we persisted the EndOfPublish
                // message.
                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
            }
        }
        debug!(
            "Submitting {:?} recovered pending consensus transactions to consensus",
            recovered.len()
        );
        for transaction in recovered {
            if transaction.is_end_of_publish() {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            }
            self.submit_unchecked(&[transaction], epoch_store, None, None);
        }
    }

    fn await_submit_delay(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transactions: &[ConsensusTransaction],
    ) -> (impl Future<Output = ()>, usize, usize, usize, usize) {
        if transactions.iter().any(|tx| tx.is_mfp_transaction()) {
            // UserTransactions are generally sent to just one validator and should
            // be submitted to consensus without delay.
            return (tokio::time::sleep(Duration::ZERO), 0, 0, 0, 0);
        }

        // Use the minimum digest to compute submit delay.
        let min_digest_and_gas_price = transactions
            .iter()
            .filter_map(|tx| match &tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
                    Some((certificate.digest(), certificate.gas_price()))
                }
                ConsensusTransactionKind::UserTransaction(transaction) => Some((
                    transaction.digest(),
                    transaction.data().transaction_data().gas_price(),
                )),
                ConsensusTransactionKind::UserTransactionV2(transaction) => Some((
                    transaction.tx().digest(),
                    transaction.tx().data().transaction_data().gas_price(),
                )),
                _ => None,
            })
            .min();
        let mut amplification_factor = 0;

        let (duration, position, positions_moved, preceding_disconnected) =
            match min_digest_and_gas_price {
                Some((digest, gas_price)) => {
                    let k = epoch_store
                        .protocol_config()
                        .sip_45_consensus_amplification_threshold_as_option()
                        .unwrap_or(u64::MAX);
                    let multiplier =
                        gas_price / std::cmp::max(epoch_store.reference_gas_price(), 1);
                    amplification_factor = if multiplier >= k { multiplier } else { 0 };
                    self.await_submit_delay_user_transaction(
                        epoch_store.committee(),
                        digest,
                        amplification_factor as usize,
                    )
                }
                _ => (Duration::ZERO, 0, 0, 0),
            };
        (
            tokio::time::sleep(duration),
            position,
            positions_moved,
            preceding_disconnected,
            amplification_factor as usize,
        )
    }
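
    // Worked example (illustrative, not from the original source): with a reference gas price of
    // 1_000, a transaction gas price of 5_000, and a SIP-45 amplification threshold k = 3, the
    // multiplier is 5_000 / 1_000 = 5 >= k, so the amplification factor is 5. In
    // await_submit_delay_user_transaction below, this pulls the submission position forward via
    // position = (position + 1).saturating_sub(5), i.e. any authority in the first five
    // positions submits immediately.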

    fn await_submit_delay_user_transaction(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
        amplification_factor: usize,
    ) -> (Duration, usize, usize, usize) {
        let (mut position, positions_moved, preceding_disconnected) =
            self.submission_position(committee, tx_digest);
        if amplification_factor > 0 {
            position = (position + 1).saturating_sub(amplification_factor);
        }

        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
        const MIN_LATENCY: Duration = Duration::from_millis(150);
        const MAX_LATENCY: Duration = Duration::from_secs(3);

        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
        self.metrics
            .sequencing_estimated_latency
            .set(latency.as_millis() as i64);

        let latency = std::cmp::max(latency, MIN_LATENCY);
        let latency = std::cmp::min(latency, MAX_LATENCY);
        let latency = latency * 2;
        let latency = self.override_by_throughput_profiler(position, latency);
        let (delay_step, position) =
            self.override_by_max_submit_position_settings(latency, position);

        self.metrics
            .sequencing_resubmission_interval_ms
            .set(delay_step.as_millis() as i64);

        (
            delay_step * position as u32,
            position,
            positions_moved,
            preceding_disconnected,
        )
    }
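
    // Worked example (illustrative, not from the original source): with an observed consensus
    // latency of 1s, the latency is clamped to [150ms, 3s] and then doubled, giving a delay step
    // of 2s. The authority at position 0 submits immediately, position 1 waits 2s, position 2
    // waits 4s, and so on, so under normal operation roughly one validator submits each
    // transaction to consensus.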

    // Depending on the throughput profile, we either allow some transaction duplication or not.
    // When the throughput profile is Low and the validator is at position = 1, it will submit to
    // consensus with much lower latency. When the throughput profile is High, we go back to the
    // default operation and no one co-submits.
    fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
        const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
        const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
        const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;

        let p = self.consensus_throughput_profiler.load();

        if let Some(profiler) = p.as_ref() {
            let (level, _) = profiler.throughput_level();

            // We only run this for the position = 1 validator, so that it co-submits with the
            // validator at position = 0. We also enable this only when the feature is enabled in
            // the protocol config.
            if self.protocol_config.throughput_aware_consensus_submission() && position == 1 {
                return match level {
                    Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
                    Level::Medium => {
                        Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
                    }
                    Level::High => {
                        let l = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);

                        // Back off according to the recorded latency if it's significantly higher.
                        if latency >= 2 * l { latency } else { l }
                    }
                };
            }
        }
        latency
    }

    /// Overrides the latency and the position if `max_submit_position` or
    /// `submit_delay_step_override` are set. If `max_submit_position` is defined, it is always
    /// applied irrespective of any decision made so far. The same holds for
    /// `submit_delay_step_override`.
    fn override_by_max_submit_position_settings(
        &self,
        latency: Duration,
        mut position: usize,
    ) -> (Duration, usize) {
        // Respect any manual override for position and latency from the settings.
        if let Some(max_submit_position) = self.max_submit_position {
            position = std::cmp::min(position, max_submit_position);
        }

        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
        (delay_step, position)
    }
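
    // Worked example (illustrative, not from the original source): with
    // max_submit_position = Some(2), a computed position of 5 is clamped to 2; with
    // submit_delay_step_override = Some(Duration::from_secs(4)), the delay step is 4s regardless
    // of the observed latency, for a total submit delay of 8s in this case.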

    /// Check when this authority should submit the certificate to consensus.
    /// This sorts all authorities based on a pseudo-random distribution derived from the
    /// transaction digest.
    ///
    /// The function targets having one consensus transaction submitted per user transaction
    /// when the system operates normally.
    ///
    /// The function returns the position of this authority in the submission order, which
    /// determines when it is this authority's turn to submit the transaction to consensus.
    fn submission_position(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (usize, usize, usize) {
        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);

        self.check_submission_wrt_connectivity_and_scores(positions)
    }

    /// This function runs the following algorithm to decide whether or not to submit a transaction
    /// to consensus.
    ///
    /// It takes in a deterministic list that represents the positions of all the authorities.
    /// The authority in the first position is responsible for submitting to consensus, so we
    /// check whether we are that validator, and if so, return position 0.
    ///
    /// If we are not in that position, we check our connectivity to the authority in that position.
    /// If we are connected to them, we can assume that they are operational and will submit the
    /// transaction. If we are not connected to them, we assume that they are not operational and
    /// we will not rely on that authority to submit the transaction. So we shift them out of the
    /// first position and run this algorithm again on the new set of positions.
    ///
    /// This can possibly result in a transaction being submitted twice if an authority sees a false
    /// negative in connectivity to another, such as in the case of a network partition.
    ///
    /// Similarly, if an authority ahead of us in the positions is a low-performing authority, we
    /// move our position up by one and submit the transaction. This helps maintain overall
    /// performance. We only do this for authorities that are not low performers themselves, to
    /// prevent extra amplification when the positions look like [low_scoring_a1, low_scoring_a2, a3].
    fn check_submission_wrt_connectivity_and_scores(
        &self,
        positions: Vec<AuthorityName>,
    ) -> (usize, usize, usize) {
        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
        if low_scoring_authorities.get(&self.authority).is_some() {
            return (positions.len(), 0, 0);
        }
        let initial_position = get_position_in_list(self.authority, positions.clone());
        let mut preceding_disconnected = 0;
        let mut before_our_position = true;

        let filtered_positions: Vec<_> = positions
            .into_iter()
            .filter(|authority| {
                let keep = self.authority == *authority; // don't filter ourselves out
                if keep {
                    before_our_position = false;
                }

                // Filter out any nodes that appear disconnected.
                let connected = self
                    .connection_monitor_status
                    .check_connection(&self.authority, authority)
                    .unwrap_or(ConnectionStatus::Disconnected)
                    == ConnectionStatus::Connected;
                if !connected && before_our_position {
                    preceding_disconnected += 1; // used for metrics
                }

                // Filter out low-scoring nodes.
                let high_scoring = low_scoring_authorities.get(authority).is_none();

                keep || (connected && high_scoring)
            })
            .collect();

        let position = get_position_in_list(self.authority, filtered_positions);

        (
            position,
            initial_position - position,
            preceding_disconnected,
        )
    }
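
    // Worked example (illustrative, not from the original source): suppose the shuffled positions
    // are [a1, a2, us, a3], a2 appears disconnected, and no authority is low-scoring. Our initial
    // position is 2; after a2 is filtered out we end up at position 1, so the returned tuple is
    // (position = 1, positions_moved = 1, preceding_disconnected = 1).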

    /// This method blocks until the transaction is persisted to the local database, then returns
    /// a handle to an async task. The caller can join this handle to await while the transaction
    /// is processed by consensus.
    ///
    /// This method guarantees that once submit (but not the returned async handle) returns, the
    /// transaction is persisted and will eventually be sent to consensus, even after a restart.
    ///
    /// When submitting a certificate, the caller **must** provide a ReconfigState lock guard.
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        self.submit_batch(
            &[transaction],
            lock,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        )
    }
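
    // Illustrative usage sketch (not from the original source; assumes an
    // `adapter: Arc<ConsensusAdapter>`, an `epoch_store`, and a `transaction` already in scope):
    //
    //     let handle = adapter.submit(transaction, None, &epoch_store, None, None)?;
    //     let _ = handle.await; // resolves once the transaction has been processed by consensus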

    // Submits the provided transactions to consensus in a batched fashion. The `transactions`
    // slice can also be empty in the case of a ping check; the system then simulates a
    // transaction submission to consensus and returns the consensus position.
    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // When batching multiple transactions, ensure they are all of the same kind
            // (either all CertifiedTransaction or all UserTransaction).
            // This makes classifying the transactions easier in later steps.
            let first_kind = &transactions[0].kind;
            let is_user_tx_batch = matches!(
                first_kind,
                ConsensusTransactionKind::UserTransaction(_)
                    | ConsensusTransactionKind::UserTransactionV2(_)
            );
            let is_cert_batch = matches!(
                first_kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            );

            for transaction in &transactions[1..] {
                if is_user_tx_batch {
                    fp_ensure!(
                        matches!(
                            transaction.kind,
                            ConsensusTransactionKind::UserTransaction(_)
                                | ConsensusTransactionKind::UserTransactionV2(_)
                        ),
                        SuiErrorKind::InvalidTxKindInSoftBundle.into()
                    );
                } else if is_cert_batch {
                    fp_ensure!(
                        matches!(
                            transaction.kind,
                            ConsensusTransactionKind::CertifiedTransaction(_)
                        ),
                        SuiErrorKind::InvalidTxKindInSoftBundle.into()
                    );
                } else {
                    // Other transaction kinds cannot be batched.
                    return Err(SuiErrorKind::InvalidTxKindInSoftBundle.into());
                }
            }
        }

        if !transactions.is_empty() {
            epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
        }

        Ok(self.submit_unchecked(
            transactions,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        ))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded.
    fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission).
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits.
        self.submit_semaphore.available_permits() > 0
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> JoinHandle<()> {
        // The reconfiguration lock is dropped when pending_consensus_transactions is persisted,
        // before it is handled by consensus.
        let async_stage = self
            .clone()
            .submit_and_wait(
                transactions.to_vec(),
                epoch_store.clone(),
                tx_consensus_position,
                submitter_client_addr,
            )
            .in_current_span();
        // The number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (The limit is not applied atomically, and only to user transactions.)
        spawn_monitored_task!(async_stage)
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        // When the epoch_terminated signal is received, all pending submit_and_wait_inner futures
        // are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the consensus
        // message to be processed, which may never happen on an epoch boundary.
        //
        // In addition to that, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because after the epoch change this validator may exit the committee
        // and become a full node, at which point it is no longer able to submit to consensus.
        //
        // Also, submission to consensus is not gated on the epoch. Although it is ok to submit
        // user transactions to the new epoch, we want to cancel system transaction submissions
        // from the current epoch to the new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(
                transactions,
                &epoch_store,
                tx_consensus_position,
                submitter_client_addr,
            ))
            .await
            .ok(); // the result only indicates whether the epoch ended early; we don't care about it
    }

    #[allow(clippy::option_map_unit_fn)]
    #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        if transactions.is_empty() {
            // If transactions is empty, we ping consensus and simulate a transaction submission.
            // We intentionally do not wait for the block status, as we are only interested in the
            // consensus position, which is returned immediately.
            debug!(
                "Performing a ping check, pinging consensus to get a consensus position in next block"
            );
            let (consensus_positions, _status_waiter) = self
                .submit_inner(&transactions, epoch_store, &[], "ping")
                .await;

            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                let _ = tx_consensus_positions.send(consensus_positions);
            } else {
                debug_fatal!("Ping check must have a consensus position channel");
            }
            return;
        }

        // Record submitted transactions early for DoS protection.
        if epoch_store.protocol_config().mysticeti_fastpath() {
            for transaction in &transactions {
                if let Some(tx) = transaction.kind.as_user_transaction() {
                    let amplification_factor = (tx.data().transaction_data().gas_price()
                        / epoch_store.reference_gas_price().max(1))
                    .max(1);
                    epoch_store.submitted_transaction_cache.record_submitted_tx(
                        tx.digest(),
                        amplification_factor as u32,
                        submitter_client_addr,
                    );
                }
            }
        }

        // If a tx_consensus_positions channel is provided, the caller is looking for a
        // consensus position for MFP. In that case we skip shortcutting the submission
        // even if the transactions have already been processed.
        let skip_processed_checks = tx_consensus_positions.is_some();

        // The current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. System transactions should have been submitted individually.
        // - If is_soft_bundle, then all transactions are of CertifiedTransaction or UserTransaction kind.
        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if is_soft_bundle {
            "soft_bundle"
        } else {
            classify(&transactions[0])
        };
        tracing::Span::current().record("tx_type", tx_type);
        tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Create the waiter until the node's turn comes to submit to consensus.
        let (await_submit, position, positions_moved, preceding_disconnected, amplification_factor) =
            self.await_submit_delay(epoch_store, &transactions[..]);

        let processed_via_consensus_or_checkpoint = if skip_processed_checks {
            // If we need to get a consensus position, don't bypass consensus submission
            // for tx digests returned from consensus/checkpoint processing.
            future::pending().boxed()
        } else {
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store)
                .boxed()
        };
        pin_mut!(processed_via_consensus_or_checkpoint);

        let processed_waiter = tokio::select! {
            // We need to wait for some delay until we submit the transaction to consensus.
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // If the epoch ends, don't wait for the submit delay.
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // If the transaction is received by consensus or a checkpoint while we wait, we are done.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // Log warnings for administrative transactions that fail to get sequenced.
        let _monitor = if matches!(
            transactions[0].kind,
            ConsensusTransactionKind::EndOfPublish(_)
                | ConsensusTransactionKind::CapabilityNotification(_)
                | ConsensusTransactionKind::CapabilityNotificationV2(_)
                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
        ) {
            assert!(
                !is_soft_bundle,
                "System transactions should have been submitted individually"
            );
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // Populate the position only when this authority submits the transaction
            // to consensus.
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);
            guard.amplification_factor = Some(amplification_factor);

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // We enter this branch when, in the select above, await_submit completed while
            // processed_waiter was still pending. This means it is time for us to submit the
            // transaction to consensus.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a status waiter.
                    let (consensus_positions, status_waiter) = self
                        .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
                        .await;

                    if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                        tracing::Span::current().record(
                            "consensus_positions",
                            tracing::field::debug(&consensus_positions),
                        );
                        // We send the first consensus position returned by consensus
                        // to the submitting client, even if the submission is retried internally
                        // within the consensus adapter due to an error or GC. Clients can handle
                        // retries as needed if the consensus position does not return the desired
                        // results (e.g. not sequenced due to garbage collection).
                        let _ = tx_consensus_positions.send(consensus_positions);
                    }

                    match status_waiter.await {
                        Ok(status @ BlockStatus::Sequenced(_)) => {
                            tracing::Span::current()
                                .record("status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // The block has been sequenced, so nothing more to do: we have
                            // guarantees that the transaction will appear in the consensus output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(status @ BlockStatus::GarbageCollected(_)) => {
                            tracing::Span::current()
                                .record("status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // The block has been garbage collected and we have no guarantees that
                            // the transaction will appear in the consensus output, so we resubmit
                            // it to consensus. If the transaction has already been "processed",
                            // then probably someone else submitted it and it got sequenced; this
                            // future will have been cancelled anyway, so there is no need to check
                            // the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = if skip_processed_checks {
                // When getting consensus positions, we only care about submit_inner completing.
                submit_inner.await;
                ProcessedMethod::Consensus
            } else {
                match select(processed_waiter, submit_inner.boxed()).await {
                    Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                    Either::Right(((), processed_waiter)) => {
                        debug!("Submitted {transaction_keys:?} to consensus");
                        processed_waiter.await
                    }
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions
            .iter()
            .filter_map(|t| {
                if t.is_mfp_transaction() {
                    // UserTransaction is not inserted into the pending consensus transactions table.
                    // Also, UserTransaction shares the same key as CertifiedTransaction, so removing
                    // the key here could have unexpected effects.
                    None
                } else {
                    Some(t.key())
                }
            })
            .collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
                    | ConsensusTransactionKind::UserTransaction(_)
                    | ConsensusTransactionKind::UserTransactionV2(_)
            );
        if is_user_tx && epoch_store.should_send_end_of_publish() {
            // Send the message outside of any lock scope.
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
    ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;
        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
            Duration::from_millis(100),
            Duration::from_secs(10),
        );

        let (consensus_positions, status_waiter) = loop {
            let span = debug_span!("client_submit");
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .instrument(span)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, so keep retrying until we succeed.
                    if cfg!(msim) || retries > 3 {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    time::sleep(backoff.next().unwrap()).await;
                }
                Ok((consensus_positions, status_waiter)) => {
                    break (consensus_positions, status_waiter);
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to avoid label
        // cardinality we do some simple bucketing that still gives a good idea of how many
        // retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };
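
        // Illustrative examples (not from the original source): retries = 7 reports the label
        // "7"; retries = 15 reports "between_10_and_20"; retries = 250 reports "over_100".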

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[&bucket, tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        (consensus_positions, status_waiter)
    }
1110
1111    /// Waits for transactions to appear either to consensus output or been executed via a checkpoint (state sync).
1112    /// Returns the processed method, whether the transactions have been processed via consensus, or have been synced via checkpoint.
1113    async fn await_consensus_or_checkpoint(
1114        self: &Arc<Self>,
1115        transaction_keys: Vec<SequencedConsensusTransactionKey>,
1116        epoch_store: &Arc<AuthorityPerEpochStore>,
1117    ) -> ProcessedMethod {
1118        let notifications = FuturesUnordered::new();
1119        for transaction_key in transaction_keys {
1120            let transaction_digests = match transaction_key {
1121                SequencedConsensusTransactionKey::External(
1122                    ConsensusTransactionKey::Certificate(digest),
1123                ) => vec![digest],
1124                _ => vec![],
1125            };
1126
1127            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
1128                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
1129                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
1130            ) = transaction_key
1131            {
1132                // If the transaction is a checkpoint signature, we can also wait to get notified when a checkpoint with equal or higher sequence
1133                // number has been already synced. This way we don't try to unnecessarily sequence the signature for an already verified checkpoint.
1134                Either::Left(
1135                    self.checkpoint_store
1136                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
1137                )
1138            } else {
1139                Either::Right(future::pending())
1140            };
1141
1142            // We wait for each transaction individually to be processed by consensus or executed in a checkpoint. We could equally just
1143            // get notified in aggregate when all transactions are processed, but with this approach can get notified in a more fine-grained way
1144            // as transactions can be marked as processed in different ways. This is mostly a concern for the soft-bundle transactions.
            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

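        // If any transaction in the batch was processed via checkpoint rather than
        // consensus, report the whole batch as processed via checkpoint. This mostly
        // matters for soft bundles, whose keys can complete in different ways.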
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}

impl CheckConnection for ConnectionMonitorStatus {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }

        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = match mapping.get(authority) {
            Some(p) => p,
            None => {
                warn!(
                    "failed to find peer {:?} in connection monitor listener",
                    authority
                );
                return None;
            }
        };

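        // try_get returns immediately instead of blocking when the dashmap shard is
        // held by a writer; the Locked arm below handles that case conservatively.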
        match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(c) => Some(c.value().clone()),
            TryResult::Absent => None,
            TryResult::Locked => {
                // An update is in progress; assume the status is still, or is becoming, disconnected.
                Some(ConnectionStatus::Disconnected)
            }
        }
    }
    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        self.authority_names_to_peer_ids
            .swap(Arc::new(authority_names_to_peer_ids));
    }
}

impl CheckConnection for ConnectionMonitorStatusForTests {
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}

pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .find_position(|authority| *authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
        .0
}
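
// For illustration (hypothetical values): if the shuffled list is [a, b, c] and we
// are b, this returns 1. The expect() is safe because every validator, including
// ourselves, appears exactly once in the shuffled committee.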

impl ConsensusOverloadChecker for ConsensusAdapter {
    fn check_consensus_overload(&self) -> SuiResult {
        fp_ensure!(
            self.check_limits(),
            SuiErrorKind::TooManyTransactionsPendingConsensus.into()
        );
        Ok(())
    }
}

pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It sets the reconfig state to reject new certificates from users.
    /// ConsensusAdapter will send an EndOfPublish message once the pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        {
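            // Scope the reconfig write guard: it is consumed by close_user_certs()
            // below, so the lock is released before we submit EndOfPublish.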
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            epoch_store.close_user_certs(reconfig_guard);
        }
        if epoch_store.should_send_end_of_publish() {
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
    }
}

struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}
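
// A minimal usage sketch (illustrative only; not called in this module as-is):
//
//     let _task = CancelOnDrop(tokio::spawn(async { /* work */ }));
//     // When `_task` goes out of scope, Drop calls JoinHandle::abort().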

/// Tracks the number of inflight consensus requests and related metrics.
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    position: Option<usize>,
    positions_moved: Option<usize>,
    preceding_disconnected: Option<usize>,
    amplification_factor: Option<usize>,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

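/// How a submitted transaction was observed to complete: sequenced through consensus
/// output, or covered by an executed or synced checkpoint.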
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            amplification_factor: None,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();

        let position = if let Some(position) = self.position {
            self.adapter
                .metrics
                .sequencing_certificate_authority_position
                .observe(position as f64);
            position.to_string()
        } else {
            "not_submitted".to_string()
        };

        if let Some(positions_moved) = self.positions_moved {
            self.adapter
                .metrics
                .sequencing_certificate_positions_moved
                .observe(positions_moved as f64);
        }

        if let Some(preceding_disconnected) = self.preceding_disconnected {
            self.adapter
                .metrics
                .sequencing_certificate_preceding_disconnected
                .observe(preceding_disconnected as f64);
        }

        if let Some(amplification_factor) = self.amplification_factor {
            self.adapter
                .metrics
                .sequencing_certificate_amplification_factor
                .observe(amplification_factor as f64);
        }

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };

        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[&position, self.tx_type, processed_method])
            .observe(latency.as_secs_f64());

        // Only sample latency after consensus quorum is up. Otherwise, the wait for consensus
        // quorum at the beginning of an epoch can distort the sampled latencies.
        // Technically there are more system transaction types that can be included in samples
        // after the first consensus commit, but this set of types should be enough.
        if self.position == Some(0) {
            // Transaction types below require that a quorum has existed in the current epoch.
            // TODO: refactor tx_type to enum.
            let sampled = matches!(
                self.tx_type,
                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
            );
            // If the tx was processed via checkpoint state sync, exclude it from the latency
            // calculations, as including it can lead to misleading results.
            if sampled && self.processed_method == ProcessedMethod::Consensus {
                self.adapter.latency_observer.report(latency);
            }
        }
    }
}

impl SubmitToConsensus for Arc<ConsensusAdapter> {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        self.submit_batch(transactions, None, epoch_store, None, None)
            .map(|_| ())
    }

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        // timeout is required, or the spawned task can run forever
        timeout: Duration,
    ) -> SuiResult {
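        // The submit semaphore provides backpressure: if too many submissions are
        // already in flight, fail fast rather than queueing without bound.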
        let permit = match self.submit_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => {
                return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
            }
        };

        let _in_flight_submission_guard =
            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

        let key = SequencedConsensusTransactionKey::External(transaction.key());
        let tx_type = classify(transaction);

        let async_stage = {
            let transaction = transaction.clone();
            let epoch_store = epoch_store.clone();
            let this = self.clone();

            async move {
                let _permit = permit; // Hold permit for lifetime of task

                let result = tokio::time::timeout(
                    timeout,
                    this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
                )
                .await;

                if let Err(e) = result {
                    warn!("Consensus submission timed out: {e:?}");
                    this.metrics
                        .sequencing_best_effort_timeout
                        .with_label_values(&[tx_type])
                        .inc();
                }
            }
        };

        let epoch_store = epoch_store.clone();
        spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
        Ok(())
    }
}

pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}
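
// Every validator computes the same digest-seeded, stake-weighted shuffle, so each
// one derives a unique position in [0, committee size); position 0 submits first.
// The uniqueness property is exercised by test_position_submit_certificate below.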

#[cfg(test)]
mod adapter_tests {
    use super::position_submit_certificate;
    use crate::checkpoints::CheckpointStore;
    use crate::consensus_adapter::{
        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
    };
    use crate::mysticeti_adapter::LazyMysticetiClient;
    use fastcrypto::traits::KeyPair;
    use rand::Rng;
    use rand::{SeedableRng, rngs::StdRng};
    use std::sync::Arc;
    use std::time::Duration;
    use sui_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };

    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher than the configured max submit
        // position, and that no positions were moved (all peers are connected in tests).
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        // With amplification factor 0, the position should be clamped to the configured
        // max submit position (1).
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without a max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 7);

        // delay_step * position * 2 = 1 * 7 * 2 = 14
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);

        // With an amplification factor of 7, the position should be moved to 1.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 7);
        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));

        // With an amplification factor > 7, the position should become 0.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 8);
        assert_eq!(position, 0);
        assert_eq!(delay_step, Duration::ZERO);
    }

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
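
    // A minimal sketch exercising get_position_in_list directly (this test was added
    // for illustration; its name is hypothetical): every member of a list is found at
    // its own index, since each authority appears exactly once.
    #[test]
    fn test_get_position_in_list_sketch() {
        let mut rng = StdRng::from_seed([1; 32]);
        let committee = test_committee(&mut rng, 4);
        let members: Vec<_> = committee.members().map(|(name, _)| *name).collect();
        for (idx, name) in members.iter().enumerate() {
            assert_eq!(super::get_position_in_list(*name, members.clone()), idx);
        }
    }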
}