sui_pg_db/
store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::{NaiveDateTime, Utc};
8use diesel::ExpressionMethods;
9use diesel::OptionalExtension;
10use diesel::prelude::*;
11use diesel::sql_types::BigInt;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::model::StoredWatermark;
18use crate::schema::watermarks;
19use crate::{Connection, Db};
20
21pub use sui_indexer_alt_framework_store_traits::Store;
22
23#[async_trait]
24impl store::Connection for Connection<'_> {
25    async fn init_watermark(
26        &mut self,
27        pipeline_task: &str,
28        default_next_checkpoint: u64,
29    ) -> anyhow::Result<Option<u64>> {
30        let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
31            // Do not create a watermark record with checkpoint_hi_inclusive = -1.
32            return Ok(self
33                .committer_watermark(pipeline_task)
34                .await?
35                .map(|w| w.checkpoint_hi_inclusive));
36        };
37
38        let stored_watermark = StoredWatermark {
39            pipeline: pipeline_task.to_string(),
40            epoch_hi_inclusive: 0,
41            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
42            tx_hi: 0,
43            timestamp_ms_hi_inclusive: 0,
44            reader_lo: default_next_checkpoint as i64,
45            pruner_timestamp: Utc::now().naive_utc(),
46            pruner_hi: default_next_checkpoint as i64,
47        };
48
49        use diesel::pg::upsert::excluded;
50        let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
51            .values(&stored_watermark)
52            // There is an existing entry, so only write the new `hi` values
53            .on_conflict(watermarks::pipeline)
54            // Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
55            .do_update()
56            // When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
57            // `excluded` is a virtual table containing the existing row that there was a conflict with.
58            .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
59            .returning(watermarks::checkpoint_hi_inclusive)
60            .get_result(self)
61            .await?;
62
63        Ok(Some(checkpoint_hi_inclusive as u64))
64    }
65
66    async fn committer_watermark(
67        &mut self,
68        pipeline_task: &str,
69    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
70        let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
71            .select((
72                watermarks::epoch_hi_inclusive,
73                watermarks::checkpoint_hi_inclusive,
74                watermarks::tx_hi,
75                watermarks::timestamp_ms_hi_inclusive,
76            ))
77            .filter(watermarks::pipeline.eq(pipeline_task))
78            .first(self)
79            .await
80            .optional()?;
81
82        if let Some(watermark) = watermark {
83            Ok(Some(store::CommitterWatermark {
84                epoch_hi_inclusive: watermark.0 as u64,
85                checkpoint_hi_inclusive: watermark.1 as u64,
86                tx_hi: watermark.2 as u64,
87                timestamp_ms_hi_inclusive: watermark.3 as u64,
88            }))
89        } else {
90            Ok(None)
91        }
92    }
93
94    async fn reader_watermark(
95        &mut self,
96        pipeline: &'static str,
97    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
98        let watermark: Option<(i64, i64)> = watermarks::table
99            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
100            .filter(watermarks::pipeline.eq(pipeline))
101            .first(self)
102            .await
103            .optional()?;
104
105        if let Some(watermark) = watermark {
106            Ok(Some(store::ReaderWatermark {
107                checkpoint_hi_inclusive: watermark.0 as u64,
108                reader_lo: watermark.1 as u64,
109            }))
110        } else {
111            Ok(None)
112        }
113    }
114
115    async fn pruner_watermark(
116        &mut self,
117        pipeline: &'static str,
118        delay: Duration,
119    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
120        //     |---------- + delay ---------------------|
121        //                             |--- wait_for ---|
122        //     |-----------------------|----------------|
123        //     ^                       ^
124        //     pruner_timestamp        NOW()
125        let wait_for = sql!(as BigInt,
126            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
127            delay.as_millis() as i64,
128        );
129
130        let watermark: Option<(i64, i64, i64)> = watermarks::table
131            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
132            .filter(watermarks::pipeline.eq(pipeline))
133            .first(self)
134            .await
135            .optional()?;
136
137        if let Some(watermark) = watermark {
138            Ok(Some(store::PrunerWatermark {
139                wait_for_ms: watermark.0,
140                pruner_hi: watermark.1 as u64,
141                reader_lo: watermark.2 as u64,
142            }))
143        } else {
144            Ok(None)
145        }
146    }
147
148    async fn set_committer_watermark(
149        &mut self,
150        pipeline_task: &str,
151        watermark: store::CommitterWatermark,
152    ) -> anyhow::Result<bool> {
153        // Create a StoredWatermark directly from CommitterWatermark
154        let stored_watermark = StoredWatermark {
155            pipeline: pipeline_task.to_string(),
156            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
157            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
158            tx_hi: watermark.tx_hi as i64,
159            timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
160            reader_lo: 0,
161            pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
162            pruner_hi: 0,
163        };
164
165        use diesel::query_dsl::methods::FilterDsl;
166        Ok(diesel::insert_into(watermarks::table)
167            .values(&stored_watermark)
168            // There is an existing entry, so only write the new `hi` values
169            .on_conflict(watermarks::pipeline)
170            .do_update()
171            .set((
172                watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
173                watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
174                watermarks::tx_hi.eq(stored_watermark.tx_hi),
175                watermarks::timestamp_ms_hi_inclusive
176                    .eq(stored_watermark.timestamp_ms_hi_inclusive),
177            ))
178            .filter(
179                watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
180            )
181            .execute(self)
182            .await?
183            > 0)
184    }
185
186    async fn set_reader_watermark(
187        &mut self,
188        pipeline: &'static str,
189        reader_lo: u64,
190    ) -> anyhow::Result<bool> {
191        Ok(diesel::update(watermarks::table)
192            .set((
193                watermarks::reader_lo.eq(reader_lo as i64),
194                watermarks::pruner_timestamp.eq(diesel::dsl::now),
195            ))
196            .filter(watermarks::pipeline.eq(pipeline))
197            .filter(watermarks::reader_lo.lt(reader_lo as i64))
198            .execute(self)
199            .await?
200            > 0)
201    }
202
203    async fn set_pruner_watermark(
204        &mut self,
205        pipeline: &'static str,
206        pruner_hi: u64,
207    ) -> anyhow::Result<bool> {
208        Ok(diesel::update(watermarks::table)
209            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
210            .filter(watermarks::pipeline.eq(pipeline))
211            .execute(self)
212            .await?
213            > 0)
214    }
215}
216
217#[async_trait]
218impl store::Store for Db {
219    type Connection<'c> = Connection<'c>;
220
221    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
222        Ok(Connection(self.0.get().await?))
223    }
224}
225
226#[async_trait]
227impl store::TransactionalStore for Db {
228    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
229    where
230        R: Send + 'a,
231        F: Send + 'a,
232        F: for<'r> FnOnce(
233            &'r mut Self::Connection<'_>,
234        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
235    {
236        let mut conn = self.connect().await?;
237        AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
238    }
239}