sui_core/quorum_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod metrics;
5pub use metrics::*;
6
7pub mod reconfig_observer;
8
9use arc_swap::ArcSwap;
10use std::fmt::{Debug, Formatter};
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_types::base_types::TransactionDigest;
15use sui_types::committee::{Committee, EpochId};
16use sui_types::messages_grpc::{HandleCertificateRequestV3, TxType};
17use sui_types::quorum_driver_types::{
18    ExecuteTransactionRequestV3, QuorumDriverEffectsQueueResult, QuorumDriverError,
19    QuorumDriverResponse, QuorumDriverResult,
20};
21use tap::TapFallible;
22use tokio::sync::Semaphore;
23use tokio::time::{Instant, sleep_until};
24
25use tokio::sync::mpsc::{self, Receiver, Sender};
26use tokio::task::JoinHandle;
27use tracing::{debug, error, info, instrument, trace_span, warn};
28
29use crate::authority_aggregator::{
30    AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
31    ProcessTransactionResult,
32};
33use crate::authority_client::AuthorityAPI;
34use mysten_common::sync::notify_read::{NotifyRead, Registration};
35use mysten_metrics::{GaugeGuard, spawn_monitored_task};
36use std::fmt::Write;
37use sui_macros::fail_point;
38use sui_types::error::{SuiErrorKind, SuiResult};
39use sui_types::transaction::{CertifiedTransaction, Transaction};
40
41use self::reconfig_observer::ReconfigObserver;
42
43#[cfg(test)]
44mod tests;
45
/// Capacity of the task channel; also used as the number of semaphore permits
/// that bound how many tasks are processed concurrently.
const TASK_QUEUE_SIZE: usize = 2000;
/// Capacity of the broadcast channel on which task results are published.
const EFFECTS_QUEUE_SIZE: usize = 10000;
/// Default retry limit that `QuorumDriverHandlerBuilder::new` starts with.
const TX_MAX_RETRY_TIMES: u32 = 10;
49
/// Interface for a component whose `AuthorityAggregator` can be read and replaced.
pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
    /// Returns the epoch the implementor currently operates in.
    fn epoch(&self) -> EpochId;
    /// Returns a shared handle to the aggregator currently in use.
    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
    /// Replaces the aggregator in use with `new_authorities`.
    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
}
55
/// A unit of work travelling through the quorum driver's task queue.
#[derive(Clone)]
pub struct QuorumDriverTask {
    /// The execution request this task is driving.
    pub request: ExecuteTransactionRequestV3,
    /// Certificate carried over from an earlier attempt; when present, processing
    /// skips the transaction-signing step and goes straight to the certificate.
    pub tx_cert: Option<CertifiedTransaction>,
    /// Number of retries already performed (0 for a freshly submitted request).
    pub retry_times: u32,
    /// The queue processor puts the task back if it is dequeued before this instant.
    pub next_retry_after: Instant,
    /// Client address, forwarded to the authority aggregator calls.
    pub client_addr: Option<SocketAddr>,
    /// Span captured at enqueue time; used as the parent of the processing spans.
    pub trace_span: Option<tracing::Span>,
}
65
66impl Debug for QuorumDriverTask {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        let mut writer = String::new();
69        write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
70        write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
71        write!(writer, "retry_times={} ", self.retry_times)?;
72        write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
73        write!(f, "{}", writer)
74    }
75}
76
/// Core state shared by the submission API and the task processor.
pub struct QuorumDriver<A: Clone> {
    /// Swappable handle to the authority aggregator currently in use.
    validators: ArcSwap<AuthorityAggregator<A>>,
    /// Sending side of the task queue.
    task_sender: Sender<QuorumDriverTask>,
    /// Broadcast channel on which `notify` publishes each result.
    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
    /// Per-digest notifier that `notify` signals with the result.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Metrics updated by the driver.
    metrics: Arc<QuorumDriverMetrics>,
    /// Retry threshold checked by `enqueue_again_maybe` before giving up.
    max_retry_times: u32,
}
85
86impl<A: Clone> QuorumDriver<A> {
    /// Assembles a `QuorumDriver` from its already-constructed parts; performs no
    /// other work.
    pub(crate) fn new(
        validators: ArcSwap<AuthorityAggregator<A>>,
        task_sender: Sender<QuorumDriverTask>,
        effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
        metrics: Arc<QuorumDriverMetrics>,
        max_retry_times: u32,
    ) -> Self {
        Self {
            validators,
            task_sender,
            effects_subscribe_sender,
            notifier,
            metrics,
            max_retry_times,
        }
    }
104
    /// Returns the swappable cell holding the current authority aggregator.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        &self.validators
    }
