1use std::{sync::Arc, time::Instant};
5
6use crate::checkpoints::CheckpointStore;
7use crate::execution_cache::TransactionCacheRead;
8use futures::{Stream, future::Either};
9use mysten_common::fatal;
10use std::time::Duration;
11use strum::VariantNames;
12use sui_types::{
13 base_types::{TransactionDigest, TransactionEffectsDigest},
14 message_envelope::Message,
15 messages_checkpoint::{CheckpointSequenceNumber, CheckpointSummary, VerifiedCheckpoint},
16};
17use tokio::sync::watch;
18use tracing::{debug, error, info, instrument, warn};
19
20use super::metrics::CheckpointExecutorMetrics;
21
22#[instrument(level = "debug", skip_all)]
23pub(super) fn stream_synced_checkpoints(
24 checkpoint_store: Arc<CheckpointStore>,
25 start_seq: CheckpointSequenceNumber,
26 stop_seq: Option<CheckpointSequenceNumber>,
27) -> impl Stream<Item = VerifiedCheckpoint> + 'static {
28 let scheduling_timeout_config = get_scheduling_timeout();
29 let panic_timeout = scheduling_timeout_config.panic_timeout;
30 let warning_timeout = scheduling_timeout_config.warning_timeout;
31
32 struct State {
33 current_seq: CheckpointSequenceNumber,
34 checkpoint_store: Arc<CheckpointStore>,
35 warning_timeout: Duration,
36 panic_timeout: Option<Duration>,
37 stop_seq: Option<CheckpointSequenceNumber>,
38 }
39
40 let state = State {
41 current_seq: start_seq,
42 checkpoint_store,
43 warning_timeout,
44 panic_timeout,
45 stop_seq,
46 };
47
48 futures::stream::unfold(Some(state), |state| async move {
49 match state {
50 None => None,
51 Some(state) if state.current_seq > state.stop_seq.unwrap_or(u64::MAX) => None,
52 Some(mut state) => {
53 let seq = state.current_seq;
54 let checkpoint = wait_for_checkpoint(
55 &state.checkpoint_store,
56 seq,
57 state.warning_timeout,
58 state.panic_timeout,
59 )
60 .await;
61 info!(
62 "received synced checkpoint: {:?}",
63 checkpoint.sequence_number
64 );
65 if checkpoint.end_of_epoch_data.is_some() {
66 Some((checkpoint, None))
67 } else {
68 state.current_seq = seq + 1;
69 Some((checkpoint, Some(state)))
70 }
71 }
72 }
73 })
74}
75
76async fn wait_for_checkpoint(
77 checkpoint_store: &CheckpointStore,
78 seq: CheckpointSequenceNumber,
79 warning_timeout: Duration,
80 panic_timeout: Option<Duration>,
81) -> VerifiedCheckpoint {
82 debug!("waiting for checkpoint: {:?}", seq);
83 loop {
84 tokio::select! {
85 checkpoint = checkpoint_store.notify_read_synced_checkpoint(seq) => {
86 return checkpoint;
87 }
88
89 _ = tokio::time::sleep(warning_timeout) => {
90 warn!(
91 "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {seq}",
92 );
93 }
94
95 _ = panic_timeout
96 .map(|d| Either::Left(tokio::time::sleep(d)))
97 .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
98 fatal!("No new synced checkpoints received for {panic_timeout:?}");
99 },
100 }
101 }
102}
103
/// Timeouts applied while waiting for the next synced checkpoint.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    /// How long a missing checkpoint may be waited on before it is treated as fatal;
    /// `None` disables this check.
    pub panic_timeout: Option<Duration>,
    /// Interval after which a warning is logged while still waiting.
    pub warning_timeout: Duration,
}
109
110thread_local! {
114 static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
115 const { once_cell::sync::OnceCell::new() };
116}
117
/// Sets the checkpoint timeout configuration for the current thread (simulator builds only).
///
/// # Panics
/// Panics if this thread's configuration has already been set, either by an earlier call or
/// by lazy initialization in `get_scheduling_timeout`.
#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT
        .with(|cell| cell.set(config))
        .expect("SchedulingTimeoutConfig already set");
}
124
125fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
126 fn inner() -> CheckpointTimeoutConfig {
127 let panic_timeout: Option<Duration> = if cfg!(msim) {
128 Some(Duration::from_secs(45))
129 } else {
130 std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
131 .ok()
132 .and_then(|s| s.parse::<u64>().ok())
133 .map(Duration::from_millis)
134 };
135
136 let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
137 .ok()
138 .and_then(|s| s.parse::<u64>().ok())
139 .map(Duration::from_millis)
140 .unwrap_or(Duration::from_secs(5));
141
142 CheckpointTimeoutConfig {
143 panic_timeout,
144 warning_timeout,
145 }
146 }
147
148 SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
149}
150
151pub(super) fn assert_not_forked(
152 checkpoint: &VerifiedCheckpoint,
153 tx_digest: &TransactionDigest,
154 expected_digest: &TransactionEffectsDigest,
155 actual_effects_digest: &TransactionEffectsDigest,
156 cache_reader: &dyn TransactionCacheRead,
157) {
158 if *expected_digest != *actual_effects_digest {
159 let actual_effects = cache_reader
160 .get_executed_effects(tx_digest)
161 .expect("actual effects should exist");
162
163 error!(
165 ?checkpoint,
166 ?tx_digest,
167 ?expected_digest,
168 ?actual_effects,
169 "fork detected!"
170 );
171 panic!(
172 "When executing checkpoint {}, transaction {} \
173 is expected to have effects digest {}, but got {}!",
174 checkpoint.sequence_number(),
175 tx_digest,
176 expected_digest,
177 actual_effects_digest,
178 );
179 }
180}
181
182pub(super) fn assert_checkpoint_not_forked(
183 locally_built_checkpoint: &CheckpointSummary,
184 verified_checkpoint: &VerifiedCheckpoint,
185 checkpoint_store: &CheckpointStore,
186) {
187 assert_eq!(
188 locally_built_checkpoint.sequence_number(),
189 verified_checkpoint.sequence_number(),
190 "Checkpoint sequence numbers must match"
191 );
192
193 if locally_built_checkpoint.digest() == *verified_checkpoint.digest() {
194 return;
195 }
196
197 let verified_checkpoint_summary = verified_checkpoint.data();
198
199 if locally_built_checkpoint.content_digest == verified_checkpoint_summary.content_digest {
200 fatal!(
202 "Checkpoint fork detected in header! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
203 locally_built_checkpoint,
204 verified_checkpoint
205 );
206 } else {
207 let local_contents = checkpoint_store
208 .get_checkpoint_contents(&locally_built_checkpoint.content_digest)
209 .expect("db error")
210 .expect("contents must exist if checkpoint was built locally!");
211
212 let verified_contents = checkpoint_store
213 .get_checkpoint_contents(&verified_checkpoint_summary.content_digest)
214 .expect("db error")
215 .expect("contents must exist if checkpoint has been synced!");
216
217 let mut local_contents_iter = local_contents.iter();
219 let mut verified_contents_iter = verified_contents.iter();
220 let mut pos = 0;
221
222 loop {
223 let local_digests = local_contents_iter.next();
224 let verified_digests = verified_contents_iter.next();
225
226 match (local_digests, verified_digests) {
227 (Some(local_digests), Some(verified_digests)) => {
228 if local_digests != verified_digests {
229 fatal!(
230 "Checkpoint contents diverge at position {pos}! {local_digests:?} != {verified_digests:?}"
231 );
232 }
233 }
234 (None, Some(_)) | (Some(_), None) => {
235 fatal!(
236 "Checkpoint contents have different lengths! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
237 locally_built_checkpoint,
238 verified_checkpoint
239 );
240 }
241 (None, None) => {
242 break;
243 }
244 }
245 pos += 1;
246 }
247
248 fatal!(
249 "Checkpoint fork detected in contents! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
250 locally_built_checkpoint,
251 verified_checkpoint
252 );
253 }
254}
255
256struct SequenceWatch {
259 watch: watch::Sender<CheckpointSequenceNumber>,
260}
261
262impl SequenceWatch {
263 fn new(starting_seq: CheckpointSequenceNumber) -> Self {
264 Self {
265 watch: watch::channel(starting_seq).0,
266 }
267 }
268
269 async fn wait_for(&self, seq: CheckpointSequenceNumber) {
270 let mut ready_seq = self.watch.subscribe();
271 while *ready_seq.borrow_and_update() < seq {
272 ready_seq.changed().await.expect("sender cannot be dropped");
273 }
274 }
275
276 fn signal(&self, new_ready: CheckpointSequenceNumber) {
277 self.watch.send_modify(|prev| {
278 assert_eq!(*prev + 1, new_ready);
279 *prev = new_ready;
280 });
281 }
282}
283
284#[derive(
329 Debug,
330 Clone,
331 Copy,
332 PartialEq,
333 Eq,
334 PartialOrd,
335 Ord,
336 strum::EnumIter,
337 strum_macros::VariantNames,
338 strum_macros::FromRepr,
339 strum_macros::EnumCount,
340)]
341
342pub(crate) enum PipelineStage {
344 ExecuteTransactions = 0,
345 WaitForTransactions = 1,
346 FinalizeTransactions = 2,
347 ProcessCheckpointData = 3,
348 BuildDbBatch = 4,
349 CommitTransactionOutputs = 5,
350 FinalizeCheckpoint = 6,
351 UpdateRpcIndex = 7,
352 BumpHighestExecutedCheckpoint = 8,
353 End = 9,
354}
355
356impl PipelineStage {
357 pub const fn first() -> Self {
358 Self::ExecuteTransactions
359 }
360
361 fn next(self) -> Self {
362 assert!(self < Self::End);
363 Self::from_repr((self as usize) + 1).unwrap()
364 }
365
366 fn as_str(self) -> &'static str {
367 Self::VARIANTS[self as usize]
368 }
369}
370
371pub(super) struct PipelineHandle {
374 seq: CheckpointSequenceNumber,
375 cur_stage: PipelineStage,
376 stages: Arc<PipelineStages>,
377 timer: Instant,
378}
379
380impl PipelineHandle {
381 fn new(stages: Arc<PipelineStages>, seq: CheckpointSequenceNumber) -> Self {
382 Self {
383 seq,
384 cur_stage: PipelineStage::first(),
385 stages,
386 timer: Instant::now(),
387 }
388 }
389
390 async fn begin(&mut self) {
392 assert_eq!(self.cur_stage, PipelineStage::first(), "cannot begin twice");
393 self.stages.begin(self.cur_stage, self.seq).await;
394 self.timer = Instant::now();
395 }
396
397 pub async fn finish_stage(&mut self, finished: PipelineStage) {
399 let duration = self.timer.elapsed();
400 self.stages
401 .metrics
402 .stage_active_duration_ns
403 .with_label_values(&[self.cur_stage.as_str()])
404 .inc_by(duration.as_nanos() as u64);
405 assert_eq!(finished, self.cur_stage, "cannot skip stages");
406
407 self.stages.end(self.cur_stage, self.seq);
408
409 self.cur_stage = self.cur_stage.next();
410 if self.cur_stage != PipelineStage::End {
411 self.stages.begin(self.cur_stage, self.seq).await;
412 }
413 }
414
415 pub async fn skip_to(&mut self, stage: PipelineStage) {
417 assert!(self.cur_stage < stage);
418 while self.cur_stage < stage {
419 self.finish_stage(self.cur_stage).await;
420 }
421 }
422}
423
424pub(super) struct PipelineStages {
427 stages: [SequenceWatch; PipelineStage::End as usize],
428 metrics: Arc<CheckpointExecutorMetrics>,
429}
430
431impl PipelineStages {
432 pub fn new(
433 starting_seq: CheckpointSequenceNumber,
434 metrics: Arc<CheckpointExecutorMetrics>,
435 ) -> Arc<Self> {
436 Arc::new(Self {
437 stages: std::array::from_fn(|_| SequenceWatch::new(starting_seq)),
438 metrics,
439 })
440 }
441
442 pub async fn handle(self: &Arc<Self>, seq: CheckpointSequenceNumber) -> PipelineHandle {
444 let mut handle = PipelineHandle::new(self.clone(), seq);
445 handle.begin().await;
446 handle
447 }
448
449 async fn begin(&self, stage: PipelineStage, seq: CheckpointSequenceNumber) {
451 debug!(?stage, ?seq, "begin stage");
452 let start = Instant::now();
453 self.stages[stage as usize].wait_for(seq).await;
454 let duration = start.elapsed();
455 self.metrics
456 .stage_wait_duration_ns
457 .with_label_values(&[stage.as_str()])
458 .inc_by(duration.as_nanos() as u64);
459 }
460
461 fn end(&self, stage: PipelineStage, seq: CheckpointSequenceNumber) {
463 debug!(?stage, ?seq, "end stage");
464 self.stages[stage as usize].signal(seq + 1);
465 }
466}
467
468#[derive(Default)]
469pub(super) struct TPSEstimator {
470 last_update: Option<Instant>,
471 transaction_count: u64,
472 tps: f64,
473}
474
475impl TPSEstimator {
476 pub fn update(&mut self, now: Instant, transaction_count: u64) -> f64 {
477 if let Some(last_update) = self.last_update
478 && now > last_update
479 {
480 let delta_t = now.duration_since(last_update);
481 let delta_c = transaction_count - self.transaction_count;
482 let tps = delta_c as f64 / delta_t.as_secs_f64();
483 self.tps = self.tps * 0.9 + tps * 0.1;
484 }
485
486 self.last_update = Some(now);
487 self.transaction_count = transaction_count;
488 self.tps
489 }
490}
491
#[cfg(test)]
mod test {
    use rand::{Rng, thread_rng};
    use std::collections::HashMap;
    use sui_macros::sim_test;

