consensus_core/
transaction.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{collections::BTreeMap, sync::Arc};
4
5use consensus_config::Epoch;
6use consensus_types::block::{
7    BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX, Round, TransactionIndex,
8};
9use mysten_common::debug_fatal;
10use mysten_metrics::monitored_mpsc::{Receiver, Sender, channel};
11use parking_lot::Mutex;
12use tap::TapFallible;
13use thiserror::Error;
14use tokio::sync::oneshot;
15use tracing::{error, warn};
16
17use crate::{block::Transaction, context::Context};
18
/// The maximum number of submissions (each a `TransactionsGuard`) that can be queued in the
/// channel between `TransactionClient` and `TransactionConsumer`, waiting to be pulled for
/// block proposal. Further `submit` calls block (async) until there is capacity.
const MAX_PENDING_TRANSACTIONS: usize = 2_000;
21
/// The guard acts as an acknowledgment mechanism for the inclusion of the transactions to a block.
/// When its last transaction is included to a block then `included_in_block_ack` will be signalled.
/// If the guard is dropped without getting acknowledged that means the transactions have not been
/// included to a block and the consensus is shutting down.
pub(crate) struct TransactionsGuard {
    // Holds a list of transactions to be included in the block.
    // A TransactionsGuard may be partially consumed by `TransactionConsumer`, in which case, this holds the remaining transactions.
    // An empty list represents a "ping" submission: there is no payload, but the submitter is
    // still acknowledged (with `PING_TRANSACTION_INDEX`) when the next block is proposed.
    transactions: Vec<Transaction>,

    // When the transactions are included in a block, this will be signalled with
    // the following information
    included_in_block_ack: oneshot::Sender<(
        // The block reference in which the transactions have been included
        BlockRef,
        // The indices of the transactions that have been included in the block
        Vec<TransactionIndex>,
        // A receiver to notify the submitter about the block status
        oneshot::Receiver<BlockStatus>,
    )>,
}
42
/// The TransactionConsumer is responsible for fetching the next transactions to be included for the block proposals.
/// The transactions are submitted to a channel which is shared between the TransactionConsumer and the TransactionClient
/// and are pulled every time the `next` method is called.
pub(crate) struct TransactionConsumer {
    // Receiving side of the channel fed by `TransactionClient::submit_no_wait`.
    tx_receiver: Receiver<TransactionsGuard>,
    // Maximum total payload bytes allowed in one proposed block (from protocol config).
    max_transactions_in_block_bytes: u64,
    // Maximum number of transactions allowed in one proposed block (from protocol config).
    max_num_transactions_in_block: u64,
    // A guard pulled from the channel that did not fit in the last block; it is retried
    // first on the next `next()` call.
    pending_transactions: Option<TransactionsGuard>,
    // Per-block list of submitters waiting to learn the block's final status
    // (sequenced vs garbage collected).
    block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
}
53
/// Final disposition of a proposed block, delivered to submitters via the status receiver
/// returned from the inclusion acknowledgment.
#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(unused)]
pub enum BlockStatus {
    /// The block has been sequenced as part of a committed sub dag. That means that any transaction that has been included in the block
    /// has been committed as well.
    Sequenced(BlockRef),
    /// The block has been garbage collected and will never be committed. Any transactions that have been included in the block should also
    /// be considered as impossible to be committed as part of this block and might need to be retried
    GarbageCollected(BlockRef),
}
64
/// Indicates which limit (if any) stopped `TransactionConsumer::next` from pulling more
/// transactions for the current block proposal.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LimitReached {
    /// The maximum number of transactions have been included
    MaxNumOfTransactions,
    /// The maximum number of bytes have been included
    MaxBytes,
    /// All available transactions have been included
    AllTransactionsIncluded,
}
74
impl TransactionConsumer {
    /// Creates a consumer that pulls submitted transactions from `tx_receiver`, with the
    /// per-block byte and count limits taken from the protocol config in `context`.
    ///
    /// Panics if `max_num_transactions_in_block` is so large that ordinary transaction
    /// indices could collide with the reserved indices (ping index + future-use range).
    pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
        // max_num_transactions_in_block - 1 is the max possible transaction index in a block.
        // TransactionIndex::MAX is reserved for the ping transaction.
        // Indexes down to TransactionIndex::MAX - 8 are also reserved for future use.
        // This check makes sure they do not overlap.
        assert!(
            context
                .protocol_config
                .max_num_transactions_in_block()
                .saturating_sub(1)
                < TransactionIndex::MAX.saturating_sub(NUM_RESERVED_TRANSACTION_INDICES) as u64,
            "Unsupported max_num_transactions_in_block: {}",
            context.protocol_config.max_num_transactions_in_block()
        );

