sui_indexer_alt_jsonrpc/
data.rs1use std::time::Duration;
5
6use anyhow::Context as _;
7use anyhow::ensure;
8use serde::de::DeserializeOwned;
9use sui_indexer_alt_reader::object_versions::LatestObjectVersionKey;
10use sui_types::base_types::ObjectID;
11use sui_types::object::Object;
12
13use crate::context::Context;
14
15pub(crate) async fn load_live(
18 ctx: &Context,
19 object_id: ObjectID,
20) -> Result<Option<Object>, anyhow::Error> {
21 let Some(latest_version) = ctx
22 .pg_loader()
23 .load_one(LatestObjectVersionKey(object_id))
24 .await
25 .context("Failed to load latest version")?
26 else {
27 return Ok(None);
28 };
29
30 if latest_version.object_digest.is_none() {
31 return Ok(None);
32 }
33
34 let mut object = None;
36 let config = &ctx.config().objects;
37 let mut interval = tokio::time::interval(Duration::from_millis(config.obj_retry_interval_ms));
38 let mut retries = 0;
39
40 for _ in 0..=config.obj_retry_count {
41 interval.tick().await;
42
43 object = ctx
44 .kv_loader()
45 .load_one_object(object_id, latest_version.object_version as u64)
46 .await
47 .context("Failed to load latest object")?;
48 if object.is_some() {
49 break;
50 }
51
52 retries += 1;
53 ctx.metrics()
54 .read_retries
55 .with_label_values(&["kv_object"])
56 .inc();
57 }
58
59 ctx.metrics()
60 .read_retries_per_request
61 .with_label_values(&["kv_object"])
62 .observe(retries as f64);
63
64 ensure!(
66 object.is_some(),
67 "Eventual consistency discrepancy, try again later"
68 );
69 Ok(object)
70}
71
72pub(crate) async fn load_live_deserialized<T: DeserializeOwned>(
75 ctx: &Context,
76 object_id: ObjectID,
77) -> Result<T, anyhow::Error> {
78 let object = load_live(ctx, object_id).await?.context("No data found")?;
79
80 let move_object = object.data.try_as_move().context("Not a Move object")?;
81 bcs::from_bytes(move_object.contents()).context("Failed to deserialize Move value")
82}