Struct narwhal_primary::BlockRemover
source · [−]pub struct BlockRemover { /* private fields */ }
Expand description
BlockRemover is responsible for removing blocks identified by their certificate id (digest) from across our system. On high level It will make sure that the DAG is updated, internal storage where there certificates and headers are stored, and the corresponding batches as well.
Example
Basic setup of the BlockRemover
This example shows the basic setup of the BlockRemover module. It showcases the necessary components that have to be used (e.x channels, datastore etc) and how a request (command) should be issued to delete a list of blocks and receive the result of it.
#[tokio::main(flavor = "current_thread")]
const CERTIFICATES_CF: &str = "certificates";
const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round";
const HEADERS_CF: &str = "headers";
const PAYLOAD_CF: &str = "payload";
let temp_dir = tempdir().expect("Failed to open temporary directory").into_path();
// Basic setup: datastore, channels & BlockWaiter
let rocksdb = rocks::open_cf(temp_dir, None, &[CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF, HEADERS_CF, PAYLOAD_CF])
.expect("Failed creating database");
let (certificate_map, certificate_id_by_round_map, headers_map, payload_map) = reopen!(&rocksdb,
CERTIFICATES_CF;<CertificateDigest, Certificate>,
CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), CertificateToken>,
HEADERS_CF;<HeaderDigest, Header>,
PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>);
let certificate_store = CertificateStore::new(certificate_map, certificate_id_by_round_map);
let headers_store = Store::new(headers_map);
let payload_store = Store::new(payload_map);
let (tx_commands, rx_commands) = test_utils::test_channel!(1);
let (tx_delete_batches, rx_delete_batches) = test_utils::test_channel!(1);
let (tx_removed_certificates, _rx_removed_certificates) = test_utils::test_channel!(1);
let (tx_delete_block_result, mut rx_delete_block_result) = channel(1);
let name = PublicKey::default();
let committee = Committee{ epoch: 0, authorities: BTreeMap::new() };
let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into();
let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
// A dag with genesis for the committee
let (tx_new_certificates, rx_new_certificates) = test_utils::test_channel!(1);
let dag = Arc::new(Dag::new(&committee, rx_new_certificates, consensus_metrics).1);
// Populate genesis in the Dag
join_all(
Certificate::genesis(&committee)
.iter()
.map(|cert| dag.insert(cert.clone())),
)
.await;
let _ = BlockRemover::spawn(
name,
committee,
worker_cache,
certificate_store.clone(),
headers_store.clone(),
payload_store.clone(),
Some(dag),
network::P2pNetwork::new(test_utils::random_network()),
rx_reconfigure,
rx_commands,
rx_delete_batches,
tx_removed_certificates,
);
// A dummy certificate
let certificate = Certificate::default();
// Send a command to receive a block
tx_commands
.send(BlockRemoverCommand::RemoveBlocks {
ids: vec![certificate.clone().digest()],
sender: tx_delete_block_result,
})
.await;
// Dummy - we expect to receive the deleted batches responses via another component
// and get fed via the tx_delete_batches channel.
tx_delete_batches.send(Ok(DeleteBatchMessage{ ids: vec![BatchDigest::default()] })).await;
// Wait to receive the blocks delete output to the provided sender channel
match rx_delete_block_result.recv().await {
Some(Ok(result)) => {
println!("Successfully received a delete blocks response");
}
Some(Err(err)) => {
println!("Received an error {:?}", err);
}
_ => {
println!("Nothing received");
}
}
Implementations
sourceimpl BlockRemover
impl BlockRemover
pub fn spawn(
name: PublicKey,
committee: Committee,
worker_cache: SharedWorkerCache,
certificate_store: CertificateStore,
header_store: Store<HeaderDigest, Header>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
dag: Option<Arc<Dag>>,
worker_network: P2pNetwork,
rx_reconfigure: Receiver<ReconfigureNotification>,
rx_commands: Receiver<BlockRemoverCommand>,
rx_delete_batches: Receiver<Result<DeleteBatchMessage, DeleteBatchMessage>>,
removed_certificates: Sender<Certificate>
) -> JoinHandle<()>
Auto Trait Implementations
impl !RefUnwindSafe for BlockRemover
impl Send for BlockRemover
impl Sync for BlockRemover
impl Unpin for BlockRemover
impl !UnwindSafe for BlockRemover
Blanket Implementations
sourceimpl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
sourceimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
sourcefn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T
in a tonic::Request
impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T
in a Request
impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
Pipes by value. This is generally the method you want to use. Read more
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
Borrows
self
and passes that borrow into the pipe function. Read morefn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
Mutably borrows
self
and passes that borrow into the pipe function. Read morefn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere
Self: Borrow<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere
Self: Borrow<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> Rwhere
Self: BorrowMut<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> Rwhere
Self: BorrowMut<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere
Self: AsRef<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere
Self: AsRef<U>,
U: 'a + ?Sized,
R: 'a,
Borrows
self
, then passes self.as_ref()
into the pipe function.fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere
Self: AsMut<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere
Self: AsMut<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> Rwhere
Self: Deref<Target = T>,
T: 'a + ?Sized,
R: 'a,
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> Rwhere
Self: Deref<Target = T>,
T: 'a + ?Sized,
R: 'a,
Borrows
self
, then passes self.deref()
into the pipe function.impl<T> Pointable for T
impl<T> Pointable for T
impl<T> Tap for T
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
Immutable access to the
Borrow<B>
of a value. Read morefn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
Mutable access to the
BorrowMut<B>
of a value. Read morefn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
Immutable access to the
AsRef<R>
view of a value. Read morefn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
Mutable access to the
AsMut<R>
view of a value. Read morefn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere
Self: Deref<Target = T>,
T: ?Sized,
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere
Self: Deref<Target = T>,
T: ?Sized,
Immutable access to the
Deref::Target
of a value. Read morefn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere
Self: DerefMut<Target = T> + Deref,
T: ?Sized,
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere
Self: DerefMut<Target = T> + Deref,
T: ?Sized,
Mutable access to the
Deref::Target
of a value. Read morefn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls
.tap()
only in debug builds, and is erased in release builds.fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls
.tap_mut()
only in debug builds, and is erased in release
builds. Read morefn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
Calls
.tap_borrow()
only in debug builds, and is erased in release
builds. Read morefn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
Calls
.tap_borrow_mut()
only in debug builds, and is erased in release
builds. Read morefn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
Calls
.tap_ref()
only in debug builds, and is erased in release
builds. Read morefn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
Calls
.tap_ref_mut()
only in debug builds, and is erased in release
builds. Read more