sui_rpc_store/indexer/
epochs.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::epochs`](crate::schema::epochs) CF.
6//!
7//! Driven by [`Checkpoint::epoch_info`], which returns a populated
8//! [`sui_types::storage::EpochInfo`] for the *new* epoch on
9//! two cases: the genesis checkpoint, and the end-of-epoch
10//! checkpoint of the prior epoch (the one carrying
11//! `end_of_epoch_data`). Every other checkpoint returns `None`
12//! and contributes nothing.
13//!
14//! Mirroring the `index_epoch` flow in `sui-core::rpc_index`:
15//!
16//! 1. Emit a *start* operand for the new epoch — protocol version,
17//!    reference gas price, start timestamp, start checkpoint, and
18//!    BCS-encoded `SuiSystemState`, all read off the new
19//!    system-state object the end-of-epoch transaction wrote to
20//!    its outputs.
21//! 2. If the new epoch is non-genesis, emit an *end* operand for
22//!    the prior epoch, built from the boundary checkpoint (the prior
23//!    epoch's last, which carries `end_of_epoch_data`): its
24//!    timestamp and sequence number, its
25//!    `network_total_transactions` as `tx_hi`, the end-of-epoch
26//!    commitments, and — unless the epoch ended in safe mode — the
27//!    gas and stake counters from the change-epoch transaction's
28//!    `SystemEpochInfoEvent`. Mirrors the postgres `kv_epoch_ends`
29//!    handler.
30
31use std::sync::Arc;
32
33use async_trait::async_trait;
34use sui_indexer_alt_framework::pipeline::Processor;
35use sui_indexer_alt_framework::pipeline::sequential;
36use sui_types::event::SystemEpochInfoEvent;
37use sui_types::full_checkpoint_content::Checkpoint;
38
39use crate::indexer::Schema;
40use crate::indexer::Store;
41use crate::schema::epochs;
42use crate::schema::primitives::U64Be;
43
44/// Pipeline marker for `epochs`.
45pub struct Epochs;
46
47/// A partial [`StoredEpoch`](crate::proto::StoredEpoch) record keyed
48/// by epoch number, staged as a merge operand at commit time. Both
49/// the start and end partial records share this shape; the field-wise
50/// merge operator in [`schema::epochs`](crate::schema::epochs)
51/// combines them (and any re-seeds) into a full row.
52pub struct Row {
53    pub epoch: u64,
54    pub value: epochs::Value,
55}
56
57#[async_trait]
58impl Processor for Epochs {
59    const NAME: &'static str = "epochs";
60    type Value = Row;
61
62    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
63        let Some(epoch_info) = checkpoint
64            .epoch_info()
65            .map_err(|e| anyhow::anyhow!("extract epoch_info: {e}"))?
66        else {
67            return Ok(vec![]);
68        };
69
70        let mut rows = Vec::with_capacity(2);
71
72        // Start record for the new epoch. `EpochInfo` from
73        // `epoch_info()` populates every field except the end-of-
74        // epoch ones; pull them out into our `Row::Start` shape.
75        let system_state_bcs = epoch_info
76            .system_state
77            .as_ref()
78            .map(bcs::to_bytes)
79            .transpose()
80            .map_err(|e| anyhow::anyhow!("bcs encode SuiSystemState: {e}"))?;
81
82        rows.push(Row {
83            epoch: epoch_info.epoch,
84            value: epochs::start(
85                epoch_info.protocol_version.unwrap_or(0),
86                epoch_info.reference_gas_price.unwrap_or(0),
87                epoch_info.start_timestamp_ms.unwrap_or(0),
88                Some(epoch_info.start_checkpoint.unwrap_or(0)),
89                system_state_bcs,
90            ),
91        });
92
93        // End record for the prior epoch — skip on genesis where
94        // there is no prior epoch.
95        if epoch_info.epoch > 0 {
96            rows.push(Row {
97                epoch: checkpoint.summary.epoch(),
98                value: epochs::end(epoch_end(checkpoint)?),
99            });
100        }
101
102        Ok(rows)
103    }
104}
105
106/// Build the end-of-epoch record for the epoch ending at
107/// `checkpoint` — the boundary checkpoint carrying
108/// `end_of_epoch_data`.
109///
110/// Mirrors the postgres `kv_epoch_ends` handler: the gas and stake
111/// counters come from the `SystemEpochInfoEvent` emitted by the
112/// change-epoch transaction. An epoch that ends in safe mode emits
113/// no such event, so those counters stay `None` and `safe_mode` is
114/// recorded as `true`.
115fn epoch_end(checkpoint: &Checkpoint) -> anyhow::Result<epochs::EpochEnd> {
116    let summary = &checkpoint.summary;
117
118    let epoch_commitments = summary
119        .end_of_epoch_data
120        .as_ref()
121        .map(|data| bcs::to_bytes(&data.epoch_commitments))
122        .transpose()
123        .map_err(|e| anyhow::anyhow!("bcs encode epoch_commitments: {e}"))?
124        .unwrap_or_default();
125
126    // The `SystemEpochInfoEvent` is emitted only by the change-epoch
127    // transaction, so scanning every transaction's events finds it
128    // without having to identify that transaction by kind.
129    let event: Option<SystemEpochInfoEvent> = checkpoint
130        .transactions
131        .iter()
132        .filter_map(|tx| tx.events.as_ref())
133        .flat_map(|events| &events.data)
134        .find_map(|event| {
135            event
136                .is_system_epoch_info_event()
137                .then(|| bcs::from_bytes(&event.contents))
138        })
139        .transpose()
140        .map_err(|e| anyhow::anyhow!("bcs decode SystemEpochInfoEvent: {e}"))?;
141
142    let mut end = epochs::EpochEnd {
143        end_timestamp_ms: summary.timestamp_ms,
144        end_checkpoint: summary.sequence_number,
145        tx_hi: summary.network_total_transactions,
146        safe_mode: event.is_none(),
147        epoch_commitments,
148        ..Default::default()
149    };
150
151    if let Some(e) = event {
152        end.total_stake = Some(e.total_stake);
153        end.storage_fund_balance = Some(e.storage_fund_balance);
154        end.storage_fund_reinvestment = Some(e.storage_fund_reinvestment);
155        end.storage_charge = Some(e.storage_charge);
156        end.storage_rebate = Some(e.storage_rebate);
157        end.stake_subsidy_amount = Some(e.stake_subsidy_amount);
158        end.total_gas_fees = Some(e.total_gas_fees);
159        end.total_stake_rewards_distributed = Some(e.total_stake_rewards_distributed);
160        end.leftover_storage_fund_inflow = Some(e.leftover_storage_fund_inflow);
161    }
162
163    Ok(end)
164}
165
166#[async_trait]
167impl sequential::Handler for Epochs {
168    type Store = Store;
169    type Batch = Vec<Row>;
170
171    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
172        batch.extend(values);
173    }
174
175    async fn commit<'a>(
176        &self,
177        batch: &Self::Batch,
178        conn: &mut sui_consistent_store::Connection<'a, Schema>,
179    ) -> anyhow::Result<usize> {
180        let cf = &conn.store.schema().epochs;
181        for row in batch {
182            conn.batch.merge(cf, &U64Be(row.epoch), &row.value)?;
183        }
184        Ok(batch.len())
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use std::sync::Arc;
191
192    use sui_types::test_checkpoint_data_builder::AdvanceEpochConfig;
193    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
194
195    use super::*;
196
197    #[tokio::test]
198    async fn process_emits_nothing_for_non_epoch_boundary_checkpoint() {
199        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
200        let rows = Epochs.process(&checkpoint).await.unwrap();
201        assert!(rows.is_empty());
202    }
203
204    #[test]
205    fn epoch_end_captures_system_epoch_info_event() {
206        let mut builder = TestCheckpointBuilder::new(0);
207        let checkpoint = builder.advance_epoch(AdvanceEpochConfig::default());
208
209        let end = epoch_end(&checkpoint).unwrap();
210        assert!(!end.safe_mode);
211        // The builder emits a `SystemEpochInfoEvent` with default
212        // (zero) counters, which we record as `Some(0)` — distinct
213        // from the safe-mode `None`.
214        assert_eq!(end.total_gas_fees, Some(0));
215        assert_eq!(end.total_stake, Some(0));
216        assert_eq!(end.storage_charge, Some(0));
217        // Commitments are always recorded (BCS of the vec, never
218        // empty bytes even for an empty vec).
219        assert!(!end.epoch_commitments.is_empty());
220    }
221
222    #[test]
223    fn epoch_end_safe_mode_leaves_counters_unset() {
224        let mut builder = TestCheckpointBuilder::new(0);
225        let checkpoint = builder.advance_epoch(AdvanceEpochConfig {
226            safe_mode: true,
227            ..Default::default()
228        });
229
230        let end = epoch_end(&checkpoint).unwrap();
231        assert!(end.safe_mode);
232        assert_eq!(end.total_gas_fees, None);
233        assert_eq!(end.total_stake, None);
234    }
235}