108
    /// Returns a shared handle to the committee of the current aggregator.
    pub fn clone_committee(&self) -> Arc<Committee> {
        self.validators.load().committee.clone()
    }
112
    /// Returns the epoch of the current aggregator's committee.
    pub fn current_epoch(&self) -> EpochId {
        self.validators.load().committee.epoch
    }
116
117    async fn enqueue_task(&self, task: QuorumDriverTask) -> SuiResult<()> {
118        self.task_sender
119            .send(task.clone())
120            .await
121            .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
122            .tap_ok(|_| {
123                debug!(?task, "Enqueued task.");
124                self.metrics.current_requests_in_flight.inc();
125                self.metrics.total_enqueued.inc();
126                if task.retry_times > 0 {
127                    if task.retry_times == 1 {
128                        self.metrics.current_transactions_in_retry.inc();
129                    }
130                    self.metrics
131                        .transaction_retry_count
132                        .observe(task.retry_times as f64);
133                }
134            })
135            .map_err(|e| {
136                SuiErrorKind::QuorumDriverCommunicationError {
137                    error: e.to_string(),
138                }
139                .into()
140            })
141    }
142
143    /// Enqueue the task again if it hasn't maxed out the total retry attempts.
144    /// If it has, notify failure.
145    async fn enqueue_again_maybe(
146        &self,
147        request: ExecuteTransactionRequestV3,
148        tx_cert: Option<CertifiedTransaction>,
149        old_retry_times: u32,
150        client_addr: Option<SocketAddr>,
151    ) -> SuiResult<()> {
152        if old_retry_times >= self.max_retry_times {
153            // max out the retry times, notify failure
154            info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
155            self.notify(
156                &request.transaction,
157                &Err(
158                    QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
159                        total_attempts: old_retry_times + 1,
160                    },
161                ),
162                old_retry_times + 1,
163            );
164            return Ok(());
165        }
166        self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
167            .await
168    }
169
170    /// Performs exponential backoff and enqueue the `transaction` to the execution queue.
171    /// When `min_backoff_duration` is provided, the backoff duration will be at least `min_backoff_duration`.
172    async fn backoff_and_enqueue(
173        &self,
174        request: ExecuteTransactionRequestV3,
175        tx_cert: Option<CertifiedTransaction>,
176        old_retry_times: u32,
177        client_addr: Option<SocketAddr>,
178        min_backoff_duration: Option<Duration>,
179    ) -> SuiResult<()> {
180        let next_retry_after = Instant::now()
181            + Duration::from_millis(200 * u64::pow(2, old_retry_times))
182                .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
183        sleep_until(next_retry_after).await;
184
185        fail_point!("count_retry_times");
186
187        let tx_cert = match tx_cert {
188            // TxCert is only valid when its epoch matches current epoch.
189            // Note, it's impossible that TxCert's epoch is larger than current epoch
190            // because the TxCert will be considered invalid and cannot reach here.
191            Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
192            _other => None,
193        };
194
195        self.enqueue_task(QuorumDriverTask {
196            request,
197            tx_cert,
198            retry_times: old_retry_times + 1,
199            next_retry_after,
200            client_addr,
201            trace_span: Some(tracing::Span::current()),
202        })
203        .await
204    }
205
206    pub fn notify(
207        &self,
208        transaction: &Transaction,
209        response: &QuorumDriverResult,
210        total_attempts: u32,
211    ) {
212        let tx_digest = transaction.digest();
213        let effects_queue_result = match &response {
214            Ok(resp) => {
215                self.metrics.total_ok_responses.inc();
216                self.metrics
217                    .attempt_times_ok_response
218                    .observe(total_attempts as f64);
219                Ok((transaction.clone(), resp.clone()))
220            }
221            Err(err) => {
222                self.metrics
223                    .total_err_responses
224                    .with_label_values(&[err.as_ref()])
225                    .inc();
226                Err((*tx_digest, err.clone()))
227            }
228        };
229        if total_attempts > 1 {
230            self.metrics.current_transactions_in_retry.dec();
231        }
232        // On fullnode we expect the send to always succeed because TransactionOrchestrator should be subscribing
233        // to this queue all the time. However the if QuorumDriver is used elsewhere log may be noisy.
234        if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
235            warn!(?tx_digest, "No subscriber found for effects: {}", err);
236        }
237        debug!(?tx_digest, "notify QuorumDriver task result");
238        self.notifier.notify(tx_digest, response);
239    }
240}
241
242impl<A> QuorumDriver<A>
243where
244    A: AuthorityAPI + Send + Sync + 'static + Clone,
245{
246    #[instrument(level = "trace", skip_all)]
247    pub async fn submit_transaction(
248        &self,
249        request: ExecuteTransactionRequestV3,
250    ) -> SuiResult<Registration<'_, TransactionDigest, QuorumDriverResult>> {
251        let tx_digest = request.transaction.digest();
252        debug!(?tx_digest, "Received transaction execution request.");
253        self.metrics.total_requests.inc();
254
255        let ticket = self.notifier.register_one(tx_digest);
256        self.enqueue_task(QuorumDriverTask {
257            request,
258            tx_cert: None,
259            retry_times: 0,
260            next_retry_after: Instant::now(),
261            client_addr: None,
262            trace_span: Some(tracing::Span::current()),
263        })
264        .await?;
265        Ok(ticket)
266    }
267
268    // Used when the it is called in a component holding the notifier, and a ticket is
269    // already obtained prior to calling this function, for instance, TransactionOrchestrator
270    #[instrument(level = "trace", skip_all)]
271    pub async fn submit_transaction_no_ticket(
272        &self,
273        request: ExecuteTransactionRequestV3,
274        client_addr: Option<SocketAddr>,
275    ) -> SuiResult<()> {
276        let tx_digest = request.transaction.digest();
277        debug!(
278            ?tx_digest,
279            "Received transaction execution request, no ticket."
280        );
281        self.metrics.total_requests.inc();
282
283        self.enqueue_task(QuorumDriverTask {
284            request,
285            tx_cert: None,
286            retry_times: 0,
287            next_retry_after: Instant::now(),
288            client_addr,
289            trace_span: Some(tracing::Span::current()),
290        })
291        .await
292    }
293
294    #[instrument(level = "trace", skip_all)]
295    pub(crate) async fn process_transaction(
296        &self,
297        transaction: Transaction,
298        client_addr: Option<SocketAddr>,
299    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
300        let auth_agg = self.validators.load();
301        let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
302        let tx_digest = *transaction.digest();
303        let result = auth_agg.process_transaction(transaction, client_addr).await;
304
305        self.process_transaction_result(result, tx_digest).await
306    }
307
308    #[instrument(level = "trace", skip_all)]
309    async fn process_transaction_result(
310        &self,
311        result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
312        tx_digest: TransactionDigest,
313    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
314        match result {
315            Ok(resp) => Ok(resp),
316
317            Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
318                errors,
319                conflicting_tx_digests,
320            }) => {
321                debug!(
322                    ?errors,
323                    "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
324                );
325                Err(Some(QuorumDriverError::ObjectsDoubleUsed {
326                    conflicting_txes: conflicting_tx_digests,
327                }))
328            }
329
330            Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
331                debug!(?tx_digest, ?errors, "Nonretryable transaction error");
332                Err(Some(QuorumDriverError::NonRecoverableTransactionError {
333                    errors,
334                }))
335            }
336
337            Err(AggregatorProcessTransactionError::SystemOverload {
338                overloaded_stake,
339                errors,
340            }) => {
341                debug!(?tx_digest, ?errors, "System overload");
342                Err(Some(QuorumDriverError::SystemOverload {
343                    overloaded_stake,
344                    errors,
345                }))
346            }
347
348            Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
349                overload_stake,
350                errors,
351                retry_after_secs,
352            }) => {
353                self.metrics.total_retryable_overload_errors.inc();
354                debug!(
355                    ?tx_digest,
356                    ?errors,
357                    "System overload and retry after secs {retry_after_secs}",
358                );
359                Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
360                    overload_stake,
361                    errors,
362                    retry_after_secs,
363                }))
364            }
365
366            Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
367                debug!(?tx_digest, ?errors, "Retryable transaction error");
368                Err(None)
369            }
370
371            Err(
372                AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
373            ) => {
374                debug!(
375                    ?tx_digest,
376                    "Transaction is already finalized with different user signatures"
377                );
378                Err(Some(
379                    QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
380                ))
381            }
382        }
383    }
384
385    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
386    pub(crate) async fn process_certificate(
387        &self,
388        request: HandleCertificateRequestV3,
389        client_addr: Option<SocketAddr>,
390    ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
391        let auth_agg = self.validators.load();
392        let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
393        let tx_digest = *request.certificate.digest();
394        let response = auth_agg
395            .process_certificate(request.clone(), client_addr)
396            .await
397            .map_err(|agg_err| match agg_err {
398                AggregatorProcessCertificateError::FatalExecuteCertificate {
399                    non_retryable_errors,
400                } => {
401                    // Normally a certificate shouldn't have fatal errors.
402                    error!(
403                        ?tx_digest,
404                        ?non_retryable_errors,
405                        "[WATCHOUT] Unexpected Fatal error for certificate"
406                    );
407                    Some(QuorumDriverError::NonRecoverableTransactionError {
408                        errors: non_retryable_errors,
409                    })
410                }
411                AggregatorProcessCertificateError::RetryableExecuteCertificate {
412                    retryable_errors,
413                } => {
414                    debug!(?retryable_errors, "Retryable certificate");
415                    None
416                }
417            })?;
418
419        Ok(response)
420    }
421}
422
/// Lets a reconfiguration observer read and replace the driver's aggregator.
impl<A> AuthorityAggregatorUpdatable<A> for QuorumDriver<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Returns the epoch of the current aggregator's committee.
    fn epoch(&self) -> EpochId {
        self.validators.load().committee.epoch
    }

    /// Returns a shared handle to the current aggregator.
    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
        self.validators.load_full()
    }

    /// Logs the new committee and stores `new_authorities` as the aggregator used
    /// by subsequent loads.
    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
        info!(
            "Quorum Driver updating AuthorityAggregator with committee {}",
            new_authorities.committee
        );
        self.validators.store(new_authorities);
    }
}
443
/// Owns a `QuorumDriver` together with the background tasks that serve it.
pub struct QuorumDriverHandler<A: Clone> {
    /// The driver shared with the spawned tasks.
    quorum_driver: Arc<QuorumDriver<A>>,
    /// Receiver on the effects broadcast channel; `subscribe_to_effects` resubscribes from it.
    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
    /// Metrics handle, reused when the handler is cloned via `clone_new`.
    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
    /// Observer whose boxed clone is run against the driver in a spawned task.
    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    /// Join handle of the spawned task-queue processor.
    _processor_handle: JoinHandle<()>,
}
451
452impl<A> QuorumDriverHandler<A>
453where
454    A: AuthorityAPI + Send + Sync + 'static + Clone,
455{
456    pub(crate) fn new(
457        validators: Arc<AuthorityAggregator<A>>,
458        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
459        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
460        metrics: Arc<QuorumDriverMetrics>,
461        max_retry_times: u32,
462    ) -> Self {
463        let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
464        let (subscriber_tx, subscriber_rx) =
465            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
466        let quorum_driver = Arc::new(QuorumDriver::new(
467            ArcSwap::new(validators),
468            task_tx,
469            subscriber_tx,
470            notifier,
471            metrics.clone(),
472            max_retry_times,
473        ));
474        let metrics_clone = metrics.clone();
475        let processor_handle = {
476            let quorum_driver_clone = quorum_driver.clone();
477            spawn_monitored_task!(Self::task_queue_processor(
478                quorum_driver_clone,
479                task_rx,
480                metrics_clone
481            ))
482        };
483        let reconfig_observer_clone = reconfig_observer.clone();
484        {
485            let quorum_driver_clone = quorum_driver.clone();
486            spawn_monitored_task!({
487                async move {
488                    let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
489                    reconfig_observer_clone.run(quorum_driver_clone).await;
490                }
491            });
492        };
493        Self {
494            quorum_driver,
495            effects_subscriber: subscriber_rx,
496            quorum_driver_metrics: metrics,
497            reconfig_observer,
498            _processor_handle: processor_handle,
499        }
500    }
501
    /// Delegates to `QuorumDriver::submit_transaction_no_ticket`.
    ///
    /// Used when the caller holds the notifier and has already obtained a ticket
    /// before calling this function, for instance TransactionOrchestrator.
    pub async fn submit_transaction_no_ticket(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
    ) -> SuiResult<()> {
        self.quorum_driver
            .submit_transaction_no_ticket(request, client_addr)
            .await
    }
513
    /// Delegates to `QuorumDriver::submit_transaction` and returns the
    /// registration it produces.
    pub async fn submit_transaction(
        &self,
        request: ExecuteTransactionRequestV3,
    ) -> SuiResult<Registration<'_, TransactionDigest, QuorumDriverResult>> {
        self.quorum_driver.submit_transaction(request).await
    }
