sui_analytics_indexer/
analytics_processor.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::Context;
use anyhow::Result;
use object_store::DynObjectStore;
use object_store::path::Path;
use serde::Serialize;
use tokio::sync::{Mutex as TokioMutex, mpsc, oneshot};
use tracing::{error, info};

use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_data_ingestion_core::Worker;
use sui_storage::object_store::util::{copy_file, path_to_filesystem};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::analytics_metrics::AnalyticsMetrics;
use crate::handlers::AnalyticsHandler;
use crate::writers::AnalyticsWriter;
use crate::{
    EPOCH_DIR_PREFIX, FileMetadata, MaxCheckpointReader, ParquetSchema, TaskContext, join_paths,
};

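/// In-memory state tracked for a single handler: the epoch and checkpoint
/// range currently being accumulated, the time of the last commit, an
/// iteration counter used to rate-limit file-size checks, and the writer
/// that buffers rows until the next cut.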
struct State<S: Serialize + ParquetSchema + Send + Sync> {
    current_epoch: u64,
    current_checkpoint_range: Range<u64>,
    last_commit_instant: Instant,
    num_checkpoint_iterations: u64,
    writer: Arc<Mutex<Box<dyn AnalyticsWriter<S>>>>,
}

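/// Drives a single analytics handler: turns checkpoints into rows, periodically
/// cuts the buffered output into files, and hands the resulting `FileMetadata`
/// to a background task that uploads them to the remote store. The kill and
/// max-checkpoint senders are held so that dropping the processor also shuts
/// down those background tasks.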
pub struct AnalyticsProcessor<S: Serialize + ParquetSchema + Send + Sync> {
    handler: Box<dyn AnalyticsHandler<S>>,
    state: TokioMutex<State<S>>,
    task_context: TaskContext,
    sender: mpsc::Sender<FileMetadata>,
    #[allow(dead_code)]
    kill_sender: oneshot::Sender<()>,
    #[allow(dead_code)]
    max_checkpoint_sender: oneshot::Sender<()>,
}

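/// The on-disk file size is only checked once every this many iterations of
/// `process_checkpoint_arc`, since stat-ing the file on every checkpoint would
/// be wasteful.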
const CHECK_FILE_SIZE_ITERATION_CYCLE: u64 = 50;

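// Worker implementation: each checkpoint is converted into rows by the handler
// and appended to the current writer. Before appending, the processor cuts
// (flushes and enqueues for upload) the buffered file when the epoch changes or
// when any of the size/row/time/checkpoint-count thresholds below are exceeded.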
#[async_trait::async_trait]
impl<S: Serialize + ParquetSchema + Send + Sync + 'static> Worker for AnalyticsProcessor<S> {
    type Result = ();
    async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
        // Get the epoch ID, checkpoint sequence number, and timestamp; these are
        // important indexes when operating on the data.
        let epoch: u64 = checkpoint_data.checkpoint_summary.epoch();
        let checkpoint_num: u64 = *checkpoint_data.checkpoint_summary.sequence_number();
        let timestamp: u64 = checkpoint_data.checkpoint_summary.data().timestamp_ms;
        info!("Processing checkpoint {checkpoint_num}, epoch {epoch}, timestamp {timestamp}");
        let mut state = self.state.lock().await;
        if epoch > state.current_epoch {
            self.cut(&mut state).await?;
            self.update_to_next_epoch(epoch, &mut state);
            self.create_epoch_dirs(&state)?;
            self.reset(&mut state)?;
        }

        assert_eq!(epoch, state.current_epoch);
        assert_eq!(checkpoint_num, state.current_checkpoint_range.end);

        let num_checkpoints_processed =
            state.current_checkpoint_range.end - state.current_checkpoint_range.start;

        let (cur_size, cur_rows) = {
            let writer = state.writer.lock().unwrap();
            (writer.file_size()?.unwrap_or(0), writer.rows()?)
        };

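        // Cut a new file when any threshold is hit: enough checkpoints buffered,
        // too much time since the last commit, the on-disk file grew past the
        // size cap (checked every CHECK_FILE_SIZE_ITERATION_CYCLE iterations),
        // or the row count reached its limit.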
        let cut_new_files = (num_checkpoints_processed
            >= self.task_context.config.checkpoint_interval)
            || (state.last_commit_instant.elapsed().as_secs()
                > self.task_context.config.time_interval_s)
            || (state.num_checkpoint_iterations % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
                && cur_size > self.task_context.config.max_file_size_mb * 1024 * 1024)
            || (cur_rows >= self.task_context.config.max_row_count);

        if cut_new_files {
            self.cut(&mut state).await?;
            self.reset(&mut state)?;
        }

        self.task_context
            .metrics
            .total_received
            .with_label_values(&[self.name()])
            .inc();

        let iter = self.handler.process_checkpoint(checkpoint_data).await?;
        {
            let mut writer = state.writer.lock().unwrap();
            writer.write(iter)?;
        }

        state.current_checkpoint_range.end = state
            .current_checkpoint_range
            .end
            .checked_add(1)
            .context("Checkpoint sequence num overflow")?;
        state.num_checkpoint_iterations += 1;
        Ok(())
    }
}

impl<S: Serialize + ParquetSchema + Send + Sync + 'static> AnalyticsProcessor<S> {
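    /// Builds the processor for a single handler: sets up a local file-backed
    /// object store rooted at the checkpoint directory, spawns the background
    /// task that uploads cut files to the remote store, and, when the
    /// `report_bq_max_table_checkpoint` / `report_sf_max_table_checkpoint`
    /// flags are set, spawns a task that periodically reports the max
    /// checkpoint already present in the downstream store.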
    pub async fn new(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        next_checkpoint_seq_num: CheckpointSequenceNumber,
        task_context: TaskContext,
    ) -> Result<Self> {
        let local_store_config = ObjectStoreConfig {
            directory: Some(task_context.checkpoint_dir_path().to_path_buf()),
            object_store: Some(ObjectStoreType::File),
            ..Default::default()
        };
        let local_object_store = local_store_config.make()?;
        let remote_object_store = task_context.job_config.remote_store_config.make()?;
        let (kill_sender, kill_receiver) = oneshot::channel();
        let (sender, receiver) = mpsc::channel::<FileMetadata>(100);
        let name = handler.name().to_string();
        let checkpoint_dir = task_context.checkpoint_dir_path();
        let cloned_metrics = task_context.metrics.clone();
        tokio::spawn(Self::start_syncing_with_remote(
            remote_object_store,
            local_object_store.clone(),
            checkpoint_dir.to_path_buf(),
            task_context.config.remote_store_path_prefix()?,
            receiver,
            kill_receiver,
            cloned_metrics,
            name.clone(),
        ));
        let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
        if task_context.config.report_bq_max_table_checkpoint
            || task_context.config.report_sf_max_table_checkpoint
        {
            tokio::spawn(Self::setup_max_checkpoint_metrics_updates(
                max_checkpoint_reader,
                task_context.metrics.clone(),
                max_checkpoint_receiver,
                name,
            ));
        }
        let state = State {
            current_epoch: 0,
            current_checkpoint_range: next_checkpoint_seq_num..next_checkpoint_seq_num,
            last_commit_instant: Instant::now(),
            num_checkpoint_iterations: 0,
            writer: Arc::new(Mutex::new(writer)),
        };

        Ok(Self {
            handler,
            state: TokioMutex::new(state),
            task_context,
            sender,
            kill_sender,
            max_checkpoint_sender,
        })
    }

    #[inline]
    fn name(&self) -> &str {
        self.handler.name()
    }

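    /// Flushes whatever the writer has buffered for the current checkpoint range
    /// and, if a file was actually produced, sends its metadata to the upload
    /// task. Flushing happens on the blocking thread pool because the files can
    /// be large. No-op when the range is empty.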
    async fn cut(&self, state: &mut State<S>) -> Result<()> {
        if state.current_checkpoint_range.is_empty() {
            return Ok(());
        }

        let writer = state.writer.clone();
        let end_seq = state.current_checkpoint_range.end;

        // Flush in the blocking pool: these files can be huge and we don't want
        // to block the tokio worker threads.
        let flushed = tokio::task::spawn_blocking(move || {
            let mut w = writer.lock().unwrap();
            w.flush(end_seq)
        })
        .await??;

        if flushed {
            let file_metadata = FileMetadata::new(
                self.task_context.config.file_type,
                self.task_context.config.file_format,
                state.current_epoch,
                state.current_checkpoint_range.clone(),
            );
            self.emit_file_size_metric(&file_metadata)?;

            self.sender.send(file_metadata).await?;
            tokio::task::yield_now().await;
        }
        Ok(())
    }

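    /// Records the size of the freshly flushed file in the `file_size_bytes`
    /// histogram, keyed by handler name. Silently skips the observation if the
    /// file is missing or its metadata cannot be read.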
    fn emit_file_size_metric(&self, file_metadata: &FileMetadata) -> Result<()> {
        let object_path = file_metadata.file_path();
        let file_path = path_to_filesystem(
            self.task_context.checkpoint_dir_path().to_path_buf(),
            &object_path,
        )?;
        if file_path.exists()
            && let Ok(metadata) = fs::metadata(&file_path)
        {
            let file_size = metadata.len();
            self.task_context
                .metrics
                .file_size_bytes
                .with_label_values(&[self.name()])
                .observe(file_size as f64);
        };
        Ok(())
    }

    fn update_to_next_epoch(&self, epoch: u64, state: &mut State<S>) {
        state.current_epoch = epoch;
    }

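    /// Local filesystem directory for the current epoch's output:
    /// `<checkpoint_dir>/<file_type dir_prefix>/<EPOCH_DIR_PREFIX><current_epoch>`.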
    fn epoch_dir(&self, state: &State<S>) -> Result<PathBuf> {
        let path = path_to_filesystem(
            self.task_context.checkpoint_dir_path().to_path_buf(),
            &self.task_context.config.file_type.dir_prefix(),
        )?
        .join(format!("{}{}", EPOCH_DIR_PREFIX, state.current_epoch));
        Ok(path)
    }

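    /// Creates a fresh local directory for the current epoch, removing any
    /// leftover contents from a previous run.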
    fn create_epoch_dirs(&self, state: &State<S>) -> Result<()> {
        let epoch_dir = self.epoch_dir(state)?;
        if epoch_dir.exists() {
            fs::remove_dir_all(&epoch_dir)?;
        }
        fs::create_dir_all(&epoch_dir)?;
        Ok(())
    }

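    /// Starts a new accumulation window after a cut: advances the checkpoint
    /// range to begin at the previous end, resets the writer for the new range,
    /// and restarts the commit timer.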
    fn reset(&self, state: &mut State<S>) -> Result<()> {
        self.reset_checkpoint_range(state);
        {
            let mut writer = state.writer.lock().unwrap();
            writer.reset(state.current_epoch, state.current_checkpoint_range.start)?;
        }
        self.reset_last_commit_ts(state);
        Ok(())
    }

    fn reset_checkpoint_range(&self, state: &mut State<S>) {
        state.current_checkpoint_range =
            state.current_checkpoint_range.end..state.current_checkpoint_range.end;
    }

    fn reset_last_commit_ts(&self, state: &mut State<S>) {
        state.last_commit_instant = Instant::now();
    }

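    /// Background upload loop: receives `FileMetadata` for each cut file, copies
    /// the file from the local staging store to the remote store, deletes the
    /// local copy, and updates the `last_uploaded_checkpoint` gauge. Exits when
    /// the kill channel fires or the file channel closes.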
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        remote_store_path_prefix: Option<Path>,
        mut file_recv: mpsc::Receiver<FileMetadata>,
        mut recv: oneshot::Receiver<()>,
        metrics: AnalyticsMetrics,
        name: String,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = &mut recv => break,
                file = file_recv.recv() => {
                    if let Some(file_metadata) = file {
                        info!("Received {name} file with checkpoints: {:?}", &file_metadata.checkpoint_seq_range);
                        let checkpoint_seq_num = file_metadata.checkpoint_seq_range.end;
                        Self::sync_file_to_remote(
                                local_staging_root_dir.clone(),
                                file_metadata.file_path(),
                                remote_store_path_prefix.clone(),
                                local_object_store.clone(),
                                remote_object_store.clone()
                            )
                            .await
                            .expect("Syncing checkpoint should not fail");
                        metrics.last_uploaded_checkpoint.with_label_values(&[&name]).set(checkpoint_seq_num as i64);
                    } else {
                        info!("Terminating upload sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

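    /// Every five minutes, reads the highest checkpoint reported by the
    /// `MaxCheckpointReader` and publishes it on the `max_checkpoint_on_store`
    /// gauge for this handler. Exits when the shutdown channel fires.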
    async fn setup_max_checkpoint_metrics_updates(
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        analytics_metrics: AnalyticsMetrics,
        mut recv: oneshot::Receiver<()>,
        handler_name: String,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(Duration::from_secs(300));
        loop {
            tokio::select! {
                _ = &mut recv => break,
                _ = interval.tick() => {
                    let max_checkpoint = max_checkpoint_reader.max_checkpoint().await;
                    if let Ok(max_checkpoint) = max_checkpoint {
                        analytics_metrics
                            .max_checkpoint_on_store
                            .with_label_values(&[&handler_name])
                            .set(max_checkpoint);
                    } else {
                        error!("Failed to read max checkpoint for {} with err: {}", handler_name, max_checkpoint.unwrap_err());
                    }
                }
            }
        }
        Ok(())
    }

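    /// Copies one staged file from the local object store to the remote store
    /// (under the optional path prefix) and then deletes the local copy.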
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: Path,
        prefix: Option<Path>,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        let remote_dest = join_paths(prefix.as_ref(), &path);
        info!("Syncing file to remote: {:?}", &remote_dest);
        copy_file(&path, &remote_dest, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}