sui_core/checkpoints/checkpoint_executor/
utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use crate::checkpoints::CheckpointStore;
7use crate::execution_cache::TransactionCacheRead;
8use futures::{Stream, future::Either};
9use mysten_common::fatal;
10use std::time::Duration;
11use strum::VariantNames;
12use sui_types::{
13    base_types::{TransactionDigest, TransactionEffectsDigest},
14    message_envelope::Message,
15    messages_checkpoint::{CheckpointSequenceNumber, CheckpointSummary, VerifiedCheckpoint},
16};
17use tokio::sync::watch;
18use tracing::{debug, error, info, instrument, warn};
19
20use super::metrics::CheckpointExecutorMetrics;
21
22#[instrument(level = "debug", skip_all)]
23pub(super) fn stream_synced_checkpoints(
24    checkpoint_store: Arc<CheckpointStore>,
25    start_seq: CheckpointSequenceNumber,
26    stop_seq: Option<CheckpointSequenceNumber>,
27) -> impl Stream<Item = VerifiedCheckpoint> + 'static {
28    let scheduling_timeout_config = get_scheduling_timeout();
29    let panic_timeout = scheduling_timeout_config.panic_timeout;
30    let warning_timeout = scheduling_timeout_config.warning_timeout;
31
32    struct State {
33        current_seq: CheckpointSequenceNumber,
34        checkpoint_store: Arc<CheckpointStore>,
35        warning_timeout: Duration,
36        panic_timeout: Option<Duration>,
37        stop_seq: Option<CheckpointSequenceNumber>,
38    }
39
40    let state = State {
41        current_seq: start_seq,
42        checkpoint_store,
43        warning_timeout,
44        panic_timeout,
45        stop_seq,
46    };
47
48    futures::stream::unfold(Some(state), |state| async move {
49        match state {
50            None => None,
51            Some(state) if state.current_seq > state.stop_seq.unwrap_or(u64::MAX) => None,
52            Some(mut state) => {
53                let seq = state.current_seq;
54                let checkpoint = wait_for_checkpoint(
55                    &state.checkpoint_store,
56                    seq,
57                    state.warning_timeout,
58                    state.panic_timeout,
59                )
60                .await;
61                info!(
62                    "received synced checkpoint: {:?}",
63                    checkpoint.sequence_number
64                );
65                if checkpoint.end_of_epoch_data.is_some() {
66                    Some((checkpoint, None))
67                } else {
68                    state.current_seq = seq + 1;
69                    Some((checkpoint, Some(state)))
70                }
71            }
72        }
73    })
74}
75
76async fn wait_for_checkpoint(
77    checkpoint_store: &CheckpointStore,
78    seq: CheckpointSequenceNumber,
79    warning_timeout: Duration,
80    panic_timeout: Option<Duration>,
81) -> VerifiedCheckpoint {
82    debug!("waiting for checkpoint: {:?}", seq);
83    loop {
84        tokio::select! {
85            checkpoint = checkpoint_store.notify_read_synced_checkpoint(seq) => {
86                return checkpoint;
87            }
88
89            _ = tokio::time::sleep(warning_timeout) => {
90                warn!(
91                    "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {seq}",
92                );
93            }
94
95            _ = panic_timeout
96                        .map(|d| Either::Left(tokio::time::sleep(d)))
97                        .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
98                fatal!("No new synced checkpoints received for {panic_timeout:?}");
99            },
100        }
101    }
102}
103
/// Timeouts applied while waiting for the next synced checkpoint.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    /// If `Some`, waiting longer than this for a checkpoint is fatal; `None` waits forever.
    /// `get_scheduling_timeout` uses 45s under msim, otherwise the
    /// `NEW_CHECKPOINT_PANIC_TIMEOUT_MS` environment variable (unset by default).
    pub panic_timeout: Option<Duration>,
    /// Interval at which a warning is logged while still waiting. `get_scheduling_timeout`
    /// defaults it to 5s, overridable via `NEW_CHECKPOINT_WARNING_TIMEOUT_MS`.
    pub warning_timeout: Duration,
}
109
// Cached CheckpointTimeoutConfig. It is a thread local (rather than a global) so the config
// can be overridden on a per-test basis via init_checkpoint_timeout_config (msim builds only).
// NOTE(review): presumably this relies on each simulated node running on its own thread;
// confirm.
// A consequence is that in a multithreaded runtime each thread lazily initializes its own copy
// the first time it calls get_scheduling_timeout(). That is acceptable because initialization
// only reads environment variables.
// NOTE(review): since the cell is never shared across threads, a non-sync OnceCell would also
// suffice here.
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}
117
/// Overrides the checkpoint timeout config for the current thread (msim builds only).
///
/// # Panics
/// Panics if a config has already been stored on this thread, either by an earlier call
/// to this function or by `get_scheduling_timeout` lazily initializing it.
#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT
        .with(|cell| cell.set(config))
        .expect("SchedulingTimeoutConfig already set");
}
124
125fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
126    fn inner() -> CheckpointTimeoutConfig {
127        let panic_timeout: Option<Duration> = if cfg!(msim) {
128            Some(Duration::from_secs(45))
129        } else {
130            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
131                .ok()
132                .and_then(|s| s.parse::<u64>().ok())
133                .map(Duration::from_millis)
134        };
135
136        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
137            .ok()
138            .and_then(|s| s.parse::<u64>().ok())
139            .map(Duration::from_millis)
140            .unwrap_or(Duration::from_secs(5));
141
142        CheckpointTimeoutConfig {
143            panic_timeout,
144            warning_timeout,
145        }
146    }
147
148    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
149}
150
151pub(super) fn assert_not_forked(
152    checkpoint: &VerifiedCheckpoint,
153    tx_digest: &TransactionDigest,
154    expected_digest: &TransactionEffectsDigest,
155    actual_effects_digest: &TransactionEffectsDigest,
156    cache_reader: &dyn TransactionCacheRead,
157) {
158    if *expected_digest != *actual_effects_digest {
159        let actual_effects = cache_reader
160            .get_executed_effects(tx_digest)
161            .expect("actual effects should exist");
162
163        // log observed effects (too big for panic message) and then panic.
164        error!(
165            ?checkpoint,
166            ?tx_digest,
167            ?expected_digest,
168            ?actual_effects,
169            "fork detected!"
170        );
171        panic!(
172            "When executing checkpoint {}, transaction {} \
173            is expected to have effects digest {}, but got {}!",
174            checkpoint.sequence_number(),
175            tx_digest,
176            expected_digest,
177            actual_effects_digest,
178        );
179    }
180}
181
182pub(super) fn assert_checkpoint_not_forked(
183    locally_built_checkpoint: &CheckpointSummary,
184    verified_checkpoint: &VerifiedCheckpoint,
185    checkpoint_store: &CheckpointStore,
186) {
187    assert_eq!(
188        locally_built_checkpoint.sequence_number(),
189        verified_checkpoint.sequence_number(),
190        "Checkpoint sequence numbers must match"
191    );
192
193    if locally_built_checkpoint.digest() == *verified_checkpoint.digest() {
194        return;
195    }
196
197    let verified_checkpoint_summary = verified_checkpoint.data();
198
199    if locally_built_checkpoint.content_digest == verified_checkpoint_summary.content_digest {
200        // fork is in the checkpoint header
201        fatal!(
202            "Checkpoint fork detected in header! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
203            locally_built_checkpoint,
204            verified_checkpoint
205        );
206    } else {
207        let local_contents = checkpoint_store
208            .get_checkpoint_contents(&locally_built_checkpoint.content_digest)
209            .expect("db error")
210            .expect("contents must exist if checkpoint was built locally!");
211
212        let verified_contents = checkpoint_store
213            .get_checkpoint_contents(&verified_checkpoint_summary.content_digest)
214            .expect("db error")
215            .expect("contents must exist if checkpoint has been synced!");
216
217        // fork is in the checkpoint contents
218        let mut local_contents_iter = local_contents.iter();
219        let mut verified_contents_iter = verified_contents.iter();
220        let mut pos = 0;
221
222        loop {
223            let local_digests = local_contents_iter.next();
224            let verified_digests = verified_contents_iter.next();
225
226            match (local_digests, verified_digests) {
227                (Some(local_digests), Some(verified_digests)) => {
228                    if local_digests != verified_digests {
229                        fatal!(
230                            "Checkpoint contents diverge at position {pos}! {local_digests:?} != {verified_digests:?}"
231                        );
232                    }
233                }
234                (None, Some(_)) | (Some(_), None) => {
235                    fatal!(
236                        "Checkpoint contents have different lengths! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
237                        locally_built_checkpoint,
238                        verified_checkpoint
239                    );
240                }
241                (None, None) => {
242                    break;
243                }
244            }
245            pos += 1;
246        }
247
248        fatal!(
249            "Checkpoint fork detected in contents! Locally built checkpoint: {:?}, verified checkpoint: {:?}",
250            locally_built_checkpoint,
251            verified_checkpoint
252        );
253    }
254}
255
256/// SequenceWatch is just a wrapper around a tokio::watch that can wait for a
257/// specific sequence number, instead of waiting for any change at all.
258struct SequenceWatch {
259    watch: watch::Sender<CheckpointSequenceNumber>,
260}
261
262impl SequenceWatch {
263    fn new(starting_seq: CheckpointSequenceNumber) -> Self {
264        Self {
265            watch: watch::channel(starting_seq).0,
266        }
267    }
268
269    async fn wait_for(&self, seq: CheckpointSequenceNumber) {
270        let mut ready_seq = self.watch.subscribe();
271        while *ready_seq.borrow_and_update() < seq {
272            ready_seq.changed().await.expect("sender cannot be dropped");
273        }
274    }
275
276    fn signal(&self, new_ready: CheckpointSequenceNumber) {
277        self.watch.send_modify(|prev| {
278            assert_eq!(*prev + 1, new_ready);
279            *prev = new_ready;
280        });
281    }
282}
283
284// Pipeline system for coordinating concurrent, pipelined execution without having to break code
285// down into a one-stage-per-function style.
286//
287// Pipeline concurrency can be achieved when you have the following sort of execution dependency.
288//
289//      0   A → B → C → D
290//          ↓   ↓   ↓   ↓
291//      1   A → B → C → D
292//          ↓   ↓   ↓   ↓
293//      2   A → B → C → D
294//          ↓   ↓   ↓   ↓
295//      3   A → B → C → D
296//
297// In this case, we have the following requirements:
298// - Stage A for seq i must complete before Stage A of seq i+1 begins.
299// - Stage A for seq i must complete before Stage B of seq i begins.
300// - (and likewise for B, C, D)
// - But, crucially, there is no dependency from (i, D) to (i+1, A)! In
//   other words, we can begin sequence i+1 before i is completely finished.
303//
304// For CheckpointExecutor, we can see that this sort of pipelining can work. For instance,
305// we require that the transaction outputs are committed in order by checkpoint sequence number.
306// But there is no requirement that we commit outputs for seq i before we begin executing transactions
307// for seq i+1.
308//
309// The code here allows you to write pipelined executions without breaking up every stage into
310// its own function. Instead you can write monolithic functions like:
311//
312//       async fn execute_seq() {
313//           pipeline.begin().await;
314//           // do first stage
315//           pipeline.finish_stage(FirstStage).await;
316//           // do second stage
317//           pipeline.finish_stage(SecondStage).await;
318//           ...
319//       }
320//
321// Then, simply run many instances of this function concurrently, and the pipeline ordering
322// will be maintained.
323//
324// Currently this code is not generic with respect to the names of the stages. We can make the
325// stage enum a generic param when we need to use this code in other places.
326
327/// Names of the pipeline stages for CheckpointExecutor.
328#[derive(
329    Debug,
330    Clone,
331    Copy,
332    PartialEq,
333    Eq,
334    PartialOrd,
335    Ord,
336    strum::EnumIter,
337    strum_macros::VariantNames,
338    strum_macros::FromRepr,
339    strum_macros::EnumCount,
340)]
341
342/// Names of the pipeline stages for CheckpointExecutor.
343pub(crate) enum PipelineStage {
344    ExecuteTransactions = 0,
345    WaitForTransactions = 1,
346    FinalizeTransactions = 2,
347    ProcessCheckpointData = 3,
348    BuildDbBatch = 4,
349    CommitTransactionOutputs = 5,
350    FinalizeCheckpoint = 6,
351    UpdateRpcIndex = 7,
352    BumpHighestExecutedCheckpoint = 8,
353    End = 9,
354}
355
356impl PipelineStage {
357    pub const fn first() -> Self {
358        Self::ExecuteTransactions
359    }
360
361    fn next(self) -> Self {
362        assert!(self < Self::End);
363        Self::from_repr((self as usize) + 1).unwrap()
364    }
365
366    fn as_str(self) -> &'static str {
367        Self::VARIANTS[self as usize]
368    }
369}
370
371/// PipelineHandle is used to coordinate one iteration (sequence number) of the pipeline,
372/// starting from the first stage and ending at the last stage.
373pub(super) struct PipelineHandle {
374    seq: CheckpointSequenceNumber,
375    cur_stage: PipelineStage,
376    stages: Arc<PipelineStages>,
377    timer: Instant,
378}
379
380impl PipelineHandle {
381    fn new(stages: Arc<PipelineStages>, seq: CheckpointSequenceNumber) -> Self {
382        Self {
383            seq,
384            cur_stage: PipelineStage::first(),
385            stages,
386            timer: Instant::now(),
387        }
388    }
389
390    /// Begin at the first stage.
391    async fn begin(&mut self) {
392        assert_eq!(self.cur_stage, PipelineStage::first(), "cannot begin twice");
393        self.stages.begin(self.cur_stage, self.seq).await;
394        self.timer = Instant::now();
395    }
396
397    /// Finish a given stage and begin the next one.
398    pub async fn finish_stage(&mut self, finished: PipelineStage) {
399        let duration = self.timer.elapsed();
400        self.stages
401            .metrics
402            .stage_active_duration_ns
403            .with_label_values(&[self.cur_stage.as_str()])
404            .inc_by(duration.as_nanos() as u64);
405        assert_eq!(finished, self.cur_stage, "cannot skip stages");
406
407        self.stages.end(self.cur_stage, self.seq);
408
409        self.cur_stage = self.cur_stage.next();
410        if self.cur_stage != PipelineStage::End {
411            self.stages.begin(self.cur_stage, self.seq).await;
412        }
413    }
414
415    /// Skip to a given stage.
416    pub async fn skip_to(&mut self, stage: PipelineStage) {
417        assert!(self.cur_stage < stage);
418        while self.cur_stage < stage {
419            self.finish_stage(self.cur_stage).await;
420        }
421    }
422}
423
424/// A collection of watches for each stage. These are the synchronization points
425/// for the pipeline.
426pub(super) struct PipelineStages {
427    stages: [SequenceWatch; PipelineStage::End as usize],
428    metrics: Arc<CheckpointExecutorMetrics>,
429}
430
431impl PipelineStages {
432    pub fn new(
433        starting_seq: CheckpointSequenceNumber,
434        metrics: Arc<CheckpointExecutorMetrics>,
435    ) -> Arc<Self> {
436        Arc::new(Self {
437            stages: std::array::from_fn(|_| SequenceWatch::new(starting_seq)),
438            metrics,
439        })
440    }
441
442    /// Create a new PipelineHandle for the given sequence number.
443    pub async fn handle(self: &Arc<Self>, seq: CheckpointSequenceNumber) -> PipelineHandle {
444        let mut handle = PipelineHandle::new(self.clone(), seq);
445        handle.begin().await;
446        handle
447    }
448
449    /// Wait until (stage, seq - 1) has been completed.
450    async fn begin(&self, stage: PipelineStage, seq: CheckpointSequenceNumber) {
451        debug!(?stage, ?seq, "begin stage");
452        let start = Instant::now();
453        self.stages[stage as usize].wait_for(seq).await;
454        let duration = start.elapsed();
455        self.metrics
456            .stage_wait_duration_ns
457            .with_label_values(&[stage.as_str()])
458            .inc_by(duration.as_nanos() as u64);
459    }
460
461    /// Signal that (stage, seq) has been completed.
462    fn end(&self, stage: PipelineStage, seq: CheckpointSequenceNumber) {
463        debug!(?stage, ?seq, "end stage");
464        self.stages[stage as usize].signal(seq + 1);
465    }
466}
467
468#[derive(Default)]
469pub(super) struct TPSEstimator {
470    last_update: Option<Instant>,
471    transaction_count: u64,
472    tps: f64,
473}
474
475impl TPSEstimator {
476    pub fn update(&mut self, now: Instant, transaction_count: u64) -> f64 {
477        if let Some(last_update) = self.last_update
478            && now > last_update
479        {
480            let delta_t = now.duration_since(last_update);
481            let delta_c = transaction_count - self.transaction_count;
482            let tps = delta_c as f64 / delta_t.as_secs_f64();
483            self.tps = self.tps * 0.9 + tps * 0.1;
484        }
485
486        self.last_update = Some(now);
487        self.transaction_count = transaction_count;
488        self.tps
489    }
490}
491
#[cfg(test)]
mod test {
    use rand::{Rng, thread_rng};
    use std::collections::HashMap;
    use sui_macros::sim_test;

