sui_pg_db/
store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::Utc;
8use diesel::ExpressionMethods;
9use diesel::prelude::*;
10use diesel::sql_types::BigInt;
11use diesel_async::AsyncConnection;
12use diesel_async::RunQueryDsl;
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::Connection;
18use crate::Db;
19use crate::model::StoredWatermark;
20use crate::schema::watermarks;
21
22pub use sui_indexer_alt_framework_store_traits::Store;
23
24#[async_trait]
25impl store::Connection for Connection<'_> {
26    async fn init_watermark(
27        &mut self,
28        pipeline_task: &str,
29        store::InitWatermark {
30            checkpoint_hi_inclusive,
31            reader_lo,
32        }: store::InitWatermark,
33    ) -> anyhow::Result<store::InitWatermark> {
34        let stored_watermark = StoredWatermark {
35            pipeline: pipeline_task.to_string(),
36            epoch_hi_inclusive: 0,
37            // Initially, `checkpoint_hi_inclusive` is less than `reader_lo` meaning that no
38            // checkpoints have been indexed yet.
39            checkpoint_hi_inclusive: checkpoint_hi_inclusive.map_or(-1, |c| c as i64),
40            tx_hi: 0,
41            timestamp_ms_hi_inclusive: 0,
42            reader_lo: reader_lo as i64,
43            pruner_timestamp: Utc::now().naive_utc(),
44            pruner_hi: reader_lo as i64,
45        };
46
47        use diesel::pg::upsert::excluded;
48        let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
49            diesel::insert_into(watermarks::table)
50                .values(&stored_watermark)
51                // There is an existing entry, so only write the new `hi` values
52                .on_conflict(watermarks::pipeline)
53                // Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
54                .do_update()
55                // When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
56                // `excluded` is a virtual table containing the existing row that there was a conflict with.
57                .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
58                .returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
59                .get_result(self)
60                .await?;
61
62        Ok(store::InitWatermark {
63            checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
64            reader_lo: reader_lo as u64,
65        })
66    }
67
68    async fn committer_watermark(
69        &mut self,
70        pipeline_task: &str,
71    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
72        let (
73            epoch_hi_inclusive,
74            checkpoint_hi_inclusive,
75            tx_hi,
76            timestamp_ms_hi_inclusive,
77            reader_lo,
78        ): (i64, i64, i64, i64, i64) = watermarks::table
79            .select((
80                watermarks::epoch_hi_inclusive,
81                watermarks::checkpoint_hi_inclusive,
82                watermarks::tx_hi,
83                watermarks::timestamp_ms_hi_inclusive,
84                watermarks::reader_lo,
85            ))
86            .filter(watermarks::pipeline.eq(pipeline_task))
87            .first(self)
88            .await?;
89
90        Ok(
91            (reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
92                epoch_hi_inclusive: epoch_hi_inclusive as u64,
93                checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
94                tx_hi: tx_hi as u64,
95                timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
96            }),
97        )
98    }
99
100    async fn reader_watermark(
101        &mut self,
102        pipeline: &'static str,
103    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
104        let (checkpoint_hi_inclusive, reader_lo): (i64, i64) = watermarks::table
105            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
106            .filter(watermarks::pipeline.eq(pipeline))
107            .first(self)
108            .await?;
109
110        Ok(
111            (reader_lo <= checkpoint_hi_inclusive).then_some(store::ReaderWatermark {
112                checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
113                reader_lo: reader_lo as u64,
114            }),
115        )
116    }
117
118    async fn pruner_watermark(
119        &mut self,
120        pipeline: &'static str,
121        delay: Duration,
122    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
123        //     |---------- + delay ---------------------|
124        //                             |--- wait_for ---|
125        //     |-----------------------|----------------|
126        //     ^                       ^
127        //     pruner_timestamp        NOW()
128        let wait_for = sql!(as BigInt,
129            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
130            delay.as_millis() as i64,
131        );
132
133        let (wait_for_ms, pruner_hi, reader_lo): (i64, i64, i64) = watermarks::table
134            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
135            .filter(watermarks::pipeline.eq(pipeline))
136            .first(self)
137            .await?;
138
139        Ok(Some(store::PrunerWatermark {
140            wait_for_ms,
141            pruner_hi: pruner_hi as u64,
142            reader_lo: reader_lo as u64,
143        }))
144    }
145
146    async fn set_committer_watermark(
147        &mut self,
148        pipeline_task: &str,
149        watermark: store::CommitterWatermark,
150    ) -> anyhow::Result<bool> {
151        Ok(diesel::update(watermarks::table)
152            .set((
153                watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
154                watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
155                watermarks::tx_hi.eq(watermark.tx_hi as i64),
156                watermarks::timestamp_ms_hi_inclusive
157                    .eq(watermark.timestamp_ms_hi_inclusive as i64),
158            ))
159            .filter(watermarks::pipeline.eq(pipeline_task))
160            .filter(
161                watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
162            )
163            .execute(self)
164            .await?
165            > 0)
166    }
167
168    async fn set_reader_watermark(
169        &mut self,
170        pipeline: &'static str,
171        reader_lo: u64,
172    ) -> anyhow::Result<bool> {
173        Ok(diesel::update(watermarks::table)
174            .set((
175                watermarks::reader_lo.eq(reader_lo as i64),
176                watermarks::pruner_timestamp.eq(diesel::dsl::now),
177            ))
178            .filter(watermarks::pipeline.eq(pipeline))
179            .filter(watermarks::reader_lo.lt(reader_lo as i64))
180            .execute(self)
181            .await?
182            > 0)
183    }
184
185    async fn set_pruner_watermark(
186        &mut self,
187        pipeline: &'static str,
188        pruner_hi: u64,
189    ) -> anyhow::Result<bool> {
190        Ok(diesel::update(watermarks::table)
191            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
192            .filter(watermarks::pipeline.eq(pipeline))
193            .execute(self)
194            .await?
195            > 0)
196    }
197}
198
199#[async_trait]
200impl store::Store for Db {
201    type Connection<'c> = Connection<'c>;
202
203    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
204        self.connect().await
205    }
206}
207
208#[async_trait]
209impl store::TransactionalStore for Db {
210    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
211    where
212        R: Send + 'a,
213        F: Send + 'a,
214        F: for<'r> FnOnce(
215            &'r mut Self::Connection<'_>,
216        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
217    {
218        let mut conn = self.connect().await?;
219        AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
220    }
221}