sui_analytics_indexer/progress_monitoring/
snowflake.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Snowflake-based checkpoint progress reader.
5
6use anyhow::Result;
7use anyhow::anyhow;
8
9use crate::progress_monitoring::MaxCheckpointReader;
10
11/// Reads the maximum checkpoint from a Snowflake table.
12pub struct SnowflakeMaxCheckpointReader {
13    query: String,
14    api: snowflake_api::SnowflakeApi,
15}
16
17impl SnowflakeMaxCheckpointReader {
18    /// Creates a new Snowflake checkpoint reader.
19    pub async fn new(
20        account_identifier: &str,
21        warehouse: &str,
22        database: &str,
23        schema: &str,
24        user: &str,
25        role: &str,
26        passwd: &str,
27        table_id: &str,
28        col_id: &str,
29    ) -> Result<Self> {
30        let api = snowflake_api::SnowflakeApi::with_password_auth(
31            account_identifier,
32            Some(warehouse),
33            Some(database),
34            Some(schema),
35            user,
36            Some(role),
37            passwd,
38        )
39        .expect("Failed to build sf api client");
40        Ok(SnowflakeMaxCheckpointReader {
41            query: format!("SELECT max({}) from {}", col_id, table_id),
42            api,
43        })
44    }
45}
46
47#[async_trait::async_trait]
48impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
49    async fn max_checkpoint(&self) -> Result<i64> {
50        use arrow::array::Int32Array;
51        use snowflake_api::QueryResult;
52
53        let res = self.api.exec(&self.query).await?;
54        match res {
55            QueryResult::Arrow(a) => {
56                if let Some(record_batch) = a.first() {
57                    let col = record_batch.column(0);
58                    let col_array = col
59                        .as_any()
60                        .downcast_ref::<Int32Array>()
61                        .expect("Failed to downcast arrow column");
62                    Ok(col_array.value(0) as i64)
63                } else {
64                    Ok(-1)
65                }
66            }
67            QueryResult::Json(_j) => Err(anyhow!("Unexpected query result")),
68            QueryResult::Empty => Err(anyhow!("Unexpected query result")),
69        }
70    }
71}