1use crate::key_value_store_metrics::KeyValueStoreMetrics;
8use async_trait::async_trait;
9use mysten_common::ZipDebugEqIteratorExt;
10use std::sync::Arc;
11use std::time::Instant;
12use sui_types::base_types::{ObjectID, SequenceNumber, VersionNumber};
13use sui_types::digests::{CheckpointDigest, TransactionDigest};
14use sui_types::effects::{TransactionEffects, TransactionEvents};
15use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
16use sui_types::messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18};
19use sui_types::object::Object;
20use sui_types::storage::ObjectKey;
21use sui_types::transaction::Transaction;
22use tracing::instrument;
23
24pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
25
26pub type KVStoreCheckpointData = (
27 Vec<Option<CertifiedCheckpointSummary>>,
28 Vec<Option<CheckpointContents>>,
29 Vec<Option<CertifiedCheckpointSummary>>,
30);
31
32pub struct TransactionKeyValueStore {
33 store_name: &'static str,
34 metrics: Arc<KeyValueStoreMetrics>,
35 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
36}
37
38impl TransactionKeyValueStore {
39 pub fn new(
40 store_name: &'static str,
41 metrics: Arc<KeyValueStoreMetrics>,
42 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
43 ) -> Self {
44 Self {
45 store_name,
46 metrics,
47 inner,
48 }
49 }
50
51 pub async fn multi_get(
53 &self,
54 transactions: &[TransactionDigest],
55 effects: &[TransactionDigest],
56 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
57 let start = Instant::now();
58 let res = self.inner.multi_get(transactions, effects).await;
59 let elapsed = start.elapsed();
60
61 let num_txns = transactions.len() as u64;
62 let num_effects = effects.len() as u64;
63 let total_keys = num_txns + num_effects;
64
65 self.metrics
66 .key_value_store_num_fetches_latency_ms
67 .with_label_values(&[self.store_name, "tx"])
68 .observe(elapsed.as_millis() as f64);
69 self.metrics
70 .key_value_store_num_fetches_batch_size
71 .with_label_values(&[self.store_name, "tx"])
72 .observe(total_keys as f64);
73
74 if let Ok(res) = &res {
75 let txns_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64;
76 let effects_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
77
78 if num_txns > 0 {
79 self.metrics
80 .key_value_store_num_fetches_success
81 .with_label_values(&[self.store_name, "tx"])
82 .inc_by(num_txns);
83 }
84 if num_effects > 0 {
85 self.metrics
86 .key_value_store_num_fetches_success
87 .with_label_values(&[self.store_name, "fx"])
88 .inc_by(num_effects);
89 }
90
91 if txns_not_found > 0 {
92 self.metrics
93 .key_value_store_num_fetches_not_found
94 .with_label_values(&[self.store_name, "tx"])
95 .inc_by(txns_not_found);
96 }
97 if effects_not_found > 0 {
98 self.metrics
99 .key_value_store_num_fetches_not_found
100 .with_label_values(&[self.store_name, "fx"])
101 .inc_by(effects_not_found);
102 }
103 } else {
104 self.metrics
105 .key_value_store_num_fetches_error
106 .with_label_values(&[self.store_name, "tx"])
107 .inc_by(num_txns);
108 self.metrics
109 .key_value_store_num_fetches_error
110 .with_label_values(&[self.store_name, "fx"])
111 .inc_by(num_effects);
112 }
113
114 res
115 }
116
117 pub async fn multi_get_checkpoints(
118 &self,
119 checkpoint_summaries: &[CheckpointSequenceNumber],
120 checkpoint_contents: &[CheckpointSequenceNumber],
121 checkpoint_summaries_by_digest: &[CheckpointDigest],
122 ) -> SuiResult<(
123 Vec<Option<CertifiedCheckpointSummary>>,
124 Vec<Option<CheckpointContents>>,
125 Vec<Option<CertifiedCheckpointSummary>>,
126 )> {
127 let start = Instant::now();
128 let res = self
129 .inner
130 .multi_get_checkpoints(
131 checkpoint_summaries,
132 checkpoint_contents,
133 checkpoint_summaries_by_digest,
134 )
135 .await;
136 let elapsed = start.elapsed();
137
138 let num_summaries =
139 checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
140 let num_contents = checkpoint_contents.len() as u64;
141
142 self.metrics
143 .key_value_store_num_fetches_latency_ms
144 .with_label_values(&[self.store_name, "checkpoint"])
145 .observe(elapsed.as_millis() as f64);
146 self.metrics
147 .key_value_store_num_fetches_batch_size
148 .with_label_values(&[self.store_name, "checkpoint_summary"])
149 .observe(num_summaries as f64);
150 self.metrics
151 .key_value_store_num_fetches_batch_size
152 .with_label_values(&[self.store_name, "checkpoint_content"])
153 .observe(num_contents as f64);
154
155 if let Ok(res) = &res {
156 let summaries_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64
157 + res.2.iter().filter(|v| v.is_none()).count() as u64;
158 let contents_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
159
160 if num_summaries > 0 {
161 self.metrics
162 .key_value_store_num_fetches_success
163 .with_label_values(&[self.store_name, "ckpt_summary"])
164 .inc_by(num_summaries);
165 }
166 if num_contents > 0 {
167 self.metrics
168 .key_value_store_num_fetches_success
169 .with_label_values(&[self.store_name, "ckpt_contents"])
170 .inc_by(num_contents);
171 }
172
173 if summaries_not_found > 0 {
174 self.metrics
175 .key_value_store_num_fetches_not_found
176 .with_label_values(&[self.store_name, "ckpt_summary"])
177 .inc_by(summaries_not_found);
178 }
179 if contents_not_found > 0 {
180 self.metrics
181 .key_value_store_num_fetches_not_found
182 .with_label_values(&[self.store_name, "ckpt_contents"])
183 .inc_by(contents_not_found);
184 }
185 } else {
186 self.metrics
187 .key_value_store_num_fetches_error
188 .with_label_values(&[self.store_name, "ckpt_summary"])
189 .inc_by(num_summaries);
190 self.metrics
191 .key_value_store_num_fetches_error
192 .with_label_values(&[self.store_name, "ckpt_contents"])
193 .inc_by(num_contents);
194 }
195
196 res
197 }
198
199 pub async fn multi_get_checkpoints_summaries(
200 &self,
201 keys: &[CheckpointSequenceNumber],
202 ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
203 self.multi_get_checkpoints(keys, &[], &[])
204 .await
205 .map(|(summaries, _, _)| summaries)
206 }
207
208 pub async fn multi_get_checkpoints_contents(
209 &self,
210 keys: &[CheckpointSequenceNumber],
211 ) -> SuiResult<Vec<Option<CheckpointContents>>> {
212 self.multi_get_checkpoints(&[], keys, &[])
213 .await
214 .map(|(_, contents, _)| contents)
215 }
216
217 pub async fn multi_get_checkpoints_summaries_by_digest(
218 &self,
219 keys: &[CheckpointDigest],
220 ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
221 self.multi_get_checkpoints(&[], &[], keys)
222 .await
223 .map(|(_, _, summaries)| summaries)
224 }
225
226 pub async fn multi_get_tx(
227 &self,
228 keys: &[TransactionDigest],
229 ) -> SuiResult<Vec<Option<Transaction>>> {
230 self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
231 }
232
233 pub async fn multi_get_fx_by_tx_digest(
234 &self,
235 keys: &[TransactionDigest],
236 ) -> SuiResult<Vec<Option<TransactionEffects>>> {
237 self.multi_get(&[], keys).await.map(|(_, fx)| fx)
238 }
239
240 pub async fn get_tx(&self, digest: TransactionDigest) -> SuiResult<Transaction> {
243 self.multi_get_tx(&[digest])
244 .await?
245 .into_iter()
246 .next()
247 .flatten()
248 .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
249 }
250
251 pub async fn get_fx_by_tx_digest(
254 &self,
255 digest: TransactionDigest,
256 ) -> SuiResult<TransactionEffects> {
257 self.multi_get_fx_by_tx_digest(&[digest])
258 .await?
259 .into_iter()
260 .next()
261 .flatten()
262 .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
263 }
264
265 pub async fn get_checkpoint_summary(
268 &self,
269 checkpoint: CheckpointSequenceNumber,
270 ) -> SuiResult<CertifiedCheckpointSummary> {
271 self.multi_get_checkpoints_summaries(&[checkpoint])
272 .await?
273 .into_iter()
274 .next()
275 .flatten()
276 .ok_or(
277 SuiErrorKind::UserInputError {
278 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
279 }
280 .into(),
281 )
282 }
283
284 pub async fn get_checkpoint_contents(
287 &self,
288 checkpoint: CheckpointSequenceNumber,
289 ) -> SuiResult<CheckpointContents> {
290 self.multi_get_checkpoints_contents(&[checkpoint])
291 .await?
292 .into_iter()
293 .next()
294 .flatten()
295 .ok_or(
296 SuiErrorKind::UserInputError {
297 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
298 }
299 .into(),
300 )
301 }
302
303 pub async fn get_checkpoint_summary_by_digest(
306 &self,
307 digest: CheckpointDigest,
308 ) -> SuiResult<CertifiedCheckpointSummary> {
309 self.multi_get_checkpoints_summaries_by_digest(&[digest])
310 .await?
311 .into_iter()
312 .next()
313 .flatten()
314 .ok_or(
315 SuiErrorKind::UserInputError {
316 error: UserInputError::VerifiedCheckpointDigestNotFound(format!(
317 "{:?}",
318 digest
319 )),
320 }
321 .into(),
322 )
323 }
324
325 pub async fn deprecated_get_transaction_checkpoint(
326 &self,
327 digest: TransactionDigest,
328 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
329 self.inner
330 .deprecated_get_transaction_checkpoint(digest)
331 .await
332 }
333
334 pub async fn get_object(
335 &self,
336 object_id: ObjectID,
337 version: VersionNumber,
338 ) -> SuiResult<Option<Object>> {
339 self.inner.get_object(object_id, version).await
340 }
341
342 pub async fn multi_get_objects(
343 &self,
344 object_keys: &[ObjectKey],
345 ) -> SuiResult<Vec<Option<Object>>> {
346 self.inner.multi_get_objects(object_keys).await
347 }
348
349 pub async fn multi_get_transaction_checkpoint(
350 &self,
351 digests: &[TransactionDigest],
352 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
353 self.inner.multi_get_transaction_checkpoint(digests).await
354 }
355
356 pub async fn multi_get_events_by_tx_digests(
357 &self,
358 digests: &[TransactionDigest],
359 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
360 self.inner.multi_get_events_by_tx_digests(digests).await
361 }
362}
363
364#[async_trait]
367pub trait TransactionKeyValueStoreTrait {
368 async fn multi_get(
370 &self,
371 transactions: &[TransactionDigest],
372 effects: &[TransactionDigest],
373 ) -> SuiResult<KVStoreTransactionData>;
374
375 async fn multi_get_checkpoints(
377 &self,
378 checkpoint_summaries: &[CheckpointSequenceNumber],
379 checkpoint_contents: &[CheckpointSequenceNumber],
380 checkpoint_summaries_by_digest: &[CheckpointDigest],
381 ) -> SuiResult<KVStoreCheckpointData>;
382
383 async fn deprecated_get_transaction_checkpoint(
384 &self,
385 digest: TransactionDigest,
386 ) -> SuiResult<Option<CheckpointSequenceNumber>>;
387
388 async fn get_object(
389 &self,
390 object_id: ObjectID,
391 version: SequenceNumber,
392 ) -> SuiResult<Option<Object>>;
393
394 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>>;
395
396 async fn multi_get_transaction_checkpoint(
397 &self,
398 digests: &[TransactionDigest],
399 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;
400
401 async fn multi_get_events_by_tx_digests(
402 &self,
403 digests: &[TransactionDigest],
404 ) -> SuiResult<Vec<Option<TransactionEvents>>>;
405}
406
407pub struct FallbackTransactionKVStore {
412 primary: TransactionKeyValueStore,
413 fallback: TransactionKeyValueStore,
414}
415
416impl FallbackTransactionKVStore {
417 pub fn new_kv(
418 primary: TransactionKeyValueStore,
419 fallback: TransactionKeyValueStore,
420 metrics: Arc<KeyValueStoreMetrics>,
421 label: &'static str,
422 ) -> TransactionKeyValueStore {
423 let store = Arc::new(Self { primary, fallback });
424 TransactionKeyValueStore::new(label, metrics, store)
425 }
426}
427
428#[async_trait]
429impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
430 #[instrument(level = "trace", skip_all)]
431 async fn multi_get(
432 &self,
433 transactions: &[TransactionDigest],
434 effects: &[TransactionDigest],
435 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
436 let mut res = self.primary.multi_get(transactions, effects).await?;
437
438 let (fallback_transactions, indices_transactions) = find_fallback(&res.0, transactions);
439 let (fallback_effects, indices_effects) = find_fallback(&res.1, effects);
440
441 if fallback_transactions.is_empty() && fallback_effects.is_empty() {
442 return Ok(res);
443 }
444
445 let secondary_res = self
446 .fallback
447 .multi_get(&fallback_transactions, &fallback_effects)
448 .await?;
449
450 merge_res(&mut res.0, secondary_res.0, &indices_transactions);
451 merge_res(&mut res.1, secondary_res.1, &indices_effects);
452
453 Ok((res.0, res.1))
454 }
455
456 #[instrument(level = "trace", skip_all)]
457 async fn multi_get_checkpoints(
458 &self,
459 checkpoint_summaries: &[CheckpointSequenceNumber],
460 checkpoint_contents: &[CheckpointSequenceNumber],
461 checkpoint_summaries_by_digest: &[CheckpointDigest],
462 ) -> SuiResult<(
463 Vec<Option<CertifiedCheckpointSummary>>,
464 Vec<Option<CheckpointContents>>,
465 Vec<Option<CertifiedCheckpointSummary>>,
466 )> {
467 let mut res = self
468 .primary
469 .multi_get_checkpoints(
470 checkpoint_summaries,
471 checkpoint_contents,
472 checkpoint_summaries_by_digest,
473 )
474 .await?;
475
476 let (fallback_summaries, indices_summaries) = find_fallback(&res.0, checkpoint_summaries);
477 let (fallback_contents, indices_contents) = find_fallback(&res.1, checkpoint_contents);
478 let (fallback_summaries_by_digest, indices_summaries_by_digest) =
479 find_fallback(&res.2, checkpoint_summaries_by_digest);
480
481 if fallback_summaries.is_empty()
482 && fallback_contents.is_empty()
483 && fallback_summaries_by_digest.is_empty()
484 {
485 return Ok(res);
486 }
487
488 let secondary_res = self
489 .fallback
490 .multi_get_checkpoints(
491 &fallback_summaries,
492 &fallback_contents,
493 &fallback_summaries_by_digest,
494 )
495 .await?;
496
497 merge_res(&mut res.0, secondary_res.0, &indices_summaries);
498 merge_res(&mut res.1, secondary_res.1, &indices_contents);
499 merge_res(&mut res.2, secondary_res.2, &indices_summaries_by_digest);
500
501 Ok((res.0, res.1, res.2))
502 }
503
504 #[instrument(level = "trace", skip_all)]
505 async fn deprecated_get_transaction_checkpoint(
506 &self,
507 digest: TransactionDigest,
508 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
509 let mut res = self
510 .primary
511 .deprecated_get_transaction_checkpoint(digest)
512 .await?;
513 if res.is_none() {
514 res = self
515 .fallback
516 .deprecated_get_transaction_checkpoint(digest)
517 .await?;
518 }
519 Ok(res)
520 }
521
522 #[instrument(level = "trace", skip_all)]
523 async fn get_object(
524 &self,
525 object_id: ObjectID,
526 version: SequenceNumber,
527 ) -> SuiResult<Option<Object>> {
528 let mut res = self.primary.get_object(object_id, version).await?;
529 if res.is_none() {
530 res = self.fallback.get_object(object_id, version).await?;
531 }
532 Ok(res)
533 }
534
535 #[instrument(level = "trace", skip_all)]
536 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
537 let mut res = self.primary.multi_get_objects(object_keys).await?;
538
539 let (fallback, indices) = find_fallback(&res, object_keys);
540
541 if fallback.is_empty() {
542 return Ok(res);
543 }
544
545 let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
546
547 merge_res(&mut res, secondary_res, &indices);
548
549 Ok(res)
550 }
551
552 #[instrument(level = "trace", skip_all)]
553 async fn multi_get_transaction_checkpoint(
554 &self,
555 digests: &[TransactionDigest],
556 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
557 let mut res = self
558 .primary
559 .multi_get_transaction_checkpoint(digests)
560 .await?;
561
562 let (fallback, indices) = find_fallback(&res, digests);
563
564 if fallback.is_empty() {
565 return Ok(res);
566 }
567
568 let secondary_res = self
569 .fallback
570 .multi_get_transaction_checkpoint(&fallback)
571 .await?;
572
573 merge_res(&mut res, secondary_res, &indices);
574
575 Ok(res)
576 }
577
578 #[instrument(level = "trace", skip_all)]
579 async fn multi_get_events_by_tx_digests(
580 &self,
581 digests: &[TransactionDigest],
582 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
583 let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
584 let (fallback, indices) = find_fallback(&res, digests);
585 if fallback.is_empty() {
586 return Ok(res);
587 }
588 let secondary_res = self
589 .fallback
590 .multi_get_events_by_tx_digests(&fallback)
591 .await?;
592 merge_res(&mut res, secondary_res, &indices);
593 Ok(res)
594 }
595}
596
/// Collects the keys whose corresponding entry in `values` is `None`.
///
/// Returns `(missing_keys, missing_positions)`: for every `None` at position
/// `i` in `values`, a clone of `keys[i]` and `i` itself, in ascending order
/// of `i`.
///
/// # Panics
/// Panics if a `None` entry sits at a position that is out of bounds for
/// `keys`.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .enumerate()
        .filter(|(_, value)| value.is_none())
        .map(|(position, _)| (keys[position].clone(), position))
        .unzip()
}
609
610fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
611 for (&index, fallback_value) in indices.iter().zip_debug_eq(fallback_values) {
612 values[index] = fallback_value;
613 }
614}