sui_analytics_indexer/progress_monitoring/
mod.rs1use std::fs;
11
12use anyhow::Result;
13use anyhow::anyhow;
14use tracing::info;
15
16use crate::config::IndexerConfig;
17use crate::metrics::Metrics;
18
19mod snowflake;
20
21pub use snowflake::SnowflakeMaxCheckpointReader;
22
23#[async_trait::async_trait]
25pub trait MaxCheckpointReader: Send + Sync + 'static {
26 async fn max_checkpoint(&self) -> Result<i64>;
28}
29
30fn load_password(path: &str) -> Result<String> {
31 Ok(fs::read_to_string(path)?.trim().to_string())
32}
33
34pub fn spawn_snowflake_monitors(
36 config: &IndexerConfig,
37 metrics: Metrics,
38 cancel: tokio_util::sync::CancellationToken,
39) -> Result<Vec<tokio::task::JoinHandle<()>>> {
40 let mut handles = Vec::new();
41
42 for pipeline_config in config.pipeline_configs() {
43 if !pipeline_config.report_sf_max_table_checkpoint {
44 continue;
45 }
46
47 let sf_table_id = pipeline_config
48 .sf_table_id
49 .as_ref()
50 .ok_or_else(|| {
51 anyhow!(
52 "Missing sf_table_id for pipeline {}",
53 pipeline_config.pipeline
54 )
55 })?
56 .clone();
57
58 let sf_checkpoint_col_id = pipeline_config
59 .sf_checkpoint_col_id
60 .as_ref()
61 .ok_or_else(|| {
62 anyhow!(
63 "Missing sf_checkpoint_col_id for pipeline {}",
64 pipeline_config.pipeline
65 )
66 })?
67 .clone();
68
69 let account_identifier = config
70 .sf_account_identifier
71 .as_ref()
72 .ok_or_else(|| anyhow!("Missing sf_account_identifier"))?
73 .clone();
74
75 let warehouse = config
76 .sf_warehouse
77 .as_ref()
78 .ok_or_else(|| anyhow!("Missing sf_warehouse"))?
79 .clone();
80
81 let database = config
82 .sf_database
83 .as_ref()
84 .ok_or_else(|| anyhow!("Missing sf_database"))?
85 .clone();
86
87 let schema = config
88 .sf_schema
89 .as_ref()
90 .ok_or_else(|| anyhow!("Missing sf_schema"))?
91 .clone();
92
93 let username = config
94 .sf_username
95 .as_ref()
96 .ok_or_else(|| anyhow!("Missing sf_username"))?
97 .clone();
98
99 let role = config
100 .sf_role
101 .as_ref()
102 .ok_or_else(|| anyhow!("Missing sf_role"))?
103 .clone();
104
105 let password = load_password(
106 config
107 .sf_password_file
108 .as_ref()
109 .ok_or_else(|| anyhow!("Missing sf_password_file"))?,
110 )?;
111
112 let pipeline_name = pipeline_config.pipeline.to_string();
113 let metrics = metrics.clone();
114 let cancel = cancel.clone();
115
116 let handle = tokio::spawn(async move {
117 info!("Starting Snowflake monitor for pipeline: {}", pipeline_name);
118
119 let reader = match SnowflakeMaxCheckpointReader::new(
120 &account_identifier,
121 &warehouse,
122 &database,
123 &schema,
124 &username,
125 &role,
126 &password,
127 &sf_table_id,
128 &sf_checkpoint_col_id,
129 )
130 .await
131 {
132 Ok(r) => r,
133 Err(e) => {
134 tracing::error!(
135 "Failed to create Snowflake reader for {}: {}",
136 pipeline_name,
137 e
138 );
139 return;
140 }
141 };
142
143 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
144 loop {
145 tokio::select! {
146 _ = cancel.cancelled() => {
147 break;
148 }
149 _ = interval.tick() => {
150 match reader.max_checkpoint().await {
151 Ok(max_cp) => {
152 metrics
153 .max_checkpoint_on_store
154 .with_label_values(&[&pipeline_name])
155 .set(max_cp);
156 }
157 Err(e) => {
158 tracing::warn!(
159 "Failed to query Snowflake max checkpoint for {}: {}",
160 pipeline_name,
161 e
162 );
163 }
164 }
165 }
166 }
167 }
168 });
169
170 handles.push(handle);
171 }
172
173 Ok(handles)
174}