sui_pg_db/
store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use diesel::ExpressionMethods;
9use diesel::prelude::*;
10use diesel::sql_types::BigInt;
11use diesel::sql_types::Nullable;
12use diesel::sql_types::SingleValue;
13use diesel::sql_types::SqlType;
14use diesel_async::AsyncConnection;
15use diesel_async::RunQueryDsl;
16use scoped_futures::ScopedBoxFuture;
17use sui_indexer_alt_framework_store_traits as store;
18use sui_sql_macro::sql;
19
20use crate::Connection;
21use crate::Db;
22use crate::model::StoredWatermark;
23use crate::schema::watermarks;
24
25pub use sui_indexer_alt_framework_store_traits::Store;
26
// Declares a diesel query-builder function named `coalesce` (SQL `COALESCE`), limited to two
// arguments: a nullable value `x` and a non-null fallback `y` of the same SQL type. The result is
// typed as `Nullable<T>` so that it can be assigned back to a nullable column.
define_sql_function! {
    fn coalesce<T: SqlType + SingleValue>(x: Nullable<T>, y: T) -> Nullable<T>;
}
30
31#[async_trait]
32impl store::Connection for Connection<'_> {
33    async fn init_watermark(
34        &mut self,
35        pipeline_task: &str,
36        checkpoint_hi_inclusive: Option<u64>,
37    ) -> anyhow::Result<Option<store::InitWatermark>> {
38        let checkpoint_hi_inclusive = checkpoint_hi_inclusive.map_or(-1, |c| c as i64);
39        let stored_watermark = StoredWatermark::for_init(
40            pipeline_task,
41            checkpoint_hi_inclusive,
42            checkpoint_hi_inclusive + 1,
43        );
44
45        use diesel::pg::upsert::excluded;
46        let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
47            diesel::insert_into(watermarks::table)
48                .values(&stored_watermark)
49                // If there is an existing row, return it without updating it.
50                .on_conflict(watermarks::pipeline)
51                // Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
52                .do_update()
53                // When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
54                // `excluded` is a virtual table containing the existing row that there was a conflict with.
55                .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
56                .returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
57                .get_result(self)
58                .await?;
59
60        Ok(Some(store::InitWatermark {
61            checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
62            reader_lo: Some(reader_lo as u64),
63        }))
64    }
65
66    async fn accepts_chain_id(
67        &mut self,
68        pipeline_task: &str,
69        chain_id: [u8; 32],
70    ) -> anyhow::Result<bool> {
71        let stored_chain_id: Option<Vec<u8>> = diesel::update(watermarks::table)
72            .filter(watermarks::pipeline.eq(pipeline_task))
73            // "coalesce" only updates the value if it is null in the existing row
74            .set(watermarks::chain_id.eq(coalesce(watermarks::chain_id, chain_id)))
75            .returning(watermarks::chain_id)
76            .get_result(self)
77            .await?;
78
79        let stored_chain_id = stored_chain_id.context("missing chain id after update")?;
80        let stored_chain_id: [u8; 32] = stored_chain_id
81            .try_into()
82            .map_err(|v: Vec<u8>| anyhow::anyhow!("chain id has wrong length: {}", v.len()))?;
83        Ok(stored_chain_id == chain_id)
84    }
85
86    async fn committer_watermark(
87        &mut self,
88        pipeline_task: &str,
89    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
90        let (
91            epoch_hi_inclusive,
92            checkpoint_hi_inclusive,
93            tx_hi,
94            timestamp_ms_hi_inclusive,
95            reader_lo,
96        ): (i64, i64, i64, i64, i64) = watermarks::table
97            .select((
98                watermarks::epoch_hi_inclusive,
99                watermarks::checkpoint_hi_inclusive,
100                watermarks::tx_hi,
101                watermarks::timestamp_ms_hi_inclusive,
102                watermarks::reader_lo,
103            ))
104            .filter(watermarks::pipeline.eq(pipeline_task))
105            .first(self)
106            .await?;
107
108        Ok(
109            (reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
110                epoch_hi_inclusive: epoch_hi_inclusive as u64,
111                checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
112                tx_hi: tx_hi as u64,
113                timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
114            }),
115        )
116    }
117
118    async fn set_committer_watermark(
119        &mut self,
120        pipeline_task: &str,
121        watermark: store::CommitterWatermark,
122    ) -> anyhow::Result<bool> {
123        Ok(diesel::update(watermarks::table)
124            .set((
125                watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
126                watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
127                watermarks::tx_hi.eq(watermark.tx_hi as i64),
128                watermarks::timestamp_ms_hi_inclusive
129                    .eq(watermark.timestamp_ms_hi_inclusive as i64),
130            ))
131            .filter(watermarks::pipeline.eq(pipeline_task))
132            .filter(
133                watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
134            )
135            .execute(self)
136            .await?
137            > 0)
138    }
139}
140
141#[async_trait]
142impl store::ConcurrentConnection for Connection<'_> {
143    async fn reader_watermark(
144        &mut self,
145        pipeline: &str,
146    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
147        let (checkpoint_hi_inclusive, reader_lo): (i64, i64) = watermarks::table
148            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
149            .filter(watermarks::pipeline.eq(pipeline))
150            .first(self)
151            .await?;
152
153        Ok(
154            (reader_lo <= checkpoint_hi_inclusive).then_some(store::ReaderWatermark {
155                checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
156                reader_lo: reader_lo as u64,
157            }),
158        )
159    }
160
161    async fn pruner_watermark(
162        &mut self,
163        pipeline: &'static str,
164        delay: Duration,
165    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
166        //     |---------- + delay ---------------------|
167        //                             |--- wait_for ---|
168        //     |-----------------------|----------------|
169        //     ^                       ^
170        //     pruner_timestamp        NOW()
171        let wait_for = sql!(as BigInt,
172            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
173            delay.as_millis() as i64,
174        );
175
176        let (wait_for_ms, pruner_hi, reader_lo): (i64, i64, i64) = watermarks::table
177            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
178            .filter(watermarks::pipeline.eq(pipeline))
179            .first(self)
180            .await?;
181
182        Ok(Some(store::PrunerWatermark {
183            wait_for_ms,
184            pruner_hi: pruner_hi as u64,
185            reader_lo: reader_lo as u64,
186        }))
187    }
188
189    async fn set_reader_watermark(
190        &mut self,
191        pipeline: &'static str,
192        reader_lo: u64,
193    ) -> anyhow::Result<bool> {
194        Ok(diesel::update(watermarks::table)
195            .set((
196                watermarks::reader_lo.eq(reader_lo as i64),
197                watermarks::pruner_timestamp.eq(diesel::dsl::now),
198            ))
199            .filter(watermarks::pipeline.eq(pipeline))
200            .filter(watermarks::reader_lo.lt(reader_lo as i64))
201            .execute(self)
202            .await?
203            > 0)
204    }
205
206    async fn set_pruner_watermark(
207        &mut self,
208        pipeline: &'static str,
209        pruner_hi: u64,
210    ) -> anyhow::Result<bool> {
211        Ok(diesel::update(watermarks::table)
212            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
213            .filter(watermarks::pipeline.eq(pipeline))
214            .execute(self)
215            .await?
216            > 0)
217    }
218}
219
// Opts `Connection` into `store::SequentialConnection`. The impl defines no items of its own, so
// anything the trait offers comes from the trait itself.
#[async_trait]
impl store::SequentialConnection for Connection<'_> {}
222
223#[async_trait]
224impl store::Store for Db {
225    type Connection<'c> = Connection<'c>;
226
227    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
228        self.connect().await
229    }
230}
231
// `Db` as a concurrent store: its concurrent connections are the same `Connection` type used by
// the `store::Store` impl.
#[async_trait]
impl store::ConcurrentStore for Db {
    type ConcurrentConnection<'c> = Connection<'c>;
}
236
237#[async_trait]
238impl store::SequentialStore for Db {
239    type SequentialConnection<'c> = Connection<'c>;
240    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
241    where
242        R: Send + 'a,
243        F: Send + 'a,
244        F: for<'r> FnOnce(
245            &'r mut Self::Connection<'_>,
246        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
247    {
248        let mut conn = self.connect().await?;
249        AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
250    }
251}