sui_storage/
key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Immutable key/value store trait for storing/retrieving transactions, effects, and events
//! to/from a scalable store.
6
7use crate::key_value_store_metrics::KeyValueStoreMetrics;
8use async_trait::async_trait;
9use std::sync::Arc;
10use std::time::Instant;
11use sui_types::base_types::{ObjectID, SequenceNumber, VersionNumber};
12use sui_types::digests::{CheckpointDigest, TransactionDigest};
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
15use sui_types::messages_checkpoint::{
16    CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
17};
18use sui_types::object::Object;
19use sui_types::storage::ObjectKey;
20use sui_types::transaction::Transaction;
21use tracing::instrument;
22
/// Result of a transaction-data `multi_get`: the fetched transactions, then the fetched effects.
///
/// Each vector holds `None` for keys the store had no value for. Code in this file pairs
/// element `i` of a vector with key `i` of the corresponding request slice.
pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);

/// Result of `multi_get_checkpoints`, in the same order as that method's arguments:
/// summaries looked up by sequence number, contents looked up by sequence number, and
/// summaries looked up by digest.
pub type KVStoreCheckpointData = (
    Vec<Option<CertifiedCheckpointSummary>>,
    Vec<Option<CheckpointContents>>,
    Vec<Option<CertifiedCheckpointSummary>>,
);
30
/// Wrapper around a [`TransactionKeyValueStoreTrait`] implementation.
///
/// Every call is forwarded to `inner`; `multi_get` and `multi_get_checkpoints` additionally
/// record latency, batch-size and outcome metrics labelled with `store_name`.
pub struct TransactionKeyValueStore {
    /// First label value of every metric recorded by this wrapper.
    store_name: &'static str,
    /// Metric handles updated by `multi_get` and `multi_get_checkpoints`.
    metrics: Arc<KeyValueStoreMetrics>,
    /// The underlying store that calls are forwarded to.
    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
}
36
37impl TransactionKeyValueStore {
38    pub fn new(
39        store_name: &'static str,
40        metrics: Arc<KeyValueStoreMetrics>,
41        inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
42    ) -> Self {
43        Self {
44            store_name,
45            metrics,
46            inner,
47        }
48    }
49
50    /// Generic multi_get, allows implementors to get heterogenous values with a single round trip.
51    pub async fn multi_get(
52        &self,
53        transactions: &[TransactionDigest],
54        effects: &[TransactionDigest],
55    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
56        let start = Instant::now();
57        let res = self.inner.multi_get(transactions, effects).await;
58        let elapsed = start.elapsed();
59
60        let num_txns = transactions.len() as u64;
61        let num_effects = effects.len() as u64;
62        let total_keys = num_txns + num_effects;
63
64        self.metrics
65            .key_value_store_num_fetches_latency_ms
66            .with_label_values(&[self.store_name, "tx"])
67            .observe(elapsed.as_millis() as f64);
68        self.metrics
69            .key_value_store_num_fetches_batch_size
70            .with_label_values(&[self.store_name, "tx"])
71            .observe(total_keys as f64);
72
73        if let Ok(res) = &res {
74            let txns_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64;
75            let effects_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
76
77            if num_txns > 0 {
78                self.metrics
79                    .key_value_store_num_fetches_success
80                    .with_label_values(&[self.store_name, "tx"])
81                    .inc_by(num_txns);
82            }
83            if num_effects > 0 {
84                self.metrics
85                    .key_value_store_num_fetches_success
86                    .with_label_values(&[self.store_name, "fx"])
87                    .inc_by(num_effects);
88            }
89
90            if txns_not_found > 0 {
91                self.metrics
92                    .key_value_store_num_fetches_not_found
93                    .with_label_values(&[self.store_name, "tx"])
94                    .inc_by(txns_not_found);
95            }
96            if effects_not_found > 0 {
97                self.metrics
98                    .key_value_store_num_fetches_not_found
99                    .with_label_values(&[self.store_name, "fx"])
100                    .inc_by(effects_not_found);
101            }
102        } else {
103            self.metrics
104                .key_value_store_num_fetches_error
105                .with_label_values(&[self.store_name, "tx"])
106                .inc_by(num_txns);
107            self.metrics
108                .key_value_store_num_fetches_error
109                .with_label_values(&[self.store_name, "fx"])
110                .inc_by(num_effects);
111        }
112
113        res
114    }
115
116    pub async fn multi_get_checkpoints(
117        &self,
118        checkpoint_summaries: &[CheckpointSequenceNumber],
119        checkpoint_contents: &[CheckpointSequenceNumber],
120        checkpoint_summaries_by_digest: &[CheckpointDigest],
121    ) -> SuiResult<(
122        Vec<Option<CertifiedCheckpointSummary>>,
123        Vec<Option<CheckpointContents>>,
124        Vec<Option<CertifiedCheckpointSummary>>,
125    )> {
126        let start = Instant::now();
127        let res = self
128            .inner
129            .multi_get_checkpoints(
130                checkpoint_summaries,
131                checkpoint_contents,
132                checkpoint_summaries_by_digest,
133            )
134            .await;
135        let elapsed = start.elapsed();
136
137        let num_summaries =
138            checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
139        let num_contents = checkpoint_contents.len() as u64;
140
141        self.metrics
142            .key_value_store_num_fetches_latency_ms
143            .with_label_values(&[self.store_name, "checkpoint"])
144            .observe(elapsed.as_millis() as f64);
145        self.metrics
146            .key_value_store_num_fetches_batch_size
147            .with_label_values(&[self.store_name, "checkpoint_summary"])
148            .observe(num_summaries as f64);
149        self.metrics
150            .key_value_store_num_fetches_batch_size
151            .with_label_values(&[self.store_name, "checkpoint_content"])
152            .observe(num_contents as f64);
153
154        if let Ok(res) = &res {
155            let summaries_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64
156                + res.2.iter().filter(|v| v.is_none()).count() as u64;
157            let contents_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
158
159            if num_summaries > 0 {
160                self.metrics
161                    .key_value_store_num_fetches_success
162                    .with_label_values(&[self.store_name, "ckpt_summary"])
163                    .inc_by(num_summaries);
164            }
165            if num_contents > 0 {
166                self.metrics
167                    .key_value_store_num_fetches_success
168                    .with_label_values(&[self.store_name, "ckpt_contents"])
169                    .inc_by(num_contents);
170            }
171
172            if summaries_not_found > 0 {
173                self.metrics
174                    .key_value_store_num_fetches_not_found
175                    .with_label_values(&[self.store_name, "ckpt_summary"])
176                    .inc_by(summaries_not_found);
177            }
178            if contents_not_found > 0 {
179                self.metrics
180                    .key_value_store_num_fetches_not_found
181                    .with_label_values(&[self.store_name, "ckpt_contents"])
182                    .inc_by(contents_not_found);
183            }
184        } else {
185            self.metrics
186                .key_value_store_num_fetches_error
187                .with_label_values(&[self.store_name, "ckpt_summary"])
188                .inc_by(num_summaries);
189            self.metrics
190                .key_value_store_num_fetches_error
191                .with_label_values(&[self.store_name, "ckpt_contents"])
192                .inc_by(num_contents);
193        }
194
195        res
196    }
197
198    pub async fn multi_get_checkpoints_summaries(
199        &self,
200        keys: &[CheckpointSequenceNumber],
201    ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
202        self.multi_get_checkpoints(keys, &[], &[])
203            .await
204            .map(|(summaries, _, _)| summaries)
205    }
206
207    pub async fn multi_get_checkpoints_contents(
208        &self,
209        keys: &[CheckpointSequenceNumber],
210    ) -> SuiResult<Vec<Option<CheckpointContents>>> {
211        self.multi_get_checkpoints(&[], keys, &[])
212            .await
213            .map(|(_, contents, _)| contents)
214    }
215
216    pub async fn multi_get_checkpoints_summaries_by_digest(
217        &self,
218        keys: &[CheckpointDigest],
219    ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
220        self.multi_get_checkpoints(&[], &[], keys)
221            .await
222            .map(|(_, _, summaries)| summaries)
223    }
224
225    pub async fn multi_get_tx(
226        &self,
227        keys: &[TransactionDigest],
228    ) -> SuiResult<Vec<Option<Transaction>>> {
229        self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
230    }
231
232    pub async fn multi_get_fx_by_tx_digest(
233        &self,
234        keys: &[TransactionDigest],
235    ) -> SuiResult<Vec<Option<TransactionEffects>>> {
236        self.multi_get(&[], keys).await.map(|(_, fx)| fx)
237    }
238
239    /// Convenience method for fetching single digest, and returning an error if it's not found.
240    /// Prefer using multi_get_tx whenever possible.
241    pub async fn get_tx(&self, digest: TransactionDigest) -> SuiResult<Transaction> {
242        self.multi_get_tx(&[digest])
243            .await?
244            .into_iter()
245            .next()
246            .flatten()
247            .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
248    }
249
250    /// Convenience method for fetching single digest, and returning an error if it's not found.
251    /// Prefer using multi_get_fx_by_tx_digest whenever possible.
252    pub async fn get_fx_by_tx_digest(
253        &self,
254        digest: TransactionDigest,
255    ) -> SuiResult<TransactionEffects> {
256        self.multi_get_fx_by_tx_digest(&[digest])
257            .await?
258            .into_iter()
259            .next()
260            .flatten()
261            .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
262    }
263
264    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
265    /// Prefer using multi_get_checkpoints_summaries whenever possible.
266    pub async fn get_checkpoint_summary(
267        &self,
268        checkpoint: CheckpointSequenceNumber,
269    ) -> SuiResult<CertifiedCheckpointSummary> {
270        self.multi_get_checkpoints_summaries(&[checkpoint])
271            .await?
272            .into_iter()
273            .next()
274            .flatten()
275            .ok_or(
276                SuiErrorKind::UserInputError {
277                    error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
278                }
279                .into(),
280            )
281    }
282
283    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
284    /// Prefer using multi_get_checkpoints_contents whenever possible.
285    pub async fn get_checkpoint_contents(
286        &self,
287        checkpoint: CheckpointSequenceNumber,
288    ) -> SuiResult<CheckpointContents> {
289        self.multi_get_checkpoints_contents(&[checkpoint])
290            .await?
291            .into_iter()
292            .next()
293            .flatten()
294            .ok_or(
295                SuiErrorKind::UserInputError {
296                    error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
297                }
298                .into(),
299            )
300    }
301
302    /// Convenience method for fetching single checkpoint, and returning an error if it's not found.
303    /// Prefer using multi_get_checkpoints_summaries_by_digest whenever possible.
304    pub async fn get_checkpoint_summary_by_digest(
305        &self,
306        digest: CheckpointDigest,
307    ) -> SuiResult<CertifiedCheckpointSummary> {
308        self.multi_get_checkpoints_summaries_by_digest(&[digest])
309            .await?
310            .into_iter()
311            .next()
312            .flatten()
313            .ok_or(
314                SuiErrorKind::UserInputError {
315                    error: UserInputError::VerifiedCheckpointDigestNotFound(format!(
316                        "{:?}",
317                        digest
318                    )),
319                }
320                .into(),
321            )
322    }
323
324    pub async fn deprecated_get_transaction_checkpoint(
325        &self,
326        digest: TransactionDigest,
327    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
328        self.inner
329            .deprecated_get_transaction_checkpoint(digest)
330            .await
331    }
332
333    pub async fn get_object(
334        &self,
335        object_id: ObjectID,
336        version: VersionNumber,
337    ) -> SuiResult<Option<Object>> {
338        self.inner.get_object(object_id, version).await
339    }
340
341    pub async fn multi_get_objects(
342        &self,
343        object_keys: &[ObjectKey],
344    ) -> SuiResult<Vec<Option<Object>>> {
345        self.inner.multi_get_objects(object_keys).await
346    }
347
348    pub async fn multi_get_transaction_checkpoint(
349        &self,
350        digests: &[TransactionDigest],
351    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
352        self.inner.multi_get_transaction_checkpoint(digests).await
353    }
354
355    pub async fn multi_get_events_by_tx_digests(
356        &self,
357        digests: &[TransactionDigest],
358    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
359        self.inner.multi_get_events_by_tx_digests(digests).await
360    }
361}
362
/// Immutable key/value store trait for retrieving transactions, effects, events, checkpoints,
/// and objects.
///
/// Declares read methods only, most of them batched (`multi_get*`) to discourage single
/// key/value operations.
#[async_trait]
pub trait TransactionKeyValueStoreTrait {
    /// Generic multi_get, allows implementors to get heterogeneous values with a single round trip.
    ///
    /// Returns the transactions and the effects as two vectors. Callers in this file pair
    /// result `i` with key `i` of the matching input slice and treat `None` as "not found".
    async fn multi_get(
        &self,
        transactions: &[TransactionDigest],
        effects: &[TransactionDigest],
    ) -> SuiResult<KVStoreTransactionData>;

