1use crate::key_value_store_metrics::KeyValueStoreMetrics;
8use async_trait::async_trait;
9use std::sync::Arc;
10use std::time::Instant;
11use sui_types::base_types::{ObjectID, SequenceNumber, VersionNumber};
12use sui_types::digests::{CheckpointDigest, TransactionDigest};
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
15use sui_types::messages_checkpoint::{
16 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
17};
18use sui_types::object::Object;
19use sui_types::storage::ObjectKey;
20use sui_types::transaction::Transaction;
21use tracing::instrument;
22
23pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
24
25pub type KVStoreCheckpointData = (
26 Vec<Option<CertifiedCheckpointSummary>>,
27 Vec<Option<CheckpointContents>>,
28 Vec<Option<CertifiedCheckpointSummary>>,
29);
30
31pub struct TransactionKeyValueStore {
32 store_name: &'static str,
33 metrics: Arc<KeyValueStoreMetrics>,
34 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
35}
36
37impl TransactionKeyValueStore {
38 pub fn new(
39 store_name: &'static str,
40 metrics: Arc<KeyValueStoreMetrics>,
41 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
42 ) -> Self {
43 Self {
44 store_name,
45 metrics,
46 inner,
47 }
48 }
49
50 pub async fn multi_get(
52 &self,
53 transactions: &[TransactionDigest],
54 effects: &[TransactionDigest],
55 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
56 let start = Instant::now();
57 let res = self.inner.multi_get(transactions, effects).await;
58 let elapsed = start.elapsed();
59
60 let num_txns = transactions.len() as u64;
61 let num_effects = effects.len() as u64;
62 let total_keys = num_txns + num_effects;
63
64 self.metrics
65 .key_value_store_num_fetches_latency_ms
66 .with_label_values(&[self.store_name, "tx"])
67 .observe(elapsed.as_millis() as f64);
68 self.metrics
69 .key_value_store_num_fetches_batch_size
70 .with_label_values(&[self.store_name, "tx"])
71 .observe(total_keys as f64);
72
73 if let Ok(res) = &res {
74 let txns_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64;
75 let effects_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
76
77 if num_txns > 0 {
78 self.metrics
79 .key_value_store_num_fetches_success
80 .with_label_values(&[self.store_name, "tx"])
81 .inc_by(num_txns);
82 }
83 if num_effects > 0 {
84 self.metrics
85 .key_value_store_num_fetches_success
86 .with_label_values(&[self.store_name, "fx"])
87 .inc_by(num_effects);
88 }
89
90 if txns_not_found > 0 {
91 self.metrics
92 .key_value_store_num_fetches_not_found
93 .with_label_values(&[self.store_name, "tx"])
94 .inc_by(txns_not_found);
95 }
96 if effects_not_found > 0 {
97 self.metrics
98 .key_value_store_num_fetches_not_found
99 .with_label_values(&[self.store_name, "fx"])
100 .inc_by(effects_not_found);
101 }
102 } else {
103 self.metrics
104 .key_value_store_num_fetches_error
105 .with_label_values(&[self.store_name, "tx"])
106 .inc_by(num_txns);
107 self.metrics
108 .key_value_store_num_fetches_error
109 .with_label_values(&[self.store_name, "fx"])
110 .inc_by(num_effects);
111 }
112
113 res
114 }
115
116 pub async fn multi_get_checkpoints(
117 &self,
118 checkpoint_summaries: &[CheckpointSequenceNumber],
119 checkpoint_contents: &[CheckpointSequenceNumber],
120 checkpoint_summaries_by_digest: &[CheckpointDigest],
121 ) -> SuiResult<(
122 Vec<Option<CertifiedCheckpointSummary>>,
123 Vec<Option<CheckpointContents>>,
124 Vec<Option<CertifiedCheckpointSummary>>,
125 )> {
126 let start = Instant::now();
127 let res = self
128 .inner
129 .multi_get_checkpoints(
130 checkpoint_summaries,
131 checkpoint_contents,
132 checkpoint_summaries_by_digest,
133 )
134 .await;
135 let elapsed = start.elapsed();
136
137 let num_summaries =
138 checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
139 let num_contents = checkpoint_contents.len() as u64;
140
141 self.metrics
142 .key_value_store_num_fetches_latency_ms
143 .with_label_values(&[self.store_name, "checkpoint"])
144 .observe(elapsed.as_millis() as f64);
145 self.metrics
146 .key_value_store_num_fetches_batch_size
147 .with_label_values(&[self.store_name, "checkpoint_summary"])
148 .observe(num_summaries as f64);
149 self.metrics
150 .key_value_store_num_fetches_batch_size
151 .with_label_values(&[self.store_name, "checkpoint_content"])
152 .observe(num_contents as f64);
153
154 if let Ok(res) = &res {
155 let summaries_not_found = res.0.iter().filter(|v| v.is_none()).count() as u64
156 + res.2.iter().filter(|v| v.is_none()).count() as u64;
157 let contents_not_found = res.1.iter().filter(|v| v.is_none()).count() as u64;
158
159 if num_summaries > 0 {
160 self.metrics
161 .key_value_store_num_fetches_success
162 .with_label_values(&[self.store_name, "ckpt_summary"])
163 .inc_by(num_summaries);
164 }
165 if num_contents > 0 {
166 self.metrics
167 .key_value_store_num_fetches_success
168 .with_label_values(&[self.store_name, "ckpt_contents"])
169 .inc_by(num_contents);
170 }
171
172 if summaries_not_found > 0 {
173 self.metrics
174 .key_value_store_num_fetches_not_found
175 .with_label_values(&[self.store_name, "ckpt_summary"])
176 .inc_by(summaries_not_found);
177 }
178 if contents_not_found > 0 {
179 self.metrics
180 .key_value_store_num_fetches_not_found
181 .with_label_values(&[self.store_name, "ckpt_contents"])
182 .inc_by(contents_not_found);
183 }
184 } else {
185 self.metrics
186 .key_value_store_num_fetches_error
187 .with_label_values(&[self.store_name, "ckpt_summary"])
188 .inc_by(num_summaries);
189 self.metrics
190 .key_value_store_num_fetches_error
191 .with_label_values(&[self.store_name, "ckpt_contents"])
192 .inc_by(num_contents);
193 }
194
195 res
196 }
197
198 pub async fn multi_get_checkpoints_summaries(
199 &self,
200 keys: &[CheckpointSequenceNumber],
201 ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
202 self.multi_get_checkpoints(keys, &[], &[])
203 .await
204 .map(|(summaries, _, _)| summaries)
205 }
206
207 pub async fn multi_get_checkpoints_contents(
208 &self,
209 keys: &[CheckpointSequenceNumber],
210 ) -> SuiResult<Vec<Option<CheckpointContents>>> {
211 self.multi_get_checkpoints(&[], keys, &[])
212 .await
213 .map(|(_, contents, _)| contents)
214 }
215
216 pub async fn multi_get_checkpoints_summaries_by_digest(
217 &self,
218 keys: &[CheckpointDigest],
219 ) -> SuiResult<Vec<Option<CertifiedCheckpointSummary>>> {
220 self.multi_get_checkpoints(&[], &[], keys)
221 .await
222 .map(|(_, _, summaries)| summaries)
223 }
224
225 pub async fn multi_get_tx(
226 &self,
227 keys: &[TransactionDigest],
228 ) -> SuiResult<Vec<Option<Transaction>>> {
229 self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
230 }
231
232 pub async fn multi_get_fx_by_tx_digest(
233 &self,
234 keys: &[TransactionDigest],
235 ) -> SuiResult<Vec<Option<TransactionEffects>>> {
236 self.multi_get(&[], keys).await.map(|(_, fx)| fx)
237 }
238
239 pub async fn get_tx(&self, digest: TransactionDigest) -> SuiResult<Transaction> {
242 self.multi_get_tx(&[digest])
243 .await?
244 .into_iter()
245 .next()
246 .flatten()
247 .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
248 }
249
250 pub async fn get_fx_by_tx_digest(
253 &self,
254 digest: TransactionDigest,
255 ) -> SuiResult<TransactionEffects> {
256 self.multi_get_fx_by_tx_digest(&[digest])
257 .await?
258 .into_iter()
259 .next()
260 .flatten()
261 .ok_or(SuiErrorKind::TransactionNotFound { digest }.into())
262 }
263
264 pub async fn get_checkpoint_summary(
267 &self,
268 checkpoint: CheckpointSequenceNumber,
269 ) -> SuiResult<CertifiedCheckpointSummary> {
270 self.multi_get_checkpoints_summaries(&[checkpoint])
271 .await?
272 .into_iter()
273 .next()
274 .flatten()
275 .ok_or(
276 SuiErrorKind::UserInputError {
277 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
278 }
279 .into(),
280 )
281 }
282
283 pub async fn get_checkpoint_contents(
286 &self,
287 checkpoint: CheckpointSequenceNumber,
288 ) -> SuiResult<CheckpointContents> {
289 self.multi_get_checkpoints_contents(&[checkpoint])
290 .await?
291 .into_iter()
292 .next()
293 .flatten()
294 .ok_or(
295 SuiErrorKind::UserInputError {
296 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
297 }
298 .into(),
299 )
300 }
301
302 pub async fn get_checkpoint_summary_by_digest(
305 &self,
306 digest: CheckpointDigest,
307 ) -> SuiResult<CertifiedCheckpointSummary> {
308 self.multi_get_checkpoints_summaries_by_digest(&[digest])
309 .await?
310 .into_iter()
311 .next()
312 .flatten()
313 .ok_or(
314 SuiErrorKind::UserInputError {
315 error: UserInputError::VerifiedCheckpointDigestNotFound(format!(
316 "{:?}",
317 digest
318 )),
319 }
320 .into(),
321 )
322 }
323
324 pub async fn deprecated_get_transaction_checkpoint(
325 &self,
326 digest: TransactionDigest,
327 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
328 self.inner
329 .deprecated_get_transaction_checkpoint(digest)
330 .await
331 }
332
333 pub async fn get_object(
334 &self,
335 object_id: ObjectID,
336 version: VersionNumber,
337 ) -> SuiResult<Option<Object>> {
338 self.inner.get_object(object_id, version).await
339 }
340
341 pub async fn multi_get_objects(
342 &self,
343 object_keys: &[ObjectKey],
344 ) -> SuiResult<Vec<Option<Object>>> {
345 self.inner.multi_get_objects(object_keys).await
346 }
347
348 pub async fn multi_get_transaction_checkpoint(
349 &self,
350 digests: &[TransactionDigest],
351 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
352 self.inner.multi_get_transaction_checkpoint(digests).await
353 }
354
355 pub async fn multi_get_events_by_tx_digests(
356 &self,
357 digests: &[TransactionDigest],
358 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
359 self.inner.multi_get_events_by_tx_digests(digests).await
360 }
361}
362
363#[async_trait]
366pub trait TransactionKeyValueStoreTrait {
367 async fn multi_get(
369 &self,
370 transactions: &[TransactionDigest],
371 effects: &[TransactionDigest],
372 ) -> SuiResult<KVStoreTransactionData>;
373
374 async fn multi_get_checkpoints(
376 &self,
377 checkpoint_summaries: &[CheckpointSequenceNumber],
378 checkpoint_contents: &[CheckpointSequenceNumber],
379 checkpoint_summaries_by_digest: &[CheckpointDigest],
380 ) -> SuiResult<KVStoreCheckpointData>;
381
382 async fn deprecated_get_transaction_checkpoint(
383 &self,
384 digest: TransactionDigest,
385 ) -> SuiResult<Option<CheckpointSequenceNumber>>;
386
387 async fn get_object(
388 &self,
389 object_id: ObjectID,
390 version: SequenceNumber,
391 ) -> SuiResult<Option<Object>>;
392
393 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>>;
394
395 async fn multi_get_transaction_checkpoint(
396 &self,
397 digests: &[TransactionDigest],
398 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;
399
400 async fn multi_get_events_by_tx_digests(
401 &self,
402 digests: &[TransactionDigest],
403 ) -> SuiResult<Vec<Option<TransactionEvents>>>;
404}
405
406pub struct FallbackTransactionKVStore {
411 primary: TransactionKeyValueStore,
412 fallback: TransactionKeyValueStore,
413}
414
415impl FallbackTransactionKVStore {
416 pub fn new_kv(
417 primary: TransactionKeyValueStore,
418 fallback: TransactionKeyValueStore,
419 metrics: Arc<KeyValueStoreMetrics>,
420 label: &'static str,
421 ) -> TransactionKeyValueStore {
422 let store = Arc::new(Self { primary, fallback });
423 TransactionKeyValueStore::new(label, metrics, store)
424 }
425}
426
427#[async_trait]
428impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
429 #[instrument(level = "trace", skip_all)]
430 async fn multi_get(
431 &self,
432 transactions: &[TransactionDigest],
433 effects: &[TransactionDigest],
434 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
435 let mut res = self.primary.multi_get(transactions, effects).await?;
436
437 let (fallback_transactions, indices_transactions) = find_fallback(&res.0, transactions);
438 let (fallback_effects, indices_effects) = find_fallback(&res.1, effects);
439
440 if fallback_transactions.is_empty() && fallback_effects.is_empty() {
441 return Ok(res);
442 }
443
444 let secondary_res = self
445 .fallback
446 .multi_get(&fallback_transactions, &fallback_effects)
447 .await?;
448
449 merge_res(&mut res.0, secondary_res.0, &indices_transactions);
450 merge_res(&mut res.1, secondary_res.1, &indices_effects);
451
452 Ok((res.0, res.1))
453 }
454
455 #[instrument(level = "trace", skip_all)]
456 async fn multi_get_checkpoints(
457 &self,
458 checkpoint_summaries: &[CheckpointSequenceNumber],
459 checkpoint_contents: &[CheckpointSequenceNumber],
460 checkpoint_summaries_by_digest: &[CheckpointDigest],
461 ) -> SuiResult<(
462 Vec<Option<CertifiedCheckpointSummary>>,
463 Vec<Option<CheckpointContents>>,
464 Vec<Option<CertifiedCheckpointSummary>>,
465 )> {
466 let mut res = self
467 .primary
468 .multi_get_checkpoints(
469 checkpoint_summaries,
470 checkpoint_contents,
471 checkpoint_summaries_by_digest,
472 )
473 .await?;
474
475 let (fallback_summaries, indices_summaries) = find_fallback(&res.0, checkpoint_summaries);
476 let (fallback_contents, indices_contents) = find_fallback(&res.1, checkpoint_contents);
477 let (fallback_summaries_by_digest, indices_summaries_by_digest) =
478 find_fallback(&res.2, checkpoint_summaries_by_digest);
479
480 if fallback_summaries.is_empty()
481 && fallback_contents.is_empty()
482 && fallback_summaries_by_digest.is_empty()
483 {
484 return Ok(res);
485 }
486
487 let secondary_res = self
488 .fallback
489 .multi_get_checkpoints(
490 &fallback_summaries,
491 &fallback_contents,
492 &fallback_summaries_by_digest,
493 )
494 .await?;
495
496 merge_res(&mut res.0, secondary_res.0, &indices_summaries);
497 merge_res(&mut res.1, secondary_res.1, &indices_contents);
498 merge_res(&mut res.2, secondary_res.2, &indices_summaries_by_digest);
499
500 Ok((res.0, res.1, res.2))
501 }
502
503 #[instrument(level = "trace", skip_all)]
504 async fn deprecated_get_transaction_checkpoint(
505 &self,
506 digest: TransactionDigest,
507 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
508 let mut res = self
509 .primary
510 .deprecated_get_transaction_checkpoint(digest)
511 .await?;
512 if res.is_none() {
513 res = self
514 .fallback
515 .deprecated_get_transaction_checkpoint(digest)
516 .await?;
517 }
518 Ok(res)
519 }
520
521 #[instrument(level = "trace", skip_all)]
522 async fn get_object(
523 &self,
524 object_id: ObjectID,
525 version: SequenceNumber,
526 ) -> SuiResult<Option<Object>> {
527 let mut res = self.primary.get_object(object_id, version).await?;
528 if res.is_none() {
529 res = self.fallback.get_object(object_id, version).await?;
530 }
531 Ok(res)
532 }
533
534 #[instrument(level = "trace", skip_all)]
535 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
536 let mut res = self.primary.multi_get_objects(object_keys).await?;
537
538 let (fallback, indices) = find_fallback(&res, object_keys);
539
540 if fallback.is_empty() {
541 return Ok(res);
542 }
543
544 let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
545
546 merge_res(&mut res, secondary_res, &indices);
547
548 Ok(res)
549 }
550
551 #[instrument(level = "trace", skip_all)]
552 async fn multi_get_transaction_checkpoint(
553 &self,
554 digests: &[TransactionDigest],
555 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
556 let mut res = self
557 .primary
558 .multi_get_transaction_checkpoint(digests)
559 .await?;
560
561 let (fallback, indices) = find_fallback(&res, digests);
562
563 if fallback.is_empty() {
564 return Ok(res);
565 }
566
567 let secondary_res = self
568 .fallback
569 .multi_get_transaction_checkpoint(&fallback)
570 .await?;
571
572 merge_res(&mut res, secondary_res, &indices);
573
574 Ok(res)
575 }
576
577 #[instrument(level = "trace", skip_all)]
578 async fn multi_get_events_by_tx_digests(
579 &self,
580 digests: &[TransactionDigest],
581 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
582 let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
583 let (fallback, indices) = find_fallback(&res, digests);
584 if fallback.is_empty() {
585 return Ok(res);
586 }
587 let secondary_res = self
588 .fallback
589 .multi_get_events_by_tx_digests(&fallback)
590 .await?;
591 merge_res(&mut res, secondary_res, &indices);
592 Ok(res)
593 }
594}
595
/// Collects the keys whose lookup produced `None`.
///
/// Returns the cloned keys together with the positions they occupy in
/// `values`, both in ascending position order, so the positions can later be
/// used to write replacement values back.
///
/// # Panics
///
/// Panics if `values` has a `None` at a position that is out of bounds for
/// `keys`.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    // Count first so both output vectors are allocated exactly once.
    let missing = values.iter().filter(|slot| slot.is_none()).count();
    let mut missing_keys = Vec::with_capacity(missing);
    let mut missing_positions = Vec::with_capacity(missing);

    values
        .iter()
        .enumerate()
        .filter(|(_, slot)| slot.is_none())
        .for_each(|(pos, _)| {
            missing_keys.push(keys[pos].clone());
            missing_positions.push(pos);
        });

    (missing_keys, missing_positions)
}
608
/// Writes `fallback_values` into `values` at the given `indices`.
///
/// The n-th fallback value goes to `values[indices[n]]`. Pairing stops as soon
/// as either `indices` or `fallback_values` runs out; the remaining entries of
/// the longer one are ignored.
///
/// # Panics
///
/// Panics if a consumed index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    let mut replacements = fallback_values.into_iter();
    for &slot in indices {
        match replacements.next() {
            Some(replacement) => values[slot] = replacement,
            None => break,
        }
    }
}