1use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{self, StreamExt};
7use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
8use reqwest::Client;
9use reqwest::Url;
10use reqwest::header::{CONTENT_LENGTH, HeaderValue};
11use serde::{Deserialize, Serialize};
12use std::str::FromStr;
13use std::sync::Arc;
14use std::time::Duration;
15use sui_types::base_types::{ObjectID, SequenceNumber};
16use sui_types::object::Object;
17use sui_types::storage::ObjectKey;
18use sui_types::{
19    digests::{CheckpointContentsDigest, CheckpointDigest, TransactionDigest},
20    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
21    error::{SuiErrorKind, SuiResult},
22    messages_checkpoint::{
23        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
24    },
25    transaction::Transaction,
26};
27use tap::{TapFallible, TapOptional};
28use tracing::{error, info, instrument, trace, warn};
29
30use crate::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
31use crate::key_value_store_metrics::KeyValueStoreMetrics;
32
33pub struct HttpKVStore {
34    base_url: Url,
35    client: Client,
36    cache: MokaCache<Url, Bytes>,
37    metrics: Arc<KeyValueStoreMetrics>,
38}
39
40pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
41    base64_url::encode(digest)
42}
43
44#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
46pub enum TaggedKey {
47    CheckpointSequenceNumber(CheckpointSequenceNumber),
48}
49
50pub fn encoded_tagged_key(key: &TaggedKey) -> String {
51    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
52    base64_url::encode(&bytes)
53}
54
55pub fn encode_object_key(object_key: &ObjectKey) -> String {
56    let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
57    base64_url::encode(&bytes)
58}
59
60trait IntoSuiResult<T> {
61    fn into_sui_result(self) -> SuiResult<T>;
62}
63
64impl<T, E> IntoSuiResult<T> for Result<T, E>
65where
66    E: std::error::Error,
67{
68    fn into_sui_result(self) -> SuiResult<T> {
69        self.map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
70    }
71}
72
73#[derive(Clone, Copy, Debug, PartialEq, Eq)]
74pub enum Key {
75    Tx(TransactionDigest),
76    Fx(TransactionDigest),
77    CheckpointContents(CheckpointSequenceNumber),
78    CheckpointSummary(CheckpointSequenceNumber),
79    CheckpointContentsByDigest(CheckpointContentsDigest),
80    CheckpointSummaryByDigest(CheckpointDigest),
81    TxToCheckpoint(TransactionDigest),
82    ObjectKey(ObjectKey),
83    EventsByTxDigest(TransactionDigest),
84}
85
86impl Key {
87    pub fn ty(&self) -> &'static str {
89        match self {
90            Key::Tx(_) => "tx",
91            Key::Fx(_) => "fx",
92            Key::CheckpointContents(_) => "cc",
93            Key::CheckpointSummary(_) => "cs",
94            Key::CheckpointContentsByDigest(_) => "cc",
95            Key::CheckpointSummaryByDigest(_) => "cs",
96            Key::TxToCheckpoint(_) => "tx2c",
97            Key::ObjectKey(_) => "ob",
98            Key::EventsByTxDigest(_) => "evtx",
99        }
100    }
101
102    pub fn encode(&self) -> String {
103        match self {
104            Key::Tx(digest) => encode_digest(digest),
105            Key::Fx(digest) => encode_digest(digest),
106            Key::CheckpointContents(seq) => {
107                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
108            }
109            Key::CheckpointSummary(seq) => {
110                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
111            }
112            Key::CheckpointContentsByDigest(digest) => encode_digest(digest),
113            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
114            Key::TxToCheckpoint(digest) => encode_digest(digest),
115            Key::ObjectKey(object_key) => encode_object_key(object_key),
116            Key::EventsByTxDigest(digest) => encode_digest(digest),
117        }
118    }
119
120    pub fn to_path_elements(&self) -> (String, &'static str) {
121        (self.encode(), self.ty())
122    }
123}
124
125#[derive(Clone, Debug)]
126enum Value {
127    Tx(Box<Transaction>),
128    Fx(Box<TransactionEffects>),
129    Events(Box<TransactionEvents>),
130    CheckpointContents(Box<CheckpointContents>),
131    CheckpointSummary(Box<CertifiedCheckpointSummary>),
132    TxToCheckpoint(CheckpointSequenceNumber),
133}
134
135pub fn path_elements_to_key(digest: &str, type_: &str) -> anyhow::Result<Key> {
136    let decoded_digest = base64_url::decode(digest)?;
137
138    match type_ {
139        "tx" => Ok(Key::Tx(TransactionDigest::try_from(decoded_digest)?)),
140        "fx" => Ok(Key::Fx(TransactionDigest::try_from(decoded_digest)?)),
141        "cc" => {
142            match CheckpointContentsDigest::try_from(decoded_digest.clone()) {
144                Err(_) => {
145                    let tagged_key = bcs::from_bytes(&decoded_digest)?;
146                    match tagged_key {
147                        TaggedKey::CheckpointSequenceNumber(seq) => {
148                            Ok(Key::CheckpointContents(seq))
149                        }
150                    }
151                }
152                Ok(cc_digest) => Ok(Key::CheckpointContentsByDigest(cc_digest)),
153            }
154        }
155        "cs" => {
156            match CheckpointDigest::try_from(decoded_digest.clone()) {
158                Err(_) => {
159                    let tagged_key = bcs::from_bytes(&decoded_digest)?;
160                    match tagged_key {
161                        TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointSummary(seq)),
162                    }
163                }
164                Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
165            }
166        }
167        "tx2c" => Ok(Key::TxToCheckpoint(TransactionDigest::try_from(
168            decoded_digest,
169        )?)),
170        "ob" => {
171            let object_key: ObjectKey = bcs::from_bytes(&decoded_digest)?;
172            Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
173        }
174        _ => Err(anyhow::anyhow!("Invalid type: {}", type_)),
175    }
176}
177
178impl HttpKVStore {
179    pub fn new_kv(
180        base_url: &str,
181        cache_size: u64,
182        metrics: Arc<KeyValueStoreMetrics>,
183    ) -> SuiResult<TransactionKeyValueStore> {
184        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
185        Ok(TransactionKeyValueStore::new("http", metrics, inner))
186    }
187
188    pub fn new(
189        base_url: &str,
190        cache_size: u64,
191        metrics: Arc<KeyValueStoreMetrics>,
192    ) -> SuiResult<Self> {
193        info!("creating HttpKVStore with base_url: {}", base_url);
194
195        let client = Client::builder().http2_prior_knowledge().build().unwrap();
196
197        let base_url = if base_url.ends_with('/') {
198            base_url.to_string()
199        } else {
200            format!("{}/", base_url)
201        };
202
203        let base_url = Url::parse(&base_url).into_sui_result()?;
204
205        let cache = MokaCacheBuilder::new(cache_size)
206            .time_to_idle(Duration::from_secs(600))
207            .build();
208
209        Ok(Self {
210            base_url,
211            client,
212            cache,
213            metrics,
214        })
215    }
216
217    fn get_url(&self, key: &Key) -> SuiResult<Url> {
218        let (digest, item_type) = key.to_path_elements();
219        let joined = self
220            .base_url
221            .join(&format!("{}/{}", digest, item_type))
222            .into_sui_result()?;
223        Url::from_str(joined.as_str()).into_sui_result()
224    }
225
226    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<SuiResult<Option<Bytes>>> {
227        let uris_vec = uris.to_vec();
228        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
229        fetches.buffered(uris.len()).collect::<Vec<_>>().await
230    }
231
232    async fn fetch(&self, key: Key) -> SuiResult<Option<Bytes>> {
233        let url = self.get_url(&key)?;
234
235        trace!("fetching url: {}", url);
236
237        if let Some(res) = self.cache.get(&url) {
238            trace!("found cached data for url: {}, len: {:?}", url, res.len());
239            self.metrics
240                .key_value_store_num_fetches_success
241                .with_label_values(&["http_cache", key.ty()])
242                .inc();
243            return Ok(Some(res));
244        }
245
246        self.metrics
247            .key_value_store_num_fetches_not_found
248            .with_label_values(&["http_cache", key.ty()])
249            .inc();
250
251        let resp = self
252            .client
253            .get(url.clone())
254            .send()
255            .await
256            .into_sui_result()?;
257        trace!(
258            "got response {} for url: {}, len: {:?}",
259            url,
260            resp.status(),
261            resp.headers()
262                .get(CONTENT_LENGTH)
263                .unwrap_or(&HeaderValue::from_static("0"))
264        );
265        if resp.status().is_success() {
267            let bytes = resp.bytes().await.into_sui_result()?;
268            self.cache.insert(url, bytes.clone());
269
270            Ok(Some(bytes))
271        } else {
272            Ok(None)
273        }
274    }
275}
276
277fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
278where
279    K: std::fmt::Debug,
280    T: for<'de> Deserialize<'de>,
281{
282    bcs::from_bytes(bytes)
283        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
284        .ok()
285}
286
287fn map_fetch<'a, K>(fetch: (&'a SuiResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
288where
289    K: std::fmt::Debug,
290{
291    let (fetch, key) = fetch;
292    match fetch {
293        Ok(Some(bytes)) => Some((bytes, key)),
294        Ok(None) => None,
295        Err(err) => {
296            warn!("Error fetching key: {:?}, error: {:?}", key, err);
297            None
298        }
299    }
300}
301
302fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
303    let mut start = 0;
304    lengths
305        .iter()
306        .map(|length| {
307            let end = start + length;
308            let result = &slice[start..end];
309            start = end;
310            result
311        })
312        .collect()
313}
314
315fn deser_check_digest<T, D>(
316    digest: &D,
317    bytes: &Bytes,
318    get_expected_digest: impl FnOnce(&T) -> D,
319) -> Option<T>
320where
321    D: std::fmt::Debug + PartialEq,
322    T: for<'de> Deserialize<'de>,
323{
324    deser(digest, bytes).and_then(|o: T| {
325        let expected_digest = get_expected_digest(&o);
326        if expected_digest == *digest {
327            Some(o)
328        } else {
329            error!(
330                "Digest mismatch - expected: {:?}, got: {:?}",
331                digest, expected_digest,
332            );
333            None
334        }
335    })
336}
337
338#[async_trait]
339impl TransactionKeyValueStoreTrait for HttpKVStore {
340    #[instrument(level = "trace", skip_all)]
341    async fn multi_get(
342        &self,
343        transactions: &[TransactionDigest],
344        effects: &[TransactionDigest],
345    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
346        let num_txns = transactions.len();
347        let num_effects = effects.len();
348
349        let keys = transactions
350            .iter()
351            .map(|tx| Key::Tx(*tx))
352            .chain(effects.iter().map(|fx| Key::Fx(*fx)))
353            .collect::<Vec<_>>();
354
355        let fetches = self.multi_fetch(keys).await;
356        let txn_slice = fetches[..num_txns].to_vec();
357        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
358
359        let txn_results = txn_slice
360            .iter()
361            .take(num_txns)
362            .zip(transactions.iter())
363            .map(map_fetch)
364            .map(|maybe_bytes| {
365                maybe_bytes.and_then(|(bytes, digest)| {
366                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
367                })
368            })
369            .collect::<Vec<_>>();
370
371        let fx_results = fx_slice
372            .iter()
373            .take(num_effects)
374            .zip(effects.iter())
375            .map(map_fetch)
376            .map(|maybe_bytes| {
377                maybe_bytes.and_then(|(bytes, digest)| {
378                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
379                        *fx.transaction_digest()
380                    })
381                })
382            })
383            .collect::<Vec<_>>();
384        Ok((txn_results, fx_results))
385    }
386
387    #[instrument(level = "trace", skip_all)]
388    async fn multi_get_checkpoints(
389        &self,
390        checkpoint_summaries: &[CheckpointSequenceNumber],
391        checkpoint_contents: &[CheckpointSequenceNumber],
392        checkpoint_summaries_by_digest: &[CheckpointDigest],
393    ) -> SuiResult<(
394        Vec<Option<CertifiedCheckpointSummary>>,
395        Vec<Option<CheckpointContents>>,
396        Vec<Option<CertifiedCheckpointSummary>>,
397    )> {
398        let keys = checkpoint_summaries
399            .iter()
400            .map(|cp| Key::CheckpointSummary(*cp))
401            .chain(
402                checkpoint_contents
403                    .iter()
404                    .map(|cp| Key::CheckpointContents(*cp)),
405            )
406            .chain(
407                checkpoint_summaries_by_digest
408                    .iter()
409                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
410            )
411            .collect::<Vec<_>>();
412
413        let summaries_len = checkpoint_summaries.len();
414        let contents_len = checkpoint_contents.len();
415        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
416
417        let fetches = self.multi_fetch(keys).await;
418
419        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
420
421        let result_slices = multi_split_slice(&fetches, &input_slices);
422
423        let summaries_results = result_slices[0]
424            .iter()
425            .zip(checkpoint_summaries.iter())
426            .map(map_fetch)
427            .map(|maybe_bytes| {
428                maybe_bytes
429                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
430            })
431            .collect::<Vec<_>>();
432
433        let contents_results = result_slices[1]
434            .iter()
435            .zip(checkpoint_contents.iter())
436            .map(map_fetch)
437            .map(|maybe_bytes| {
438                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
439            })
440            .collect::<Vec<_>>();
441
442        let summaries_by_digest_results = result_slices[2]
443            .iter()
444            .zip(checkpoint_summaries_by_digest.iter())
445            .map(map_fetch)
446            .map(|maybe_bytes| {
447                maybe_bytes.and_then(|(bytes, digest)| {
448                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
449                })
450            })
451            .collect::<Vec<_>>();
452        Ok((
453            summaries_results,
454            contents_results,
455            summaries_by_digest_results,
456        ))
457    }
458
459    #[instrument(level = "trace", skip_all)]
460    async fn deprecated_get_transaction_checkpoint(
461        &self,
462        digest: TransactionDigest,
463    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
464        let key = Key::TxToCheckpoint(digest);
465        self.fetch(key).await.map(|maybe| {
466            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
467        })
468    }
469
470    #[instrument(level = "trace", skip_all)]
471    async fn get_object(
472        &self,
473        object_id: ObjectID,
474        version: SequenceNumber,
475    ) -> SuiResult<Option<Object>> {
476        let key = Key::ObjectKey(ObjectKey(object_id, version));
477        self.fetch(key).await.map(|maybe| {
478            maybe
479                .and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref()))
480                .tap_some(|_| {
481                    self.metrics
482                        .key_value_store_num_fetches_success
483                        .with_label_values(&["http", key.ty()])
484                        .inc();
485                })
486                .tap_none(|| {
487                    self.metrics
488                        .key_value_store_num_fetches_not_found
489                        .with_label_values(&["http", key.ty()])
490                        .inc();
491                })
492        })
493    }
494
495    #[instrument(level = "trace", skip_all)]
496    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
497        let keys = object_keys
498            .iter()
499            .map(|key| Key::ObjectKey(*key))
500            .collect::<Vec<_>>();
501
502        let fetches = self.multi_fetch(keys).await;
503
504        let results = fetches
505            .iter()
506            .zip(object_keys.iter())
507            .map(map_fetch)
508            .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
509            .collect::<Vec<_>>();
510
511        Ok(results)
512    }
513
514    #[instrument(level = "trace", skip_all)]
515    async fn multi_get_transaction_checkpoint(
516        &self,
517        digests: &[TransactionDigest],
518    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
519        let keys = digests
520            .iter()
521            .map(|digest| Key::TxToCheckpoint(*digest))
522            .collect::<Vec<_>>();
523
524        let fetches = self.multi_fetch(keys).await;
525
526        let results = fetches
527            .iter()
528            .zip(digests.iter())
529            .map(map_fetch)
530            .map(|maybe_bytes| {
531                maybe_bytes
532                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
533            })
534            .collect::<Vec<_>>();
535
536        Ok(results)
537    }
538
539    #[instrument(level = "trace", skip_all)]
540    async fn multi_get_events_by_tx_digests(
541        &self,
542        digests: &[TransactionDigest],
543    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
544        let keys = digests
545            .iter()
546            .map(|digest| Key::EventsByTxDigest(*digest))
547            .collect::<Vec<_>>();
548        Ok(self
549            .multi_fetch(keys)
550            .await
551            .iter()
552            .zip(digests.iter())
553            .map(map_fetch)
554            .map(|maybe_bytes| {
555                maybe_bytes
556                    .and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..)))
557            })
558            .collect::<Vec<_>>())
559    }
560}