sui_rpc_benchmark/direct/
query_enricher.rs1use anyhow::Result;
9use bb8::Pool;
10use bb8_postgres::PostgresConnectionManager;
11use parking_lot::Mutex;
12use std::sync::Arc;
13use sui_indexer_alt_framework::task::TrySpawnStreamExt;
14use tokio_postgres::{NoTls, Row, types::Type};
15use tracing::warn;
16use url::Url;
17
18use crate::direct::query_template_generator::QueryTemplate;
19
/// A single column value sampled from Postgres, tagged with its SQL type.
///
/// Every variant wraps an `Option` so that a SQL `NULL` is carried as `None`.
#[derive(Debug, Clone)]
pub enum SqlValue {
    /// `TEXT` / `VARCHAR` column value.
    Text(Option<String>),
    /// `INT4` column value.
    Int4(Option<i32>),
    /// `INT8` column value.
    Int8(Option<i64>),
    /// `FLOAT8` column value.
    Float8(Option<f64>),
    /// `BOOL` column value.
    Bool(Option<bool>),
    /// `INT2` column value.
    Int2(Option<i16>),
    /// `BYTEA` column value.
    Bytea(Option<Vec<u8>>),
}
30
31#[derive(Debug, Clone)]
32pub struct EnrichedBenchmarkQuery {
33 pub query: QueryTemplate,
34 pub rows: Vec<Vec<SqlValue>>,
35 pub types: Vec<Type>,
36}
37
38pub struct QueryEnricher {
39 pool: Pool<PostgresConnectionManager<NoTls>>,
40}
41
42impl QueryEnricher {
43 pub async fn new(db_url: &Url) -> Result<Self> {
44 let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
45 let pool = Pool::builder().build(manager).await?;
46 Ok(Self { pool })
47 }
48
49 fn row_to_values(row: &Row) -> Vec<SqlValue> {
50 (0..row.len())
51 .map(|i| match row.columns()[i].type_() {
52 &Type::TEXT | &Type::VARCHAR => SqlValue::Text(row.get(i)),
53 &Type::INT4 => SqlValue::Int4(row.get(i)),
54 &Type::INT8 => SqlValue::Int8(row.get(i)),
55 &Type::FLOAT8 => SqlValue::Float8(row.get(i)),
56 &Type::BOOL => SqlValue::Bool(row.get(i)),
57 &Type::INT2 => SqlValue::Int2(row.get(i)),
58 &Type::BYTEA => SqlValue::Bytea(row.get(i)),
59 ty => panic!("Unsupported type: {:?}", ty),
60 })
61 .collect()
62 }
63
64 pub async fn enrich_queries(
65 &self,
66 queries: Vec<QueryTemplate>,
67 ) -> Result<Vec<EnrichedBenchmarkQuery>> {
68 let enriched_queries = std::sync::Arc::new(Mutex::new(Vec::new()));
69 let pool = self.pool.clone();
70 let enriched_queries_clone = enriched_queries.clone();
71
72 futures::stream::iter(queries)
73 .try_for_each_spawned(10, move |query| {
74 let pool = pool.clone();
75 let enriched_queries = enriched_queries_clone.clone();
76 async move {
77 let client = pool.get().await?;
78 let sql = format!(
79 "SELECT {} FROM {} WHERE {} IS NOT NULL LIMIT 1000",
80 query.needed_columns.join(", "),
81 query.table_name,
82 query.needed_columns[0]
83 );
84
85 let rows = client.query(&sql, &[]).await?;
86 let Some(first_row) = rows.first() else {
87 warn!(
88 table = query.table_name,
89 "No sample data found for query on table, table is empty."
90 );
91 let enriched = EnrichedBenchmarkQuery {
92 query: query.clone(),
93 rows: Vec::new(),
94 types: query.needed_columns.iter().map(|_| Type::TEXT).collect(), };
96 enriched_queries.lock().push(enriched);
97 return Ok::<(), anyhow::Error>(());
98 };
99 let types = first_row
100 .columns()
101 .iter()
102 .map(|c| c.type_().clone())
103 .collect();
104 let raw_rows = rows.iter().map(Self::row_to_values).collect();
105
106 let enriched = EnrichedBenchmarkQuery {
107 query: query.clone(),
108 rows: raw_rows,
109 types,
110 };
111 enriched_queries.lock().push(enriched);
112 Ok::<(), anyhow::Error>(())
113 }
114 })
115 .await?;
116
117 Ok(Arc::try_unwrap(enriched_queries)
118 .map_err(|_| anyhow::anyhow!("Failed to try_unwrap Arc"))?
119 .into_inner())
120 }
121}