1use std::fs::{self, File, OpenOptions};
5use std::io::{self, Write};
6use std::path::{Path, PathBuf};
7
8use prost::Message;
9use sui_config::node::CongestionLogConfig;
10
11use crate::consensus_handler::ConsensusCommitInfo;
12
13use super::shared_object_congestion_tracker::FinishedCommitData;
14
/// Message types used to serialize congestion log records with `prost`.
///
/// The definitions are included from `sui.logs.rs` in the build output
/// directory (`OUT_DIR`); presumably that file is generated by this crate's
/// build script — confirm against `build.rs`.
mod proto {
    include!(concat!(env!("OUT_DIR"), "/sui.logs.rs"));
}
18
/// Appends congestion commit records to a series of numbered log files.
///
/// Files are named `<base_path>.<suffix>`. Once the current file reaches
/// `max_file_size` bytes a file with the next suffix is started, and files
/// whose suffix falls outside the most recent `max_files` are removed.
pub struct CongestionCommitLogger {
    /// Path prefix shared by all log files; the numeric suffix is appended after a `.`.
    base_path: PathBuf,
    /// Size in bytes at which the current file is rotated.
    max_file_size: u64,
    /// Number of most recent suffixes kept on disk when older files are pruned.
    max_files: u32,
    /// Handle to the file records are currently written to.
    current_file: File,
    /// Bytes successfully written to `current_file` by this logger.
    current_file_size: u64,
    /// Numeric suffix of `current_file`.
    current_suffix: u64,
}
27
28impl CongestionCommitLogger {
29 pub fn new(config: &CongestionLogConfig) -> io::Result<Self> {
30 let current_suffix = Self::existing_suffixes(&config.path)
33 .into_iter()
34 .max()
35 .map_or(0, |s| s + 1);
36
37 let current_file = OpenOptions::new()
38 .create(true)
39 .write(true)
40 .truncate(true)
41 .open(Self::file_path_for(&config.path, current_suffix))?;
42
43 let logger = Self {
44 base_path: config.path.clone(),
45 max_file_size: config.max_file_size,
46 max_files: config.max_files,
47 current_file,
48 current_file_size: 0,
49 current_suffix,
50 };
51 logger.delete_excess_files();
52 Ok(logger)
53 }
54
55 fn file_path_for(base_path: &Path, suffix: u64) -> PathBuf {
56 let mut path = base_path.as_os_str().to_owned();
57 path.push(format!(".{suffix}"));
58 PathBuf::from(path)
59 }
60
61 fn existing_suffixes(base_path: &Path) -> Vec<u64> {
62 let (Some(dir), Some(stem)) = (base_path.parent(), base_path.file_name()) else {
63 return Vec::new();
64 };
65 let mut prefix = stem.to_owned();
66 prefix.push(".");
67 let prefix = prefix.to_string_lossy().into_owned();
68
69 fs::read_dir(dir)
70 .into_iter()
71 .flatten()
72 .flatten()
73 .filter_map(|e| {
74 e.file_name()
75 .to_string_lossy()
76 .strip_prefix(&prefix)?
77 .parse()
78 .ok()
79 })
80 .collect()
81 }
82
83 fn delete_excess_files(&self) {
84 let Some(cutoff) = self.current_suffix.checked_sub(self.max_files as u64) else {
85 return;
86 };
87 for n in Self::existing_suffixes(&self.base_path) {
88 if n <= cutoff {
89 let _ = fs::remove_file(Self::file_path_for(&self.base_path, n));
90 }
91 }
92 }
93
94 pub fn write_commit_log(
95 &mut self,
96 epoch: u64,
97 commit_info: &ConsensusCommitInfo,
98 for_randomness: bool,
99 data: &FinishedCommitData,
100 ) {
101 let log = proto::CongestionCommitLog {
102 epoch,
103 round: commit_info.round,
104 timestamp_ms: commit_info.timestamp,
105 commit_budget: data.commit_budget,
106 for_randomness,
107 final_object_execution_costs: data
108 .final_object_execution_costs
109 .iter()
110 .map(|(id, cost)| proto::ObjectCost {
111 object_id: id.to_vec(),
112 cost: *cost,
113 })
114 .collect(),
115 transaction_entries: data
116 .log_entries
117 .iter()
118 .map(|entry| proto::TransactionCostEntry {
119 tx_digest: entry.tx_digest.into_inner().to_vec(),
120 start_cost: entry.start_cost,
121 end_cost: entry.end_cost,
122 })
123 .collect(),
124 };
125
126 let buf = log.encode_length_delimited_to_vec();
127 if let Err(e) = self.current_file.write_all(&buf) {
128 tracing::warn!("Failed to write congestion log: {e}");
129 return;
130 }
131 self.current_file_size += buf.len() as u64;
132
133 if self.current_file_size >= self.max_file_size
134 && let Err(e) = self.rotate()
135 {
136 tracing::warn!("Failed to rotate congestion log: {e}");
137 }
138 }
139
140 fn rotate(&mut self) -> io::Result<()> {
141 let next_suffix = self.current_suffix + 1;
142 let new_file = OpenOptions::new()
143 .create(true)
144 .write(true)
145 .truncate(true)
146 .open(Self::file_path_for(&self.base_path, next_suffix))?;
147 self.current_suffix = next_suffix;
148 self.current_file = new_file;
149 self.current_file_size = 0;
150 self.delete_excess_files();
151 Ok(())
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158 use std::collections::HashMap;
159
160 use prost::Message;
161 use sui_types::base_types::ObjectID;
162 use sui_types::digests::TransactionDigest;
163
164 use super::super::shared_object_congestion_tracker::TransactionCostLogEntry;
165
166 fn make_test_data() -> (ConsensusCommitInfo, FinishedCommitData) {
167 let commit_info = ConsensusCommitInfo::new_for_congestion_test(
168 1,
169 1000,
170 std::time::Duration::from_micros(1_000_000),
171 );
172 let data = FinishedCommitData {
173 accumulated_debts: vec![],
174 log_entries: vec![TransactionCostLogEntry {
175 tx_digest: TransactionDigest::random(),
176 start_cost: 0,
177 end_cost: 100,
178 }],
179 final_object_execution_costs: HashMap::from([(ObjectID::random(), 100)]),
180 commit_budget: 500_000,
181 };
182 (commit_info, data)
183 }
184
185 #[test]
186 fn test_protobuf_round_trip() {
187 let obj_id = ObjectID::random();
188 let tx_digest = TransactionDigest::random();
189
190 let log = proto::CongestionCommitLog {
191 epoch: 42,
192 round: 100,
193 timestamp_ms: 1234567890,
194 commit_budget: 500_000,
195 for_randomness: false,
196 final_object_execution_costs: vec![proto::ObjectCost {
197 object_id: obj_id.to_vec(),
198 cost: 1000,
199 }],
200 transaction_entries: vec![proto::TransactionCostEntry {
201 tx_digest: tx_digest.into_inner().to_vec(),
202 start_cost: 50,
203 end_cost: 150,
204 }],
205 };
206
207 let buf = log.encode_length_delimited_to_vec();
208 let decoded = proto::CongestionCommitLog::decode_length_delimited(buf.as_slice()).unwrap();
209
210 assert_eq!(decoded.epoch, 42);
211 assert_eq!(decoded.round, 100);
212 assert_eq!(decoded.timestamp_ms, 1234567890);
213 assert_eq!(decoded.commit_budget, 500_000);
214 assert!(!decoded.for_randomness);
215
216 assert_eq!(decoded.final_object_execution_costs.len(), 1);
217 let c = &decoded.final_object_execution_costs[0];
218 assert_eq!(ObjectID::try_from(c.object_id.as_slice()).unwrap(), obj_id);
219 assert_eq!(c.cost, 1000);
220
221 assert_eq!(decoded.transaction_entries.len(), 1);
222 let e = &decoded.transaction_entries[0];
223 let digest_bytes: [u8; 32] = e.tx_digest.as_slice().try_into().unwrap();
224 assert_eq!(TransactionDigest::new(digest_bytes), tx_digest);
225 assert_eq!(e.start_cost, 50);
226 assert_eq!(e.end_cost, 150);
227 }
228
229 #[test]
230 fn test_file_rotation() {
231 let dir = tempfile::tempdir().unwrap();
232 let base_path = dir.path().join("congestion_log");
233 let config = CongestionLogConfig {
234 path: base_path.clone(),
235 max_file_size: 200,
236 max_files: 3,
237 };
238
239 let mut logger = CongestionCommitLogger::new(&config).unwrap();
240 let (commit_info, data) = make_test_data();
241 for _ in 0..10 {
242 logger.write_commit_log(1, &commit_info, false, &data);
243 }
244
245 let mut suffixes = CongestionCommitLogger::existing_suffixes(&base_path);
246 suffixes.sort();
247 assert!(suffixes.len() <= 3);
248 assert_eq!(*suffixes.last().unwrap(), logger.current_suffix);
249 assert!(suffixes.windows(2).all(|w| w[0] < w[1]));
250 }
251
252 #[test]
253 fn test_restart_resumes_after_highest_suffix() {
254 let dir = tempfile::tempdir().unwrap();
255 let base_path = dir.path().join("congestion_log");
256 let config = CongestionLogConfig {
257 path: base_path.clone(),
258 max_file_size: 200,
259 max_files: 3,
260 };
261 let (commit_info, data) = make_test_data();
262
263 let suffix_after_first_session;
264 {
265 let mut logger = CongestionCommitLogger::new(&config).unwrap();
266 for _ in 0..10 {
267 logger.write_commit_log(1, &commit_info, false, &data);
268 }
269 suffix_after_first_session = logger.current_suffix;
270 }
271
272 let logger2 = CongestionCommitLogger::new(&config).unwrap();
273 assert_eq!(logger2.current_suffix, suffix_after_first_session + 1);
274 assert_eq!(logger2.current_file_size, 0);
275 }
276
277 #[test]
278 fn test_restart_always_starts_fresh_file() {
279 let dir = tempfile::tempdir().unwrap();
280 let base_path = dir.path().join("congestion_log");
281 let config = CongestionLogConfig {
282 path: base_path.clone(),
283 max_file_size: 100_000,
284 max_files: 3,
285 };
286
287 std::fs::write(
288 CongestionCommitLogger::file_path_for(&base_path, 0),
289 vec![0u8; 10],
290 )
291 .unwrap();
292
293 let logger = CongestionCommitLogger::new(&config).unwrap();
294 assert_eq!(logger.current_suffix, 1);
295 assert_eq!(logger.current_file_size, 0);
296 }
297
298 #[test]
299 fn test_old_files_deleted_on_rotation() {
300 let dir = tempfile::tempdir().unwrap();
301 let base_path = dir.path().join("congestion_log");
302 let config = CongestionLogConfig {
303 path: base_path.clone(),
304 max_file_size: 200,
305 max_files: 3,
306 };
307
308 let mut logger = CongestionCommitLogger::new(&config).unwrap();
309 let (commit_info, data) = make_test_data();
310 for _ in 0..20 {
311 logger.write_commit_log(1, &commit_info, false, &data);
312 }
313
314 let suffixes = CongestionCommitLogger::existing_suffixes(&base_path);
315 assert!(suffixes.len() <= 3);
316 assert!(!CongestionCommitLogger::file_path_for(&base_path, 0).exists());
317 }
318}