// sui_analytics_indexer/progress_monitoring/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Progress monitoring for analytics pipelines.
//!
//! This module provides traits and implementations for monitoring how far data
//! has progressed through analytics pipelines, including reading the latest
//! checkpoint recorded in external stores such as Snowflake.
use std::fs;

use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use tracing::info;

use crate::config::IndexerConfig;
use crate::metrics::Metrics;

mod snowflake;

pub use snowflake::SnowflakeMaxCheckpointReader;
23/// Trait for reading the maximum checkpoint from an external store.
24#[async_trait::async_trait]
25pub trait MaxCheckpointReader: Send + Sync + 'static {
26    /// Returns the maximum checkpoint number in the store.
27    async fn max_checkpoint(&self) -> Result<i64>;
28}
29
30fn load_password(path: &str) -> Result<String> {
31    Ok(fs::read_to_string(path)?.trim().to_string())
32}
33
34/// Spawns background tasks to monitor Snowflake table checkpoints.
35pub fn spawn_snowflake_monitors(
36    config: &IndexerConfig,
37    metrics: Metrics,
38    cancel: tokio_util::sync::CancellationToken,
39) -> Result<Vec<tokio::task::JoinHandle<()>>> {
40    let mut handles = Vec::new();
41
42    for pipeline_config in config.pipeline_configs() {
43        if !pipeline_config.report_sf_max_table_checkpoint {
44            continue;
45        }
46
47        let sf_table_id = pipeline_config
48            .sf_table_id
49            .as_ref()
50            .ok_or_else(|| {
51                anyhow!(
52                    "Missing sf_table_id for pipeline {}",
53                    pipeline_config.pipeline
54                )
55            })?
56            .clone();
57
58        let sf_checkpoint_col_id = pipeline_config
59            .sf_checkpoint_col_id
60            .as_ref()
61            .ok_or_else(|| {
62                anyhow!(
63                    "Missing sf_checkpoint_col_id for pipeline {}",
64                    pipeline_config.pipeline
65                )
66            })?
67            .clone();
68
69        let account_identifier = config
70            .sf_account_identifier
71            .as_ref()
72            .ok_or_else(|| anyhow!("Missing sf_account_identifier"))?
73            .clone();
74
75        let warehouse = config
76            .sf_warehouse
77            .as_ref()
78            .ok_or_else(|| anyhow!("Missing sf_warehouse"))?
79            .clone();
80
81        let database = config
82            .sf_database
83            .as_ref()
84            .ok_or_else(|| anyhow!("Missing sf_database"))?
85            .clone();
86
87        let schema = config
88            .sf_schema
89            .as_ref()
90            .ok_or_else(|| anyhow!("Missing sf_schema"))?
91            .clone();
92
93        let username = config
94            .sf_username
95            .as_ref()
96            .ok_or_else(|| anyhow!("Missing sf_username"))?
97            .clone();
98
99        let role = config
100            .sf_role
101            .as_ref()
102            .ok_or_else(|| anyhow!("Missing sf_role"))?
103            .clone();
104
105        let password = load_password(
106            config
107                .sf_password_file
108                .as_ref()
109                .ok_or_else(|| anyhow!("Missing sf_password_file"))?,
110        )?;
111
112        let pipeline_name = pipeline_config.pipeline.to_string();
113        let metrics = metrics.clone();
114        let cancel = cancel.clone();
115
116        let handle = tokio::spawn(async move {
117            info!("Starting Snowflake monitor for pipeline: {}", pipeline_name);
118
119            let reader = match SnowflakeMaxCheckpointReader::new(
120                &account_identifier,
121                &warehouse,
122                &database,
123                &schema,
124                &username,
125                &role,
126                &password,
127                &sf_table_id,
128                &sf_checkpoint_col_id,
129            )
130            .await
131            {
132                Ok(r) => r,
133                Err(e) => {
134                    tracing::error!(
135                        "Failed to create Snowflake reader for {}: {}",
136                        pipeline_name,
137                        e
138                    );
139                    return;
140                }
141            };
142
143            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
144            loop {
145                tokio::select! {
146                    _ = cancel.cancelled() => {
147                        break;
148                    }
149                    _ = interval.tick() => {
150                        match reader.max_checkpoint().await {
151                            Ok(max_cp) => {
152                                metrics
153                                    .max_checkpoint_on_store
154                                    .with_label_values(&[&pipeline_name])
155                                    .set(max_cp);
156                            }
157                            Err(e) => {
158                                tracing::warn!(
159                                    "Failed to query Snowflake max checkpoint for {}: {}",
160                                    pipeline_name,
161                                    e
162                                );
163                            }
164                        }
165                    }
166                }
167            }
168        });
169
170        handles.push(handle);
171    }
172
173    Ok(handles)
174}