sui_core/
consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::net::IpAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11use consensus_core::BlockStatus;
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::future::{self, Either, select};
15use futures::stream::FuturesUnordered;
16use mysten_common::debug_fatal;
17use mysten_metrics::{
18    GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
19};
20use parking_lot::RwLockReadGuard;
21use prometheus::Histogram;
22use prometheus::HistogramVec;
23use prometheus::IntCounter;
24use prometheus::IntCounterVec;
25use prometheus::IntGauge;
26use prometheus::IntGaugeVec;
27use prometheus::Registry;
28use prometheus::{
29    register_histogram_vec_with_registry, register_histogram_with_registry,
30    register_int_counter_vec_with_registry, register_int_counter_with_registry,
31    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
32};
33use sui_types::base_types::AuthorityName;
34use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
35use sui_types::fp_ensure;
36use sui_types::messages_consensus::ConsensusPosition;
37use sui_types::messages_consensus::ConsensusTransactionKind;
38use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
39use sui_types::transaction::TransactionDataAPI;
40use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::Duration;
43use tokio::time::{self};
44use tracing::{Instrument, debug, debug_span, info, instrument, warn};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::checkpoints::CheckpointStore;
48use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
49use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
50
51#[cfg(test)]
52#[path = "unit_tests/consensus_tests.rs"]
53pub mod consensus_tests;
54
55pub struct ConsensusAdapterMetrics {
56    // Certificate sequencing metrics
57    pub sequencing_certificate_attempt: IntCounterVec,
58    pub sequencing_certificate_success: IntCounterVec,
59    pub sequencing_certificate_failures: IntCounterVec,
60    pub sequencing_certificate_status: IntCounterVec,
61    pub sequencing_certificate_inflight: IntGaugeVec,
62    pub sequencing_acknowledge_latency: HistogramVec,
63    pub sequencing_certificate_latency: HistogramVec,
64    pub sequencing_certificate_processed: IntCounterVec,
65    pub sequencing_in_flight_semaphore_wait: IntGauge,
66    pub sequencing_in_flight_submissions: IntGauge,
67    pub sequencing_best_effort_timeout: IntCounterVec,
68    pub consensus_latency: Histogram,
69    pub num_rejected_cert_in_epoch_boundary: IntCounter,
70}
71
72impl ConsensusAdapterMetrics {
73    pub fn new(registry: &Registry) -> Self {
74        Self {
75            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
76                "sequencing_certificate_attempt",
77                "Counts the number of certificates the validator attempts to sequence.",
78                &["tx_type"],
79                registry,
80            )
81                .unwrap(),
82            sequencing_certificate_success: register_int_counter_vec_with_registry!(
83                "sequencing_certificate_success",
84                "Counts the number of successfully sequenced certificates.",
85                &["tx_type"],
86                registry,
87            )
88                .unwrap(),
89            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
90                "sequencing_certificate_failures",
91                "Counts the number of sequenced certificates that failed other than by timeout.",
92                &["tx_type"],
93                registry,
94            )
95                .unwrap(),
96                sequencing_certificate_status: register_int_counter_vec_with_registry!(
97                "sequencing_certificate_status",
98                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
99                &["tx_type", "status"],
100                registry,
101            )
102                .unwrap(),
103            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
104                "sequencing_certificate_inflight",
105                "The inflight requests to sequence certificates.",
106                &["tx_type"],
107                registry,
108            )
109                .unwrap(),
110            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
111                "sequencing_acknowledge_latency",
112                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
113                &["retry", "tx_type"],
114                LATENCY_SEC_BUCKETS.to_vec(),
115                registry,
116            ).unwrap(),
117            sequencing_certificate_latency: register_histogram_vec_with_registry!(
118                "sequencing_certificate_latency",
119                "The latency for sequencing a certificate.",
120                &["submitted", "tx_type", "processed_method"],
121                LATENCY_SEC_BUCKETS.to_vec(),
122                registry,
123            ).unwrap(),
124            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
125                "sequencing_certificate_processed",
126                "The number of certificates that have been processed either by consensus or checkpoint.",
127                &["source"],
128                registry
129            ).unwrap(),
130            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
131                "sequencing_in_flight_semaphore_wait",
132                "How many requests are blocked on submit_permit.",
133                registry,
134            )
135                .unwrap(),
136            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
137                "sequencing_in_flight_submissions",
138                "Number of transactions submitted to local consensus instance and not yet sequenced",
139                registry,
140            )
141                .unwrap(),
142            sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
143                "sequencing_best_effort_timeout",
144                "The number of times the best effort submission has timed out.",
145                &["tx_type"],
146                registry,
147            ).unwrap(),
148            // These two metrics originally lived in ValidatorServiceMetrics (authority_server.rs)
149            // and keep their legacy names for dashboard compatibility.
150            consensus_latency: register_histogram_with_registry!(
151                "validator_service_consensus_latency",
152                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
153                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
154                registry,
155            ).unwrap(),
156            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
157                "validator_service_num_rejected_cert_in_epoch_boundary",
158                "Number of rejected transaction certificate during epoch transitioning",
159                registry,
160            ).unwrap(),
161        }
162    }
163
164    pub fn new_test() -> Self {
165        Self::new(&Registry::default())
166    }
167}
168
169/// An object that can be used to check if the consensus is overloaded.
170pub trait ConsensusOverloadChecker: Sync + Send + 'static {
171    fn check_consensus_overload(&self) -> SuiResult;
172}
173
174pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
175
176#[mockall::automock]
177pub trait SubmitToConsensus: Sync + Send + 'static {
178    fn submit_to_consensus(
179        &self,
180        transactions: &[ConsensusTransaction],
181        epoch_store: &Arc<AuthorityPerEpochStore>,
182    ) -> SuiResult;
183
184    /// Submits a system transaction to consensus once, without waiting for it to
185    /// be sequenced and without retrying if it is garbage collected, bounded by
186    /// `timeout`. Suits periodic, self-superseding messages (e.g. execution time
187    /// observations) where a missed submission is replaced by the next one.
188    ///
189    /// For system transactions only. User transactions are rejected:
190    /// this fire-and-forget, no-retry, backpressure-free path would
191    /// silently mishandle them.
192    fn submit_best_effort(
193        &self,
194        transaction: &ConsensusTransaction,
195        epoch_store: &Arc<AuthorityPerEpochStore>,
196        timeout: Duration,
197    ) -> SuiResult;
198}
199
200#[mockall::automock]
201#[async_trait::async_trait]
202pub trait ConsensusClient: Sync + Send + 'static {
203    async fn submit(
204        &self,
205        transactions: &[ConsensusTransaction],
206        epoch_store: &Arc<AuthorityPerEpochStore>,
207    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
208}
209
210/// Submit Sui certificates to the consensus.
211pub struct ConsensusAdapter {
212    /// The network client connecting to the consensus node of this authority.
213    consensus_client: Arc<dyn ConsensusClient>,
214    /// The checkpoint store for the validator
215    checkpoint_store: Arc<CheckpointStore>,
216    /// Authority pubkey.
217    authority: AuthorityName,
218    /// The limit to number of inflight transactions at this node.
219    max_pending_transactions: usize,
220    /// Number of submitted transactions still inflight at this node.
221    num_inflight_transactions: AtomicU64,
222    /// A structure to register metrics
223    metrics: ConsensusAdapterMetrics,
224    /// Semaphore limiting parallel submissions to consensus
225    submit_semaphore: Arc<Semaphore>,
226    /// Notified when an inflight slot is freed (`InflightDropGuard` dropped).
227    /// Used by the admission queue drainer to wake up and submit more
228    /// transactions.
229    inflight_slot_freed_notify: Arc<Notify>,
230}
231
232impl ConsensusAdapter {
233    /// Make a new Consensus adapter instance.
234    pub fn new(
235        consensus_client: Arc<dyn ConsensusClient>,
236        checkpoint_store: Arc<CheckpointStore>,
237        authority: AuthorityName,
238        max_pending_transactions: usize,
239        max_pending_local_submissions: usize,
240        metrics: ConsensusAdapterMetrics,
241        inflight_slot_freed_notify: Arc<Notify>,
242    ) -> Self {
243        let num_inflight_transactions = Default::default();
244        Self {
245            consensus_client,
246            checkpoint_store,
247            authority,
248            max_pending_transactions,
249            num_inflight_transactions,
250            metrics,
251            submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
252            inflight_slot_freed_notify,
253        }
254    }
255
256    /// Get the current number of in-flight transactions
257    pub fn num_inflight_transactions(&self) -> u64 {
258        self.num_inflight_transactions.load(Ordering::Relaxed)
259    }
260
261    /// Get the maximum number of pending transactions (consensus capacity limit).
262    pub fn max_pending_transactions(&self) -> usize {
263        self.max_pending_transactions
264    }
265
266    /// Submits transactions to consensus within the reconfiguration lock and
267    /// returns their consensus positions.
268    pub async fn submit_and_get_positions(
269        self: &Arc<Self>,
270        consensus_transactions: Vec<ConsensusTransaction>,
271        epoch_store: &Arc<AuthorityPerEpochStore>,
272        submitter_client_addr: Option<IpAddr>,
273    ) -> Result<Vec<ConsensusPosition>, SuiError> {
274        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
275
276        {
277            // code block within reconfiguration lock
278            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
279            if !reconfiguration_lock.should_accept_user_certs() {
280                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
281                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
282            }
283
284            // Submit to consensus and wait for the position. If the transaction has
285            // already been processed via consensus output or a checkpoint, the adapter
286            // skips submission and reports `TransactionProcessing` instead of a position.
287            let _metrics_guard = self.metrics.consensus_latency.start_timer();
288
289            self.submit_batch(
290                &consensus_transactions,
291                Some(&reconfiguration_lock),
292                epoch_store,
293                Some(tx_consensus_positions),
294                submitter_client_addr,
295            )?;
296        }
297
298        rx_consensus_positions.await.map_err(|e| {
299            SuiError::from(SuiErrorKind::FailedToSubmitToConsensus(format!(
300                "Failed to get consensus position: {e}"
301            )))
302        })?
303    }
304
305    pub fn recover_end_of_publish(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
306        // This handles the case where the node crashed after setting reconfig lock state
307        // but before the EndOfPublish message was sent to consensus.
308        if epoch_store.should_send_end_of_publish() {
309            let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
310            info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
311            self.submit_unchecked(&[transaction], epoch_store, None, None);
312        }
313    }
314
315    /// This method blocks until transaction is persisted in local database
316    /// It then returns handle to async task, user can join this handle to await while transaction is processed by consensus
317    ///
318    /// This method guarantees that once submit(but not returned async handle) returns,
319    /// transaction is persisted and will eventually be sent to consensus even after restart
320    ///
321    /// When submitting a certificate caller **must** provide a ReconfigState lock guard
322    pub fn submit(
323        self: &Arc<Self>,
324        transaction: ConsensusTransaction,
325        lock: Option<&RwLockReadGuard<ReconfigState>>,
326        epoch_store: &Arc<AuthorityPerEpochStore>,
327        tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
328        submitter_client_addr: Option<IpAddr>,
329    ) -> SuiResult<JoinHandle<()>> {
330        self.submit_batch(
331            &[transaction],
332            lock,
333            epoch_store,
334            tx_consensus_position,
335            submitter_client_addr,
336        )
337    }
338
339    // Submits the provided transactions to consensus in a batched fashion. The `transactions` vector can be also empty in case of a ping check.
340    // In this case the system will simulate a transaction submission to consensus and return the consensus position.
341    pub fn submit_batch(
342        self: &Arc<Self>,
343        transactions: &[ConsensusTransaction],
344        _lock: Option<&RwLockReadGuard<ReconfigState>>,
345        epoch_store: &Arc<AuthorityPerEpochStore>,
346        tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
347        submitter_client_addr: Option<IpAddr>,
348    ) -> SuiResult<JoinHandle<()>> {
349        if transactions.len() > 1 {
350            // Soft bundles must contain only UserTransactionV2 transactions.
351            for transaction in transactions {
352                fp_ensure!(
353                    transaction.is_user_transaction(),
354                    SuiErrorKind::InvalidTxKindInSoftBundle.into()
355                );
356            }
357        }
358
359        Ok(self.submit_unchecked(
360            transactions,
361            epoch_store,
362            tx_consensus_position,
363            submitter_client_addr,
364        ))
365    }
366
367    /// Performs weakly consistent checks on internal buffers to quickly
368    /// discard transactions if we are overloaded
369    fn check_limits(&self) -> bool {
370        // First check total transactions (waiting and in submission)
371        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
372            >= self.max_pending_transactions
373        {
374            return false;
375        }
376        // Then check if submit_semaphore has permits
377        self.submit_semaphore.available_permits() > 0
378    }
379
380    fn submit_unchecked(
381        self: &Arc<Self>,
382        transactions: &[ConsensusTransaction],
383        epoch_store: &Arc<AuthorityPerEpochStore>,
384        tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
385        submitter_client_addr: Option<IpAddr>,
386    ) -> JoinHandle<()> {
387        // Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
388        let async_stage = self
389            .clone()
390            .submit_and_wait(
391                transactions.to_vec(),
392                epoch_store.clone(),
393                tx_consensus_position,
394                submitter_client_addr,
395            )
396            .in_current_span();
397        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
398        // (Limit is not applied atomically, and only to user transactions.)
399        let join_handle = spawn_monitored_task!(async_stage);
400        join_handle
401    }
402
403    async fn submit_and_wait(
404        self: Arc<Self>,
405        transactions: Vec<ConsensusTransaction>,
406        epoch_store: Arc<AuthorityPerEpochStore>,
407        tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
408        submitter_client_addr: Option<IpAddr>,
409    ) {
410        // When epoch_terminated signal is received all pending submit_and_wait_inner are dropped.
411        //
412        // This is needed because submit_and_wait_inner waits on read_notify for consensus message to be processed,
413        // which may never happen on epoch boundary.
414        //
415        // In addition to that, within_alive_epoch ensures that all pending consensus
416        // adapter tasks are stopped before reconfiguration can proceed.
417        //
418        // This is essential because after epoch change, this validator may exit the committee and become a full node.
419        // So it is no longer able to submit to consensus.
420        //
421        // Also, submission to consensus is not gated on epoch. Although it is ok to submit user transactions
422        // to the new epoch, we want to cancel system transaction submissions from the current epoch to the new epoch.
423        epoch_store
424            .within_alive_epoch(self.submit_and_wait_inner(
425                transactions,
426                &epoch_store,
427                tx_consensus_position,
428                submitter_client_addr,
429            ))
430            .await
431            .ok(); // result here indicates if epoch ended earlier, we don't care about it
432    }
433
434    #[allow(clippy::option_map_unit_fn)]
435    #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
436    async fn submit_and_wait_inner(
437        self: Arc<Self>,
438        transactions: Vec<ConsensusTransaction>,
439        epoch_store: &Arc<AuthorityPerEpochStore>,
440        mut tx_consensus_positions: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
441        submitter_client_addr: Option<IpAddr>,
442    ) {
443        if transactions.is_empty() {
444            // If transactions are empty, then we attempt to ping consensus and simulate a transaction submission to consensus.
445            // We intentionally do not wait for the block status, as we are only interested in the consensus position and return it immediately.
446            debug!(
447                "Performing a ping check, pinging consensus to get a consensus position in next block"
448            );
449            let (consensus_positions, _status_waiter) = self
450                .submit_inner(&transactions, epoch_store, &[], "ping")
451                .await;
452
453            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
454                let _ = tx_consensus_positions.send(Ok(consensus_positions));
455            } else {
456                debug_fatal!("Ping check must have a consensus position channel");
457            }
458            return;
459        }
460
461        // Record submitted transactions early for DoS protection
462        for transaction in &transactions {
463            if let Some(tx) = transaction.kind.as_user_transaction() {
464                let amplification_factor = (tx.data().transaction_data().gas_price()
465                    / epoch_store.reference_gas_price().max(1))
466                .max(1);
467                epoch_store.submitted_transaction_cache.record_submitted_tx(
468                    tx.digest(),
469                    amplification_factor as u32,
470                    submitter_client_addr,
471                );
472            }
473        }
474
475        // Current code path ensures:
476        // - If transactions.len() > 1, it is a soft bundle. System transactions should have been submitted individually.
477        // - If is_soft_bundle, then all transactions are of CertifiedTransaction or UserTransaction kind.
478        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and transactions[0] can be of any kind.
479        let is_soft_bundle = transactions.len() > 1;
480        let is_system_message = !transactions[0].is_user_transaction();
481
482        let mut transaction_keys = Vec::new();
483        let mut tx_consensus_positions = tx_consensus_positions;
484
485        for transaction in &transactions {
486            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
487                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
488                epoch_store.record_epoch_pending_certs_process_time_metric();
489            }
490
491            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
492            transaction_keys.push(transaction_key);
493        }
494        let tx_type = if is_soft_bundle {
495            "soft_bundle"
496        } else {
497            classify(&transactions[0])
498        };
499        tracing::Span::current().record("tx_type", tx_type);
500        tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
501
502        let mut guard = InflightDropGuard::acquire(&self, tx_type, transactions.len() as u64);
503
504        // Builds the error reported to a position-waiting caller (mfp) when the
505        // transaction is already being processed and we therefore skip (re)submission.
506        // The caller surfaces this as a retriable error so the client waits for
507        // effects / retries instead of receiving a meaningless consensus position.
508        let make_processing_error = |method: ProcessedMethod| -> SuiError {
509            let digest = transactions
510                .iter()
511                .find_map(|t| t.kind.as_user_transaction().map(|tx| *tx.digest()))
512                .unwrap_or_default();
513            SuiErrorKind::TransactionProcessing {
514                digest,
515                status: format!("processed via {}", method.processed_via()),
516            }
517            .into()
518        };
519
520        // Skip submission if the tx is already processed via consensus output or
521        // checkpoint state sync.
522        let already_processed =
523            self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store);
524        if let Some(method) = already_processed {
525            guard.processed_method = method;
526            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
527                let _ = tx_consensus_positions.send(Err(make_processing_error(method)));
528            }
529        }
530
531        // Log warnings for administrative transactions that fail to get sequenced
532        let _monitor = if matches!(
533            transactions[0].kind,
534            ConsensusTransactionKind::EndOfPublish(_)
535                | ConsensusTransactionKind::CapabilityNotification(_)
536                | ConsensusTransactionKind::CapabilityNotificationV2(_)
537                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
538                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
539        ) {
540            assert!(
541                !is_soft_bundle,
542                "System transactions should have been submitted individually"
543            );
544            let transaction_keys = transaction_keys.clone();
545            Some(CancelOnDrop(spawn_monitored_task!(async {
546                let mut i = 0u64;
547                loop {
548                    i += 1;
549                    const WARN_DELAY_S: u64 = 30;
550                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
551                    let total_wait = i * WARN_DELAY_S;
552                    warn!(
553                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
554                        total_wait, transaction_keys
555                    );
556                }
557            })))
558        } else {
559            None
560        };
561
562        if already_processed.is_none() {
563            debug!("Submitting {:?} to consensus", transaction_keys);
564            guard.submitted = true;
565
566            // System messages (checkpoint signatures, EndOfPublish, capability
567            // notifications, randomness DKG, etc.) are not buffered behind user
568            // tx; they are excluded from the semaphore.
569            let _permit: Option<SemaphorePermit> = if is_system_message {
570                None
571            } else {
572                Some(
573                    self.submit_semaphore
574                        .acquire()
575                        .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
576                        .await
577                        .expect("Consensus adapter does not close semaphore"),
578                )
579            };
580            let _in_flight_submission_guard =
581                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
582
583            // Submit the transaction to consensus, racing against the processed waiter in
584            // case another validator sequences the transaction first.
585            let submit_fut = async {
586                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
587
588                loop {
589                    // Submit the transaction to consensus and return the submit result with a status waiter
590                    let (consensus_positions, status_waiter) = self
591                        .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
592                        .await;
593
594                    if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
595                        tracing::Span::current().record(
596                            "consensus_positions",
597                            tracing::field::debug(&consensus_positions),
598                        );
599                        // We send the first consensus position returned by consensus
600                        // to the submitting client even if it is retried internally within
601                        // consensus adapter due to an error or GC. They can handle retries
602                        // as needed if the consensus position does not return the desired
603                        // results (e.g. not sequenced due to garbage collection).
604                        let _ = tx_consensus_positions.send(Ok(consensus_positions));
605                    }
606
607                    match status_waiter.await {
608                        Ok(status @ BlockStatus::Sequenced(_)) => {
609                            tracing::Span::current()
610                                .record("status", tracing::field::debug(&status));
611                            self.metrics
612                                .sequencing_certificate_status
613                                .with_label_values(&[tx_type, "sequenced"])
614                                .inc();
615                            // Block has been sequenced. Nothing more to do, we do have guarantees that the transaction will appear in consensus output.
616                            debug!(
617                                "Transaction {transaction_keys:?} has been sequenced by consensus."
618                            );
619                            break;
620                        }
621                        Ok(status @ BlockStatus::GarbageCollected(_)) => {
622                            tracing::Span::current()
623                                .record("status", tracing::field::debug(&status));
624                            self.metrics
625                                .sequencing_certificate_status
626                                .with_label_values(&[tx_type, "garbage_collected"])
627                                .inc();
628                            // Block has been garbage collected and we have no guarantees that the transaction will appear in consensus output. We'll
629                            // resubmit the transaction to consensus. If the transaction has been already "processed", then probably someone else has submitted
630                            // the transaction and managed to get sequenced. Then this future will have been cancelled anyways so no need to check here on the processed output.
631                            debug!(
632                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
633                            );
634                            time::sleep(RETRY_DELAY_STEP).await;
635                            continue;
636                        }
637                        Err(err) => {
638                            warn!(
639                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
640                                err
641                            );
642                            time::sleep(RETRY_DELAY_STEP).await;
643                            continue;
644                        }
645                    }
646                }
647            };
648
649            // Race `processed_notify` against the submit loop. If the tx is
650            // processed via another path (consensus output from another
651            // validator's submission, or checkpoint state sync) while we're
652            // inside the submit loop, the submission future is dropped and
653            // the retry loop is cancelled cleanly.
654            let processed_waiter = self
655                .processed_notify(transaction_keys.clone(), epoch_store)
656                .boxed();
657            let processed_via_notify;
658            guard.processed_method = match select(processed_waiter, submit_fut.boxed()).await {
659                Either::Left((observed, _submit_fut)) => {
660                    processed_via_notify = true;
661                    observed
662                }
663                Either::Right(((), processed_waiter)) => {
664                    debug!("Submitted {transaction_keys:?} to consensus");
665                    processed_via_notify = false;
666                    processed_waiter.await
667                }
668            };
669            // If processing was observed before a position was sent to a waiting caller,
670            // report that the transaction is already processing so the caller returns a
671            // retriable error. If a position was already sent, the channel is taken and this
672            // is a no-op.
673            if processed_via_notify
674                && let Some(tx_consensus_positions) = tx_consensus_positions.take()
675            {
676                let _ =
677                    tx_consensus_positions.send(Err(make_processing_error(guard.processed_method)));
678            }
679        }
680        debug!(
681            "{transaction_keys:?} processed via {}",
682            guard.processed_method.processed_via()
683        );
684
685        // After a user transaction or soft bundle submission,
686        // send EndOfPublish if the epoch is closing.
687        // EndOfPublish can also be sent during consensus commit handling, checkpoint execution and recovery.
688        if transactions[0].is_user_transaction()
689            && epoch_store.should_send_end_of_publish()
690            && !epoch_store.protocol_config().timestamp_based_epoch_close()
691        {
692            // sending message outside of any locks scope
693            if let Err(err) = self.submit(
694                ConsensusTransaction::new_end_of_publish(self.authority),
695                None,
696                epoch_store,
697                None,
698                None,
699            ) {
700                warn!("Error when sending end of publish message: {:?}", err);
701            } else {
702                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
703            }
704        }
705        self.metrics
706            .sequencing_certificate_success
707            .with_label_values(&[tx_type])
708            .inc();
709    }
710
711    #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
712    async fn submit_inner(
713        self: &Arc<Self>,
714        transactions: &[ConsensusTransaction],
715        epoch_store: &Arc<AuthorityPerEpochStore>,
716        transaction_keys: &[SequencedConsensusTransactionKey],
717        tx_type: &str,
718    ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
719        let ack_start = Instant::now();
720        let mut retries: u32 = 0;
721        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
722            Duration::from_millis(100),
723            Duration::from_secs(10),
724        );
725
726        let (consensus_positions, status_waiter) = loop {
727            let span = debug_span!("client_submit");
728            match self
729                .consensus_client
730                .submit(transactions, epoch_store)
731                .instrument(span)
732                .await
733            {
734                Err(err) => {
735                    // This can happen during reconfig, so keep retrying until succeed.
736                    if cfg!(msim) || retries > 3 {
737                        warn!(
738                            "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
739                        );
740                    }
741                    self.metrics
742                        .sequencing_certificate_failures
743                        .with_label_values(&[tx_type])
744                        .inc();
745                    retries += 1;
746
747                    time::sleep(backoff.next().unwrap()).await;
748                }
749                Ok((consensus_positions, status_waiter)) => {
750                    break (consensus_positions, status_waiter);
751                }
752            }
753        };
754
755        // we want to record the num of retries when reporting latency but to avoid label
756        // cardinality we do some simple bucketing to give us a good enough idea of how
757        // many retries happened associated with the latency.
758        let bucket = match retries {
759            0..=10 => retries.to_string(), // just report the retry count as is
760            11..=20 => "between_10_and_20".to_string(),
761            21..=50 => "between_20_and_50".to_string(),
762            51..=100 => "between_50_and_100".to_string(),
763            _ => "over_100".to_string(),
764        };
765
766        self.metrics
767            .sequencing_acknowledge_latency
768            .with_label_values(&[bucket.as_str(), tx_type])
769            .observe(ack_start.elapsed().as_secs_f64());
770
771        (consensus_positions, status_waiter)
772    }
773
774    /// Sync check for whether `transaction_keys` are already processed via
775    /// consensus output or checkpoint state sync. Returns `Some(method)` if
776    /// every key is already processed (Checkpoint dominates when any key was
777    /// processed via checkpoint or synced-checkpoint), else `None`.
778    ///
779    /// Also increments `sequencing_certificate_processed` with the matching
780    /// label for each key found processed, mirroring what `processed_notify`
781    /// emits for its async wake-ups.
782    fn check_processed_via_consensus_or_checkpoint(
783        self: &Arc<Self>,
784        transaction_keys: &[SequencedConsensusTransactionKey],
785        epoch_store: &Arc<AuthorityPerEpochStore>,
786    ) -> Option<ProcessedMethod> {
787        let mut seen_checkpoint = false;
788        for transaction_key in transaction_keys {
789            // Check consensus-processed first; if already visible in consensus
790            // output we don't need to submit again.
791            if epoch_store
792                .is_consensus_message_processed(transaction_key)
793                .expect("Storage error when checking consensus message processed")
794            {
795                self.metrics
796                    .sequencing_certificate_processed
797                    .with_label_values(&["consensus"])
798                    .inc();
799                continue;
800            }
801
802            // For a cert-shaped key, check whether state sync executed the tx
803            // via a checkpoint.
804            if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
805                digest,
806            )) = transaction_key
807                && epoch_store
808                    .is_transaction_executed_in_checkpoint(digest)
809                    .expect("Storage error when checking transaction executed in checkpoint")
810            {
811                self.metrics
812                    .sequencing_certificate_processed
813                    .with_label_values(&["checkpoint"])
814                    .inc();
815                seen_checkpoint = true;
816                continue;
817            }
818
819            // For a checkpoint-signature key, check whether a checkpoint at
820            // or above the target sequence number has already been synced —
821            // in which case the signature is redundant.
822            if let SequencedConsensusTransactionKey::External(
823                ConsensusTransactionKey::CheckpointSignature(_, seq)
824                | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
825            ) = transaction_key
826                && let Some(synced_seq) = self
827                    .checkpoint_store
828                    .get_highest_synced_checkpoint_seq_number()
829                    .expect("Storage error when reading highest synced checkpoint")
830                && synced_seq >= *seq
831            {
832                self.metrics
833                    .sequencing_certificate_processed
834                    .with_label_values(&["synced_checkpoint"])
835                    .inc();
836                seen_checkpoint = true;
837                continue;
838            }
839
840            // Not processed via any path — caller must submit.
841            return None;
842        }
843
844        if seen_checkpoint {
845            Some(ProcessedMethod::Checkpoint)
846        } else {
847            Some(ProcessedMethod::Consensus)
848        }
849    }
850
851    /// Async wait for any of `transaction_keys` to become processed via
852    /// consensus output or a checkpoint (either state-synced or executed
853    /// locally). Used in the in-flight race against submission: cancelling
854    /// the submit future when we learn the tx is processed by another path.
855    /// Returns `Checkpoint` if any key resolves via a checkpoint path, else
856    /// `Consensus`.
857    async fn processed_notify(
858        self: &Arc<Self>,
859        transaction_keys: Vec<SequencedConsensusTransactionKey>,
860        epoch_store: &Arc<AuthorityPerEpochStore>,
861    ) -> ProcessedMethod {
862        let notifications = FuturesUnordered::new();
863        for transaction_key in transaction_keys {
864            let transaction_digests = match transaction_key {
865                SequencedConsensusTransactionKey::External(
866                    ConsensusTransactionKey::Certificate(digest),
867                ) => vec![digest],
868                _ => vec![],
869            };
870
871            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
872                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
873                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
874            ) = transaction_key
875            {
876                // If the transaction is a checkpoint signature, we can also wait to get notified when a checkpoint with equal or higher sequence
877                // number has been already synced. This way we don't try to unnecessarily sequence the signature for an already verified checkpoint.
878                Either::Left(
879                    self.checkpoint_store
880                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
881                )
882            } else {
883                Either::Right(future::pending())
884            };
885
886            // We wait for each transaction individually to be processed by consensus or executed in a checkpoint. We could equally just
887            // get notified in aggregate when all transactions are processed, but with this approach can get notified in a more fine-grained way
888            // as transactions can be marked as processed in different ways. This is mostly a concern for the soft-bundle transactions.
889            notifications.push(async move {
890                tokio::select! {
891                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
892                        processed.expect("Storage error when waiting for consensus message processed");
893                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
894                        return ProcessedMethod::Consensus;
895                    },
896                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
897                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
898                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
899                    }
900                    _ = checkpoint_synced_future => {
901                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
902                    }
903                }
904                ProcessedMethod::Checkpoint
905            });
906        }
907
908        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
909        for method in processed_methods {
910            if method == ProcessedMethod::Checkpoint {
911                return ProcessedMethod::Checkpoint;
912            }
913        }
914        ProcessedMethod::Consensus
915    }
916}
917
918impl ConsensusOverloadChecker for ConsensusAdapter {
919    fn check_consensus_overload(&self) -> SuiResult {
920        fp_ensure!(
921            self.check_limits(),
922            SuiErrorKind::TooManyTransactionsPendingConsensus.into()
923        );
924        Ok(())
925    }
926}
927
/// `ConsensusOverloadChecker` implementation that never reports overload.
pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    /// Always returns `Ok(())`; no state is consulted.
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}
935
936impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
937    /// This method is called externally to begin reconfiguration
938    /// It sets reconfig state to reject new certificates from user.
939    /// ConsensusAdapter will send EndOfPublish message once pending certificate queue is drained.
940    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
941        {
942            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
943            if !reconfig_guard.should_accept_user_certs() {
944                // Allow caller to call this method multiple times
945                return;
946            }
947            epoch_store.close_user_certs(reconfig_guard);
948        }
949        if epoch_store.should_send_end_of_publish() {
950            if let Err(err) = self.submit(
951                ConsensusTransaction::new_end_of_publish(self.authority),
952                None,
953                epoch_store,
954                None,
955                None,
956            ) {
957                warn!("Error when sending end of publish message: {:?}", err);
958            } else {
959                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
960            }
961        }
962    }
963}
964
965struct CancelOnDrop<T>(JoinHandle<T>);
966
967impl<T> Deref for CancelOnDrop<T> {
968    type Target = JoinHandle<T>;
969
970    fn deref(&self) -> &Self::Target {
971        &self.0
972    }
973}
974
975impl<T> Drop for CancelOnDrop<T> {
976    fn drop(&mut self) {
977        self.0.abort();
978    }
979}
980
/// Tracks number of inflight consensus requests and relevant metrics
///
/// `acquire` registers the request on the adapter's inflight counter and
/// gauges; dropping the guard undoes that registration, wakes one waiter on
/// the adapter's `inflight_slot_freed_notify`, and records the latency.
struct InflightDropGuard<'a> {
    /// Adapter whose inflight counter and metrics this guard updates.
    adapter: &'a ConsensusAdapter,
    /// Instant at which the guard was acquired; the elapsed time since then is
    /// reported as latency on drop.
    start: Instant,
    /// Selects the "submitted" vs "skipped" label of the latency metric.
    /// Initialised to `false` by `acquire`.
    submitted: bool,
    /// Transaction type label attached to the metrics touched by this guard.
    tx_type: &'static str,
    /// Path through which processing was observed; selects the corresponding
    /// latency metric label. Initialised to `Consensus` by `acquire`.
    processed_method: ProcessedMethod,
    /// Number of transactions this guard accounts for.
    /// Greater than 1 for soft bundles.
    inflight_count: u64,
}
992
/// Path through which a transaction was observed as processed.
///
/// `Debug` is derived so values can be used in `{:?}` formatting, tracing
/// debug fields and `assert_eq!`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ProcessedMethod {
    /// Observed in consensus output.
    Consensus,
    /// Observed through a checkpoint.
    Checkpoint,
}

