1mod metrics;
5pub use metrics::*;
6
7pub mod reconfig_observer;
8
9use arc_swap::ArcSwap;
10use std::fmt::{Debug, Formatter};
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_types::base_types::TransactionDigest;
15use sui_types::committee::{Committee, EpochId};
16use sui_types::messages_grpc::{HandleCertificateRequestV3, TxType};
17use sui_types::quorum_driver_types::{
18 ExecuteTransactionRequestV3, QuorumDriverEffectsQueueResult, QuorumDriverError,
19 QuorumDriverResponse, QuorumDriverResult,
20};
21use tap::TapFallible;
22use tokio::sync::Semaphore;
23use tokio::time::{Instant, sleep_until};
24
25use tokio::sync::mpsc::{self, Receiver, Sender};
26use tokio::task::JoinHandle;
27use tracing::{debug, error, info, instrument, trace_span, warn};
28
29use crate::authority_aggregator::{
30 AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
31 ProcessTransactionResult,
32};
33use crate::authority_client::AuthorityAPI;
34use mysten_common::sync::notify_read::{NotifyRead, Registration};
35use mysten_metrics::{GaugeGuard, spawn_monitored_task};
36use std::fmt::Write;
37use sui_macros::fail_point;
38use sui_types::error::{SuiErrorKind, SuiResult};
39use sui_types::transaction::{CertifiedTransaction, Transaction};
40
41use self::reconfig_observer::ReconfigObserver;
42
43#[cfg(test)]
44mod tests;
45
/// Capacity of the bounded task channel that feeds the task queue processor. The same value is
/// used as the number of semaphore permits limiting concurrently running tasks.
const TASK_QUEUE_SIZE: usize = 2000;
/// Capacity of the broadcast channel on which transaction results are published.
const EFFECTS_QUEUE_SIZE: usize = 10000;
/// Default `max_retry_times` used by `QuorumDriverHandlerBuilder` unless overridden.
const TX_MAX_RETRY_TIMES: u32 = 10;
49
/// Interface for a component that holds an [`AuthorityAggregator`] which can be replaced at
/// runtime. In this file it is implemented by [`QuorumDriver`].
pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
    /// Returns the current epoch.
    fn epoch(&self) -> EpochId;
    /// Returns a shared handle to the authority aggregator currently in use.
    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
    /// Replaces the authority aggregator in use with `new_authorities`.
    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
}
55
/// A unit of work for the quorum driver: one transaction execution request plus the retry
/// bookkeeping that travels with it through the task queue.
#[derive(Clone)]
pub struct QuorumDriverTask {
    /// The execution request being driven.
    pub request: ExecuteTransactionRequestV3,
    /// Certificate obtained by an earlier attempt, if any. When present, processing skips the
    /// transaction-signing step and goes straight to certificate processing.
    pub tx_cert: Option<CertifiedTransaction>,
    /// Number of retries so far; `0` for a freshly submitted request.
    pub retry_times: u32,
    /// Earliest instant at which the task should be processed; the queue processor re-enqueues
    /// tasks dequeued before this instant.
    pub next_retry_after: Instant,
    /// Client address forwarded to the authority aggregator calls.
    pub client_addr: Option<SocketAddr>,
    /// Span captured at submission time, used as the parent of the processing spans.
    pub trace_span: Option<tracing::Span>,
}
65
66impl Debug for QuorumDriverTask {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 let mut writer = String::new();
69 write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
70 write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
71 write!(writer, "retry_times={} ", self.retry_times)?;
72 write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
73 write!(f, "{}", writer)
74 }
75}
76
/// Drives transactions to finality against the current set of validators and publishes the
/// outcome to waiting callers and to effects subscribers.
pub struct QuorumDriver<A: Clone> {
    /// Authority aggregator currently in use; swapped by `update_authority_aggregator`.
    validators: ArcSwap<AuthorityAggregator<A>>,
    /// Sending half of the bounded task channel.
    task_sender: Sender<QuorumDriverTask>,
    /// Broadcast sender on which `notify` publishes every final result.
    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
    /// Wakes callers that registered for a transaction digest via `submit_transaction`.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Metrics updated as tasks are enqueued, retried and completed.
    metrics: Arc<QuorumDriverMetrics>,
    /// Retry count at or above which `enqueue_again_maybe` gives up and reports an error.
    max_retry_times: u32,
}
85
86impl<A: Clone> QuorumDriver<A> {
87 pub(crate) fn new(
88 validators: ArcSwap<AuthorityAggregator<A>>,
89 task_sender: Sender<QuorumDriverTask>,
90 effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
91 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
92 metrics: Arc<QuorumDriverMetrics>,
93 max_retry_times: u32,
94 ) -> Self {
95 Self {
96 validators,
97 task_sender,
98 effects_subscribe_sender,
99 notifier,
100 metrics,
101 max_retry_times,
102 }
103 }
104
    /// Returns the swappable slot holding the authority aggregator currently in use.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        &self.validators
    }
