sui_indexer_alt_framework/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use serde::Deserialize;
5use serde::Serialize;
6
7use sui_futures::stream::ConcurrencyConfig as RuntimeConcurrencyConfig;
8
9/// Serde-friendly concurrency configuration used by both the ingestion and processor stages.
10///
11/// Use `{ kind = "fixed", value = 10 }` for constant concurrency, or
12/// `{ kind = "adaptive", initial = 5, min = 1, max = 20 }` for adaptive concurrency that adjusts
13/// based on downstream channel fill fraction.
14///
15/// Adaptive mode accepts optional tuning overrides:
16/// - `dead_band`: `[low, high]` fill fraction thresholds (default: [0.6, 0.85])
17#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
18#[serde(tag = "kind", rename_all = "lowercase")]
19pub enum ConcurrencyConfig {
20    Fixed {
21        value: usize,
22    },
23    Adaptive {
24        initial: usize,
25        min: usize,
26        max: usize,
27        #[serde(default, skip_serializing_if = "Option::is_none")]
28        dead_band: Option<[f64; 2]>,
29    },
30}
31
32impl ConcurrencyConfig {
33    pub fn initial(&self) -> usize {
34        match self {
35            Self::Fixed { value } => *value,
36            Self::Adaptive { initial, .. } => *initial,
37        }
38    }
39
40    pub fn min(&self) -> usize {
41        let v = match self {
42            Self::Fixed { value } => *value,
43            Self::Adaptive { min, .. } => *min,
44        };
45        assert!(v >= 1, "min concurrency must be >= 1");
46        v
47    }
48
49    pub fn max(&self) -> usize {
50        match self {
51            Self::Fixed { value } => *value,
52            Self::Adaptive { max, .. } => *max,
53        }
54    }
55
56    pub fn is_adaptive(&self) -> bool {
57        matches!(self, Self::Adaptive { .. })
58    }
59}
60
61impl From<ConcurrencyConfig> for RuntimeConcurrencyConfig {
62    fn from(config: ConcurrencyConfig) -> Self {
63        match config {
64            ConcurrencyConfig::Fixed { value } => Self::fixed(value),
65            ConcurrencyConfig::Adaptive {
66                initial,
67                min,
68                max,
69                dead_band,
70            } => {
71                let mut c = Self::adaptive(initial, min, max);
72                if let Some([low, high]) = dead_band {
73                    c = c.with_dead_band(low, high);
74                }
75                c
76            }
77        }
78    }
79}