        Self {
            tx_receiver,
            max_transactions_in_block_bytes: context
                .protocol_config
                .max_transactions_in_block_bytes(),
            max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
            pending_transactions: None,
            block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    // Attempts to fetch the next transactions that have been submitted for sequence. Respects the `max_transactions_in_block_bytes`
    // and `max_num_transactions_in_block` parameters specified via protocol config.
    // This returns one or more transactions to be included in the block and a callback to acknowledge the inclusion of those transactions.
    // Also returns a `LimitReached` enum to indicate which limit type has been reached.
    //
    // Guards are consumed atomically: either all of a guard's transactions fit in the block
    // or the whole guard is parked in `pending_transactions` for the next call.
    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
        // Transactions accepted for this block, in submission order.
        let mut transactions = Vec::new();
        // (ack sender, assigned transaction indices) for every guard consumed in this call.
        let mut acks = Vec::new();
        let mut total_bytes = 0;
        let mut limit_reached = LimitReached::AllTransactionsIncluded;

        // Handle one batch of incoming transactions from TransactionGuard.
        // The method will return `None` if all the transactions can be included in the block. Otherwise none of the transactions will be
        // included in the block and the method will return the TransactionGuard.
        let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
            // If no transactions are submitted, it means that the transaction guard represents a ping transaction.
            // In this case, we need to push the `PING_TRANSACTION_INDEX` to the indices vector.
            let transactions_num = t.transactions.len() as u64;
            if transactions_num == 0 {
                acks.push((t.included_in_block_ack, vec![PING_TRANSACTION_INDEX]));
                return None;
            }

            // Check if the total bytes of the transactions exceed the max transactions in block bytes.
            let transactions_bytes =
                t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
            if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
                limit_reached = LimitReached::MaxBytes;
                return Some(t);
            }
            if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
                limit_reached = LimitReached::MaxNumOfTransactions;
                return Some(t);
            }

            total_bytes += transactions_bytes;

            // Calculate indices for this batch
            let start_idx = transactions.len() as TransactionIndex;
            let indices: Vec<TransactionIndex> =
                (start_idx..start_idx + t.transactions.len() as TransactionIndex).collect();

            // The transactions can be consumed, register its ack and transaction
            // indices to be sent with the ack.
            acks.push((t.included_in_block_ack, indices));
            transactions.extend(t.transactions);
            None
        };

        // A guard parked by a previous call is retried first. The block is empty at this
        // point, so it must fit; if it does not, something upstream violated the bundle
        // limits and we drop it rather than wedge the proposer.
        if let Some(t) = self.pending_transactions.take()
            && let Some(pending_transactions) = handle_txs(t)
        {
            debug_fatal!(
                "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
                pending_transactions.transactions
            );
        }

        // Until we have reached the limit for the pull.
        // We may have already reached limit in the first iteration above, in which case we stop immediately.
        // `handle_txs` returns `Some(guard)` when a guard does not fit, which parks it in
        // `pending_transactions` and terminates the loop.
        while self.pending_transactions.is_none() {
            if let Ok(t) = self.tx_receiver.try_recv() {
                self.pending_transactions = handle_txs(t);
            } else {
                break;
            }
        }

        // The ack callback, invoked with the proposed block's reference: it registers each
        // submitter for a later block-status notification, then signals inclusion.
        let block_status_subscribers = self.block_status_subscribers.clone();
        (
            transactions,
            Box::new(move |block_ref: BlockRef| {
                let mut block_status_subscribers = block_status_subscribers.lock();

                for (ack, tx_indices) in acks {
                    let (status_tx, status_rx) = oneshot::channel();

                    block_status_subscribers
                        .entry(block_ref)
                        .or_default()
                        .push(status_tx);

                    // Send errors are ignored: a dropped receiver means the submitter went away.
                    let _ = ack.send((block_ref, tx_indices, status_rx));
                }
            }),
            limit_reached,
        )
    }

    /// Notifies all the transaction submitters who are waiting to receive an update on the status of the block.
    /// The `committed_blocks` are the blocks that have been committed and the `gc_round` is the round up to which the blocks have been garbage collected.
    /// First we'll notify for all the committed blocks, and then for all the blocks that have been garbage collected.
    pub(crate) fn notify_own_blocks_status(
        &self,
        committed_blocks: Vec<BlockRef>,
        gc_round: Round,
    ) {
        // Notify for all the committed blocks first
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        for block_ref in committed_blocks {
            if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::Sequenced(block_ref));
                });
            }
        }

        // Now notify everyone <= gc_round that their block has been garbage collected and clean up the entries
        // This relies on the `BTreeMap` yielding entries in ascending `BlockRef` order (lowest
        // rounds first), so we can stop at the first entry above `gc_round`.
        while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
            if block_ref.round <= gc_round {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::GarbageCollected(block_ref));
                });
            } else {
                // Popped one entry too far; put it back and stop.
                block_status_subscribers.insert(block_ref, subscribers);
                break;
            }
        }
    }

    /// Test helper: registers a status subscriber for `block_ref` and returns its receiver.
    #[cfg(test)]
    pub(crate) fn subscribe_for_block_status_testing(
        &self,
        block_ref: BlockRef,
    ) -> oneshot::Receiver<BlockStatus> {
        let (tx, rx) = oneshot::channel();
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        block_status_subscribers
            .entry(block_ref)
            .or_default()
            .push(tx);
        rx
    }

    /// Test helper: returns true when there are no pending or queued transactions.
    /// Note: a guard pulled from the channel here is parked in `pending_transactions`,
    /// so it is not lost for subsequent `next()` calls.
    #[cfg(test)]
    fn is_empty(&mut self) -> bool {
        if self.pending_transactions.is_some() {
            return false;
        }
        if let Ok(t) = self.tx_receiver.try_recv() {
            self.pending_transactions = Some(t);
            return false;
        }
        true
    }
}
247
/// Client side of the transaction submission channel. Validates transaction and bundle
/// sizes against protocol limits before forwarding them to the `TransactionConsumer`.
#[derive(Clone)]
pub struct TransactionClient {
    context: Arc<Context>,
    // Sending side of the channel consumed by `TransactionConsumer`.
    sender: Sender<TransactionsGuard>,
    // Maximum size of a single transaction, in bytes (from protocol config).
    max_transaction_size: u64,
    // Maximum total bytes of a submitted bundle (from protocol config).
    max_transactions_in_block_bytes: u64,
    // Maximum number of transactions in a submitted bundle (from protocol config).
    max_transactions_in_block_count: u64,
}
256
/// Errors returned by `TransactionClient` when a submission is rejected or consensus
/// is shutting down.
#[derive(Debug, Error)]
pub enum ClientError {
    #[error("Failed to submit transaction, consensus is shutting down: {0}")]
    ConsensusShuttingDown(String),

