sui_core/
consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;

use consensus_core::BlockStatus;
use futures::FutureExt;
use futures::StreamExt;
use futures::future::{self, Either, select};
use futures::stream::FuturesUnordered;
use mysten_common::debug_fatal;
use mysten_metrics::{
    GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
};
use parking_lot::RwLockReadGuard;
use prometheus::Histogram;
use prometheus::HistogramVec;
use prometheus::IntCounter;
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::IntGaugeVec;
use prometheus::Registry;
use prometheus::{
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_counter_with_registry,
    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
};
use sui_types::base_types::AuthorityName;
use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
use sui_types::fp_ensure;
use sui_types::messages_consensus::ConsensusPosition;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
use sui_types::transaction::TransactionDataAPI;
use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::{self};
use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointStore;
use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_best_effort_timeout: IntCounterVec,
    pub consensus_latency: Histogram,
    pub num_rejected_cert_in_epoch_boundary: IntCounter,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
                .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
                .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
                .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
                .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
                .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["submitted", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            ).unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
                .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
                .unwrap(),
            sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
                "sequencing_best_effort_timeout",
                "The number of times the best effort submission has timed out.",
                &["tx_type"],
                registry,
            ).unwrap(),
            // These two metrics originally lived in ValidatorServiceMetrics (authority_server.rs)
            // and keep their legacy names for dashboard compatibility.
            consensus_latency: register_histogram_with_registry!(
                "validator_service_consensus_latency",
                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            ).unwrap(),
            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
                "validator_service_num_rejected_cert_in_epoch_boundary",
                "Number of transaction certificates rejected during the epoch transition.",
                registry,
            ).unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}
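
// A minimal, test-only sketch of how the registration macros above behave:
// `new_test` builds each `ConsensusAdapterMetrics` against its own fresh
// `Registry`, so repeated calls never collide. Registering the same metric
// name twice in a single `Registry` would make the `unwrap` calls above panic.
#[cfg(test)]
mod metrics_registry_example {
    use super::ConsensusAdapterMetrics;

    #[test]
    fn new_test_uses_a_fresh_registry() {
        // Each call registers against its own default Registry, so no name
        // collisions occur between the two instances.
        let _a = ConsensusAdapterMetrics::new_test();
        let _b = ConsensusAdapterMetrics::new_test();
    }
}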

/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    fn check_consensus_overload(&self) -> SuiResult;
}

pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
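
// A minimal, test-only sketch of the acknowledgement pattern in
// `ConsensusClient::submit`: the call returns immediately with consensus
// positions plus a `BlockStatusReceiver`, and the definitive block status
// arrives on that oneshot channel later. The `Status` enum here is a
// hypothetical stand-in for `consensus_core::BlockStatus`, used only to keep
// the example self-contained.
#[cfg(test)]
mod block_status_receiver_example {
    use tokio::sync::oneshot;

    #[allow(dead_code)]
    #[derive(Debug, PartialEq, Eq)]
    enum Status {
        Sequenced,
        GarbageCollected,
    }

    #[tokio::test]
    async fn ack_first_status_later() {
        let (status_tx, status_rx) = oneshot::channel();
        // The "client" hands back `status_rx` right away and resolves the
        // status asynchronously, as consensus does once the block commits.
        tokio::spawn(async move {
            let _ = status_tx.send(Status::Sequenced);
        });
        assert_eq!(status_rx.await.unwrap(), Status::Sequenced);
    }
}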

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Arc<Semaphore>,
    /// Notified when an inflight slot is freed (`InflightDropGuard` dropped).
    /// Used by the admission queue drainer to wake up and submit more
    /// transactions.
    inflight_slot_freed_notify: Arc<Notify>,
}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
        authority: AuthorityName,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        metrics: ConsensusAdapterMetrics,
        inflight_slot_freed_notify: Arc<Notify>,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        Self {
            consensus_client,
            checkpoint_store,
            authority,
            max_pending_transactions,
            num_inflight_transactions,
            metrics,
            submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
            inflight_slot_freed_notify,
        }
    }

    /// Get the current number of in-flight transactions
    pub fn num_inflight_transactions(&self) -> u64 {
        self.num_inflight_transactions.load(Ordering::Relaxed)
    }

    /// Get the maximum number of pending transactions (consensus capacity limit).
    pub fn max_pending_transactions(&self) -> usize {
        self.max_pending_transactions
    }

    /// Submits transactions to consensus within the reconfiguration lock and
    /// returns their consensus positions.
    pub async fn submit_and_get_positions(
        self: &Arc<Self>,
        consensus_transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        submitter_client_addr: Option<IpAddr>,
    ) -> Result<Vec<ConsensusPosition>, SuiError> {
        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();

        {
            // code block within reconfiguration lock
            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
            if !reconfiguration_lock.should_accept_user_certs() {
                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
            }

            // Submit to consensus and wait for the position. We do not check
            // whether the tx has already been processed by consensus, as this
            // method is called specifically to get back a consensus position.
            let _metrics_guard = self.metrics.consensus_latency.start_timer();

            self.submit_batch(
                &consensus_transactions,
                Some(&reconfiguration_lock),
                epoch_store,
                Some(tx_consensus_positions),
                submitter_client_addr,
            )?;
        }

        rx_consensus_positions.await.map_err(|e| {
            SuiErrorKind::FailedToSubmitToConsensus(format!(
                "Failed to get consensus position: {e}"
            ))
            .into()
        })
    }

    pub fn recover_end_of_publish(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // This handles the case where the node crashed after setting reconfig lock state
        // but before the EndOfPublish message was sent to consensus.
        if epoch_store.should_send_end_of_publish() {
            let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
            info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            self.submit_unchecked(&[transaction], epoch_store, None, None);
        }
    }

    /// This method blocks until the transaction is persisted in the local database.
    /// It then returns a handle to an async task; the caller can join this handle
    /// to await processing of the transaction by consensus.
    ///
    /// This method guarantees that once `submit` (but not the returned async handle)
    /// returns, the transaction is persisted and will eventually be sent to
    /// consensus, even after a restart.
    ///
    /// When submitting a certificate, the caller **must** provide a `ReconfigState`
    /// lock guard.
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        self.submit_batch(
            &[transaction],
            lock,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        )
    }

    // Submits the provided transactions to consensus in a batched fashion. The `transactions` slice can also be empty, in the case of a ping check.
    // In that case the system will simulate a transaction submission to consensus and return the consensus position.
    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        _lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // Soft bundles must contain only UserTransactionV2 transactions.
            for transaction in transactions {
                fp_ensure!(
                    transaction.is_user_transaction(),
                    SuiErrorKind::InvalidTxKindInSoftBundle.into()
                );
            }
        }

        Ok(self.submit_unchecked(
            transactions,
            epoch_store,
            tx_consensus_position,
            submitter_client_addr,
        ))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded
    fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission)
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits
        self.submit_semaphore.available_permits() > 0
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) -> JoinHandle<()> {
        // Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
        let async_stage = self
            .clone()
            .submit_and_wait(
                transactions.to_vec(),
                epoch_store.clone(),
                tx_consensus_position,
                submitter_client_addr,
            )
            .in_current_span();
        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (Limit is not applied atomically, and only to user transactions.)
        spawn_monitored_task!(async_stage)
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        // When the epoch_terminated signal is received, all pending submit_and_wait_inner futures are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the consensus message to be processed,
        // which may never happen on an epoch boundary.
        //
        // In addition to that, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because after the epoch change, this validator may exit the committee and become a full node,
        // so it is no longer able to submit to consensus.
        //
        // Also, submission to consensus is not gated on epoch. Although it is ok to submit user transactions
        // to the new epoch, we want to cancel system transaction submissions from the current epoch to the new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(
                transactions,
                &epoch_store,
                tx_consensus_position,
                submitter_client_addr,
            ))
            .await
            .ok(); // the result here only indicates whether the epoch ended earlier; we don't care about it
    }

    #[allow(clippy::option_map_unit_fn)]
    #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        if transactions.is_empty() {
            // If transactions are empty, we attempt to ping consensus by simulating a transaction submission.
            // We intentionally do not wait for the block status, as we are only interested in the consensus position, which is returned immediately.
            debug!(
                "Performing a ping check, pinging consensus to get a consensus position in next block"
            );
            let (consensus_positions, _status_waiter) = self
                .submit_inner(&transactions, epoch_store, &[], "ping")
                .await;

            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                let _ = tx_consensus_positions.send(consensus_positions);
            } else {
                debug_fatal!("Ping check must have a consensus position channel");
            }
            return;
        }

        // Record submitted transactions early for DoS protection
        for transaction in &transactions {
            if let Some(tx) = transaction.kind.as_user_transaction() {
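                // Amplification factor: how many times the transaction
                // overpays the reference gas price (integer division), clamped
                // to at least 1; the `.max(1)` on the divisor also guards
                // against a zero reference gas price.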
                let amplification_factor = (tx.data().transaction_data().gas_price()
                    / epoch_store.reference_gas_price().max(1))
                .max(1);
                epoch_store.submitted_transaction_cache.record_submitted_tx(
                    tx.digest(),
                    amplification_factor as u32,
                    submitter_client_addr,
                );
            }
        }

        // If the tx_consensus_positions channel is provided, the caller wants a
        // consensus position for mfp. Therefore we skip shortcutting submission
        // when the transactions have already been processed.
        let skip_processed_checks = tx_consensus_positions.is_some();

        // Current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. System transactions should have been submitted individually.
        // - If is_soft_bundle, then all transactions are of CertifiedTransaction or UserTransaction kind.
        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if is_soft_bundle {
            "soft_bundle"
        } else {
            classify(&transactions[0])
        };
        tracing::Span::current().record("tx_type", tx_type);
        tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Skip submission if the tx is already processed via consensus output
        // or checkpoint state sync. `skip_processed_checks` is set by callers
        // that need a consensus position (mfp/ping), so we must submit even
        // if already processed.
        let already_processed = if skip_processed_checks {
            None
        } else {
            self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store)
        };
        if let Some(method) = &already_processed {
            guard.processed_method = *method;
        }

        // Log warnings for administrative transactions that fail to get sequenced
        let _monitor = if matches!(
            transactions[0].kind,
            ConsensusTransactionKind::EndOfPublish(_)
                | ConsensusTransactionKind::CapabilityNotification(_)
                | ConsensusTransactionKind::CapabilityNotificationV2(_)
                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
        ) {
            assert!(
                !is_soft_bundle,
                "System transactions should have been submitted individually"
            );
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if already_processed.is_none() {
            debug!("Submitting {:?} to consensus", transaction_keys);
            guard.submitted = true;

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // Submit the transaction to consensus, racing against the processed waiter in
            // case another validator sequences the transaction first.
            let submit_fut = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a status waiter
                    let (consensus_positions, status_waiter) = self
                        .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
                        .await;

                    if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
                        tracing::Span::current().record(
                            "consensus_positions",
                            tracing::field::debug(&consensus_positions),
                        );
                        // We send the first consensus positions returned by
                        // consensus to the submitting client, even if the
                        // submission is retried internally within the consensus
                        // adapter due to an error or GC. The client can retry as
                        // needed if the consensus position does not produce the
                        // desired result (e.g. not sequenced due to garbage
                        // collection).
                        let _ = tx_consensus_positions.send(consensus_positions);
                    }

                    match status_waiter.await {
                        Ok(status @ BlockStatus::Sequenced(_)) => {
                            tracing::Span::current()
                                .record("submit_status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // Block has been sequenced. Nothing more to do; we have a guarantee that the transaction will appear in the consensus output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(status @ BlockStatus::GarbageCollected(_)) => {
                            tracing::Span::current()
                                .record("submit_status", tracing::field::debug(&status));
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // Block has been garbage collected, so we have no guarantee that the transaction will appear in consensus output, and we
                            // resubmit the transaction to consensus. If the transaction has already been "processed", then probably someone else submitted
                            // it and it got sequenced; in that case this future will have been cancelled anyway, so there is no need to check the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = if skip_processed_checks {
                // When getting consensus positions, we only care about submit_fut completing.
                submit_fut.await;
                ProcessedMethod::Consensus
            } else {
                // Race `processed_notify` against the submit loop. If the tx is
                // processed via another path (consensus output from another
                // validator's submission, or checkpoint state sync) while we're
                // inside the submit loop, the submission future is dropped and
                // the retry loop is cancelled cleanly.
                let processed_waiter = self
                    .processed_notify(transaction_keys.clone(), epoch_store)
                    .boxed();
                match select(processed_waiter, submit_fut.boxed()).await {
                    Either::Left((observed, _submit_fut)) => observed,
                    Either::Right(((), processed_waiter)) => {
                        debug!("Submitted {transaction_keys:?} to consensus");
                        processed_waiter.await
                    }
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        // After a user transaction or soft bundle submission,
        // send EndOfPublish if the epoch is closing.
        // EndOfPublish can also be sent during consensus commit handling, checkpoint execution and recovery.
        if transactions[0].is_user_transaction()
            && epoch_store.should_send_end_of_publish()
            && !epoch_store.protocol_config().timestamp_based_epoch_close()
        {
            // sending message outside of any locks scope
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
    ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;
        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
            Duration::from_millis(100),
            Duration::from_secs(10),
        );

        let (consensus_positions, status_waiter) = loop {
            let span = debug_span!("client_submit");
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .instrument(span)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, so keep retrying until it succeeds.
                    if cfg!(msim) || retries > 3 {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    time::sleep(backoff.next().unwrap()).await;
                }
                Ok((consensus_positions, status_waiter)) => {
                    break (consensus_positions, status_waiter);
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to
        // avoid label cardinality we do some simple bucketing, which still gives a
        // good enough idea of how many retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[bucket.as_str(), tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        (consensus_positions, status_waiter)
    }

    /// Sync check for whether `transaction_keys` are already processed via
    /// consensus output or checkpoint state sync. Returns `Some(method)` if
    /// every key is already processed (Checkpoint dominates when any key was
    /// processed via checkpoint or synced-checkpoint), else `None`.
    ///
    /// Also increments `sequencing_certificate_processed` with the matching
    /// label for each key found processed, mirroring what `processed_notify`
    /// emits for its async wake-ups.
    fn check_processed_via_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> Option<ProcessedMethod> {
        let mut seen_checkpoint = false;
        for transaction_key in transaction_keys {
            // Check consensus-processed first; if already visible in consensus
            // output we don't need to submit again.
            if epoch_store
                .is_consensus_message_processed(transaction_key)
                .expect("Storage error when checking consensus message processed")
            {
                self.metrics
                    .sequencing_certificate_processed
                    .with_label_values(&["consensus"])
                    .inc();
                continue;
            }

            // For a cert-shaped key, check whether state sync executed the tx
            // via a checkpoint.
            if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
                digest,
            )) = transaction_key
                && epoch_store
                    .is_transaction_executed_in_checkpoint(digest)
                    .expect("Storage error when checking transaction executed in checkpoint")
            {
                self.metrics
                    .sequencing_certificate_processed
                    .with_label_values(&["checkpoint"])
                    .inc();
                seen_checkpoint = true;
                continue;
            }

            // For a checkpoint-signature key, check whether a checkpoint at
            // or above the target sequence number has already been synced,
            // in which case the signature is redundant.
            if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, seq)
                | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
            ) = transaction_key
                && let Some(synced_seq) = self
                    .checkpoint_store
                    .get_highest_synced_checkpoint_seq_number()
                    .expect("Storage error when reading highest synced checkpoint")
                && synced_seq >= *seq
            {
                self.metrics
                    .sequencing_certificate_processed
                    .with_label_values(&["synced_checkpoint"])
                    .inc();
                seen_checkpoint = true;
                continue;
            }

            // Not processed via any path, so the caller must submit.
            return None;
        }

        if seen_checkpoint {
            Some(ProcessedMethod::Checkpoint)
        } else {
            Some(ProcessedMethod::Consensus)
        }
    }

    /// Async wait for any of `transaction_keys` to become processed via
    /// consensus output or a checkpoint (either state-synced or executed
    /// locally). Used in the in-flight race against submission: cancelling
    /// the submit future when we learn the tx is processed by another path.
    /// Returns `Checkpoint` if any key resolves via a checkpoint path, else
    /// `Consensus`.
    async fn processed_notify(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            let transaction_digests = match transaction_key {
                SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::Certificate(digest),
                ) => vec![digest],
                _ => vec![],
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to be notified when a checkpoint with an equal or higher sequence
                // number has already been synced. This way we don't unnecessarily try to sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or executed in a checkpoint. We could equally just
            // get notified in aggregate when all transactions are processed, but with this approach we get notified in a more fine-grained way,
            // as transactions can be marked as processed in different ways. This is mostly a concern for soft-bundle transactions.
            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}
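
// A minimal, test-only sketch of two pieces of arithmetic used above, mirrored
// into standalone helpers so they can be exercised without a full
// `ConsensusAdapter`: the DoS amplification factor from `submit_and_wait_inner`
// and the retry bucketing from `submit_inner`.
#[cfg(test)]
mod adapter_arithmetic_examples {
    // Mirrors `(gas_price / reference_gas_price.max(1)).max(1)`.
    fn amplification_factor(gas_price: u64, reference_gas_price: u64) -> u64 {
        (gas_price / reference_gas_price.max(1)).max(1)
    }

    // Mirrors the retry-count bucketing used for the
    // `sequencing_acknowledge_latency` label.
    fn retry_bucket(retries: u32) -> String {
        match retries {
            0..=10 => retries.to_string(),
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        }
    }

    #[test]
    fn amplification_factor_examples() {
        // Paying exactly the reference gas price counts as one submission.
        assert_eq!(amplification_factor(1_000, 1_000), 1);
        // Paying 5x the reference gas price counts as five submissions.
        assert_eq!(amplification_factor(5_000, 1_000), 5);
        // Underpaying is still clamped to at least one.
        assert_eq!(amplification_factor(500, 1_000), 1);
        // A zero reference gas price is clamped to avoid division by zero.
        assert_eq!(amplification_factor(7, 0), 7);
    }

    #[test]
    fn retry_bucket_examples() {
        // Small retry counts are reported verbatim to keep labels precise.
        assert_eq!(retry_bucket(0), "0");
        assert_eq!(retry_bucket(10), "10");
        // Larger counts collapse into coarse buckets to bound label cardinality.
        assert_eq!(retry_bucket(15), "between_10_and_20");
        assert_eq!(retry_bucket(150), "over_100");
    }
}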

impl ConsensusOverloadChecker for ConsensusAdapter {
    fn check_consensus_overload(&self) -> SuiResult {
        fp_ensure!(
            self.check_limits(),
            SuiErrorKind::TooManyTransactionsPendingConsensus.into()
        );
        Ok(())
    }
}

pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It sets the reconfig state to reject new certificates from users.
    /// ConsensusAdapter will send an EndOfPublish message once the pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            epoch_store.close_user_certs(reconfig_guard);
        }
        if epoch_store.should_send_end_of_publish() {
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
                None,
                None,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            } else {
                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            }
        }
    }
}

struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}
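
// A minimal, test-only sketch of the RAII behaviour `CancelOnDrop` provides:
// dropping the guard aborts the wrapped task, which is how the warning loop in
// `submit_and_wait_inner` is stopped once its `_monitor` goes out of scope.
// Assumes tokio's `macros` feature for `#[tokio::test]`.
#[cfg(test)]
mod cancel_on_drop_example {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::CancelOnDrop;

    #[tokio::test]
    async fn dropping_the_guard_aborts_the_task() {
        let fired = Arc::new(AtomicBool::new(false));
        let fired_clone = fired.clone();
        let guard = CancelOnDrop(tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            fired_clone.store(true, Ordering::SeqCst);
        }));
        // Dropping the guard aborts the task before its sleep elapses.
        drop(guard);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        assert!(!fired.load(Ordering::SeqCst));
    }
}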

/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    submitted: bool,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

#[derive(Copy, Clone, PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            submitted: false,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();
        // Wake the admission queue drainer so it can submit more transactions.
        self.adapter.inflight_slot_freed_notify.notify_one();

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };
        let submitted = if self.submitted {
            "submitted"
        } else {
            "skipped"
        };

        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[submitted, self.tx_type, processed_method])
            .observe(latency.as_secs_f64());
    }
}
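
// A minimal, self-contained sketch (test-only, hypothetical `Guard` type) of
// the RAII accounting pattern `InflightDropGuard` implements: increment a
// counter on acquire and decrement it on drop, so the count stays balanced on
// every exit path, including early returns and cancelled futures.
#[cfg(test)]
mod inflight_guard_example {
    use std::sync::atomic::{AtomicU64, Ordering};

    struct Guard<'a>(&'a AtomicU64);

    impl<'a> Guard<'a> {
        fn acquire(counter: &'a AtomicU64) -> Self {
            counter.fetch_add(1, Ordering::SeqCst);
            Self(counter)
        }
    }

    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn counter_is_balanced_on_drop() {
        let inflight = AtomicU64::new(0);
        {
            let _guard = Guard::acquire(&inflight);
            assert_eq!(inflight.load(Ordering::SeqCst), 1);
        }
        // The guard went out of scope, so the slot was released.
        assert_eq!(inflight.load(Ordering::SeqCst), 0);
    }
}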

impl SubmitToConsensus for Arc<ConsensusAdapter> {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        self.submit_batch(transactions, None, epoch_store, None, None)
            .map(|_| ())
    }

    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        // timeout is required, or the spawned task can run forever
        timeout: Duration,
    ) -> SuiResult {
        let permit = match self.submit_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => {
                return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
            }
        };

        let _in_flight_submission_guard =
            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

        let key = SequencedConsensusTransactionKey::External(transaction.key());
        let tx_type = classify(transaction);

        let async_stage = {
            let transaction = transaction.clone();
            let epoch_store = epoch_store.clone();
            let this = self.clone();

            async move {
                let _permit = permit; // Hold permit for lifetime of task

                let result = tokio::time::timeout(
                    timeout,
                    this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
                )
                .await;

                if let Err(e) = result {
                    warn!("Consensus submission timed out: {e:?}");
                    this.metrics
                        .sequencing_best_effort_timeout
                        .with_label_values(&[tx_type])
                        .inc();
                }
            }
        };

        let epoch_store = epoch_store.clone();
        spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
        Ok(())
    }
}
1074}