1use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{self, StreamExt};
7use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
8use mysten_common::ZipDebugEqIteratorExt;
9use reqwest::Client;
10use reqwest::Url;
11use reqwest::header::{CONTENT_LENGTH, HeaderValue};
12use serde::{Deserialize, Serialize};
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16use sui_types::base_types::{ObjectID, SequenceNumber};
17use sui_types::object::Object;
18use sui_types::storage::ObjectKey;
19use sui_types::{
20 digests::{CheckpointContentsDigest, CheckpointDigest, TransactionDigest},
21 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
22 error::{SuiErrorKind, SuiResult},
23 messages_checkpoint::{
24 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
25 },
26 transaction::Transaction,
27};
28use tap::{TapFallible, TapOptional};
29use tracing::{error, info, instrument, trace, warn};
30
31use crate::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
32use crate::key_value_store_metrics::KeyValueStoreMetrics;
33
34pub struct HttpKVStore {
35 base_url: Url,
36 client: Client,
37 cache: MokaCache<Url, Bytes>,
38 metrics: Arc<KeyValueStoreMetrics>,
39}
40
41pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
42 base64_url::encode(digest)
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
47pub enum TaggedKey {
48 CheckpointSequenceNumber(CheckpointSequenceNumber),
49}
50
51pub fn encoded_tagged_key(key: &TaggedKey) -> String {
52 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
53 base64_url::encode(&bytes)
54}
55
56pub fn encode_object_key(object_key: &ObjectKey) -> String {
57 let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
58 base64_url::encode(&bytes)
59}
60
61trait IntoSuiResult<T> {
62 fn into_sui_result(self) -> SuiResult<T>;
63}
64
65impl<T, E> IntoSuiResult<T> for Result<T, E>
66where
67 E: std::error::Error,
68{
69 fn into_sui_result(self) -> SuiResult<T> {
70 self.map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
71 }
72}
73
74#[derive(Clone, Copy, Debug, PartialEq, Eq)]
75pub enum Key {
76 Tx(TransactionDigest),
77 Fx(TransactionDigest),
78 CheckpointContents(CheckpointSequenceNumber),
79 CheckpointSummary(CheckpointSequenceNumber),
80 CheckpointContentsByDigest(CheckpointContentsDigest),
81 CheckpointSummaryByDigest(CheckpointDigest),
82 TxToCheckpoint(TransactionDigest),
83 ObjectKey(ObjectKey),
84 EventsByTxDigest(TransactionDigest),
85}
86
87impl Key {
88 pub fn ty(&self) -> &'static str {
90 match self {
91 Key::Tx(_) => "tx",
92 Key::Fx(_) => "fx",
93 Key::CheckpointContents(_) => "cc",
94 Key::CheckpointSummary(_) => "cs",
95 Key::CheckpointContentsByDigest(_) => "cc",
96 Key::CheckpointSummaryByDigest(_) => "cs",
97 Key::TxToCheckpoint(_) => "tx2c",
98 Key::ObjectKey(_) => "ob",
99 Key::EventsByTxDigest(_) => "evtx",
100 }
101 }
102
103 pub fn encode(&self) -> String {
104 match self {
105 Key::Tx(digest) => encode_digest(digest),
106 Key::Fx(digest) => encode_digest(digest),
107 Key::CheckpointContents(seq) => {
108 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
109 }
110 Key::CheckpointSummary(seq) => {
111 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
112 }
113 Key::CheckpointContentsByDigest(digest) => encode_digest(digest),
114 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
115 Key::TxToCheckpoint(digest) => encode_digest(digest),
116 Key::ObjectKey(object_key) => encode_object_key(object_key),
117 Key::EventsByTxDigest(digest) => encode_digest(digest),
118 }
119 }
120
121 pub fn to_path_elements(&self) -> (String, &'static str) {
122 (self.encode(), self.ty())
123 }
124}
125
126#[derive(Clone, Debug)]
127enum Value {
128 Tx(Box<Transaction>),
129 Fx(Box<TransactionEffects>),
130 Events(Box<TransactionEvents>),
131 CheckpointContents(Box<CheckpointContents>),
132 CheckpointSummary(Box<CertifiedCheckpointSummary>),
133 TxToCheckpoint(CheckpointSequenceNumber),
134}
135
136pub fn path_elements_to_key(digest: &str, type_: &str) -> anyhow::Result<Key> {
137 let decoded_digest = base64_url::decode(digest)?;
138
139 match type_ {
140 "tx" => Ok(Key::Tx(TransactionDigest::try_from(decoded_digest)?)),
141 "fx" => Ok(Key::Fx(TransactionDigest::try_from(decoded_digest)?)),
142 "cc" => {
143 match CheckpointContentsDigest::try_from(decoded_digest.clone()) {
145 Err(_) => {
146 let tagged_key = bcs::from_bytes(&decoded_digest)?;
147 match tagged_key {
148 TaggedKey::CheckpointSequenceNumber(seq) => {
149 Ok(Key::CheckpointContents(seq))
150 }
151 }
152 }
153 Ok(cc_digest) => Ok(Key::CheckpointContentsByDigest(cc_digest)),
154 }
155 }
156 "cs" => {
157 match CheckpointDigest::try_from(decoded_digest.clone()) {
159 Err(_) => {
160 let tagged_key = bcs::from_bytes(&decoded_digest)?;
161 match tagged_key {
162 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointSummary(seq)),
163 }
164 }
165 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
166 }
167 }
168 "tx2c" => Ok(Key::TxToCheckpoint(TransactionDigest::try_from(
169 decoded_digest,
170 )?)),
171 "ob" => {
172 let object_key: ObjectKey = bcs::from_bytes(&decoded_digest)?;
173 Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
174 }
175 _ => Err(anyhow::anyhow!("Invalid type: {}", type_)),
176 }
177}
178
179impl HttpKVStore {
180 pub fn new_kv(
181 base_url: &str,
182 cache_size: u64,
183 metrics: Arc<KeyValueStoreMetrics>,
184 ) -> SuiResult<TransactionKeyValueStore> {
185 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
186 Ok(TransactionKeyValueStore::new("http", metrics, inner))
187 }
188
189 pub fn new(
190 base_url: &str,
191 cache_size: u64,
192 metrics: Arc<KeyValueStoreMetrics>,
193 ) -> SuiResult<Self> {
194 info!("creating HttpKVStore with base_url: {}", base_url);
195
196 let client = Client::builder().http2_prior_knowledge().build().unwrap();
197
198 let base_url = if base_url.ends_with('/') {
199 base_url.to_string()
200 } else {
201 format!("{}/", base_url)
202 };
203
204 let base_url = Url::parse(&base_url).into_sui_result()?;
205
206 let cache = MokaCacheBuilder::new(cache_size)
207 .time_to_idle(Duration::from_secs(600))
208 .build();
209
210 Ok(Self {
211 base_url,
212 client,
213 cache,
214 metrics,
215 })
216 }
217
218 fn get_url(&self, key: &Key) -> SuiResult<Url> {
219 let (digest, item_type) = key.to_path_elements();
220 let joined = self
221 .base_url
222 .join(&format!("{}/{}", digest, item_type))
223 .into_sui_result()?;
224 Url::from_str(joined.as_str()).into_sui_result()
225 }
226
227 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<SuiResult<Option<Bytes>>> {
228 let uris_vec = uris.to_vec();
229 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
230 fetches.buffered(uris.len()).collect::<Vec<_>>().await
231 }
232
233 async fn fetch(&self, key: Key) -> SuiResult<Option<Bytes>> {
234 let url = self.get_url(&key)?;
235
236 trace!("fetching url: {}", url);
237
238 if let Some(res) = self.cache.get(&url) {
239 trace!("found cached data for url: {}, len: {:?}", url, res.len());
240 self.metrics
241 .key_value_store_num_fetches_success
242 .with_label_values(&["http_cache", key.ty()])
243 .inc();
244 return Ok(Some(res));
245 }
246
247 self.metrics
248 .key_value_store_num_fetches_not_found
249 .with_label_values(&["http_cache", key.ty()])
250 .inc();
251
252 let resp = self
253 .client
254 .get(url.clone())
255 .send()
256 .await
257 .into_sui_result()?;
258 trace!(
259 "got response {} for url: {}, len: {:?}",
260 url,
261 resp.status(),
262 resp.headers()
263 .get(CONTENT_LENGTH)
264 .unwrap_or(&HeaderValue::from_static("0"))
265 );
266 if resp.status().is_success() {
268 let bytes = resp.bytes().await.into_sui_result()?;
269 self.cache.insert(url, bytes.clone());
270
271 Ok(Some(bytes))
272 } else {
273 Ok(None)
274 }
275 }
276}
277
278fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
279where
280 K: std::fmt::Debug,
281 T: for<'de> Deserialize<'de>,
282{
283 bcs::from_bytes(bytes)
284 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
285 .ok()
286}
287
288fn map_fetch<'a, K>(fetch: (&'a SuiResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
289where
290 K: std::fmt::Debug,
291{
292 let (fetch, key) = fetch;
293 match fetch {
294 Ok(Some(bytes)) => Some((bytes, key)),
295 Ok(None) => None,
296 Err(err) => {
297 warn!("Error fetching key: {:?}, error: {:?}", key, err);
298 None
299 }
300 }
301}
302
/// Splits `slice` into consecutive sub-slices whose sizes are given by `lengths`.
///
/// The i-th returned slice starts where the previous one ended and holds
/// `lengths[i]` elements. Elements beyond the sum of `lengths` are ignored.
///
/// # Panics
/// Panics if the lengths add up to more than `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut remaining = slice;
    let mut pieces = Vec::with_capacity(lengths.len());
    for &len in lengths {
        let (head, tail) = remaining.split_at(len);
        pieces.push(head);
        remaining = tail;
    }
    pieces
}
315
316fn deser_check_digest<T, D>(
317 digest: &D,
318 bytes: &Bytes,
319 get_expected_digest: impl FnOnce(&T) -> D,
320) -> Option<T>
321where
322 D: std::fmt::Debug + PartialEq,
323 T: for<'de> Deserialize<'de>,
324{
325 deser(digest, bytes).and_then(|o: T| {
326 let expected_digest = get_expected_digest(&o);
327 if expected_digest == *digest {
328 Some(o)
329 } else {
330 error!(
331 "Digest mismatch - expected: {:?}, got: {:?}",
332 digest, expected_digest,
333 );
334 None
335 }
336 })
337}
338
339#[async_trait]
340impl TransactionKeyValueStoreTrait for HttpKVStore {
341 #[instrument(level = "trace", skip_all)]
342 async fn multi_get(
343 &self,
344 transactions: &[TransactionDigest],
345 effects: &[TransactionDigest],
346 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
347 let num_txns = transactions.len();
348 let num_effects = effects.len();
349
350 let keys = transactions
351 .iter()
352 .map(|tx| Key::Tx(*tx))
353 .chain(effects.iter().map(|fx| Key::Fx(*fx)))
354 .collect::<Vec<_>>();
355
356 let fetches = self.multi_fetch(keys).await;
357 let txn_slice = fetches[..num_txns].to_vec();
358 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
359
360 let txn_results = txn_slice
361 .iter()
362 .take(num_txns)
363 .zip_debug_eq(transactions.iter())
364 .map(map_fetch)
365 .map(|maybe_bytes| {
366 maybe_bytes.and_then(|(bytes, digest)| {
367 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
368 })
369 })
370 .collect::<Vec<_>>();
371
372 let fx_results = fx_slice
373 .iter()
374 .take(num_effects)
375 .zip_debug_eq(effects.iter())
376 .map(map_fetch)
377 .map(|maybe_bytes| {
378 maybe_bytes.and_then(|(bytes, digest)| {
379 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
380 *fx.transaction_digest()
381 })
382 })
383 })
384 .collect::<Vec<_>>();
385 Ok((txn_results, fx_results))
386 }
387
388 #[instrument(level = "trace", skip_all)]
389 async fn multi_get_checkpoints(
390 &self,
391 checkpoint_summaries: &[CheckpointSequenceNumber],
392 checkpoint_contents: &[CheckpointSequenceNumber],
393 checkpoint_summaries_by_digest: &[CheckpointDigest],
394 ) -> SuiResult<(
395 Vec<Option<CertifiedCheckpointSummary>>,
396 Vec<Option<CheckpointContents>>,
397 Vec<Option<CertifiedCheckpointSummary>>,
398 )> {
399 let keys = checkpoint_summaries
400 .iter()
401 .map(|cp| Key::CheckpointSummary(*cp))
402 .chain(
403 checkpoint_contents
404 .iter()
405 .map(|cp| Key::CheckpointContents(*cp)),
406 )
407 .chain(
408 checkpoint_summaries_by_digest
409 .iter()
410 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
411 )
412 .collect::<Vec<_>>();
413
414 let summaries_len = checkpoint_summaries.len();
415 let contents_len = checkpoint_contents.len();
416 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
417
418 let fetches = self.multi_fetch(keys).await;
419
420 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
421
422 let result_slices = multi_split_slice(&fetches, &input_slices);
423
424 let summaries_results = result_slices[0]
425 .iter()
426 .zip_debug_eq(checkpoint_summaries.iter())
427 .map(map_fetch)
428 .map(|maybe_bytes| {
429 maybe_bytes
430 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
431 })
432 .collect::<Vec<_>>();
433
434 let contents_results = result_slices[1]
435 .iter()
436 .zip_debug_eq(checkpoint_contents.iter())
437 .map(map_fetch)
438 .map(|maybe_bytes| {
439 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
440 })
441 .collect::<Vec<_>>();
442
443 let summaries_by_digest_results = result_slices[2]
444 .iter()
445 .zip_debug_eq(checkpoint_summaries_by_digest.iter())
446 .map(map_fetch)
447 .map(|maybe_bytes| {
448 maybe_bytes.and_then(|(bytes, digest)| {
449 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
450 })
451 })
452 .collect::<Vec<_>>();
453 Ok((
454 summaries_results,
455 contents_results,
456 summaries_by_digest_results,
457 ))
458 }
459
460 #[instrument(level = "trace", skip_all)]
461 async fn deprecated_get_transaction_checkpoint(
462 &self,
463 digest: TransactionDigest,
464 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
465 let key = Key::TxToCheckpoint(digest);
466 self.fetch(key).await.map(|maybe| {
467 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
468 })
469 }
470
471 #[instrument(level = "trace", skip_all)]
472 async fn get_object(
473 &self,
474 object_id: ObjectID,
475 version: SequenceNumber,
476 ) -> SuiResult<Option<Object>> {
477 let key = Key::ObjectKey(ObjectKey(object_id, version));
478 self.fetch(key).await.map(|maybe| {
479 maybe
480 .and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref()))
481 .tap_some(|_| {
482 self.metrics
483 .key_value_store_num_fetches_success
484 .with_label_values(&["http", key.ty()])
485 .inc();
486 })
487 .tap_none(|| {
488 self.metrics
489 .key_value_store_num_fetches_not_found
490 .with_label_values(&["http", key.ty()])
491 .inc();
492 })
493 })
494 }
495
496 #[instrument(level = "trace", skip_all)]
497 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
498 let keys = object_keys
499 .iter()
500 .map(|key| Key::ObjectKey(*key))
501 .collect::<Vec<_>>();
502
503 let fetches = self.multi_fetch(keys).await;
504
505 let results = fetches
506 .iter()
507 .zip_debug_eq(object_keys.iter())
508 .map(map_fetch)
509 .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
510 .collect::<Vec<_>>();
511
512 Ok(results)
513 }
514
515 #[instrument(level = "trace", skip_all)]
516 async fn multi_get_transaction_checkpoint(
517 &self,
518 digests: &[TransactionDigest],
519 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
520 let keys = digests
521 .iter()
522 .map(|digest| Key::TxToCheckpoint(*digest))
523 .collect::<Vec<_>>();
524
525 let fetches = self.multi_fetch(keys).await;
526
527 let results = fetches
528 .iter()
529 .zip_debug_eq(digests.iter())
530 .map(map_fetch)
531 .map(|maybe_bytes| {
532 maybe_bytes
533 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
534 })
535 .collect::<Vec<_>>();
536
537 Ok(results)
538 }
539
540 #[instrument(level = "trace", skip_all)]
541 async fn multi_get_events_by_tx_digests(
542 &self,
543 digests: &[TransactionDigest],
544 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
545 let keys = digests
546 .iter()
547 .map(|digest| Key::EventsByTxDigest(*digest))
548 .collect::<Vec<_>>();
549 Ok(self
550 .multi_fetch(keys)
551 .await
552 .iter()
553 .zip_debug_eq(digests.iter())
554 .map(map_fetch)
555 .map(|maybe_bytes| {
556 maybe_bytes
557 .and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..)))
558 })
559 .collect::<Vec<_>>())
560 }
561}