use std::cmp::min;
use std::sync::Arc;

use anyhow::Error;
use async_trait::async_trait;
use futures::StreamExt;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::task::JoinHandle;

use crate::metrics::IndexerMetricProvider;
use crate::{Task, Tasks};
use mysten_metrics::{metered_channel, spawn_monitored_task};
use tap::tap::TapFallible;

type CheckpointData<T> = (u64, Vec<T>);
pub type DataSender<T> = metered_channel::Sender<CheckpointData<T>>;

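// Defaults for write batching and the retrieved-data channel capacity; both can be
// overridden at runtime via the `INGESTION_BATCH_SIZE` and
// `RETRIEVED_CHECKPOINT_CHANNEL_SIZE` environment variables (see
// `Datasource::start_ingestion_task` below).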
const INGESTION_BATCH_SIZE: usize = 100;
const RETRIEVED_CHECKPOINT_CHANNEL_SIZE: usize = 10000;

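/// Builder that wires a [`Datasource`], a [`DataMapper`], and a [`Persistent`] store
/// together under a shared task-name prefix. Backfill defaults to
/// [`BackfillStrategy::Simple`], and a live task is created unless
/// [`disable_live_task`](IndexerBuilder::disable_live_task) is called.
///
/// A minimal usage sketch; `datasource`, `BridgeMapper`, and `store` stand in for
/// hypothetical implementations of the traits below, not types provided by this crate:
///
/// ```ignore
/// let indexer = IndexerBuilder::new("bridge", datasource, BridgeMapper, store)
///     .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
///     .build();
/// indexer.start().await?;
/// ```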
pub struct IndexerBuilder<D, M, P> {
    name: String,
    datasource: D,
    data_mapper: M,
    persistent: P,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
}

impl<D, M, P> IndexerBuilder<D, M, P> {
    pub fn new<R>(
        name: &str,
        datasource: D,
        data_mapper: M,
        persistent: P,
    ) -> IndexerBuilder<D, M, P>
    where
        P: Persistent<R>,
    {
        IndexerBuilder {
            name: name.into(),
            datasource,
            data_mapper,
            backfill_strategy: BackfillStrategy::Simple,
            disable_live_task: false,
            persistent,
        }
    }

    pub fn build(self) -> Indexer<P, D, M> {
        Indexer {
            name: self.name,
            storage: self.persistent,
            datasource: self.datasource.into(),
            backfill_strategy: self.backfill_strategy,
            disable_live_task: self.disable_live_task,
            data_mapper: self.data_mapper,
        }
    }

    pub fn with_backfill_strategy(mut self, backfill: BackfillStrategy) -> Self {
        self.backfill_strategy = backfill;
        self
    }

    pub fn disable_live_task(mut self) -> Self {
        self.disable_live_task = true;
        self
    }
}

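/// A configured indexer. [`Indexer::start`] drives one optional live task plus any
/// number of backfill tasks, all persisting through the same [`Persistent`] store.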
pub struct Indexer<P, D, M> {
    name: String,
    storage: P,
    datasource: Arc<D>,
    data_mapper: M,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
}

impl<P, D, M> Indexer<P, D, M> {
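    /// Reconciles the task table with the datasource, then runs the live task (if any)
    /// on the current task while backfill tasks are drained sequentially, in descending
    /// checkpoint order, on a monitored background task. Returns once all tasks finish.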
    pub async fn start<T, R>(mut self) -> Result<(), Error>
    where
        D: Datasource<T> + 'static,
        M: DataMapper<T, R> + 'static,
        P: Persistent<R> + 'static,
        T: Send,
    {
        let task_name = self.name.clone();
        self.update_tasks()
            .await
            .tap_err(|e| {
                tracing::error!(task_name, "Failed to update tasks: {:?}", e);
            })
            .tap_ok(|_| {
                tracing::info!(task_name, "Tasks updated.");
            })?;

        let ongoing_tasks = self
            .storage
            .get_ongoing_tasks(&self.name)
            .await
            .tap_err(|e| {
                tracing::error!(task_name, "Failed to get updated tasks: {:?}", e);
            })
            .tap_ok(|tasks| {
                tracing::info!(task_name, "Got updated tasks: {:?}", tasks);
            })?;

        // Build the live-task future now (if enabled); it runs on the current task once
        // awaited below, concurrently with the spawned backfills.
        let live_task_future = match ongoing_tasks.live_task() {
            Some(live_task) if !self.disable_live_task => {
                let live_task_future = self.datasource.start_ingestion_task(
                    live_task,
                    self.storage.clone(),
                    self.data_mapper.clone(),
                );
                Some(live_task_future)
            }
            _ => None,
        };

        let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc();
        let storage_clone = self.storage.clone();
        let data_mapper_clone = self.data_mapper.clone();
        let datasource_clone = self.datasource.clone();

        let handle = spawn_monitored_task!(async move {
            // Drain backfill tasks one by one, skipping tasks whose start has already
            // reached their target.
            for backfill_task in backfill_tasks {
                if backfill_task.start_checkpoint < backfill_task.target_checkpoint {
                    datasource_clone
                        .start_ingestion_task(
                            backfill_task,
                            storage_clone.clone(),
                            data_mapper_clone.clone(),
                        )
                        .await
                        .expect("Backfill task failed");
                }
            }
        });

        if let Some(live_task_future) = live_task_future {
            live_task_future.await?;
        }

        tokio::try_join!(handle)?;

        Ok(())
    }

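    /// Brings the stored task table in line with the datasource: registers or advances
    /// the live task to the datasource's current starting checkpoint, and creates
    /// backfill tasks for any gap between the largest indexed checkpoint and the live
    /// task's starting point.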
    async fn update_tasks<T, R>(&mut self) -> Result<(), Error>
    where
        P: Persistent<R>,
        D: Datasource<T>,
        T: Send,
    {
        let ongoing_tasks = self.storage.get_ongoing_tasks(&self.name).await?;
        let largest_checkpoint = self
            .storage
            .get_largest_indexed_checkpoint(&self.name)
            .await?;
        let live_task_from_checkpoint = self.datasource.get_live_task_starting_checkpoint().await?;

        // Create or advance the live task unless live ingestion is disabled.
        if !self.disable_live_task {
            match ongoing_tasks.live_task() {
                None => {
                    self.storage
                        .register_live_task(
                            format!("{} - Live", self.name),
                            live_task_from_checkpoint,
                        )
                        .await
                        .tap_ok(|_| {
                            tracing::info!(
                                task_name = self.name.as_str(),
                                "Created live task from {}",
                                live_task_from_checkpoint,
                            );
                        })
                        .tap_err(|e| {
                            tracing::error!(
                                "Failed to register live task ({}-MAX): {:?}",
                                live_task_from_checkpoint,
                                e
                            );
                        })?;
                }
                Some(mut live_task) => {
                    if live_task_from_checkpoint != live_task.start_checkpoint {
                        // Reposition the existing live task to the datasource's current
                        // starting checkpoint.
                        let old_checkpoint = live_task.start_checkpoint;
                        live_task.start_checkpoint = live_task_from_checkpoint;
                        self.storage
                            .update_task(live_task)
                            .await
                            .tap_ok(|_| {
                                tracing::info!(
                                    task_name = self.name.as_str(),
                                    "Updated live task starting point from {} to {}",
                                    old_checkpoint,
                                    live_task_from_checkpoint,
                                );
                            })
                            .tap_err(|e| {
                                tracing::error!(
                                    "Failed to update live task to ({}-MAX): {:?}",
                                    live_task_from_checkpoint,
                                    e
                                );
                            })?;
                    }
                }
            }
        }

        // Backfill everything between the last indexed checkpoint (or genesis) and the
        // checkpoint the live task starts from.
        let from_checkpoint = largest_checkpoint
            .map(|cp| cp + 1)
            .unwrap_or(self.datasource.get_genesis_height());
        if from_checkpoint < live_task_from_checkpoint {
            self.create_backfill_tasks(from_checkpoint, live_task_from_checkpoint - 1)
                .await
                .tap_ok(|_| {
                    tracing::info!(
                        task_name = self.name.as_str(),
                        "Created backfill tasks ({}-{})",
                        from_checkpoint,
                        live_task_from_checkpoint - 1
                    );
                })
                .tap_err(|e| {
                    tracing::error!(
                        task_name = self.name.as_str(),
                        "Failed to create backfill tasks ({}-{}): {:?}",
                        from_checkpoint,
                        live_task_from_checkpoint - 1,
                        e
                    );
                })?;
        }
        Ok(())
    }

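    /// Registers backfill task(s) covering `[from_cp, to_cp]` (inclusive) according to
    /// the configured [`BackfillStrategy`].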
    async fn create_backfill_tasks<R>(&mut self, mut from_cp: u64, to_cp: u64) -> Result<(), Error>
    where
        P: Persistent<R>,
    {
        match self.backfill_strategy {
            BackfillStrategy::Simple => {
                self.storage
                    .register_task(
                        format!("{} - backfill - {from_cp}:{to_cp}", self.name),
                        from_cp,
                        to_cp,
                    )
                    .await
            }
            BackfillStrategy::Partitioned { task_size } => {
                // Split the range into fixed-size chunks, each registered as its own task.
                while from_cp < to_cp {
                    let target_cp = min(from_cp + task_size - 1, to_cp);
                    self.storage
                        .register_task(
                            format!("{} - backfill - {from_cp}:{target_cp}", self.name),
                            from_cp,
                            target_cp,
                        )
                        .await?;
                    from_cp = target_cp + 1;
                }
                Ok(())
            }
            BackfillStrategy::Disabled => Ok(()),
        }
    }

    #[cfg(any(feature = "test-utils", test))]
    pub async fn test_only_update_tasks<R, T>(&mut self) -> Result<(), Error>
    where
        P: Persistent<R>,
        D: Datasource<T>,
        T: Send,
    {
        self.update_tasks().await
    }

    #[cfg(any(feature = "test-utils", test))]
    pub fn test_only_storage<R>(&self) -> &P
    where
        P: Persistent<R>,
    {
        &self.storage
    }

    #[cfg(any(feature = "test-utils", test))]
    pub fn test_only_name(&self) -> String {
        self.name.clone()
    }
}

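/// Storage backend for mapped rows; also tracks per-task progress via
/// [`IndexerProgressStore`]. `Clone` is required so each ingestion task can own a
/// handle to the store.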
#[async_trait]
pub trait Persistent<T>: IndexerProgressStore + Sync + Send + Clone {
    async fn write(&self, data: Vec<T>) -> Result<(), Error>;
}

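/// Bookkeeping for ingestion tasks: registration, progress checkpoints, and lookups of
/// ongoing work, keyed by the indexer's task-name prefix.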
#[async_trait]
pub trait IndexerProgressStore: Send {
    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64>;

    /// Saves progress for the given checkpoints and returns the task's latest saved
    /// checkpoint, if any progress has been recorded.
    async fn save_progress(
        &mut self,
        task: &Task,
        checkpoint_numbers: &[u64],
    ) -> anyhow::Result<Option<u64>>;

    async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Tasks, Error>;

    async fn get_largest_indexed_checkpoint(&self, prefix: &str) -> Result<Option<u64>, Error>;

    async fn register_task(
        &mut self,
        task_name: String,
        start_checkpoint: u64,
        target_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    async fn register_live_task(
        &mut self,
        task_name: String,
        start_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}

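/// A source of checkpoint-ordered data. Implementors supply
/// [`start_data_retrieval`](Datasource::start_data_retrieval), a background task that
/// pushes `(height, Vec<T>)` pairs into the provided [`DataSender`], plus the genesis
/// height and the live task's starting checkpoint. The provided
/// [`start_ingestion_task`](Datasource::start_ingestion_task) drives batching, mapping,
/// persistence, progress tracking, and metrics on top of that stream.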
#[async_trait]
pub trait Datasource<T: Send>: Sync + Send {
    async fn start_ingestion_task<M, P, R>(
        &self,
        task: Task,
        mut storage: P,
        data_mapper: M,
    ) -> Result<(), Error>
    where
        M: DataMapper<T, R>,
        P: Persistent<R>,
    {
        let task_name = task.task_name.clone();
        let task_name_prefix = task.name_prefix();
        let task_type_label = task.type_str();
        let starting_checkpoint = task.start_checkpoint;
        let target_checkpoint = task.target_checkpoint;
        // Batch and channel sizes fall back to the crate defaults unless overridden via
        // the environment.
        let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE")
            .unwrap_or(INGESTION_BATCH_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let checkpoint_channel_size = std::env::var("RETRIEVED_CHECKPOINT_CHANNEL_SIZE")
            .unwrap_or(RETRIEVED_CHECKPOINT_CHANNEL_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        tracing::info!(
            task_name,
            ingestion_batch_size,
            checkpoint_channel_size,
            "Starting ingestion task ({}-{})",
            starting_checkpoint,
            target_checkpoint,
        );
        let (data_sender, data_rx) = metered_channel::channel(
            checkpoint_channel_size,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&[&format!("{}-{}", task_name_prefix, task_type_label)]),
        );
        let is_live_task = task.is_live_task;
        // RAII guard: bumps the inflight-live-tasks gauge now and decrements it when
        // this function returns, however it returns.
        let _live_tasks_tracker = if is_live_task {
            Some(LiveTasksTracker::new(
                self.metric_provider()
                    .get_inflight_live_tasks_metrics()
                    .clone(),
                &task_name,
            ))
        } else {
            None
        };
        let join_handle = self.start_data_retrieval(task.clone(), data_sender).await?;
        let processed_checkpoints_metrics = self
            .metric_provider()
            .get_tasks_processed_checkpoints_metric()
            .with_label_values(&[task_name_prefix, task_type_label]);
        // Only backfill tasks have a finite range, hence a meaningful "remaining" gauge.
        let remaining_checkpoints_metric = if !is_live_task {
            let remaining = self
                .metric_provider()
                .get_tasks_remaining_checkpoints_metric()
                .with_label_values(&[task_name_prefix]);
            remaining.set((target_checkpoint - starting_checkpoint + 1) as i64);
            Some(remaining)
        } else {
            None
        };

        let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx)
            .ready_chunks(ingestion_batch_size);
        let mut last_saved_checkpoint = None;
        loop {
            let Some(batch) = stream.next().await else {
                tracing::error!(task_name, "Data stream ended unexpectedly");
                break;
            };
            let mut max_height = 0;
            let mut heights = vec![];
            let mut data = vec![];
            for (height, d) in batch {
                // Data retrieval may overshoot the target; ignore anything past it.
                if height > target_checkpoint {
                    tracing::warn!(
                        task_name,
                        height,
                        "Received data with height > target_checkpoint, skipping."
                    );
                    continue;
                }
                max_height = std::cmp::max(max_height, height);
                heights.push(height);
                data.extend(d);
            }
            tracing::debug!(
                task_name,
                max_height,
                "Ingestion task received {} blocks.",
                heights.len(),
            );
            let timer = tokio::time::Instant::now();

            if !data.is_empty() {
                let timer = tokio::time::Instant::now();
                // Flatten mapper output across the whole batch, bailing on the first error.
                let processed_data = data.into_iter().try_fold(vec![], |mut result, d| {
                    result.append(&mut data_mapper.map(d)?);
                    Ok::<Vec<_>, Error>(result)
                })?;
                tracing::debug!(
                    task_name,
                    max_height,
                    "Data mapper processed {} blocks in {}ms.",
                    heights.len(),
                    timer.elapsed().as_millis(),
                );
                let timer = tokio::time::Instant::now();
                storage.write(processed_data).await?;
                tracing::debug!(
                    task_name,
                    max_height,
                    "Processed data ({} blocks) written to storage in {}ms.",
                    heights.len(),
                    timer.elapsed().as_millis(),
                );
            }
            last_saved_checkpoint = storage.save_progress(&task, &heights).await?;
            tracing::debug!(
                task_name,
                max_height,
                last_saved_checkpoint,
                "Ingestion task processed {} blocks in {}ms",
                heights.len(),
                timer.elapsed().as_millis(),
            );
            processed_checkpoints_metrics.inc_by(heights.len() as u64);
            if let Some(m) = &remaining_checkpoints_metric {
                m.set(std::cmp::max(
                    target_checkpoint as i64 - max_height as i64,
                    0,
                ));
            }
            // A backfill task is done once saved progress reaches its target checkpoint.
            if let Some(cp) = last_saved_checkpoint
                && cp >= target_checkpoint
            {
                break;
            }
        }
        if is_live_task {
            // A live task has no finite target and should never exit on its own.
            tracing::error!(task_name, "Live task exiting unexpectedly");
        } else if let Some(last_saved_checkpoint) = last_saved_checkpoint {
            if last_saved_checkpoint < target_checkpoint {
                tracing::error!(
                    task_name,
                    last_saved_checkpoint,
                    "Task exiting before reaching target checkpoint",
                );
            } else {
                tracing::info!(task_name, "Backfill task is done, exiting");
            }
        } else {
            tracing::error!(
                task_name,
                "Task exiting unexpectedly with no progress saved"
            );
        }
        join_handle.abort();
        if let Some(m) = &remaining_checkpoints_metric {
            m.set(0)
        }
        join_handle.await?.tap_err(|err| {
            tracing::error!(task_name, "Data retrieval task failed: {:?}", err);
        })
    }

    async fn start_data_retrieval(
        &self,
        task: Task,
        data_sender: DataSender<T>,
    ) -> Result<JoinHandle<Result<(), Error>>, Error>;

    async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error>;

    fn get_genesis_height(&self) -> u64;

    fn metric_provider(&self) -> &dyn IndexerMetricProvider;
}

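/// How historical checkpoints are divided into backfill tasks: `Simple` registers one
/// task for the whole range, `Partitioned` splits it into chunks of `task_size`
/// checkpoints so they can progress independently, and `Disabled` skips backfill
/// entirely.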
pub enum BackfillStrategy {
    Simple,
    Partitioned { task_size: u64 },
    Disabled,
}

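/// Maps one raw datasource item of type `T` into zero or more rows of type `R` for
/// [`Persistent::write`].
///
/// A minimal sketch, assuming hypothetical `RawBlock` and `BlockRow` types:
///
/// ```ignore
/// #[derive(Clone)]
/// struct BlockMapper;
///
/// impl DataMapper<RawBlock, BlockRow> for BlockMapper {
///     fn map(&self, block: RawBlock) -> Result<Vec<BlockRow>, anyhow::Error> {
///         Ok(vec![BlockRow {
///             height: block.height,
///             tx_count: block.transactions.len() as u64,
///         }])
///     }
/// }
/// ```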
pub trait DataMapper<T, R>: Sync + Send + Clone {
    fn map(&self, data: T) -> Result<Vec<R>, anyhow::Error>;
}

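/// RAII tracker for the inflight-live-tasks gauge: increments the per-task gauge on
/// construction and decrements it on drop, so the count stays correct on any exit path.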
struct LiveTasksTracker {
    gauge: IntGauge,
}

impl LiveTasksTracker {
    pub fn new(metrics: IntGaugeVec, task_name: &str) -> Self {
        let gauge = metrics.with_label_values(&[task_name]);
        gauge.inc();
        Self { gauge }
    }
}

impl Drop for LiveTasksTracker {
    fn drop(&mut self) {
        self.gauge.dec();
    }
}