// sui_analytics_indexer/writers/parquet_writer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{AnalyticsWriter, FileFormat, FileType, ParquetSchema, ParquetValue};
5use anyhow::{Result, anyhow};
6use arrow_array::{
7    ArrayRef, RecordBatch,
8    builder::{ArrayBuilder, BooleanBuilder, GenericStringBuilder, Int64Builder, UInt64Builder},
9};
10use serde::Serialize;
11use std::fs::{File, create_dir_all, remove_file};
12use std::ops::Range;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use sui_storage::object_store::util::path_to_filesystem;
16use sui_types::base_types::EpochId;
17
18use parquet::arrow::ArrowWriter;
19use parquet::basic::Compression;
20use parquet::file::properties::WriterProperties;
21
// UTF-8 string builder with `i32` offsets (standard Arrow `Utf8`, not `LargeUtf8`).
type StrBuilder = GenericStringBuilder<i32>;
23
/// Tagged union over the concrete Arrow builders for each column type a
/// `ParquetValue` can carry. Option-typed values (`OptionU64`, `OptionStr`)
/// reuse the builder of their inner type; `None` is appended as a null.
enum ColumnBuilder {
    U64(UInt64Builder),
    I64(Int64Builder),
    Bool(BooleanBuilder),
    Str(StrBuilder),
}
30
31impl ColumnBuilder {
32    fn as_any_builder(&mut self) -> &mut dyn ArrayBuilder {
33        match self {
34            Self::U64(b) => b,
35            Self::I64(b) => b,
36            Self::Bool(b) => b,
37            Self::Str(b) => b,
38        }
39    }
40
41    fn finish(self) -> ArrayRef {
42        match self {
43            Self::U64(mut b) => Arc::new(b.finish()),
44            Self::I64(mut b) => Arc::new(b.finish()),
45            Self::Bool(mut b) => Arc::new(b.finish()),
46            Self::Str(mut b) => Arc::new(b.finish()),
47        }
48    }
49}
50
// Save table entries to parquet files.
//
// One `ParquetWriter` buffers rows for a single table (`file_type`) within a
// single epoch/checkpoint range; `flush` serializes the buffered columns into
// one parquet file under `root_dir_path`.
pub(crate) struct ParquetWriter {
    // Local filesystem root under which output files are written.
    root_dir_path: PathBuf,
    // Which analytics table this writer produces files for.
    file_type: FileType,
    // Current epoch; starts at 0 and is updated via `reset`.
    epoch: EpochId,
    // Checkpoint sequence numbers covered by the next output file. The end
    // stays open (`u64::MAX`) until `flush` pins it.
    checkpoint_range: Range<u64>,
    // Per-column Arrow builders, lazily initialized from the first row seen.
    builders: Vec<ColumnBuilder>,
    // Total rows accumulated since construction or the last `reset`
    // (`flush` does not clear this counter).
    row_count: usize,
}
60
61impl ParquetWriter {
62    pub(crate) fn new(
63        root_dir_path: &Path,
64        file_type: FileType,
65        start_checkpoint_seq_num: u64,
66    ) -> Result<Self> {
67        Ok(Self {
68            root_dir_path: root_dir_path.to_path_buf(),
69            file_type,
70            epoch: 0,
71            checkpoint_range: start_checkpoint_seq_num..u64::MAX,
72            builders: vec![],
73            row_count: 0,
74        })
75    }
76
77    fn file(&self) -> Result<File> {
78        let file_path = path_to_filesystem(
79            self.root_dir_path.clone(),
80            &self.file_type.file_path(
81                FileFormat::PARQUET,
82                self.epoch,
83                self.checkpoint_range.clone(),
84            ),
85        )?;
86        create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
87        if file_path.exists() {
88            remove_file(&file_path)?;
89        }
90        Ok(File::create(&file_path)?)
91    }
92}
93
94impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for ParquetWriter {
95    fn file_format(&self) -> Result<FileFormat> {
96        Ok(FileFormat::PARQUET)
97    }
98
99    fn write(&mut self, rows: Box<dyn Iterator<Item = S> + Send + Sync>) -> Result<()> {
100        // Make the iterator peekable
101        let mut row_iter = rows.peekable();
102
103        // Check if iterator is empty
104        if row_iter.peek().is_none() {
105            return Ok(());
106        }
107
108        // Lazily sample the first row to infer the schema and decide which concrete builder to instantiate
109        if self.builders.is_empty()
110            && let Some(first_row) = row_iter.peek()
111        {
112            for col_idx in 0..S::schema().len() {
113                let value = first_row.get_column(col_idx);
114                self.builders.push(match value {
115                    ParquetValue::U64(_) | ParquetValue::OptionU64(_) => {
116                        ColumnBuilder::U64(UInt64Builder::new())
117                    }
118                    ParquetValue::I64(_) => ColumnBuilder::I64(Int64Builder::new()),
119                    ParquetValue::Bool(_) => ColumnBuilder::Bool(BooleanBuilder::new()),
120                    ParquetValue::Str(_) | ParquetValue::OptionStr(_) => {
121                        ColumnBuilder::Str(StrBuilder::new())
122                    }
123                });
124            }
125        }
126
127        let mut count = 0;
128        for row in row_iter {
129            count += 1;
130            for (col_idx, value) in (0..S::schema().len()).map(|i| (i, row.get_column(i))) {
131                match (&mut self.builders[col_idx], value) {
132                    (ColumnBuilder::U64(b), ParquetValue::U64(v)) => b.append_value(v),
133                    (ColumnBuilder::I64(b), ParquetValue::I64(v)) => b.append_value(v),
134                    (ColumnBuilder::Bool(b), ParquetValue::Bool(v)) => b.append_value(v),
135                    (ColumnBuilder::Str(b), ParquetValue::Str(v)) => b.append_value(&v),
136
137                    (ColumnBuilder::U64(b), ParquetValue::OptionU64(opt)) => match opt {
138                        Some(v) => b.append_value(v),
139                        None => b.append_null(),
140                    },
141                    (ColumnBuilder::Str(b), ParquetValue::OptionStr(opt)) => match opt {
142                        Some(v) => b.append_value(&v),
143                        None => b.append_null(),
144                    },
145
146                    _ => return Err(anyhow!("type mismatch on column {}", col_idx)),
147                }
148            }
149        }
150
151        self.row_count += count;
152        Ok(())
153    }
154
155    fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
156        // Nothing to flush if builders aren't initialized or are empty
157        if self.builders.is_empty()
158            || self
159                .builders
160                .iter_mut()
161                .all(|b| b.as_any_builder().is_empty())
162        {
163            return Ok(false);
164        }
165
166        self.checkpoint_range.end = end_checkpoint_seq_num;
167
168        // Turn builders into Arrow arrays.
169        let arrays: Vec<ArrayRef> = std::mem::take(&mut self.builders)
170            .into_iter()
171            .map(|b| b.finish())
172            .collect();
173
174        let batch = RecordBatch::try_from_iter(S::schema().iter().zip(arrays))?;
175
176        let propertiess = WriterProperties::builder()
177            .set_compression(Compression::SNAPPY)
178            .build();
179        let mut writer = ArrowWriter::try_new(self.file()?, batch.schema(), Some(propertiess))?;
180        writer.write(&batch)?;
181        writer.close()?;
182        Ok(true)
183    }
184
185    fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
186        self.epoch = epoch_num;
187        self.checkpoint_range = start_checkpoint_seq_num..u64::MAX;
188        self.builders.clear();
189        self.row_count = 0;
190        Ok(())
191    }
192
193    fn file_size(&self) -> Result<Option<u64>> {
194        // parquet writer doesn't write records in a temp staging file
195        // and only flushes records after serializing and compressing them
196        // when flush is invoked
197        Ok(None)
198    }
199
200    fn rows(&self) -> Result<usize> {
201        Ok(self.row_count)
202    }
203}