sui_pg_db/
store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::NaiveDateTime;
8use chrono::Utc;
9use diesel::ExpressionMethods;
10use diesel::OptionalExtension;
11use diesel::prelude::*;
12use diesel::sql_types::BigInt;
13use diesel_async::AsyncConnection;
14use diesel_async::RunQueryDsl;
15use scoped_futures::ScopedBoxFuture;
16use sui_indexer_alt_framework_store_traits as store;
17use sui_sql_macro::sql;
18
19use crate::Connection;
20use crate::Db;
21use crate::model::StoredWatermark;
22use crate::schema::watermarks;
23
24pub use sui_indexer_alt_framework_store_traits::Store;
25
26#[async_trait]
27impl store::Connection for Connection<'_> {
28    async fn init_watermark(
29        &mut self,
30        pipeline_task: &str,
31        default_next_checkpoint: u64,
32    ) -> anyhow::Result<Option<u64>> {
33        let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
34            // Do not create a watermark record with checkpoint_hi_inclusive = -1.
35            return Ok(self
36                .committer_watermark(pipeline_task)
37                .await?
38                .map(|w| w.checkpoint_hi_inclusive));
39        };
40
41        let stored_watermark = StoredWatermark {
42            pipeline: pipeline_task.to_string(),
43            epoch_hi_inclusive: 0,
44            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
45            tx_hi: 0,
46            timestamp_ms_hi_inclusive: 0,
47            reader_lo: default_next_checkpoint as i64,
48            pruner_timestamp: Utc::now().naive_utc(),
49            pruner_hi: default_next_checkpoint as i64,
50        };
51
52        use diesel::pg::upsert::excluded;
53        let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
54            .values(&stored_watermark)
55            // There is an existing entry, so only write the new `hi` values
56            .on_conflict(watermarks::pipeline)
57            // Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
58            .do_update()
59            // When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
60            // `excluded` is a virtual table containing the existing row that there was a conflict with.
61            .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
62            .returning(watermarks::checkpoint_hi_inclusive)
63            .get_result(self)
64            .await?;
65
66        Ok(Some(checkpoint_hi_inclusive as u64))
67    }
68
69    async fn committer_watermark(
70        &mut self,
71        pipeline_task: &str,
72    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
73        let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
74            .select((
75                watermarks::epoch_hi_inclusive,
76                watermarks::checkpoint_hi_inclusive,
77                watermarks::tx_hi,
78                watermarks::timestamp_ms_hi_inclusive,
79            ))
80            .filter(watermarks::pipeline.eq(pipeline_task))
81            .first(self)
82            .await
83            .optional()?;
84
85        if let Some(watermark) = watermark {
86            Ok(Some(store::CommitterWatermark {
87                epoch_hi_inclusive: watermark.0 as u64,
88                checkpoint_hi_inclusive: watermark.1 as u64,
89                tx_hi: watermark.2 as u64,
90                timestamp_ms_hi_inclusive: watermark.3 as u64,
91            }))
92        } else {
93            Ok(None)
94        }
95    }
96
97    async fn reader_watermark(
98        &mut self,
99        pipeline: &'static str,
100    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
101        let watermark: Option<(i64, i64)> = watermarks::table
102            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
103            .filter(watermarks::pipeline.eq(pipeline))
104            .first(self)
105            .await
106            .optional()?;
107
108        if let Some(watermark) = watermark {
109            Ok(Some(store::ReaderWatermark {
110                checkpoint_hi_inclusive: watermark.0 as u64,
111                reader_lo: watermark.1 as u64,
112            }))
113        } else {
114            Ok(None)
115        }
116    }
117
118    async fn pruner_watermark(
119        &mut self,
120        pipeline: &'static str,
121        delay: Duration,
122    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
123        //     |---------- + delay ---------------------|
124        //                             |--- wait_for ---|
125        //     |-----------------------|----------------|
126        //     ^                       ^
127        //     pruner_timestamp        NOW()
128        let wait_for = sql!(as BigInt,
129            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
130            delay.as_millis() as i64,
131        );
132
133        let watermark: Option<(i64, i64, i64)> = watermarks::table
134            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
135            .filter(watermarks::pipeline.eq(pipeline))
136            .first(self)
137            .await
138            .optional()?;
139
140        if let Some(watermark) = watermark {
141            Ok(Some(store::PrunerWatermark {
142                wait_for_ms: watermark.0,
143                pruner_hi: watermark.1 as u64,
144                reader_lo: watermark.2 as u64,
145            }))
146        } else {
147            Ok(None)
148        }
149    }
150
151    async fn set_committer_watermark(
152        &mut self,
153        pipeline_task: &str,
154        watermark: store::CommitterWatermark,
155    ) -> anyhow::Result<bool> {
156        // Create a StoredWatermark directly from CommitterWatermark
157        let stored_watermark = StoredWatermark {
158            pipeline: pipeline_task.to_string(),
159            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
160            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
161            tx_hi: watermark.tx_hi as i64,
162            timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
163            reader_lo: 0,
164            pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
165            pruner_hi: 0,
166        };
167
168        use diesel::query_dsl::methods::FilterDsl;
169        Ok(diesel::insert_into(watermarks::table)
170            .values(&stored_watermark)
171            // There is an existing entry, so only write the new `hi` values
172            .on_conflict(watermarks::pipeline)
173            .do_update()
174            .set((
175                watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
176                watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
177                watermarks::tx_hi.eq(stored_watermark.tx_hi),
178                watermarks::timestamp_ms_hi_inclusive
179                    .eq(stored_watermark.timestamp_ms_hi_inclusive),
180            ))
181            .filter(
182                watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
183            )
184            .execute(self)
185            .await?
186            > 0)
187    }
188
189    async fn set_reader_watermark(
190        &mut self,
191        pipeline: &'static str,
192        reader_lo: u64,
193    ) -> anyhow::Result<bool> {
194        Ok(diesel::update(watermarks::table)
195            .set((
196                watermarks::reader_lo.eq(reader_lo as i64),
197                watermarks::pruner_timestamp.eq(diesel::dsl::now),
198            ))
199            .filter(watermarks::pipeline.eq(pipeline))
200            .filter(watermarks::reader_lo.lt(reader_lo as i64))
201            .execute(self)
202            .await?
203            > 0)
204    }
205
206    async fn set_pruner_watermark(
207        &mut self,
208        pipeline: &'static str,
209        pruner_hi: u64,
210    ) -> anyhow::Result<bool> {
211        Ok(diesel::update(watermarks::table)
212            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
213            .filter(watermarks::pipeline.eq(pipeline))
214            .execute(self)
215            .await?
216            > 0)
217    }
218}
219
220#[async_trait]
221impl store::Store for Db {
222    type Connection<'c> = Connection<'c>;
223
224    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
225        Ok(Connection(self.0.get().await?))
226    }
227}
228
229#[async_trait]
230impl store::TransactionalStore for Db {
231    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
232    where
233        R: Send + 'a,
234        F: Send + 'a,
235        F: for<'r> FnOnce(
236            &'r mut Self::Connection<'_>,
237        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
238    {
239        let mut conn = self.connect().await?;
240        AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
241    }
242}