// sui_core/authority/congestion_log.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::fs::{self, File, OpenOptions};
5use std::io::{self, Write};
6use std::path::{Path, PathBuf};
7
8use prost::Message;
9use sui_config::node::CongestionLogConfig;
10
11use crate::consensus_handler::ConsensusCommitInfo;
12
13use super::shared_object_congestion_tracker::FinishedCommitData;
14
// Prost-generated protobuf types for the congestion log wire format,
// emitted by the build script into $OUT_DIR (package `sui.logs`).
mod proto {
    include!(concat!(env!("OUT_DIR"), "/sui.logs.rs"));
}
18
/// Appends per-commit congestion data as length-delimited protobuf records to
/// a set of rotating log files named `<base_path>.<suffix>`, where suffixes
/// increase monotonically and only the most recent `max_files` are retained.
pub struct CongestionCommitLogger {
    // Base path for log files; each actual file is `<base_path>.<suffix>`.
    base_path: PathBuf,
    // Rotate to a new file once the current one reaches this many bytes.
    max_file_size: u64,
    // Number of log files to retain; older suffixes are deleted on rotation.
    max_files: u32,
    // Handle to the file currently being written.
    current_file: File,
    // Bytes written to `current_file` so far (tracked locally, not stat'd).
    current_file_size: u64,
    // Numeric suffix of `current_file`.
    current_suffix: u64,
}
27
28impl CongestionCommitLogger {
29    pub fn new(config: &CongestionLogConfig) -> io::Result<Self> {
30        // Always start a fresh file on startup to avoid appending
31        // after a potentially corrupt partial write from a crash.
32        let current_suffix = Self::existing_suffixes(&config.path)
33            .into_iter()
34            .max()
35            .map_or(0, |s| s + 1);
36
37        let current_file = OpenOptions::new()
38            .create(true)
39            .write(true)
40            .truncate(true)
41            .open(Self::file_path_for(&config.path, current_suffix))?;
42
43        let logger = Self {
44            base_path: config.path.clone(),
45            max_file_size: config.max_file_size,
46            max_files: config.max_files,
47            current_file,
48            current_file_size: 0,
49            current_suffix,
50        };
51        logger.delete_excess_files();
52        Ok(logger)
53    }
54
55    fn file_path_for(base_path: &Path, suffix: u64) -> PathBuf {
56        let mut path = base_path.as_os_str().to_owned();
57        path.push(format!(".{suffix}"));
58        PathBuf::from(path)
59    }
60
61    fn existing_suffixes(base_path: &Path) -> Vec<u64> {
62        let (Some(dir), Some(stem)) = (base_path.parent(), base_path.file_name()) else {
63            return Vec::new();
64        };
65        let mut prefix = stem.to_owned();
66        prefix.push(".");
67        let prefix = prefix.to_string_lossy().into_owned();
68
69        fs::read_dir(dir)
70            .into_iter()
71            .flatten()
72            .flatten()
73            .filter_map(|e| {
74                e.file_name()
75                    .to_string_lossy()
76                    .strip_prefix(&prefix)?
77                    .parse()
78                    .ok()
79            })
80            .collect()
81    }
82
83    fn delete_excess_files(&self) {
84        let Some(cutoff) = self.current_suffix.checked_sub(self.max_files as u64) else {
85            return;
86        };
87        for n in Self::existing_suffixes(&self.base_path) {
88            if n <= cutoff {
89                let _ = fs::remove_file(Self::file_path_for(&self.base_path, n));
90            }
91        }
92    }
93
94    pub fn write_commit_log(
95        &mut self,
96        epoch: u64,
97        commit_info: &ConsensusCommitInfo,
98        for_randomness: bool,
99        data: &FinishedCommitData,
100    ) {
101        let log = proto::CongestionCommitLog {
102            epoch,
103            round: commit_info.round,
104            timestamp_ms: commit_info.timestamp,
105            commit_budget: data.commit_budget,
106            for_randomness,
107            final_object_execution_costs: data
108                .final_object_execution_costs
109                .iter()
110                .map(|(id, cost)| proto::ObjectCost {
111                    object_id: id.to_vec(),
112                    cost: *cost,
113                })
114                .collect(),
115            transaction_entries: data
116                .log_entries
117                .iter()
118                .map(|entry| proto::TransactionCostEntry {
119                    tx_digest: entry.tx_digest.into_inner().to_vec(),
120                    start_cost: entry.start_cost,
121                    end_cost: entry.end_cost,
122                })
123                .collect(),
124        };
125
126        let buf = log.encode_length_delimited_to_vec();
127        if let Err(e) = self.current_file.write_all(&buf) {
128            tracing::warn!("Failed to write congestion log: {e}");
129            return;
130        }
131        self.current_file_size += buf.len() as u64;
132
133        if self.current_file_size >= self.max_file_size
134            && let Err(e) = self.rotate()
135        {
136            tracing::warn!("Failed to rotate congestion log: {e}");
137        }
138    }
139
140    fn rotate(&mut self) -> io::Result<()> {
141        let next_suffix = self.current_suffix + 1;
142        let new_file = OpenOptions::new()
143            .create(true)
144            .write(true)
145            .truncate(true)
146            .open(Self::file_path_for(&self.base_path, next_suffix))?;
147        self.current_suffix = next_suffix;
148        self.current_file = new_file;
149        self.current_file_size = 0;
150        self.delete_excess_files();
151        Ok(())
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use prost::Message;
    use sui_types::base_types::ObjectID;
    use sui_types::digests::TransactionDigest;

    use super::super::shared_object_congestion_tracker::TransactionCostLogEntry;

    /// Builds a logger config pointing at `base_path` with the given limits.
    fn test_config(base_path: &Path, max_file_size: u64, max_files: u32) -> CongestionLogConfig {
        CongestionLogConfig {
            path: base_path.to_path_buf(),
            max_file_size,
            max_files,
        }
    }

    /// Fixture: one commit's worth of congestion data with a single
    /// transaction entry and a single object cost.
    fn make_test_data() -> (ConsensusCommitInfo, FinishedCommitData) {
        let commit_info = ConsensusCommitInfo::new_for_congestion_test(
            1,
            1000,
            std::time::Duration::from_micros(1_000_000),
        );
        let entry = TransactionCostLogEntry {
            tx_digest: TransactionDigest::random(),
            start_cost: 0,
            end_cost: 100,
        };
        let data = FinishedCommitData {
            accumulated_debts: vec![],
            log_entries: vec![entry],
            final_object_execution_costs: HashMap::from([(ObjectID::random(), 100)]),
            commit_budget: 500_000,
        };
        (commit_info, data)
    }

    #[test]
    fn test_protobuf_round_trip() {
        let object_id = ObjectID::random();
        let digest = TransactionDigest::random();

        let original = proto::CongestionCommitLog {
            epoch: 42,
            round: 100,
            timestamp_ms: 1234567890,
            commit_budget: 500_000,
            for_randomness: false,
            final_object_execution_costs: vec![proto::ObjectCost {
                object_id: object_id.to_vec(),
                cost: 1000,
            }],
            transaction_entries: vec![proto::TransactionCostEntry {
                tx_digest: digest.into_inner().to_vec(),
                start_cost: 50,
                end_cost: 150,
            }],
        };

        let encoded = original.encode_length_delimited_to_vec();
        let decoded =
            proto::CongestionCommitLog::decode_length_delimited(encoded.as_slice()).unwrap();

        // Scalar fields survive the round trip.
        assert_eq!(decoded.epoch, 42);
        assert_eq!(decoded.round, 100);
        assert_eq!(decoded.timestamp_ms, 1234567890);
        assert_eq!(decoded.commit_budget, 500_000);
        assert!(!decoded.for_randomness);

        // The object cost converts back to the original ObjectID.
        assert_eq!(decoded.final_object_execution_costs.len(), 1);
        let cost_entry = &decoded.final_object_execution_costs[0];
        assert_eq!(
            ObjectID::try_from(cost_entry.object_id.as_slice()).unwrap(),
            object_id
        );
        assert_eq!(cost_entry.cost, 1000);

        // The transaction entry converts back to the original digest.
        assert_eq!(decoded.transaction_entries.len(), 1);
        let tx_entry = &decoded.transaction_entries[0];
        let digest_bytes: [u8; 32] = tx_entry.tx_digest.as_slice().try_into().unwrap();
        assert_eq!(TransactionDigest::new(digest_bytes), digest);
        assert_eq!(tx_entry.start_cost, 50);
        assert_eq!(tx_entry.end_cost, 150);
    }

    #[test]
    fn test_file_rotation() {
        let dir = tempfile::tempdir().unwrap();
        let base = dir.path().join("congestion_log");
        let config = test_config(&base, 200, 3);

        let mut logger = CongestionCommitLogger::new(&config).unwrap();
        let (commit_info, data) = make_test_data();
        for _ in 0..10 {
            logger.write_commit_log(1, &commit_info, false, &data);
        }

        let mut found = CongestionCommitLogger::existing_suffixes(&base);
        found.sort();
        // Retention limit held, newest file is the active one, no duplicates.
        assert!(found.len() <= 3);
        assert_eq!(*found.last().unwrap(), logger.current_suffix);
        assert!(found.windows(2).all(|pair| pair[0] < pair[1]));
    }

    #[test]
    fn test_restart_resumes_after_highest_suffix() {
        let dir = tempfile::tempdir().unwrap();
        let base = dir.path().join("congestion_log");
        let config = test_config(&base, 200, 3);
        let (commit_info, data) = make_test_data();

        // First session: write enough to force at least one rotation.
        let last_suffix = {
            let mut logger = CongestionCommitLogger::new(&config).unwrap();
            for _ in 0..10 {
                logger.write_commit_log(1, &commit_info, false, &data);
            }
            logger.current_suffix
        };

        // Second session must start one past the highest suffix, empty.
        let restarted = CongestionCommitLogger::new(&config).unwrap();
        assert_eq!(restarted.current_suffix, last_suffix + 1);
        assert_eq!(restarted.current_file_size, 0);
    }

    #[test]
    fn test_restart_always_starts_fresh_file() {
        let dir = tempfile::tempdir().unwrap();
        let base = dir.path().join("congestion_log");
        let config = test_config(&base, 100_000, 3);

        // Simulate a leftover (possibly corrupt) file from a previous run.
        std::fs::write(
            CongestionCommitLogger::file_path_for(&base, 0),
            vec![0u8; 10],
        )
        .unwrap();

        let logger = CongestionCommitLogger::new(&config).unwrap();
        assert_eq!(logger.current_suffix, 1);
        assert_eq!(logger.current_file_size, 0);
    }

    #[test]
    fn test_old_files_deleted_on_rotation() {
        let dir = tempfile::tempdir().unwrap();
        let base = dir.path().join("congestion_log");
        let config = test_config(&base, 200, 3);

        let mut logger = CongestionCommitLogger::new(&config).unwrap();
        let (commit_info, data) = make_test_data();
        for _ in 0..20 {
            logger.write_commit_log(1, &commit_info, false, &data);
        }

        let found = CongestionCommitLogger::existing_suffixes(&base);
        assert!(found.len() <= 3);
        assert!(!CongestionCommitLogger::file_path_for(&base, 0).exists());
    }
}