1use prost::Message;
13use sui_consistent_store::Protobuf;
14use sui_consistent_store::error::DecodeError;
15use sui_consistent_store::error::Error;
16use sui_consistent_store::reader::Reader;
17use sui_types::committee::Committee;
18use sui_types::committee::EpochId;
19use sui_types::storage::EpochInfo;
20use sui_types::sui_system_state::SuiSystemState;
21use sui_types::sui_system_state::SuiSystemStateTrait;
22
23use crate::proto::StoredEpoch;
24use crate::schema::primitives::U64Be;
25
/// Name of this table; it is the key passed to the options resolver in `options`.
pub const NAME: &str = "epochs";

/// Key type of the table. Lookups in this file build it as `U64Be(epoch)` from an `EpochId`.
pub type Key = U64Be;
/// Value type of the table: a `StoredEpoch` protobuf message wrapped in `Protobuf`.
pub type Value = Protobuf<StoredEpoch>;
30
31pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
33 let mut opts = resolver.options(NAME);
34 opts.set_merge_operator_associative("epochs_merge", merge);
35 opts
36}
37
38pub fn start(
51 protocol_version: u64,
52 reference_gas_price: u64,
53 start_timestamp_ms: u64,
54 start_checkpoint: Option<u64>,
55 system_state_bcs: Option<Vec<u8>>,
56) -> Value {
57 Protobuf(StoredEpoch {
58 protocol_version: Some(protocol_version),
59 reference_gas_price: Some(reference_gas_price),
60 start_timestamp_ms: Some(start_timestamp_ms),
61 start_checkpoint,
62 system_state_bcs: system_state_bcs.map(Into::into),
63 ..StoredEpoch::default()
64 })
65}
66
/// Input to `end`: the values recorded for an epoch once it has finished.
///
/// The first five fields are always written to the stored record. The `Option` fields are
/// copied through unchanged, so a `None` leaves the corresponding stored field unset.
#[derive(Debug, Default, Clone)]
pub struct EpochEnd {
    /// Stored as `StoredEpoch::end_timestamp_ms` (milliseconds, going by the name).
    pub end_timestamp_ms: u64,
    /// Stored as `StoredEpoch::end_checkpoint`.
    pub end_checkpoint: u64,
    /// Stored as `StoredEpoch::tx_hi`.
    // NOTE(review): presumably an upper transaction-sequence bound for the epoch; confirm.
    pub tx_hi: u64,
    /// Stored as `StoredEpoch::safe_mode`.
    pub safe_mode: bool,
    /// Raw bytes stored as `StoredEpoch::epoch_commitments`.
    pub epoch_commitments: Vec<u8>,
    // The remaining fields are optional totals; `None` means "not recorded".
    pub total_stake: Option<u64>,
    pub storage_fund_balance: Option<u64>,
    pub storage_fund_reinvestment: Option<u64>,
    pub storage_charge: Option<u64>,
    pub storage_rebate: Option<u64>,
    pub stake_subsidy_amount: Option<u64>,
    pub total_gas_fees: Option<u64>,
    pub total_stake_rewards_distributed: Option<u64>,
    pub leftover_storage_fund_inflow: Option<u64>,
}
94
95pub fn end(end: EpochEnd) -> Value {
99 Protobuf(StoredEpoch {
100 end_timestamp_ms: Some(end.end_timestamp_ms),
101 end_checkpoint: Some(end.end_checkpoint),
102 tx_hi: Some(end.tx_hi),
103 safe_mode: Some(end.safe_mode),
104 epoch_commitments: Some(end.epoch_commitments.into()),
105 total_stake: end.total_stake,
106 storage_fund_balance: end.storage_fund_balance,
107 storage_fund_reinvestment: end.storage_fund_reinvestment,
108 storage_charge: end.storage_charge,
109 storage_rebate: end.storage_rebate,
110 stake_subsidy_amount: end.stake_subsidy_amount,
111 total_gas_fees: end.total_gas_fees,
112 total_stake_rewards_distributed: end.total_stake_rewards_distributed,
113 leftover_storage_fund_inflow: end.leftover_storage_fund_inflow,
114 ..StoredEpoch::default()
115 })
116}
117
118fn merge(
127 _key: &[u8],
128 existing_val: Option<&[u8]>,
129 operands: &rocksdb::MergeOperands,
130) -> Option<Vec<u8>> {
131 let mut merged = existing_val
132 .map(|v| StoredEpoch::decode(v).expect("decode existing StoredEpoch"))
133 .unwrap_or_default();
134 for operand in operands {
135 let next = StoredEpoch::decode(operand).expect("decode StoredEpoch operand");
136 if next.protocol_version.is_some() {
137 merged.protocol_version = next.protocol_version;
138 }
139 if next.reference_gas_price.is_some() {
140 merged.reference_gas_price = next.reference_gas_price;
141 }
142 if next.start_timestamp_ms.is_some() {
143 merged.start_timestamp_ms = next.start_timestamp_ms;
144 }
145 if next.end_timestamp_ms.is_some() {
146 merged.end_timestamp_ms = next.end_timestamp_ms;
147 }
148 if next.start_checkpoint.is_some() {
149 merged.start_checkpoint = next.start_checkpoint;
150 }
151 if next.end_checkpoint.is_some() {
152 merged.end_checkpoint = next.end_checkpoint;
153 }
154 if next.system_state_bcs.is_some() {
155 merged.system_state_bcs = next.system_state_bcs;
156 }
157 if next.tx_hi.is_some() {
158 merged.tx_hi = next.tx_hi;
159 }
160 if next.safe_mode.is_some() {
161 merged.safe_mode = next.safe_mode;
162 }
163 if next.epoch_commitments.is_some() {
164 merged.epoch_commitments = next.epoch_commitments;
165 }
166 if next.total_stake.is_some() {
167 merged.total_stake = next.total_stake;
168 }
169 if next.storage_fund_balance.is_some() {
170 merged.storage_fund_balance = next.storage_fund_balance;
171 }
172 if next.storage_fund_reinvestment.is_some() {
173 merged.storage_fund_reinvestment = next.storage_fund_reinvestment;
174 }
175 if next.storage_charge.is_some() {
176 merged.storage_charge = next.storage_charge;
177 }
178 if next.storage_rebate.is_some() {
179 merged.storage_rebate = next.storage_rebate;
180 }
181 if next.stake_subsidy_amount.is_some() {
182 merged.stake_subsidy_amount = next.stake_subsidy_amount;
183 }
184 if next.total_gas_fees.is_some() {
185 merged.total_gas_fees = next.total_gas_fees;
186 }
187 if next.total_stake_rewards_distributed.is_some() {
188 merged.total_stake_rewards_distributed = next.total_stake_rewards_distributed;
189 }
190 if next.leftover_storage_fund_inflow.is_some() {
191 merged.leftover_storage_fund_inflow = next.leftover_storage_fund_inflow;
192 }
193 }
194 Some(merged.encode_to_vec())
195}
196
197impl<R: Reader> super::RpcStoreSchema<R> {
198 pub fn get_epoch(&self, epoch: EpochId) -> Result<Option<EpochInfo>, Error> {
202 let Some(stored) = self.epochs.get(&U64Be(epoch))? else {
203 return Ok(None);
204 };
205 let stored = stored.into_inner();
206 let system_state = stored
207 .system_state_bcs
208 .as_ref()
209 .map(|bytes| {
210 bcs::from_bytes::<SuiSystemState>(bytes)
211 .map_err(|e| DecodeError::with_source("bcs decode SuiSystemState", e))
212 })
213 .transpose()?;
214 Ok(Some(EpochInfo {
215 epoch,
216 protocol_version: stored.protocol_version,
217 start_timestamp_ms: stored.start_timestamp_ms,
218 end_timestamp_ms: stored.end_timestamp_ms,
219 start_checkpoint: stored.start_checkpoint,
220 end_checkpoint: stored.end_checkpoint,
221 reference_gas_price: stored.reference_gas_price,
222 system_state,
223 }))
224 }
225
226 pub fn get_committee(&self, epoch: EpochId) -> Result<Option<Committee>, Error> {
235 let Some(stored) = self.epochs.get(&U64Be(epoch))? else {
236 return Ok(None);
237 };
238 let stored = stored.into_inner();
239 let Some(bytes) = stored.system_state_bcs else {
240 return Ok(None);
241 };
242 let system_state: SuiSystemState = bcs::from_bytes(&bytes)
243 .map_err(|e| DecodeError::with_source("bcs decode SuiSystemState", e))?;
244 Ok(Some(
245 system_state
246 .get_current_epoch_committee()
247 .committee()
248 .clone(),
249 ))
250 }
251}
252
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Opens a database with the `RpcStoreSchema` in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the handles so each test can keep it bound for
    /// the duration of the test body.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    /// An epoch that was never written reads back as `None`, not as an error.
    #[test]
    fn get_returns_none_for_unknown_epoch() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_epoch(7).unwrap().is_none());
    }

    /// A start operand followed by an end operand produces one record with both halves.
    #[test]
    fn start_then_end_merges_into_full_record() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &end(EpochEnd {
                    end_timestamp_ms: 999,
                    end_checkpoint: 600,
                    ..Default::default()
                }),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.epoch, 42);
        assert_eq!(info.protocol_version, Some(73));
        assert_eq!(info.reference_gas_price, Some(1000));
        assert_eq!(info.start_timestamp_ms, Some(111));
        assert_eq!(info.start_checkpoint, Some(500));
        assert_eq!(info.end_timestamp_ms, Some(999));
        assert_eq!(info.end_checkpoint, Some(600));
        // No system-state bytes were supplied, so nothing is decoded.
        assert_eq!(info.system_state, None);
    }

    /// Operand order does not matter for disjoint fields: end first, then start, still
    /// yields both halves because each operand only overwrites the fields it sets.
    #[test]
    fn end_before_start_still_yields_full_record() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &end(EpochEnd {
                    end_timestamp_ms: 999,
                    end_checkpoint: 600,
                    ..Default::default()
                }),
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.protocol_version, Some(73));
        assert_eq!(info.end_checkpoint, Some(600));
    }

    /// When two operands set the same field, the later operand's value is the one kept.
    #[test]
    fn later_operand_overrides_earlier_for_same_field() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(74, 1500, 222, Some(501), None),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.protocol_version, Some(74));
        assert_eq!(info.reference_gas_price, Some(1500));
        assert_eq!(info.start_timestamp_ms, Some(222));
        assert_eq!(info.start_checkpoint, Some(501));
    }

    /// A committee lookup for an epoch that was never written returns `None`.
    #[test]
    fn get_committee_returns_none_for_unknown_epoch() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_committee(7).unwrap().is_none());
    }

    /// A record that exists but has no system-state bytes (only an end operand was written)
    /// yields `None` from `get_committee` rather than an error.
    #[test]
    fn get_committee_returns_none_when_system_state_absent() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &end(EpochEnd {
                    end_timestamp_ms: 999,
                    end_checkpoint: 600,
                    ..Default::default()
                }),
            )
            .unwrap();
        batch.commit().unwrap();

        assert!(schema.get_committee(42).unwrap().is_none());
    }

    /// With only a start operand written, the end-of-epoch fields stay unset.
    #[test]
    fn only_start_leaves_end_fields_none() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.end_timestamp_ms, None);
        assert_eq!(info.end_checkpoint, None);
    }

    /// A start operand without `start_checkpoint`, followed by one that has it, ends up
    /// with the checkpoint filled in.
    #[test]
    fn partial_seed_then_backfill_fills_start_checkpoint() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, None, None),
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.start_checkpoint, Some(500));
        assert_eq!(info.protocol_version, Some(73));
    }

    /// The reverse order: a later start operand with `start_checkpoint: None` must not
    /// erase a checkpoint that an earlier operand already recorded.
    #[test]
    fn partial_seed_does_not_clobber_known_start_checkpoint() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, Some(500), None),
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &start(73, 1000, 111, None, None),
            )
            .unwrap();
        batch.commit().unwrap();

        let info = schema.get_epoch(42).unwrap().expect("epoch present");
        assert_eq!(info.start_checkpoint, Some(500));
    }

    /// Every field of `EpochEnd` is persisted by `end` and survives the merge.
    #[test]
    fn end_record_stores_system_epoch_info_fields() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &end(EpochEnd {
                    end_timestamp_ms: 999,
                    end_checkpoint: 600,
                    tx_hi: 12_345,
                    safe_mode: false,
                    epoch_commitments: vec![1, 2, 3],
                    total_stake: Some(1_000),
                    storage_fund_balance: Some(2_000),
                    storage_fund_reinvestment: Some(3_000),
                    storage_charge: Some(4_000),
                    storage_rebate: Some(5_000),
                    stake_subsidy_amount: Some(6_000),
                    total_gas_fees: Some(7_000),
                    total_stake_rewards_distributed: Some(8_000),
                    leftover_storage_fund_inflow: Some(9_000),
                }),
            )
            .unwrap();
        batch.commit().unwrap();

        // Read the raw stored message: `get_epoch` does not copy these fields into
        // `EpochInfo`.
        let stored = schema
            .epochs
            .get(&U64Be(42))
            .unwrap()
            .expect("epoch present")
            .into_inner();
        assert_eq!(stored.tx_hi, Some(12_345));
        assert_eq!(stored.safe_mode, Some(false));
        assert_eq!(stored.epoch_commitments.as_deref(), Some(&[1, 2, 3][..]));
        assert_eq!(stored.total_stake, Some(1_000));
        assert_eq!(stored.storage_fund_balance, Some(2_000));
        assert_eq!(stored.storage_fund_reinvestment, Some(3_000));
        assert_eq!(stored.storage_charge, Some(4_000));
        assert_eq!(stored.storage_rebate, Some(5_000));
        assert_eq!(stored.stake_subsidy_amount, Some(6_000));
        assert_eq!(stored.total_gas_fees, Some(7_000));
        assert_eq!(stored.total_stake_rewards_distributed, Some(8_000));
        assert_eq!(stored.leftover_storage_fund_inflow, Some(9_000));
    }

    /// An end operand whose optional totals are left at their `Default` (`None`) stores
    /// the always-present fields and leaves those totals unset.
    #[test]
    fn safe_mode_end_leaves_event_counters_unset() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(42),
                &end(EpochEnd {
                    end_timestamp_ms: 999,
                    end_checkpoint: 600,
                    tx_hi: 7,
                    safe_mode: true,
                    epoch_commitments: vec![0],
                    ..Default::default()
                }),
            )
            .unwrap();
        batch.commit().unwrap();

        let stored = schema
            .epochs
            .get(&U64Be(42))
            .unwrap()
            .expect("epoch present")
            .into_inner();
        assert_eq!(stored.safe_mode, Some(true));
        assert_eq!(stored.tx_hi, Some(7));
        assert_eq!(stored.total_stake, None);
        assert_eq!(stored.total_gas_fees, None);
        assert_eq!(stored.storage_charge, None);
    }
}