1use std::time::Duration;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use diesel::ExpressionMethods;
9use diesel::prelude::*;
10use diesel::sql_types::BigInt;
11use diesel::sql_types::Nullable;
12use diesel::sql_types::SingleValue;
13use diesel::sql_types::SqlType;
14use diesel_async::AsyncConnection;
15use diesel_async::RunQueryDsl;
16use scoped_futures::ScopedBoxFuture;
17use sui_indexer_alt_framework_store_traits as store;
18use sui_sql_macro::sql;
19
20use crate::Connection;
21use crate::Db;
22use crate::model::StoredWatermark;
23use crate::schema::watermarks;
24
25pub use sui_indexer_alt_framework_store_traits::Store;
26
27define_sql_function! {
28 fn coalesce<T: SqlType + SingleValue>(x: Nullable<T>, y: T) -> Nullable<T>;
29}
30
31#[async_trait]
32impl store::Connection for Connection<'_> {
33 async fn init_watermark(
34 &mut self,
35 pipeline_task: &str,
36 checkpoint_hi_inclusive: Option<u64>,
37 ) -> anyhow::Result<Option<store::InitWatermark>> {
38 let checkpoint_hi_inclusive = checkpoint_hi_inclusive.map_or(-1, |c| c as i64);
39 let stored_watermark = StoredWatermark::for_init(
40 pipeline_task,
41 checkpoint_hi_inclusive,
42 checkpoint_hi_inclusive + 1,
43 );
44
45 use diesel::pg::upsert::excluded;
46 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
47 diesel::insert_into(watermarks::table)
48 .values(&stored_watermark)
49 .on_conflict(watermarks::pipeline)
51 .do_update()
53 .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
56 .returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
57 .get_result(self)
58 .await?;
59
60 Ok(Some(store::InitWatermark {
61 checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
62 reader_lo: Some(reader_lo as u64),
63 }))
64 }
65
66 async fn accepts_chain_id(
67 &mut self,
68 pipeline_task: &str,
69 chain_id: [u8; 32],
70 ) -> anyhow::Result<bool> {
71 let stored_chain_id: Option<Vec<u8>> = diesel::update(watermarks::table)
72 .filter(watermarks::pipeline.eq(pipeline_task))
73 .set(watermarks::chain_id.eq(coalesce(watermarks::chain_id, chain_id)))
75 .returning(watermarks::chain_id)
76 .get_result(self)
77 .await?;
78
79 let stored_chain_id = stored_chain_id.context("missing chain id after update")?;
80 let stored_chain_id: [u8; 32] = stored_chain_id
81 .try_into()
82 .map_err(|v: Vec<u8>| anyhow::anyhow!("chain id has wrong length: {}", v.len()))?;
83 Ok(stored_chain_id == chain_id)
84 }
85
86 async fn committer_watermark(
87 &mut self,
88 pipeline_task: &str,
89 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
90 let (
91 epoch_hi_inclusive,
92 checkpoint_hi_inclusive,
93 tx_hi,
94 timestamp_ms_hi_inclusive,
95 reader_lo,
96 ): (i64, i64, i64, i64, i64) = watermarks::table
97 .select((
98 watermarks::epoch_hi_inclusive,
99 watermarks::checkpoint_hi_inclusive,
100 watermarks::tx_hi,
101 watermarks::timestamp_ms_hi_inclusive,
102 watermarks::reader_lo,
103 ))
104 .filter(watermarks::pipeline.eq(pipeline_task))
105 .first(self)
106 .await?;
107
108 Ok(
109 (reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
110 epoch_hi_inclusive: epoch_hi_inclusive as u64,
111 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
112 tx_hi: tx_hi as u64,
113 timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
114 }),
115 )
116 }
117
118 async fn set_committer_watermark(
119 &mut self,
120 pipeline_task: &str,
121 watermark: store::CommitterWatermark,
122 ) -> anyhow::Result<bool> {
123 Ok(diesel::update(watermarks::table)
124 .set((
125 watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
126 watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
127 watermarks::tx_hi.eq(watermark.tx_hi as i64),
128 watermarks::timestamp_ms_hi_inclusive
129 .eq(watermark.timestamp_ms_hi_inclusive as i64),
130 ))
131 .filter(watermarks::pipeline.eq(pipeline_task))
132 .filter(
133 watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
134 )
135 .execute(self)
136 .await?
137 > 0)
138 }
139}
140
141#[async_trait]
142impl store::ConcurrentConnection for Connection<'_> {
143 async fn reader_watermark(
144 &mut self,
145 pipeline: &str,
146 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
147 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) = watermarks::table
148 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
149 .filter(watermarks::pipeline.eq(pipeline))
150 .first(self)
151 .await?;
152
153 Ok(
154 (reader_lo <= checkpoint_hi_inclusive).then_some(store::ReaderWatermark {
155 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
156 reader_lo: reader_lo as u64,
157 }),
158 )
159 }
160
161 async fn pruner_watermark(
162 &mut self,
163 pipeline: &'static str,
164 delay: Duration,
165 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
166 let wait_for = sql!(as BigInt,
172 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
173 delay.as_millis() as i64,
174 );
175
176 let (wait_for_ms, pruner_hi, reader_lo): (i64, i64, i64) = watermarks::table
177 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
178 .filter(watermarks::pipeline.eq(pipeline))
179 .first(self)
180 .await?;
181
182 Ok(Some(store::PrunerWatermark {
183 wait_for_ms,
184 pruner_hi: pruner_hi as u64,
185 reader_lo: reader_lo as u64,
186 }))
187 }
188
189 async fn set_reader_watermark(
190 &mut self,
191 pipeline: &'static str,
192 reader_lo: u64,
193 ) -> anyhow::Result<bool> {
194 Ok(diesel::update(watermarks::table)
195 .set((
196 watermarks::reader_lo.eq(reader_lo as i64),
197 watermarks::pruner_timestamp.eq(diesel::dsl::now),
198 ))
199 .filter(watermarks::pipeline.eq(pipeline))
200 .filter(watermarks::reader_lo.lt(reader_lo as i64))
201 .execute(self)
202 .await?
203 > 0)
204 }
205
206 async fn set_pruner_watermark(
207 &mut self,
208 pipeline: &'static str,
209 pruner_hi: u64,
210 ) -> anyhow::Result<bool> {
211 Ok(diesel::update(watermarks::table)
212 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
213 .filter(watermarks::pipeline.eq(pipeline))
214 .execute(self)
215 .await?
216 > 0)
217 }
218}
219
220#[async_trait]
221impl store::SequentialConnection for Connection<'_> {}
222
223#[async_trait]
224impl store::Store for Db {
225 type Connection<'c> = Connection<'c>;
226
227 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
228 self.connect().await
229 }
230}
231
232#[async_trait]
233impl store::ConcurrentStore for Db {
234 type ConcurrentConnection<'c> = Connection<'c>;
235}
236
237#[async_trait]
238impl store::SequentialStore for Db {
239 type SequentialConnection<'c> = Connection<'c>;
240 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
241 where
242 R: Send + 'a,
243 F: Send + 'a,
244 F: for<'r> FnOnce(
245 &'r mut Self::Connection<'_>,
246 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
247 {
248 let mut conn = self.connect().await?;
249 AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
250 }
251}