    #[error("Transaction size ({0}B) is over limit ({1}B)")]
    OversizedTransaction(u64, u64),

    #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
    OversizedTransactionBundleBytes(u64, u64),

    #[error("Transaction bundle count ({0}) is over limit ({1})")]
    OversizedTransactionBundleCount(u64, u64),
}
271
272impl TransactionClient {
273    pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
274        Self::new_with_max_pending_transactions(context, MAX_PENDING_TRANSACTIONS)
275    }
276
277    fn new_with_max_pending_transactions(
278        context: Arc<Context>,
279        max_pending_transactions: usize,
280    ) -> (Self, Receiver<TransactionsGuard>) {
281        let (sender, receiver) = channel("consensus_input", max_pending_transactions);
282        (
283            Self {
284                sender,
285                max_transaction_size: context.protocol_config.max_transaction_size_bytes(),
286
287                max_transactions_in_block_bytes: context
288                    .protocol_config
289                    .max_transactions_in_block_bytes(),
290                max_transactions_in_block_count: context
291                    .protocol_config
292                    .max_num_transactions_in_block(),
293                context: context.clone(),
294            },
295            receiver,
296        )
297    }
298
299    /// Returns the current epoch of this client.
300    pub fn epoch(&self) -> Epoch {
301        self.context.committee.epoch()
302    }
303
304    /// Submits a list of transactions to be sequenced. The method returns when all the transactions have been successfully included
305    /// to next proposed blocks.
306    ///
307    /// If `transactions` is empty, then this will be interpreted as a "ping" signal from the client in order to get information about the next
308    /// block and simulate a transaction inclusion to the next block. In this an empty vector of the transaction index will be returned as response
309    /// and the block status receiver.
310    pub async fn submit(
311        &self,
312        transactions: Vec<Vec<u8>>,
313    ) -> Result<
314        (
315            BlockRef,
316            Vec<TransactionIndex>,
317            oneshot::Receiver<BlockStatus>,
318        ),
319        ClientError,
320    > {
321        let included_in_block = self.submit_no_wait(transactions).await?;
322        included_in_block
323            .await
324            .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
325            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
326    }
327
328    /// Submits a list of transactions to be sequenced.
329    /// If any transaction's length exceeds `max_transaction_size`, no transaction will be submitted.
330    /// That shouldn't be the common case as sizes should be aligned between consensus and client. The method returns
331    /// a receiver to wait on until the transactions has been included in the next block to get proposed. The consumer should
332    /// wait on it to consider as inclusion acknowledgement. If the receiver errors then consensus is shutting down and transaction
333    /// has not been included to any block.
334    /// If multiple transactions are submitted, the method will attempt to bundle them together in a single block. If the total size of
335    /// the transactions exceeds `max_transactions_in_block_bytes`, no transaction will be submitted and an error will be returned instead.
336    /// Similar if transactions exceed `max_transactions_in_block_count` an error will be returned.
337    pub(crate) async fn submit_no_wait(
338        &self,
339        transactions: Vec<Vec<u8>>,
340    ) -> Result<
341        oneshot::Receiver<(
342            BlockRef,
343            Vec<TransactionIndex>,
344            oneshot::Receiver<BlockStatus>,
345        )>,
346        ClientError,
347    > {
348        let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
349
350        let mut bundle_size = 0;
351
352        if transactions.len() as u64 > self.max_transactions_in_block_count {
353            return Err(ClientError::OversizedTransactionBundleCount(
354                transactions.len() as u64,
355                self.max_transactions_in_block_count,
356            ));
357        }
358
359        for transaction in &transactions {
360            if transaction.len() as u64 > self.max_transaction_size {
361                return Err(ClientError::OversizedTransaction(
362                    transaction.len() as u64,
363                    self.max_transaction_size,
364                ));
365            }
366            bundle_size += transaction.len() as u64;
367
368            if bundle_size > self.max_transactions_in_block_bytes {
369                return Err(ClientError::OversizedTransactionBundleBytes(
370                    bundle_size,
371                    self.max_transactions_in_block_bytes,
372                ));
373            }
374        }
375
376        let t = TransactionsGuard {
377            transactions: transactions.into_iter().map(Transaction::new).collect(),
378            included_in_block_ack: included_in_block_ack_send,
379        };
380        self.sender
381            .send(t)
382            .await
383            .tap_err(|e| error!("Submit transactions failed with {:?}", e))
384            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
385        Ok(included_in_block_ack_receive)
386    }
387}
388
/// `TransactionVerifier` implementation is supplied by Sui to validate transactions in a block,
/// before acceptance of the block.
pub trait TransactionVerifier: Send + Sync + 'static {
    /// Determines if this batch of transactions is valid.
    /// Fails if any one of the transactions is invalid.
    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;

    /// Returns indices of transactions to reject, or a transaction validation error.
    /// Currently only uncertified user transactions can be voted to reject, which are created
    /// by Mysticeti fastpath client.
    /// Honest validators may disagree on voting for uncertified user transactions.
    /// The other types of transactions are implicitly voted to be accepted if they pass validation.
    ///
    /// Honest validators should produce the same validation outcome on the same batch of
    /// transactions. So if a batch from a peer fails validation, the peer is equivocating.
    fn verify_and_vote_batch(
        &self,
        block_ref: &BlockRef,
        batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError>;
}
410
/// Error returned by `TransactionVerifier` implementations when a batch fails validation.
#[derive(Debug, Error)]
pub enum ValidationError {
    #[error("Invalid transaction: {0}")]
    InvalidTransaction(String),
}
416
/// `NoopTransactionVerifier` accepts all transactions and votes to reject none.
/// Only available in test and simulation builds.
#[cfg(any(test, msim))]
pub struct NoopTransactionVerifier;
420
#[cfg(any(test, msim))]
impl TransactionVerifier for NoopTransactionVerifier {
    // Every batch is unconditionally valid.
    fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
        Ok(())
    }

    // Every batch passes validation and no transaction is voted to be rejected.
    fn verify_and_vote_batch(
        &self,
        _block_ref: &BlockRef,
        _batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError> {
        Ok(Vec::new())
    }
}
435
436#[cfg(test)]
437mod tests {
438    use std::{sync::Arc, time::Duration};
439
440    use consensus_config::AuthorityIndex;
441    use consensus_types::block::{
442        BlockDigest, BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX,
443        TransactionIndex,
444    };
445    use futures::{StreamExt, stream::FuturesUnordered};
446    use tokio::time::timeout;
447
448    use crate::transaction::NoopTransactionVerifier;
449    use crate::{
450        block_verifier::SignedBlockVerifier,
451        context::Context,
452        transaction::{BlockStatus, LimitReached, TransactionClient, TransactionConsumer},
453    };
454
455    #[tokio::test(flavor = "current_thread", start_paused = true)]
456    async fn basic_submit_and_consume() {
457        let (mut context, _) = Context::new_for_test(4);
458        context
459            .protocol_config
460            .set_max_transaction_size_bytes_for_testing(2_000); // 2KB
461        context
462            .protocol_config
463            .set_max_transactions_in_block_bytes_for_testing(2_000);
464        let context = Arc::new(context);
465        let (client, tx_receiver) = TransactionClient::new(context.clone());
466        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
467
468        // submit asynchronously the transactions and keep the waiters
469        let mut included_in_block_waiters = FuturesUnordered::new();
470        for i in 0..3 {
471            let transaction =
472                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
473            let w = client
474                .submit_no_wait(vec![transaction])
475                .await
476                .expect("Shouldn't submit successfully transaction");
477            included_in_block_waiters.push(w);
478        }
479
480        // now pull the transactions from the consumer
481        let (transactions, ack_transactions, _limit_reached) = consumer.next();
482        assert_eq!(transactions.len(), 3);
483
484        for (i, t) in transactions.iter().enumerate() {
485            let t: String = bcs::from_bytes(t.data()).unwrap();
486            assert_eq!(format!("transaction {i}").to_string(), t);
487        }
488
489        assert!(
490            timeout(Duration::from_secs(1), included_in_block_waiters.next())
491                .await
492                .is_err(),
493            "We should expect to timeout as none of the transactions have been acknowledged yet"
494        );
495
496        // Now acknowledge the inclusion of transactions
497        ack_transactions(BlockRef::MIN);
498
499        // Now make sure that all the waiters have returned
500        while let Some(result) = included_in_block_waiters.next().await {
501            assert!(result.is_ok());
502        }
503
504        // try to pull again transactions, result should be empty
505        assert!(consumer.is_empty());
506    }
507
508    #[tokio::test(flavor = "current_thread", start_paused = true)]
509    async fn block_status_update() {
510        let (mut context, _) = Context::new_for_test(4);
511        context
512            .protocol_config
513            .set_max_transaction_size_bytes_for_testing(2_000); // 2KB
514        context
515            .protocol_config
516            .set_max_transactions_in_block_bytes_for_testing(2_000);
517        context.protocol_config.set_gc_depth_for_testing(10);
518        let context = Arc::new(context);
519        let (client, tx_receiver) = TransactionClient::new(context.clone());
520        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
521
522        // submit the transactions and include 2 of each on a new block
523        let mut included_in_block_waiters = FuturesUnordered::new();
524        for i in 1..=10 {
525            let transaction =
526                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
527            let w = client
528                .submit_no_wait(vec![transaction])
529                .await
530                .expect("Shouldn't submit successfully transaction");
531            included_in_block_waiters.push(w);
532
533            // Every 2 transactions simulate the creation of a new block and acknowledge the inclusion of the transactions
534            if i % 2 == 0 {
535                let (transactions, ack_transactions, _limit_reached) = consumer.next();
536                assert_eq!(transactions.len(), 2);
537                ack_transactions(BlockRef::new(
538                    i,
539                    AuthorityIndex::new_for_test(0),
540                    BlockDigest::MIN,
541                ));
542            }
543        }
544
545        let mut transaction_count = 0;
546        // Now iterate over all the waiters. Everyone should have been acknowledged.
547        let mut block_status_waiters = Vec::new();
548        while let Some(result) = included_in_block_waiters.next().await {
549            let (block_ref, tx_indices, block_status_waiter) =
550                result.expect("Block inclusion waiter shouldn't fail");
551            // tx is submitted one at a time so tx acks should only return one tx index
552            assert_eq!(tx_indices.len(), 1);
553            // The first transaction in the block should have index 0, the second one 1, etc.
554            // because we submit 2 transactions per block, the index should be 0 then 1 and then
555            // reset back to 0 for the next block.
556            assert_eq!(tx_indices[0], transaction_count % 2);
557            transaction_count += 1;
558
559            block_status_waiters.push((block_ref, block_status_waiter));
560        }
561
562        // Now acknowledge the commit of the blocks 6, 8, 10 and set gc_round = 5, which should trigger the garbage collection of blocks 1..=5
563        let gc_round = 5;
564        consumer.notify_own_blocks_status(
565            vec![
566                BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
567                BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
568                BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
569            ],
570            gc_round,
571        );
572
573        // Now iterate over all the block status waiters. Everyone should have been notified.
574        for (block_ref, waiter) in block_status_waiters {
575            let block_status = waiter.await.expect("Block status waiter shouldn't fail");
576
577            if block_ref.round <= gc_round {
578                assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
579            } else {
580                assert!(matches!(block_status, BlockStatus::Sequenced(_)));
581            }
582        }
583
584        // Ensure internal structure is clear
585        assert!(consumer.block_status_subscribers.lock().is_empty());
586    }
587
588    #[tokio::test]
589    async fn submit_over_max_fetch_size_and_consume() {
590        let (mut context, _) = Context::new_for_test(4);
591        context
592            .protocol_config
593            .set_max_transaction_size_bytes_for_testing(100);
594        context
595            .protocol_config
596            .set_max_transactions_in_block_bytes_for_testing(100);
597        let context = Arc::new(context);
598        let (client, tx_receiver) = TransactionClient::new(context.clone());
599        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
600
601        // submit some transactions
602        for i in 0..10 {
603            let transaction =
604                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
605            let _w = client
606                .submit_no_wait(vec![transaction])
607                .await
608                .expect("Shouldn't submit successfully transaction");
609        }
610
611        // now pull the transactions from the consumer
612        let mut all_transactions = Vec::new();
613        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
614        assert_eq!(transactions.len(), 7);
615
616        // ensure their total size is less than `max_bytes_to_fetch`
617        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
618        assert!(
619            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
620            "Should have fetched transactions up to {}",
621            context.protocol_config.max_transactions_in_block_bytes()
622        );
623        all_transactions.extend(transactions);
624
625        // try to pull again transactions, next should be provided
626        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
627        assert_eq!(transactions.len(), 3);
628
629        // ensure their total size is less than `max_bytes_to_fetch`
630        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
631        assert!(
632            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
633            "Should have fetched transactions up to {}",
634            context.protocol_config.max_transactions_in_block_bytes()
635        );
636        all_transactions.extend(transactions);
637
638        // try to pull again transactions, result should be empty
639        assert!(consumer.is_empty());
640
641        for (i, t) in all_transactions.iter().enumerate() {
642            let t: String = bcs::from_bytes(t.data()).unwrap();
643            assert_eq!(format!("transaction {i}").to_string(), t);
644        }
645    }
646
647    #[tokio::test]
648    async fn submit_large_batch_and_ack() {
649        let (mut context, _) = Context::new_for_test(4);
650        context
651            .protocol_config
652            .set_max_transaction_size_bytes_for_testing(15);
653        context
654            .protocol_config
655            .set_max_transactions_in_block_bytes_for_testing(200);
656        let context = Arc::new(context);
657        let (client, tx_receiver) = TransactionClient::new(context.clone());
658        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
659        let mut all_receivers = Vec::new();
660        // submit a few transactions individually.
661        for i in 0..10 {
662            let transaction =
663                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
664            let w = client
665                .submit_no_wait(vec![transaction])
666                .await
667                .expect("Should submit successfully transaction");
668            all_receivers.push(w);
669        }
670
671        // construct an acceptable batch and submit, it should be accepted
672        {
673            let transactions: Vec<_> = (10..15)
674                .map(|i| {
675                    bcs::to_bytes(&format!("transaction {i}"))
676                        .expect("Serialization should not fail.")
677                })
678                .collect();
679            let w = client
680                .submit_no_wait(transactions)
681                .await
682                .expect("Should submit successfully transaction");
683            all_receivers.push(w);
684        }
685
686        // submit another individual transaction.
687        {
688            let i = 15;
689            let transaction =
690                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
691            let w = client
692                .submit_no_wait(vec![transaction])
693                .await
694                .expect("Shouldn't submit successfully transaction");
695            all_receivers.push(w);
696        }
697
698        // construct a over-size-limit batch and submit, it should not be accepted
699        {
700            let transactions: Vec<_> = (16..32)
701                .map(|i| {
702                    bcs::to_bytes(&format!("transaction {i}"))
703                        .expect("Serialization should not fail.")
704                })
705                .collect();
706            let result = client.submit_no_wait(transactions).await.unwrap_err();
707            assert_eq!(
708                result.to_string(),
709                "Transaction bundle size (210B) is over limit (200B)"
710            );
711        }
712
713        // now pull the transactions from the consumer.
714        // we expect all transactions are fetched in order, not missing any, and not exceeding the size limit.
715        let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
716        let mut batch_index = 0;
717        while !consumer.is_empty() {
718            let (transactions, ack_transactions, _limit_reached) = consumer.next();
719
720            assert!(
721                transactions.len() as u64
722                    <= context.protocol_config.max_num_transactions_in_block(),
723                "Should have fetched transactions up to {}",
724                context.protocol_config.max_num_transactions_in_block()
725            );
726
727            let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
728            assert!(
729                total_size <= context.protocol_config.max_transactions_in_block_bytes(),
730                "Should have fetched transactions up to {}",
731                context.protocol_config.max_transactions_in_block_bytes()
732            );
733
            // first batch should contain all transactions from 0..10. The soft bundle is too big to fit as well, so it's parked.
735            if batch_index == 0 {
736                assert_eq!(transactions.len(), 10);
737                for (i, transaction) in transactions.iter().enumerate() {
738                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
739                    assert_eq!(format!("transaction {}", i).to_string(), t);
740                }
741            // second batch will contain the soft bundle and the additional last transaction.
742            } else if batch_index == 1 {
743                assert_eq!(transactions.len(), 6);
744                for (i, transaction) in transactions.iter().enumerate() {
745                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
746                    assert_eq!(format!("transaction {}", i + 10).to_string(), t);
747                }
748            } else {
749                panic!("Unexpected batch index");
750            }
751
752            batch_index += 1;
753
754            all_acks.push(ack_transactions);
755        }
756
757        // now acknowledge the inclusion of all transactions.
758        for ack in all_acks {
759            ack(BlockRef::MIN);
760        }
761
762        // expect all receivers to be resolved.
763        for w in all_receivers {
764            let r = w.await;
765            assert!(r.is_ok());
766        }
767    }
768
769    #[tokio::test]
770    async fn test_submit_over_max_block_size_and_validate_block_size() {
771        // submit transactions individually so we make sure that we have reached the block size limit of 10
772        {
773            let (mut context, _) = Context::new_for_test(4);
774            context
775                .protocol_config
776                .set_max_transaction_size_bytes_for_testing(100);
777            context
778                .protocol_config
779                .set_max_num_transactions_in_block_for_testing(10);
780            context
781                .protocol_config
782                .set_max_transactions_in_block_bytes_for_testing(300);
783            let context = Arc::new(context);
784            let (client, tx_receiver) = TransactionClient::new(context.clone());
785            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
786            let mut all_receivers = Vec::new();
787
788            // create enough transactions
789            let max_num_transactions_in_block =
790                context.protocol_config.max_num_transactions_in_block();
791            for i in 0..2 * max_num_transactions_in_block {
792                let transaction = bcs::to_bytes(&format!("transaction {i}"))
793                    .expect("Serialization should not fail.");
794                let w = client
795                    .submit_no_wait(vec![transaction])
796                    .await
797                    .expect("Should submit successfully transaction");
798                all_receivers.push(w);
799            }
800
801            // Fetch the next transactions to be included in a block
802            let (transactions, _ack_transactions, limit) = consumer.next();
803            assert_eq!(limit, LimitReached::MaxNumOfTransactions);
804            assert_eq!(transactions.len() as u64, max_num_transactions_in_block);
805
806            // Now create a block and verify that transactions are within the size limits
807            let block_verifier =
808                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
809
810            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
811            assert!(
812                block_verifier.check_transactions(&batch).is_ok(),
813                "Number of transactions limit verification failed"
814            );
815        }
816
817        // submit transactions individually so we make sure that we have reached the block size bytes 300
818        {
819            let (mut context, _) = Context::new_for_test(4);
820            context
821                .protocol_config
822                .set_max_transaction_size_bytes_for_testing(100);
823            context
824                .protocol_config
825                .set_max_num_transactions_in_block_for_testing(1_000);
826            context
827                .protocol_config
828                .set_max_transactions_in_block_bytes_for_testing(300);
829            let context = Arc::new(context);
830            let (client, tx_receiver) = TransactionClient::new(context.clone());
831            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
832            let mut all_receivers = Vec::new();
833
834            let max_transactions_in_block_bytes =
835                context.protocol_config.max_transactions_in_block_bytes();
836            let mut total_size = 0;
837            loop {
838                let transaction = bcs::to_bytes(&"transaction".to_string())
839                    .expect("Serialization should not fail.");
840                total_size += transaction.len() as u64;
841                let w = client
842                    .submit_no_wait(vec![transaction])
843                    .await
844                    .expect("Should submit successfully transaction");
845                all_receivers.push(w);
846
847                // create enough transactions to reach the block size limit
848                if total_size >= 2 * max_transactions_in_block_bytes {
849                    break;
850                }
851            }
852
853            // Fetch the next transactions to be included in a block
854            let (transactions, _ack_transactions, limit) = consumer.next();
855            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
856            let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();
857
858            assert_eq!(limit, LimitReached::MaxBytes);
859            assert!(
860                batch.len() < context.protocol_config.max_num_transactions_in_block() as usize,
861                "Should have submitted less than the max number of transactions in a block"
862            );
863            assert!(size <= max_transactions_in_block_bytes);
864
865            // Now create a block and verify that transactions are within the size limits
866            let block_verifier =
867                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
868
869            assert!(
870                block_verifier.check_transactions(&batch).is_ok(),
871                "Total size of transactions limit verification failed"
872            );
873        }
874    }
875
876    // This is the case where the client submits a "ping" signal to the consensus to get information about the next block and simulate a transaction inclusion to the next block.
877    #[tokio::test]
878    async fn submit_with_no_transactions() {
879        let (mut context, _) = Context::new_for_test(4);
880        context
881            .protocol_config
882            .set_max_transaction_size_bytes_for_testing(15);
883        context
884            .protocol_config
885            .set_max_transactions_in_block_bytes_for_testing(200);
886        let context = Arc::new(context);
887        let (client, tx_receiver) = TransactionClient::new(context.clone());
888        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
889
890        let w_no_transactions = client
891            .submit_no_wait(vec![])
892            .await
893            .expect("Should submit successfully empty array of transactions");
894
895        let transaction =
896            bcs::to_bytes(&"transaction".to_string()).expect("Serialization should not fail.");
897        let w_with_transactions = client
898            .submit_no_wait(vec![transaction])
899            .await
900            .expect("Should submit successfully transaction");
901
902        let (transactions, ack_transactions, _limit_reached) = consumer.next();
903        assert_eq!(transactions.len(), 1);
904
905        // Acknowledge the inclusion of the transactions
906        ack_transactions(BlockRef::MIN);
907
908        {
909            let r = w_no_transactions.await;
910            let (block_ref, indices, _status) = r.unwrap();
911            assert_eq!(block_ref, BlockRef::MIN);
912            assert_eq!(indices, vec![PING_TRANSACTION_INDEX]);
913        }
914
915        {
916            let r = w_with_transactions.await;
917            let (block_ref, indices, _status) = r.unwrap();
918            assert_eq!(block_ref, BlockRef::MIN);
919            assert_eq!(indices, vec![0]);
920        }
921    }
922
923    #[tokio::test]
924    async fn ping_transaction_index_never_reached() {
925        // Set the max number of transactions in a block to the max value of u16.
926        static MAX_NUM_TRANSACTIONS_IN_BLOCK: u64 =
927            (TransactionIndex::MAX - NUM_RESERVED_TRANSACTION_INDICES) as u64;
928
929        // Ensure that enough space is allocated in the channel for the pending transactions, so we don't end up consuming the transactions in chunks.
930        static MAX_PENDING_TRANSACTIONS: usize = 2 * MAX_NUM_TRANSACTIONS_IN_BLOCK as usize;
931
932        let (mut context, _) = Context::new_for_test(4);
933        context
934            .protocol_config
935            .set_max_transaction_size_bytes_for_testing(200_000);
936        context
937            .protocol_config
938            .set_max_transactions_in_block_bytes_for_testing(1_000_000);
939        context
940            .protocol_config
941            .set_max_num_transactions_in_block_for_testing(MAX_NUM_TRANSACTIONS_IN_BLOCK);
942        let context = Arc::new(context);
943        let (client, tx_receiver) = TransactionClient::new_with_max_pending_transactions(
944            context.clone(),
945            MAX_PENDING_TRANSACTIONS,
946        );
947        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
948
949        // Add 10 more transactions than the max number of transactions in a block.
950        for i in 0..MAX_NUM_TRANSACTIONS_IN_BLOCK + 10 {
951            println!("Submitting transaction {i}");
952            let transaction =
953                bcs::to_bytes(&format!("t {i}")).expect("Serialization should not fail.");
954            let _w = client
955                .submit_no_wait(vec![transaction])
956                .await
957                .expect("Shouldn't submit successfully transaction");
958        }
959
960        // now pull the transactions from the consumer
961        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
962        assert_eq!(transactions.len() as u64, MAX_NUM_TRANSACTIONS_IN_BLOCK);
963
964        let t: String = bcs::from_bytes(transactions.last().unwrap().data()).unwrap();
965        assert_eq!(
966            t,
967            format!(
968                "t {}",
969                PING_TRANSACTION_INDEX - NUM_RESERVED_TRANSACTION_INDICES - 1
970            )
971        );
972    }
973}