520
521    /// Create a new `QuorumDriverHandler` based on the same AuthorityAggregator.
522    /// Note: the new `QuorumDriverHandler` will have a new `ArcSwap<AuthorityAggregator>`
523    /// that is NOT tied to the original one. So if there are multiple QuorumDriver(Handler)
524    /// then all of them need to do reconfigs on their own.
525    pub fn clone_new(&self) -> Self {
526        let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
527        let (effects_subscribe_sender, subscriber_rx) =
528            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
529        let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
530        let quorum_driver = Arc::new(QuorumDriver {
531            validators,
532            task_sender,
533            effects_subscribe_sender,
534            notifier: Arc::new(NotifyRead::new()),
535            metrics: self.quorum_driver_metrics.clone(),
536            max_retry_times: self.quorum_driver.max_retry_times,
537        });
538        let metrics = self.quorum_driver_metrics.clone();
539        let processor_handle = {
540            let quorum_driver_copy = quorum_driver.clone();
541            spawn_monitored_task!(Self::task_queue_processor(
542                quorum_driver_copy,
543                task_rx,
544                metrics,
545            ))
546        };
547        {
548            let quorum_driver_copy = quorum_driver.clone();
549            let reconfig_observer = self.reconfig_observer.clone();
550            spawn_monitored_task!({
551                async move {
552                    let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
553                    reconfig_observer_clone.run(quorum_driver_copy).await;
554                }
555            })
556        };
557
558        Self {
559            quorum_driver,
560            effects_subscriber: subscriber_rx,
561            quorum_driver_metrics: self.quorum_driver_metrics.clone(),
562            reconfig_observer: self.reconfig_observer.clone(),
563            _processor_handle: processor_handle,
564        }
565    }
566
    /// Returns another shared handle to the underlying `QuorumDriver`.
    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
        self.quorum_driver.clone()
    }
