1use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{self, StreamExt};
7use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
8use reqwest::Client;
9use reqwest::Url;
10use reqwest::header::{CONTENT_LENGTH, HeaderValue};
11use serde::{Deserialize, Serialize};
12use std::str::FromStr;
13use std::sync::Arc;
14use std::time::Duration;
15use sui_types::base_types::{ObjectID, SequenceNumber};
16use sui_types::object::Object;
17use sui_types::storage::ObjectKey;
18use sui_types::{
19 digests::{CheckpointContentsDigest, CheckpointDigest, TransactionDigest},
20 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
21 error::{SuiErrorKind, SuiResult},
22 messages_checkpoint::{
23 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
24 },
25 transaction::Transaction,
26};
27use tap::{TapFallible, TapOptional};
28use tracing::{error, info, instrument, trace, warn};
29
30use crate::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
31use crate::key_value_store_metrics::KeyValueStoreMetrics;
32
/// Key-value store client that reads values from a remote HTTP endpoint and
/// keeps successful response bodies in an in-memory cache.
pub struct HttpKVStore {
    /// Base URL that per-key paths are joined onto; `new` guarantees it ends with `/`.
    base_url: Url,
    /// HTTP client used for all requests.
    client: Client,
    /// Cache of successful response bodies, keyed by the full request URL.
    cache: MokaCache<Url, Bytes>,
    /// Counters updated on cache hits/misses and on object lookups.
    metrics: Arc<KeyValueStoreMetrics>,
}
39
40pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
41 base64_url::encode(digest)
42}
43
/// Serializable wrapper used when a key is not a digest.
///
/// `Key::encode` BCS-serializes this enum (variant tag included) for
/// sequence-number keys, and `path_elements_to_key` decodes it again when the
/// incoming bytes do not parse as a digest.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaggedKey {
    /// A checkpoint identified by its sequence number.
    CheckpointSequenceNumber(CheckpointSequenceNumber),
}
49
50pub fn encoded_tagged_key(key: &TaggedKey) -> String {
51 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
52 base64_url::encode(&bytes)
53}
54
55pub fn encode_object_key(object_key: &ObjectKey) -> String {
56 let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
57 base64_url::encode(&bytes)
58}
59
/// Conversion of a foreign `Result` into a [`SuiResult`].
trait IntoSuiResult<T> {
    /// Consumes `self` and returns the equivalent [`SuiResult`].
    fn into_sui_result(self) -> SuiResult<T>;
}
63
64impl<T, E> IntoSuiResult<T> for Result<T, E>
65where
66 E: std::error::Error,
67{
68 fn into_sui_result(self) -> SuiResult<T> {
69 self.map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
70 }
71}
72
/// Identifies one item that can be requested from the store.
///
/// Each variant maps to a short type tag (`Key::ty`) and an encoded
/// identifier (`Key::encode`), which together form the request path.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Key {
    /// Transaction, by transaction digest.
    Tx(TransactionDigest),
    /// Transaction effects, by transaction digest.
    Fx(TransactionDigest),
    /// Checkpoint contents, by checkpoint sequence number.
    CheckpointContents(CheckpointSequenceNumber),
    /// Checkpoint summary, by checkpoint sequence number.
    CheckpointSummary(CheckpointSequenceNumber),
    /// Checkpoint contents, by contents digest.
    CheckpointContentsByDigest(CheckpointContentsDigest),
    /// Checkpoint summary, by checkpoint digest.
    CheckpointSummaryByDigest(CheckpointDigest),
    /// Transaction-to-checkpoint mapping, by transaction digest.
    TxToCheckpoint(TransactionDigest),
    /// Object, by object key.
    ObjectKey(ObjectKey),
    /// Transaction events, by transaction digest.
    EventsByTxDigest(TransactionDigest),
}
85
86impl Key {
87 pub fn ty(&self) -> &'static str {
89 match self {
90 Key::Tx(_) => "tx",
91 Key::Fx(_) => "fx",
92 Key::CheckpointContents(_) => "cc",
93 Key::CheckpointSummary(_) => "cs",
94 Key::CheckpointContentsByDigest(_) => "cc",
95 Key::CheckpointSummaryByDigest(_) => "cs",
96 Key::TxToCheckpoint(_) => "tx2c",
97 Key::ObjectKey(_) => "ob",
98 Key::EventsByTxDigest(_) => "evtx",
99 }
100 }
101
102 pub fn encode(&self) -> String {
103 match self {
104 Key::Tx(digest) => encode_digest(digest),
105 Key::Fx(digest) => encode_digest(digest),
106 Key::CheckpointContents(seq) => {
107 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
108 }
109 Key::CheckpointSummary(seq) => {
110 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
111 }
112 Key::CheckpointContentsByDigest(digest) => encode_digest(digest),
113 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
114 Key::TxToCheckpoint(digest) => encode_digest(digest),
115 Key::ObjectKey(object_key) => encode_object_key(object_key),
116 Key::EventsByTxDigest(digest) => encode_digest(digest),
117 }
118 }
119
120 pub fn to_path_elements(&self) -> (String, &'static str) {
121 (self.encode(), self.ty())
122 }
123}
124
/// Decoded value variants, one per kind of stored item.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// matches on this enum — confirm whether it is still needed.
#[derive(Clone, Debug)]
enum Value {
    /// A transaction.
    Tx(Box<Transaction>),
    /// Transaction effects.
    Fx(Box<TransactionEffects>),
    /// Transaction events.
    Events(Box<TransactionEvents>),
    /// Checkpoint contents.
    CheckpointContents(Box<CheckpointContents>),
    /// Certified checkpoint summary.
    CheckpointSummary(Box<CertifiedCheckpointSummary>),
    /// Sequence number of the checkpoint associated with a transaction.
    TxToCheckpoint(CheckpointSequenceNumber),
}
134
135pub fn path_elements_to_key(digest: &str, type_: &str) -> anyhow::Result<Key> {
136 let decoded_digest = base64_url::decode(digest)?;
137
138 match type_ {
139 "tx" => Ok(Key::Tx(TransactionDigest::try_from(decoded_digest)?)),
140 "fx" => Ok(Key::Fx(TransactionDigest::try_from(decoded_digest)?)),
141 "cc" => {
142 match CheckpointContentsDigest::try_from(decoded_digest.clone()) {
144 Err(_) => {
145 let tagged_key = bcs::from_bytes(&decoded_digest)?;
146 match tagged_key {
147 TaggedKey::CheckpointSequenceNumber(seq) => {
148 Ok(Key::CheckpointContents(seq))
149 }
150 }
151 }
152 Ok(cc_digest) => Ok(Key::CheckpointContentsByDigest(cc_digest)),
153 }
154 }
155 "cs" => {
156 match CheckpointDigest::try_from(decoded_digest.clone()) {
158 Err(_) => {
159 let tagged_key = bcs::from_bytes(&decoded_digest)?;
160 match tagged_key {
161 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointSummary(seq)),
162 }
163 }
164 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
165 }
166 }
167 "tx2c" => Ok(Key::TxToCheckpoint(TransactionDigest::try_from(
168 decoded_digest,
169 )?)),
170 "ob" => {
171 let object_key: ObjectKey = bcs::from_bytes(&decoded_digest)?;
172 Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
173 }
174 _ => Err(anyhow::anyhow!("Invalid type: {}", type_)),
175 }
176}
177
178impl HttpKVStore {
179 pub fn new_kv(
180 base_url: &str,
181 cache_size: u64,
182 metrics: Arc<KeyValueStoreMetrics>,
183 ) -> SuiResult<TransactionKeyValueStore> {
184 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
185 Ok(TransactionKeyValueStore::new("http", metrics, inner))
186 }
187
188 pub fn new(
189 base_url: &str,
190 cache_size: u64,
191 metrics: Arc<KeyValueStoreMetrics>,
192 ) -> SuiResult<Self> {
193 info!("creating HttpKVStore with base_url: {}", base_url);
194
195 let client = Client::builder().http2_prior_knowledge().build().unwrap();
196
197 let base_url = if base_url.ends_with('/') {
198 base_url.to_string()
199 } else {
200 format!("{}/", base_url)
201 };
202
203 let base_url = Url::parse(&base_url).into_sui_result()?;
204
205 let cache = MokaCacheBuilder::new(cache_size)
206 .time_to_idle(Duration::from_secs(600))
207 .build();
208
209 Ok(Self {
210 base_url,
211 client,
212 cache,
213 metrics,
214 })
215 }
216
217 fn get_url(&self, key: &Key) -> SuiResult<Url> {
218 let (digest, item_type) = key.to_path_elements();
219 let joined = self
220 .base_url
221 .join(&format!("{}/{}", digest, item_type))
222 .into_sui_result()?;
223 Url::from_str(joined.as_str()).into_sui_result()
224 }
225
226 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<SuiResult<Option<Bytes>>> {
227 let uris_vec = uris.to_vec();
228 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
229 fetches.buffered(uris.len()).collect::<Vec<_>>().await
230 }
231
232 async fn fetch(&self, key: Key) -> SuiResult<Option<Bytes>> {
233 let url = self.get_url(&key)?;
234
235 trace!("fetching url: {}", url);
236
237 if let Some(res) = self.cache.get(&url) {
238 trace!("found cached data for url: {}, len: {:?}", url, res.len());
239 self.metrics
240 .key_value_store_num_fetches_success
241 .with_label_values(&["http_cache", key.ty()])
242 .inc();
243 return Ok(Some(res));
244 }
245
246 self.metrics
247 .key_value_store_num_fetches_not_found
248 .with_label_values(&["http_cache", key.ty()])
249 .inc();
250
251 let resp = self
252 .client
253 .get(url.clone())
254 .send()
255 .await
256 .into_sui_result()?;
257 trace!(
258 "got response {} for url: {}, len: {:?}",
259 url,
260 resp.status(),
261 resp.headers()
262 .get(CONTENT_LENGTH)
263 .unwrap_or(&HeaderValue::from_static("0"))
264 );
265 if resp.status().is_success() {
267 let bytes = resp.bytes().await.into_sui_result()?;
268 self.cache.insert(url, bytes.clone());
269
270 Ok(Some(bytes))
271 } else {
272 Ok(None)
273 }
274 }
275}
276
277fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
278where
279 K: std::fmt::Debug,
280 T: for<'de> Deserialize<'de>,
281{
282 bcs::from_bytes(bytes)
283 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
284 .ok()
285}
286
287fn map_fetch<'a, K>(fetch: (&'a SuiResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
288where
289 K: std::fmt::Debug,
290{
291 let (fetch, key) = fetch;
292 match fetch {
293 Ok(Some(bytes)) => Some((bytes, key)),
294 Ok(None) => None,
295 Err(err) => {
296 warn!("Error fetching key: {:?}, error: {:?}", key, err);
297 None
298 }
299 }
300}
301
/// Splits `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`, in order. Elements beyond the sum of `lengths` are not returned.
///
/// # Panics
///
/// Panics if the running total of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut remaining = slice;
    for &len in lengths {
        let (head, tail) = remaining.split_at(len);
        parts.push(head);
        remaining = tail;
    }
    parts
}
314
315fn deser_check_digest<T, D>(
316 digest: &D,
317 bytes: &Bytes,
318 get_expected_digest: impl FnOnce(&T) -> D,
319) -> Option<T>
320where
321 D: std::fmt::Debug + PartialEq,
322 T: for<'de> Deserialize<'de>,
323{
324 deser(digest, bytes).and_then(|o: T| {
325 let expected_digest = get_expected_digest(&o);
326 if expected_digest == *digest {
327 Some(o)
328 } else {
329 error!(
330 "Digest mismatch - expected: {:?}, got: {:?}",
331 digest, expected_digest,
332 );
333 None
334 }
335 })
336}
337
338#[async_trait]
339impl TransactionKeyValueStoreTrait for HttpKVStore {
340 #[instrument(level = "trace", skip_all)]
341 async fn multi_get(
342 &self,
343 transactions: &[TransactionDigest],
344 effects: &[TransactionDigest],
345 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
346 let num_txns = transactions.len();
347 let num_effects = effects.len();
348
349 let keys = transactions
350 .iter()
351 .map(|tx| Key::Tx(*tx))
352 .chain(effects.iter().map(|fx| Key::Fx(*fx)))
353 .collect::<Vec<_>>();
354
355 let fetches = self.multi_fetch(keys).await;
356 let txn_slice = fetches[..num_txns].to_vec();
357 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
358
359 let txn_results = txn_slice
360 .iter()
361 .take(num_txns)
362 .zip(transactions.iter())
363 .map(map_fetch)
364 .map(|maybe_bytes| {
365 maybe_bytes.and_then(|(bytes, digest)| {
366 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
367 })
368 })
369 .collect::<Vec<_>>();
370
371 let fx_results = fx_slice
372 .iter()
373 .take(num_effects)
374 .zip(effects.iter())
375 .map(map_fetch)
376 .map(|maybe_bytes| {
377 maybe_bytes.and_then(|(bytes, digest)| {
378 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
379 *fx.transaction_digest()
380 })
381 })
382 })
383 .collect::<Vec<_>>();
384 Ok((txn_results, fx_results))
385 }
386
387 #[instrument(level = "trace", skip_all)]
388 async fn multi_get_checkpoints(
389 &self,
390 checkpoint_summaries: &[CheckpointSequenceNumber],
391 checkpoint_contents: &[CheckpointSequenceNumber],
392 checkpoint_summaries_by_digest: &[CheckpointDigest],
393 ) -> SuiResult<(
394 Vec<Option<CertifiedCheckpointSummary>>,
395 Vec<Option<CheckpointContents>>,
396 Vec<Option<CertifiedCheckpointSummary>>,
397 )> {
398 let keys = checkpoint_summaries
399 .iter()
400 .map(|cp| Key::CheckpointSummary(*cp))
401 .chain(
402 checkpoint_contents
403 .iter()
404 .map(|cp| Key::CheckpointContents(*cp)),
405 )
406 .chain(
407 checkpoint_summaries_by_digest
408 .iter()
409 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
410 )
411 .collect::<Vec<_>>();
412
413 let summaries_len = checkpoint_summaries.len();
414 let contents_len = checkpoint_contents.len();
415 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
416
417 let fetches = self.multi_fetch(keys).await;
418
419 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
420
421 let result_slices = multi_split_slice(&fetches, &input_slices);
422
423 let summaries_results = result_slices[0]
424 .iter()
425 .zip(checkpoint_summaries.iter())
426 .map(map_fetch)
427 .map(|maybe_bytes| {
428 maybe_bytes
429 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
430 })
431 .collect::<Vec<_>>();
432
433 let contents_results = result_slices[1]
434 .iter()
435 .zip(checkpoint_contents.iter())
436 .map(map_fetch)
437 .map(|maybe_bytes| {
438 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
439 })
440 .collect::<Vec<_>>();
441
442 let summaries_by_digest_results = result_slices[2]
443 .iter()
444 .zip(checkpoint_summaries_by_digest.iter())
445 .map(map_fetch)
446 .map(|maybe_bytes| {
447 maybe_bytes.and_then(|(bytes, digest)| {
448 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
449 })
450 })
451 .collect::<Vec<_>>();
452 Ok((
453 summaries_results,
454 contents_results,
455 summaries_by_digest_results,
456 ))
457 }
458
459 #[instrument(level = "trace", skip_all)]
460 async fn deprecated_get_transaction_checkpoint(
461 &self,
462 digest: TransactionDigest,
463 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
464 let key = Key::TxToCheckpoint(digest);
465 self.fetch(key).await.map(|maybe| {
466 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
467 })
468 }
469
470 #[instrument(level = "trace", skip_all)]
471 async fn get_object(
472 &self,
473 object_id: ObjectID,
474 version: SequenceNumber,
475 ) -> SuiResult<Option<Object>> {
476 let key = Key::ObjectKey(ObjectKey(object_id, version));
477 self.fetch(key).await.map(|maybe| {
478 maybe
479 .and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref()))
480 .tap_some(|_| {
481 self.metrics
482 .key_value_store_num_fetches_success
483 .with_label_values(&["http", key.ty()])
484 .inc();
485 })
486 .tap_none(|| {
487 self.metrics
488 .key_value_store_num_fetches_not_found
489 .with_label_values(&["http", key.ty()])
490 .inc();
491 })
492 })
493 }
494
495 #[instrument(level = "trace", skip_all)]
496 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
497 let keys = object_keys
498 .iter()
499 .map(|key| Key::ObjectKey(*key))
500 .collect::<Vec<_>>();
501
502 let fetches = self.multi_fetch(keys).await;
503
504 let results = fetches
505 .iter()
506 .zip(object_keys.iter())
507 .map(map_fetch)
508 .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
509 .collect::<Vec<_>>();
510
511 Ok(results)
512 }
513
514 #[instrument(level = "trace", skip_all)]
515 async fn multi_get_transaction_checkpoint(
516 &self,
517 digests: &[TransactionDigest],
518 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
519 let keys = digests
520 .iter()
521 .map(|digest| Key::TxToCheckpoint(*digest))
522 .collect::<Vec<_>>();
523
524 let fetches = self.multi_fetch(keys).await;
525
526 let results = fetches
527 .iter()
528 .zip(digests.iter())
529 .map(map_fetch)
530 .map(|maybe_bytes| {
531 maybe_bytes
532 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
533 })
534 .collect::<Vec<_>>();
535
536 Ok(results)
537 }
538
539 #[instrument(level = "trace", skip_all)]
540 async fn multi_get_events_by_tx_digests(
541 &self,
542 digests: &[TransactionDigest],
543 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
544 let keys = digests
545 .iter()
546 .map(|digest| Key::EventsByTxDigest(*digest))
547 .collect::<Vec<_>>();
548 Ok(self
549 .multi_fetch(keys)
550 .await
551 .iter()
552 .zip(digests.iter())
553 .map(map_fetch)
554 .map(|maybe_bytes| {
555 maybe_bytes
556 .and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..)))
557 })
558 .collect::<Vec<_>>())
559 }
560}