sui_analytics_indexer/progress_monitoring/
snowflake.rs1use anyhow::Result;
7use anyhow::anyhow;
8
9use crate::progress_monitoring::MaxCheckpointReader;
10
11pub struct SnowflakeMaxCheckpointReader {
13 query: String,
14 api: snowflake_api::SnowflakeApi,
15}
16
17impl SnowflakeMaxCheckpointReader {
18 pub async fn new(
20 account_identifier: &str,
21 warehouse: &str,
22 database: &str,
23 schema: &str,
24 user: &str,
25 role: &str,
26 passwd: &str,
27 table_id: &str,
28 col_id: &str,
29 ) -> Result<Self> {
30 let api = snowflake_api::SnowflakeApi::with_password_auth(
31 account_identifier,
32 Some(warehouse),
33 Some(database),
34 Some(schema),
35 user,
36 Some(role),
37 passwd,
38 )
39 .expect("Failed to build sf api client");
40 Ok(SnowflakeMaxCheckpointReader {
41 query: format!("SELECT max({}) from {}", col_id, table_id),
42 api,
43 })
44 }
45}
46
47#[async_trait::async_trait]
48impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
49 async fn max_checkpoint(&self) -> Result<i64> {
50 use arrow::array::Int32Array;
51 use snowflake_api::QueryResult;
52
53 let res = self.api.exec(&self.query).await?;
54 match res {
55 QueryResult::Arrow(a) => {
56 if let Some(record_batch) = a.first() {
57 let col = record_batch.column(0);
58 let col_array = col
59 .as_any()
60 .downcast_ref::<Int32Array>()
61 .expect("Failed to downcast arrow column");
62 Ok(col_array.value(0) as i64)
63 } else {
64 Ok(-1)
65 }
66 }
67 QueryResult::Json(_j) => Err(anyhow!("Unexpected query result")),
68 QueryResult::Empty => Err(anyhow!("Unexpected query result")),
69 }
70 }
71}