108
109 pub fn clone_committee(&self) -> Arc<Committee> {
110 self.validators.load().committee.clone()
111 }
112
    /// Returns the epoch of the committee of the authority aggregator currently in use.
    pub fn current_epoch(&self) -> EpochId {
        self.validators.load().committee.epoch
    }
116
117 async fn enqueue_task(&self, task: QuorumDriverTask) -> SuiResult<()> {
118 self.task_sender
119 .send(task.clone())
120 .await
121 .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
122 .tap_ok(|_| {
123 debug!(?task, "Enqueued task.");
124 self.metrics.current_requests_in_flight.inc();
125 self.metrics.total_enqueued.inc();
126 if task.retry_times > 0 {
127 if task.retry_times == 1 {
128 self.metrics.current_transactions_in_retry.inc();
129 }
130 self.metrics
131 .transaction_retry_count
132 .observe(task.retry_times as f64);
133 }
134 })
135 .map_err(|e| {
136 SuiErrorKind::QuorumDriverCommunicationError {
137 error: e.to_string(),
138 }
139 .into()
140 })
141 }
142
143 async fn enqueue_again_maybe(
146 &self,
147 request: ExecuteTransactionRequestV3,
148 tx_cert: Option<CertifiedTransaction>,
149 old_retry_times: u32,
150 client_addr: Option<SocketAddr>,
151 ) -> SuiResult<()> {
152 if old_retry_times >= self.max_retry_times {
153 info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
155 self.notify(
156 &request.transaction,
157 &Err(
158 QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
159 total_attempts: old_retry_times + 1,
160 },
161 ),
162 old_retry_times + 1,
163 );
164 return Ok(());
165 }
166 self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
167 .await
168 }
169
170 async fn backoff_and_enqueue(
173 &self,
174 request: ExecuteTransactionRequestV3,
175 tx_cert: Option<CertifiedTransaction>,
176 old_retry_times: u32,
177 client_addr: Option<SocketAddr>,
178 min_backoff_duration: Option<Duration>,
179 ) -> SuiResult<()> {
180 let next_retry_after = Instant::now()
181 + Duration::from_millis(200 * u64::pow(2, old_retry_times))
182 .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
183 sleep_until(next_retry_after).await;
184
185 fail_point!("count_retry_times");
186
187 let tx_cert = match tx_cert {
188 Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
192 _other => None,
193 };
194
195 self.enqueue_task(QuorumDriverTask {
196 request,
197 tx_cert,
198 retry_times: old_retry_times + 1,
199 next_retry_after,
200 client_addr,
201 trace_span: Some(tracing::Span::current()),
202 })
203 .await
204 }
205
206 pub fn notify(
207 &self,
208 transaction: &Transaction,
209 response: &QuorumDriverResult,
210 total_attempts: u32,
211 ) {
212 let tx_digest = transaction.digest();
213 let effects_queue_result = match &response {
214 Ok(resp) => {
215 self.metrics.total_ok_responses.inc();
216 self.metrics
217 .attempt_times_ok_response
218 .observe(total_attempts as f64);
219 Ok((transaction.clone(), resp.clone()))
220 }
221 Err(err) => {
222 self.metrics
223 .total_err_responses
224 .with_label_values(&[err.as_ref()])
225 .inc();
226 Err((*tx_digest, err.clone()))
227 }
228 };
229 if total_attempts > 1 {
230 self.metrics.current_transactions_in_retry.dec();
231 }
232 if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
235 warn!(?tx_digest, "No subscriber found for effects: {}", err);
236 }
237 debug!(?tx_digest, "notify QuorumDriver task result");
238 self.notifier.notify(tx_digest, response);
239 }
240}
241
242impl<A> QuorumDriver<A>
243where
244 A: AuthorityAPI + Send + Sync + 'static + Clone,
245{
246 #[instrument(level = "trace", skip_all)]
247 pub async fn submit_transaction(
248 &self,
249 request: ExecuteTransactionRequestV3,
250 ) -> SuiResult<Registration<'_, TransactionDigest, QuorumDriverResult>> {
251 let tx_digest = request.transaction.digest();
252 debug!(?tx_digest, "Received transaction execution request.");
253 self.metrics.total_requests.inc();
254
255 let ticket = self.notifier.register_one(tx_digest);
256 self.enqueue_task(QuorumDriverTask {
257 request,
258 tx_cert: None,
259 retry_times: 0,
260 next_retry_after: Instant::now(),
261 client_addr: None,
262 trace_span: Some(tracing::Span::current()),
263 })
264 .await?;
265 Ok(ticket)
266 }
267
268 #[instrument(level = "trace", skip_all)]
271 pub async fn submit_transaction_no_ticket(
272 &self,
273 request: ExecuteTransactionRequestV3,
274 client_addr: Option<SocketAddr>,
275 ) -> SuiResult<()> {
276 let tx_digest = request.transaction.digest();
277 debug!(
278 ?tx_digest,
279 "Received transaction execution request, no ticket."
280 );
281 self.metrics.total_requests.inc();
282
283 self.enqueue_task(QuorumDriverTask {
284 request,
285 tx_cert: None,
286 retry_times: 0,
287 next_retry_after: Instant::now(),
288 client_addr,
289 trace_span: Some(tracing::Span::current()),
290 })
291 .await
292 }
293
294 #[instrument(level = "trace", skip_all)]
295 pub(crate) async fn process_transaction(
296 &self,
297 transaction: Transaction,
298 client_addr: Option<SocketAddr>,
299 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
300 let auth_agg = self.validators.load();
301 let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
302 let tx_digest = *transaction.digest();
303 let result = auth_agg.process_transaction(transaction, client_addr).await;
304
305 self.process_transaction_result(result, tx_digest).await
306 }
307
308 #[instrument(level = "trace", skip_all)]
309 async fn process_transaction_result(
310 &self,
311 result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
312 tx_digest: TransactionDigest,
313 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
314 match result {
315 Ok(resp) => Ok(resp),
316
317 Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
318 errors,
319 conflicting_tx_digests,
320 }) => {
321 debug!(
322 ?errors,
323 "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
324 );
325 Err(Some(QuorumDriverError::ObjectsDoubleUsed {
326 conflicting_txes: conflicting_tx_digests,
327 }))
328 }
329
330 Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
331 debug!(?tx_digest, ?errors, "Nonretryable transaction error");
332 Err(Some(QuorumDriverError::NonRecoverableTransactionError {
333 errors,
334 }))
335 }
336
337 Err(AggregatorProcessTransactionError::SystemOverload {
338 overloaded_stake,
339 errors,
340 }) => {
341 debug!(?tx_digest, ?errors, "System overload");
342 Err(Some(QuorumDriverError::SystemOverload {
343 overloaded_stake,
344 errors,
345 }))
346 }
347
348 Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
349 overload_stake,
350 errors,
351 retry_after_secs,
352 }) => {
353 self.metrics.total_retryable_overload_errors.inc();
354 debug!(
355 ?tx_digest,
356 ?errors,
357 "System overload and retry after secs {retry_after_secs}",
358 );
359 Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
360 overload_stake,
361 errors,
362 retry_after_secs,
363 }))
364 }
365
366 Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
367 debug!(?tx_digest, ?errors, "Retryable transaction error");
368 Err(None)
369 }
370
371 Err(
372 AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
373 ) => {
374 debug!(
375 ?tx_digest,
376 "Transaction is already finalized with different user signatures"
377 );
378 Err(Some(
379 QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
380 ))
381 }
382 }
383 }
384
385 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
386 pub(crate) async fn process_certificate(
387 &self,
388 request: HandleCertificateRequestV3,
389 client_addr: Option<SocketAddr>,
390 ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
391 let auth_agg = self.validators.load();
392 let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
393 let tx_digest = *request.certificate.digest();
394 let response = auth_agg
395 .process_certificate(request.clone(), client_addr)
396 .await
397 .map_err(|agg_err| match agg_err {
398 AggregatorProcessCertificateError::FatalExecuteCertificate {
399 non_retryable_errors,
400 } => {
401 error!(
403 ?tx_digest,
404 ?non_retryable_errors,
405 "[WATCHOUT] Unexpected Fatal error for certificate"
406 );
407 Some(QuorumDriverError::NonRecoverableTransactionError {
408 errors: non_retryable_errors,
409 })
410 }
411 AggregatorProcessCertificateError::RetryableExecuteCertificate {
412 retryable_errors,
413 } => {
414 debug!(?retryable_errors, "Retryable certificate");
415 None
416 }
417 })?;
418
419 Ok(response)
420 }
421}
422
423impl<A> AuthorityAggregatorUpdatable<A> for QuorumDriver<A>
424where
425 A: AuthorityAPI + Send + Sync + 'static + Clone,
426{
427 fn epoch(&self) -> EpochId {
428 self.validators.load().committee.epoch
429 }
430
431 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
432 self.validators.load_full()
433 }
434
435 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
436 info!(
437 "Quorum Driver updating AuthorityAggregator with committee {}",
438 new_authorities.committee
439 );
440 self.validators.store(new_authorities);
441 }
442}
443
/// Owns a [`QuorumDriver`] together with the background task that processes its task queue,
/// and exposes the public entry points for submitting transactions.
pub struct QuorumDriverHandler<A: Clone> {
    /// The driver shared with the background tasks.
    quorum_driver: Arc<QuorumDriver<A>>,
    /// Receiver kept so that `subscribe_to_effects` can hand out new subscriptions.
    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
    /// Metrics shared with the driver and the task queue processor.
    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
    /// Observer whose boxed clone is run against the driver in a background task.
    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    /// Join handle of the task queue processor task.
    _processor_handle: JoinHandle<()>,
}
451
452impl<A> QuorumDriverHandler<A>
453where
454 A: AuthorityAPI + Send + Sync + 'static + Clone,
455{
456 pub(crate) fn new(
457 validators: Arc<AuthorityAggregator<A>>,
458 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
459 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
460 metrics: Arc<QuorumDriverMetrics>,
461 max_retry_times: u32,
462 ) -> Self {
463 let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
464 let (subscriber_tx, subscriber_rx) =
465 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
466 let quorum_driver = Arc::new(QuorumDriver::new(
467 ArcSwap::new(validators),
468 task_tx,
469 subscriber_tx,
470 notifier,
471 metrics.clone(),
472 max_retry_times,
473 ));
474 let metrics_clone = metrics.clone();
475 let processor_handle = {
476 let quorum_driver_clone = quorum_driver.clone();
477 spawn_monitored_task!(Self::task_queue_processor(
478 quorum_driver_clone,
479 task_rx,
480 metrics_clone
481 ))
482 };
483 let reconfig_observer_clone = reconfig_observer.clone();
484 {
485 let quorum_driver_clone = quorum_driver.clone();
486 spawn_monitored_task!({
487 async move {
488 let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
489 reconfig_observer_clone.run(quorum_driver_clone).await;
490 }
491 });
492 };
493 Self {
494 quorum_driver,
495 effects_subscriber: subscriber_rx,
496 quorum_driver_metrics: metrics,
497 reconfig_observer,
498 _processor_handle: processor_handle,
499 }
500 }
501
    /// Submits `request` for execution without registering for its result; delegates to
    /// `QuorumDriver::submit_transaction_no_ticket`.
    pub async fn submit_transaction_no_ticket(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
    ) -> SuiResult<()> {
        self.quorum_driver
            .submit_transaction_no_ticket(request, client_addr)
            .await
    }
513
    /// Submits `request` for execution and returns a registration on which the caller can wait
    /// for the result; delegates to `QuorumDriver::submit_transaction`.
    pub async fn submit_transaction(
        &self,
        request: ExecuteTransactionRequestV3,
    ) -> SuiResult<Registration<'_, TransactionDigest, QuorumDriverResult>> {
        self.quorum_driver.submit_transaction(request).await
    }
