sui_rpc_store/schema/
epochs.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
//! `EpochId` → `StoredEpoch`.
//!
//! Each row records one epoch's metadata. The CF is populated by two
//! independent indexer pipelines that each emit partial records — one at
//! epoch start, one at epoch end. The partial records are combined by an
//! associative merge operator that copies every field set on an operand
//! into the accumulated record and leaves unset fields untouched.
11
12use prost::Message;
13use sui_consistent_store::Protobuf;
14use sui_consistent_store::error::DecodeError;
15use sui_consistent_store::error::Error;
16use sui_consistent_store::reader::Reader;
17use sui_types::committee::Committee;
18use sui_types::committee::EpochId;
19use sui_types::storage::EpochInfo;
20use sui_types::sui_system_state::SuiSystemState;
21use sui_types::sui_system_state::SuiSystemStateTrait;
22
23use crate::proto::StoredEpoch;
24use crate::schema::primitives::U64Be;
25
26pub const NAME: &str = "epochs";
27
28pub type Key = U64Be;
29pub type Value = Protobuf<StoredEpoch>;
30
31/// CF options: install the field-wise merge operator.
32pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
33    let mut opts = resolver.options(NAME);
34    opts.set_merge_operator_associative("epochs_merge", merge);
35    opts
36}
37
38/// Build a partial `StoredEpoch` carrying the start-of-epoch
39/// fields. Indexer pipelines that observe an epoch's first
40/// checkpoint stage this as a merge operand against the epoch's
41/// key.
42///
43/// `start_checkpoint` is itself optional: the embedded fullnode
44/// seeds the current epoch's row from a *mid-epoch* `SuiSystemState`
45/// at restore time and cannot know the epoch's first checkpoint
46/// (it precedes the restore tip). Passing `None` leaves the field
47/// unset, and because the merge operator only copies fields that
48/// are present, a later full start record from the upward backfill
49/// fills it in without either operand clobbering the other.
50pub fn start(
51    protocol_version: u64,
52    reference_gas_price: u64,
53    start_timestamp_ms: u64,
54    start_checkpoint: Option<u64>,
55    system_state_bcs: Option<Vec<u8>>,
56) -> Value {
57    Protobuf(StoredEpoch {
58        protocol_version: Some(protocol_version),
59        reference_gas_price: Some(reference_gas_price),
60        start_timestamp_ms: Some(start_timestamp_ms),
61        start_checkpoint,
62        system_state_bcs: system_state_bcs.map(Into::into),
63        ..StoredEpoch::default()
64    })
65}
66
/// End-of-epoch payload that [`end`] turns into a merge operand.
///
/// Fields from [`total_stake`](Self::total_stake) through
/// [`leftover_storage_fund_inflow`](Self::leftover_storage_fund_inflow)
/// come from the epoch's `SystemEpochInfoEvent`. That event is not emitted
/// when an epoch ends in safe mode, so those fields are `None` in that case.
/// [`safe_mode`](Self::safe_mode) says which situation applies.
///
/// The field set mirrors the columns of the postgres `kv_epoch_ends` table.
#[derive(Clone, Debug, Default)]
pub struct EpochEnd {
    pub end_timestamp_ms: u64,
    pub end_checkpoint: u64,
    /// Value of `network_total_transactions` at the epoch's final checkpoint.
    pub tx_hi: u64,
    pub safe_mode: bool,
    /// BCS encoding of the `Vec<CheckpointCommitment>` taken from the
    /// final checkpoint's end-of-epoch data.
    pub epoch_commitments: Vec<u8>,
    pub total_stake: Option<u64>,
    pub storage_fund_balance: Option<u64>,
    pub storage_fund_reinvestment: Option<u64>,
    pub storage_charge: Option<u64>,
    pub storage_rebate: Option<u64>,
    pub stake_subsidy_amount: Option<u64>,
    pub total_gas_fees: Option<u64>,
    pub total_stake_rewards_distributed: Option<u64>,
    pub leftover_storage_fund_inflow: Option<u64>,
}
94
95/// Build a partial `StoredEpoch` carrying the end-of-epoch fields.
96/// Indexer pipelines that observe an epoch's final checkpoint
97/// stage this as a merge operand against the epoch's key.
98pub fn end(end: EpochEnd) -> Value {
99    Protobuf(StoredEpoch {
100        end_timestamp_ms: Some(end.end_timestamp_ms),
101        end_checkpoint: Some(end.end_checkpoint),
102        tx_hi: Some(end.tx_hi),
103        safe_mode: Some(end.safe_mode),
104        epoch_commitments: Some(end.epoch_commitments.into()),
105        total_stake: end.total_stake,
106        storage_fund_balance: end.storage_fund_balance,
107        storage_fund_reinvestment: end.storage_fund_reinvestment,
108        storage_charge: end.storage_charge,
109        storage_rebate: end.storage_rebate,
110        stake_subsidy_amount: end.stake_subsidy_amount,
111        total_gas_fees: end.total_gas_fees,
112        total_stake_rewards_distributed: end.total_stake_rewards_distributed,
113        leftover_storage_fund_inflow: end.leftover_storage_fund_inflow,
114        ..StoredEpoch::default()
115    })
116}
117
118/// Associative merge operator: take any field set on an operand
119/// over what was accumulated so far, processed left to right.
120///
121/// Encode and decode failures here signal a corrupt row or a
122/// programmer error in the helpers above; this CF is written only
123/// by the crate's own `start` and `end` builders, so a parse
124/// failure isn't a recoverable situation — panic rather than
125/// silently lose data.
126fn merge(
127    _key: &[u8],
128    existing_val: Option<&[u8]>,
129    operands: &rocksdb::MergeOperands,
130) -> Option<Vec<u8>> {
131    let mut merged = existing_val
132        .map(|v| StoredEpoch::decode(v).expect("decode existing StoredEpoch"))
133        .unwrap_or_default();
134    for operand in operands {
135        let next = StoredEpoch::decode(operand).expect("decode StoredEpoch operand");
136        if next.protocol_version.is_some() {
137            merged.protocol_version = next.protocol_version;
138        }
139        if next.reference_gas_price.is_some() {
140            merged.reference_gas_price = next.reference_gas_price;
141        }
142        if next.start_timestamp_ms.is_some() {
143            merged.start_timestamp_ms = next.start_timestamp_ms;
144        }
145        if next.end_timestamp_ms.is_some() {
146            merged.end_timestamp_ms = next.end_timestamp_ms;
147        }
148        if next.start_checkpoint.is_some() {
149            merged.start_checkpoint = next.start_checkpoint;
150        }
151        if next.end_checkpoint.is_some() {
152            merged.end_checkpoint = next.end_checkpoint;
153        }
154        if next.system_state_bcs.is_some() {
155            merged.system_state_bcs = next.system_state_bcs;
156        }
157        if next.tx_hi.is_some() {
158            merged.tx_hi = next.tx_hi;
159        }
160        if next.safe_mode.is_some() {
161            merged.safe_mode = next.safe_mode;
162        }
163        if next.epoch_commitments.is_some() {
164            merged.epoch_commitments = next.epoch_commitments;
165        }
166        if next.total_stake.is_some() {
167            merged.total_stake = next.total_stake;
168        }
169        if next.storage_fund_balance.is_some() {
170            merged.storage_fund_balance = next.storage_fund_balance;
171        }
172        if next.storage_fund_reinvestment.is_some() {
173            merged.storage_fund_reinvestment = next.storage_fund_reinvestment;
174        }
175        if next.storage_charge.is_some() {
176            merged.storage_charge = next.storage_charge;
177        }
178        if next.storage_rebate.is_some() {
179            merged.storage_rebate = next.storage_rebate;
180        }
181        if next.stake_subsidy_amount.is_some() {
182            merged.stake_subsidy_amount = next.stake_subsidy_amount;
183        }
184        if next.total_gas_fees.is_some() {
185            merged.total_gas_fees = next.total_gas_fees;
186        }
187        if next.total_stake_rewards_distributed.is_some() {
188            merged.total_stake_rewards_distributed = next.total_stake_rewards_distributed;
189        }
190        if next.leftover_storage_fund_inflow.is_some() {
191            merged.leftover_storage_fund_inflow = next.leftover_storage_fund_inflow;
192        }
193    }
194    Some(merged.encode_to_vec())
195}
196
197impl<R: Reader> super::RpcStoreSchema<R> {
198    /// Look up the metadata for an epoch, decoding the embedded
199    /// BCS `SuiSystemState` into the canonical
200    /// [`sui_types::storage::EpochInfo`].
201    pub fn get_epoch(&self, epoch: EpochId) -> Result<Option<EpochInfo>, Error> {
202        let Some(stored) = self.epochs.get(&U64Be(epoch))? else {
203            return Ok(None);
204        };
205        let stored = stored.into_inner();
206        let system_state = stored
207            .system_state_bcs
208            .as_ref()
209            .map(|bytes| {
210                bcs::from_bytes::<SuiSystemState>(bytes)
211                    .map_err(|e| DecodeError::with_source("bcs decode SuiSystemState", e))
212            })
213            .transpose()?;
214        Ok(Some(EpochInfo {
215            epoch,
216            protocol_version: stored.protocol_version,
217            start_timestamp_ms: stored.start_timestamp_ms,
218            end_timestamp_ms: stored.end_timestamp_ms,
219            start_checkpoint: stored.start_checkpoint,
220            end_checkpoint: stored.end_checkpoint,
221            reference_gas_price: stored.reference_gas_price,
222            system_state,
223        }))
224    }
225
226    /// Look up the validator committee active during an epoch.
227    ///
228    /// Derived from the stored `SuiSystemState` rather than kept
229    /// in its own CF — the system state already carries the
230    /// validator set, so a dedicated committee row would just be
231    /// duplicate bytes. Returns `Ok(None)` if no epoch row exists
232    /// or if the epoch row exists but no `system_state_bcs` has
233    /// been observed yet.
234    pub fn get_committee(&self, epoch: EpochId) -> Result<Option<Committee>, Error> {
235        let Some(stored) = self.epochs.get(&U64Be(epoch))? else {
236            return Ok(None);
237        };
238        let stored = stored.into_inner();
239        let Some(bytes) = stored.system_state_bcs else {
240            return Ok(None);
241        };
242        let system_state: SuiSystemState = bcs::from_bytes(&bytes)
243            .map_err(|e| DecodeError::with_source("bcs decode SuiSystemState", e))?;
244        Ok(Some(
245            system_state
246                .get_current_epoch_committee()
247                .committee()
248                .clone(),
249        ))
250    }
251}
252
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Epoch key used by every test that writes a row.
    const EPOCH: u64 = 42;

    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    /// Stage `operands` as merges against [`EPOCH`], in order, in a single
    /// batch, then commit the batch.
    fn merge_all(db: &Db, schema: &RpcStoreSchema, operands: &[Value]) {
        let mut batch = db.batch();
        for operand in operands {
            batch.merge(&schema.epochs, &U64Be(EPOCH), operand).unwrap();
        }
        batch.commit().unwrap();
    }

    /// Read the raw merged row for [`EPOCH`], including the fields that
    /// `get_epoch` does not surface.
    fn read_raw(schema: &RpcStoreSchema) -> StoredEpoch {
        schema
            .epochs
            .get(&U64Be(EPOCH))
            .unwrap()
            .expect("epoch present")
            .into_inner()
    }

    /// A complete start record: protocol 73, gas price 1000, start time 111,
    /// first checkpoint 500, no system state.
    fn full_start() -> Value {
        start(73, 1000, 111, Some(500), None)
    }

    /// A start record as seeded at restore time: no `start_checkpoint`.
    fn seed_start() -> Value {
        start(73, 1000, 111, None, None)
    }

    /// A minimal end record: end time 999, last checkpoint 600.
    fn basic_end() -> Value {
        end(EpochEnd {
            end_timestamp_ms: 999,
            end_checkpoint: 600,
            ..Default::default()
        })
    }

    #[test]
    fn get_returns_none_for_unknown_epoch() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_epoch(7).unwrap().is_none());
    }

    #[test]
    fn start_then_end_merges_into_full_record() {
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[full_start(), basic_end()]);

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.epoch, EPOCH);
        assert_eq!(info.protocol_version, Some(73));
        assert_eq!(info.reference_gas_price, Some(1000));
        assert_eq!(info.start_timestamp_ms, Some(111));
        assert_eq!(info.start_checkpoint, Some(500));
        assert_eq!(info.end_timestamp_ms, Some(999));
        assert_eq!(info.end_checkpoint, Some(600));
        assert_eq!(info.system_state, None);
    }

    #[test]
    fn end_before_start_still_yields_full_record() {
        // Pipelines run independently; the synchronizer doesn't guarantee
        // that start is staged before end at the CF level, only that both
        // are visible before the next snapshot. A row that sees `end` first
        // and then `start` should land in the same state.
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[basic_end(), full_start()]);

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.protocol_version, Some(73));
        assert_eq!(info.end_checkpoint, Some(600));
    }

    #[test]
    fn later_operand_overrides_earlier_for_same_field() {
        // Re-indexing semantics: if the same field is written twice, the
        // later operand wins.
        let (_dir, db, schema) = fresh_db();
        merge_all(
            &db,
            &schema,
            &[full_start(), start(74, 1500, 222, Some(501), None)],
        );

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.protocol_version, Some(74));
        assert_eq!(info.reference_gas_price, Some(1500));
        assert_eq!(info.start_timestamp_ms, Some(222));
        assert_eq!(info.start_checkpoint, Some(501));
    }

    #[test]
    fn get_committee_returns_none_for_unknown_epoch() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_committee(7).unwrap().is_none());
    }

    #[test]
    fn get_committee_returns_none_when_system_state_absent() {
        // A row exists for the epoch, but only the end-of-epoch partial
        // record has been written, so no committee can be derived from it.
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[basic_end()]);

        assert!(schema.get_committee(EPOCH).unwrap().is_none());
    }

    #[test]
    fn only_start_leaves_end_fields_none() {
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[full_start()]);

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.end_timestamp_ms, None);
        assert_eq!(info.end_checkpoint, None);
    }

    #[test]
    fn partial_seed_then_backfill_fills_start_checkpoint() {
        // The embedded restore seeds a partial start record from the
        // mid-epoch system state, with no `start_checkpoint`. A later full
        // start record from the upward backfill supplies it. The merge must
        // combine the two rather than clobber either.
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[seed_start(), full_start()]);

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.start_checkpoint, Some(500));
        assert_eq!(info.protocol_version, Some(73));
    }

    #[test]
    fn partial_seed_does_not_clobber_known_start_checkpoint() {
        // Reverse order: a full start record followed by a partial re-seed
        // (no `start_checkpoint`) must NOT erase the known value. This is
        // the crux of the presence-tracked merge: a `None` operand field
        // leaves the accumulated field untouched.
        let (_dir, db, schema) = fresh_db();
        merge_all(&db, &schema, &[full_start(), seed_start()]);

        let info = schema.get_epoch(EPOCH).unwrap().expect("epoch present");
        assert_eq!(info.start_checkpoint, Some(500));
    }

    #[test]
    fn end_record_stores_system_epoch_info_fields() {
        let (_dir, db, schema) = fresh_db();
        merge_all(
            &db,
            &schema,
            &[end(EpochEnd {
                end_timestamp_ms: 999,
                end_checkpoint: 600,
                tx_hi: 12_345,
                safe_mode: false,
                epoch_commitments: vec![1, 2, 3],
                total_stake: Some(1_000),
                storage_fund_balance: Some(2_000),
                storage_fund_reinvestment: Some(3_000),
                storage_charge: Some(4_000),
                storage_rebate: Some(5_000),
                stake_subsidy_amount: Some(6_000),
                total_gas_fees: Some(7_000),
                total_stake_rewards_distributed: Some(8_000),
                leftover_storage_fund_inflow: Some(9_000),
            })],
        );

        // `get_epoch` deliberately does not surface these fields yet, so
        // read the raw stored record to confirm they were written.
        let stored = read_raw(&schema);
        assert_eq!(stored.tx_hi, Some(12_345));
        assert_eq!(stored.safe_mode, Some(false));
        assert_eq!(stored.epoch_commitments.as_deref(), Some(&[1, 2, 3][..]));
        assert_eq!(stored.total_stake, Some(1_000));
        assert_eq!(stored.storage_fund_balance, Some(2_000));
        assert_eq!(stored.storage_fund_reinvestment, Some(3_000));
        assert_eq!(stored.storage_charge, Some(4_000));
        assert_eq!(stored.storage_rebate, Some(5_000));
        assert_eq!(stored.stake_subsidy_amount, Some(6_000));
        assert_eq!(stored.total_gas_fees, Some(7_000));
        assert_eq!(stored.total_stake_rewards_distributed, Some(8_000));
        assert_eq!(stored.leftover_storage_fund_inflow, Some(9_000));
    }

    #[test]
    fn safe_mode_end_leaves_event_counters_unset() {
        // A safe-mode epoch emits no `SystemEpochInfoEvent`. The counters
        // therefore stay `None`, while `safe_mode`, `tx_hi` and the
        // commitments are still recorded.
        let (_dir, db, schema) = fresh_db();
        merge_all(
            &db,
            &schema,
            &[end(EpochEnd {
                end_timestamp_ms: 999,
                end_checkpoint: 600,
                tx_hi: 7,
                safe_mode: true,
                epoch_commitments: vec![0],
                ..Default::default()
            })],
        );

        let stored = read_raw(&schema);
        assert_eq!(stored.safe_mode, Some(true));
        assert_eq!(stored.tx_hi, Some(7));
        assert_eq!(stored.total_stake, None);
        assert_eq!(stored.total_gas_fees, None);
        assert_eq!(stored.storage_charge, None);
    }

    #[test]
    fn system_state_bytes_survive_end_merge() {
        // Only `start` carries `system_state_bcs`. A later `end` operand
        // leaves it unset and must not erase it. Read the raw row: the
        // placeholder bytes are arbitrary, not a real BCS `SuiSystemState`.
        let (_dir, db, schema) = fresh_db();
        merge_all(
            &db,
            &schema,
            &[start(73, 1000, 111, Some(500), Some(vec![7, 8, 9])), basic_end()],
        );

        let stored = read_raw(&schema);
        assert_eq!(stored.system_state_bcs.as_deref(), Some(&[7, 8, 9][..]));
        assert_eq!(stored.end_checkpoint, Some(600));
    }

    #[test]
    fn later_bytes_operand_replaces_rather_than_appends() {
        // Bytes fields follow last-writer-wins like scalars; they must not
        // be concatenated across operands.
        let (_dir, db, schema) = fresh_db();
        let with_commitments = |epoch_commitments: Vec<u8>| {
            end(EpochEnd {
                end_timestamp_ms: 999,
                end_checkpoint: 600,
                epoch_commitments,
                ..Default::default()
            })
        };
        merge_all(
            &db,
            &schema,
            &[with_commitments(vec![1, 2, 3]), with_commitments(vec![4])],
        );

        assert_eq!(read_raw(&schema).epoch_commitments.as_deref(), Some(&[4][..]));
    }

    #[test]
    fn explicit_default_value_overrides_earlier_value() {
        // A field that is present but holds its type's default (`false`,
        // `0`) still counts as set, so it must win over an earlier value.
        let (_dir, db, schema) = fresh_db();
        merge_all(
            &db,
            &schema,
            &[
                end(EpochEnd {
                    tx_hi: 5,
                    safe_mode: true,
                    ..Default::default()
                }),
                end(EpochEnd {
                    tx_hi: 0,
                    safe_mode: false,
                    ..Default::default()
                }),
            ],
        );

        let stored = read_raw(&schema);
        assert_eq!(stored.safe_mode, Some(false));
        assert_eq!(stored.tx_hi, Some(0));
    }
}