    use super::*;
    use futures::future::join_all;
    use parking_lot::Mutex;

    /// Finishing a stage other than the current one must panic.
    #[tokio::test]
    #[should_panic(expected = "cannot skip stages")]
    async fn test_skip_pipeline_stages() {
        let stages = PipelineStages::new(0, CheckpointExecutorMetrics::new_for_tests());
        let mut handle = stages.handle(0).await;
        handle
            .finish_stage(PipelineStage::WaitForTransactions)
            .await;
    }

    /// Runs many sequence numbers through every pipeline stage concurrently, with random delays.
    /// Checks that within each stage the sequence numbers complete in order, and that
    /// different sequence numbers were in flight at the same time.
    #[sim_test]
    async fn test_pipeline_stages() {
        const NUM_SEQS: u64 = 30;

        let stages = PipelineStages::new(0, CheckpointExecutorMetrics::new_for_tests());

        let output_by_stage = Arc::new(Mutex::new(HashMap::new()));
        let output_by_order = Arc::new(Mutex::new(Vec::new()));

        let mut tasks = Vec::new();

        for seq in 0..NUM_SEQS {
            let stages = stages.clone();
            let output_by_stage = output_by_stage.clone();
            let output_by_order = output_by_order.clone();
            tasks.push(tokio::spawn(async move {
                let mut handle = stages.handle(seq).await;
                // Per-task counter; its value identifies which stage produced an output.
                let mut val = 0;
                let mut get_next_val = || {
                    val += 1;
                    val
                };

                async fn finish_stage(handle: &mut PipelineHandle, stage: PipelineStage) {
                    handle.finish_stage(stage).await;
                    let sleep_time = Duration::from_millis(thread_rng().gen_range(0..10));
                    tokio::time::sleep(sleep_time).await;
                }

                async fn push_output(
                    seq: CheckpointSequenceNumber,
                    get_next_val: &mut impl FnMut() -> u64,
                    output_by_stage: &Arc<Mutex<HashMap<u64, Vec<CheckpointSequenceNumber>>>>,
                    output_by_order: &Arc<Mutex<Vec<u64>>>,
                ) {
                    let sleep_time = Duration::from_millis(thread_rng().gen_range(0..10));
                    tokio::time::sleep(sleep_time).await;
                    let val = get_next_val();
                    debug!("pushing output ({val}) for seq: {}", seq);
                    output_by_stage.lock().entry(val).or_default().push(seq);
                    output_by_order.lock().push(val);
                }

                // Drive every real stage in order, so newly added stages are covered too.
                let mut stage = PipelineStage::first();
                while stage != PipelineStage::End {
                    push_output(seq, &mut get_next_val, &output_by_stage, &output_by_order).await;
                    finish_stage(&mut handle, stage).await;
                    stage = stage.next();
                }
            }));
        }

        // Surface panics from spawned tasks instead of silently discarding them.
        for result in join_all(tasks).await {
            result.expect("pipeline task panicked");
        }

        let output_by_stage = output_by_stage.lock();
        let output_by_order = output_by_order.lock();

        // One output bucket per real stage.
        assert_eq!(output_by_stage.len(), PipelineStage::End as usize);

        // For each stage, assert that the sequences were done in order.
        let expected: Vec<CheckpointSequenceNumber> = (0..NUM_SEQS).collect();
        for seqs in output_by_stage.values() {
            assert_eq!(seqs, &expected);
        }

        // Verify that the output sequence shows evidence of concurrent execution.
        // Because the output is random, this test could fail by chance, but the
        // probability is infinitesimal.
        let found_out_of_order = output_by_order.windows(2).any(|pair| pair[0] > pair[1]);
        assert!(
            found_out_of_order,
            "Expected to find evidence of concurrent execution in output sequence, but all elements were in order"
        );
    }
}