sui_analytics_indexer/lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::{Result, anyhow};
use arrow_array::{Array, Int32Array};
use gcp_bigquery_client::Client;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::query_response::ResultSet;
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use object_store::path::Path;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use snowflake_api::{QueryResult, SnowflakeApi};
use strum_macros::EnumIter;
use tempfile::TempDir;
use tracing::info;

use sui_config::object_storage_config::ObjectStoreConfig;
use sui_data_ingestion_core::Worker;
use sui_storage::object_store::util::{
    find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix,
};
use sui_types::base_types::EpochId;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::analytics_metrics::AnalyticsMetrics;
use crate::analytics_processor::AnalyticsProcessor;
use crate::handlers::AnalyticsHandler;
use crate::handlers::checkpoint_handler::CheckpointHandler;
use crate::handlers::df_handler::DynamicFieldHandler;
use crate::handlers::event_handler::EventHandler;
use crate::handlers::move_call_handler::MoveCallHandler;
use crate::handlers::object_handler::ObjectHandler;
use crate::handlers::package_bcs_handler::PackageBCSHandler;
use crate::handlers::package_handler::PackageHandler;
use crate::handlers::transaction_bcs_handler::TransactionBCSHandler;
use crate::handlers::transaction_handler::TransactionHandler;
use crate::handlers::transaction_objects_handler::TransactionObjectsHandler;
use crate::handlers::wrapped_object_handler::WrappedObjectHandler;
use crate::package_store::{LazyPackageCache, PackageCache};
use crate::tables::{InputObjectKind, ObjectStatus, OwnerType};
use crate::writers::AnalyticsWriter;
use crate::writers::csv_writer::CSVWriter;
use crate::writers::parquet_writer::ParquetWriter;

pub mod analytics_metrics;
pub mod analytics_processor;
pub mod errors;
mod handlers;
pub mod package_store;
pub mod tables;
mod writers;

const EPOCH_DIR_PREFIX: &str = "epoch_";
const CHECKPOINT_DIR_PREFIX: &str = "checkpoints";
const OBJECT_DIR_PREFIX: &str = "objects";
const TRANSACTION_DIR_PREFIX: &str = "transactions";
const TRANSACTION_BCS_DIR_PREFIX: &str = "transaction_bcs";
const EVENT_DIR_PREFIX: &str = "events";
const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects";
const MOVE_CALL_PREFIX: &str = "move_call";
const MOVE_PACKAGE_PREFIX: &str = "move_package";
const PACKAGE_BCS_DIR_PREFIX: &str = "move_package_bcs";
const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field";
const WRAPPED_OBJECT_PREFIX: &str = "wrapped_object";

const TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME: &str = "TRANSACTION_CONCURRENCY_LIMIT";
const DEFAULT_TRANSACTION_CONCURRENCY_LIMIT: usize = 64;
pub static TRANSACTION_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
    let async_transactions_opt = std::env::var(TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME)
        .ok()
        .and_then(|s| s.parse().ok());
    if let Some(async_transactions) = async_transactions_opt {
        info!(
            "Using custom value for '{}' -- max checkpoints in progress: {}",
            TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME, async_transactions
        );
        async_transactions
    } else {
        info!(
            "Using default value for '{}' -- max checkpoints in progress: {}",
            TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME, DEFAULT_TRANSACTION_CONCURRENCY_LIMIT
        );
        DEFAULT_TRANSACTION_CONCURRENCY_LIMIT
    }
});
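// For example, an operator can raise the limit at deploy time by exporting the variable
// before starting the indexer (the binary name and flag below are illustrative only, not
// defined in this crate):
//
//   TRANSACTION_CONCURRENCY_LIMIT=128 ./sui-analytics-indexer --config config.yaml
//
// A value that does not parse as a `usize` is silently ignored and the default of 64 applies.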

fn default_client_metric_host() -> String {
    "127.0.0.1".to_string()
}

fn default_client_metric_port() -> u16 {
    8081
}

fn default_checkpoint_root() -> PathBuf {
    PathBuf::from("/tmp")
}

fn default_batch_size() -> usize {
    10
}

fn default_data_limit() -> usize {
    100
}

fn default_remote_store_url() -> String {
    "https://checkpoints.mainnet.sui.io".to_string()
}

fn default_remote_store_timeout_secs() -> u64 {
    5
}

fn default_package_cache_path() -> PathBuf {
    PathBuf::from("/opt/sui/db/package_cache")
}

fn default_file_format() -> FileFormat {
    FileFormat::CSV
}

fn default_checkpoint_interval() -> u64 {
    10000
}

fn default_max_file_size_mb() -> u64 {
    100
}

fn default_max_row_count() -> usize {
    100000
}

fn default_time_interval_s() -> u64 {
    600
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobConfig {
    /// The URL of the checkpoint client to connect to.
    pub rest_url: String,
    /// The host of the metrics client to connect to.
    #[serde(default = "default_client_metric_host")]
    pub client_metric_host: String,
    /// The port of the metrics client to connect to.
    #[serde(default = "default_client_metric_port")]
    pub client_metric_port: u16,
    /// Remote object store where data gets written to.
    pub remote_store_config: ObjectStoreConfig,
    /// Object store download batch size.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Maximum number of checkpoints to queue in memory.
    #[serde(default = "default_data_limit")]
    pub data_limit: usize,
    /// Remote store URL.
    #[serde(default = "default_remote_store_url")]
    pub remote_store_url: String,
    /// These are key-value config pairs that are defined in the object_store crate
    /// <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>
    #[serde(default)]
    pub remote_store_options: Vec<(String, String)>,
    /// Remote store timeout.
    #[serde(default = "default_remote_store_timeout_secs")]
    pub remote_store_timeout_secs: u64,
    /// Directory to contain the package cache for pipelines.
    #[serde(default = "default_package_cache_path")]
    pub package_cache_path: PathBuf,
    /// Root directory to contain the temporary directory for checkpoint entries.
    #[serde(default = "default_checkpoint_root")]
    pub checkpoint_root: PathBuf,
    pub bq_service_account_key_file: Option<String>,
    pub bq_project_id: Option<String>,
    pub bq_dataset_id: Option<String>,
    pub sf_account_identifier: Option<String>,
    pub sf_warehouse: Option<String>,
    pub sf_database: Option<String>,
    pub sf_schema: Option<String>,
    pub sf_username: Option<String>,
    pub sf_role: Option<String>,
    pub sf_password_file: Option<String>,

    // This is private to enforce using the TaskContext struct.
    #[serde(rename = "tasks")]
    task_configs: Vec<TaskConfig>,
}
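// A minimal job configuration sketch. The keys mirror the serde field names of `JobConfig`
// and `TaskConfig`; the concrete values and the exact shape of `remote_store_config`
// (see `ObjectStoreConfig` in sui_config) are illustrative assumptions, not taken from this
// crate:
//
//   rest_url: "https://fullnode.mainnet.sui.io:443"
//   remote_store_config: { ... }   # object store the generated files are uploaded to
//   tasks:
//     - task_name: "checkpoint_csv"
//       file_type: "Checkpoint"
//       file_format: "CSV"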

impl JobConfig {
    pub async fn create_checkpoint_processors(
        self,
        metrics: AnalyticsMetrics,
    ) -> Result<(Vec<Processor>, Option<Arc<PackageCache>>)> {
        let lazy_package_cache = Arc::new(Mutex::new(LazyPackageCache::new(
            self.package_cache_path.clone(),
            self.rest_url.clone(),
        )));

        let job_config = Arc::new(self);
        let mut processors = Vec::with_capacity(job_config.task_configs.len());
        let mut task_names = HashSet::new();

        for task_config in job_config.task_configs.clone() {
            let task_name = &task_config.task_name;

            if !task_names.insert(task_name.clone()) {
                return Err(anyhow!("Duplicate task_name '{}' found", task_name));
            }

            let temp_dir = tempfile::Builder::new()
                .prefix(&format!("{}-work-dir", task_name))
                .tempdir_in(&job_config.checkpoint_root)?;

            let task_context = TaskContext {
                job_config: Arc::clone(&job_config),
                config: task_config,
                checkpoint_dir: Arc::new(temp_dir),
                metrics: metrics.clone(),
                lazy_package_cache: lazy_package_cache.clone(),
            };

            processors.push(task_context.create_analytics_processor().await?);
        }

        let package_cache = lazy_package_cache
            .lock()
            .unwrap()
            .get_cache_if_initialized();

        Ok((processors, package_cache))
    }

    // Convenience method to get task configs for compatibility.
    pub fn task_configs(&self) -> &[TaskConfig] {
        &self.task_configs
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskConfig {
    /// Name of the task. Must be unique per process. Used to identify tasks in the Progress Store.
    pub task_name: String,
    /// Type of data to write, e.g. checkpoint, object, transaction, etc.
    pub file_type: FileType,
    /// File format to store data in, e.g. csv, parquet, etc.
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,
    /// Number of checkpoints to process before uploading to the datastore.
    #[serde(default = "default_checkpoint_interval")]
    pub checkpoint_interval: u64,
    /// Maximum file size in MB before uploading to the datastore.
    #[serde(default = "default_max_file_size_mb")]
    pub max_file_size_mb: u64,
    /// Maximum number of rows before uploading to the datastore.
    #[serde(default = "default_max_row_count")]
    pub max_row_count: usize,
    /// Checkpoint sequence number to start the download from.
    pub starting_checkpoint_seq_num: Option<u64>,
    /// Time to process in seconds before uploading to the datastore.
    #[serde(default = "default_time_interval_s")]
    pub time_interval_s: u64,
    /// Remote object store path prefix to use while writing.
    #[serde(default)]
    remote_store_path_prefix: Option<String>,
    pub bq_table_id: Option<String>,
    pub bq_checkpoint_col_id: Option<String>,
    #[serde(default)]
    pub report_bq_max_table_checkpoint: bool,
    pub sf_table_id: Option<String>,
    pub sf_checkpoint_col_id: Option<String>,
    #[serde(default)]
    pub report_sf_max_table_checkpoint: bool,
    pub package_id_filter: Option<String>,
}
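// Sketch of a single task entry that also reads the table's max committed checkpoint back
// from BigQuery (see `make_max_checkpoint_reader` below). Field names come from `TaskConfig`;
// the table and column values are hypothetical:
//
//   - task_name: "object_parquet"
//     file_type: "Object"
//     file_format: "PARQUET"
//     bq_table_id: "objects"
//     bq_checkpoint_col_id: "checkpoint"
//     report_bq_max_table_checkpoint: true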

impl TaskConfig {
    pub fn remote_store_path_prefix(&self) -> Result<Option<Path>> {
        self.remote_store_path_prefix
            .as_ref()
            .map(|pb| Ok(Path::from(pb.as_str())))
            .transpose()
    }
}

pub struct TaskContext {
    pub config: TaskConfig,
    pub job_config: Arc<JobConfig>,
    pub checkpoint_dir: Arc<TempDir>,
    pub metrics: AnalyticsMetrics,
    pub lazy_package_cache: Arc<Mutex<LazyPackageCache>>,
}

impl TaskContext {
    pub fn checkpoint_dir_path(&self) -> &std::path::Path {
        self.checkpoint_dir.path()
    }

    pub fn task_name(&self) -> &str {
        &self.config.task_name
    }

    pub async fn create_analytics_processor(self) -> Result<Processor> {
        match &self.config.file_type {
            FileType::Checkpoint => {
                self.create_processor_for_handler(Box::new(CheckpointHandler::new()))
                    .await
            }
            FileType::Object => {
                let package_id_filter = self.config.package_id_filter.clone();
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                let metrics = self.metrics.clone();
                self.create_processor_for_handler(Box::new(ObjectHandler::new(
                    package_cache,
                    &package_id_filter,
                    metrics,
                )))
                .await
            }
            FileType::Transaction => {
                self.create_processor_for_handler(Box::new(TransactionHandler::new()))
                    .await
            }
            FileType::TransactionBCS => {
                self.create_processor_for_handler(Box::new(TransactionBCSHandler::new()))
                    .await
            }
            FileType::Event => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                self.create_processor_for_handler(Box::new(EventHandler::new(package_cache)))
                    .await
            }
            FileType::TransactionObjects => {
                self.create_processor_for_handler(Box::new(TransactionObjectsHandler::new()))
                    .await
            }
            FileType::MoveCall => {
                self.create_processor_for_handler(Box::new(MoveCallHandler::new()))
                    .await
            }
            FileType::MovePackage => {
                self.create_processor_for_handler(Box::new(PackageHandler::new()))
                    .await
            }
            FileType::MovePackageBCS => {
                self.create_processor_for_handler(Box::new(PackageBCSHandler::new()))
                    .await
            }
            FileType::DynamicField => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                self.create_processor_for_handler(Box::new(DynamicFieldHandler::new(package_cache)))
                    .await
            }
            FileType::WrappedObject => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                let metrics = self.metrics.clone();
                self.create_processor_for_handler(Box::new(WrappedObjectHandler::new(
                    package_cache,
                    metrics,
                )))
                .await
            }
        }
    }

    async fn create_processor_for_handler<
        T: Serialize + Clone + ParquetSchema + Send + Sync + 'static,
    >(
        self,
        handler: Box<dyn AnalyticsHandler<T>>,
    ) -> Result<Processor> {
        let starting_checkpoint_seq_num = self.get_starting_checkpoint_seq_num().await?;
        let writer = self.make_writer::<T>(starting_checkpoint_seq_num)?;
        let max_checkpoint_reader = self.make_max_checkpoint_reader().await?;
        Processor::new::<T>(
            handler,
            writer,
            max_checkpoint_reader,
            starting_checkpoint_seq_num,
            self,
        )
        .await
    }

    async fn get_starting_checkpoint_seq_num(&self) -> Result<u64> {
        let remote_latest = read_store_for_checkpoint(
            &self.job_config.remote_store_config,
            self.config.file_type,
            self.config.remote_store_path_prefix()?.as_ref(),
        )
        .await?;

        Ok(self
            .config
            .starting_checkpoint_seq_num
            .map_or(remote_latest, |start| start.max(remote_latest)))
    }

    fn make_writer<S: Serialize + ParquetSchema>(
        &self,
        starting_checkpoint_seq_num: u64,
    ) -> Result<Box<dyn AnalyticsWriter<S>>> {
        Ok(match self.config.file_format {
            FileFormat::CSV => Box::new(CSVWriter::new(
                self.checkpoint_dir_path(),
                self.config.file_type,
                starting_checkpoint_seq_num,
            )?),
            FileFormat::PARQUET => Box::new(ParquetWriter::new(
                self.checkpoint_dir_path(),
                self.config.file_type,
                starting_checkpoint_seq_num,
            )?),
        })
    }

    async fn make_max_checkpoint_reader(&self) -> Result<Box<dyn MaxCheckpointReader>> {
        let res: Box<dyn MaxCheckpointReader> = if self.config.report_bq_max_table_checkpoint {
            Box::new(
                BQMaxCheckpointReader::new(
                    self.job_config
                        .bq_service_account_key_file
                        .as_ref()
                        .ok_or(anyhow!("Missing gcp key file"))?,
                    self.job_config
                        .bq_project_id
                        .as_ref()
                        .ok_or(anyhow!("Missing big query project id"))?,
                    self.job_config
                        .bq_dataset_id
                        .as_ref()
                        .ok_or(anyhow!("Missing big query dataset id"))?,
                    self.config
                        .bq_table_id
                        .as_ref()
                        .ok_or(anyhow!("Missing big query table id"))?,
                    self.config
                        .bq_checkpoint_col_id
                        .as_ref()
                        .ok_or(anyhow!("Missing big query checkpoint col id"))?,
                )
                .await?,
            )
        } else if self.config.report_sf_max_table_checkpoint {
            Box::new(
                SnowflakeMaxCheckpointReader::new(
                    self.job_config
                        .sf_account_identifier
                        .as_ref()
                        .ok_or(anyhow!("Missing sf account identifier"))?,
                    self.job_config
                        .sf_warehouse
                        .as_ref()
                        .ok_or(anyhow!("Missing sf warehouse"))?,
                    self.job_config
                        .sf_database
                        .as_ref()
                        .ok_or(anyhow!("Missing sf database"))?,
                    self.job_config
                        .sf_schema
                        .as_ref()
                        .ok_or(anyhow!("Missing sf schema"))?,
                    self.job_config
                        .sf_username
                        .as_ref()
                        .ok_or(anyhow!("Missing sf username"))?,
                    self.job_config
                        .sf_role
                        .as_ref()
                        .ok_or(anyhow!("Missing sf role"))?,
                    &load_password(
                        self.job_config
                            .sf_password_file
                            .as_ref()
                            .ok_or(anyhow!("Missing sf password"))?,
                    )?,
                    self.config
                        .sf_table_id
                        .as_ref()
                        .ok_or(anyhow!("Missing sf table id"))?,
                    self.config
                        .sf_checkpoint_col_id
                        .as_ref()
                        .ok_or(anyhow!("Missing sf checkpoint col id"))?,
                )
                .await?,
            )
        } else {
            Box::new(NoOpCheckpointReader {})
        };
        Ok(res)
    }
}

#[async_trait::async_trait]
pub trait MaxCheckpointReader: Send + Sync + 'static {
    async fn max_checkpoint(&self) -> Result<i64>;
}

struct SnowflakeMaxCheckpointReader {
    query: String,
    api: SnowflakeApi,
}

impl SnowflakeMaxCheckpointReader {
    pub async fn new(
        account_identifier: &str,
        warehouse: &str,
        database: &str,
        schema: &str,
        user: &str,
        role: &str,
        passwd: &str,
        table_id: &str,
        col_id: &str,
    ) -> anyhow::Result<Self> {
        let api = SnowflakeApi::with_password_auth(
            account_identifier,
            Some(warehouse),
            Some(database),
            Some(schema),
            user,
            Some(role),
            passwd,
        )
        .expect("Failed to build sf api client");
        Ok(SnowflakeMaxCheckpointReader {
            query: format!("SELECT max({}) from {}", col_id, table_id),
            api,
        })
    }
}

#[async_trait::async_trait]
impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        let res = self.api.exec(&self.query).await?;
        match res {
            QueryResult::Arrow(a) => {
                if let Some(record_batch) = a.first() {
                    let col = record_batch.column(0);
                    let col_array = col
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("Failed to downcast arrow column");
                    Ok(col_array.value(0) as i64)
                } else {
                    Ok(-1)
                }
            }
            QueryResult::Json(_j) => Err(anyhow!("Unexpected query result")),
            QueryResult::Empty => Err(anyhow!("Unexpected query result")),
        }
    }
}

struct BQMaxCheckpointReader {
    query: String,
    project_id: String,
    client: Client,
}

impl BQMaxCheckpointReader {
    pub async fn new(
        key_path: &str,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        col_id: &str,
    ) -> anyhow::Result<Self> {
        Ok(BQMaxCheckpointReader {
            query: format!(
                "SELECT max({}) from `{}.{}.{}`",
                col_id, project_id, dataset_id, table_id
            ),
            client: Client::from_service_account_key_file(key_path).await?,
            project_id: project_id.to_string(),
        })
    }
}

#[async_trait::async_trait]
impl MaxCheckpointReader for BQMaxCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        let result = self
            .client
            .job()
            .query(&self.project_id, QueryRequest::new(&self.query))
            .await?;
        let mut result_set = ResultSet::new_from_query_response(result);
        if result_set.next_row() {
            let max_checkpoint = result_set.get_i64(0)?.ok_or(anyhow!("No rows returned"))?;
            Ok(max_checkpoint)
        } else {
            Ok(-1)
        }
    }
}

struct NoOpCheckpointReader;

#[async_trait::async_trait]
impl MaxCheckpointReader for NoOpCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        Ok(-1)
    }
}

#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    strum_macros::Display,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
)]
#[repr(u8)]
pub enum FileFormat {
    CSV = 0,
    PARQUET = 1,
}

impl FileFormat {
    pub fn file_suffix(&self) -> &str {
        match self {
            FileFormat::CSV => "csv",
            FileFormat::PARQUET => "parquet",
        }
    }
}

#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
)]
#[repr(u8)]
pub enum FileType {
    Checkpoint = 0,
    Object,
    Transaction,
    TransactionBCS,
    TransactionObjects,
    Event,
    MoveCall,
    MovePackage,
    MovePackageBCS,
    DynamicField,
    WrappedObject,
}

impl FileType {
    pub fn dir_prefix(&self) -> Path {
        match self {
            FileType::Checkpoint => Path::from(CHECKPOINT_DIR_PREFIX),
            FileType::Transaction => Path::from(TRANSACTION_DIR_PREFIX),
            FileType::TransactionBCS => Path::from(TRANSACTION_BCS_DIR_PREFIX),
            FileType::TransactionObjects => Path::from(TRANSACTION_OBJECT_DIR_PREFIX),
            FileType::Object => Path::from(OBJECT_DIR_PREFIX),
            FileType::Event => Path::from(EVENT_DIR_PREFIX),
            FileType::MoveCall => Path::from(MOVE_CALL_PREFIX),
            FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX),
            FileType::MovePackageBCS => Path::from(PACKAGE_BCS_DIR_PREFIX),
            FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX),
            FileType::WrappedObject => Path::from(WRAPPED_OBJECT_PREFIX),
        }
    }

    pub fn file_path(
        &self,
        file_format: FileFormat,
        epoch_num: EpochId,
        checkpoint_range: Range<u64>,
    ) -> Path {
        self.dir_prefix()
            .child(format!("{}{}", EPOCH_DIR_PREFIX, epoch_num))
            .child(format!(
                "{}_{}.{}",
                checkpoint_range.start,
                checkpoint_range.end,
                file_format.file_suffix()
            ))
    }
}
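// For example, `FileType::Checkpoint.file_path(FileFormat::CSV, 3, 1000..2000)` resolves to
// the object-store path `checkpoints/epoch_3/1000_2000.csv`: the type's directory prefix,
// an `epoch_<num>` directory, and a `<start>_<end>.<suffix>` file name.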

pub enum ParquetValue {
    U64(u64),
    Str(String),
    Bool(bool),
    I64(i64),
    OptionU64(Option<u64>),
    OptionStr(Option<String>),
}

impl From<u64> for ParquetValue {
    fn from(value: u64) -> Self {
        Self::U64(value)
    }
}

impl From<i64> for ParquetValue {
    fn from(value: i64) -> Self {
        Self::I64(value)
    }
}

impl From<String> for ParquetValue {
    fn from(value: String) -> Self {
        Self::Str(value)
    }
}

impl From<Option<u64>> for ParquetValue {
    fn from(value: Option<u64>) -> Self {
        Self::OptionU64(value)
    }
}

impl From<Option<String>> for ParquetValue {
    fn from(value: Option<String>) -> Self {
        Self::OptionStr(value)
    }
}

impl From<bool> for ParquetValue {
    fn from(value: bool) -> Self {
        Self::Bool(value)
    }
}

impl From<OwnerType> for ParquetValue {
    fn from(value: OwnerType) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<OwnerType>> for ParquetValue {
    fn from(value: Option<OwnerType>) -> Self {
        value.map(|v| v.to_string()).into()
    }
}

impl From<ObjectStatus> for ParquetValue {
    fn from(value: ObjectStatus) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<ObjectStatus>> for ParquetValue {
    fn from(value: Option<ObjectStatus>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

impl From<Option<InputObjectKind>> for ParquetValue {
    fn from(value: Option<InputObjectKind>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

impl From<DynamicFieldType> for ParquetValue {
    fn from(value: DynamicFieldType) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<DynamicFieldType>> for ParquetValue {
    fn from(value: Option<DynamicFieldType>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

pub trait ParquetSchema {
    fn schema() -> Vec<String>;

    fn get_column(&self, idx: usize) -> ParquetValue;
}
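// A minimal sketch of an implementation for a hypothetical two-column row type (the actual
// row types and their implementations are expected to live in the `tables` module, not here):
//
//   impl ParquetSchema for ExampleRow {
//       fn schema() -> Vec<String> {
//           vec!["checkpoint".to_string(), "digest".to_string()]
//       }
//
//       fn get_column(&self, idx: usize) -> ParquetValue {
//           match idx {
//               0 => self.checkpoint.into(),     // u64 -> ParquetValue::U64
//               1 => self.digest.clone().into(), // String -> ParquetValue::Str
//               _ => panic!("invalid column index {idx}"),
//           }
//       }
//   }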

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    pub file_type: FileType,
    pub file_format: FileFormat,
    pub epoch_num: u64,
    pub checkpoint_seq_range: Range<u64>,
}

impl FileMetadata {
    fn new(
        file_type: FileType,
        file_format: FileFormat,
        epoch_num: u64,
        checkpoint_seq_range: Range<u64>,
    ) -> FileMetadata {
        FileMetadata {
            file_type,
            file_format,
            epoch_num,
            checkpoint_seq_range,
        }
    }

    pub fn file_path(&self) -> Path {
        self.file_type.file_path(
            self.file_format,
            self.epoch_num,
            self.checkpoint_seq_range.clone(),
        )
    }
}

pub struct Processor {
    pub processor: Box<dyn Worker<Result = ()>>,
    pub starting_checkpoint_seq_num: CheckpointSequenceNumber,
    pub task_name: String,
    pub file_type: FileType,
}

#[async_trait::async_trait]
impl Worker for Processor {
    type Result = ();

    #[inline]
    async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
        self.processor.process_checkpoint_arc(checkpoint_data).await
    }
}

impl Processor {
    pub async fn new<S: Serialize + ParquetSchema + Send + Sync + 'static>(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        starting_checkpoint_seq_num: CheckpointSequenceNumber,
        task: TaskContext,
    ) -> Result<Self> {
        let task_name = task.config.task_name.clone();
        let file_type = task.config.file_type;
        let processor = Box::new(
            AnalyticsProcessor::new(
                handler,
                writer,
                max_checkpoint_reader,
                starting_checkpoint_seq_num,
                task,
            )
            .await?,
        );

        Ok(Processor {
            processor,
            starting_checkpoint_seq_num,
            task_name,
            file_type,
        })
    }

    pub fn last_committed_checkpoint(&self) -> Option<u64> {
        Some(self.starting_checkpoint_seq_num.saturating_sub(1)).filter(|x| *x > 0)
    }
}
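// Note: with a starting sequence number of 1000, `last_committed_checkpoint` reports Some(999);
// a task starting from 0 or 1 reports None, since 0 is filtered out after the saturating
// subtraction.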

pub async fn read_store_for_checkpoint(
    remote_store_config: &ObjectStoreConfig,
    file_type: FileType,
    dir_prefix: Option<&Path>,
) -> Result<CheckpointSequenceNumber> {
    let remote_object_store = remote_store_config.make()?;
    let remote_store_is_empty = remote_object_store
        .list_with_delimiter(None)
        .await
        .expect("Failed to read remote analytics store")
        .common_prefixes
        .is_empty();
    info!("Remote store is empty: {remote_store_is_empty}");
    let file_type_prefix = file_type.dir_prefix();
    let prefix = join_paths(dir_prefix, &file_type_prefix);
    let epoch_dirs = find_all_dirs_with_epoch_prefix(&remote_object_store, Some(&prefix)).await?;
    let epoch = epoch_dirs.last_key_value().map(|(k, _v)| *k).unwrap_or(0);
    let epoch_prefix = prefix.child(format!("epoch_{}", epoch));
    let checkpoints =
        find_all_files_with_epoch_prefix(&remote_object_store, Some(&epoch_prefix)).await?;
    let next_checkpoint_seq_num = checkpoints
        .iter()
        .max_by(|x, y| x.end.cmp(&y.end))
        .map(|r| r.end)
        .unwrap_or(0);
    Ok(next_checkpoint_seq_num)
}
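// For example, if the newest epoch directory under the `checkpoints` prefix holds a file named
// `1000_2000.csv` (the `<start>_<end>` naming produced by `FileType::file_path`), this returns
// 2000: the end of the highest committed range, and therefore the next checkpoint to process.
// An empty store yields 0.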

pub fn join_paths(base: Option<&Path>, child: &Path) -> Path {
    base.map(|p| {
        let mut out_path = p.clone();
        for part in child.parts() {
            out_path = out_path.child(part)
        }
        out_path
    })
    .unwrap_or(child.clone())
}
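// For example, `join_paths(Some(&Path::from("bq")), &Path::from("checkpoints/epoch_3"))`
// produces `bq/checkpoints/epoch_3`; with no base prefix the child path is returned unchanged.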

fn load_password(path: &str) -> anyhow::Result<String> {
    let contents = fs::read_to_string(std::path::Path::new(path))?;
    Ok(contents.trim().to_string())
}