sui_storage/
key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! Immutable key/value store trait for storing/retrieving transactions, effects, and events
//! to/from a scalable store.
6
7use crate::key_value_store_metrics::KeyValueStoreMetrics;
8use async_trait::async_trait;
9use mysten_common::ZipDebugEqIteratorExt;
10use std::sync::Arc;
11use std::time::Instant;
12use sui_types::base_types::{ObjectID, SequenceNumber, VersionNumber};
13use sui_types::digests::{CheckpointDigest, TransactionDigest};
14use sui_types::effects::{TransactionEffects, TransactionEvents};
15use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
16use sui_types::messages_checkpoint::{
17    CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18};
19use sui_types::object::Object;
20use sui_types::storage::ObjectKey;
21use sui_types::transaction::Transaction;
22use tracing::instrument;
23
24pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
25
26pub type KVStoreCheckpointData = (
27    Vec<Option<CertifiedCheckpointSummary>>,
28    Vec<Option<CheckpointContents>>,
29    Vec<Option<CertifiedCheckpointSummary>>,
30);
31
32pub struct TransactionKeyValueStore {
33    store_name: &'static str,
34    metrics: Arc<KeyValueStoreMetrics>,
35    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
36}
37
38impl TransactionKeyValueStore {
39    pub fn new(
40        store_name: &'static str,
41        metrics: Arc<KeyValueStoreMetrics>,
42        inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
43    ) -> Self {
44        Self {
45            store_name,
46            metrics,
47            inner,
48        }
49    }
50
51    /// Generic multi_get, allows implementors to get heterogenous values with a single round trip.
52    pub async fn multi_get(
53        &self,
54        transactions: &[TransactionDigest],
55        effects: &[TransactionDigest],
56    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
57        let start = Instant::now();
58        let res = self.inner.multi_get(transactions, effects).await;
59        let elapsed = start.elapsed();
60
61        let num_txns = transactions.len() as u64;
62        let num_effects = effects.len() as u64;
63        let total_keys = num_txns + num_effects;
64
65        self.metrics
66            .key_value_store_num_fetches_latency_ms
67            .with_label_values(&[self.store_name, "tx"])
68            .observe(elapsed.as_millis() as f64);
69        self.metrics
70            .key_value_store_num_fetches_batch_size
71            .with_label_values(&[self.store_name, "tx"])
72            .observe(total_keys as f64);
73
74        if let Ok(res) = &res {
75            let txns_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64;
76            let effects_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
77
78            if num_txns > 0 {
79                self.metrics
80                    .key_value_store_num_fetches_success
81                    .with_label_values(&[self.store_name, "tx"])
82                    .inc_by(num_txns);
83            }
84            if num_effects > 0 {
85                self.metrics
86                    .key_value_store_num_fetches_success
87                    .with_label_values(&[self.store_name, "fx"])
88                    .inc_by(num_effects);
89            }
90
91            if txns_not_found > 0 {
92                self.metrics
93                    .key_value_store_num_fetches_not_found
94                    .with_label_values(&[self.store_name, "tx"])
95                    .inc_by(txns_not_found);
96            }
97            if effects_not_found > 0 {
98                self.metrics
99                    .key_value_store_num_fetches_not_found
100                    .with_label_values(&[self.store_name, "fx"])
101                    .inc_by(effects_not_found);
102            }
103        } else {
104            self.metrics
105                .key_value_store_num_fetches_error
106                .with_label_values(&[self.store_name, "tx"])
107                .inc_by(num_txns);
108            self.metrics
109                .key_value_store_num_fetches_error
110                .with_label_values(&[self.store_name, "fx"])
111                .inc_by(num_effects);
112        }
113
114        res
115    }
116
117    pub async fn multi_get_checkpoints(
118        &self,
119        checkpoint_summaries: &[CheckpointSequenceNumber],
120        checkpoint_contents: &[CheckpointSequenceNumber],
121        checkpoint_summaries_by_digest: &[CheckpointDigest],
122    ) -> SuiResult<(
123        Vec<Option<CertifiedCheckpointSummary>>,
124        Vec<Option<CheckpointContents>>,
125        Vec<Option<CertifiedCheckpointSummary>>,
126    )> {
127        let start = Instant::now();
128        let res = self
129            .inner
130            .multi_get_checkpoints(
131                checkpoint_summaries,
132                checkpoint_contents,
133                checkpoint_summaries_by_digest,
134            )
135            .await;
136        let elapsed = start.elapsed();
137
138        let num_summaries =
139            checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
140        let num_contents = checkpoint_contents.len() as u64;
141
142        self.metrics
143            .key_value_store_num_fetches_latency_ms
144            .with_label_values(&[self.store_name, "checkpoint"])
145            .observe(elapsed.as_millis() as f64);
146        self.metrics
147            .key_value_store_num_fetches_batch_size
148            .with_label_values(&[self.store_name, "checkpoint_summary"])
149            .observe(num_summaries as f64);
150        self.metrics
151            .key_value_store_num_fetches_batch_size
152            .with_label_values(&[self.store_name, "checkpoint_content"])
153            .observe(num_contents as f64);
154
155        if let Ok(res) = &res {
156            let summaries_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64
157                + res.2.iter().filter(|v| v.is_none()).count() as u64;
158            let contents_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
159
160            if num_summaries > 0 {
161                self.metrics
162                    .key_value_store_num_fetches_success
163                    .with_label_values(&[self.store_name, "ckpt_summary"])
164                    .inc_by(num_summaries);
165            }
166            if num_contents > 0 {
167                self.metrics
168                    .key_value_store_num_fetches_success
169                    .with_label_values(&[self.store_name, "ckpt_contents"])
170                    .inc_by(num_contents);
171            }
172
173            if summaries_not_found > 0 {
174                self.metrics
175                    .key_value_store_num_fetches_not_found
176                    .with_label_values(&[self.store_name, "ckpt_summary"])
177                    .inc_by(summaries_not_found);
178            }
179            if contents_not_found > 0 {
180                self.metrics
181                    .key_value_store_num_fetches_not_found
182                    .with_label_values(&[self.store_name, "ckpt_contents"])
183                    .inc_by(contents_not_found);
184            }
185        } else {
186            self.metrics
187                .key_value_store_num_fetches_error
188                .with_label_values(&[self.store_name, "ckpt_summary"])
189                .inc_by(num_summaries);
190            self.metrics
191                .key_value_store_num_fetches_error
192                .with_label_values(&[self.store_name, "ckpt_contents"])
193                .inc_by(num_contents);
194        }
195
196        res
197    }
198
199    pub async fn multi_get_checkpoints_summaries(
200        &self,
201        keys: &[CheckpointSequenceNumber],
202    ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
203        self.multi_get_checkpoints(keys, &[], &[])
204            .await
205            .map(|(summaries, _, _)| summaries)
206    }
207
208    pub async fn multi_get_checkpoints_contents(
209        &self,
210        keys: &[CheckpointSequenceNumber],
211    ) -> SuiResult<Vec<Option<CheckpointContents>>> {
212        self.multi_get_checkpoints(&[], keys, &[])
213            .await
214            .map(|(_, contents, _)| contents)
215    }
216
217    pub async fn multi_get_checkpoints_summaries_by_digest(
218        &self,
219        keys: &[CheckpointDigest],
220    ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
221        self.multi_get_checkpoints(&[], &[], keys)
222            .await
223            .map(|(_, _, summaries)| summaries)
224    }
225
226    pub async fn multi_get_tx(
227        &self,
228        keys: &[TransactionDigest],
229    ) -> SuiResult<Vec<Option<Transaction>>> {
230        self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
231    }
232
233    pub async fn multi_get_fx_by_tx_digest(
234        &self,
235        keys: &[TransactionDigest],
236    ) -> SuiResult<Vec<Option<TransactionEffects>>> {
237        self.multi_get(&[], keys).await.map(|(_, fx)| fx)
238    }
239
240    /// Convenience method for fetching single digest, and returning an error if it's not found.
241    /// Prefer using multi_get_tx whenever possible.
242    pub async fn get_tx(&self, digest: TransactionDigest) -> SuiResult<Transaction> {
243        self.multi_get_tx(&[digest])
244            .await?
245            .into_iter()
246            .next()
247            .flatten()
248            .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
249    }
250
251    /// Convenience method for fetching single digest, and returning an error if it's not found.
252    /// Prefer using multi_get_fx_by_tx_digest whenever possible.
253    pub async fn get_fx_by_tx_digest(
254        &self,
255        digest: TransactionDigest,
256    ) -> SuiResult<TransactionEffects> {
257        self.multi_get_fx_by_tx_digest(&[digest])
258            .await?
259            .into_iter()
260            .next()
261            .flatten()
262            .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
263    }
264
265    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
266    /// Prefer using multi_get_checkpoints_summaries whenever possible.
267    pub async fn get_checkpoint_summary(
268        &self,
269        checkpoint: CheckpointSequenceNumber,
270    ) -> SuiResult<CertifiedCheckpointSummary> {
271        self.multi_get_checkpoints_summaries(&[checkpoint])
272            .await?
273            .into_iter()
274            .next()
275            .flatten()
276            .ok_or(
277                SuiErrorKind::UserInputError {
278                    error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
279                }
280                .into(),
281            )
282    }
283
284    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
285    /// Prefer using multi_get_checkpoints_contents whenever possible.
286    pub async fn get_checkpoint_contents(
287        &self,
288        checkpoint: CheckpointSequenceNumber,
289    ) -> SuiResult<CheckpointContents> {
290        self.multi_get_checkpoints_contents(&[checkpoint])
291            .await?
292            .into_iter()
293            .next()
294            .flatten()
295            .ok_or(
296                SuiErrorKind::UserInputError {
297                    error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
298                }
299                .into(),
300            )
301    }
302
303    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
304    /// Prefer using multi_get_checkpoints_summaries_by_digest whenever possible.
305    pub async fn get_checkpoint_summary_by_digest(
306        &self,
307        digest: CheckpointDigest,
308    ) -> SuiResult<CertifiedCheckpointSummary> {
309        self.multi_get_checkpoints_summaries_by_digest(&[digest])
310            .await?
311            .into_iter()
312            .next()
313            .flatten()
314            .ok_or(
315                SuiErrorKind::UserInputError {
316                    error: UserInputError::VerifiedCheckpointDigestNotFound(format!(
317                        "{:?}",
318                        digest
319                    )),
320                }
321                .into(),
322            )
323    }
324
325    pub async fn deprecated_get_transaction_checkpoint(
326        &self,
327        digest: TransactionDigest,
328    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
329        self.inner
330            .deprecated_get_transaction_checkpoint(digest)
331            .await
332    }
333
334    pub async fn get_object(
335        &self,
336        object_id: ObjectID,
337        version: VersionNumber,
338    ) -> SuiResult<Option<Object>> {
339        self.inner.get_object(object_id, version).await
340    }
341
342    pub async fn multi_get_objects(
343        &self,
344        object_keys: &[ObjectKey],
345    ) -> SuiResult<Vec<Option<Object>>> {
346        self.inner.multi_get_objects(object_keys).await
347    }
348
349    pub async fn multi_get_transaction_checkpoint(
350        &self,
351        digests: &[TransactionDigest],
352    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
353        self.inner.multi_get_transaction_checkpoint(digests).await
354    }
355
356    pub async fn multi_get_events_by_tx_digests(
357        &self,
358        digests: &[TransactionDigest],
359    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
360        self.inner.multi_get_events_by_tx_digests(digests).await
361    }
362}
363
364/// Immutable key/value store trait for storing/retrieving transactions, effects, and events.
365/// Only defines multi_get/multi_put methods to discourage single key/value operations.
366#[async_trait]
367pub trait TransactionKeyValueStoreTrait {
368    /// Generic multi_get, allows implementors to get heterogenous values with a single round trip.
369    async fn multi_get(
370        &self,
371        transactions: &[TransactionDigest],
372        effects: &[TransactionDigest],
373    ) -> SuiResult<KVStoreTransactionData>;
374
375    /// Generic multi_get to allow implementors to get heterogenous values with a single round trip.
376    async fn multi_get_checkpoints(
377        &self,
378        checkpoint_summaries: &[CheckpointSequenceNumber],
379        checkpoint_contents: &[CheckpointSequenceNumber],
380        checkpoint_summaries_by_digest: &[CheckpointDigest],
381    ) -> SuiResult<KVStoreCheckpointData>;
382
383    async fn deprecated_get_transaction_checkpoint(
384        &self,
385        digest: TransactionDigest,
386    ) -> SuiResult<Option<CheckpointSequenceNumber>>;
387
388    async fn get_object(
389        &self,
390        object_id: ObjectID,
391        version: SequenceNumber,
392    ) -> SuiResult<Option<Object>>;
393
394    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>>;
395
396    async fn multi_get_transaction_checkpoint(
397        &self,
398        digests: &[TransactionDigest],
399    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;
400
401    async fn multi_get_events_by_tx_digests(
402        &self,
403        digests: &[TransactionDigest],
404    ) -> SuiResult<Vec<Option<TransactionEvents>>>;
405}
406
407/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the
408/// primary store returns None.
409///
410/// Will be used to check the local rocksdb store, before falling back to a remote scalable store.
411pub struct FallbackTransactionKVStore {
412    primary: TransactionKeyValueStore,
413    fallback: TransactionKeyValueStore,
414}
415
416impl FallbackTransactionKVStore {
417    pub fn new_kv(
418        primary: TransactionKeyValueStore,
419        fallback: TransactionKeyValueStore,
420        metrics: Arc<KeyValueStoreMetrics>,
421        label: &'static str,
422    ) -> TransactionKeyValueStore {
423        let store = Arc::new(Self { primary, fallback });
424        TransactionKeyValueStore::new(label, metrics, store)
425    }
426}
427
428#[async_trait]
429impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
430    #[instrument(level = "trace", skip_all)]
431    async fn multi_get(
432        &self,
433        transactions: &[TransactionDigest],
434        effects: &[TransactionDigest],
435    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
436        let mut res = self.primary.multi_get(transactions, effects).await?;
437
438        let (fallback_transactions, indices_transactions) = find_fallback(&res.0, transactions);
439        let (fallback_effects, indices_effects) = find_fallback(&res.1, effects);
440
441        if fallback_transactions.is_empty() && fallback_effects.is_empty() {
442            return Ok(res);
443        }
444
445        let secondary_res = self
446            .fallback
447            .multi_get(&fallback_transactions, &fallback_effects)
448            .await?;
449
450        merge_res(&mut res.0, secondary_res.0, &indices_transactions);
451        merge_res(&mut res.1, secondary_res.1, &indices_effects);
452
453        Ok((res.0, res.1))
454    }
455
456    #[instrument(level = "trace", skip_all)]
457    async fn multi_get_checkpoints(
458        &self,
459        checkpoint_summaries: &[CheckpointSequenceNumber],
460        checkpoint_contents: &[CheckpointSequenceNumber],
461        checkpoint_summaries_by_digest: &[CheckpointDigest],
462    ) -> SuiResult<(
463        Vec<Option<CertifiedCheckpointSummary>>,
464        Vec<Option<CheckpointContents>>,
465        Vec<Option<CertifiedCheckpointSummary>>,
466    )> {
467        let mut res = self
468            .primary
469            .multi_get_checkpoints(
470                checkpoint_summaries,
471                checkpoint_contents,
472                checkpoint_summaries_by_digest,
473            )
474            .await?;
475
476        let (fallback_summaries, indices_summaries) = find_fallback(&res.0, checkpoint_summaries);
477        let (fallback_contents, indices_contents) = find_fallback(&res.1, checkpoint_contents);
478        let (fallback_summaries_by_digest, indices_summaries_by_digest) =
479            find_fallback(&res.2, checkpoint_summaries_by_digest);
480
481        if fallback_summaries.is_empty()
482            && fallback_contents.is_empty()
483            && fallback_summaries_by_digest.is_empty()
484        {
485            return Ok(res);
486        }
487
488        let secondary_res = self
489            .fallback
490            .multi_get_checkpoints(
491                &fallback_summaries,
492                &fallback_contents,
493                &fallback_summaries_by_digest,
494            )
495            .await?;
496
497        merge_res(&mut res.0, secondary_res.0, &indices_summaries);
498        merge_res(&mut res.1, secondary_res.1, &indices_contents);
499        merge_res(&mut res.2, secondary_res.2, &indices_summaries_by_digest);
500
501        Ok((res.0, res.1, res.2))
502    }
503
504    #[instrument(level = "trace", skip_all)]
505    async fn deprecated_get_transaction_checkpoint(
506        &self,
507        digest: TransactionDigest,
508    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
509        let mut res = self
510            .primary
511            .deprecated_get_transaction_checkpoint(digest)
512            .await?;
513        if res.is_none() {
514            res = self
515                .fallback
516                .deprecated_get_transaction_checkpoint(digest)
517                .await?;
518        }
519        Ok(res)
520    }
521
522    #[instrument(level = "trace", skip_all)]
523    async fn get_object(
524        &self,
525        object_id: ObjectID,
526        version: SequenceNumber,
527    ) -> SuiResult<Option<Object>> {
528        let mut res = self.primary.get_object(object_id, version).await?;
529        if res.is_none() {
530            res = self.fallback.get_object(object_id, version).await?;
531        }
532        Ok(res)
533    }
534
535    #[instrument(level = "trace", skip_all)]
536    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
537        let mut res = self.primary.multi_get_objects(object_keys).await?;
538
539        let (fallback, indices) = find_fallback(&res, object_keys);
540
541        if fallback.is_empty() {
542            return Ok(res);
543        }
544
545        let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
546
547        merge_res(&mut res, secondary_res, &indices);
548
549        Ok(res)
550    }
551
552    #[instrument(level = "trace", skip_all)]
553    async fn multi_get_transaction_checkpoint(
554        &self,
555        digests: &[TransactionDigest],
556    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
557        let mut res = self
558            .primary
559            .multi_get_transaction_checkpoint(digests)
560            .await?;
561
562        let (fallback, indices) = find_fallback(&res, digests);
563
564        if fallback.is_empty() {
565            return Ok(res);
566        }
567
568        let secondary_res = self
569            .fallback
570            .multi_get_transaction_checkpoint(&fallback)
571            .await?;
572
573        merge_res(&mut res, secondary_res, &indices);
574
575        Ok(res)
576    }
577
578    #[instrument(level = "trace", skip_all)]
579    async fn multi_get_events_by_tx_digests(
580        &self,
581        digests: &[TransactionDigest],
582    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
583        let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
584        let (fallback, indices) = find_fallback(&res, digests);
585        if fallback.is_empty() {
586            return Ok(res);
587        }
588        let secondary_res = self
589            .fallback
590            .multi_get_events_by_tx_digests(&fallback)
591            .await?;
592        merge_res(&mut res, secondary_res, &indices);
593        Ok(res)
594    }
595}
596
/// Collects the keys whose lookup result is `None`, together with their positions.
///
/// `values[i]` must be the lookup result for `keys[i]`. Returns `(missing_keys, indices)`,
/// where `missing_keys[j] == keys[indices[j]]` and `indices` is ascending. The answers for
/// `missing_keys` from another store can therefore be written back with `merge_res`.
///
/// # Panics
///
/// In debug builds, panics if `values` and `keys` differ in length. This mirrors the
/// `zip_debug_eq` that `merge_res` uses for its own inputs.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    debug_assert_eq!(
        values.len(),
        keys.len(),
        "lookup results must be index-aligned with their keys"
    );
    values
        .iter()
        .zip(keys)
        .enumerate()
        .filter(|(_, (value, _))| value.is_none())
        .map(|(index, (_, key))| (key.clone(), index))
        .unzip()
}
609
610fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
611    for (&index, fallback_value) in indices.iter().zip_debug_eq(fallback_values) {
612        values[index] = fallback_value;
613    }
614}