// typed_store/rocks/safe_iter.rs
use std::{marker::PhantomData, sync::Arc};
use bincode::Options;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::Direction;
use crate::metrics::{DBMetrics, RocksDBPerfContext};
use super::{RawIter, TypedStoreError};
use serde::de::DeserializeOwned;
/// Iterator adapter over a raw RocksDB iterator that yields deserialized
/// `(K, V)` pairs wrapped in `Result`.
///
/// While being advanced it accumulates the number of raw bytes read and the
/// number of entries visited; the `Drop` impl reports those totals to the
/// optional histograms and triggers perf-context metric reporting.
pub struct SafeIter<'a, K, V> {
/// Name handed to `report_metrics` on drop (presumably the column family
/// name — confirm against the caller).
cf_name: String,
/// Underlying raw iterator that is positioned and advanced by `next`.
db_iter: RawIter<'a>,
/// Binds the key/value types to the struct without storing values of them.
_phantom: PhantomData<(K, V)>,
/// Direction in which `next` moves `db_iter`; `new` sets `Forward`.
direction: Direction,
/// `false` until the initial seek has happened; `next` performs it lazily.
is_initialized: bool,
/// Never read in this file; kept alive for the iterator's lifetime.
/// NOTE(review): presumably records the elapsed time when dropped — confirm.
_timer: Option<HistogramTimer>,
/// Never read in this file; kept alive for the iterator's lifetime.
_perf_ctx: Option<RocksDBPerfContext>,
/// Histogram that receives `bytes_scanned_counter` on drop, if present.
bytes_scanned: Option<Histogram>,
/// Histogram that receives `keys_returned_counter` on drop, if present.
keys_scanned: Option<Histogram>,
/// Metrics handle whose `read_perf_ctx_metrics` is reported on drop, if present.
db_metrics: Option<Arc<DBMetrics>>,
/// Running total of raw key + value bytes read by `next`.
bytes_scanned_counter: usize,
/// Running total of entries visited by `next`.
keys_returned_counter: usize,
}
impl<'a, K, V> SafeIter<'a, K, V>
where
    K: DeserializeOwned,
    V: DeserializeOwned,
{
    /// Creates a forward iterator over `db_iter`.
    ///
    /// The iterator starts uninitialized: no seek is performed here, the
    /// first call to `next` positions the cursor. Both scan counters start
    /// at zero. `_timer` and `_perf_ctx` are only stored, and the optional
    /// histograms / metrics handle are consumed when the iterator is dropped.
    pub(super) fn new(
        cf_name: String,
        db_iter: RawIter<'a>,
        _timer: Option<HistogramTimer>,
        _perf_ctx: Option<RocksDBPerfContext>,
        bytes_scanned: Option<Histogram>,
        keys_scanned: Option<Histogram>,
        db_metrics: Option<Arc<DBMetrics>>,
    ) -> Self {
        Self {
            // Cursor state.
            db_iter,
            direction: Direction::Forward,
            is_initialized: false,
            _phantom: PhantomData,
            // Guards held for the lifetime of the iterator.
            _timer,
            _perf_ctx,
            // Metrics reported on drop.
            cf_name,
            db_metrics,
            bytes_scanned,
            keys_scanned,
            bytes_scanned_counter: 0,
            keys_returned_counter: 0,
        }
    }
}
impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SafeIter<'_, K, V> {
    type Item = Result<(K, V), TypedStoreError>;

    /// Returns the entry under the cursor and then moves the cursor one step
    /// in `self.direction`.
    ///
    /// * On the first call of a not-yet-initialized iterator the cursor is
    ///   first moved with `seek_to_first`.
    /// * `None` is returned once the raw iterator is no longer valid and its
    ///   status is `Ok`.
    /// * `Some(Err(RocksDBError))` is returned when the raw iterator is
    ///   invalid and reports an error status.
    /// * `Some(Err(SerializationError))` is returned when the key (bincode,
    ///   big-endian, fixed-int) or the value (BCS) cannot be decoded. The
    ///   cursor is still advanced, so the caller may keep iterating past the
    ///   bad entry instead of the scan silently ending early.
    ///
    /// # Panics
    ///
    /// Panics if the raw iterator reports itself valid but yields no key or
    /// no value.
    fn next(&mut self) -> Option<Self::Item> {
        if !self.is_initialized {
            self.db_iter.seek_to_first();
            self.is_initialized = true;
        }

        if !self.db_iter.valid() {
            // Distinguish "ran off the end" from "iterator failed".
            return match self.db_iter.status() {
                Ok(_) => None,
                Err(err) => Some(Err(TypedStoreError::RocksDBError(err.to_string()))),
            };
        }

        let config = bincode::DefaultOptions::new()
            .with_big_endian()
            .with_fixint_encoding();
        let raw_key = self
            .db_iter
            .key()
            .expect("Valid iterator failed to get key");
        let raw_value = self
            .db_iter
            .value()
            .expect("Valid iterator failed to get value");

        // Counted for every visited entry, whether or not it decodes.
        self.bytes_scanned_counter += raw_key.len() + raw_value.len();
        self.keys_returned_counter += 1;

        // Decode into owned values before the cursor moves and invalidates
        // the borrowed slices. Failures are surfaced to the caller rather
        // than being mistaken for the end of the iteration.
        let key: Result<K, TypedStoreError> = config
            .deserialize(raw_key)
            .map_err(|err| TypedStoreError::SerializationError(err.to_string()));
        let value: Result<V, TypedStoreError> = bcs::from_bytes(raw_value)
            .map_err(|err| TypedStoreError::SerializationError(err.to_string()));

        match self.direction {
            Direction::Forward => self.db_iter.next(),
            Direction::Reverse => self.db_iter.prev(),
        }

        Some(key.and_then(|k| value.map(|v| (k, v))))
    }
}
impl<K, V> Drop for SafeIter<'_, K, V> {
    /// Flushes the statistics gathered during iteration.
    ///
    /// Each optional sink is taken out of `self` and used at most once: the
    /// byte total goes to `bytes_scanned`, the entry total to `keys_scanned`,
    /// and finally `read_perf_ctx_metrics.report_metrics` is invoked with
    /// `cf_name` when a metrics handle is present.
    fn drop(&mut self) {
        let total_bytes = self.bytes_scanned_counter as f64;
        let total_keys = self.keys_returned_counter as f64;

        if let Some(histogram) = self.bytes_scanned.take() {
            histogram.observe(total_bytes);
        }

        if let Some(histogram) = self.keys_scanned.take() {
            histogram.observe(total_keys);
        }

        if let Some(metrics) = self.db_metrics.take() {
            let perf_metrics = &metrics.read_perf_ctx_metrics;
            perf_metrics.report_metrics(&self.cf_name);
        }
    }
}
/// Reverse-order wrapper around `SafeIter`.
///
/// `SafeRevIter::new` switches the wrapped iterator's direction to
/// `Direction::Reverse` and positions its cursor; `next` then simply
/// delegates to the wrapped iterator.
pub struct SafeRevIter<'a, K, V> {
/// The wrapped iterator, already configured for reverse traversal.
iter: SafeIter<'a, K, V>,
}
impl<'a, K, V> SafeRevIter<'a, K, V> {
    /// Turns `iter` into a reverse iterator.
    ///
    /// The cursor is positioned here, so the wrapped iterator is flagged as
    /// initialized to keep its `next` from seeking back to the first entry.
    /// With an `upper_bound` the cursor is placed via `seek_for_prev` on
    /// those raw key bytes; without one it is placed via `seek_to_last`.
    pub(crate) fn new(mut iter: SafeIter<'a, K, V>, upper_bound: Option<Vec<u8>>) -> Self {
        iter.direction = Direction::Reverse;
        iter.is_initialized = true;

        if let Some(bound) = upper_bound {
            iter.db_iter.seek_for_prev(&bound);
        } else {
            iter.db_iter.seek_to_last();
        }

        Self { iter }
    }
}
impl<K, V> Iterator for SafeRevIter<'_, K, V>
where
    K: DeserializeOwned,
    V: DeserializeOwned,
{
    type Item = Result<(K, V), TypedStoreError>;

    /// Yields the next entry by delegating to the wrapped `SafeIter`, which
    /// was configured for reverse traversal at construction time.
    fn next(&mut self) -> Option<Self::Item> {
        let inner = &mut self.iter;
        inner.next()
    }
}