sui_rpc_store/indexer/
epochs.rs1use std::sync::Arc;
32
33use async_trait::async_trait;
34use sui_indexer_alt_framework::pipeline::Processor;
35use sui_indexer_alt_framework::pipeline::sequential;
36use sui_types::event::SystemEpochInfoEvent;
37use sui_types::full_checkpoint_content::Checkpoint;
38
39use crate::indexer::Schema;
40use crate::indexer::Store;
41use crate::schema::epochs;
42use crate::schema::primitives::U64Be;
43
44pub struct Epochs;
46
47pub struct Row {
53 pub epoch: u64,
54 pub value: epochs::Value,
55}
56
57#[async_trait]
58impl Processor for Epochs {
59 const NAME: &'static str = "epochs";
60 type Value = Row;
61
62 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
63 let Some(epoch_info) = checkpoint
64 .epoch_info()
65 .map_err(|e| anyhow::anyhow!("extract epoch_info: {e}"))?
66 else {
67 return Ok(vec![]);
68 };
69
70 let mut rows = Vec::with_capacity(2);
71
72 let system_state_bcs = epoch_info
76 .system_state
77 .as_ref()
78 .map(bcs::to_bytes)
79 .transpose()
80 .map_err(|e| anyhow::anyhow!("bcs encode SuiSystemState: {e}"))?;
81
82 rows.push(Row {
83 epoch: epoch_info.epoch,
84 value: epochs::start(
85 epoch_info.protocol_version.unwrap_or(0),
86 epoch_info.reference_gas_price.unwrap_or(0),
87 epoch_info.start_timestamp_ms.unwrap_or(0),
88 Some(epoch_info.start_checkpoint.unwrap_or(0)),
89 system_state_bcs,
90 ),
91 });
92
93 if epoch_info.epoch > 0 {
96 rows.push(Row {
97 epoch: checkpoint.summary.epoch(),
98 value: epochs::end(epoch_end(checkpoint)?),
99 });
100 }
101
102 Ok(rows)
103 }
104}
105
106fn epoch_end(checkpoint: &Checkpoint) -> anyhow::Result<epochs::EpochEnd> {
116 let summary = &checkpoint.summary;
117
118 let epoch_commitments = summary
119 .end_of_epoch_data
120 .as_ref()
121 .map(|data| bcs::to_bytes(&data.epoch_commitments))
122 .transpose()
123 .map_err(|e| anyhow::anyhow!("bcs encode epoch_commitments: {e}"))?
124 .unwrap_or_default();
125
126 let event: Option<SystemEpochInfoEvent> = checkpoint
130 .transactions
131 .iter()
132 .filter_map(|tx| tx.events.as_ref())
133 .flat_map(|events| &events.data)
134 .find_map(|event| {
135 event
136 .is_system_epoch_info_event()
137 .then(|| bcs::from_bytes(&event.contents))
138 })
139 .transpose()
140 .map_err(|e| anyhow::anyhow!("bcs decode SystemEpochInfoEvent: {e}"))?;
141
142 let mut end = epochs::EpochEnd {
143 end_timestamp_ms: summary.timestamp_ms,
144 end_checkpoint: summary.sequence_number,
145 tx_hi: summary.network_total_transactions,
146 safe_mode: event.is_none(),
147 epoch_commitments,
148 ..Default::default()
149 };
150
151 if let Some(e) = event {
152 end.total_stake = Some(e.total_stake);
153 end.storage_fund_balance = Some(e.storage_fund_balance);
154 end.storage_fund_reinvestment = Some(e.storage_fund_reinvestment);
155 end.storage_charge = Some(e.storage_charge);
156 end.storage_rebate = Some(e.storage_rebate);
157 end.stake_subsidy_amount = Some(e.stake_subsidy_amount);
158 end.total_gas_fees = Some(e.total_gas_fees);
159 end.total_stake_rewards_distributed = Some(e.total_stake_rewards_distributed);
160 end.leftover_storage_fund_inflow = Some(e.leftover_storage_fund_inflow);
161 }
162
163 Ok(end)
164}
165
166#[async_trait]
167impl sequential::Handler for Epochs {
168 type Store = Store;
169 type Batch = Vec<Row>;
170
171 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
172 batch.extend(values);
173 }
174
175 async fn commit<'a>(
176 &self,
177 batch: &Self::Batch,
178 conn: &mut sui_consistent_store::Connection<'a, Schema>,
179 ) -> anyhow::Result<usize> {
180 let cf = &conn.store.schema().epochs;
181 for row in batch {
182 conn.batch.merge(cf, &U64Be(row.epoch), &row.value)?;
183 }
184 Ok(batch.len())
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use std::sync::Arc;
191
192 use sui_types::test_checkpoint_data_builder::AdvanceEpochConfig;
193 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
194
195 use super::*;
196
197 #[tokio::test]
198 async fn process_emits_nothing_for_non_epoch_boundary_checkpoint() {
199 let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
200 let rows = Epochs.process(&checkpoint).await.unwrap();
201 assert!(rows.is_empty());
202 }
203
204 #[test]
205 fn epoch_end_captures_system_epoch_info_event() {
206 let mut builder = TestCheckpointBuilder::new(0);
207 let checkpoint = builder.advance_epoch(AdvanceEpochConfig::default());
208
209 let end = epoch_end(&checkpoint).unwrap();
210 assert!(!end.safe_mode);
211 assert_eq!(end.total_gas_fees, Some(0));
215 assert_eq!(end.total_stake, Some(0));
216 assert_eq!(end.storage_charge, Some(0));
217 assert!(!end.epoch_commitments.is_empty());
220 }
221
222 #[test]
223 fn epoch_end_safe_mode_leaves_counters_unset() {
224 let mut builder = TestCheckpointBuilder::new(0);
225 let checkpoint = builder.advance_epoch(AdvanceEpochConfig {
226 safe_mode: true,
227 ..Default::default()
228 });
229
230 let end = epoch_end(&checkpoint).unwrap();
231 assert!(end.safe_mode);
232 assert_eq!(end.total_gas_fees, None);
233 assert_eq!(end.total_stake, None);
234 }
235}