sui_core/consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::future::Future;
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;

use arc_swap::{ArcSwap, ArcSwapOption};
use consensus_core::{BlockStatus, ConnectionStatus};
use dashmap::DashMap;
use dashmap::try_result::TryResult;
use futures::FutureExt;
use futures::future::{self, Either, select};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, pin_mut};
use itertools::Itertools;
use mysten_common::debug_fatal;
use mysten_metrics::{
    GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
};
use parking_lot::RwLockReadGuard;
use prometheus::Histogram;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::IntGaugeVec;
use prometheus::Registry;
use prometheus::{
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use sui_protocol_config::ProtocolConfig;
use sui_simulator::anemo::PeerId;
use sui_types::base_types::AuthorityName;
use sui_types::base_types::TransactionDigest;
use sui_types::committee::Committee;
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::fp_ensure;
use sui_types::messages_consensus::ConsensusPosition;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
use sui_types::transaction::TransactionDataAPI;
use tokio::sync::{Semaphore, SemaphorePermit, oneshot};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::{self};
use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointStore;
use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
use crate::metrics::LatencyObserver;

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_authority_position: Histogram,
    pub sequencing_certificate_positions_moved: Histogram,
    pub sequencing_certificate_preceding_disconnected: Histogram,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_certificate_amplification_factor: Histogram,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_estimated_latency: IntGauge,
    pub sequencing_resubmission_interval_ms: IntGauge,
    pub sequencing_best_effort_timeout: IntCounterVec,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency of acknowledgements from the sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric.",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position but were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            ).unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_certificate_amplification_factor: register_histogram_with_registry!(
                "sequencing_certificate_amplification_factor",
                "The amplification factor used by consensus adapter to submit to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
                "sequencing_best_effort_timeout",
                "The number of times the best effort submission has timed out.",
                &["tx_type"],
                registry,
            ).unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}

/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    fn check_consensus_overload(&self) -> SuiResult;
}

pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}

/// Submits Sui certificates to consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit on the number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this validator will submit to consensus. Even if it
    /// is elected to submit from a higher position than this, it will "reset" to the max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided, this overrides the current backoff logic and is used as the delay step instead.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection Monitor Listener
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// The throughput profiler to be used when making decisions to submit to consensus
    consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Arc<Semaphore>,
    latency_observer: LatencyObserver,
    protocol_config: ProtocolConfig,
}

pub trait CheckConnection: Send + Sync {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

pub struct ConnectionMonitorStatusForTests {}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
        authority: AuthorityName,
        connection_monitor_status: Arc<dyn CheckConnection>,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        max_submit_position: Option<usize>,
        submit_delay_step_override: Option<Duration>,
        metrics: ConsensusAdapterMetrics,
        protocol_config: ProtocolConfig,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        let low_scoring_authorities =
            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
        Self {
            consensus_client,
            checkpoint_store,
            authority,
            max_pending_transactions,
            max_submit_position,
            submit_delay_step_override,
            num_inflight_transactions,
            connection_monitor_status,
            low_scoring_authorities,
            metrics,
            submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
            latency_observer: LatencyObserver::new(),
            consensus_throughput_profiler: ArcSwapOption::empty(),
            protocol_config,
        }
    }

    pub fn swap_low_scoring_authorities(
        &self,
        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    ) {
        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
    }

    pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
        self.consensus_throughput_profiler.store(Some(profiler))
    }

    /// Get the current number of in-flight transactions
    pub fn num_inflight_transactions(&self) -> u64 {
        self.num_inflight_transactions.load(Ordering::Relaxed)
    }

    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Transactions sent to consensus can be dropped on crash before being included in a proposed block.
        // System transactions do not have clients to retry them, so they need to be resubmitted to consensus on restart.
        // Note that get_all_pending_consensus_transactions() can return both system and certified transactions.
        //
        // TODO: get_all_pending_consensus_transactions is called twice, when initializing
        // AuthorityPerEpochStore and here. This should not be a big deal but can be optimized.
        let mut recovered = epoch_store.get_all_pending_consensus_transactions();

        #[allow(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
        if epoch_store.should_send_end_of_publish() {
            if !recovered
                .iter()
                .any(ConsensusTransaction::is_end_of_publish)
            {
                // There are two cases when this is needed:
                // (1) We send the EndOfPublish message after removing pending certificates in
                // submit_and_wait_inner. The node may crash between those two steps, in which case
                // we need to re-introduce the EndOfPublish message on restart.
                // (2) The node crashed inside ConsensusAdapter::close_epoch, after the reconfig lock
                // state was written to the DB and before we persisted the EndOfPublish message.
                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
            }
        }
        debug!(
            "Submitting {:?} recovered pending consensus transactions to consensus",
            recovered.len()
        );
        for transaction in recovered {
            if transaction.is_end_of_publish() {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            }
            self.submit_unchecked(&[transaction], epoch_store, None, None);
        }
    }

    fn await_submit_delay(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transactions: &[ConsensusTransaction],
    ) -> (impl Future<Output = ()>, usize, usize, usize, usize) {
        if transactions.iter().any(|tx| tx.is_mfp_transaction()) {
            // UserTransactions are generally sent to just one validator and should
            // be submitted to consensus without delay.
            return (tokio::time::sleep(Duration::ZERO), 0, 0, 0, 0);
        }

        // Use the minimum digest to compute submit delay.
        let min_digest_and_gas_price = transactions
            .iter()
            .filter_map(|tx| match &tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
                    Some((certificate.digest(), certificate.gas_price()))
                }
                ConsensusTransactionKind::UserTransaction(transaction) => Some((
                    transaction.digest(),
                    transaction.data().transaction_data().gas_price(),
                )),
                _ => None,
            })
            .min();
        let mut amplification_factor = 0;

        let (duration, position, positions_moved, preceding_disconnected) =
            match min_digest_and_gas_price {
                Some((digest, gas_price)) => {
                    let k = epoch_store
                        .protocol_config()
                        .sip_45_consensus_amplification_threshold_as_option()
                        .unwrap_or(u64::MAX);
                    let multiplier =
                        gas_price / std::cmp::max(epoch_store.reference_gas_price(), 1);
                    amplification_factor = if multiplier >= k { multiplier } else { 0 };
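                    // Illustrative example: with threshold k = 5, a transaction paying
                    // 10x the reference gas price gets amplification_factor = 10, while
                    // one paying 4x stays at 0 (no amplification below the threshold).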
                    self.await_submit_delay_user_transaction(
                        epoch_store.committee(),
                        digest,
                        amplification_factor as usize,
                    )
                }
                _ => (Duration::ZERO, 0, 0, 0),
            };
        (
            tokio::time::sleep(duration),
            position,
            positions_moved,
            preceding_disconnected,
            amplification_factor as usize,
        )
    }

    fn await_submit_delay_user_transaction(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
        amplification_factor: usize,
    ) -> (Duration, usize, usize, usize) {
        let (mut position, positions_moved, preceding_disconnected) =
            self.submission_position(committee, tx_digest);
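        // Illustrative example: with amplification_factor = 3, positions 0..=2 all
        // collapse to 0 via (position + 1).saturating_sub(3), so the first three
        // validators in the shuffled order submit without any position-based delay.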
        if amplification_factor > 0 {
            position = (position + 1).saturating_sub(amplification_factor);
        }

        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
        const MIN_LATENCY: Duration = Duration::from_millis(150);
        const MAX_LATENCY: Duration = Duration::from_secs(3);

        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
        self.metrics
            .sequencing_estimated_latency
            .set(latency.as_millis() as i64);

        let latency = std::cmp::max(latency, MIN_LATENCY);
        let latency = std::cmp::min(latency, MAX_LATENCY);
        let latency = latency * 2;
        let latency = self.override_by_throughput_profiler(position, latency);
        let (delay_step, position) =
            self.override_by_max_submit_position_settings(latency, position);

        self.metrics
            .sequencing_resubmission_interval_ms
            .set(delay_step.as_millis() as i64);

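        // The total wait grows linearly with position. Illustrative example, absent
        // profiler or manual overrides: an observed latency of 1s is clamped to
        // [150ms, 3s] and doubled into a 2s delay step, so the validator at
        // position 2 waits ~4s before submitting.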
        (
            delay_step * position as u32,
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    // Depending on the throughput profile, we either allow some transaction duplication or not.
    // When the throughput profile is Low and the validator is at position 1, it submits to consensus with much lower latency.
    // When the throughput profile is High, we go back to the default operation and no one co-submits.
    fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
        const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
        const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
        const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;

        let p = self.consensus_throughput_profiler.load();

        if let Some(profiler) = p.as_ref() {
            let (level, _) = profiler.throughput_level();

            // We only run this for the validator at position 1 so it can co-submit with the
            // validator at position 0, and only when the feature is enabled in the protocol config.
            if self.protocol_config.throughput_aware_consensus_submission() && position == 1 {
                return match level {
                    Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
                    Level::Medium => {
                        Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
                    }
                    Level::High => {
                        let l = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);

                        // back off according to recorded latency if it's significantly higher
                        if latency >= 2 * l { latency } else { l }
                    }
                };
            }
        }
        latency
    }

    /// Overrides the latency and the position if `max_submit_position` or
    /// `submit_delay_step_override` is set. If `max_submit_position` is defined, it is always
    /// applied irrespective of any decision made so far. The same holds for `submit_delay_step_override`.
    fn override_by_max_submit_position_settings(
        &self,
        latency: Duration,
        mut position: usize,
    ) -> (Duration, usize) {
        // Respect any manual override for position and latency from the settings
        if let Some(max_submit_position) = self.max_submit_position {
            position = std::cmp::min(position, max_submit_position);
        }

        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
        (delay_step, position)
    }

    /// Check when this authority should submit the certificate to consensus.
    /// This sorts all authorities based on a pseudo-random distribution derived from the transaction hash.
    ///
    /// The function targets having one consensus transaction submitted per user transaction
    /// when the system operates normally.
    ///
    /// It returns the position of this authority in the submission order, i.e. when it is its turn to submit the transaction to consensus.
    fn submission_position(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (usize, usize, usize) {
        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);

        self.check_submission_wrt_connectivity_and_scores(positions)
    }

    /// This function runs the following algorithm to decide whether or not to submit a transaction
    /// to consensus.
    ///
    /// It takes in a deterministic list that represents the positions of all the authorities.
    /// The authority in the first position is responsible for submitting to consensus, so we
    /// check whether we are that validator, and if so, we submit.
    ///
    /// If we are not in that position, we check our connectivity to the authority in that position.
    /// If we are connected to them, we can assume that they are operational and will submit the transaction.
    /// If we are not connected to them, we assume that they are not operational and will not rely
    /// on that authority to submit the transaction. So we shift them out of the first position and
    /// run this algorithm again on the new set of positions.
    ///
    /// This can possibly result in a transaction being submitted twice if an authority sees a false
    /// negative in connectivity to another, such as in the case of a network partition.
    ///
    /// Similarly, if an authority ahead of us in the positions is a low-performing authority, we
    /// move our position up one and submit the transaction. This helps maintain overall performance.
    /// We only do this for authorities that are not low performers themselves, to prevent extra
    /// amplification when the positions look like [low_scoring_a1, low_scoring_a2, a3].
    fn check_submission_wrt_connectivity_and_scores(
        &self,
        positions: Vec<AuthorityName>,
    ) -> (usize, usize, usize) {
        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
        if low_scoring_authorities.get(&self.authority).is_some() {
            return (positions.len(), 0, 0);
        }
        let initial_position = get_position_in_list(self.authority, positions.clone());
        let mut preceding_disconnected = 0;
        let mut before_our_position = true;

        let filtered_positions: Vec<_> = positions
            .into_iter()
            .filter(|authority| {
                let keep = self.authority == *authority; // don't filter ourself out
                if keep {
                    before_our_position = false;
                }

                // filter out any nodes that appear disconnected
                let connected = self
                    .connection_monitor_status
                    .check_connection(&self.authority, authority)
                    .unwrap_or(ConnectionStatus::Disconnected)
                    == ConnectionStatus::Connected;
                if !connected && before_our_position {
                    preceding_disconnected += 1; // used for metrics
                }

                // Filter out low scoring nodes
                let high_scoring = low_scoring_authorities.get(authority).is_none();

                keep || (connected && high_scoring)
            })
            .collect();

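        // Illustrative example: given positions [a_low, b_down, us, c] where a_low is
        // low-scoring and b_down appears disconnected, the filter keeps [us, c]; our
        // position drops from 2 to 0, positions_moved = 2, preceding_disconnected = 1.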
        let position = get_position_in_list(self.authority, filtered_positions);

        (
            position,
            initial_position - position,
            preceding_disconnected,
        )
    }

    /// This method blocks until the transaction is persisted to the local database.
    /// It then returns a handle to an async task that the caller can join to await
    /// processing of the transaction by consensus.
    ///
    /// This method guarantees that once submit returns (but not the returned async handle),
    /// the transaction is persisted and will eventually be sent to consensus even after a restart.
    ///
    /// When submitting a certificate, the caller **must** provide a ReconfigState lock guard.
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        self.submit_batch(
            &[transaction],
            lock,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        )
    }

    // Submits the provided transactions to consensus in a batched fashion. The `transactions` slice can also be
    // empty in the case of a ping check, where the system simulates a transaction submission to consensus
    // and returns the consensus position.
    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // When batching multiple transactions, ensure they are all of the same kind
            // (either all CertifiedTransaction or all UserTransaction).
            // This makes classifying the transactions easier in later steps.
            let first_kind = &transactions[0].kind;
            let is_user_tx_batch =
                matches!(first_kind, ConsensusTransactionKind::UserTransaction(_));
            let is_cert_batch = matches!(
                first_kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            );

            for transaction in &transactions[1..] {
                if is_user_tx_batch {
                    fp_ensure!(
                        matches!(
                            transaction.kind,
                            ConsensusTransactionKind::UserTransaction(_)
                        ),
                        SuiErrorKind::InvalidTxKindInSoftBundle.into()
                    );
                } else if is_cert_batch {
                    fp_ensure!(
                        matches!(
                            transaction.kind,
                            ConsensusTransactionKind::CertifiedTransaction(_)
                        ),
                        SuiErrorKind::InvalidTxKindInSoftBundle.into()
                    );
                } else {
                    // Other transaction kinds cannot be batched
                    return Err(SuiErrorKind::InvalidTxKindInSoftBundle.into());
                }
            }
        }

        if !transactions.is_empty() {
            epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
        }

        Ok(self.submit_unchecked(
            transactions,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        ))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded
    fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission)
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits
        self.submit_semaphore.available_permits() > 0
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> JoinHandle<()> {
        // Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
        let async_stage = self
            .clone()
            .submit_and_wait(
                transactions.to_vec(),
                epoch_store.clone(),
                tx_consensus_position,
                submitter_client_addr,
            )
            .in_current_span();
        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (Limit is not applied atomically, and only to user transactions.)
        let join_handle = spawn_monitored_task!(async_stage);
        join_handle
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        // When the epoch_terminated signal is received, all pending submit_and_wait_inner futures are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the consensus message to be processed,
        // which may never happen on an epoch boundary.
        //
        // In addition, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because narwhal workers reuse the same ports when narwhal restarts.
        // Without this barrier, we might send transactions from previous epochs to the narwhal
        // instance of the new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(
                transactions,
                &epoch_store,
                tx_consensus_position,
                submitter_client_addr,
            ))
            .await
            .ok(); // result here indicates if epoch ended earlier, we don't care about it
    }

    #[allow(clippy::option_map_unit_fn)]
    #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        if transactions.is_empty() {
            // If transactions are empty, then we attempt to ping consensus and simulate a transaction submission to consensus.
            // We intentionally do not wait for the block status, as we are only interested in the consensus position and return it immediately.
            debug!(
                "Performing a ping check, pinging consensus to get a consensus position in next block"
            );
            let (consensus_positions, _status_waiter) = self
                .submit_inner(&transactions, epoch_store, &[], "ping", false)
                .await;

            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                let _ = tx_consensus_positions.send(consensus_positions);
            } else {
                debug_fatal!("Ping check must have a consensus position channel");
            }
            return;
        }

        // Record submitted transactions early for DoS protection
        if epoch_store.protocol_config().mysticeti_fastpath() {
            for transaction in &transactions {
                if let ConsensusTransactionKind::UserTransaction(tx) = &transaction.kind {
                    let amplification_factor = (tx.data().transaction_data().gas_price()
                        / epoch_store.reference_gas_price().max(1))
                    .max(1);
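                    // Illustrative example: gas_price = 5_000 with a reference gas
                    // price of 1_000 yields amplification_factor = 5; anything at or
                    // below the reference gas price clamps to 1.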
                    epoch_store.submitted_transaction_cache.record_submitted_tx(
                        tx.digest(),
                        amplification_factor as u32,
                        submitter_client_addr,
                    );
                }
            }
        }

        // If the tx_consensus_positions channel is provided, the caller is looking for a
        // consensus position for MFP. Therefore we skip shortcutting submission
        // for transactions that have already been processed.
        let skip_processed_checks = tx_consensus_positions.is_some();

        // Current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. System transactions should have been submitted individually.
        // - If is_soft_bundle, then all transactions are of CertifiedTransaction or UserTransaction kind.
        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if is_soft_bundle {
            "soft_bundle"
        } else {
            classify(&transactions[0])
        };
        tracing::Span::current().record("tx_type", tx_type);
        tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Create the waiter until the node's turn comes to submit to consensus
        let (await_submit, position, positions_moved, preceding_disconnected, amplification_factor) =
            self.await_submit_delay(epoch_store, &transactions[..]);

        let processed_via_consensus_or_checkpoint = if skip_processed_checks {
            // If we need to get consensus position, don't bypass consensus submission
            // for tx digest returned from consensus/checkpoint processing
            future::pending().boxed()
        } else {
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store)
                .boxed()
        };
        pin_mut!(processed_via_consensus_or_checkpoint);

        let processed_waiter = tokio::select! {
            // We need to wait for some delay until we submit transaction to the consensus
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // If epoch ends, don't wait for submit delay
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // If transaction is received by consensus or checkpoint while we wait, we are done.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // Log warnings for administrative transactions that fail to get sequenced
        let _monitor = if matches!(
            transactions[0].kind,
            ConsensusTransactionKind::EndOfPublish(_)
                | ConsensusTransactionKind::CapabilityNotification(_)
                | ConsensusTransactionKind::CapabilityNotificationV2(_)
                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
        ) {
            assert!(
                !is_soft_bundle,
                "System transactions should have been submitted individually"
            );
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // populate the position only when this authority submits the transaction
            // to consensus
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);
            guard.amplification_factor = Some(amplification_factor);

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // We enter this branch when, in the select above, await_submit completed while
            // processed_waiter is still pending. This means it is our turn to submit the transaction to consensus.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a status waiter
                    let (consensus_positions, status_waiter) = self
                        .submit_inner(
                            &transactions,
                            epoch_store,
                            &transaction_keys,
                            tx_type,
                            is_soft_bundle,
                        )
                        .await;

                    if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                        tracing::Span::current().record(
                            "consensus_positions",
                            tracing::field::debug(&consensus_positions),
                        );
                        // We send the first consensus position returned by consensus
                        // to the submitting client even if it is retried internally within
                        // consensus adapter due to an error or GC. They can handle retries
                        // as needed if the consensus position does not return the desired
                        // results (e.g. not sequenced due to garbage collection).
                        let _ = tx_consensus_positions.send(consensus_positions);
                    }

                    match status_waiter.await {
                        Ok(status @ BlockStatus::Sequenced(_)) => {
                            tracing::Span::current()
                                .record("status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // Block has been sequenced. Nothing more to do, we do have guarantees that the transaction will appear in consensus output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(status @ BlockStatus::GarbageCollected(_)) => {
                            tracing::Span::current()
                                .record("status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // The block has been garbage collected and we have no guarantees that the transaction
                            // will appear in consensus output, so we resubmit the transaction to consensus.
                            // If the transaction has already been "processed", then probably someone else submitted
                            // it and managed to get it sequenced. In that case this future will have been cancelled
                            // anyway, so there is no need to check the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = if skip_processed_checks {
                // When getting consensus positions, we only care about submit_inner completing
                submit_inner.await;
                ProcessedMethod::Consensus
            } else {
                match select(processed_waiter, submit_inner.boxed()).await {
                    Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                    Either::Right(((), processed_waiter)) => {
                        debug!("Submitted {transaction_keys:?} to consensus");
                        processed_waiter.await
                    }
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions
            .iter()
            .filter_map(|t| {
                if t.is_mfp_transaction() {
                    // UserTransaction is not inserted into the pending consensus transactions table.
                    // Also UserTransaction shares the same key as CertifiedTransaction, so removing
                    // the key here can have unexpected effects.
                    None
                } else {
                    Some(t.key())
                }
            })
            .collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            )
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::UserTransaction(_)
            );
        if is_user_tx && epoch_store.should_send_end_of_publish() {
            // sending message outside of any locks scope
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
        is_soft_bundle: bool,
    ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;
        let is_dkg = !transactions.is_empty() && transactions[0].is_dkg();

        let (consensus_positions, status_waiter) = loop {
            let span = debug_span!("client_submit");
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .instrument(span)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, or when consensus has full internal buffers
                    // and needs to back pressure, so retry a few times before logging warnings.
                    if retries > 30 || (retries > 3 && (is_soft_bundle || !is_dkg)) {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    if is_dkg {
                        // Shorter delay for DKG messages, which are time-sensitive and happen at
                        // start-of-epoch when submit errors due to active reconfig are likely.
                        time::sleep(Duration::from_millis(100)).await;
                    } else {
                        time::sleep(Duration::from_secs(10)).await;
                    };
                }
                Ok((consensus_positions, status_waiter)) => {
                    break (consensus_positions, status_waiter);
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to avoid label
        // cardinality we do some simple bucketing that still gives a good enough idea of how
        // many retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[&bucket, tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        (consensus_positions, status_waiter)
    }

    /// Waits for transactions to either appear in consensus output or be executed via a checkpoint (state sync).
    /// Returns how the transactions were processed: via consensus, or synced via checkpoint.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            let transaction_digests = match transaction_key {
                SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::Certificate(digest),
                ) => vec![digest],
                _ => vec![],
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get notified when a checkpoint with an equal or higher sequence
                // number has already been synced. This way we don't unnecessarily sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or executed in a checkpoint. We could equally just
            // get notified in aggregate when all transactions are processed, but this approach lets us get notified in a more fine-grained way,
            // as transactions can be marked as processed in different ways. This is mostly a concern for soft-bundle transactions.
1147            notifications.push(async move {
1148                tokio::select! {
1149                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
1150                        processed.expect("Storage error when waiting for consensus message processed");
1151                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
1152                        return ProcessedMethod::Consensus;
1153                    },
1154                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1155                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
1156                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1157                    }
1158                    _ = checkpoint_synced_future => {
1159                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1160                    }
1161                }
1162                ProcessedMethod::Checkpoint
1163            });
1164        }
1165
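        // If any transaction in the batch was processed via a checkpoint rather than observed
        // in consensus output, report the whole batch as processed via checkpoint.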
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}

impl CheckConnection for ConnectionMonitorStatus {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
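        // We always consider ourselves connected; the monitor only tracks remote peers.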
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }

        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = match mapping.get(authority) {
            Some(p) => p,
            None => {
                warn!(
                    "failed to find peer {:?} in connection monitor listener",
                    authority
                );
                return None;
            }
        };

        match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(c) => Some(c.value().clone()),
            TryResult::Absent => None,
            TryResult::Locked => {
                // update is in progress, assume the status is still or becoming disconnected
                Some(ConnectionStatus::Disconnected)
            }
        }
    }
    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        self.authority_names_to_peer_ids
            .swap(Arc::new(authority_names_to_peer_ids));
    }
}

impl CheckConnection for ConnectionMonitorStatusForTests {
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}

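/// Returns the index of `search_authority` in `positions`, panicking if it is absent:
/// a validator must always appear in its own shuffled committee.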
pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .find_position(|authority| *authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
        .0
}

impl ConsensusOverloadChecker for ConsensusAdapter {
    fn check_consensus_overload(&self) -> SuiResult {
        fp_ensure!(
            self.check_limits(),
            SuiErrorKind::TooManyTransactionsPendingConsensus.into()
        );
        Ok(())
    }
}

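/// A `ConsensusOverloadChecker` that never reports overload, e.g. for tests or callers
/// that intentionally bypass consensus backpressure.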
pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It sets the reconfig state to reject new certificates from users.
    /// ConsensusAdapter will send an EndOfPublish message once the pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
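        // Take the reconfig write lock only for the duration of closing user certs,
        // so it is released before we submit to consensus below.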
        {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            epoch_store.close_user_certs(reconfig_guard);
        }
        if epoch_store.should_send_end_of_publish() {
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
    }
}

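/// Join handle wrapper that aborts the underlying task when dropped, so spawned
/// submission tasks cannot outlive their owner.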
struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

/// Tracks the number of inflight consensus requests and related metrics.
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    position: Option<usize>,
    positions_moved: Option<usize>,
    preceding_disconnected: Option<usize>,
    amplification_factor: Option<usize>,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

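/// How a submitted transaction was observed to be processed: sequenced in consensus
/// output, or executed/synced via a checkpoint.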
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            amplification_factor: None,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();

        let position = if let Some(position) = self.position {
            self.adapter
                .metrics
                .sequencing_certificate_authority_position
                .observe(position as f64);
            position.to_string()
        } else {
            "not_submitted".to_string()
        };

        if let Some(positions_moved) = self.positions_moved {
            self.adapter
                .metrics
                .sequencing_certificate_positions_moved
                .observe(positions_moved as f64);
        }

        if let Some(preceding_disconnected) = self.preceding_disconnected {
            self.adapter
                .metrics
                .sequencing_certificate_preceding_disconnected
                .observe(preceding_disconnected as f64);
        }

        if let Some(amplification_factor) = self.amplification_factor {
            self.adapter
                .metrics
                .sequencing_certificate_amplification_factor
                .observe(amplification_factor as f64);
        }

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };

        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[&position, self.tx_type, processed_method])
            .observe(latency.as_secs_f64());

        // Only sample latency after consensus quorum is up. Otherwise, the wait for consensus
        // quorum at the beginning of an epoch can distort the sampled latencies.
        // Technically there are more system transaction types that can be included in samples
        // after the first consensus commit, but this set of types should be enough.
        if self.position == Some(0) {
            // Transaction types below require that quorum existed in the current epoch.
            // TODO: refactor tx_type to an enum.
            let sampled = matches!(
                self.tx_type,
                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
            );
            // If the tx was processed via checkpoint state sync, exclude it from the latency
            // calculations, as that can lead to misleading results.
            if sampled && self.processed_method == ProcessedMethod::Consensus {
                self.adapter.latency_observer.report(latency);
            }
        }
    }
}

impl SubmitToConsensus for Arc<ConsensusAdapter> {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        self.submit_batch(transactions, None, epoch_store, None, None)
            .map(|_| ())
    }

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        // timeout is required, or the spawned task can run forever
        timeout: Duration,
    ) -> SuiResult {
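        // Acquire a submission permit without blocking: if the semaphore is exhausted,
        // surface backpressure to the caller instead of queueing indefinitely.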
        let permit = match self.submit_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => {
                return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
            }
        };

        let _in_flight_submission_guard =
            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

        let key = SequencedConsensusTransactionKey::External(transaction.key());
        let tx_type = classify(transaction);

        let async_stage = {
            let transaction = transaction.clone();
            let epoch_store = epoch_store.clone();
            let this = self.clone();

            async move {
                let _permit = permit; // Hold permit for lifetime of task

                let result = tokio::time::timeout(
                    timeout,
                    this.submit_inner(&[transaction], &epoch_store, &[key], tx_type, false),
                )
                .await;

                if let Err(e) = result {
                    warn!("Consensus submission timed out: {e:?}");
                    this.metrics
                        .sequencing_best_effort_timeout
                        .with_label_values(&[tx_type])
                        .inc();
                }
            }
        };
        let epoch_store = epoch_store.clone();
        spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
        Ok(())
    }
}

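/// Computes this validator's submission position for a certificate: the committee is
/// shuffled by stake, seeded with the transaction digest, and the validator's index in
/// that order is its position.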
pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}

#[cfg(test)]
mod adapter_tests {
    use super::position_submit_certificate;
    use crate::checkpoints::CheckpointStore;
    use crate::consensus_adapter::{
        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
    };
    use crate::mysticeti_adapter::LazyMysticetiClient;
    use fastcrypto::traits::KeyPair;
    use rand::Rng;
    use rand::{SeedableRng, rngs::StdRng};
    use std::sync::Arc;
    use std::time::Duration;
    use sui_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };

    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When max submit position and delay step are defined
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher than the max submit position
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        // All validators are connected in tests, so no positions should have moved.
        assert_eq!(positions_moved, 0);

        // Make sure that the position is capped at the configured max submit position (1)
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without a max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0);

        assert_eq!(position, 7);

        // delay_step * position * 2 = 1 * 7 * 2 = 14
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);

        // With an amplification factor of 7, the position should be moved to 1.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 7);
        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));

        // With an amplification factor > 7, the position should become 0.
        let (delay_step, position, _, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 8);
        assert_eq!(position, 0);
        assert_eq!(delay_step, Duration::ZERO);
    }

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}