use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::Context;
use anyhow::Result;
use object_store::DynObjectStore;
use object_store::path::Path;
use serde::Serialize;
use tokio::sync::{Mutex as TokioMutex, mpsc, oneshot};
use tracing::{error, info};

use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_data_ingestion_core::Worker;
use sui_storage::object_store::util::{copy_file, path_to_filesystem};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::analytics_metrics::AnalyticsMetrics;
use crate::handlers::AnalyticsHandler;
use crate::writers::AnalyticsWriter;
use crate::{
    EPOCH_DIR_PREFIX, FileMetadata, MaxCheckpointReader, ParquetSchema, TaskContext, join_paths,
};

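/// Mutable bookkeeping for the file currently being written: the epoch it belongs to,
/// the checkpoint range it covers, and the writer accumulating its rows.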
struct State<S: Serialize + ParquetSchema + Send + Sync> {
    current_epoch: u64,
    current_checkpoint_range: Range<u64>,
    last_commit_instant: Instant,
    num_checkpoint_iterations: u64,
    writer: Arc<Mutex<Box<dyn AnalyticsWriter<S>>>>,
}

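/// A [`Worker`] that runs checkpoints through an [`AnalyticsHandler`], buffers the
/// resulting rows in an [`AnalyticsWriter`], and hands cut files to a background task
/// that uploads them to the remote object store.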
pub struct AnalyticsProcessor<S: Serialize + ParquetSchema + Send + Sync> {
    handler: Box<dyn AnalyticsHandler<S>>,
    state: TokioMutex<State<S>>,
    task_context: TaskContext,
    sender: mpsc::Sender<FileMetadata>,
    #[allow(dead_code)]
    kill_sender: oneshot::Sender<()>,
    #[allow(dead_code)]
    max_checkpoint_sender: oneshot::Sender<()>,
}

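/// How often (in processed checkpoints) the file-size based cut condition is evaluated.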
const CHECK_FILE_SIZE_ITERATION_CYCLE: u64 = 50;

#[async_trait::async_trait]
impl<S: Serialize + ParquetSchema + Send + Sync + 'static> Worker for AnalyticsProcessor<S> {
    type Result = ();
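    /// Processes one checkpoint: rolls files over on an epoch change, cuts the current
    /// file when a batching threshold is hit, writes the handler's rows, and advances
    /// the tracked checkpoint range.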
    async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
        let epoch: u64 = checkpoint_data.checkpoint_summary.epoch();
        let checkpoint_num: u64 = *checkpoint_data.checkpoint_summary.sequence_number();
        let timestamp: u64 = checkpoint_data.checkpoint_summary.data().timestamp_ms;
        info!("Processing checkpoint {checkpoint_num}, epoch {epoch}, timestamp {timestamp}");
        let mut state = self.state.lock().await;
        if epoch > state.current_epoch {
            self.cut(&mut state).await?;
            self.update_to_next_epoch(epoch, &mut state);
            self.create_epoch_dirs(&state)?;
            self.reset(&mut state)?;
        }

        assert_eq!(epoch, state.current_epoch);
        assert_eq!(checkpoint_num, state.current_checkpoint_range.end);

        let num_checkpoints_processed =
            state.current_checkpoint_range.end - state.current_checkpoint_range.start;

        let (cur_size, cur_rows) = {
            let writer = state.writer.lock().unwrap();
            (writer.file_size()?.unwrap_or(0), writer.rows()?)
        };

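        // Cut a new file when any threshold is reached: enough checkpoints batched,
        // the commit interval elapsed, the file grew past the size limit (size is only
        // checked every CHECK_FILE_SIZE_ITERATION_CYCLE iterations), or the row cap.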
        let cut_new_files = (num_checkpoints_processed
            >= self.task_context.config.checkpoint_interval)
            || (state.last_commit_instant.elapsed().as_secs()
                > self.task_context.config.time_interval_s)
            || (state.num_checkpoint_iterations % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
                && cur_size > self.task_context.config.max_file_size_mb * 1024 * 1024)
            || (cur_rows >= self.task_context.config.max_row_count);

        if cut_new_files {
            self.cut(&mut state).await?;
            self.reset(&mut state)?;
        }

        self.task_context
            .metrics
            .total_received
            .with_label_values(&[self.name()])
            .inc();

        let iter = self.handler.process_checkpoint(checkpoint_data).await?;
        {
            let mut writer = state.writer.lock().unwrap();
            writer.write(iter)?;
        }

        state.current_checkpoint_range.end = state
            .current_checkpoint_range
            .end
            .checked_add(1)
            .context("Checkpoint sequence num overflow")?;
        state.num_checkpoint_iterations += 1;
        Ok(())
    }
}

impl<S: Serialize + ParquetSchema + Send + Sync + 'static> AnalyticsProcessor<S> {
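    /// Builds the processor and spawns its background tasks: one that uploads cut files
    /// from the local staging directory to the remote store, and, if configured, one
    /// that reports the max checkpoint already visible in the downstream store.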
    pub async fn new(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        next_checkpoint_seq_num: CheckpointSequenceNumber,
        task_context: TaskContext,
    ) -> Result<Self> {
        let local_store_config = ObjectStoreConfig {
            directory: Some(task_context.checkpoint_dir_path().to_path_buf()),
            object_store: Some(ObjectStoreType::File),
            ..Default::default()
        };
        let local_object_store = local_store_config.make()?;
        let remote_object_store = task_context.job_config.remote_store_config.make()?;
        let (kill_sender, kill_receiver) = oneshot::channel();
        let (sender, receiver) = mpsc::channel::<FileMetadata>(100);
        let name = handler.name().to_string();
        let checkpoint_dir = task_context.checkpoint_dir_path();
        let cloned_metrics = task_context.metrics.clone();
        tokio::spawn(Self::start_syncing_with_remote(
            remote_object_store,
            local_object_store.clone(),
            checkpoint_dir.to_path_buf(),
            task_context.config.remote_store_path_prefix()?,
            receiver,
            kill_receiver,
            cloned_metrics,
            name.clone(),
        ));
        let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
        if task_context.config.report_bq_max_table_checkpoint
            || task_context.config.report_sf_max_table_checkpoint
        {
            tokio::spawn(Self::setup_max_checkpoint_metrics_updates(
                max_checkpoint_reader,
                task_context.metrics.clone(),
                max_checkpoint_receiver,
                name,
            ));
        }
        let state = State {
            current_epoch: 0,
            current_checkpoint_range: next_checkpoint_seq_num..next_checkpoint_seq_num,
            last_commit_instant: Instant::now(),
            num_checkpoint_iterations: 0,
            writer: Arc::new(Mutex::new(writer)),
        };

        Ok(Self {
            handler,
            state: TokioMutex::new(state),
            task_context,
            sender,
            kill_sender,
            max_checkpoint_sender,
        })
    }

    #[inline]
    fn name(&self) -> &str {
        self.handler.name()
    }

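    /// Flushes the writer for the current checkpoint range on a blocking thread and, if
    /// a file was produced, forwards its metadata to the upload task.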
    async fn cut(&self, state: &mut State<S>) -> Result<()> {
        if state.current_checkpoint_range.is_empty() {
            return Ok(());
        }

        let writer = state.writer.clone();
        let end_seq = state.current_checkpoint_range.end;

        let flushed = tokio::task::spawn_blocking(move || {
            let mut w = writer.lock().unwrap();
            w.flush(end_seq)
        })
        .await??;

        if flushed {
            let file_metadata = FileMetadata::new(
                self.task_context.config.file_type,
                self.task_context.config.file_format,
                state.current_epoch,
                state.current_checkpoint_range.clone(),
            );
            self.emit_file_size_metric(&file_metadata)?;

            self.sender.send(file_metadata).await?;
            tokio::task::yield_now().await;
        }
        Ok(())
    }

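    /// Records the size of a freshly cut file in the `file_size_bytes` histogram, if the
    /// file exists on the local filesystem.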
    fn emit_file_size_metric(&self, file_metadata: &FileMetadata) -> Result<()> {
        let object_path = file_metadata.file_path();
        let file_path = path_to_filesystem(
            self.task_context.checkpoint_dir_path().to_path_buf(),
            &object_path,
        )?;
        if file_path.exists()
            && let Ok(metadata) = fs::metadata(&file_path)
        {
            let file_size = metadata.len();
            self.task_context
                .metrics
                .file_size_bytes
                .with_label_values(&[self.name()])
                .observe(file_size as f64);
        };
        Ok(())
    }

    fn update_to_next_epoch(&self, epoch: u64, state: &mut State<S>) {
        state.current_epoch = epoch;
    }

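    /// Returns the local staging directory for the current epoch under the file type's
    /// directory prefix.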
    fn epoch_dir(&self, state: &State<S>) -> Result<PathBuf> {
        let path = path_to_filesystem(
            self.task_context.checkpoint_dir_path().to_path_buf(),
            &self.task_context.config.file_type.dir_prefix(),
        )?
        .join(format!("{}{}", EPOCH_DIR_PREFIX, state.current_epoch));
        Ok(path)
    }

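    /// Recreates the local staging directory for the current epoch, deleting it first if
    /// it already exists.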
    fn create_epoch_dirs(&self, state: &State<S>) -> Result<()> {
        let epoch_dir = self.epoch_dir(state)?;
        if epoch_dir.exists() {
            fs::remove_dir_all(&epoch_dir)?;
        }
        fs::create_dir_all(&epoch_dir)?;
        Ok(())
    }

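    /// Starts a new file: resets the checkpoint range, the writer, and the commit timer.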
    fn reset(&self, state: &mut State<S>) -> Result<()> {
        self.reset_checkpoint_range(state);
        {
            let mut writer = state.writer.lock().unwrap();
            writer.reset(state.current_epoch, state.current_checkpoint_range.start)?;
        }
        self.reset_last_commit_ts(state);
        Ok(())
    }

    fn reset_checkpoint_range(&self, state: &mut State<S>) {
        state.current_checkpoint_range =
            state.current_checkpoint_range.end..state.current_checkpoint_range.end
    }

    fn reset_last_commit_ts(&self, state: &mut State<S>) {
        state.last_commit_instant = Instant::now();
    }

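    /// Background loop that uploads each cut file to the remote store, removes the local
    /// copy, and updates the `last_uploaded_checkpoint` metric. Exits when the kill
    /// channel fires or the file channel closes.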
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        remote_store_path_prefix: Option<Path>,
        mut file_recv: mpsc::Receiver<FileMetadata>,
        mut recv: oneshot::Receiver<()>,
        metrics: AnalyticsMetrics,
        name: String,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = &mut recv => break,
                file = file_recv.recv() => {
                    if let Some(file_metadata) = file {
                        info!("Received {name} file with checkpoints: {:?}", &file_metadata.checkpoint_seq_range);
                        let checkpoint_seq_num = file_metadata.checkpoint_seq_range.end;
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            file_metadata.file_path(),
                            remote_store_path_prefix.clone(),
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint should not fail");
                        metrics.last_uploaded_checkpoint.with_label_values(&[&name]).set(checkpoint_seq_num as i64);
                    } else {
                        info!("Terminating upload sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

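    /// Background loop that polls the downstream store every five minutes and publishes
    /// its max checkpoint through the `max_checkpoint_on_store` metric.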
    async fn setup_max_checkpoint_metrics_updates(
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        analytics_metrics: AnalyticsMetrics,
        mut recv: oneshot::Receiver<()>,
        handler_name: String,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(Duration::from_secs(300));
        loop {
            tokio::select! {
                _ = &mut recv => break,
                _ = interval.tick() => {
                    let max_checkpoint = max_checkpoint_reader.max_checkpoint().await;
                    if let Ok(max_checkpoint) = max_checkpoint {
                        analytics_metrics
                            .max_checkpoint_on_store
                            .with_label_values(&[&handler_name])
                            .set(max_checkpoint);
                    } else {
                        error!("Failed to read max checkpoint for {} with err: {}", handler_name, max_checkpoint.unwrap_err());
                    }
                }
            }
        }
        Ok(())
    }

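    /// Copies one file from the local staging store to the remote store, then deletes
    /// the local copy.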
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: Path,
        prefix: Option<Path>,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        let remote_dest = join_paths(prefix.as_ref(), &path);
        info!("Syncing file to remote: {:?}", &remote_dest);
        copy_file(&path, &remote_dest, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}