    /// Generic multi_get to allow implementors to get heterogeneous values with a single round trip.
    ///
    /// The three result vectors are in argument order: summaries by sequence number, contents
    /// by sequence number, summaries by digest. Callers in this file pair result `i` with key
    /// `i` of the matching input slice and treat `None` as "not found".
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> SuiResult<KVStoreCheckpointData>;

    /// Single-key lookup of a checkpoint sequence number by transaction digest.
    ///
    /// NOTE(review): presumably superseded by `multi_get_transaction_checkpoint` — confirm.
    async fn deprecated_get_transaction_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> SuiResult<Option<CheckpointSequenceNumber>>;

    /// Single-key lookup of an object by id and version; `None` is treated as "not found" by
    /// callers in this file.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>>;

    /// Batched object lookup. Callers in this file pair result `i` with `object_keys[i]`.
    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>>;

    /// Batched lookup of checkpoint sequence numbers by transaction digest. Callers in this
    /// file pair result `i` with `digests[i]`.
    async fn multi_get_transaction_checkpoint(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

    /// Batched lookup of transaction events by transaction digest. Callers in this file pair
    /// result `i` with `digests[i]`.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<TransactionEvents>>>;
}
405
/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the
/// primary store returns None.
///
/// Will be used to check the local rocksdb store, before falling back to a remote scalable store.
///
/// Only `None` results trigger the fallback: an error from the primary store is returned to the
/// caller without consulting the fallback store.
pub struct FallbackTransactionKVStore {
    /// Store that is always queried first.
    primary: TransactionKeyValueStore,
    /// Store queried only for the keys the primary returned `None` for.
    fallback: TransactionKeyValueStore,
}
414
415impl FallbackTransactionKVStore {
416    pub fn new_kv(
417        primary: TransactionKeyValueStore,
418        fallback: TransactionKeyValueStore,
419        metrics: Arc<KeyValueStoreMetrics>,
420        label: &'static str,
421    ) -> TransactionKeyValueStore {
422        let store = Arc::new(Self { primary, fallback });
423        TransactionKeyValueStore::new(label, metrics, store)
424    }
425}
426
427#[async_trait]
428impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
429    #[instrument(level = "trace", skip_all)]
430    async fn multi_get(
431        &self,
432        transactions: &[TransactionDigest],
433        effects: &[TransactionDigest],
434    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
435        let mut res = self.primary.multi_get(transactions, effects).await?;
436
437        let (fallback_transactions, indices_transactions) = find_fallback(&res.0, transactions);
438        let (fallback_effects, indices_effects) = find_fallback(&res.1, effects);
439
440        if fallback_transactions.is_empty() && fallback_effects.is_empty() {
441            return Ok(res);
442        }
443
444        let secondary_res = self
445            .fallback
446            .multi_get(&fallback_transactions, &fallback_effects)
447            .await?;
448
449        merge_res(&mut res.0, secondary_res.0, &indices_transactions);
450        merge_res(&mut res.1, secondary_res.1, &indices_effects);
451
452        Ok((res.0, res.1))
453    }
454
455    #[instrument(level = "trace", skip_all)]
456    async fn multi_get_checkpoints(
457        &self,
458        checkpoint_summaries: &[CheckpointSequenceNumber],
459        checkpoint_contents: &[CheckpointSequenceNumber],
460        checkpoint_summaries_by_digest: &[CheckpointDigest],
461    ) -> SuiResult<(
462        Vec<Option<CertifiedCheckpointSummary>>,
463        Vec<Option<CheckpointContents>>,
464        Vec<Option<CertifiedCheckpointSummary>>,
465    )> {
466        let mut res = self
467            .primary
468            .multi_get_checkpoints(
469                checkpoint_summaries,
470                checkpoint_contents,
471                checkpoint_summaries_by_digest,
472            )
473            .await?;
474
475        let (fallback_summaries, indices_summaries) = find_fallback(&res.0, checkpoint_summaries);
476        let (fallback_contents, indices_contents) = find_fallback(&res.1, checkpoint_contents);
477        let (fallback_summaries_by_digest, indices_summaries_by_digest) =
478            find_fallback(&res.2, checkpoint_summaries_by_digest);
479
480        if fallback_summaries.is_empty()
481            && fallback_contents.is_empty()
482            && fallback_summaries_by_digest.is_empty()
483        {
484            return Ok(res);
485        }
486
487        let secondary_res = self
488            .fallback
489            .multi_get_checkpoints(
490                &fallback_summaries,
491                &fallback_contents,
492                &fallback_summaries_by_digest,
493            )
494            .await?;
495
496        merge_res(&mut res.0, secondary_res.0, &indices_summaries);
497        merge_res(&mut res.1, secondary_res.1, &indices_contents);
498        merge_res(&mut res.2, secondary_res.2, &indices_summaries_by_digest);
499
500        Ok((res.0, res.1, res.2))
501    }
502
503    #[instrument(level = "trace", skip_all)]
504    async fn deprecated_get_transaction_checkpoint(
505        &self,
506        digest: TransactionDigest,
507    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
508        let mut res = self
509            .primary
510            .deprecated_get_transaction_checkpoint(digest)
511            .await?;
512        if res.is_none() {
513            res = self
514                .fallback
515                .deprecated_get_transaction_checkpoint(digest)
516                .await?;
517        }
518        Ok(res)
519    }
520
521    #[instrument(level = "trace", skip_all)]
522    async fn get_object(
523        &self,
524        object_id: ObjectID,
525        version: SequenceNumber,
526    ) -> SuiResult<Option<Object>> {
527        let mut res = self.primary.get_object(object_id, version).await?;
528        if res.is_none() {
529            res = self.fallback.get_object(object_id, version).await?;
530        }
531        Ok(res)
532    }
533
534    #[instrument(level = "trace", skip_all)]
535    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
536        let mut res = self.primary.multi_get_objects(object_keys).await?;
537
538        let (fallback, indices) = find_fallback(&res, object_keys);
539
540        if fallback.is_empty() {
541            return Ok(res);
542        }
543
544        let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
545
546        merge_res(&mut res, secondary_res, &indices);
547
548        Ok(res)
549    }
550
551    #[instrument(level = "trace", skip_all)]
552    async fn multi_get_transaction_checkpoint(
553        &self,
554        digests: &[TransactionDigest],
555    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
556        let mut res = self
557            .primary
558            .multi_get_transaction_checkpoint(digests)
559            .await?;
560
561        let (fallback, indices) = find_fallback(&res, digests);
562
563        if fallback.is_empty() {
564            return Ok(res);
565        }
566
567        let secondary_res = self
568            .fallback
569            .multi_get_transaction_checkpoint(&fallback)
570            .await?;
571
572        merge_res(&mut res, secondary_res, &indices);
573
574        Ok(res)
575    }
576
577    #[instrument(level = "trace", skip_all)]
578    async fn multi_get_events_by_tx_digests(
579        &self,
580        digests: &[TransactionDigest],
581    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
582        let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
583        let (fallback, indices) = find_fallback(&res, digests);
584        if fallback.is_empty() {
585            return Ok(res);
586        }
587        let secondary_res = self
588            .fallback
589            .multi_get_events_by_tx_digests(&fallback)
590            .await?;
591        merge_res(&mut res, secondary_res, &indices);
592        Ok(res)
593    }
594}
595
/// Collects the keys whose value is `None`, together with their positions in `values`.
///
/// `values[i]` is taken to be the result for `keys[i]`. Returns `(fallback_keys, indices)`
/// where `indices[j]` is the position in `values` that `fallback_keys[j]` belongs to, so the
/// pair can be handed to `merge_res` once the fallback results are available.
///
/// The two slices are walked in lockstep: if `values` is longer than `keys`, the surplus
/// entries have no key to retry with and are skipped rather than causing an out-of-bounds
/// panic.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    // Upper bound on the number of fallbacks; avoids regrowing the vectors while filling them.
    let num_nones = values.iter().filter(|value| value.is_none()).count();
    let mut fallback_keys = Vec::with_capacity(num_nones);
    let mut fallback_indices = Vec::with_capacity(num_nones);

    for (index, (value, key)) in values.iter().zip(keys).enumerate() {
        if value.is_none() {
            fallback_keys.push(key.clone());
            fallback_indices.push(index);
        }
    }

    (fallback_keys, fallback_indices)
}
608
/// Writes each fallback result into the slot it was requested for.
///
/// `fallback_values[j]` is stored at `values[indices[j]]`, replacing whatever was there (a
/// `None` fallback result therefore leaves the slot `None`). Pairs are formed until the shorter
/// of `indices` and `fallback_values` runs out.
///
/// # Panics
/// Panics if a paired index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    indices
        .iter()
        .copied()
        .zip(fallback_values)
        .for_each(|(slot, replacement)| values[slot] = replacement);
}