sui_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{self, StreamExt};
7use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
8use mysten_common::ZipDebugEqIteratorExt;
9use reqwest::Client;
10use reqwest::Url;
11use reqwest::header::{CONTENT_LENGTH, HeaderValue};
12use serde::{Deserialize, Serialize};
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16use sui_types::base_types::{ObjectID, SequenceNumber};
17use sui_types::object::Object;
18use sui_types::storage::ObjectKey;
19use sui_types::{
20    digests::{CheckpointContentsDigest, CheckpointDigest, TransactionDigest},
21    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
22    error::{SuiErrorKind, SuiResult},
23    messages_checkpoint::{
24        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
25    },
26    transaction::Transaction,
27};
28use tap::{TapFallible, TapOptional};
29use tracing::{error, info, instrument, trace, warn};
30
31use crate::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
32use crate::key_value_store_metrics::KeyValueStoreMetrics;
33
/// Key-value store client that reads entries over HTTP and keeps fetched
/// response bodies in an in-memory cache.
pub struct HttpKVStore {
    /// Base URL that per-key paths are joined onto; `new` ensures it ends in `/`.
    base_url: Url,
    /// HTTP client used for every request issued by this store.
    client: Client,
    /// Response bodies keyed by the full request URL.
    cache: MokaCache<Url, Bytes>,
    /// Counters bumped on cache hits/misses and on object lookups.
    metrics: Arc<KeyValueStoreMetrics>,
}
40
41pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
42    base64_url::encode(digest)
43}
44
// Keys that are not digests are wrapped in a tagged enum before being
// serialized, so that their encoded form cannot collide with other key kinds.
/// Non-digest key, serialized (tag included) by `encoded_tagged_key`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaggedKey {
    /// A checkpoint identified by its sequence number.
    CheckpointSequenceNumber(CheckpointSequenceNumber),
}
50
51pub fn encoded_tagged_key(key: &TaggedKey) -> String {
52    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
53    base64_url::encode(&bytes)
54}
55
56pub fn encode_object_key(object_key: &ObjectKey) -> String {
57    let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
58    base64_url::encode(&bytes)
59}
60
/// File-private conversion of a fallible value into a `SuiResult`.
trait IntoSuiResult<T> {
    /// Consumes `self` and returns the equivalent `SuiResult`.
    fn into_sui_result(self) -> SuiResult<T>;
}
64
65impl<T, E> IntoSuiResult<T> for Result<T, E>
66where
67    E: std::error::Error,
68{
69    fn into_sui_result(self) -> SuiResult<T> {
70        self.map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
71    }
72}
73
/// Every kind of lookup key this store can turn into a request path.
///
/// The short type tag for each variant is given by `Key::ty` and the encoded
/// key string by `Key::encode`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Key {
    /// Transaction, by transaction digest (tag `"tx"`).
    Tx(TransactionDigest),
    /// Transaction effects, by transaction digest (tag `"fx"`).
    Fx(TransactionDigest),
    /// Checkpoint contents, by sequence number (tag `"cc"`).
    CheckpointContents(CheckpointSequenceNumber),
    /// Checkpoint summary, by sequence number (tag `"cs"`).
    CheckpointSummary(CheckpointSequenceNumber),
    /// Checkpoint contents, by contents digest (tag `"cc"`).
    CheckpointContentsByDigest(CheckpointContentsDigest),
    /// Checkpoint summary, by checkpoint digest (tag `"cs"`).
    CheckpointSummaryByDigest(CheckpointDigest),
    /// Transaction-to-checkpoint mapping, by transaction digest (tag `"tx2c"`).
    TxToCheckpoint(TransactionDigest),
    /// Object, by object key (tag `"ob"`).
    ObjectKey(ObjectKey),
    /// Transaction events, by transaction digest (tag `"evtx"`).
    EventsByTxDigest(TransactionDigest),
}
86
87impl Key {
88    /// Return a string representation of the key type
89    pub fn ty(&self) -> &'static str {
90        match self {
91            Key::Tx(_) => "tx",
92            Key::Fx(_) => "fx",
93            Key::CheckpointContents(_) => "cc",
94            Key::CheckpointSummary(_) => "cs",
95            Key::CheckpointContentsByDigest(_) => "cc",
96            Key::CheckpointSummaryByDigest(_) => "cs",
97            Key::TxToCheckpoint(_) => "tx2c",
98            Key::ObjectKey(_) => "ob",
99            Key::EventsByTxDigest(_) => "evtx",
100        }
101    }
102
103    pub fn encode(&self) -> String {
104        match self {
105            Key::Tx(digest) => encode_digest(digest),
106            Key::Fx(digest) => encode_digest(digest),
107            Key::CheckpointContents(seq) => {
108                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
109            }
110            Key::CheckpointSummary(seq) => {
111                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
112            }
113            Key::CheckpointContentsByDigest(digest) => encode_digest(digest),
114            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
115            Key::TxToCheckpoint(digest) => encode_digest(digest),
116            Key::ObjectKey(object_key) => encode_object_key(object_key),
117            Key::EventsByTxDigest(digest) => encode_digest(digest),
118        }
119    }
120
121    pub fn to_path_elements(&self) -> (String, &'static str) {
122        (self.encode(), self.ty())
123    }
124}
125
/// Decoded value kinds, one per item type.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// matches on this enum — confirm whether it is still needed.
#[derive(Clone, Debug)]
enum Value {
    /// A transaction.
    Tx(Box<Transaction>),
    /// Transaction effects.
    Fx(Box<TransactionEffects>),
    /// Transaction events.
    Events(Box<TransactionEvents>),
    /// Checkpoint contents.
    CheckpointContents(Box<CheckpointContents>),
    /// Certified checkpoint summary.
    CheckpointSummary(Box<CertifiedCheckpointSummary>),
    /// Sequence number of the checkpoint associated with a transaction.
    TxToCheckpoint(CheckpointSequenceNumber),
}
135
136pub fn path_elements_to_key(digest: &str, type_: &str) -> anyhow::Result<Key> {
137    let decoded_digest = base64_url::decode(digest)?;
138
139    match type_ {
140        "tx" => Ok(Key::Tx(TransactionDigest::try_from(decoded_digest)?)),
141        "fx" => Ok(Key::Fx(TransactionDigest::try_from(decoded_digest)?)),
142        "cc" => {
143            // first try to decode as digest, otherwise try to decode as tagged key
144            match CheckpointContentsDigest::try_from(decoded_digest.clone()) {
145                Err(_) => {
146                    let tagged_key = bcs::from_bytes(&decoded_digest)?;
147                    match tagged_key {
148                        TaggedKey::CheckpointSequenceNumber(seq) => {
149                            Ok(Key::CheckpointContents(seq))
150                        }
151                    }
152                }
153                Ok(cc_digest) => Ok(Key::CheckpointContentsByDigest(cc_digest)),
154            }
155        }
156        "cs" => {
157            // first try to decode as digest, otherwise try to decode as tagged key
158            match CheckpointDigest::try_from(decoded_digest.clone()) {
159                Err(_) => {
160                    let tagged_key = bcs::from_bytes(&decoded_digest)?;
161                    match tagged_key {
162                        TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointSummary(seq)),
163                    }
164                }
165                Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
166            }
167        }
168        "tx2c" => Ok(Key::TxToCheckpoint(TransactionDigest::try_from(
169            decoded_digest,
170        )?)),
171        "ob" => {
172            let object_key: ObjectKey = bcs::from_bytes(&decoded_digest)?;
173            Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
174        }
175        _ => Err(anyhow::anyhow!("Invalid type: {}", type_)),
176    }
177}
178
179impl HttpKVStore {
180    pub fn new_kv(
181        base_url: &str,
182        cache_size: u64,
183        metrics: Arc<KeyValueStoreMetrics>,
184    ) -> SuiResult<TransactionKeyValueStore> {
185        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
186        Ok(TransactionKeyValueStore::new("http", metrics, inner))
187    }
188
189    pub fn new(
190        base_url: &str,
191        cache_size: u64,
192        metrics: Arc<KeyValueStoreMetrics>,
193    ) -> SuiResult<Self> {
194        info!("creating HttpKVStore with base_url: {}", base_url);
195
196        let client = Client::builder().http2_prior_knowledge().build().unwrap();
197
198        let base_url = if base_url.ends_with('/') {
199            base_url.to_string()
200        } else {
201            format!("{}/", base_url)
202        };
203
204        let base_url = Url::parse(&base_url).into_sui_result()?;
205
206        let cache = MokaCacheBuilder::new(cache_size)
207            .time_to_idle(Duration::from_secs(600))
208            .build();
209
210        Ok(Self {
211            base_url,
212            client,
213            cache,
214            metrics,
215        })
216    }
217
218    fn get_url(&self, key: &Key) -> SuiResult<Url> {
219        let (digest, item_type) = key.to_path_elements();
220        let joined = self
221            .base_url
222            .join(&format!("{}/{}", digest, item_type))
223            .into_sui_result()?;
224        Url::from_str(joined.as_str()).into_sui_result()
225    }
226
227    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<SuiResult<Option<Bytes>>> {
228        let uris_vec = uris.to_vec();
229        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
230        fetches.buffered(uris.len()).collect::<Vec<_>>().await
231    }
232
233    async fn fetch(&self, key: Key) -> SuiResult<Option<Bytes>> {
234        let url = self.get_url(&key)?;
235
236        trace!("fetching url: {}", url);
237
238        if let Some(res) = self.cache.get(&url) {
239            trace!("found cached data for url: {}, len: {:?}", url, res.len());
240            self.metrics
241                .key_value_store_num_fetches_success
242                .with_label_values(&["http_cache", key.ty()])
243                .inc();
244            return Ok(Some(res));
245        }
246
247        self.metrics
248            .key_value_store_num_fetches_not_found
249            .with_label_values(&["http_cache", key.ty()])
250            .inc();
251
252        let resp = self
253            .client
254            .get(url.clone())
255            .send()
256            .await
257            .into_sui_result()?;
258        trace!(
259            "got response {} for url: {}, len: {:?}",
260            url,
261            resp.status(),
262            resp.headers()
263                .get(CONTENT_LENGTH)
264                .unwrap_or(&HeaderValue::from_static("0"))
265        );
266        // return None if 400
267        if resp.status().is_success() {
268            let bytes = resp.bytes().await.into_sui_result()?;
269            self.cache.insert(url, bytes.clone());
270
271            Ok(Some(bytes))
272        } else {
273            Ok(None)
274        }
275    }
276}
277
278fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
279where
280    K: std::fmt::Debug,
281    T: for<'de> Deserialize<'de>,
282{
283    bcs::from_bytes(bytes)
284        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
285        .ok()
286}
287
288fn map_fetch<'a, K>(fetch: (&'a SuiResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
289where
290    K: std::fmt::Debug,
291{
292    let (fetch, key) = fetch;
293    match fetch {
294        Ok(Some(bytes)) => Some((bytes, key)),
295        Ok(None) => None,
296        Err(err) => {
297            warn!("Error fetching key: {:?}, error: {:?}", key, err);
298            None
299        }
300    }
301}
302
/// Cuts `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`, in order.
///
/// Elements beyond the sum of `lengths` are ignored.
///
/// # Panics
/// Panics if the lengths add up to more than `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut remaining = slice;
    let mut pieces = Vec::with_capacity(lengths.len());
    for &len in lengths {
        let (head, tail) = remaining.split_at(len);
        pieces.push(head);
        remaining = tail;
    }
    pieces
}
315
316fn deser_check_digest<T, D>(
317    digest: &D,
318    bytes: &Bytes,
319    get_expected_digest: impl FnOnce(&T) -> D,
320) -> Option<T>
321where
322    D: std::fmt::Debug + PartialEq,
323    T: for<'de> Deserialize<'de>,
324{
325    deser(digest, bytes).and_then(|o: T| {
326        let expected_digest = get_expected_digest(&o);
327        if expected_digest == *digest {
328            Some(o)
329        } else {
330            error!(
331                "Digest mismatch - expected: {:?}, got: {:?}",
332                digest, expected_digest,
333            );
334            None
335        }
336    })
337}
338
339#[async_trait]
340impl TransactionKeyValueStoreTrait for HttpKVStore {
341    #[instrument(level = "trace", skip_all)]
342    async fn multi_get(
343        &self,
344        transactions: &[TransactionDigest],
345        effects: &[TransactionDigest],
346    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
347        let num_txns = transactions.len();
348        let num_effects = effects.len();
349
350        let keys = transactions
351            .iter()
352            .map(|tx| Key::Tx(*tx))
353            .chain(effects.iter().map(|fx| Key::Fx(*fx)))
354            .collect::<Vec<_>>();
355
356        let fetches = self.multi_fetch(keys).await;
357        let txn_slice = fetches[..num_txns].to_vec();
358        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
359
360        let txn_results = txn_slice
361            .iter()
362            .take(num_txns)
363            .zip_debug_eq(transactions.iter())
364            .map(map_fetch)
365            .map(|maybe_bytes| {
366                maybe_bytes.and_then(|(bytes, digest)| {
367                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
368                })
369            })
370            .collect::<Vec<_>>();
371
372        let fx_results = fx_slice
373            .iter()
374            .take(num_effects)
375            .zip_debug_eq(effects.iter())
376            .map(map_fetch)
377            .map(|maybe_bytes| {
378                maybe_bytes.and_then(|(bytes, digest)| {
379                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
380                        *fx.transaction_digest()
381                    })
382                })
383            })
384            .collect::<Vec<_>>();
385        Ok((txn_results, fx_results))
386    }
387
388    #[instrument(level = "trace", skip_all)]
389    async fn multi_get_checkpoints(
390        &self,
391        checkpoint_summaries: &[CheckpointSequenceNumber],
392        checkpoint_contents: &[CheckpointSequenceNumber],
393        checkpoint_summaries_by_digest: &[CheckpointDigest],
394    ) -> SuiResult<(
395        Vec<Option<CertifiedCheckpointSummary>>,
396        Vec<Option<CheckpointContents>>,
397        Vec<Option<CertifiedCheckpointSummary>>,
398    )> {
399        let keys = checkpoint_summaries
400            .iter()
401            .map(|cp| Key::CheckpointSummary(*cp))
402            .chain(
403                checkpoint_contents
404                    .iter()
405                    .map(|cp| Key::CheckpointContents(*cp)),
406            )
407            .chain(
408                checkpoint_summaries_by_digest
409                    .iter()
410                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
411            )
412            .collect::<Vec<_>>();
413
414        let summaries_len = checkpoint_summaries.len();
415        let contents_len = checkpoint_contents.len();
416        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
417
418        let fetches = self.multi_fetch(keys).await;
419
420        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
421
422        let result_slices = multi_split_slice(&fetches, &input_slices);
423
424        let summaries_results = result_slices[0]
425            .iter()
426            .zip_debug_eq(checkpoint_summaries.iter())
427            .map(map_fetch)
428            .map(|maybe_bytes| {
429                maybe_bytes
430                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
431            })
432            .collect::<Vec<_>>();
433
434        let contents_results = result_slices[1]
435            .iter()
436            .zip_debug_eq(checkpoint_contents.iter())
437            .map(map_fetch)
438            .map(|maybe_bytes| {
439                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
440            })
441            .collect::<Vec<_>>();
442
443        let summaries_by_digest_results = result_slices[2]
444            .iter()
445            .zip_debug_eq(checkpoint_summaries_by_digest.iter())
446            .map(map_fetch)
447            .map(|maybe_bytes| {
448                maybe_bytes.and_then(|(bytes, digest)| {
449                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
450                })
451            })
452            .collect::<Vec<_>>();
453        Ok((
454            summaries_results,
455            contents_results,
456            summaries_by_digest_results,
457        ))
458    }
459
460    #[instrument(level = "trace", skip_all)]
461    async fn deprecated_get_transaction_checkpoint(
462        &self,
463        digest: TransactionDigest,
464    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
465        let key = Key::TxToCheckpoint(digest);
466        self.fetch(key).await.map(|maybe| {
467            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
468        })
469    }
470
471    #[instrument(level = "trace", skip_all)]
472    async fn get_object(
473        &self,
474        object_id: ObjectID,
475        version: SequenceNumber,
476    ) -> SuiResult<Option<Object>> {
477        let key = Key::ObjectKey(ObjectKey(object_id, version));
478        self.fetch(key).await.map(|maybe| {
479            maybe
480                .and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref()))
481                .tap_some(|_| {
482                    self.metrics
483                        .key_value_store_num_fetches_success
484                        .with_label_values(&["http", key.ty()])
485                        .inc();
486                })
487                .tap_none(|| {
488                    self.metrics
489                        .key_value_store_num_fetches_not_found
490                        .with_label_values(&["http", key.ty()])
491                        .inc();
492                })
493        })
494    }
495
496    #[instrument(level = "trace", skip_all)]
497    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
498        let keys = object_keys
499            .iter()
500            .map(|key| Key::ObjectKey(*key))
501            .collect::<Vec<_>>();
502
503        let fetches = self.multi_fetch(keys).await;
504
505        let results = fetches
506            .iter()
507            .zip_debug_eq(object_keys.iter())
508            .map(map_fetch)
509            .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
510            .collect::<Vec<_>>();
511
512        Ok(results)
513    }
514
515    #[instrument(level = "trace", skip_all)]
516    async fn multi_get_transaction_checkpoint(
517        &self,
518        digests: &[TransactionDigest],
519    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
520        let keys = digests
521            .iter()
522            .map(|digest| Key::TxToCheckpoint(*digest))
523            .collect::<Vec<_>>();
524
525        let fetches = self.multi_fetch(keys).await;
526
527        let results = fetches
528            .iter()
529            .zip_debug_eq(digests.iter())
530            .map(map_fetch)
531            .map(|maybe_bytes| {
532                maybe_bytes
533                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
534            })
535            .collect::<Vec<_>>();
536
537        Ok(results)
538    }
539
540    #[instrument(level = "trace", skip_all)]
541    async fn multi_get_events_by_tx_digests(
542        &self,
543        digests: &[TransactionDigest],
544    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
545        let keys = digests
546            .iter()
547            .map(|digest| Key::EventsByTxDigest(*digest))
548            .collect::<Vec<_>>();
549        Ok(self
550            .multi_fetch(keys)
551            .await
552            .iter()
553            .zip_debug_eq(digests.iter())
554            .map(map_fetch)
555            .map(|maybe_bytes| {
556                maybe_bytes
557                    .and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..)))
558            })
559            .collect::<Vec<_>>())
560    }
561}