// sui_analytics_indexer/writers/parquet_writer.rs
use crate::{AnalyticsWriter, FileFormat, FileType, ParquetSchema, ParquetValue};
5use anyhow::{Result, anyhow};
6use arrow_array::{
7 ArrayRef, RecordBatch,
8 builder::{ArrayBuilder, BooleanBuilder, GenericStringBuilder, Int64Builder, UInt64Builder},
9};
10use serde::Serialize;
11use std::fs::{File, create_dir_all, remove_file};
12use std::ops::Range;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use sui_storage::object_store::util::path_to_filesystem;
16use sui_types::base_types::EpochId;
17
18use parquet::arrow::ArrowWriter;
19use parquet::basic::Compression;
20use parquet::file::properties::WriterProperties;
21
// UTF-8 string builder with i32 offsets (the standard Arrow `StringArray` layout).
type StrBuilder = GenericStringBuilder<i32>;

/// Typed wrapper over the Arrow array builders this writer supports,
/// one variant per physical column type produced by [`ParquetValue`].
enum ColumnBuilder {
    /// Unsigned 64-bit integers (also receives `OptionU64` values, with nulls).
    U64(UInt64Builder),
    /// Signed 64-bit integers.
    I64(Int64Builder),
    /// Booleans.
    Bool(BooleanBuilder),
    /// UTF-8 strings (also receives `OptionStr` values, with nulls).
    Str(StrBuilder),
}
30
31impl ColumnBuilder {
32 fn as_any_builder(&mut self) -> &mut dyn ArrayBuilder {
33 match self {
34 Self::U64(b) => b,
35 Self::I64(b) => b,
36 Self::Bool(b) => b,
37 Self::Str(b) => b,
38 }
39 }
40
41 fn finish(self) -> ArrayRef {
42 match self {
43 Self::U64(mut b) => Arc::new(b.finish()),
44 Self::I64(mut b) => Arc::new(b.finish()),
45 Self::Bool(mut b) => Arc::new(b.finish()),
46 Self::Str(mut b) => Arc::new(b.finish()),
47 }
48 }
49}
50
/// Buffers analytics rows in Arrow column builders and writes them out as a
/// Parquet file per (file type, epoch, checkpoint range) under a local root.
pub(crate) struct ParquetWriter {
    /// Local filesystem root under which output files are created.
    root_dir_path: PathBuf,
    /// Which analytics table this writer produces.
    file_type: FileType,
    /// Epoch the buffered rows belong to; updated via `reset`.
    epoch: EpochId,
    /// Checkpoint range covered by the buffered rows; `end` stays `u64::MAX`
    /// until `flush` supplies the real end sequence number.
    checkpoint_range: Range<u64>,
    /// One Arrow column builder per schema column; empty until the first `write`.
    builders: Vec<ColumnBuilder>,
    /// Number of rows buffered since the last `reset` (not cleared by `flush`).
    row_count: usize,
}
60
61impl ParquetWriter {
62 pub(crate) fn new(
63 root_dir_path: &Path,
64 file_type: FileType,
65 start_checkpoint_seq_num: u64,
66 ) -> Result<Self> {
67 Ok(Self {
68 root_dir_path: root_dir_path.to_path_buf(),
69 file_type,
70 epoch: 0,
71 checkpoint_range: start_checkpoint_seq_num..u64::MAX,
72 builders: vec![],
73 row_count: 0,
74 })
75 }
76
77 fn file(&self) -> Result<File> {
78 let file_path = path_to_filesystem(
79 self.root_dir_path.clone(),
80 &self.file_type.file_path(
81 FileFormat::PARQUET,
82 self.epoch,
83 self.checkpoint_range.clone(),
84 ),
85 )?;
86 create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
87 if file_path.exists() {
88 remove_file(&file_path)?;
89 }
90 Ok(File::create(&file_path)?)
91 }
92}
93
94impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for ParquetWriter {
95 fn file_format(&self) -> Result<FileFormat> {
96 Ok(FileFormat::PARQUET)
97 }
98
99 fn write(&mut self, rows: Box<dyn Iterator<Item = S> + Send + Sync>) -> Result<()> {
100 let mut row_iter = rows.peekable();
102
103 if row_iter.peek().is_none() {
105 return Ok(());
106 }
107
108 if self.builders.is_empty()
110 && let Some(first_row) = row_iter.peek()
111 {
112 for col_idx in 0..S::schema().len() {
113 let value = first_row.get_column(col_idx);
114 self.builders.push(match value {
115 ParquetValue::U64(_) | ParquetValue::OptionU64(_) => {
116 ColumnBuilder::U64(UInt64Builder::new())
117 }
118 ParquetValue::I64(_) => ColumnBuilder::I64(Int64Builder::new()),
119 ParquetValue::Bool(_) => ColumnBuilder::Bool(BooleanBuilder::new()),
120 ParquetValue::Str(_) | ParquetValue::OptionStr(_) => {
121 ColumnBuilder::Str(StrBuilder::new())
122 }
123 });
124 }
125 }
126
127 let mut count = 0;
128 for row in row_iter {
129 count += 1;
130 for (col_idx, value) in (0..S::schema().len()).map(|i| (i, row.get_column(i))) {
131 match (&mut self.builders[col_idx], value) {
132 (ColumnBuilder::U64(b), ParquetValue::U64(v)) => b.append_value(v),
133 (ColumnBuilder::I64(b), ParquetValue::I64(v)) => b.append_value(v),
134 (ColumnBuilder::Bool(b), ParquetValue::Bool(v)) => b.append_value(v),
135 (ColumnBuilder::Str(b), ParquetValue::Str(v)) => b.append_value(&v),
136
137 (ColumnBuilder::U64(b), ParquetValue::OptionU64(opt)) => match opt {
138 Some(v) => b.append_value(v),
139 None => b.append_null(),
140 },
141 (ColumnBuilder::Str(b), ParquetValue::OptionStr(opt)) => match opt {
142 Some(v) => b.append_value(&v),
143 None => b.append_null(),
144 },
145
146 _ => return Err(anyhow!("type mismatch on column {}", col_idx)),
147 }
148 }
149 }
150
151 self.row_count += count;
152 Ok(())
153 }
154
155 fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
156 if self.builders.is_empty()
158 || self
159 .builders
160 .iter_mut()
161 .all(|b| b.as_any_builder().is_empty())
162 {
163 return Ok(false);
164 }
165
166 self.checkpoint_range.end = end_checkpoint_seq_num;
167
168 let arrays: Vec<ArrayRef> = std::mem::take(&mut self.builders)
170 .into_iter()
171 .map(|b| b.finish())
172 .collect();
173
174 let batch = RecordBatch::try_from_iter(S::schema().iter().zip(arrays))?;
175
176 let propertiess = WriterProperties::builder()
177 .set_compression(Compression::SNAPPY)
178 .build();
179 let mut writer = ArrowWriter::try_new(self.file()?, batch.schema(), Some(propertiess))?;
180 writer.write(&batch)?;
181 writer.close()?;
182 Ok(true)
183 }
184
185 fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
186 self.epoch = epoch_num;
187 self.checkpoint_range = start_checkpoint_seq_num..u64::MAX;
188 self.builders.clear();
189 self.row_count = 0;
190 Ok(())
191 }
192
193 fn file_size(&self) -> Result<Option<u64>> {
194 Ok(None)
198 }
199
200 fn rows(&self) -> Result<usize> {
201 Ok(self.row_count)
202 }
203}