// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::block_synchronizer::handler::Handler;
use config::{Committee, SharedWorkerCache};
use crypto::PublicKey;
use fastcrypto::{Digest, Hash};
use futures::{
    future::{try_join_all, BoxFuture},
    stream::{futures_unordered::FuturesUnordered, StreamExt as _},
    FutureExt,
};
use network::{P2pNetwork, UnreliableNetwork};
use std::{
    collections::{HashMap, HashSet},
    fmt,
    fmt::Formatter,
    sync::Arc,
    time::Duration,
};
use tap::TapFallible;
use tokio::{
    sync::{oneshot, watch},
    task::JoinHandle,
    time::timeout,
};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
    metered_channel::Receiver, BatchDigest, BatchMessage, BlockError, BlockErrorKind, BlockResult,
    Certificate, CertificateDigest, Header, PrimaryWorkerMessage, ReconfigureNotification,
};
use Result::*;

// TODO [860]: make the block waiter's configuration customizable, so that
// distinct settings can be used when it is deployed in different contexts.
// It is used both for NW + external consensus in the validator API (where
// latency is key) and for NW + internal consensus in the Executor
// (see #738, where reliability is key).
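/// Maximum time to wait for a worker to return a requested batch before
/// giving up with a `BatchTimeout` error.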
const BATCH_RETRIEVE_TIMEOUT: Duration = Duration::from_secs(10);

#[cfg(test)]
#[path = "tests/block_waiter_tests.rs"]
pub mod block_waiter_tests;

#[derive(Debug)]
pub enum BlockCommand {
    /// GetBlock retrieves the block data (the vector of
    /// transactions) for a given block digest. The result is
    /// sent to the provided sender. The `id` is the digest of
    /// the certificate that identifies the block.
    GetBlock {
        id: CertificateDigest,
        // The channel to send the results to.
        sender: oneshot::Sender<BlockResult<GetBlockResponse>>,
    },

    /// GetBlocks initiates the retrieval of the block data for
    /// multiple block ids. The results are returned in the same
    /// order as the provided ids.
    GetBlocks {
        ids: Vec<CertificateDigest>,
        sender: oneshot::Sender<BlocksResult>,
    },
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetBlockResponse {
    pub id: CertificateDigest,
    #[allow(dead_code)]
    pub batches: Vec<BatchMessage>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetBlocksResponse {
    pub blocks: Vec<BlockResult<GetBlockResponse>>,
}

pub type BatchResult = Result<BatchMessage, BatchMessageError>;

/// Error passed back to the BlockWaiter when a worker could not
/// send us a requested batch.
#[derive(Clone, Default, Debug)]
pub struct BatchMessageError {
    pub id: BatchDigest,
}

type BlocksResult = Result<GetBlocksResponse, BlocksError>;

#[derive(Debug, Clone)]
pub struct BlocksError {
    ids: Vec<CertificateDigest>,
    #[allow(dead_code)]
    error: BlocksErrorType,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum BlocksErrorType {
    Error,
}

impl fmt::Display for BlocksErrorType {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

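/// Key under which an in-flight `GetBlocks` request is tracked: the
/// concatenated bytes of the requested certificate digests, sorted in
/// ascending order (see `construct_get_blocks_request_key`).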
type RequestKey = Vec<u8>;

/// BlockWaiter is responsible for fetching the block data from the
/// downstream worker nodes. A block is the aggregate of the batches of
/// transactions referenced by a given certificate.
///
/// # Example
///
/// Basic setup of the BlockWaiter
///
/// This example shows the basic setup of the BlockWaiter module. It showcases
/// the necessary components (e.g. channels, datastore etc.) and how a request
/// (command) is issued to get a block and receive its result.
///
/// ```rust
/// # use tokio::sync::{mpsc::{channel}, watch, oneshot};
/// # use arc_swap::ArcSwap;
/// # use fastcrypto::Hash;
/// # use std::env::temp_dir;
/// # use config::{Committee, WorkerCache, SharedWorkerCache};
/// # use crypto::PublicKey;
/// # use narwhal_primary::{BlockWaiter, BlockHeader, BlockCommand, block_synchronizer::{BlockSynchronizeResult, handler::{Error, Handler}}};
/// # use std::collections::BTreeMap;
/// # use types::Certificate;
/// # use types::{BatchMessage, BatchDigest, CertificateDigest, Batch};
/// # use mockall::*;
/// # use test_utils::test_channel;
/// # use types::ReconfigureNotification;
/// # use fastcrypto::traits::VerifyingKey;
/// # use async_trait::async_trait;
/// # use std::sync::Arc;
/// # use network::P2pNetwork;
///
/// # // A mock implementation of the BlockSynchronizerHandler
/// struct BlockSynchronizerHandler;
///
/// #[async_trait]
/// impl Handler for BlockSynchronizerHandler {
///
///     async fn get_and_synchronize_block_headers(&self, block_ids: Vec<CertificateDigest>) -> Vec<Result<Certificate, Error>> {
///         vec![]
///     }
///
///     async fn get_block_headers(&self, block_ids: Vec<CertificateDigest>) -> Vec<BlockSynchronizeResult<BlockHeader>> {
///         vec![]
///     }
///
///     async fn synchronize_block_payloads(&self, certificates: Vec<Certificate>) -> Vec<Result<Certificate, Error>> {
///         vec![]
///     }
///
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
///     let (tx_commands, rx_commands) = test_utils::test_channel!(1);
///     let (tx_batches, rx_batches) = test_utils::test_channel!(1);
///     let (tx_get_block, mut rx_get_block) = oneshot::channel();
///
///     let name = PublicKey::default();
///     let committee = Committee{ epoch: 0, authorities: BTreeMap::new() };
///     let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into();
///     let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
///
///     // A dummy certificate
///     let certificate = Certificate::default();
///
///     // In a real setup the BlockSynchronizer would respond; here
///     // we use a mock.
///     BlockWaiter::spawn(
///         name,
///         committee,
///         worker_cache,
///         rx_reconfigure,
///         rx_commands,
///         rx_batches,
///         Arc::new(BlockSynchronizerHandler{}),
///         network::P2pNetwork::new(test_utils::random_network()),
///     );
///
///     // Send a command to receive a block
///     tx_commands
///         .send(BlockCommand::GetBlock {
///             id: certificate.digest(),
///             sender: tx_get_block,
///         })
///         .await;
///
///     // In a real setup the requested batches would arrive from another
///     // component and be fed through the tx_batches channel.
///     tx_batches.send(Ok(BatchMessage{ id: BatchDigest::default(), transactions: Batch(vec![]) })).await;
///
///     // Wait to receive the block output to the provided sender channel
///     match rx_get_block.await.unwrap() {
///         Ok(result) => {
///             println!("Successfully received a block response");
///         }
///         Err(err) => {
///             println!("Received an error {}", err);
///         }
///     }
/// # }
/// ```
pub struct BlockWaiter<SynchronizerHandler: Handler + Send + Sync + 'static> {
    /// The public key of this primary.
    name: PublicKey,

    /// The committee information.
    committee: Committee,

    /// The worker information cache.
    worker_cache: SharedWorkerCache,

    /// Receives all the requests to get a block.
    rx_commands: Receiver<BlockCommand>,

    /// Whenever we receive a get_block request, we mark it as
    /// pending by adding it to this map. Once we have a result
    /// back (or a timeout), we remove the digest from the map.
    /// The key is the block id and the value is the
    /// corresponding certificate.
    pending_get_block: HashMap<CertificateDigest, Certificate>,

    /// Network driver used to send messages to our workers.
    worker_network: P2pNetwork,

    /// Watch channel to reconfigure the committee.
    rx_reconfigure: watch::Receiver<ReconfigureNotification>,

    /// Channel on which we receive the results for the batches
    /// we have requested from workers.
    rx_batch_receiver: Receiver<BatchResult>,

    /// Maps batch ids to channels that "listen" for arriving batch messages.
    /// The key is the batch id; the value is a map of
    /// CertificateDigest --> oneshot::Sender, since we might need to deliver
    /// the batch result to requests from multiple certificates (unlikely,
    /// but it is still possible for a batch with the same id to be included
    /// in multiple headers).
    pending_batch_by_digest:
        HashMap<BatchDigest, HashMap<CertificateDigest, oneshot::Sender<BatchResult>>>,

    /// A map that holds the channels we should notify with the
    /// GetBlock responses.
    get_block_map_requesters:
        HashMap<CertificateDigest, Vec<oneshot::Sender<BlockResult<GetBlockResponse>>>>,

    /// A map that holds the channels we should notify with the
    /// GetBlocks responses.
    get_blocks_map_requesters: HashMap<RequestKey, Vec<oneshot::Sender<BlocksResult>>>,

    /// We use the block synchronizer's handler to interact with the
    /// block synchronizer in a synchronous way. A reference to it is
    /// shared between components.
    block_synchronizer_handler: Arc<SynchronizerHandler>,
}

impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<SynchronizerHandler> {
    /// Creates a new waiter and starts listening for incoming
    /// commands to fetch blocks.
    #[must_use]
    pub fn spawn(
        name: PublicKey,
        committee: Committee,
        worker_cache: SharedWorkerCache,
        rx_reconfigure: watch::Receiver<ReconfigureNotification>,
        rx_commands: Receiver<BlockCommand>,
        batch_receiver: Receiver<BatchResult>,
        block_synchronizer_handler: Arc<SynchronizerHandler>,
        worker_network: P2pNetwork,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            Self {
                name,
                committee,
                worker_cache,
                rx_commands,
                pending_get_block: HashMap::new(),
                worker_network,
                rx_reconfigure,
                rx_batch_receiver: batch_receiver,
                pending_batch_by_digest: HashMap::new(),
                get_block_map_requesters: HashMap::new(),
                get_blocks_map_requesters: HashMap::new(),
                block_synchronizer_handler,
            }
            .run()
            .await;
        })
    }

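    /// The main event loop: dispatches incoming commands, routes batch
    /// results arriving from workers, completes the futures that wait on
    /// block retrievals, and reacts to committee reconfiguration
    /// (returning on `Shutdown`).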
    async fn run(&mut self) {
        let mut waiting_get_block = FuturesUnordered::new();
        let mut waiting_get_blocks = FuturesUnordered::new();

        info!(
            "BlockWaiter on node {} has started successfully.",
            self.name
        );
        loop {
            tokio::select! {
                Some(command) = self.rx_commands.recv() => {
                    match command {
                        BlockCommand::GetBlocks { ids, sender } => {
                            match self.handle_get_blocks_command(ids, sender).await {
                                Some((get_block_futures, get_blocks_future)) => {
                                    for fut in get_block_futures {
                                        waiting_get_block.push(fut);
                                    }
                                    waiting_get_blocks.push(get_blocks_future);
                                },
                                _ => debug!("no processing for command get blocks, will not wait for any result")
                            }
                        },
                        BlockCommand::GetBlock { id, sender } => {
                            match self.handle_get_block_command(id, sender).await {
                                Some(fut) => waiting_get_block.push(fut),
                                None => debug!("no processing for command, will not wait for any results")
                            }
                        }
                    }
                },
                // When we receive a BatchMessage (from a worker), it is
                // captured by the rx_batch_receiver channel and
                // handled appropriately.
                Some(batch_message) = self.rx_batch_receiver.recv() => {
                    self.handle_batch_message(batch_message).await;
                },
                // When we send a request to fetch a block's batches, we
                // wait for the results to come back before we proceed.
                // Polling the waiting futures lets us make progress as
                // soon as any one of them completes.
                Some(result) = waiting_get_block.next() => {
                    self.handle_batch_waiting_result(result).await;
                },
                Some(result) = waiting_get_blocks.next() => {
                    self.handle_get_blocks_waiting_result(result).await;
                }

                // Check whether the committee changed. If the network address of our workers changed upon trying
                // to send them a request, we will timeout and the caller will have to retry.
                result = self.rx_reconfigure.changed() => {
                    result.expect("Committee channel dropped");
                    let message = self.rx_reconfigure.borrow().clone();
                    match message {
                        ReconfigureNotification::NewEpoch(new_committee)
                        | ReconfigureNotification::UpdateCommittee(new_committee) => {
                            self.worker_network.cleanup(self.committee.network_diff(&new_committee));
                            self.committee = new_committee;
                        }
                        ReconfigureNotification::Shutdown => return
                    }
                    tracing::debug!("Committee updated to {}", self.committee);
                }
            }
        }
    }

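    /// Forwards a completed `GetBlocks` result to every requester waiting
    /// on the corresponding request key and clears the pending entry.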
    async fn handle_get_blocks_waiting_result(&mut self, result: BlocksResult) {
        let ids = result.as_ref().map_or_else(
            |err| err.ids.clone(),
            |ok| {
                ok.blocks
                    .iter()
                    .map(|res| res.as_ref().map_or_else(|e| e.id, |r| r.id))
                    .collect()
            },
        );

        let key = Self::construct_get_blocks_request_key(&ids);

        match self.get_blocks_map_requesters.remove(&key) {
            Some(senders) => {
                for sender in senders {
                    if sender.send(result.clone()).is_err() {
                        error!("Couldn't forward results for blocks {:?} to sender", ids)
                    }
                }
            }
            None => {
                error!("We should expect to find channels to respond for {:?}", ids);
            }
        }
    }

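    /// Handles a `GetBlocks` command: deduplicates against an identical
    /// pending request, fetches the certificates, and returns the futures
    /// to wait on (one per block, plus one aggregating future).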
    #[instrument(level="debug", skip_all, fields(num_block_ids = ids.len()))]
    async fn handle_get_blocks_command<'a>(
        &mut self,
        ids: Vec<CertificateDigest>,
        sender: oneshot::Sender<BlocksResult>,
    ) -> Option<(
        Vec<BoxFuture<'a, BlockResult<GetBlockResponse>>>,
        BoxFuture<'a, BlocksResult>,
    )> {
        // Check whether we have a similar request pending. To make the
        // check easy, we sort the digests in ascending order and then
        // concatenate all the bytes to form a key.
        let key = Self::construct_get_blocks_request_key(&ids);

        if self.get_blocks_map_requesters.contains_key(&key) {
            // request already pending, nothing to do, just add the sender to the list
            // of pending to be notified ones.
            self.get_blocks_map_requesters
                .entry(key)
                .or_insert_with(Vec::new)
                .push(sender);

            debug!("GetBlocks has an already pending request for the provided ids");
            return None;
        }

        // fetch the certificates
        let certificates = self.get_certificates(ids.clone()).await;

        let (get_block_futures, get_blocks_future) = self.get_blocks(certificates).await;

        // mark the request as pending
        self.get_blocks_map_requesters
            .entry(key)
            .or_insert_with(Vec::new)
            .push(sender);

        Some((get_block_futures, get_blocks_future))
    }

    /// Helper method to retrieve a single certificate.
    #[instrument(level = "trace", skip_all, fields(certificate_id = ?id))]
    async fn get_certificate(&mut self, id: CertificateDigest) -> Option<Certificate> {
        if let Some((_, c)) = self.get_certificates(vec![id]).await.first() {
            return c.to_owned();
        }
        None
    }

    /// Fetches the certificates via the block_synchronizer. If a
    /// certificate is missing, we expect the synchronizer to fetch it
    /// from our peers; otherwise, if it is available in storage, the
    /// result is returned immediately. The method awaits until all
    /// results have been retrieved.
    #[instrument(level = "trace", skip_all, fields(num_certificate_ids = ids.len()))]
    async fn get_certificates(
        &mut self,
        ids: Vec<CertificateDigest>,
    ) -> Vec<(CertificateDigest, Option<Certificate>)> {
        let mut results = Vec::new();

        let block_header_results = self
            .block_synchronizer_handler
            .get_and_synchronize_block_headers(ids)
            .await;

        for result in block_header_results {
            match result {
                Ok(certificate) => results.push((certificate.digest(), Some(certificate))),
                Err(err) => results.push((err.block_id(), None)),
            }
        }

        results
    }

    /// Triggers fetching the block for each provided certificate. The
    /// method receives the `certificates` vector of tuples of the
    /// certificate id and an Option with the certificate. If the Option is
    /// empty (None), we did not manage to retrieve/find the certificate,
    /// and an error result is immediately sent to the consumer.
    async fn get_blocks<'a>(
        &mut self,
        certificates: Vec<(CertificateDigest, Option<Certificate>)>,
    ) -> (
        Vec<BoxFuture<'a, BlockResult<GetBlockResponse>>>,
        BoxFuture<'a, BlocksResult>,
    ) {
        let mut get_block_receivers = Vec::new();
        let mut futures = Vec::new();
        let mut ids = Vec::new();

        // ensure payloads are synchronized for the found certificates
        let found_certificates: Vec<Certificate> =
            certificates.iter().flat_map(|(_, c)| c).cloned().collect();

        let sync_result = self
            .block_synchronizer_handler
            .synchronize_block_payloads(found_certificates)
            .await;
        let successful_payload_sync_set = sync_result
            .iter()
            .flat_map(|r| r.as_ref().map(|c| c.digest()).ok())
            .collect::<HashSet<CertificateDigest>>();

        for (id, c) in certificates {
            let (get_block_sender, get_block_receiver) = oneshot::channel();
            ids.push(id);

            // certificate has been found
            if let Some(certificate) = c {
                // Proceed on getting the block only if the payload has
                // been successfully synced.
                if successful_payload_sync_set.contains(&id) {
                    let fut = self.get_block(id, certificate, get_block_sender).await;

                    if let Some(f) = fut {
                        futures.push(f.boxed());
                    }
                } else {
                    // Send a batch error in this case
                    get_block_sender
                        .send(Err(BlockError {
                            id,
                            error: BlockErrorKind::BatchError,
                        }))
                        .expect("Couldn't send BatchError error for a GetBlocks request");
                }
            } else {
                // If the certificate has not been found, directly send a
                // BlockNotFound response.
                get_block_sender
                    .send(Err(BlockError {
                        id,
                        error: BlockErrorKind::BlockNotFound,
                    }))
                    .expect("Couldn't send BlockNotFound error for a GetBlocks request");
            }

            get_block_receivers.push(get_block_receiver);
        }

        // Create a waiter that collects all the block results and sends
        // back the aggregated response.
        let fut = Self::wait_for_all_blocks(ids.clone(), get_block_receivers);

        (futures, fut.boxed())
    }

    /// Handles a received command and returns a future if we need to wait
    /// for further results; otherwise, None is returned when no further
    /// waiting is needed.
    #[instrument(level="debug", skip_all, fields(block_id = ?id))]
    async fn handle_get_block_command<'a>(
        &mut self,
        id: CertificateDigest,
        sender: oneshot::Sender<BlockResult<GetBlockResponse>>,
    ) -> Option<BoxFuture<'a, BlockResult<GetBlockResponse>>> {
        match self.get_certificate(id).await {
            Some(certificate) => {
                // Before sending a request to fetch the block's batches, ensure that
                // those are synchronized and available.
                if !self
                    .ensure_payload_is_synchronized(certificate.clone())
                    .await
                {
                    // If the payload is not available or we didn't manage
                    // to successfully sync it, reply with an error and return.
                    sender
                        .send(Err(BlockError {
                            id,
                            error: BlockErrorKind::BatchError,
                        }))
                        .expect("Couldn't send message back to sender");

                    return None;
                }

                self.get_block(id, certificate, sender).await
            }
            None => {
                sender
                    .send(Err(BlockError {
                        id,
                        error: BlockErrorKind::BlockNotFound,
                    }))
                    .expect("Couldn't send BlockNotFound error for a GetBlock request");

                None
            }
        }
    }

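    /// Triggers the retrieval of the batches for a single block. If a
    /// request for the same block is already in flight, the sender is only
    /// registered to be notified on completion and no new work is started.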
    async fn get_block<'a>(
        &mut self,
        id: CertificateDigest,
        certificate: Certificate,
        sender: oneshot::Sender<BlockResult<GetBlockResponse>>,
    ) -> Option<BoxFuture<'a, BlockResult<GetBlockResponse>>> {
        // If a similar request is already being processed, don't start a new one
        if self.pending_get_block.contains_key(&id) {
            self.get_block_map_requesters
                .entry(id)
                .or_insert_with(Vec::new)
                .push(sender);

            trace!("Block with id {} already has a pending request", id);
            return None;
        }

        trace!("No pending get block for {}", id);

        // Collect the receivers for the requested batches
        let batch_receivers = self
            .send_batch_requests(id, certificate.header.clone())
            .await;

        let fut = Self::wait_for_all_batches(id, batch_receivers);

        // Mark this block retrieval as pending so that no other
        // request initiates the process again.
        self.pending_get_block.insert(id, certificate.clone());

        self.get_block_map_requesters
            .entry(id)
            .or_insert_with(Vec::new)
            .push(sender);

        Some(fut.boxed())
    }

    /// Ensures that the payload of a block is available to the worker
    /// before we attempt to retrieve it. This is done via the block
    /// synchronizer handler, which triggers the syncing process if the
    /// payload is missing.
    ///
    /// # Returns
    ///
    /// `true`: If the payload was already synchronized or the synchronization
    /// was successful
    /// `false`: When synchronization failed
    async fn ensure_payload_is_synchronized(&self, certificate: Certificate) -> bool {
        let sync_result = self
            .block_synchronizer_handler
            .synchronize_block_payloads(vec![certificate.clone()])
            .await;

        sync_result
            .first()
            .expect("Expected at least one result back")
            .is_ok()
    }

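    /// Awaits all the individual `GetBlock` receivers and aggregates them
    /// into a single `BlocksResult`. If any receiver fails (e.g. its sender
    /// was dropped), the whole request fails with a `BlocksError` carrying
    /// the originally requested ids.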
    async fn wait_for_all_blocks(
        ids: Vec<CertificateDigest>,
        get_block_receivers: Vec<oneshot::Receiver<BlockResult<GetBlockResponse>>>,
    ) -> BlocksResult {
        let result = try_join_all(get_block_receivers).await;

        if let Ok(res) = result {
            Ok(GetBlocksResponse { blocks: res })
        } else {
            Err(BlocksError {
                ids,
                error: BlocksErrorType::Error,
            })
        }
    }

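    /// Delivers a completed `GetBlock` result to all registered requesters
    /// and cleans up the pending-block and pending-batch bookkeeping for it.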
    async fn handle_batch_waiting_result(&mut self, result: BlockResult<GetBlockResponse>) {
        let block_id = result.as_ref().map_or_else(|e| e.id, |r| r.id);

        match self.get_block_map_requesters.remove(&block_id) {
            Some(senders) => {
                for sender in senders {
                    if sender.send(result.clone()).is_err() {
                        error!("Couldn't forward results for block {} to sender", block_id)
                    }
                }
            }
            None => {
                error!(
                    "We should expect to find channels to respond for {}",
                    block_id
                );
            }
        }

        // unlock the pending request & batches.
        match self.pending_get_block.remove(&block_id) {
            Some(certificate) => {
                for (digest, _) in certificate.header.payload {
                    // Although we expect these entries to have been cleaned up
                    // by the time the batches were delivered (or errored), we
                    // still do this to cover any edge case and avoid memory leaks.
                    if let Some(senders) = self.pending_batch_by_digest.get_mut(&digest) {
                        senders.remove(&block_id);

                        // if no more senders in the map then remove entirely
                        // the map for the digest
                        if senders.is_empty() {
                            self.pending_batch_by_digest.remove(&digest);
                        }
                    }
                }
            }
            None => {
                // TODO: handle panic here
                error!(
                    "Expected to find certificate with id {} for pending processing",
                    &block_id
                );
            }
        }
    }

    /// Sends requests to fetch the batches from the corresponding workers.
    /// It returns a vector of tuples of the batch digest and a Receiver
    /// channel for the fetched batch.
    async fn send_batch_requests(
        &mut self,
        block_id: CertificateDigest,
        header: Header,
    ) -> Vec<(BatchDigest, oneshot::Receiver<BatchResult>)> {
        // Collect the receivers in a vector.
        let mut batch_receivers = Vec::new();

        // Send a request to the corresponding worker for each batch in the payload.
        for (digest, worker_id) in header.payload {
            // Although we expect our headers to reference unique batch ids, it is
            // possible for a batch with the same id to be produced if the exact same
            // transactions are posted and included in a batch. Although unlikely, it's
            // still a possibility and this component should be prepared for it.
            let (tx, rx) = oneshot::channel();

            if let Some(map) = self.pending_batch_by_digest.get_mut(&digest) {
                debug!(
                    "Skip sending request for batch {} to worker id {}, already pending",
                    digest, worker_id
                );

                map.insert(block_id, tx);
            } else {
                self.pending_batch_by_digest
                    .entry(digest)
                    .or_default()
                    .insert(block_id, tx);

                debug!(
                    "Sending batch {} request to worker id {}",
                    digest, worker_id
                );

                let worker_name = self
                    .worker_cache
                    .load()
                    .worker(&self.name, &worker_id)
                    .expect("Worker id not found")
                    .name;

                let message = PrimaryWorkerMessage::RequestBatch(digest);

                let _ = self.worker_network.unreliable_send(worker_name, &message);
            }

            // add the receiver to a vector to poll later
            batch_receivers.push((digest, rx));
        }

        batch_receivers
    }

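    /// Routes a batch result received from a worker to every block request
    /// currently waiting on that batch digest.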
    async fn handle_batch_message(&mut self, result: BatchResult) {
        let batch_id: BatchDigest = result.as_ref().map_or_else(|e| e.id, |r| r.id);

        match self.pending_batch_by_digest.remove(&batch_id) {
            Some(respond_to) => {
                for (id, s) in respond_to {
                    let _ = s.send(result.clone())
                        .tap_err(|err| error!("Couldn't send batch result {batch_id} message to channel [{err:?}] for block_id {id}"));
                }
            }
            None => {
                warn!("Couldn't find pending batch with id {}", &batch_id);
            }
        }
    }

    /// A helper method to wait for all the batch responses to be received.
    /// It collects the fetched batches and creates a GetBlockResponse ready
    /// to be sent back to the requester.
    async fn wait_for_all_batches(
        block_id: CertificateDigest,
        batches_receivers: Vec<(BatchDigest, oneshot::Receiver<BatchResult>)>,
    ) -> BlockResult<GetBlockResponse> {
        let waiting: Vec<_> = batches_receivers
            .into_iter()
            .map(|p| Self::wait_for_batch(block_id, p.1))
            .collect();

        let mut result = try_join_all(waiting).await?;

        // To make the response deterministic, serve the batch results in a
        // consistent order: sort by batch id in ascending order.
        result.sort_by(|a, b| a.id.cmp(&b.id));

        Ok(GetBlockResponse {
            id: block_id,
            batches: result,
        })
    }

    /// Waits for a batch to be received. If the batch is not received in
    /// time, the wait times out and an error is returned.
    async fn wait_for_batch(
        block_id: CertificateDigest,
        batch_receiver: oneshot::Receiver<BatchResult>,
    ) -> BlockResult<BatchMessage> {
        // ensure that we won't wait forever for a batch result to come
        let r = match timeout(BATCH_RETRIEVE_TIMEOUT, batch_receiver).await {
            Ok(Ok(result)) => result.or(Err(BlockErrorKind::BatchError)),
            Ok(Err(_)) => Err(BlockErrorKind::BatchError),
            Err(_) => Err(BlockErrorKind::BatchTimeout),
        };

        r.map_err(|e| BlockError {
            id: block_id,
            error: e,
        })
    }

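    /// Builds the request key for a set of certificate digests. The ids are
    /// sorted first, so two requests for the same set of ids (e.g.
    /// `[id_b, id_a]` and `[id_a, id_b]`) map to the same key and are
    /// treated as the same pending request.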
    fn construct_get_blocks_request_key(ids: &[CertificateDigest]) -> RequestKey {
        let mut ids_cloned = ids.to_vec();
        ids_cloned.sort();

        let result: RequestKey = ids_cloned
            .into_iter()
            .flat_map(|d| Digest::from(d).to_vec())
            .collect();

        result
    }
}