1use std::{collections::BTreeMap, sync::Arc};
4
5use consensus_config::Epoch;
6use consensus_types::block::{
7 BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX, Round, TransactionIndex,
8};
9use mysten_common::debug_fatal;
10use mysten_metrics::monitored_mpsc::{Receiver, Sender, channel};
11use parking_lot::Mutex;
12use tap::TapFallible;
13use thiserror::Error;
14use tokio::sync::oneshot;
15use tracing::{error, warn};
16
17use crate::{block::Transaction, context::Context};
18
19const MAX_PENDING_TRANSACTIONS: usize = 2_000;
21
22pub(crate) struct TransactionsGuard {
27 transactions: Vec<Transaction>,
30
31 included_in_block_ack: oneshot::Sender<(
34 BlockRef,
36 Vec<TransactionIndex>,
38 oneshot::Receiver<BlockStatus>,
40 )>,
41}
42
43pub(crate) struct TransactionConsumer {
47 tx_receiver: Receiver<TransactionsGuard>,
48 max_transactions_in_block_bytes: u64,
49 max_num_transactions_in_block: u64,
50 pending_transactions: Option<TransactionsGuard>,
51 block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq)]
55#[allow(unused)]
56pub enum BlockStatus {
57 Sequenced(BlockRef),
60 GarbageCollected(BlockRef),
63}
64
65#[derive(Debug, Clone, Eq, PartialEq)]
66pub enum LimitReached {
67 MaxNumOfTransactions,
69 MaxBytes,
71 AllTransactionsIncluded,
73}
74
75impl TransactionConsumer {
76 pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
77 assert!(
82 context
83 .protocol_config
84 .max_num_transactions_in_block()
85 .saturating_sub(1)
86 < TransactionIndex::MAX.saturating_sub(NUM_RESERVED_TRANSACTION_INDICES) as u64,
87 "Unsupported max_num_transactions_in_block: {}",
88 context.protocol_config.max_num_transactions_in_block()
89 );
90
91 Self {
92 tx_receiver,
93 max_transactions_in_block_bytes: context
94 .protocol_config
95 .max_transactions_in_block_bytes(),
96 max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
97 pending_transactions: None,
98 block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
99 }
100 }
101
102 pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
107 let mut transactions = Vec::new();
108 let mut acks = Vec::new();
109 let mut total_bytes = 0;
110 let mut limit_reached = LimitReached::AllTransactionsIncluded;
111
112 let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
116 let transactions_num = t.transactions.len() as u64;
119 if transactions_num == 0 {
120 acks.push((t.included_in_block_ack, vec![PING_TRANSACTION_INDEX]));
121 return None;
122 }
123
124 let transactions_bytes =
126 t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
127 if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
128 limit_reached = LimitReached::MaxBytes;
129 return Some(t);
130 }
131 if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
132 limit_reached = LimitReached::MaxNumOfTransactions;
133 return Some(t);
134 }
135
136 total_bytes += transactions_bytes;
137
138 let start_idx = transactions.len() as TransactionIndex;
140 let indices: Vec<TransactionIndex> =
141 (start_idx..start_idx + t.transactions.len() as TransactionIndex).collect();
142
143 acks.push((t.included_in_block_ack, indices));
146 transactions.extend(t.transactions);
147 None
148 };
149
150 if let Some(t) = self.pending_transactions.take()
151 && let Some(pending_transactions) = handle_txs(t)
152 {
153 debug_fatal!(
154 "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
155 pending_transactions.transactions
156 );
157 }
158
159 while self.pending_transactions.is_none() {
162 if let Ok(t) = self.tx_receiver.try_recv() {
163 self.pending_transactions = handle_txs(t);
164 } else {
165 break;
166 }
167 }
168
169 let block_status_subscribers = self.block_status_subscribers.clone();
170 (
171 transactions,
172 Box::new(move |block_ref: BlockRef| {
173 let mut block_status_subscribers = block_status_subscribers.lock();
174
175 for (ack, tx_indices) in acks {
176 let (status_tx, status_rx) = oneshot::channel();
177
178 block_status_subscribers
179 .entry(block_ref)
180 .or_default()
181 .push(status_tx);
182
183 let _ = ack.send((block_ref, tx_indices, status_rx));
184 }
185 }),
186 limit_reached,
187 )
188 }
189
190 pub(crate) fn notify_own_blocks_status(
194 &self,
195 committed_blocks: Vec<BlockRef>,
196 gc_round: Round,
197 ) {
198 let mut block_status_subscribers = self.block_status_subscribers.lock();
200 for block_ref in committed_blocks {
201 if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
202 subscribers.into_iter().for_each(|s| {
203 let _ = s.send(BlockStatus::Sequenced(block_ref));
204 });
205 }
206 }
207
208 while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
210 if block_ref.round <= gc_round {
211 subscribers.into_iter().for_each(|s| {
212 let _ = s.send(BlockStatus::GarbageCollected(block_ref));
213 });
214 } else {
215 block_status_subscribers.insert(block_ref, subscribers);
216 break;
217 }
218 }
219 }
220
221 #[cfg(test)]
222 pub(crate) fn subscribe_for_block_status_testing(
223 &self,
224 block_ref: BlockRef,
225 ) -> oneshot::Receiver<BlockStatus> {
226 let (tx, rx) = oneshot::channel();
227 let mut block_status_subscribers = self.block_status_subscribers.lock();
228 block_status_subscribers
229 .entry(block_ref)
230 .or_default()
231 .push(tx);
232 rx
233 }
234
235 #[cfg(test)]
236 fn is_empty(&mut self) -> bool {
237 if self.pending_transactions.is_some() {
238 return false;
239 }
240 if let Ok(t) = self.tx_receiver.try_recv() {
241 self.pending_transactions = Some(t);
242 return false;
243 }
244 true
245 }
246}
247
248#[derive(Clone)]
249pub struct TransactionClient {
250 context: Arc<Context>,
251 sender: Sender<TransactionsGuard>,
252 max_transaction_size: u64,
253 max_transactions_in_block_bytes: u64,
254 max_transactions_in_block_count: u64,
255}
256
257#[derive(Debug, Error)]
258pub enum ClientError {
259 #[error("Failed to submit transaction, consensus is shutting down: {0}")]
260 ConsensusShuttingDown(String),
261
262 #[error("Transaction size ({0}B) is over limit ({1}B)")]
263 OversizedTransaction(u64, u64),
264
265 #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
266 OversizedTransactionBundleBytes(u64, u64),
267
268 #[error("Transaction bundle count ({0}) is over limit ({1})")]
269 OversizedTransactionBundleCount(u64, u64),
270}
271
272impl TransactionClient {
273 pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
274 Self::new_with_max_pending_transactions(context, MAX_PENDING_TRANSACTIONS)
275 }
276
277 fn new_with_max_pending_transactions(
278 context: Arc<Context>,
279 max_pending_transactions: usize,
280 ) -> (Self, Receiver<TransactionsGuard>) {
281 let (sender, receiver) = channel("consensus_input", max_pending_transactions);
282 (
283 Self {
284 sender,
285 max_transaction_size: context.protocol_config.max_transaction_size_bytes(),
286
287 max_transactions_in_block_bytes: context
288 .protocol_config
289 .max_transactions_in_block_bytes(),
290 max_transactions_in_block_count: context
291 .protocol_config
292 .max_num_transactions_in_block(),
293 context: context.clone(),
294 },
295 receiver,
296 )
297 }
298
299 pub fn epoch(&self) -> Epoch {
301 self.context.committee.epoch()
302 }
303
304 pub async fn submit(
311 &self,
312 transactions: Vec<Vec<u8>>,
313 ) -> Result<
314 (
315 BlockRef,
316 Vec<TransactionIndex>,
317 oneshot::Receiver<BlockStatus>,
318 ),
319 ClientError,
320 > {
321 let included_in_block = self.submit_no_wait(transactions).await?;
322 included_in_block
323 .await
324 .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
325 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
326 }
327
328 pub(crate) async fn submit_no_wait(
338 &self,
339 transactions: Vec<Vec<u8>>,
340 ) -> Result<
341 oneshot::Receiver<(
342 BlockRef,
343 Vec<TransactionIndex>,
344 oneshot::Receiver<BlockStatus>,
345 )>,
346 ClientError,
347 > {
348 let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
349
350 let mut bundle_size = 0;
351
352 if transactions.len() as u64 > self.max_transactions_in_block_count {
353 return Err(ClientError::OversizedTransactionBundleCount(
354 transactions.len() as u64,
355 self.max_transactions_in_block_count,
356 ));
357 }
358
359 for transaction in &transactions {
360 if transaction.len() as u64 > self.max_transaction_size {
361 return Err(ClientError::OversizedTransaction(
362 transaction.len() as u64,
363 self.max_transaction_size,
364 ));
365 }
366 bundle_size += transaction.len() as u64;
367
368 if bundle_size > self.max_transactions_in_block_bytes {
369 return Err(ClientError::OversizedTransactionBundleBytes(
370 bundle_size,
371 self.max_transactions_in_block_bytes,
372 ));
373 }
374 }
375
376 let t = TransactionsGuard {
377 transactions: transactions.into_iter().map(Transaction::new).collect(),
378 included_in_block_ack: included_in_block_ack_send,
379 };
380 self.sender
381 .send(t)
382 .await
383 .tap_err(|e| error!("Submit transactions failed with {:?}", e))
384 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
385 Ok(included_in_block_ack_receive)
386 }
387}
388
389pub trait TransactionVerifier: Send + Sync + 'static {
392 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;
395
396 fn verify_and_vote_batch(
405 &self,
406 block_ref: &BlockRef,
407 batch: &[&[u8]],
408 ) -> Result<Vec<TransactionIndex>, ValidationError>;
409}
410
411#[derive(Debug, Error)]
412pub enum ValidationError {
413 #[error("Invalid transaction: {0}")]
414 InvalidTransaction(String),
415}
416
417#[cfg(any(test, msim))]
419pub struct NoopTransactionVerifier;
420
421#[cfg(any(test, msim))]
422impl TransactionVerifier for NoopTransactionVerifier {
423 fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
424 Ok(())
425 }
426
427 fn verify_and_vote_batch(
428 &self,
429 _block_ref: &BlockRef,
430 _batch: &[&[u8]],
431 ) -> Result<Vec<TransactionIndex>, ValidationError> {
432 Ok(vec![])
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use std::{sync::Arc, time::Duration};
439
440 use consensus_config::AuthorityIndex;
441 use consensus_types::block::{
442 BlockDigest, BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX,
443 TransactionIndex,
444 };
445 use futures::{StreamExt, stream::FuturesUnordered};
446 use sui_protocol_config::ProtocolConfig;
447 use tokio::time::timeout;
448
449 use crate::transaction::NoopTransactionVerifier;
450 use crate::{
451 block_verifier::SignedBlockVerifier,
452 context::Context,
453 transaction::{BlockStatus, LimitReached, TransactionClient, TransactionConsumer},
454 };
455
456 #[tokio::test(flavor = "current_thread", start_paused = true)]
457 async fn basic_submit_and_consume() {
458 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
459 config.set_consensus_max_transaction_size_bytes_for_testing(2_000); config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
461 config
462 });
463
464 let context = Arc::new(Context::new_for_test(4).0);
465 let (client, tx_receiver) = TransactionClient::new(context.clone());
466 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
467
468 let mut included_in_block_waiters = FuturesUnordered::new();
470 for i in 0..3 {
471 let transaction =
472 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
473 let w = client
474 .submit_no_wait(vec![transaction])
475 .await
476 .expect("Shouldn't submit successfully transaction");
477 included_in_block_waiters.push(w);
478 }
479
480 let (transactions, ack_transactions, _limit_reached) = consumer.next();
482 assert_eq!(transactions.len(), 3);
483
484 for (i, t) in transactions.iter().enumerate() {
485 let t: String = bcs::from_bytes(t.data()).unwrap();
486 assert_eq!(format!("transaction {i}").to_string(), t);
487 }
488
489 assert!(
490 timeout(Duration::from_secs(1), included_in_block_waiters.next())
491 .await
492 .is_err(),
493 "We should expect to timeout as none of the transactions have been acknowledged yet"
494 );
495
496 ack_transactions(BlockRef::MIN);
498
499 while let Some(result) = included_in_block_waiters.next().await {
501 assert!(result.is_ok());
502 }
503
504 assert!(consumer.is_empty());
506 }
507
508 #[tokio::test(flavor = "current_thread", start_paused = true)]
509 async fn block_status_update() {
510 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
511 config.set_consensus_max_transaction_size_bytes_for_testing(2_000); config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
513 config.set_consensus_gc_depth_for_testing(10);
514 config
515 });
516
517 let context = Arc::new(Context::new_for_test(4).0);
518 let (client, tx_receiver) = TransactionClient::new(context.clone());
519 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
520
521 let mut included_in_block_waiters = FuturesUnordered::new();
523 for i in 1..=10 {
524 let transaction =
525 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
526 let w = client
527 .submit_no_wait(vec![transaction])
528 .await
529 .expect("Shouldn't submit successfully transaction");
530 included_in_block_waiters.push(w);
531
532 if i % 2 == 0 {
534 let (transactions, ack_transactions, _limit_reached) = consumer.next();
535 assert_eq!(transactions.len(), 2);
536 ack_transactions(BlockRef::new(
537 i,
538 AuthorityIndex::new_for_test(0),
539 BlockDigest::MIN,
540 ));
541 }
542 }
543
544 let mut transaction_count = 0;
545 let mut block_status_waiters = Vec::new();
547 while let Some(result) = included_in_block_waiters.next().await {
548 let (block_ref, tx_indices, block_status_waiter) =
549 result.expect("Block inclusion waiter shouldn't fail");
550 assert_eq!(tx_indices.len(), 1);
552 assert_eq!(tx_indices[0], transaction_count % 2);
556 transaction_count += 1;
557
558 block_status_waiters.push((block_ref, block_status_waiter));
559 }
560
561 let gc_round = 5;
563 consumer.notify_own_blocks_status(
564 vec![
565 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
566 BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
567 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
568 ],
569 gc_round,
570 );
571
572 for (block_ref, waiter) in block_status_waiters {
574 let block_status = waiter.await.expect("Block status waiter shouldn't fail");
575
576 if block_ref.round <= gc_round {
577 assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
578 } else {
579 assert!(matches!(block_status, BlockStatus::Sequenced(_)));
580 }
581 }
582
583 assert!(consumer.block_status_subscribers.lock().is_empty());
585 }
586
587 #[tokio::test]
588 async fn submit_over_max_fetch_size_and_consume() {
589 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
590 config.set_consensus_max_transaction_size_bytes_for_testing(100);
591 config.set_consensus_max_transactions_in_block_bytes_for_testing(100);
592 config
593 });
594
595 let context = Arc::new(Context::new_for_test(4).0);
596 let (client, tx_receiver) = TransactionClient::new(context.clone());
597 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
598
599 for i in 0..10 {
601 let transaction =
602 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
603 let _w = client
604 .submit_no_wait(vec![transaction])
605 .await
606 .expect("Shouldn't submit successfully transaction");
607 }
608
609 let mut all_transactions = Vec::new();
611 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
612 assert_eq!(transactions.len(), 7);
613
614 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
616 assert!(
617 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
618 "Should have fetched transactions up to {}",
619 context.protocol_config.max_transactions_in_block_bytes()
620 );
621 all_transactions.extend(transactions);
622
623 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
625 assert_eq!(transactions.len(), 3);
626
627 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
629 assert!(
630 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
631 "Should have fetched transactions up to {}",
632 context.protocol_config.max_transactions_in_block_bytes()
633 );
634 all_transactions.extend(transactions);
635
636 assert!(consumer.is_empty());
638
639 for (i, t) in all_transactions.iter().enumerate() {
640 let t: String = bcs::from_bytes(t.data()).unwrap();
641 assert_eq!(format!("transaction {i}").to_string(), t);
642 }
643 }
644
645 #[tokio::test]
646 async fn submit_large_batch_and_ack() {
647 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
648 config.set_consensus_max_transaction_size_bytes_for_testing(15);
649 config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
650 config
651 });
652
653 let context = Arc::new(Context::new_for_test(4).0);
654 let (client, tx_receiver) = TransactionClient::new(context.clone());
655 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
656 let mut all_receivers = Vec::new();
657 for i in 0..10 {
659 let transaction =
660 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
661 let w = client
662 .submit_no_wait(vec![transaction])
663 .await
664 .expect("Should submit successfully transaction");
665 all_receivers.push(w);
666 }
667
668 {
670 let transactions: Vec<_> = (10..15)
671 .map(|i| {
672 bcs::to_bytes(&format!("transaction {i}"))
673 .expect("Serialization should not fail.")
674 })
675 .collect();
676 let w = client
677 .submit_no_wait(transactions)
678 .await
679 .expect("Should submit successfully transaction");
680 all_receivers.push(w);
681 }
682
683 {
685 let i = 15;
686 let transaction =
687 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
688 let w = client
689 .submit_no_wait(vec![transaction])
690 .await
691 .expect("Shouldn't submit successfully transaction");
692 all_receivers.push(w);
693 }
694
695 {
697 let transactions: Vec<_> = (16..32)
698 .map(|i| {
699 bcs::to_bytes(&format!("transaction {i}"))
700 .expect("Serialization should not fail.")
701 })
702 .collect();
703 let result = client.submit_no_wait(transactions).await.unwrap_err();
704 assert_eq!(
705 result.to_string(),
706 "Transaction bundle size (210B) is over limit (200B)"
707 );
708 }
709
710 let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
713 let mut batch_index = 0;
714 while !consumer.is_empty() {
715 let (transactions, ack_transactions, _limit_reached) = consumer.next();
716
717 assert!(
718 transactions.len() as u64
719 <= context.protocol_config.max_num_transactions_in_block(),
720 "Should have fetched transactions up to {}",
721 context.protocol_config.max_num_transactions_in_block()
722 );
723
724 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
725 assert!(
726 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
727 "Should have fetched transactions up to {}",
728 context.protocol_config.max_transactions_in_block_bytes()
729 );
730
731 if batch_index == 0 {
733 assert_eq!(transactions.len(), 10);
734 for (i, transaction) in transactions.iter().enumerate() {
735 let t: String = bcs::from_bytes(transaction.data()).unwrap();
736 assert_eq!(format!("transaction {}", i).to_string(), t);
737 }
738 } else if batch_index == 1 {
740 assert_eq!(transactions.len(), 6);
741 for (i, transaction) in transactions.iter().enumerate() {
742 let t: String = bcs::from_bytes(transaction.data()).unwrap();
743 assert_eq!(format!("transaction {}", i + 10).to_string(), t);
744 }
745 } else {
746 panic!("Unexpected batch index");
747 }
748
749 batch_index += 1;
750
751 all_acks.push(ack_transactions);
752 }
753
754 for ack in all_acks {
756 ack(BlockRef::MIN);
757 }
758
759 for w in all_receivers {
761 let r = w.await;
762 assert!(r.is_ok());
763 }
764 }
765
766 #[tokio::test]
767 async fn test_submit_over_max_block_size_and_validate_block_size() {
768 {
770 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
771 config.set_consensus_max_transaction_size_bytes_for_testing(100);
772 config.set_consensus_max_num_transactions_in_block_for_testing(10);
773 config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
774 config
775 });
776
777 let context = Arc::new(Context::new_for_test(4).0);
778 let (client, tx_receiver) = TransactionClient::new(context.clone());
779 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
780 let mut all_receivers = Vec::new();
781
782 let max_num_transactions_in_block =
784 context.protocol_config.max_num_transactions_in_block();
785 for i in 0..2 * max_num_transactions_in_block {
786 let transaction = bcs::to_bytes(&format!("transaction {i}"))
787 .expect("Serialization should not fail.");
788 let w = client
789 .submit_no_wait(vec![transaction])
790 .await
791 .expect("Should submit successfully transaction");
792 all_receivers.push(w);
793 }
794
795 let (transactions, _ack_transactions, limit) = consumer.next();
797 assert_eq!(limit, LimitReached::MaxNumOfTransactions);
798 assert_eq!(transactions.len() as u64, max_num_transactions_in_block);
799
800 let block_verifier =
802 SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
803
804 let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
805 assert!(
806 block_verifier.check_transactions(&batch).is_ok(),
807 "Number of transactions limit verification failed"
808 );
809 }
810
811 {
813 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
814 config.set_consensus_max_transaction_size_bytes_for_testing(100);
815 config.set_consensus_max_num_transactions_in_block_for_testing(1_000);
816 config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
817 config
818 });
819
820 let context = Arc::new(Context::new_for_test(4).0);
821 let (client, tx_receiver) = TransactionClient::new(context.clone());
822 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
823 let mut all_receivers = Vec::new();
824
825 let max_transactions_in_block_bytes =
826 context.protocol_config.max_transactions_in_block_bytes();
827 let mut total_size = 0;
828 loop {
829 let transaction = bcs::to_bytes(&"transaction".to_string())
830 .expect("Serialization should not fail.");
831 total_size += transaction.len() as u64;
832 let w = client
833 .submit_no_wait(vec![transaction])
834 .await
835 .expect("Should submit successfully transaction");
836 all_receivers.push(w);
837
838 if total_size >= 2 * max_transactions_in_block_bytes {
840 break;
841 }
842 }
843
844 let (transactions, _ack_transactions, limit) = consumer.next();
846 let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
847 let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();
848
849 assert_eq!(limit, LimitReached::MaxBytes);
850 assert!(
851 batch.len()
852 < context
853 .protocol_config
854 .consensus_max_num_transactions_in_block() as usize,
855 "Should have submitted less than the max number of transactions in a block"
856 );
857 assert!(size <= max_transactions_in_block_bytes);
858
859 let block_verifier =
861 SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
862
863 assert!(
864 block_verifier.check_transactions(&batch).is_ok(),
865 "Total size of transactions limit verification failed"
866 );
867 }
868 }
869
870 #[tokio::test]
872 async fn submit_with_no_transactions() {
873 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
874 config.set_consensus_max_transaction_size_bytes_for_testing(15);
875 config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
876 config
877 });
878
879 let context = Arc::new(Context::new_for_test(4).0);
880 let (client, tx_receiver) = TransactionClient::new(context.clone());
881 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
882
883 let w_no_transactions = client
884 .submit_no_wait(vec![])
885 .await
886 .expect("Should submit successfully empty array of transactions");
887
888 let transaction =
889 bcs::to_bytes(&"transaction".to_string()).expect("Serialization should not fail.");
890 let w_with_transactions = client
891 .submit_no_wait(vec![transaction])
892 .await
893 .expect("Should submit successfully transaction");
894
895 let (transactions, ack_transactions, _limit_reached) = consumer.next();
896 assert_eq!(transactions.len(), 1);
897
898 ack_transactions(BlockRef::MIN);
900
901 {
902 let r = w_no_transactions.await;
903 let (block_ref, indices, _status) = r.unwrap();
904 assert_eq!(block_ref, BlockRef::MIN);
905 assert_eq!(indices, vec![PING_TRANSACTION_INDEX]);
906 }
907
908 {
909 let r = w_with_transactions.await;
910 let (block_ref, indices, _status) = r.unwrap();
911 assert_eq!(block_ref, BlockRef::MIN);
912 assert_eq!(indices, vec![0]);
913 }
914 }
915
916 #[tokio::test]
917 async fn ping_transaction_index_never_reached() {
918 static MAX_NUM_TRANSACTIONS_IN_BLOCK: u64 =
920 (TransactionIndex::MAX - NUM_RESERVED_TRANSACTION_INDICES) as u64;
921
922 static MAX_PENDING_TRANSACTIONS: usize = 2 * MAX_NUM_TRANSACTIONS_IN_BLOCK as usize;
924
925 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
926 config.set_consensus_max_transaction_size_bytes_for_testing(200_000);
927 config.set_consensus_max_transactions_in_block_bytes_for_testing(1_000_000);
928 config.set_consensus_max_num_transactions_in_block_for_testing(
929 MAX_NUM_TRANSACTIONS_IN_BLOCK,
930 );
931 config
932 });
933
934 let context = Arc::new(Context::new_for_test(4).0);
935 let (client, tx_receiver) = TransactionClient::new_with_max_pending_transactions(
936 context.clone(),
937 MAX_PENDING_TRANSACTIONS,
938 );
939 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
940
941 for i in 0..MAX_NUM_TRANSACTIONS_IN_BLOCK + 10 {
943 println!("Submitting transaction {i}");
944 let transaction =
945 bcs::to_bytes(&format!("t {i}")).expect("Serialization should not fail.");
946 let _w = client
947 .submit_no_wait(vec![transaction])
948 .await
949 .expect("Shouldn't submit successfully transaction");
950 }
951
952 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
954 assert_eq!(transactions.len() as u64, MAX_NUM_TRANSACTIONS_IN_BLOCK);
955
956 let t: String = bcs::from_bytes(transactions.last().unwrap().data()).unwrap();
957 assert_eq!(
958 t,
959 format!(
960 "t {}",
961 PING_TRANSACTION_INDEX - NUM_RESERVED_TRANSACTION_INDICES - 1
962 )
963 );
964 }
965}