consensus_core/
transaction.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::{collections::BTreeMap, sync::Arc};

use consensus_config::Epoch;
use consensus_types::block::{
    BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX, Round, TransactionIndex,
};
use mysten_common::debug_fatal;
use mysten_metrics::monitored_mpsc::{Receiver, Sender, channel};
use parking_lot::Mutex;
use tap::TapFallible;
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{error, warn};

use crate::{block::Transaction, context::Context};
/// The maximum number of transactions pending in the queue to be pulled for block proposals.
const MAX_PENDING_TRANSACTIONS: usize = 2_000;

/// The guard acts as an acknowledgment mechanism for the inclusion of transactions in a block.
/// When its last transaction is included in a block, `included_in_block_ack` will be signalled.
/// If the guard is dropped without being acknowledged, the transactions have not been
/// included in a block and consensus is shutting down.
pub(crate) struct TransactionsGuard {
    // Holds a list of transactions to be included in the block.
    // A TransactionsGuard may be partially consumed by `TransactionConsumer`, in which case this holds the remaining transactions.
    transactions: Vec<Transaction>,

    // When the transactions are included in a block, this will be signalled with
    // the following information:
    included_in_block_ack: oneshot::Sender<(
        // The block reference in which the transactions have been included
        BlockRef,
        // The indices of the transactions that have been included in the block
        Vec<TransactionIndex>,
        // A receiver to notify the submitter about the block status
        oneshot::Receiver<BlockStatus>,
    )>,
}

/// The TransactionConsumer is responsible for fetching the next transactions to be included in block proposals.
/// The transactions are submitted to a channel that is shared between the TransactionConsumer and the TransactionClient,
/// and are pulled every time the `next` method is called.
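///
/// Illustrative proposer-side flow (a sketch, not compiled as a doc test; `propose_block` is a
/// hypothetical helper, not part of this module):
///
/// ```ignore
/// // Pull the next batch of transactions, respecting the configured block limits.
/// let (transactions, ack_transactions, _limit) = consumer.next();
/// // Place the transactions into a newly proposed block, obtaining its `BlockRef`.
/// let block_ref: BlockRef = propose_block(transactions);
/// // Acknowledge inclusion so submitters waiting on `TransactionClient::submit` are unblocked.
/// ack_transactions(block_ref);
/// ```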
pub(crate) struct TransactionConsumer {
    tx_receiver: Receiver<TransactionsGuard>,
    max_transactions_in_block_bytes: u64,
    max_num_transactions_in_block: u64,
    pending_transactions: Option<TransactionsGuard>,
    block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
}

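/// The status of an own proposed block, delivered to transaction submitters through the receiver
/// returned by `TransactionClient::submit`.
///
/// A minimal sketch of how a submitter might react to it (illustrative only, not compiled as a
/// doc test):
///
/// ```ignore
/// let (_block_ref, _indices, status_rx) = client.submit(vec![tx_bytes]).await?;
/// match status_rx.await {
///     // The block (and therefore the transaction) has been committed.
///     Ok(BlockStatus::Sequenced(_)) => {}
///     // The block will never be committed; the transaction may need to be resubmitted.
///     Ok(BlockStatus::GarbageCollected(_)) => {}
///     // Consensus is shutting down.
///     Err(_) => {}
/// }
/// ```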
#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(unused)]
pub enum BlockStatus {
    /// The block has been sequenced as part of a committed sub-dag. That means that any transaction that has been included in the block
    /// has been committed as well.
    Sequenced(BlockRef),
    /// The block has been garbage collected and will never be committed. Any transactions that have been included in the block should also
    /// be considered impossible to commit as part of this block and might need to be retried.
    GarbageCollected(BlockRef),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LimitReached {
    // The maximum number of transactions has been included
    MaxNumOfTransactions,
    // The maximum number of bytes has been included
    MaxBytes,
    // All available transactions have been included
    AllTransactionsIncluded,
}

impl TransactionConsumer {
    pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
        // max_num_transactions_in_block - 1 is the max possible transaction index in a block.
        // TransactionIndex::MAX is reserved for the ping transaction.
        // Indexes down to TransactionIndex::MAX - 8 are also reserved for future use.
        // This check makes sure they do not overlap.
        assert!(
            context
                .protocol_config
                .max_num_transactions_in_block()
                .saturating_sub(1)
                < TransactionIndex::MAX.saturating_sub(NUM_RESERVED_TRANSACTION_INDICES) as u64,
            "Unsupported max_num_transactions_in_block: {}",
            context.protocol_config.max_num_transactions_in_block()
        );

        Self {
            tx_receiver,
            max_transactions_in_block_bytes: context
                .protocol_config
                .max_transactions_in_block_bytes(),
            max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
            pending_transactions: None,
            block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    // Attempts to fetch the next transactions that have been submitted for sequencing. Respects the `max_transactions_in_block_bytes`
    // and `max_num_transactions_in_block` parameters specified via the protocol config.
    // Returns the transactions to be included in the block, a callback to acknowledge their inclusion,
    // and a `LimitReached` value indicating which limit, if any, has been reached.
    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
        let mut transactions = Vec::new();
        let mut acks = Vec::new();
        let mut total_bytes = 0;
        let mut limit_reached = LimitReached::AllTransactionsIncluded;

        // Handles one batch of incoming transactions from a TransactionsGuard.
        // The closure returns `None` if all the transactions can be included in the block. Otherwise none of the transactions
        // are included in the block and the closure returns the TransactionsGuard.
        let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
            // If no transactions are submitted, the transaction guard represents a ping.
            // In this case, acknowledge it with the `PING_TRANSACTION_INDEX`.
            let transactions_num = t.transactions.len() as u64;
            if transactions_num == 0 {
                acks.push((t.included_in_block_ack, vec![PING_TRANSACTION_INDEX]));
                return None;
            }

            // Check if the total bytes of the transactions exceed the max transactions in block bytes.
            let transactions_bytes =
                t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
            if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
                limit_reached = LimitReached::MaxBytes;
                return Some(t);
            }
            if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
                limit_reached = LimitReached::MaxNumOfTransactions;
                return Some(t);
            }

            total_bytes += transactions_bytes;

            // Calculate indices for this batch.
            let start_idx = transactions.len() as TransactionIndex;
            let indices: Vec<TransactionIndex> =
                (start_idx..start_idx + t.transactions.len() as TransactionIndex).collect();

            // The transactions can be consumed; register the ack and the transaction
            // indices to be sent with it.
            acks.push((t.included_in_block_ack, indices));
            transactions.extend(t.transactions);
            None
        };

        if let Some(t) = self.pending_transactions.take()
            && let Some(pending_transactions) = handle_txs(t)
        {
            debug_fatal!(
                "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
                pending_transactions.transactions
            );
        }

        // Keep pulling transactions until we reach a limit for this proposal.
        // We may have already reached a limit while handling the pending transactions above, in which case we stop immediately.
        while self.pending_transactions.is_none() {
            if let Ok(t) = self.tx_receiver.try_recv() {
                self.pending_transactions = handle_txs(t);
            } else {
                break;
            }
        }

        let block_status_subscribers = self.block_status_subscribers.clone();
        (
            transactions,
            Box::new(move |block_ref: BlockRef| {
                let mut block_status_subscribers = block_status_subscribers.lock();

                for (ack, tx_indices) in acks {
                    let (status_tx, status_rx) = oneshot::channel();

                    block_status_subscribers
                        .entry(block_ref)
                        .or_default()
                        .push(status_tx);

                    let _ = ack.send((block_ref, tx_indices, status_rx));
                }
            }),
            limit_reached,
        )
    }

    /// Notifies all the transaction submitters who are waiting to receive an update on the status of their blocks.
    /// The `committed_blocks` are the blocks that have been committed, and `gc_round` is the round up to which blocks have been garbage collected.
    /// All the committed blocks are notified first, followed by all the blocks that have been garbage collected.
    pub(crate) fn notify_own_blocks_status(
        &self,
        committed_blocks: Vec<BlockRef>,
        gc_round: Round,
    ) {
        // Notify for all the committed blocks first.
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        for block_ref in committed_blocks {
            if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::Sequenced(block_ref));
                });
            }
        }

        // Now notify all subscribers with blocks at rounds <= gc_round that their block has been garbage collected, and clean up the entries.
        while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
            if block_ref.round <= gc_round {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::GarbageCollected(block_ref));
                });
            } else {
                block_status_subscribers.insert(block_ref, subscribers);
                break;
            }
        }
    }

    #[cfg(test)]
    pub(crate) fn subscribe_for_block_status_testing(
        &self,
        block_ref: BlockRef,
    ) -> oneshot::Receiver<BlockStatus> {
        let (tx, rx) = oneshot::channel();
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        block_status_subscribers
            .entry(block_ref)
            .or_default()
            .push(tx);
        rx
    }

    #[cfg(test)]
    fn is_empty(&mut self) -> bool {
        if self.pending_transactions.is_some() {
            return false;
        }
        if let Ok(t) = self.tx_receiver.try_recv() {
            self.pending_transactions = Some(t);
            return false;
        }
        true
    }
}

#[derive(Clone)]
pub struct TransactionClient {
    context: Arc<Context>,
    sender: Sender<TransactionsGuard>,
    max_transaction_size: u64,
    max_transactions_in_block_bytes: u64,
    max_transactions_in_block_count: u64,
}

#[derive(Debug, Error)]
pub enum ClientError {
    #[error("Failed to submit transaction, consensus is shutting down: {0}")]
    ConsensusShuttingDown(String),

    #[error("Transaction size ({0}B) is over limit ({1}B)")]
    OversizedTransaction(u64, u64),

    #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
    OversizedTransactionBundleBytes(u64, u64),

    #[error("Transaction bundle count ({0}) is over limit ({1})")]
    OversizedTransactionBundleCount(u64, u64),
}

impl TransactionClient {
    pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
        Self::new_with_max_pending_transactions(context, MAX_PENDING_TRANSACTIONS)
    }

    fn new_with_max_pending_transactions(
        context: Arc<Context>,
        max_pending_transactions: usize,
    ) -> (Self, Receiver<TransactionsGuard>) {
        let (sender, receiver) = channel("consensus_input", max_pending_transactions);
        (
            Self {
                sender,
                max_transaction_size: context.protocol_config.max_transaction_size_bytes(),

                max_transactions_in_block_bytes: context
                    .protocol_config
                    .max_transactions_in_block_bytes(),
                max_transactions_in_block_count: context
                    .protocol_config
                    .max_num_transactions_in_block(),
                context: context.clone(),
            },
            receiver,
        )
    }

    /// Returns the current epoch of this client.
    pub fn epoch(&self) -> Epoch {
        self.context.committee.epoch()
    }

    /// Submits a list of transactions to be sequenced. The method returns when all the transactions have been successfully included
    /// in a proposed block.
    ///
    /// If `transactions` is empty, this is interpreted as a "ping" signal from the client, used to get information about the next
    /// block and to simulate a transaction inclusion in it. In this case the response will contain the `PING_TRANSACTION_INDEX` as the
    /// only transaction index, along with the block reference and the block status receiver.
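    ///
    /// Illustrative usage (a sketch, not compiled as a doc test; error handling is elided):
    ///
    /// ```ignore
    /// let (block_ref, indices, status_rx) = client.submit(vec![tx_bytes]).await?;
    /// // `indices` holds the position of each submitted transaction inside the proposed block.
    /// // Optionally wait for the block itself to be committed or garbage collected.
    /// let status = status_rx.await;
    /// ```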
    pub async fn submit(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<
        (
            BlockRef,
            Vec<TransactionIndex>,
            oneshot::Receiver<BlockStatus>,
        ),
        ClientError,
    > {
        let included_in_block = self.submit_no_wait(transactions).await?;
        included_in_block
            .await
            .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
    }

    /// Submits a list of transactions to be sequenced.
    /// If any transaction's length exceeds `max_transaction_size`, no transaction will be submitted.
    /// That shouldn't be the common case, as sizes should be aligned between consensus and client. The method returns
    /// a receiver to wait on until the transactions have been included in the next block to get proposed. The caller should
    /// wait on it and treat its resolution as the inclusion acknowledgement. If the receiver errors, consensus is shutting down
    /// and the transactions have not been included in any block.
    /// If multiple transactions are submitted, the method will attempt to bundle them together in a single block. If the total size of
    /// the transactions exceeds `max_transactions_in_block_bytes`, no transaction will be submitted and an error will be returned instead.
    /// Similarly, if the number of transactions exceeds `max_transactions_in_block_count`, an error will be returned.
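    ///
    /// A sketch of handling the possible error cases (illustrative only, not compiled as a doc test):
    ///
    /// ```ignore
    /// match client.submit_no_wait(transactions).await {
    ///     // Await the receiver for the inclusion acknowledgement.
    ///     Ok(ack_receiver) => { /* ... */ }
    ///     // A single transaction exceeds `max_transaction_size`.
    ///     Err(ClientError::OversizedTransaction(size, limit)) => { /* ... */ }
    ///     // The bundle exceeds `max_transactions_in_block_bytes`.
    ///     Err(ClientError::OversizedTransactionBundleBytes(size, limit)) => { /* ... */ }
    ///     // The bundle exceeds `max_transactions_in_block_count`.
    ///     Err(ClientError::OversizedTransactionBundleCount(count, limit)) => { /* ... */ }
    ///     // Consensus is no longer accepting transactions.
    ///     Err(ClientError::ConsensusShuttingDown(reason)) => { /* ... */ }
    /// }
    /// ```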
    pub(crate) async fn submit_no_wait(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<
        oneshot::Receiver<(
            BlockRef,
            Vec<TransactionIndex>,
            oneshot::Receiver<BlockStatus>,
        )>,
        ClientError,
    > {
        let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();

        let mut bundle_size = 0;

        if transactions.len() as u64 > self.max_transactions_in_block_count {
            return Err(ClientError::OversizedTransactionBundleCount(
                transactions.len() as u64,
                self.max_transactions_in_block_count,
            ));
        }

        for transaction in &transactions {
            if transaction.len() as u64 > self.max_transaction_size {
                return Err(ClientError::OversizedTransaction(
                    transaction.len() as u64,
                    self.max_transaction_size,
                ));
            }
            bundle_size += transaction.len() as u64;

            if bundle_size > self.max_transactions_in_block_bytes {
                return Err(ClientError::OversizedTransactionBundleBytes(
                    bundle_size,
                    self.max_transactions_in_block_bytes,
                ));
            }
        }

        let t = TransactionsGuard {
            transactions: transactions.into_iter().map(Transaction::new).collect(),
            included_in_block_ack: included_in_block_ack_send,
        };
        self.sender
            .send(t)
            .await
            .tap_err(|e| error!("Submit transactions failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
        Ok(included_in_block_ack_receive)
    }
}

/// A `TransactionVerifier` implementation is supplied by Sui to validate transactions in a block
/// before the block is accepted.
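///
/// A minimal sketch of an implementation (illustrative only, not compiled as a doc test; real
/// verifiers are supplied by Sui and perform transaction-specific validation):
///
/// ```ignore
/// struct SizeLimitVerifier {
///     max_size: usize,
/// }
///
/// impl TransactionVerifier for SizeLimitVerifier {
///     fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
///         // Fail the whole batch if any transaction is too large.
///         for tx in batch {
///             if tx.len() > self.max_size {
///                 return Err(ValidationError::InvalidTransaction(
///                     "transaction too large".to_string(),
///                 ));
///             }
///         }
///         Ok(())
///     }
///
///     fn verify_and_vote_batch(
///         &self,
///         _block_ref: &BlockRef,
///         batch: &[&[u8]],
///     ) -> Result<Vec<TransactionIndex>, ValidationError> {
///         // Accept everything that passes validation; return indices only for transactions to reject.
///         self.verify_batch(batch)?;
///         Ok(vec![])
///     }
/// }
/// ```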
pub trait TransactionVerifier: Send + Sync + 'static {
    /// Determines if this batch of transactions is valid.
    /// Fails if any one of the transactions is invalid.
    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;

    /// Returns indices of transactions to reject, or a transaction validation error.
    /// Currently only uncertified user transactions, which are created by the Mysticeti fastpath client,
    /// can be voted on for rejection.
    /// Honest validators may disagree on voting for uncertified user transactions.
    /// The other types of transactions are implicitly voted to be accepted if they pass validation.
    ///
    /// Honest validators should produce the same validation outcome on the same batch of
    /// transactions. So if a batch from a peer fails validation, the peer is equivocating.
    fn verify_and_vote_batch(
        &self,
        block_ref: &BlockRef,
        batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError>;
}

#[derive(Debug, Error)]
pub enum ValidationError {
    #[error("Invalid transaction: {0}")]
    InvalidTransaction(String),
}

/// `NoopTransactionVerifier` accepts all transactions.
#[cfg(any(test, msim))]
pub struct NoopTransactionVerifier;

#[cfg(any(test, msim))]
impl TransactionVerifier for NoopTransactionVerifier {
    fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
        Ok(())
    }

    fn verify_and_vote_batch(
        &self,
        _block_ref: &BlockRef,
        _batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError> {
        Ok(vec![])
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{
        BlockDigest, BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX,
        TransactionIndex,
    };
    use futures::{StreamExt, stream::FuturesUnordered};
    use sui_protocol_config::ProtocolConfig;
    use tokio::time::timeout;

    use crate::transaction::NoopTransactionVerifier;
    use crate::{
        block_verifier::SignedBlockVerifier,
        context::Context,
        transaction::{BlockStatus, LimitReached, TransactionClient, TransactionConsumer},
    };

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_submit_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000); // 2KB
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit the transactions asynchronously and keep the waiters
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 0..3 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            included_in_block_waiters.push(w);
        }

        // now pull the transactions from the consumer
        let (transactions, ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        for (i, t) in transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}").to_string(), t);
        }

        assert!(
            timeout(Duration::from_secs(1), included_in_block_waiters.next())
                .await
                .is_err(),
            "We should expect to timeout as none of the transactions have been acknowledged yet"
        );

        // Now acknowledge the inclusion of transactions
        ack_transactions(BlockRef::MIN);

        // Now make sure that all the waiters have returned
        while let Some(result) = included_in_block_waiters.next().await {
            assert!(result.is_ok());
        }

        // try to pull transactions again; the result should be empty
        assert!(consumer.is_empty());
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn block_status_update() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000); // 2KB
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config.set_consensus_gc_depth_for_testing(10);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit the transactions and include 2 of them in each new block
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 1..=10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            included_in_block_waiters.push(w);

            // Every 2 transactions simulate the creation of a new block and acknowledge the inclusion of the transactions
            if i % 2 == 0 {
                let (transactions, ack_transactions, _limit_reached) = consumer.next();
                assert_eq!(transactions.len(), 2);
                ack_transactions(BlockRef::new(
                    i,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ));
            }
        }

        let mut transaction_count = 0;
        // Now iterate over all the waiters. Everyone should have been acknowledged.
        let mut block_status_waiters = Vec::new();
        while let Some(result) = included_in_block_waiters.next().await {
            let (block_ref, tx_indices, block_status_waiter) =
                result.expect("Block inclusion waiter shouldn't fail");
            // Transactions are submitted one at a time, so each ack should return only one transaction index.
            assert_eq!(tx_indices.len(), 1);
            // The first transaction in the block should have index 0, the second one index 1, etc.
            // Because we submit 2 transactions per block, the index should be 0, then 1, and then
            // reset back to 0 for the next block.
            assert_eq!(tx_indices[0], transaction_count % 2);
            transaction_count += 1;

            block_status_waiters.push((block_ref, block_status_waiter));
        }

        // Now acknowledge the commit of blocks 6, 8, 10 and set gc_round = 5, which should trigger the garbage collection of blocks 1..=5
        let gc_round = 5;
        consumer.notify_own_blocks_status(
            vec![
                BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            ],
            gc_round,
        );

        // Now iterate over all the block status waiters. Everyone should have been notified.
        for (block_ref, waiter) in block_status_waiters {
            let block_status = waiter.await.expect("Block status waiter shouldn't fail");

            if block_ref.round <= gc_round {
                assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
            } else {
                assert!(matches!(block_status, BlockStatus::Sequenced(_)));
            }
        }

        // Ensure the internal structure is cleared
        assert!(consumer.block_status_subscribers.lock().is_empty());
    }

    #[tokio::test]
    async fn submit_over_max_fetch_size_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(100);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(100);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit some transactions
        for i in 0..10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let _w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
        }

        // now pull the transactions from the consumer
        let mut all_transactions = Vec::new();
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 7);

        // ensure their total size is less than `max_transactions_in_block_bytes`
        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        // try to pull transactions again; the next batch should be provided
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        // ensure their total size is less than `max_transactions_in_block_bytes`
        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        // try to pull transactions again; the result should be empty
        assert!(consumer.is_empty());

        for (i, t) in all_transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}").to_string(), t);
        }
    }

    #[tokio::test]
    async fn submit_large_batch_and_ack() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(15);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut all_receivers = Vec::new();
        // submit a few transactions individually.
        for i in 0..10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            all_receivers.push(w);
        }

        // construct an acceptable batch and submit, it should be accepted
        {
            let transactions: Vec<_> = (10..15)
                .map(|i| {
                    bcs::to_bytes(&format!("transaction {i}"))
                        .expect("Serialization should not fail.")
                })
                .collect();
            let w = client
                .submit_no_wait(transactions)
                .await
                .expect("Should submit transaction bundle successfully");
            all_receivers.push(w);
        }

        // submit another individual transaction.
        {
            let i = 15;
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            all_receivers.push(w);
        }

        // construct an over-size-limit batch and submit, it should not be accepted
        {
            let transactions: Vec<_> = (16..32)
                .map(|i| {
                    bcs::to_bytes(&format!("transaction {i}"))
                        .expect("Serialization should not fail.")
                })
                .collect();
            let result = client.submit_no_wait(transactions).await.unwrap_err();
            assert_eq!(
                result.to_string(),
                "Transaction bundle size (210B) is over limit (200B)"
            );
        }

        // now pull the transactions from the consumer.
        // we expect all transactions to be fetched in order, without missing any, and without exceeding the size limit.
        let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
        let mut batch_index = 0;
        while !consumer.is_empty() {
            let (transactions, ack_transactions, _limit_reached) = consumer.next();

            assert!(
                transactions.len() as u64
                    <= context.protocol_config.max_num_transactions_in_block(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_num_transactions_in_block()
            );

            let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
            assert!(
                total_size <= context.protocol_config.max_transactions_in_block_bytes(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_transactions_in_block_bytes()
            );

            // the first batch should contain all transactions from 0..10. The soft bundle is too big to fit as well, so it's parked.
            if batch_index == 0 {
                assert_eq!(transactions.len(), 10);
                for (i, transaction) in transactions.iter().enumerate() {
                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
                    assert_eq!(format!("transaction {}", i).to_string(), t);
                }
            // the second batch will contain the soft bundle and the additional last transaction.
            } else if batch_index == 1 {
                assert_eq!(transactions.len(), 6);
                for (i, transaction) in transactions.iter().enumerate() {
                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
                    assert_eq!(format!("transaction {}", i + 10).to_string(), t);
                }
            } else {
                panic!("Unexpected batch index");
            }

            batch_index += 1;

            all_acks.push(ack_transactions);
        }

        // now acknowledge the inclusion of all transactions.
        for ack in all_acks {
            ack(BlockRef::MIN);
        }

        // expect all receivers to be resolved.
        for w in all_receivers {
            let r = w.await;
            assert!(r.is_ok());
        }
    }

    #[tokio::test]
    async fn test_submit_over_max_block_size_and_validate_block_size() {
        // submit transactions individually to make sure that we reach the block limit of 10 transactions
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(10);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            // create enough transactions
            let max_num_transactions_in_block =
                context.protocol_config.max_num_transactions_in_block();
            for i in 0..2 * max_num_transactions_in_block {
                let transaction = bcs::to_bytes(&format!("transaction {i}"))
                    .expect("Serialization should not fail.");
                let w = client
                    .submit_no_wait(vec![transaction])
                    .await
                    .expect("Should submit transaction successfully");
                all_receivers.push(w);
            }

            // Fetch the next transactions to be included in a block
            let (transactions, _ack_transactions, limit) = consumer.next();
            assert_eq!(limit, LimitReached::MaxNumOfTransactions);
            assert_eq!(transactions.len() as u64, max_num_transactions_in_block);

            // Now create a block and verify that transactions are within the size limits
            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Number of transactions limit verification failed"
            );
        }

        // submit transactions individually to make sure that we reach the block size limit of 300 bytes
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(1_000);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            let max_transactions_in_block_bytes =
                context.protocol_config.max_transactions_in_block_bytes();
            let mut total_size = 0;
            loop {
                let transaction = bcs::to_bytes(&"transaction".to_string())
                    .expect("Serialization should not fail.");
                total_size += transaction.len() as u64;
                let w = client
                    .submit_no_wait(vec![transaction])
                    .await
                    .expect("Should submit transaction successfully");
                all_receivers.push(w);

                // create enough transactions to reach the block size limit
                if total_size >= 2 * max_transactions_in_block_bytes {
                    break;
                }
            }

            // Fetch the next transactions to be included in a block
            let (transactions, _ack_transactions, limit) = consumer.next();
            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();

            assert_eq!(limit, LimitReached::MaxBytes);
            assert!(
                batch.len()
                    < context
                        .protocol_config
                        .consensus_max_num_transactions_in_block() as usize,
                "Should have submitted less than the max number of transactions in a block"
            );
            assert!(size <= max_transactions_in_block_bytes);

            // Now create a block and verify that transactions are within the size limits
            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Total size of transactions limit verification failed"
            );
        }
    }

    // This is the case where the client submits a "ping" signal to consensus to get information about the next block and simulate a transaction inclusion in the next block.
    #[tokio::test]
    async fn submit_with_no_transactions() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(15);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let w_no_transactions = client
            .submit_no_wait(vec![])
            .await
            .expect("Should submit an empty array of transactions successfully");

        let transaction =
            bcs::to_bytes(&"transaction".to_string()).expect("Serialization should not fail.");
        let w_with_transactions = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit transaction successfully");

        let (transactions, ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 1);

        // Acknowledge the inclusion of the transactions
        ack_transactions(BlockRef::MIN);

        {
            let r = w_no_transactions.await;
            let (block_ref, indices, _status) = r.unwrap();
            assert_eq!(block_ref, BlockRef::MIN);
            assert_eq!(indices, vec![PING_TRANSACTION_INDEX]);
        }

        {
            let r = w_with_transactions.await;
            let (block_ref, indices, _status) = r.unwrap();
            assert_eq!(block_ref, BlockRef::MIN);
            assert_eq!(indices, vec![0]);
        }
    }

    #[tokio::test]
    async fn ping_transaction_index_never_reached() {
        // Set the max number of transactions in a block to the largest allowed value
        // (TransactionIndex::MAX minus the reserved indices).
        static MAX_NUM_TRANSACTIONS_IN_BLOCK: u64 =
            (TransactionIndex::MAX - NUM_RESERVED_TRANSACTION_INDICES) as u64;

        // Ensure that enough space is allocated in the channel for the pending transactions, so we don't end up consuming the transactions in chunks.
        static MAX_PENDING_TRANSACTIONS: usize = 2 * MAX_NUM_TRANSACTIONS_IN_BLOCK as usize;

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(200_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(1_000_000);
            config.set_consensus_max_num_transactions_in_block_for_testing(
                MAX_NUM_TRANSACTIONS_IN_BLOCK,
            );
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new_with_max_pending_transactions(
            context.clone(),
            MAX_PENDING_TRANSACTIONS,
        );
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Add 10 more transactions than the max number of transactions in a block.
        for i in 0..MAX_NUM_TRANSACTIONS_IN_BLOCK + 10 {
            println!("Submitting transaction {i}");
            let transaction =
                bcs::to_bytes(&format!("t {i}")).expect("Serialization should not fail.");
            let _w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
        }

        // now pull the transactions from the consumer
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len() as u64, MAX_NUM_TRANSACTIONS_IN_BLOCK);

        let t: String = bcs::from_bytes(transactions.last().unwrap().data()).unwrap();
        assert_eq!(
            t,
            format!(
                "t {}",
                PING_TRANSACTION_INDEX - NUM_RESERVED_TRANSACTION_INDICES - 1
            )
        );
    }
965}