520
521 pub fn clone_new(&self) -> Self {
526 let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
527 let (effects_subscribe_sender, subscriber_rx) =
528 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
529 let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
530 let quorum_driver = Arc::new(QuorumDriver {
531 validators,
532 task_sender,
533 effects_subscribe_sender,
534 notifier: Arc::new(NotifyRead::new()),
535 metrics: self.quorum_driver_metrics.clone(),
536 max_retry_times: self.quorum_driver.max_retry_times,
537 });
538 let metrics = self.quorum_driver_metrics.clone();
539 let processor_handle = {
540 let quorum_driver_copy = quorum_driver.clone();
541 spawn_monitored_task!(Self::task_queue_processor(
542 quorum_driver_copy,
543 task_rx,
544 metrics,
545 ))
546 };
547 {
548 let quorum_driver_copy = quorum_driver.clone();
549 let reconfig_observer = self.reconfig_observer.clone();
550 spawn_monitored_task!({
551 async move {
552 let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
553 reconfig_observer_clone.run(quorum_driver_copy).await;
554 }
555 })
556 };
557
558 Self {
559 quorum_driver,
560 effects_subscriber: subscriber_rx,
561 quorum_driver_metrics: self.quorum_driver_metrics.clone(),
562 reconfig_observer: self.reconfig_observer.clone(),
563 _processor_handle: processor_handle,
564 }
565 }
566
567 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
568 self.quorum_driver.clone()
569 }
570
    /// Returns a new receiver for the effects broadcast channel, obtained by resubscribing the
    /// receiver held by this handler.
    pub fn subscribe_to_effects(
        &self,
    ) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
        self.effects_subscriber.resubscribe()
    }
