sui_core/
mysticeti_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use arc_swap::{ArcSwapOption, Guard};
7use consensus_core::{ClientError, TransactionClient};
8use sui_types::{
9    error::{SuiErrorKind, SuiResult},
10    messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
11};
12use tap::prelude::*;
13use tokio::time::{Instant, sleep};
14use tracing::{error, info, warn};
15
16use crate::{
17    authority::authority_per_epoch_store::AuthorityPerEpochStore,
18    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
19    consensus_handler::SequencedConsensusTransactionKey,
20};
21
22/// Gets a client to submit transactions to Mysticeti, or waits for one to be available.
23/// This hides the complexities of async consensus initialization and submitting to different
24/// instances of consensus across epochs.
25// TODO: rename to LazyConsensusClient?
26#[derive(Default, Clone)]
27pub struct LazyMysticetiClient {
28    client: Arc<ArcSwapOption<TransactionClient>>,
29}
30
31impl LazyMysticetiClient {
32    pub fn new() -> Self {
33        Self {
34            client: Arc::new(ArcSwapOption::empty()),
35        }
36    }
37
38    async fn get(&self) -> Guard<Option<Arc<TransactionClient>>> {
39        let client = self.client.load();
40        if client.is_some() {
41            return client;
42        }
43
44        // Consensus client is initialized after validators or epoch starts, and cleared after an epoch ends.
45        // But calls to get() can happen during validator startup or epoch change, before consensus finished
46        // initializations.
47        // TODO: maybe listen to updates from consensus manager instead of polling.
48        let mut count = 0;
49        let start = Instant::now();
50        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
51        loop {
52            let client = self.client.load();
53            if client.is_some() {
54                return client;
55            } else {
56                sleep(RETRY_INTERVAL).await;
57                count += 1;
58                if count % 100 == 0 {
59                    warn!(
60                        "Waiting for consensus to initialize after {:?}",
61                        Instant::now() - start
62                    );
63                }
64            }
65        }
66    }
67
68    pub fn set(&self, client: Arc<TransactionClient>) {
69        self.client.store(Some(client));
70    }
71
72    pub fn clear(&self) {
73        self.client.store(None);
74    }
75}
76
77#[async_trait::async_trait]
78impl ConsensusClient for LazyMysticetiClient {
79    async fn submit(
80        &self,
81        transactions: &[ConsensusTransaction],
82        _epoch_store: &Arc<AuthorityPerEpochStore>,
83    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
84        // TODO(mysticeti): confirm comment is still true
85        // The retrieved TransactionClient can be from the past epoch. Submit would fail after
86        // Mysticeti shuts down, so there should be no correctness issue.
87        let client_guard = self.get().await;
88        let client = client_guard
89            .as_ref()
90            .expect("Client should always be returned");
91        let transactions_bytes = transactions
92            .iter()
93            .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
94            .collect::<Vec<_>>();
95        let (block_ref, tx_indices, status_waiter) = client
96            .submit(transactions_bytes)
97            .await
98            .tap_err(|err| {
99                // Will be logged by caller as well.
100                let msg = format!("Transaction submission failed with: {:?}", err);
101                match err {
102                    ClientError::ConsensusShuttingDown(_) => {
103                        info!("{}", msg);
104                    }
105                    ClientError::OversizedTransaction(_, _)
106                    | ClientError::OversizedTransactionBundleBytes(_, _)
107                    | ClientError::OversizedTransactionBundleCount(_, _) => {
108                        if cfg!(debug_assertions) {
109                            panic!("{}", msg);
110                        } else {
111                            error!("{}", msg);
112                        }
113                    }
114                };
115            })
116            .map_err(|err| SuiErrorKind::FailedToSubmitToConsensus(err.to_string()))?;
117
118        let is_soft_bundle = transactions.len() > 1;
119        let is_ping = transactions.is_empty();
120
121        if !is_soft_bundle
122            && !is_ping
123            && matches!(
124                transactions[0].kind,
125                ConsensusTransactionKind::EndOfPublish(_)
126                    | ConsensusTransactionKind::CapabilityNotification(_)
127                    | ConsensusTransactionKind::CapabilityNotificationV2(_)
128                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
129                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
130            )
131        {
132            let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
133            tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
134        };
135
136        // Calculate consensus tx positions
137        let mut consensus_positions = Vec::new();
138        for index in tx_indices {
139            consensus_positions.push(ConsensusPosition {
140                epoch: client.epoch(),
141                block: block_ref,
142                index,
143            });
144        }
145        Ok((consensus_positions, status_waiter))
146    }
147}