    use super::*;
    use futures::future::join_all;
    use parking_lot::Mutex;

    /// Finishing any stage other than the current one must panic.
    #[tokio::test]
    #[should_panic(expected = "cannot skip stages")]
    async fn test_skip_pipeline_stages() {
        let stages = PipelineStages::new(0, CheckpointExecutorMetrics::new_for_tests());
        let mut handle = stages.handle(0).await;
        // A fresh handle is in `ExecuteTransactions`, so this attempts to skip a stage.
        handle
            .finish_stage(PipelineStage::WaitForTransactions)
            .await;
    }

    /// Drives many checkpoints concurrently through every stage, with random delays.
    ///
    /// Checks that:
    /// - within each stage, checkpoints produce their output in sequence-number order;
    /// - the global output order is not fully sorted, i.e. different checkpoints overlapped
    ///   in different stages.
    #[sim_test]
    async fn test_pipeline_stages() {
        const NUM_CHECKPOINTS: CheckpointSequenceNumber = 30;
        /// Every real stage, in pipeline order.
        const STAGE_ORDER: [PipelineStage; PipelineStage::End as usize] = [
            PipelineStage::ExecuteTransactions,
            PipelineStage::WaitForTransactions,
            PipelineStage::FinalizeTransactions,
            PipelineStage::ProcessCheckpointData,
            PipelineStage::BuildDbBatch,
            PipelineStage::CommitTransactionOutputs,
            PipelineStage::FinalizeCheckpoint,
            PipelineStage::UpdateRpcIndex,
            PipelineStage::BumpHighestExecutedCheckpoint,
        ];

        let stages = PipelineStages::new(0, CheckpointExecutorMetrics::new_for_tests());

        // Stage ordinal (1-based) -> sequence numbers that produced output for that stage.
        let output_by_stage = Arc::new(Mutex::new(HashMap::new()));
        // Stage ordinals in the global order in which outputs were produced.
        let output_by_order = Arc::new(Mutex::new(Vec::new()));

        let mut tasks = Vec::new();

        for seq in 0..NUM_CHECKPOINTS {
            let stages = stages.clone();
            let output_by_stage = output_by_stage.clone();
            let output_by_order = output_by_order.clone();
            tasks.push(tokio::spawn(async move {
                /// Finishes `stage`, then sleeps a random short time.
                async fn finish_stage(handle: &mut PipelineHandle, stage: PipelineStage) {
                    handle.finish_stage(stage).await;
                    let sleep_time = Duration::from_millis(thread_rng().gen_range(0..10));
                    tokio::time::sleep(sleep_time).await;
                }

                /// Sleeps a random short time, then records one output for `seq`.
                async fn push_output(
                    seq: CheckpointSequenceNumber,
                    get_next_val: &mut impl FnMut() -> u64,
                    output_by_stage: &Arc<Mutex<HashMap<u64, Vec<CheckpointSequenceNumber>>>>,
                    output_by_order: &Arc<Mutex<Vec<u64>>>,
                ) {
                    let sleep_time = Duration::from_millis(thread_rng().gen_range(0..10));
                    tokio::time::sleep(sleep_time).await;
                    let val = get_next_val();
                    debug!("pushing output ({val}) for seq: {}", seq);
                    output_by_stage.lock().entry(val).or_default().push(seq);
                    output_by_order.lock().push(val);
                }

                let mut handle = stages.handle(seq).await;
                let mut val = 0;
                let mut get_next_val = || {
                    val += 1;
                    val
                };

                for stage in STAGE_ORDER {
                    push_output(seq, &mut get_next_val, &output_by_stage, &output_by_order)
                        .await;
                    finish_stage(&mut handle, stage).await;
                }
            }));
        }

        join_all(tasks).await;

        let output_by_stage = output_by_stage.lock();
        let output_by_order = output_by_order.lock();

        let expected_seqs: Vec<CheckpointSequenceNumber> = (0..NUM_CHECKPOINTS).collect();
        for seqs in output_by_stage.values() {
            assert_eq!(seqs, &expected_seqs);
        }

        // A later output with a smaller stage ordinal means some checkpoint was still in an
        // earlier stage while another had already moved further along.
        let found_out_of_order = output_by_order.windows(2).any(|pair| pair[0] > pair[1]);
        assert!(
            found_out_of_order,
            "Expected to find evidence of concurrent execution in output sequence, but all elements were in order"
        );
    }
}
598}