1use std::{
5 collections::{BTreeMap, HashMap},
6 panic,
7 sync::Arc,
8 time::Duration,
9};
10
11use anemo::{
12 PeerId, Response,
13 rpc::Status,
14 types::{PeerInfo, response::StatusCode},
15};
16use anemo_tower::{
17 auth::{AllowedPeers, RequireAuthorizationLayer},
18 callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler},
19 set_header::{SetRequestHeaderLayer, SetResponseHeaderLayer},
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use arc_swap::ArcSwapOption;
23use async_trait::async_trait;
24use bytes::Bytes;
25use consensus_config::{AuthorityIndex, NetworkKeyPair};
26use consensus_types::block::{BlockRef, Round};
27use serde::{Deserialize, Serialize};
28use tokio::sync::broadcast::error::RecvError;
29use tracing::{debug, error, warn};
30
31use super::{
32 BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService,
33 anemo_gen::{
34 consensus_rpc_client::ConsensusRpcClient,
35 consensus_rpc_server::{ConsensusRpc, ConsensusRpcServer},
36 },
37 connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle},
38 epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY},
39 metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse},
40};
41use crate::{
42 CommitIndex,
43 block::VerifiedBlock,
44 commit::CommitRange,
45 context::Context,
46 error::{ConsensusError, ConsensusResult},
47};
48
49pub(crate) struct AnemoClient {
51 context: Arc<Context>,
52 network: Arc<ArcSwapOption<anemo::Network>>,
53}
54
55impl AnemoClient {
56 const GET_CLIENT_INTERVAL: Duration = Duration::from_millis(10);
57
58 pub(crate) fn new(context: Arc<Context>) -> Self {
59 Self {
60 context,
61 network: Arc::new(ArcSwapOption::default()),
62 }
63 }
64
65 pub(crate) fn set_network(&self, network: anemo::Network) {
66 self.network.store(Some(Arc::new(network)));
67 }
68
69 async fn get_client(
70 &self,
71 peer: AuthorityIndex,
72 timeout: Duration,
73 ) -> ConsensusResult<ConsensusRpcClient<anemo::Peer>> {
74 let network = loop {
75 if let Some(network) = self.network.load_full() {
76 break network;
77 } else {
78 tokio::time::sleep(Self::GET_CLIENT_INTERVAL).await;
79 }
80 };
81
82 let authority = self.context.committee.authority(peer);
83 let peer_id = PeerId(authority.network_key.to_bytes());
84 if let Some(peer) = network.peer(peer_id) {
85 return Ok(ConsensusRpcClient::new(peer));
86 };
87
88 if network.known_peers().get(&peer_id).is_none() {
90 return Err(ConsensusError::UnknownNetworkPeer(format!("{}", peer_id)));
91 }
92
93 let (mut subscriber, _) = network.subscribe().map_err(|e| {
94 ConsensusError::NetworkClientConnection(format!(
95 "Cannot subscribe to AnemoNetwork updates: {e:?}"
96 ))
97 })?;
98
99 let sleep = tokio::time::sleep(timeout);
100 tokio::pin!(sleep);
101 loop {
102 tokio::select! {
103 recv = subscriber.recv() => match recv {
104 Ok(anemo::types::PeerEvent::NewPeer(pid)) if pid == peer_id => {
105 if let Some(peer) = network.peer(peer_id) {
107 return Ok(ConsensusRpcClient::new(peer));
108 }
109 warn!("Peer {} should be connected.", peer_id)
110 }
111 Err(RecvError::Closed) => return Err(ConsensusError::Shutdown),
112 Err(RecvError::Lagged(_)) => {
113 subscriber = subscriber.resubscribe();
114 if let Some(peer) = network.peer(peer_id) {
116 return Ok(ConsensusRpcClient::new(peer));
117 }
118 }
119 _ => {}
121 },
122 _ = &mut sleep => {
123 return Err(ConsensusError::PeerDisconnected(format!("{}", peer_id)));
124 },
125 }
126 }
127 }
128}
129
130#[async_trait]
131impl NetworkClient for AnemoClient {
132 const SUPPORT_STREAMING: bool = false;
133
134 async fn send_block(
135 &self,
136 peer: AuthorityIndex,
137 block: &VerifiedBlock,
138 timeout: Duration,
139 ) -> ConsensusResult<()> {
140 let mut client = self.get_client(peer, timeout).await?;
141 let request = SendBlockRequest {
142 block: block.serialized().clone(),
143 };
144 client
145 .send_block(anemo::Request::new(request).with_timeout(timeout))
146 .await
147 .map_err(|e| ConsensusError::NetworkRequest(format!("send_block failed: {e:?}")))?;
148 Ok(())
149 }
150
151 async fn subscribe_blocks(
152 &self,
153 _peer: AuthorityIndex,
154 _last_received: Round,
155 _timeout: Duration,
156 ) -> ConsensusResult<BlockStream> {
157 unimplemented!("Unimplemented")
158 }
159
160 async fn fetch_blocks(
161 &self,
162 peer: AuthorityIndex,
163 block_refs: Vec<BlockRef>,
164 highest_accepted_rounds: Vec<Round>,
165 breadth_first: bool,
166 timeout: Duration,
167 ) -> ConsensusResult<Vec<Bytes>> {
168 let mut client = self.get_client(peer, timeout).await?;
169 let request = FetchBlocksRequest {
170 block_refs: block_refs
171 .iter()
172 .filter_map(|r| match bcs::to_bytes(r) {
173 Ok(serialized) => Some(serialized),
174 Err(e) => {
175 debug!("Failed to serialize block ref {:?}: {e:?}", r);
176 None
177 }
178 })
179 .collect(),
180 highest_accepted_rounds,
181 breadth_first,
182 };
183 let response = client
184 .fetch_blocks(anemo::Request::new(request).with_timeout(timeout))
185 .await
186 .map_err(|e: Status| {
187 if e.status() == StatusCode::RequestTimeout {
188 ConsensusError::NetworkRequestTimeout(format!("fetch_blocks timeout: {e:?}"))
189 } else {
190 ConsensusError::NetworkRequest(format!("fetch_blocks failed: {e:?}"))
191 }
192 })?;
193 let body = response.into_body();
194 Ok(body.blocks)
195 }
196
197 async fn fetch_commits(
198 &self,
199 peer: AuthorityIndex,
200 commit_range: CommitRange,
201 timeout: Duration,
202 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
203 let mut client = self.get_client(peer, timeout).await?;
204 let request = FetchCommitsRequest {
205 start: commit_range.start(),
206 end: commit_range.end(),
207 };
208 let response = client
209 .fetch_commits(anemo::Request::new(request).with_timeout(timeout))
210 .await
211 .map_err(|e| ConsensusError::NetworkRequest(format!("fetch_blocks failed: {e:?}")))?;
212 let response = response.into_body();
213 Ok((response.commits, response.certifier_blocks))
214 }
215
216 async fn fetch_latest_blocks(
217 &self,
218 peer: AuthorityIndex,
219 authorities: Vec<AuthorityIndex>,
220 timeout: Duration,
221 ) -> ConsensusResult<Vec<Bytes>> {
222 let mut client = self.get_client(peer, timeout).await?;
223 let request = FetchLatestBlocksRequest { authorities };
224 let response = client
225 .fetch_latest_blocks(anemo::Request::new(request).with_timeout(timeout))
226 .await
227 .map_err(|e: Status| {
228 if e.status() == StatusCode::RequestTimeout {
229 ConsensusError::NetworkRequestTimeout(format!(
230 "fetch_latest_blocks timeout: {e:?}"
231 ))
232 } else {
233 ConsensusError::NetworkRequest(format!("fetch_latest_blocks failed: {e:?}"))
234 }
235 })?;
236 let body = response.into_body();
237 Ok(body.blocks)
238 }
239
240 async fn get_latest_rounds(
241 &self,
242 peer: AuthorityIndex,
243 timeout: Duration,
244 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
245 let mut client = self.get_client(peer, timeout).await?;
246 let request = GetLatestRoundsRequest {};
247 let response = client
248 .get_latest_rounds(anemo::Request::new(request).with_timeout(timeout))
249 .await
250 .map_err(|e: Status| {
251 if e.status() == StatusCode::RequestTimeout {
252 ConsensusError::NetworkRequestTimeout(format!(
253 "get_latest_rounds timeout: {e:?}"
254 ))
255 } else {
256 ConsensusError::NetworkRequest(format!("get_latest_rounds failed: {e:?}"))
257 }
258 })?;
259 let body = response.into_body();
260 Ok((body.highest_received, body.highest_accepted))
261 }
262}
263
264struct AnemoServiceProxy<S: NetworkService> {
266 peer_map: BTreeMap<PeerId, AuthorityIndex>,
267 service: Arc<S>,
268}
269
270impl<S: NetworkService> AnemoServiceProxy<S> {
271 fn new(context: Arc<Context>, service: Arc<S>) -> Self {
272 let peer_map = context
273 .committee
274 .authorities()
275 .map(|(index, authority)| {
276 let peer_id = PeerId(authority.network_key.to_bytes());
277 (peer_id, index)
278 })
279 .collect();
280 Self { peer_map, service }
281 }
282}
283
284#[async_trait]
285impl<S: NetworkService> ConsensusRpc for AnemoServiceProxy<S> {
286 async fn send_block(
287 &self,
288 request: anemo::Request<SendBlockRequest>,
289 ) -> Result<anemo::Response<SendBlockResponse>, anemo::rpc::Status> {
290 let Some(peer_id) = request.peer_id() else {
291 return Err(anemo::rpc::Status::new_with_message(
292 anemo::types::response::StatusCode::BadRequest,
293 "peer_id not found",
294 ));
295 };
296 let index = *self.peer_map.get(peer_id).ok_or_else(|| {
297 anemo::rpc::Status::new_with_message(
298 anemo::types::response::StatusCode::BadRequest,
299 "peer not found",
300 )
301 })?;
302 let block = request.into_body().block;
303 let block = ExtendedSerializedBlock {
304 block,
305 excluded_ancestors: vec![],
306 };
307 self.service
308 .handle_send_block(index, block)
309 .await
310 .map_err(|e| {
311 anemo::rpc::Status::new_with_message(
312 anemo::types::response::StatusCode::BadRequest,
313 format!("{e}"),
314 )
315 })?;
316 Ok(Response::new(SendBlockResponse {}))
317 }
318
319 async fn fetch_blocks(
320 &self,
321 request: anemo::Request<FetchBlocksRequest>,
322 ) -> Result<anemo::Response<FetchBlocksResponse>, anemo::rpc::Status> {
323 let Some(peer_id) = request.peer_id() else {
324 return Err(anemo::rpc::Status::new_with_message(
325 anemo::types::response::StatusCode::BadRequest,
326 "peer_id not found",
327 ));
328 };
329 let index = *self.peer_map.get(peer_id).ok_or_else(|| {
330 anemo::rpc::Status::new_with_message(
331 anemo::types::response::StatusCode::BadRequest,
332 "peer not found",
333 )
334 })?;
335 let body = request.into_body();
336 let block_refs = body
337 .block_refs
338 .into_iter()
339 .filter_map(|serialized| match bcs::from_bytes(&serialized) {
340 Ok(r) => Some(r),
341 Err(e) => {
342 debug!("Failed to deserialize block ref {:?}: {e:?}", serialized);
343 None
344 }
345 })
346 .collect();
347
348 let highest_accepted_rounds = body.highest_accepted_rounds;
349 let breadth_first = body.breadth_first;
350 let blocks = self
351 .service
352 .handle_fetch_blocks(index, block_refs, highest_accepted_rounds, breadth_first)
353 .await
354 .map_err(|e| {
355 anemo::rpc::Status::new_with_message(
356 anemo::types::response::StatusCode::BadRequest,
357 format!("{e}"),
358 )
359 })?;
360 Ok(Response::new(FetchBlocksResponse { blocks }))
361 }
362
363 async fn fetch_commits(
364 &self,
365 request: anemo::Request<FetchCommitsRequest>,
366 ) -> Result<anemo::Response<FetchCommitsResponse>, anemo::rpc::Status> {
367 let Some(peer_id) = request.peer_id() else {
368 return Err(anemo::rpc::Status::new_with_message(
369 anemo::types::response::StatusCode::BadRequest,
370 "peer_id not found",
371 ));
372 };
373 let index = *self.peer_map.get(peer_id).ok_or_else(|| {
374 anemo::rpc::Status::new_with_message(
375 anemo::types::response::StatusCode::BadRequest,
376 "peer not found",
377 )
378 })?;
379 let request = request.into_body();
380 let (commits, certifier_blocks) = self
381 .service
382 .handle_fetch_commits(index, (request.start..=request.end).into())
383 .await
384 .map_err(|e| {
385 anemo::rpc::Status::new_with_message(
386 anemo::types::response::StatusCode::InternalServerError,
387 format!("{e}"),
388 )
389 })?;
390 let commits = commits
391 .into_iter()
392 .map(|c| c.serialized().clone())
393 .collect();
394 let certifier_blocks = certifier_blocks
395 .into_iter()
396 .map(|b| b.serialized().clone())
397 .collect();
398 Ok(Response::new(FetchCommitsResponse {
399 commits,
400 certifier_blocks,
401 }))
402 }
403
404 async fn fetch_latest_blocks(
405 &self,
406 request: anemo::Request<FetchLatestBlocksRequest>,
407 ) -> Result<anemo::Response<FetchLatestBlocksResponse>, anemo::rpc::Status> {
408 let Some(peer_id) = request.peer_id() else {
409 return Err(anemo::rpc::Status::new_with_message(
410 anemo::types::response::StatusCode::BadRequest,
411 "peer_id not found",
412 ));
413 };
414 let index = *self.peer_map.get(peer_id).ok_or_else(|| {
415 anemo::rpc::Status::new_with_message(
416 anemo::types::response::StatusCode::BadRequest,
417 "peer not found",
418 )
419 })?;
420 let body = request.into_body();
421 let blocks = self
422 .service
423 .handle_fetch_latest_blocks(index, body.authorities)
424 .await
425 .map_err(|e| {
426 anemo::rpc::Status::new_with_message(
427 anemo::types::response::StatusCode::BadRequest,
428 format!("{e}"),
429 )
430 })?;
431 Ok(Response::new(FetchLatestBlocksResponse { blocks }))
432 }
433
434 async fn get_latest_rounds(
435 &self,
436 request: anemo::Request<GetLatestRoundsRequest>,
437 ) -> Result<anemo::Response<GetLatestRoundsResponse>, anemo::rpc::Status> {
438 let Some(peer_id) = request.peer_id() else {
439 return Err(anemo::rpc::Status::new_with_message(
440 anemo::types::response::StatusCode::BadRequest,
441 "peer_id not found",
442 ));
443 };
444 let index = *self.peer_map.get(peer_id).ok_or_else(|| {
445 anemo::rpc::Status::new_with_message(
446 anemo::types::response::StatusCode::BadRequest,
447 "peer not found",
448 )
449 })?;
450 let (highest_received, highest_accepted) = self
451 .service
452 .handle_get_latest_rounds(index)
453 .await
454 .map_err(|e| {
455 anemo::rpc::Status::new_with_message(
456 anemo::types::response::StatusCode::InternalServerError,
457 format!("{e}"),
458 )
459 })?;
460 Ok(Response::new(GetLatestRoundsResponse {
461 highest_received,
462 highest_accepted,
463 }))
464 }
465}
466
467pub(crate) struct AnemoManager {
474 context: Arc<Context>,
475 network_keypair: Option<NetworkKeyPair>,
476 client: Arc<AnemoClient>,
477 network: Arc<ArcSwapOption<anemo::Network>>,
478 connection_monitor_handle: Option<ConnectionMonitorHandle>,
479}
480
481impl AnemoManager {
482 pub(crate) fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self {
483 Self {
484 context: context.clone(),
485 network_keypair: Some(network_keypair),
486 client: Arc::new(AnemoClient::new(context)),
487 network: Arc::new(ArcSwapOption::default()),
488 connection_monitor_handle: None,
489 }
490 }
491}
492
493impl<S: NetworkService> NetworkManager<S> for AnemoManager {
494 type Client = AnemoClient;
495
496 fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self {
497 AnemoManager::new(context, network_keypair)
498 }
499
500 fn client(&self) -> Arc<Self::Client> {
501 self.client.clone()
502 }
503
504 async fn install_service(&mut self, service: Arc<S>) {
505 self.context
506 .metrics
507 .network_metrics
508 .network_type
509 .with_label_values(&["anemo"])
510 .set(1);
511
512 debug!("Starting anemo service");
513
514 let server = ConsensusRpcServer::new(AnemoServiceProxy::new(self.context.clone(), service));
515 let authority = self.context.committee.authority(self.context.own_index);
516 let own_address = if authority.address.is_localhost_ip() {
519 authority.address.clone()
520 } else {
521 authority.address.with_zero_ip()
522 };
523 let epoch_string: String = self.context.committee.epoch().to_string();
524 let inbound_network_metrics = self.context.metrics.network_metrics.inbound.clone();
525 let outbound_network_metrics = self.context.metrics.network_metrics.outbound.clone();
526 let quinn_connection_metrics = self
527 .context
528 .metrics
529 .network_metrics
530 .quinn_connection_metrics
531 .clone();
532 let all_peer_ids = self
533 .context
534 .committee
535 .authorities()
536 .map(|(_i, authority)| PeerId(authority.network_key.to_bytes()));
537
538 let routes = anemo::Router::new()
539 .route_layer(RequireAuthorizationLayer::new(AllowedPeers::new(
540 all_peer_ids,
541 )))
542 .route_layer(RequireAuthorizationLayer::new(AllowedEpoch::new(
543 epoch_string.clone(),
544 )))
545 .add_rpc_service(server);
546
547 let service = tower::ServiceBuilder::new()
549 .layer(
550 TraceLayer::new_for_server_errors()
551 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
552 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
553 )
554 .layer(CallbackLayer::new(MetricsCallbackMaker::new(
555 inbound_network_metrics,
556 self.context.parameters.anemo.excessive_message_size,
557 )))
558 .layer(SetResponseHeaderLayer::overriding(
559 EPOCH_HEADER_KEY.parse().unwrap(),
560 epoch_string.clone(),
561 ))
562 .service(routes);
563
564 let outbound_layer = tower::ServiceBuilder::new()
565 .layer(
566 TraceLayer::new_for_client_and_server_errors()
567 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
568 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
569 )
570 .layer(CallbackLayer::new(MetricsCallbackMaker::new(
571 outbound_network_metrics,
572 self.context.parameters.anemo.excessive_message_size,
573 )))
574 .layer(SetRequestHeaderLayer::overriding(
575 EPOCH_HEADER_KEY.parse().unwrap(),
576 epoch_string,
577 ))
578 .into_inner();
579
580 let anemo_config = {
581 let mut quic_config = anemo::QuicConfig::default();
582 quic_config.max_concurrent_bidi_streams = Some(10_000);
584 quic_config.stream_receive_window = Some(100 << 20);
588 quic_config.receive_window = Some(200 << 20);
589 quic_config.send_window = Some(200 << 20);
590 quic_config.crypto_buffer_size = Some(1 << 20);
591 quic_config.socket_receive_buffer_size = Some(20 << 20);
592 quic_config.socket_send_buffer_size = Some(20 << 20);
593 quic_config.allow_failed_socket_buffer_size_setting = true;
594 quic_config.max_idle_timeout_ms = Some(30_000);
595 quic_config.keep_alive_interval_ms = Some(5_000);
597
598 let mut config = anemo::Config::default();
599 config.quic = Some(quic_config);
600 config.max_frame_size = Some(1 << 30);
603 config.inbound_request_timeout_ms = Some(300_000);
605 config.outbound_request_timeout_ms = Some(300_000);
606 config.shutdown_idle_timeout_ms = Some(1_000);
607 config.connectivity_check_interval_ms = Some(2_000);
608 config.connection_backoff_ms = Some(1_000);
609 config.max_connection_backoff_ms = Some(20_000);
610 config
611 };
612
613 let mut retries_left = 90;
614 let addr = own_address
615 .to_anemo_address()
616 .unwrap_or_else(|op| panic!("{op}: {own_address}"));
617 let private_key_bytes = self.network_keypair.take().unwrap().private_key_bytes();
618 let network = loop {
619 let network_result = anemo::Network::bind(addr.clone())
620 .server_name("consensus")
621 .private_key(private_key_bytes)
622 .config(anemo_config.clone())
623 .outbound_request_layer(outbound_layer.clone())
624 .start(service.clone());
625 match network_result {
626 Ok(n) => {
627 break n;
628 }
629 Err(e) => {
630 retries_left -= 1;
631
632 if retries_left <= 0 {
633 panic!("Failed to initialize AnemoNetwork at {addr}! Last error: {e:#?}");
634 }
635 warn!(
636 "Address {addr} should be available for the Consensus service, retrying in one second: {e:#?}",
637 );
638 tokio::time::sleep(Duration::from_secs(1)).await;
639 }
640 }
641 };
642
643 let mut known_peer_ids = HashMap::new();
644 for (_i, authority) in self.context.committee.authorities() {
645 let peer_id = PeerId(authority.network_key.to_bytes());
646 let peer_address = match authority.address.to_anemo_address() {
647 Ok(addr) => addr,
648 Err(e) => {
652 error!(
653 "Failed to convert {:?} to anemo address: {:?}",
654 authority.address, e
655 );
656 continue;
657 }
658 };
659 let peer_info = PeerInfo {
660 peer_id,
661 affinity: anemo::types::PeerAffinity::High,
662 address: vec![peer_address.clone()],
663 };
664 network.known_peers().insert(peer_info);
665 known_peer_ids.insert(peer_id, authority.hostname.clone());
666 }
667
668 let connection_monitor_handle = AnemoConnectionMonitor::spawn(
669 network.downgrade(),
670 quinn_connection_metrics,
671 known_peer_ids,
672 );
673
674 self.connection_monitor_handle = Some(connection_monitor_handle);
675 self.client.set_network(network.clone());
676 self.network.store(Some(Arc::new(network)));
677 }
678
679 async fn stop(&mut self) {
680 if let Some(network) = self.network.load_full() {
681 if let Err(e) = network.shutdown().await {
682 warn!("Failure when shutting down AnemoNetwork: {e:?}");
683 }
684 self.network.store(None);
685 }
686
687 if let Some(connection_monitor_handle) = self.connection_monitor_handle.take() {
688 connection_monitor_handle.stop().await;
689 }
690
691 self.context
692 .metrics
693 .network_metrics
694 .network_type
695 .with_label_values(&["anemo"])
696 .set(0);
697 }
698}
699
700impl SizedRequest for anemo::Request<Bytes> {
703 fn size(&self) -> usize {
704 self.body().len()
705 }
706
707 fn route(&self) -> String {
708 self.route().to_string()
709 }
710}
711
712impl SizedResponse for anemo::Response<Bytes> {
713 fn size(&self) -> usize {
714 self.body().len()
715 }
716
717 fn error_type(&self) -> Option<String> {
718 if self.status().is_success() {
719 None
720 } else {
721 Some(self.status().to_string())
722 }
723 }
724}
725
726impl MakeCallbackHandler for MetricsCallbackMaker {
727 type Handler = MetricsResponseCallback;
728
729 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
730 self.handle_request(request)
731 }
732}
733
734impl ResponseHandler for MetricsResponseCallback {
735 fn on_response(mut self, response: &anemo::Response<bytes::Bytes>) {
736 MetricsResponseCallback::on_response(&mut self, response)
737 }
738
739 fn on_error<E>(mut self, err: &E) {
740 MetricsResponseCallback::on_error(&mut self, err)
741 }
742}
743
744#[derive(Clone, Serialize, Deserialize)]
746pub(crate) struct SendBlockRequest {
747 block: Bytes,
749}
750
751#[derive(Clone, Serialize, Deserialize, prost::Message)]
752pub(crate) struct SendBlockResponse {}
753
754#[derive(Clone, Serialize, Deserialize)]
755pub(crate) struct FetchBlocksRequest {
756 block_refs: Vec<Vec<u8>>,
757 highest_accepted_rounds: Vec<Round>,
758 breadth_first: bool,
759}
760
761#[derive(Clone, Serialize, Deserialize)]
762pub(crate) struct FetchBlocksResponse {
763 blocks: Vec<Bytes>,
765}
766
767#[derive(Clone, Serialize, Deserialize)]
768pub(crate) struct FetchCommitsRequest {
769 start: CommitIndex,
770 end: CommitIndex,
771}
772
773#[derive(Clone, Serialize, Deserialize)]
774pub(crate) struct FetchCommitsResponse {
775 commits: Vec<Bytes>,
777 certifier_blocks: Vec<Bytes>,
779}
780
781#[derive(Clone, Serialize, Deserialize)]
782pub(crate) struct FetchLatestBlocksRequest {
783 authorities: Vec<AuthorityIndex>,
784}
785
786#[derive(Clone, Serialize, Deserialize)]
787pub(crate) struct FetchLatestBlocksResponse {
788 blocks: Vec<Bytes>,
790}
791
792#[derive(Clone, Serialize, Deserialize)]
793pub(crate) struct GetLatestRoundsRequest {}
794
795#[derive(Clone, Serialize, Deserialize)]
796pub(crate) struct GetLatestRoundsResponse {
797 highest_received: Vec<Round>,
799 highest_accepted: Vec<Round>,
801}