570
    /// Returns a new receiver on the effects broadcast channel, created by
    /// resubscribing from the handler's own receiver.
    pub fn subscribe_to_effects(
        &self,
    ) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
        self.effects_subscriber.resubscribe()
    }
576
    /// Returns the driver's swappable authority aggregator cell.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        self.quorum_driver.authority_aggregator()
    }
580
    /// Returns the epoch reported by the underlying driver.
    pub fn current_epoch(&self) -> EpochId {
        self.quorum_driver.current_epoch()
    }
584
    /// Process a QuorumDriverTask.
    /// The function has no return value - the corresponding actions of task result
    /// are performed in this call.
    ///
    /// Steps:
    /// 1. If the task carries no certificate, submit the transaction to obtain one.
    ///    If the aggregator reports the transaction as already executed, the
    ///    waiters are notified right away and the function returns.
    /// 2. Submit the certificate and notify the waiters with the response.
    ///
    /// Any error in either step is handed to `handle_error`, which either schedules
    /// a retry or notifies the failure.
    #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
    async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
        debug!(?task, "Quorum Driver processing task");
        let QuorumDriverTask {
            request,
            tx_cert,
            retry_times: old_retry_times,
            client_addr,
            ..
        } = task;
        let transaction = &request.transaction;
        let tx_digest = *transaction.digest();
        // Only used as a metric label for the settlement finality latency below.
        let tx_type = if transaction.is_consensus_tx() {
            TxType::SharedObject
        } else {
            TxType::SingleWriter
        };

        // Started before the certificate is requested so the latency below spans
        // both certificate formation and certificate execution.
        let timer = Instant::now();
        let (tx_cert, newly_formed) = match tx_cert {
            None => match quorum_driver
                .process_transaction(transaction.clone(), client_addr)
                .await
            {
                Ok(ProcessTransactionResult::Certified {
                    certificate,
                    newly_formed,
                }) => {
                    debug!(?tx_digest, "Transaction processing succeeded");
                    (certificate, newly_formed)
                }
                Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
                    debug!(
                        ?tx_digest,
                        "Transaction processing succeeded with effects directly"
                    );
                    // Effects came back directly; only events are populated in
                    // this response, the other optional fields stay `None`.
                    let response = QuorumDriverResponse {
                        effects_cert,
                        events: Some(events),
                        input_objects: None,
                        output_objects: None,
                        auxiliary_data: None,
                    };
                    quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
                    return;
                }
                Err(err) => {
                    Self::handle_error(
                        quorum_driver,
                        request,
                        err,
                        None,
                        old_retry_times,
                        "get tx cert",
                        client_addr,
                    );
                    return;
                }
            },
            // Certificate carried over from an earlier attempt: it was not formed
            // in this call, so no finality latency is recorded for it.
            Some(tx_cert) => (tx_cert, false),
        };

        let response = match quorum_driver
            .process_certificate(
                HandleCertificateRequestV3 {
                    // Cloned because the certificate is handed to `handle_error`
                    // on failure so a retry can reuse it.
                    certificate: tx_cert.clone(),
                    include_events: request.include_events,
                    include_input_objects: request.include_input_objects,
                    include_output_objects: request.include_output_objects,
                    include_auxiliary_data: request.include_auxiliary_data,
                },
                client_addr,
            )
            .await
        {
            Ok(response) => {
                debug!(?tx_digest, "Certificate processing succeeded");
                response
            }
            // Note: non retryable failure when processing a cert
            // should be very rare.
            Err(err) => {
                Self::handle_error(
                    quorum_driver,
                    request,
                    err,
                    Some(tx_cert),
                    old_retry_times,
                    "get effects cert",
                    client_addr,
                );
                return;
            }
        };
        if newly_formed {
            let settlement_finality_latency = timer.elapsed().as_secs_f64();
            quorum_driver
                .metrics
                .settlement_finality_latency
                .with_label_values(&[tx_type.as_str()])
                .observe(settlement_finality_latency);
            // Logged only; flags latencies of at least 8s or at most 0.1s.
            let is_out_of_expected_range =
                settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
            debug!(
                ?tx_digest,
                ?tx_type,
                ?is_out_of_expected_range,
                "QuorumDriver settlement finality latency: {:.3} seconds",
                settlement_finality_latency
            );
        }

        quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
    }
