sui_aws_orchestrator/
faults.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    fmt::{Debug, Display},
6    time::Duration,
7};
8
9use serde::{Deserialize, Serialize};
10
11use crate::client::Instance;
12
/// The fault-injection pattern to apply to the testbed.
#[derive(Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub enum FaultsType {
    /// Permanently crash `faults` nodes from the beginning; they are never recovered.
    Permanent { faults: usize },
    /// Progressively crash up to `max_faults` nodes, recover them all, and repeat.
    /// NOTE(review): `interval` is presumably the delay between two consecutive schedule
    /// steps; within this file it is only used for formatting -- confirm against the caller.
    CrashRecovery {
        max_faults: usize,
        interval: Duration,
    },
}
23
24impl Default for FaultsType {
25    fn default() -> Self {
26        Self::Permanent { faults: 0 }
27    }
28}
29
30impl Debug for FaultsType {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            Self::Permanent { faults } => write!(f, "{faults}"),
34            Self::CrashRecovery {
35                max_faults,
36                interval,
37            } => write!(f, "{max_faults}-{}cr", interval.as_secs()),
38        }
39    }
40}
41
42impl Display for FaultsType {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            Self::Permanent { faults } => {
46                if *faults == 0 {
47                    write!(f, "no faults")
48                } else {
49                    write!(f, "{faults} crashed")
50                }
51            }
52            Self::CrashRecovery {
53                max_faults,
54                interval,
55            } => write!(f, "{max_faults} crash-recovery, {}s", interval.as_secs()),
56        }
57    }
58}
59
/// The actions to apply to the testbed, i.e., which instances to crash and recover.
///
/// The derived default has both lists empty, i.e., it requests nothing.
#[derive(Default)]
pub struct CrashRecoveryAction {
    /// The instances to boot (i.e., to recover).
    pub boot: Vec<Instance>,
    /// The instances to kill (i.e., to crash).
    pub kill: Vec<Instance>,
}
68
69impl Display for CrashRecoveryAction {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let booted = self.boot.len();
72        let killed = self.kill.len();
73
74        if self.boot.is_empty() {
75            write!(f, "{killed} node(s) killed")
76        } else if self.kill.is_empty() {
77            write!(f, "{booted} node(s) recovered")
78        } else {
79            write!(f, "{killed} node(s) killed and {booted} node(s) recovered")
80        }
81    }
82}
83
84impl CrashRecoveryAction {
85    pub fn boot(instances: Vec<Instance>) -> Self {
86        Self {
87            boot: instances,
88            kill: Vec::new(),
89        }
90    }
91
92    pub fn kill(instances: Vec<Instance>) -> Self {
93        Self {
94            boot: Vec::new(),
95            kill: instances,
96        }
97    }
98
99    pub fn no_op() -> Self {
100        Self::default()
101    }
102}
103
/// Tracks the progress of a fault pattern over a set of instances and produces, step by step,
/// the actions (kill or boot) to apply to them.
pub struct CrashRecoverySchedule {
    /// The number of faulty nodes and the crash-recovery pattern to follow.
    faults_type: FaultsType,
    /// The available instances; the nodes to crash are always taken from the front of this list.
    instances: Vec<Instance>,
    /// The current number of dead nodes.
    dead: usize,
}
112
113impl CrashRecoverySchedule {
114    pub fn new(faults_type: FaultsType, instances: Vec<Instance>) -> Self {
115        Self {
116            faults_type,
117            instances,
118            dead: 0,
119        }
120    }
121    pub fn update(&mut self) -> CrashRecoveryAction {
122        match &self.faults_type {
123            // Permanently crash the specified number of nodes.
124            FaultsType::Permanent { faults } => {
125                if self.dead == 0 {
126                    self.dead = *faults;
127                    CrashRecoveryAction::kill(self.instances.clone().drain(0..*faults).collect())
128                } else {
129                    CrashRecoveryAction::no_op()
130                }
131            }
132
133            // Periodically crash and recover nodes.
134            FaultsType::CrashRecovery { max_faults, .. } => {
135                let min_faults = max_faults / 3;
136
137                // Recover all nodes if we already crashed them all.
138                if self.dead == *max_faults {
139                    let instances: Vec<_> = self.instances.clone().drain(0..*max_faults).collect();
140                    self.dead = 0;
141                    CrashRecoveryAction::boot(instances)
142                }
143                // Otherwise crash a few nodes at the time.
144                else {
145                    let (l, h) = if self.dead == 0 && min_faults != 0 {
146                        (0, min_faults)
147                    } else if self.dead == min_faults && min_faults != 0 {
148                        (min_faults, 2 * min_faults)
149                    } else {
150                        (2 * min_faults, *max_faults)
151                    };
152
153                    let instances: Vec<_> = self.instances.clone().drain(l..h).collect();
154                    self.dead += h - l;
155                    CrashRecoveryAction::kill(instances)
156                }
157            }
158        }
159    }
160}
161
#[cfg(test)]
mod faults_tests {
    use std::time::Duration;

    use crate::client::Instance;

    use super::{CrashRecoverySchedule, FaultsType};

    /// Builds a crash-recovery schedule over `max_faults` test instances with a 60s interval.
    fn crash_recovery_schedule(max_faults: usize) -> CrashRecoverySchedule {
        let instances = (0..max_faults)
            .map(|id| Instance::new_for_test(id.to_string()))
            .collect();
        let faults_type = FaultsType::CrashRecovery {
            max_faults,
            interval: Duration::from_secs(60),
        };
        CrashRecoverySchedule::new(faults_type, instances)
    }

    /// Steps the schedule once and checks how many instances the resulting action boots and
    /// kills.
    #[track_caller]
    fn assert_update(schedule: &mut CrashRecoverySchedule, boot: usize, kill: usize) {
        let action = schedule.update();
        assert_eq!(action.boot.len(), boot);
        assert_eq!(action.kill.len(), kill);
    }

    /// A single faulty node alternates between being killed and being recovered.
    #[test]
    fn crash_recovery_1_fault() {
        let mut schedule = crash_recovery_schedule(1);

        assert_update(&mut schedule, 0, 1);
        assert_update(&mut schedule, 1, 0);
        assert_update(&mut schedule, 0, 1);
        assert_update(&mut schedule, 1, 0);
    }

    /// Two faulty nodes are killed together and then recovered together.
    #[test]
    fn crash_recovery_2_faults() {
        let mut schedule = crash_recovery_schedule(2);

        assert_update(&mut schedule, 0, 2);
        assert_update(&mut schedule, 2, 0);
        assert_update(&mut schedule, 0, 2);
        assert_update(&mut schedule, 2, 0);
    }

    /// With three or more faults, nodes are killed in three batches, recovered all at once, and
    /// the cycle then restarts.
    #[test]
    fn crash_recovery() {
        for max_faults in 3..33 {
            let min_faults = max_faults / 3;
            let mut schedule = crash_recovery_schedule(max_faults);

            assert_update(&mut schedule, 0, min_faults);
            assert_update(&mut schedule, 0, min_faults);
            assert_update(&mut schedule, 0, max_faults - 2 * min_faults);
            assert_update(&mut schedule, max_faults, 0);
            assert_update(&mut schedule, 0, min_faults);
        }
    }
}