sui_pg_db/
store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::NaiveDateTime;
8use diesel::ExpressionMethods;
9use diesel::OptionalExtension;
10use diesel::prelude::*;
11use diesel::sql_types::BigInt;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::model::StoredWatermark;
18use crate::schema::watermarks;
19use crate::{Connection, Db};
20
21pub use sui_indexer_alt_framework_store_traits::Store;
22
23#[async_trait]
24impl store::Connection for Connection<'_> {
25    async fn committer_watermark(
26        &mut self,
27        pipeline: &'static str,
28    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
29        let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
30            .select((
31                watermarks::epoch_hi_inclusive,
32                watermarks::checkpoint_hi_inclusive,
33                watermarks::tx_hi,
34                watermarks::timestamp_ms_hi_inclusive,
35            ))
36            .filter(watermarks::pipeline.eq(pipeline))
37            .first(self)
38            .await
39            .optional()?;
40
41        if let Some(watermark) = watermark {
42            Ok(Some(store::CommitterWatermark {
43                epoch_hi_inclusive: watermark.0 as u64,
44                checkpoint_hi_inclusive: watermark.1 as u64,
45                tx_hi: watermark.2 as u64,
46                timestamp_ms_hi_inclusive: watermark.3 as u64,
47            }))
48        } else {
49            Ok(None)
50        }
51    }
52
53    async fn reader_watermark(
54        &mut self,
55        pipeline: &'static str,
56    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
57        let watermark: Option<(i64, i64)> = watermarks::table
58            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
59            .filter(watermarks::pipeline.eq(pipeline))
60            .first(self)
61            .await
62            .optional()?;
63
64        if let Some(watermark) = watermark {
65            Ok(Some(store::ReaderWatermark {
66                checkpoint_hi_inclusive: watermark.0 as u64,
67                reader_lo: watermark.1 as u64,
68            }))
69        } else {
70            Ok(None)
71        }
72    }
73
74    async fn pruner_watermark(
75        &mut self,
76        pipeline: &'static str,
77        delay: Duration,
78    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
79        //     |---------- + delay ---------------------|
80        //                             |--- wait_for ---|
81        //     |-----------------------|----------------|
82        //     ^                       ^
83        //     pruner_timestamp        NOW()
84        let wait_for = sql!(as BigInt,
85            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
86            delay.as_millis() as i64,
87        );
88
89        let watermark: Option<(i64, i64, i64)> = watermarks::table
90            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
91            .filter(watermarks::pipeline.eq(pipeline))
92            .first(self)
93            .await
94            .optional()?;
95
96        if let Some(watermark) = watermark {
97            Ok(Some(store::PrunerWatermark {
98                wait_for_ms: watermark.0,
99                pruner_hi: watermark.1 as u64,
100                reader_lo: watermark.2 as u64,
101            }))
102        } else {
103            Ok(None)
104        }
105    }
106
107    async fn set_committer_watermark(
108        &mut self,
109        pipeline: &'static str,
110        watermark: store::CommitterWatermark,
111    ) -> anyhow::Result<bool> {
112        // Create a StoredWatermark directly from CommitterWatermark
113        let stored_watermark = StoredWatermark {
114            pipeline: pipeline.to_string(),
115            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
116            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
117            tx_hi: watermark.tx_hi as i64,
118            timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
119            reader_lo: 0,
120            pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
121            pruner_hi: 0,
122        };
123
124        use diesel::query_dsl::methods::FilterDsl;
125        Ok(diesel::insert_into(watermarks::table)
126            .values(&stored_watermark)
127            // There is an existing entry, so only write the new `hi` values
128            .on_conflict(watermarks::pipeline)
129            .do_update()
130            .set((
131                watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
132                watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
133                watermarks::tx_hi.eq(stored_watermark.tx_hi),
134                watermarks::timestamp_ms_hi_inclusive
135                    .eq(stored_watermark.timestamp_ms_hi_inclusive),
136            ))
137            .filter(
138                watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
139            )
140            .execute(self)
141            .await?
142            > 0)
143    }
144
145    async fn set_reader_watermark(
146        &mut self,
147        pipeline: &'static str,
148        reader_lo: u64,
149    ) -> anyhow::Result<bool> {
150        Ok(diesel::update(watermarks::table)
151            .set((
152                watermarks::reader_lo.eq(reader_lo as i64),
153                watermarks::pruner_timestamp.eq(diesel::dsl::now),
154            ))
155            .filter(watermarks::pipeline.eq(pipeline))
156            .filter(watermarks::reader_lo.lt(reader_lo as i64))
157            .execute(self)
158            .await?
159            > 0)
160    }
161
162    async fn set_pruner_watermark(
163        &mut self,
164        pipeline: &'static str,
165        pruner_hi: u64,
166    ) -> anyhow::Result<bool> {
167        Ok(diesel::update(watermarks::table)
168            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
169            .filter(watermarks::pipeline.eq(pipeline))
170            .execute(self)
171            .await?
172            > 0)
173    }
174}
175
176#[async_trait]
177impl store::Store for Db {
178    type Connection<'c> = Connection<'c>;
179
180    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
181        Ok(Connection(self.0.get().await?))
182    }
183}
184
185#[async_trait]
186impl store::TransactionalStore for Db {
187    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
188    where
189        R: Send + 'a,
190        F: Send + 'a,
191        F: for<'r> FnOnce(
192            &'r mut Self::Connection<'_>,
193        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
194    {
195        let mut conn = self.connect().await?;
196        AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
197    }
198}