702
703    fn handle_error(
704        quorum_driver: Arc<QuorumDriver<A>>,
705        request: ExecuteTransactionRequestV3,
706        err: Option<QuorumDriverError>,
707        tx_cert: Option<CertifiedTransaction>,
708        old_retry_times: u32,
709        action: &'static str,
710        client_addr: Option<SocketAddr>,
711    ) {
712        let tx_digest = *request.transaction.digest();
713        match err {
714            None => {
715                info!(?tx_digest, "Failed to {action}: {err:?} - Retrying");
716                spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
717                    request.clone(),
718                    tx_cert,
719                    old_retry_times,
720                    client_addr,
721                ));
722            }
723            Some(QuorumDriverError::SystemOverloadRetryAfter {
724                retry_after_secs, ..
725            }) => {
726                // Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already
727                // locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
728                // TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number
729                // of on going transactions in a quorum driver. When the limit is reached, the quorum driver should
730                // reject any new transaction requests.
731                info!(
732                    ?tx_digest,
733                    "Failed to {action} - Validator overloaded. Retrying"
734                );
735                spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
736                    request.clone(),
737                    tx_cert,
738                    old_retry_times,
739                    client_addr,
740                    Some(Duration::from_secs(retry_after_secs)),
741                ));
742            }
743            Some(qd_error) => {
744                info!(?tx_digest, "Failed to {action}: {}", qd_error);
745                // non-retryable failure, this task reaches terminal state for now, notify waiter.
746                quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
747            }
748        }
749    }
750
751    async fn task_queue_processor(
752        quorum_driver: Arc<QuorumDriver<A>>,
753        mut task_receiver: Receiver<QuorumDriverTask>,
754        metrics: Arc<QuorumDriverMetrics>,
755    ) {
756        let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
757        while let Some(task) = task_receiver.recv().await {
758            let task_queue_span =
759                trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
760            let task_span_guard = task_queue_span.enter();
761
762            // hold semaphore permit until task completes. unwrap ok because we never close
763            // the semaphore in this context.
764            let limit = limit.clone();
765            let permit = limit.acquire_owned().await.unwrap();
766
767            // TODO check reconfig process here
768
769            debug!(?task, "Dequeued task");
770            if Instant::now()
771                .checked_duration_since(task.next_retry_after)
772                .is_none()
773            {
774                // Not ready for next attempt yet, re-enqueue
775                let _ = quorum_driver.enqueue_task(task).await;
776                continue;
777            }
778            metrics.current_requests_in_flight.dec();
779            let qd = quorum_driver.clone();
780            drop(task_span_guard);
781            spawn_monitored_task!(async move {
782                let _guard = permit;
783                QuorumDriverHandler::process_task(qd, task).await
784            });
785        }
786    }
787}
788
/// Builder for `QuorumDriverHandler`.
pub struct QuorumDriverHandlerBuilder<A: Clone> {
    /// Aggregator the driver starts with.
    validators: Arc<AuthorityAggregator<A>>,
    /// Metrics handle passed to the handler.
    metrics: Arc<QuorumDriverMetrics>,
    /// Optional externally owned notifier; a fresh one is created in `start` if absent.
    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
    /// Reconfiguration observer; must be set before `start` is called.
    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
    /// Retry limit handed to the driver; defaults to `TX_MAX_RETRY_TIMES`.
    max_retry_times: u32,
}
796
impl<A> QuorumDriverHandlerBuilder<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Starts a builder with no notifier, no reconfig observer and the default
    /// retry limit (`TX_MAX_RETRY_TIMES`).
    pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
        Self {
            validators,
            metrics,
            notifier: None,
            reconfig_observer: None,
            max_retry_times: TX_MAX_RETRY_TIMES,
        }
    }

    /// Uses the given notifier instead of creating a fresh one in `start`.
    pub(crate) fn with_notifier(
        mut self,
        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    ) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Sets the reconfiguration observer; required before calling `start`.
    pub fn with_reconfig_observer(
        mut self,
        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    ) -> Self {
        self.reconfig_observer = Some(reconfig_observer);
        self
    }

    /// Overrides the retry limit. Used in tests when a smaller number of retries
    /// is desired.
    pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
        self.max_retry_times = max_retry_times;
        self
    }

    /// Consumes the builder and creates the `QuorumDriverHandler`.
    ///
    /// # Panics
    ///
    /// Panics if no reconfig observer was provided via `with_reconfig_observer`.
    pub fn start(self) -> QuorumDriverHandler<A> {
        QuorumDriverHandler::new(
            self.validators,
            // Fall back to a fresh notifier when none was supplied.
            self.notifier.unwrap_or_else(|| {
                Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
            }),
            self.reconfig_observer
                .expect("Reconfig observer is missing"),
            self.metrics,
            self.max_retry_times,
        )
    }
}