1use std::{
5 fmt::{Debug, Display},
6 time::Duration,
7};
8
9use serde::{Deserialize, Serialize};
10
11use crate::client::Instance;
12
/// The kind of faults injected into the set of instances under test.
#[derive(Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub enum FaultsType {
    /// `faults` instances are killed once and never booted again by the schedule.
    Permanent { faults: usize },
    /// Up to `max_faults` instances are repeatedly killed and then booted again.
    CrashRecovery {
        /// Upper bound on the number of instances that are dead at the same time.
        max_faults: usize,
        /// NOTE(review): presumably the delay between two schedule steps; nothing in this
        /// file consumes it other than the `Debug`/`Display` renderings (in whole seconds)
        /// -- confirm against the caller.
        interval: Duration,
    },
}
23
24impl Default for FaultsType {
25 fn default() -> Self {
26 Self::Permanent { faults: 0 }
27 }
28}
29
30impl Debug for FaultsType {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 Self::Permanent { faults } => write!(f, "{faults}"),
34 Self::CrashRecovery {
35 max_faults,
36 interval,
37 } => write!(f, "{max_faults}-{}cr", interval.as_secs()),
38 }
39 }
40}
41
42impl Display for FaultsType {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 Self::Permanent { faults } => {
46 if *faults == 0 {
47 write!(f, "no faults")
48 } else {
49 write!(f, "{faults} crashed")
50 }
51 }
52 Self::CrashRecovery {
53 max_faults,
54 interval,
55 } => write!(f, "{max_faults} crash-recovery, {}s", interval.as_secs()),
56 }
57 }
58}
59
/// One step of a fault schedule: the instances to boot and the instances to kill.
///
/// The derived `Default` yields an action with both lists empty.
#[derive(Default)]
pub struct CrashRecoveryAction {
    /// Instances to boot (reported as "recovered" when displayed).
    pub boot: Vec<Instance>,
    /// Instances to kill.
    pub kill: Vec<Instance>,
}
68
69impl Display for CrashRecoveryAction {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let booted = self.boot.len();
72 let killed = self.kill.len();
73
74 if self.boot.is_empty() {
75 write!(f, "{killed} node(s) killed")
76 } else if self.kill.is_empty() {
77 write!(f, "{booted} node(s) recovered")
78 } else {
79 write!(f, "{killed} node(s) killed and {booted} node(s) recovered")
80 }
81 }
82}
83
84impl CrashRecoveryAction {
85 pub fn boot(instances: Vec<Instance>) -> Self {
86 Self {
87 boot: instances,
88 kill: Vec::new(),
89 }
90 }
91
92 pub fn kill(instances: Vec<Instance>) -> Self {
93 Self {
94 boot: Vec::new(),
95 kill: instances,
96 }
97 }
98
99 pub fn no_op() -> Self {
100 Self::default()
101 }
102}
103
104pub struct CrashRecoverySchedule {
105 faults_type: FaultsType,
107 instances: Vec<Instance>,
109 dead: usize,
111}
112
113impl CrashRecoverySchedule {
114 pub fn new(faults_type: FaultsType, instances: Vec<Instance>) -> Self {
115 Self {
116 faults_type,
117 instances,
118 dead: 0,
119 }
120 }
121 pub fn update(&mut self) -> CrashRecoveryAction {
122 match &self.faults_type {
123 FaultsType::Permanent { faults } => {
125 if self.dead == 0 {
126 self.dead = *faults;
127 CrashRecoveryAction::kill(self.instances.clone().drain(0..*faults).collect())
128 } else {
129 CrashRecoveryAction::no_op()
130 }
131 }
132
133 FaultsType::CrashRecovery { max_faults, .. } => {
135 let min_faults = max_faults / 3;
136
137 if self.dead == *max_faults {
139 let instances: Vec<_> = self.instances.clone().drain(0..*max_faults).collect();
140 self.dead = 0;
141 CrashRecoveryAction::boot(instances)
142 }
143 else {
145 let (l, h) = if self.dead == 0 && min_faults != 0 {
146 (0, min_faults)
147 } else if self.dead == min_faults && min_faults != 0 {
148 (min_faults, 2 * min_faults)
149 } else {
150 (2 * min_faults, *max_faults)
151 };
152
153 let instances: Vec<_> = self.instances.clone().drain(l..h).collect();
154 self.dead += h - l;
155 CrashRecoveryAction::kill(instances)
156 }
157 }
158 }
159 }
160}
161
#[cfg(test)]
mod faults_tests {
    use std::time::Duration;

    use crate::client::Instance;

    use super::{CrashRecoverySchedule, FaultsType};

    /// Builds a crash-recovery schedule over exactly `max_faults` test instances.
    fn crash_recovery_schedule(max_faults: usize) -> CrashRecoverySchedule {
        let instances = (0..max_faults)
            .map(|id| Instance::new_for_test(id.to_string()))
            .collect();
        let faults_type = FaultsType::CrashRecovery {
            max_faults,
            interval: Duration::from_secs(60),
        };
        CrashRecoverySchedule::new(faults_type, instances)
    }

    /// Advances `schedule` by one step and checks how many nodes it boots and kills.
    #[track_caller]
    fn expect_step(schedule: &mut CrashRecoverySchedule, booted: usize, killed: usize) {
        let action = schedule.update();
        assert_eq!(action.boot.len(), booted);
        assert_eq!(action.kill.len(), killed);
    }

    /// A single faulty node alternates between being killed and being recovered.
    #[test]
    fn crash_recovery_1_fault() {
        let mut schedule = crash_recovery_schedule(1);

        expect_step(&mut schedule, 0, 1);
        expect_step(&mut schedule, 1, 0);
        expect_step(&mut schedule, 0, 1);
        expect_step(&mut schedule, 1, 0);
    }

    /// Two faulty nodes are killed together and recovered together.
    #[test]
    fn crash_recovery_2_faults() {
        let mut schedule = crash_recovery_schedule(2);

        expect_step(&mut schedule, 0, 2);
        expect_step(&mut schedule, 2, 0);
        expect_step(&mut schedule, 0, 2);
        expect_step(&mut schedule, 2, 0);
    }

    /// With three or more faults the nodes are killed in three waves before all recover.
    #[test]
    fn crash_recovery() {
        for max_faults in 3..33 {
            let min_faults = max_faults / 3;
            let mut schedule = crash_recovery_schedule(max_faults);

            expect_step(&mut schedule, 0, min_faults);
            expect_step(&mut schedule, 0, min_faults);
            expect_step(&mut schedule, 0, max_faults - 2 * min_faults);
            expect_step(&mut schedule, max_faults, 0);
            expect_step(&mut schedule, 0, min_faults);
        }
    }
}