1use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::Utc;
8use diesel::ExpressionMethods;
9use diesel::prelude::*;
10use diesel::sql_types::BigInt;
11use diesel_async::AsyncConnection;
12use diesel_async::RunQueryDsl;
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::Connection;
18use crate::Db;
19use crate::model::StoredWatermark;
20use crate::schema::watermarks;
21
22pub use sui_indexer_alt_framework_store_traits::Store;
23
24#[async_trait]
25impl store::Connection for Connection<'_> {
26 async fn init_watermark(
27 &mut self,
28 pipeline_task: &str,
29 store::InitWatermark {
30 checkpoint_hi_inclusive,
31 reader_lo,
32 }: store::InitWatermark,
33 ) -> anyhow::Result<store::InitWatermark> {
34 let stored_watermark = StoredWatermark {
35 pipeline: pipeline_task.to_string(),
36 epoch_hi_inclusive: 0,
37 checkpoint_hi_inclusive: checkpoint_hi_inclusive.map_or(-1, |c| c as i64),
40 tx_hi: 0,
41 timestamp_ms_hi_inclusive: 0,
42 reader_lo: reader_lo as i64,
43 pruner_timestamp: Utc::now().naive_utc(),
44 pruner_hi: reader_lo as i64,
45 };
46
47 use diesel::pg::upsert::excluded;
48 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
49 diesel::insert_into(watermarks::table)
50 .values(&stored_watermark)
51 .on_conflict(watermarks::pipeline)
53 .do_update()
55 .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
58 .returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
59 .get_result(self)
60 .await?;
61
62 Ok(store::InitWatermark {
63 checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
64 reader_lo: reader_lo as u64,
65 })
66 }
67
68 async fn committer_watermark(
69 &mut self,
70 pipeline_task: &str,
71 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
72 let (
73 epoch_hi_inclusive,
74 checkpoint_hi_inclusive,
75 tx_hi,
76 timestamp_ms_hi_inclusive,
77 reader_lo,
78 ): (i64, i64, i64, i64, i64) = watermarks::table
79 .select((
80 watermarks::epoch_hi_inclusive,
81 watermarks::checkpoint_hi_inclusive,
82 watermarks::tx_hi,
83 watermarks::timestamp_ms_hi_inclusive,
84 watermarks::reader_lo,
85 ))
86 .filter(watermarks::pipeline.eq(pipeline_task))
87 .first(self)
88 .await?;
89
90 Ok(
91 (reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
92 epoch_hi_inclusive: epoch_hi_inclusive as u64,
93 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
94 tx_hi: tx_hi as u64,
95 timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
96 }),
97 )
98 }
99
100 async fn reader_watermark(
101 &mut self,
102 pipeline: &'static str,
103 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
104 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) = watermarks::table
105 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
106 .filter(watermarks::pipeline.eq(pipeline))
107 .first(self)
108 .await?;
109
110 Ok(
111 (reader_lo <= checkpoint_hi_inclusive).then_some(store::ReaderWatermark {
112 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
113 reader_lo: reader_lo as u64,
114 }),
115 )
116 }
117
118 async fn pruner_watermark(
119 &mut self,
120 pipeline: &'static str,
121 delay: Duration,
122 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
123 let wait_for = sql!(as BigInt,
129 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
130 delay.as_millis() as i64,
131 );
132
133 let (wait_for_ms, pruner_hi, reader_lo): (i64, i64, i64) = watermarks::table
134 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
135 .filter(watermarks::pipeline.eq(pipeline))
136 .first(self)
137 .await?;
138
139 Ok(Some(store::PrunerWatermark {
140 wait_for_ms,
141 pruner_hi: pruner_hi as u64,
142 reader_lo: reader_lo as u64,
143 }))
144 }
145
146 async fn set_committer_watermark(
147 &mut self,
148 pipeline_task: &str,
149 watermark: store::CommitterWatermark,
150 ) -> anyhow::Result<bool> {
151 Ok(diesel::update(watermarks::table)
152 .set((
153 watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
154 watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
155 watermarks::tx_hi.eq(watermark.tx_hi as i64),
156 watermarks::timestamp_ms_hi_inclusive
157 .eq(watermark.timestamp_ms_hi_inclusive as i64),
158 ))
159 .filter(watermarks::pipeline.eq(pipeline_task))
160 .filter(
161 watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
162 )
163 .execute(self)
164 .await?
165 > 0)
166 }
167
168 async fn set_reader_watermark(
169 &mut self,
170 pipeline: &'static str,
171 reader_lo: u64,
172 ) -> anyhow::Result<bool> {
173 Ok(diesel::update(watermarks::table)
174 .set((
175 watermarks::reader_lo.eq(reader_lo as i64),
176 watermarks::pruner_timestamp.eq(diesel::dsl::now),
177 ))
178 .filter(watermarks::pipeline.eq(pipeline))
179 .filter(watermarks::reader_lo.lt(reader_lo as i64))
180 .execute(self)
181 .await?
182 > 0)
183 }
184
185 async fn set_pruner_watermark(
186 &mut self,
187 pipeline: &'static str,
188 pruner_hi: u64,
189 ) -> anyhow::Result<bool> {
190 Ok(diesel::update(watermarks::table)
191 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
192 .filter(watermarks::pipeline.eq(pipeline))
193 .execute(self)
194 .await?
195 > 0)
196 }
197}
198
199#[async_trait]
200impl store::Store for Db {
201 type Connection<'c> = Connection<'c>;
202
203 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
204 self.connect().await
205 }
206}
207
208#[async_trait]
209impl store::TransactionalStore for Db {
210 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
211 where
212 R: Send + 'a,
213 F: Send + 'a,
214 F: for<'r> FnOnce(
215 &'r mut Self::Connection<'_>,
216 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
217 {
218 let mut conn = self.connect().await?;
219 AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
220 }
221}