impl ProcessedMethod {
    /// Human-readable description used in log messages.
    fn processed_via(self) -> &'static str {
        match self {
            Self::Consensus => "consensus output",
            Self::Checkpoint => "checkpoint execution",
        }
    }

    /// Label value used for this method on the latency metric.
    fn latency_metric_label(self) -> &'static str {
        match self {
            Self::Consensus => "processed_via_consensus",
            Self::Checkpoint => "processed_via_checkpoint",
        }
    }
}
1014
1015impl<'a> InflightDropGuard<'a> {
1016    pub fn acquire(
1017        adapter: &'a ConsensusAdapter,
1018        tx_type: &'static str,
1019        inflight_count: u64,
1020    ) -> Self {
1021        adapter
1022            .num_inflight_transactions
1023            .fetch_add(inflight_count, Ordering::SeqCst);
1024        adapter
1025            .metrics
1026            .sequencing_certificate_inflight
1027            .with_label_values(&[tx_type])
1028            .inc();
1029        adapter
1030            .metrics
1031            .sequencing_certificate_attempt
1032            .with_label_values(&[tx_type])
1033            .inc();
1034        Self {
1035            adapter,
1036            start: Instant::now(),
1037            submitted: false,
1038            tx_type,
1039            processed_method: ProcessedMethod::Consensus,
1040            inflight_count,
1041        }
1042    }
1043}
1044
1045impl Drop for InflightDropGuard<'_> {
1046    fn drop(&mut self) {
1047        self.adapter
1048            .num_inflight_transactions
1049            .fetch_sub(self.inflight_count, Ordering::SeqCst);
1050        self.adapter
1051            .metrics
1052            .sequencing_certificate_inflight
1053            .with_label_values(&[self.tx_type])
1054            .dec();
1055        // Wake the admission queue drainer so it can submit more transactions.
1056        self.adapter.inflight_slot_freed_notify.notify_one();
1057
1058        let latency = self.start.elapsed();
1059        let submitted = if self.submitted {
1060            "submitted"
1061        } else {
1062            "skipped"
1063        };
1064
1065        self.adapter
1066            .metrics
1067            .sequencing_certificate_latency
1068            .with_label_values(&[
1069                submitted,
1070                self.tx_type,
1071                self.processed_method.latency_metric_label(),
1072            ])
1073            .observe(latency.as_secs_f64());
1074    }
1075}
1076
1077impl SubmitToConsensus for Arc<ConsensusAdapter> {
1078    fn submit_to_consensus(
1079        &self,
1080        transactions: &[ConsensusTransaction],
1081        epoch_store: &Arc<AuthorityPerEpochStore>,
1082    ) -> SuiResult {
1083        self.submit_batch(transactions, None, epoch_store, None, None)
1084            .map(|_| ())
1085    }
1086
1087    fn submit_best_effort(
1088        &self,
1089        transaction: &ConsensusTransaction,
1090        epoch_store: &Arc<AuthorityPerEpochStore>,
1091        // timeout is required, or the spawned task can run forever
1092        timeout: Duration,
1093    ) -> SuiResult {
1094        if transaction.is_user_transaction() {
1095            debug_fatal!("submit_best_effort called with a user transaction");
1096            return Err(SuiErrorKind::GenericAuthorityError {
1097                error: "submit_best_effort does not accept user transactions".to_string(),
1098            }
1099            .into());
1100        }
1101
1102        // There is no submit semaphone on this path as it services system msgs only.
1103        let _in_flight_submission_guard =
1104            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1105
1106        let key = SequencedConsensusTransactionKey::External(transaction.key());
1107        let tx_type = classify(transaction);
1108
1109        let async_stage = {
1110            let transaction = transaction.clone();
1111            let epoch_store = epoch_store.clone();
1112            let this = self.clone();
1113
1114            async move {
1115                let result = tokio::time::timeout(
1116                    timeout,
1117                    this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1118                )
1119                .await;
1120
1121                if let Err(e) = result {
1122                    warn!("Consensus submission timed out: {e:?}");
1123                    this.metrics
1124                        .sequencing_best_effort_timeout
1125                        .with_label_values(&[tx_type])
1126                        .inc();
1127                }
1128            }
1129        };
1130
1131        let epoch_store = epoch_store.clone();
1132        spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1133        Ok(())
1134    }
1135}