576
    /// Returns the underlying driver's swappable authority aggregator slot.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        self.quorum_driver.authority_aggregator()
    }
580
    /// Returns the epoch reported by the underlying driver.
    pub fn current_epoch(&self) -> EpochId {
        self.quorum_driver.current_epoch()
    }
584
585 #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
589 async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
590 debug!(?task, "Quorum Driver processing task");
591 let QuorumDriverTask {
592 request,
593 tx_cert,
594 retry_times: old_retry_times,
595 client_addr,
596 ..
597 } = task;
598 let transaction = &request.transaction;
599 let tx_digest = *transaction.digest();
600 let tx_type = if transaction.is_consensus_tx() {
601 TxType::SharedObject
602 } else {
603 TxType::SingleWriter
604 };
605
606 let timer = Instant::now();
607 let (tx_cert, newly_formed) = match tx_cert {
608 None => match quorum_driver
609 .process_transaction(transaction.clone(), client_addr)
610 .await
611 {
612 Ok(ProcessTransactionResult::Certified {
613 certificate,
614 newly_formed,
615 }) => {
616 debug!(?tx_digest, "Transaction processing succeeded");
617 (certificate, newly_formed)
618 }
619 Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
620 debug!(
621 ?tx_digest,
622 "Transaction processing succeeded with effects directly"
623 );
624 let response = QuorumDriverResponse {
625 effects_cert,
626 events: Some(events),
627 input_objects: None,
628 output_objects: None,
629 auxiliary_data: None,
630 };
631 quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
632 return;
633 }
634 Err(err) => {
635 Self::handle_error(
636 quorum_driver,
637 request,
638 err,
639 None,
640 old_retry_times,
641 "get tx cert",
642 client_addr,
643 );
644 return;
645 }
646 },
647 Some(tx_cert) => (tx_cert, false),
648 };
649
650 let response = match quorum_driver
651 .process_certificate(
652 HandleCertificateRequestV3 {
653 certificate: tx_cert.clone(),
654 include_events: request.include_events,
655 include_input_objects: request.include_input_objects,
656 include_output_objects: request.include_output_objects,
657 include_auxiliary_data: request.include_auxiliary_data,
658 },
659 client_addr,
660 )
661 .await
662 {
663 Ok(response) => {
664 debug!(?tx_digest, "Certificate processing succeeded");
665 response
666 }
667 Err(err) => {
670 Self::handle_error(
671 quorum_driver,
672 request,
673 err,
674 Some(tx_cert),
675 old_retry_times,
676 "get effects cert",
677 client_addr,
678 );
679 return;
680 }
681 };
682 if newly_formed {
683 let settlement_finality_latency = timer.elapsed().as_secs_f64();
684 quorum_driver
685 .metrics
686 .settlement_finality_latency
687 .with_label_values(&[tx_type.as_str()])
688 .observe(settlement_finality_latency);
689 let is_out_of_expected_range =
690 settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
691 debug!(
692 ?tx_digest,
693 ?tx_type,
694 ?is_out_of_expected_range,
695 "QuorumDriver settlement finality latency: {:.3} seconds",
696 settlement_finality_latency
697 );
698 }
699
700 quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
701 }
702
703 fn handle_error(
704 quorum_driver: Arc<QuorumDriver<A>>,
705 request: ExecuteTransactionRequestV3,
706 err: Option<QuorumDriverError>,
707 tx_cert: Option<CertifiedTransaction>,
708 old_retry_times: u32,
709 action: &'static str,
710 client_addr: Option<SocketAddr>,
711 ) {
712 let tx_digest = *request.transaction.digest();
713 match err {
714 None => {
715 info!(?tx_digest, "Failed to {action}: {err:?} - Retrying");
716 spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
717 request.clone(),
718 tx_cert,
719 old_retry_times,
720 client_addr,
721 ));
722 }
723 Some(QuorumDriverError::SystemOverloadRetryAfter {
724 retry_after_secs, ..
725 }) => {
726 info!(
732 ?tx_digest,
733 "Failed to {action} - Validator overloaded. Retrying"
734 );
735 spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
736 request.clone(),
737 tx_cert,
738 old_retry_times,
739 client_addr,
740 Some(Duration::from_secs(retry_after_secs)),
741 ));
742 }
743 Some(qd_error) => {
744 info!(?tx_digest, "Failed to {action}: {}", qd_error);
745 quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
747 }
748 }
749 }
750
751 async fn task_queue_processor(
752 quorum_driver: Arc<QuorumDriver<A>>,
753 mut task_receiver: Receiver<QuorumDriverTask>,
754 metrics: Arc<QuorumDriverMetrics>,
755 ) {
756 let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
757 while let Some(task) = task_receiver.recv().await {
758 let task_queue_span =
759 trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
760 let task_span_guard = task_queue_span.enter();
761
762 let limit = limit.clone();
765 let permit = limit.acquire_owned().await.unwrap();
766
767 debug!(?task, "Dequeued task");
770 if Instant::now()
771 .checked_duration_since(task.next_retry_after)
772 .is_none()
773 {
774 let _ = quorum_driver.enqueue_task(task).await;
776 continue;
777 }
778 metrics.current_requests_in_flight.dec();
779 let qd = quorum_driver.clone();
780 drop(task_span_guard);
781 spawn_monitored_task!(async move {
782 let _guard = permit;
783 QuorumDriverHandler::process_task(qd, task).await
784 });
785 }
786 }
787}
788
/// Builder for [`QuorumDriverHandler`]. A reconfig observer must be supplied before calling
/// `start`; the notifier and retry limit have defaults.
pub struct QuorumDriverHandlerBuilder<A: Clone> {
    /// Authority aggregator the driver starts with.
    validators: Arc<AuthorityAggregator<A>>,
    /// Metrics handed to the driver.
    metrics: Arc<QuorumDriverMetrics>,
    /// Optional notifier; a fresh one is created by `start` when unset.
    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
    /// Reconfig observer; `start` panics when unset.
    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
    /// Retry limit handed to the driver; defaults to `TX_MAX_RETRY_TIMES`.
    max_retry_times: u32,
}
796
797impl<A> QuorumDriverHandlerBuilder<A>
798where
799 A: AuthorityAPI + Send + Sync + 'static + Clone,
800{
801 pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
802 Self {
803 validators,
804 metrics,
805 notifier: None,
806 reconfig_observer: None,
807 max_retry_times: TX_MAX_RETRY_TIMES,
808 }
809 }
810
811 pub(crate) fn with_notifier(
812 mut self,
813 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
814 ) -> Self {
815 self.notifier = Some(notifier);
816 self
817 }
818
819 pub fn with_reconfig_observer(
820 mut self,
821 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
822 ) -> Self {
823 self.reconfig_observer = Some(reconfig_observer);
824 self
825 }
826
827 pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
829 self.max_retry_times = max_retry_times;
830 self
831 }
832
833 pub fn start(self) -> QuorumDriverHandler<A> {
834 QuorumDriverHandler::new(
835 self.validators,
836 self.notifier.unwrap_or_else(|| {
837 Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
838 }),
839 self.reconfig_observer
840 .expect("Reconfig observer is missing"),
841 self.metrics,
842 self.max_retry_times,
843 )
844 }
845}