use std::collections::HashSet;
use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Result};
use arrow_array::{Array, Int32Array};
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::query_response::ResultSet;
use gcp_bigquery_client::Client;
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use object_store::path::Path;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use snowflake_api::{QueryResult, SnowflakeApi};
use strum_macros::EnumIter;
use tempfile::TempDir;
use tracing::info;

use sui_config::object_storage_config::ObjectStoreConfig;
use sui_data_ingestion_core::Worker;
use sui_storage::object_store::util::{
    find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix,
};
use sui_types::base_types::EpochId;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::analytics_metrics::AnalyticsMetrics;
use crate::analytics_processor::AnalyticsProcessor;
use crate::handlers::checkpoint_handler::CheckpointHandler;
use crate::handlers::df_handler::DynamicFieldHandler;
use crate::handlers::event_handler::EventHandler;
use crate::handlers::move_call_handler::MoveCallHandler;
use crate::handlers::object_handler::ObjectHandler;
use crate::handlers::package_bcs_handler::PackageBCSHandler;
use crate::handlers::package_handler::PackageHandler;
use crate::handlers::transaction_bcs_handler::TransactionBCSHandler;
use crate::handlers::transaction_handler::TransactionHandler;
use crate::handlers::transaction_objects_handler::TransactionObjectsHandler;
use crate::handlers::wrapped_object_handler::WrappedObjectHandler;
use crate::handlers::AnalyticsHandler;
use crate::package_store::{LazyPackageCache, PackageCache};
use crate::tables::{InputObjectKind, ObjectStatus, OwnerType};
use crate::writers::csv_writer::CSVWriter;
use crate::writers::parquet_writer::ParquetWriter;
use crate::writers::AnalyticsWriter;

pub mod analytics_metrics;
pub mod analytics_processor;
pub mod errors;
mod handlers;
pub mod package_store;
pub mod tables;
mod writers;

const EPOCH_DIR_PREFIX: &str = "epoch_";
const CHECKPOINT_DIR_PREFIX: &str = "checkpoints";
const OBJECT_DIR_PREFIX: &str = "objects";
const TRANSACTION_DIR_PREFIX: &str = "transactions";
const TRANSACTION_BCS_DIR_PREFIX: &str = "transaction_bcs";
const EVENT_DIR_PREFIX: &str = "events";
const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects";
const MOVE_CALL_PREFIX: &str = "move_call";
const MOVE_PACKAGE_PREFIX: &str = "move_package";
const PACKAGE_BCS_DIR_PREFIX: &str = "move_package_bcs";
const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field";
const WRAPPED_OBJECT_PREFIX: &str = "wrapped_object";

const TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME: &str = "TRANSACTION_CONCURRENCY_LIMIT";
const DEFAULT_TRANSACTION_CONCURRENCY_LIMIT: usize = 64;
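/// Maximum number of checkpoints processed concurrently. Defaults to
/// `DEFAULT_TRANSACTION_CONCURRENCY_LIMIT` and can be overridden at startup via
/// the `TRANSACTION_CONCURRENCY_LIMIT` environment variable, e.g.
/// `TRANSACTION_CONCURRENCY_LIMIT=128`.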
pub static TRANSACTION_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
    let custom_limit: Option<usize> = std::env::var(TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME)
        .ok()
        .and_then(|s| s.parse().ok());
    if let Some(limit) = custom_limit {
        info!(
            "Using custom value for '{}' -- max checkpoints in progress: {}",
            TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME, limit
        );
        limit
    } else {
        info!(
            "Using default value for '{}' -- max checkpoints in progress: {}",
            TRANSACTION_CONCURRENCY_LIMIT_VAR_NAME, DEFAULT_TRANSACTION_CONCURRENCY_LIMIT
        );
        DEFAULT_TRANSACTION_CONCURRENCY_LIMIT
    }
});

fn default_client_metric_host() -> String {
    "127.0.0.1".to_string()
}

fn default_client_metric_port() -> u16 {
    8081
}

fn default_checkpoint_root() -> PathBuf {
    PathBuf::from("/tmp")
}

fn default_batch_size() -> usize {
    10
}

fn default_data_limit() -> usize {
    100
}

fn default_remote_store_url() -> String {
    "https://checkpoints.mainnet.sui.io".to_string()
}

fn default_remote_store_timeout_secs() -> u64 {
    5
}

fn default_package_cache_path() -> PathBuf {
    PathBuf::from("/opt/sui/db/package_cache")
}

fn default_file_format() -> FileFormat {
    FileFormat::CSV
}

fn default_checkpoint_interval() -> u64 {
    10000
}

fn default_max_file_size_mb() -> u64 {
    100
}

fn default_max_row_count() -> usize {
    100000
}

fn default_time_interval_s() -> u64 {
    600
}

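/// Top-level configuration for an analytics indexer job: the fullnode to read
/// from, the remote stores involved, optional BigQuery/Snowflake credentials
/// for watermark reporting, and the list of per-table tasks.
///
/// A minimal config sketch, shown as YAML (the wire format and the exact
/// `ObjectStoreConfig` keys depend on the caller that deserializes this
/// struct; the values below are illustrative):
///
/// ```yaml
/// rest_url: "http://127.0.0.1:9000"
/// remote_store_config:
///   object-store: "File"
///   directory: "/tmp/analytics"
/// tasks:
///   - task_name: "checkpoint_task"
///     file_type: "Checkpoint"
/// ```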
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobConfig {
    /// Fullnode REST URL, used to fetch Move packages for the package cache.
    pub rest_url: String,
    #[serde(default = "default_client_metric_host")]
    pub client_metric_host: String,
    #[serde(default = "default_client_metric_port")]
    pub client_metric_port: u16,
    /// Object store where the generated analytics files are uploaded.
    pub remote_store_config: ObjectStoreConfig,
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    #[serde(default = "default_data_limit")]
    pub data_limit: usize,
    /// Remote store to read checkpoint data from.
    #[serde(default = "default_remote_store_url")]
    pub remote_store_url: String,
    #[serde(default)]
    pub remote_store_options: Vec<(String, String)>,
    #[serde(default = "default_remote_store_timeout_secs")]
    pub remote_store_timeout_secs: u64,
    /// Local directory backing the on-disk package cache.
    #[serde(default = "default_package_cache_path")]
    pub package_cache_path: PathBuf,
    /// Root directory under which per-task scratch directories are created.
    #[serde(default = "default_checkpoint_root")]
    pub checkpoint_root: PathBuf,
    // BigQuery connection details, required only when a task reports the max
    // table checkpoint from BigQuery.
    pub bq_service_account_key_file: Option<String>,
    pub bq_project_id: Option<String>,
    pub bq_dataset_id: Option<String>,
    // Snowflake connection details, required only when a task reports the max
    // table checkpoint from Snowflake.
    pub sf_account_identifier: Option<String>,
    pub sf_warehouse: Option<String>,
    pub sf_database: Option<String>,
    pub sf_schema: Option<String>,
    pub sf_username: Option<String>,
    pub sf_role: Option<String>,
    pub sf_password_file: Option<String>,

    #[serde(rename = "tasks")]
    task_configs: Vec<TaskConfig>,
}

impl JobConfig {
    pub async fn create_checkpoint_processors(
        self,
        metrics: AnalyticsMetrics,
    ) -> Result<(Vec<Processor>, Option<Arc<PackageCache>>)> {
        let lazy_package_cache = Arc::new(Mutex::new(LazyPackageCache::new(
            self.package_cache_path.clone(),
            self.rest_url.clone(),
        )));

        let job_config = Arc::new(self);
        let mut processors = Vec::with_capacity(job_config.task_configs.len());
        let mut task_names = HashSet::new();

        for task_config in job_config.task_configs.clone() {
            let task_name = &task_config.task_name;

            if !task_names.insert(task_name.clone()) {
                return Err(anyhow!("Duplicate task_name '{}' found", task_name));
            }

            let temp_dir = tempfile::Builder::new()
                .prefix(&format!("{}-work-dir", task_name))
                .tempdir_in(&job_config.checkpoint_root)?;

            let task_context = TaskContext {
                job_config: Arc::clone(&job_config),
                config: task_config,
                checkpoint_dir: Arc::new(temp_dir),
                metrics: metrics.clone(),
                lazy_package_cache: lazy_package_cache.clone(),
            };

            processors.push(task_context.create_analytics_processor().await?);
        }

        // Only hand back the package cache if at least one task actually
        // initialized it.
        let package_cache = lazy_package_cache
            .lock()
            .unwrap()
            .get_cache_if_initialized();

        Ok((processors, package_cache))
    }

    pub fn task_configs(&self) -> &[TaskConfig] {
        &self.task_configs
    }
}

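/// Per-task configuration: which table to extract (`file_type`), the output
/// format, and the rollover thresholds that decide when a file is cut and
/// uploaded.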
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskConfig {
    /// Unique name for this task; duplicates are rejected at startup.
    pub task_name: String,
    pub file_type: FileType,
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,
    /// Cut a new file after this many checkpoints.
    #[serde(default = "default_checkpoint_interval")]
    pub checkpoint_interval: u64,
    /// Cut a new file once it exceeds this size in MB.
    #[serde(default = "default_max_file_size_mb")]
    pub max_file_size_mb: u64,
    /// Cut a new file after this many rows.
    #[serde(default = "default_max_row_count")]
    pub max_row_count: usize,
    /// Optional override for the first checkpoint to process; the effective
    /// start never falls below the latest file already in the remote store.
    pub starting_checkpoint_seq_num: Option<u64>,
    /// Cut a new file after this many seconds.
    #[serde(default = "default_time_interval_s")]
    pub time_interval_s: u64,
    /// Optional prefix prepended to the file type's directory in the remote store.
    #[serde(default)]
    remote_store_path_prefix: Option<String>,
    // BigQuery watermark reporting.
    pub bq_table_id: Option<String>,
    pub bq_checkpoint_col_id: Option<String>,
    #[serde(default)]
    pub report_bq_max_table_checkpoint: bool,
    // Snowflake watermark reporting.
    pub sf_table_id: Option<String>,
    pub sf_checkpoint_col_id: Option<String>,
    #[serde(default)]
    pub report_sf_max_table_checkpoint: bool,
    /// Only applied by the object task.
    pub package_id_filter: Option<String>,
}

impl TaskConfig {
    pub fn remote_store_path_prefix(&self) -> Result<Option<Path>> {
        Ok(self
            .remote_store_path_prefix
            .as_ref()
            .map(|prefix| Path::from(prefix.as_str())))
    }
}

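/// Everything a single task needs at runtime: its own config, the shared job
/// config, a scratch directory for files in progress, metrics, and the shared
/// package cache.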
pub struct TaskContext {
    pub config: TaskConfig,
    pub job_config: Arc<JobConfig>,
    pub checkpoint_dir: Arc<TempDir>,
    pub metrics: AnalyticsMetrics,
    pub lazy_package_cache: Arc<Mutex<LazyPackageCache>>,
}

impl TaskContext {
    pub fn checkpoint_dir_path(&self) -> &std::path::Path {
        self.checkpoint_dir.path()
    }

    pub fn task_name(&self) -> &str {
        &self.config.task_name
    }

    pub async fn create_analytics_processor(self) -> Result<Processor> {
        match &self.config.file_type {
            FileType::Checkpoint => {
                self.create_processor_for_handler(Box::new(CheckpointHandler::new()))
                    .await
            }
            FileType::Object => {
                let package_id_filter = self.config.package_id_filter.clone();
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                let metrics = self.metrics.clone();
                self.create_processor_for_handler(Box::new(ObjectHandler::new(
                    package_cache,
                    &package_id_filter,
                    metrics,
                )))
                .await
            }
            FileType::Transaction => {
                self.create_processor_for_handler(Box::new(TransactionHandler::new()))
                    .await
            }
            FileType::TransactionBCS => {
                self.create_processor_for_handler(Box::new(TransactionBCSHandler::new()))
                    .await
            }
            FileType::Event => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                self.create_processor_for_handler(Box::new(EventHandler::new(package_cache)))
                    .await
            }
            FileType::TransactionObjects => {
                self.create_processor_for_handler(Box::new(TransactionObjectsHandler::new()))
                    .await
            }
            FileType::MoveCall => {
                self.create_processor_for_handler(Box::new(MoveCallHandler::new()))
                    .await
            }
            FileType::MovePackage => {
                self.create_processor_for_handler(Box::new(PackageHandler::new()))
                    .await
            }
            FileType::MovePackageBCS => {
                self.create_processor_for_handler(Box::new(PackageBCSHandler::new()))
                    .await
            }
            FileType::DynamicField => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                self.create_processor_for_handler(Box::new(DynamicFieldHandler::new(
                    package_cache,
                )))
                .await
            }
            FileType::WrappedObject => {
                let package_cache = self
                    .lazy_package_cache
                    .lock()
                    .unwrap()
                    .initialize_or_get_cache();
                let metrics = self.metrics.clone();
                self.create_processor_for_handler(Box::new(WrappedObjectHandler::new(
                    package_cache,
                    metrics,
                )))
                .await
            }
        }
    }

    async fn create_processor_for_handler<
        T: Serialize + Clone + ParquetSchema + Send + Sync + 'static,
    >(
        self,
        handler: Box<dyn AnalyticsHandler<T>>,
    ) -> Result<Processor> {
        let starting_checkpoint_seq_num = self.get_starting_checkpoint_seq_num().await?;
        let writer = self.make_writer::<T>(starting_checkpoint_seq_num)?;
        let max_checkpoint_reader = self.make_max_checkpoint_reader().await?;
        Processor::new::<T>(
            handler,
            writer,
            max_checkpoint_reader,
            starting_checkpoint_seq_num,
            self,
        )
        .await
    }

    /// Determine where to resume: never start below the latest checkpoint
    /// already uploaded to the remote store, even if the config asks for an
    /// earlier starting point.
    async fn get_starting_checkpoint_seq_num(&self) -> Result<u64> {
        let remote_latest = read_store_for_checkpoint(
            &self.job_config.remote_store_config,
            self.config.file_type,
            self.config.remote_store_path_prefix()?.as_ref(),
        )
        .await?;

        Ok(self
            .config
            .starting_checkpoint_seq_num
            .map_or(remote_latest, |start| start.max(remote_latest)))
    }

    fn make_writer<S: Serialize + ParquetSchema>(
        &self,
        starting_checkpoint_seq_num: u64,
    ) -> Result<Box<dyn AnalyticsWriter<S>>> {
        Ok(match self.config.file_format {
            FileFormat::CSV => Box::new(CSVWriter::new(
                self.checkpoint_dir_path(),
                self.config.file_type,
                starting_checkpoint_seq_num,
            )?),
            FileFormat::PARQUET => Box::new(ParquetWriter::new(
                self.checkpoint_dir_path(),
                self.config.file_type,
                starting_checkpoint_seq_num,
            )?),
        })
    }

    async fn make_max_checkpoint_reader(&self) -> Result<Box<dyn MaxCheckpointReader>> {
        // BigQuery takes precedence if both reporters are enabled.
        let res: Box<dyn MaxCheckpointReader> = if self.config.report_bq_max_table_checkpoint {
            Box::new(
                BQMaxCheckpointReader::new(
                    self.job_config
                        .bq_service_account_key_file
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing gcp key file"))?,
                    self.job_config
                        .bq_project_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing big query project id"))?,
                    self.job_config
                        .bq_dataset_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing big query dataset id"))?,
                    self.config
                        .bq_table_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing big query table id"))?,
                    self.config
                        .bq_checkpoint_col_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing big query checkpoint col id"))?,
                )
                .await?,
            )
        } else if self.config.report_sf_max_table_checkpoint {
            Box::new(
                SnowflakeMaxCheckpointReader::new(
                    self.job_config
                        .sf_account_identifier
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf account identifier"))?,
                    self.job_config
                        .sf_warehouse
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf warehouse"))?,
                    self.job_config
                        .sf_database
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf database"))?,
                    self.job_config
                        .sf_schema
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf schema"))?,
                    self.job_config
                        .sf_username
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf username"))?,
                    self.job_config
                        .sf_role
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf role"))?,
                    &load_password(
                        self.job_config
                            .sf_password_file
                            .as_ref()
                            .ok_or_else(|| anyhow!("Missing sf password file"))?,
                    )?,
                    self.config
                        .sf_table_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf table id"))?,
                    self.config
                        .sf_checkpoint_col_id
                        .as_ref()
                        .ok_or_else(|| anyhow!("Missing sf checkpoint col id"))?,
                )
                .await?,
            )
        } else {
            Box::new(NoOpCheckpointReader {})
        };
        Ok(res)
    }
}

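/// Reports the maximum checkpoint sequence number already committed to a
/// downstream table (e.g. BigQuery or Snowflake), so the processor can track
/// how far the live table lags behind. Implementations return -1 when nothing
/// has been committed yet.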
#[async_trait::async_trait]
pub trait MaxCheckpointReader: Send + Sync + 'static {
    async fn max_checkpoint(&self) -> Result<i64>;
}

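/// `MaxCheckpointReader` backed by a Snowflake `SELECT max(...)` query.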
struct SnowflakeMaxCheckpointReader {
    query: String,
    api: SnowflakeApi,
}

impl SnowflakeMaxCheckpointReader {
    pub async fn new(
        account_identifier: &str,
        warehouse: &str,
        database: &str,
        schema: &str,
        user: &str,
        role: &str,
        passwd: &str,
        table_id: &str,
        col_id: &str,
    ) -> anyhow::Result<Self> {
        let api = SnowflakeApi::with_password_auth(
            account_identifier,
            Some(warehouse),
            Some(database),
            Some(schema),
            user,
            Some(role),
            passwd,
        )?;
        Ok(SnowflakeMaxCheckpointReader {
            query: format!("SELECT max({}) from {}", col_id, table_id),
            api,
        })
    }
}

#[async_trait::async_trait]
impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        let res = self.api.exec(&self.query).await?;
        match res {
            QueryResult::Arrow(batches) => {
                if let Some(record_batch) = batches.first() {
                    let col = record_batch.column(0);
                    let col_array = col
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("Failed to downcast arrow column");
                    Ok(col_array.value(0) as i64)
                } else {
                    // No batches means nothing has been committed yet.
                    Ok(-1)
                }
            }
            QueryResult::Json(_) | QueryResult::Empty => Err(anyhow!("Unexpected query result")),
        }
    }
}

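/// `MaxCheckpointReader` backed by a BigQuery `SELECT max(...)` query.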
struct BQMaxCheckpointReader {
    query: String,
    project_id: String,
    client: Client,
}

impl BQMaxCheckpointReader {
    pub async fn new(
        key_path: &str,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        col_id: &str,
    ) -> anyhow::Result<Self> {
        Ok(BQMaxCheckpointReader {
            query: format!(
                "SELECT max({}) from `{}.{}.{}`",
                col_id, project_id, dataset_id, table_id
            ),
            client: Client::from_service_account_key_file(key_path).await?,
            project_id: project_id.to_string(),
        })
    }
}

#[async_trait::async_trait]
impl MaxCheckpointReader for BQMaxCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        let result = self
            .client
            .job()
            .query(&self.project_id, QueryRequest::new(&self.query))
            .await?;
        let mut result_set = ResultSet::new_from_query_response(result);
        if result_set.next_row() {
            let max_checkpoint = result_set
                .get_i64(0)?
                .ok_or_else(|| anyhow!("No rows returned"))?;
            Ok(max_checkpoint)
        } else {
            Ok(-1)
        }
    }
}

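/// Fallback used when neither BigQuery nor Snowflake reporting is enabled;
/// always reports -1.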
struct NoOpCheckpointReader;

#[async_trait::async_trait]
impl MaxCheckpointReader for NoOpCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        Ok(-1)
    }
}

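/// Output file format for a task; also determines the file extension.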
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    strum_macros::Display,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
)]
#[repr(u8)]
pub enum FileFormat {
    CSV = 0,
    PARQUET = 1,
}

impl FileFormat {
    pub fn file_suffix(&self) -> &str {
        match self {
            FileFormat::CSV => "csv",
            FileFormat::PARQUET => "parquet",
        }
    }
}

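/// The table a task extracts; each variant maps to its own directory prefix in
/// the remote store.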
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
)]
#[repr(u8)]
pub enum FileType {
    Checkpoint = 0,
    Object,
    Transaction,
    TransactionBCS,
    TransactionObjects,
    Event,
    MoveCall,
    MovePackage,
    MovePackageBCS,
    DynamicField,
    WrappedObject,
}

impl FileType {
    pub fn dir_prefix(&self) -> Path {
        match self {
            FileType::Checkpoint => Path::from(CHECKPOINT_DIR_PREFIX),
            FileType::Transaction => Path::from(TRANSACTION_DIR_PREFIX),
            FileType::TransactionBCS => Path::from(TRANSACTION_BCS_DIR_PREFIX),
            FileType::TransactionObjects => Path::from(TRANSACTION_OBJECT_DIR_PREFIX),
            FileType::Object => Path::from(OBJECT_DIR_PREFIX),
            FileType::Event => Path::from(EVENT_DIR_PREFIX),
            FileType::MoveCall => Path::from(MOVE_CALL_PREFIX),
            FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX),
            FileType::MovePackageBCS => Path::from(PACKAGE_BCS_DIR_PREFIX),
            FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX),
            FileType::WrappedObject => Path::from(WRAPPED_OBJECT_PREFIX),
        }
    }

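    /// Build the object-store path for a file covering `checkpoint_range`
    /// within `epoch_num`, e.g. `checkpoints/epoch_3/1000_2000.csv` for
    /// `FileType::Checkpoint` with range `1000..2000` and `FileFormat::CSV`.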
    pub fn file_path(
        &self,
        file_format: FileFormat,
        epoch_num: EpochId,
        checkpoint_range: Range<u64>,
    ) -> Path {
        self.dir_prefix()
            .child(format!("{}{}", EPOCH_DIR_PREFIX, epoch_num))
            .child(format!(
                "{}_{}.{}",
                checkpoint_range.start,
                checkpoint_range.end,
                file_format.file_suffix()
            ))
    }
}

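/// Dynamically typed cell value used when writing rows to Parquet; the `From`
/// impls below let handlers convert column values uniformly.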
pub enum ParquetValue {
    U64(u64),
    Str(String),
    Bool(bool),
    I64(i64),
    OptionU64(Option<u64>),
    OptionStr(Option<String>),
}

impl From<u64> for ParquetValue {
    fn from(value: u64) -> Self {
        Self::U64(value)
    }
}

impl From<i64> for ParquetValue {
    fn from(value: i64) -> Self {
        Self::I64(value)
    }
}

impl From<String> for ParquetValue {
    fn from(value: String) -> Self {
        Self::Str(value)
    }
}

impl From<Option<u64>> for ParquetValue {
    fn from(value: Option<u64>) -> Self {
        Self::OptionU64(value)
    }
}

impl From<Option<String>> for ParquetValue {
    fn from(value: Option<String>) -> Self {
        Self::OptionStr(value)
    }
}

impl From<bool> for ParquetValue {
    fn from(value: bool) -> Self {
        Self::Bool(value)
    }
}

impl From<OwnerType> for ParquetValue {
    fn from(value: OwnerType) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<OwnerType>> for ParquetValue {
    fn from(value: Option<OwnerType>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

impl From<ObjectStatus> for ParquetValue {
    fn from(value: ObjectStatus) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<ObjectStatus>> for ParquetValue {
    fn from(value: Option<ObjectStatus>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

impl From<Option<InputObjectKind>> for ParquetValue {
    fn from(value: Option<InputObjectKind>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

impl From<DynamicFieldType> for ParquetValue {
    fn from(value: DynamicFieldType) -> Self {
        Self::Str(value.to_string())
    }
}

impl From<Option<DynamicFieldType>> for ParquetValue {
    fn from(value: Option<DynamicFieldType>) -> Self {
        Self::OptionStr(value.map(|v| v.to_string()))
    }
}

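/// Column-oriented access to a row type: `schema` names the columns and
/// `get_column` returns the value at each index. A minimal sketch for a
/// hypothetical two-column row type (illustrative only, not part of this
/// crate):
///
/// ```ignore
/// struct RowEntry {
///     checkpoint: u64,
///     digest: String,
/// }
///
/// impl ParquetSchema for RowEntry {
///     fn schema() -> Vec<String> {
///         vec!["checkpoint".into(), "digest".into()]
///     }
///
///     fn get_column(&self, idx: usize) -> ParquetValue {
///         match idx {
///             0 => self.checkpoint.into(),
///             1 => self.digest.clone().into(),
///             _ => panic!("invalid column index {idx}"),
///         }
///     }
/// }
/// ```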
pub trait ParquetSchema {
    fn schema() -> Vec<String>;

    fn get_column(&self, idx: usize) -> ParquetValue;
}

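/// Identifies a single output file: its table, format, epoch, and the range of
/// checkpoints it covers.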
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    pub file_type: FileType,
    pub file_format: FileFormat,
    pub epoch_num: u64,
    pub checkpoint_seq_range: Range<u64>,
}

impl FileMetadata {
    fn new(
        file_type: FileType,
        file_format: FileFormat,
        epoch_num: u64,
        checkpoint_seq_range: Range<u64>,
    ) -> FileMetadata {
        FileMetadata {
            file_type,
            file_format,
            epoch_num,
            checkpoint_seq_range,
        }
    }

    pub fn file_path(&self) -> Path {
        self.file_type.file_path(
            self.file_format,
            self.epoch_num,
            self.checkpoint_seq_range.clone(),
        )
    }
}

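/// A running task: wraps the type-erased `AnalyticsProcessor` as a
/// `sui_data_ingestion_core::Worker` together with the metadata the runner
/// needs (task name, file type, resume point).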
pub struct Processor {
    pub processor: Box<dyn Worker<Result = ()>>,
    pub starting_checkpoint_seq_num: CheckpointSequenceNumber,
    pub task_name: String,
    pub file_type: FileType,
}

#[async_trait::async_trait]
impl Worker for Processor {
    type Result = ();

    #[inline]
    async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
        self.processor.process_checkpoint_arc(checkpoint_data).await
    }
}

impl Processor {
    pub async fn new<S: Serialize + ParquetSchema + Send + Sync + 'static>(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        starting_checkpoint_seq_num: CheckpointSequenceNumber,
        task: TaskContext,
    ) -> Result<Self> {
        let task_name = task.config.task_name.clone();
        let file_type = task.config.file_type;
        let processor = Box::new(
            AnalyticsProcessor::new(
                handler,
                writer,
                max_checkpoint_reader,
                starting_checkpoint_seq_num,
                task,
            )
            .await?,
        );

        Ok(Processor {
            processor,
            starting_checkpoint_seq_num,
            task_name,
            file_type,
        })
    }

    /// The checkpoint just before the resume point, or `None` when starting
    /// from the beginning (sequence numbers 0 and 1 both yield `None`).
    pub fn last_committed_checkpoint(&self) -> Option<u64> {
        Some(self.starting_checkpoint_seq_num.saturating_sub(1)).filter(|x| *x > 0)
    }
}

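/// Scan the remote analytics store for the given file type and return the next
/// checkpoint sequence number to process: the exclusive end of the newest file
/// in the latest epoch directory, or 0 when no files exist yet.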
pub async fn read_store_for_checkpoint(
    remote_store_config: &ObjectStoreConfig,
    file_type: FileType,
    dir_prefix: Option<&Path>,
) -> Result<CheckpointSequenceNumber> {
    let remote_object_store = remote_store_config.make()?;
    let remote_store_is_empty = remote_object_store
        .list_with_delimiter(None)
        .await?
        .common_prefixes
        .is_empty();
    info!("Remote store is empty: {remote_store_is_empty}");
    let file_type_prefix = file_type.dir_prefix();
    let prefix = join_paths(dir_prefix, &file_type_prefix);
    let epoch_dirs = find_all_dirs_with_epoch_prefix(&remote_object_store, Some(&prefix)).await?;
    // Resume from the highest epoch directory present (or epoch 0 if none).
    let epoch = epoch_dirs.last_key_value().map(|(k, _v)| *k).unwrap_or(0);
    let epoch_prefix = prefix.child(format!("epoch_{}", epoch));
    let checkpoints =
        find_all_files_with_epoch_prefix(&remote_object_store, Some(&epoch_prefix)).await?;
    // The file with the largest exclusive end marks the next checkpoint to write.
    let next_checkpoint_seq_num = checkpoints
        .iter()
        .max_by(|x, y| x.end.cmp(&y.end))
        .map(|r| r.end)
        .unwrap_or(0);
    Ok(next_checkpoint_seq_num)
}

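/// Join an optional base path with a child path, e.g.
/// `join_paths(Some(&Path::from("prefix")), &Path::from("checkpoints"))`
/// yields `prefix/checkpoints`; with no base the child is returned unchanged.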
pub fn join_paths(base: Option<&Path>, child: &Path) -> Path {
    base.map(|p| {
        let mut out_path = p.clone();
        for part in child.parts() {
            out_path = out_path.child(part)
        }
        out_path
    })
    .unwrap_or_else(|| child.clone())
}

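/// Read a password from a file, trimming surrounding whitespace (including the
/// trailing newline most editors add).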
fn load_password(path: &str) -> anyhow::Result<String> {
    let contents = fs::read_to_string(std::path::Path::new(path))?;
    Ok(contents.trim().to_string())
}