sui_core/consensus_manager/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11    ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12    NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair, Stake,
13};
14use consensus_core::{
15    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16    RandomnessSignatureHandler, storage::rocksdb_store::RocksDBStore,
17};
18use core::panic;
19use fastcrypto::encoding::{Encoding, Hex};
20use fastcrypto::traits::KeyPair as _;
21use mysten_metrics::{RegistryID, RegistryService};
22use mysten_network::Multiaddr;
23use prometheus::{
24    IntGauge, IntGaugeVec, Registry, register_int_gauge_vec_with_registry,
25    register_int_gauge_with_registry,
26};
27use std::collections::BTreeMap;
28use std::path::PathBuf;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use sui_config::{ConsensusConfig, NodeConfig};
32use sui_network::endpoint_manager::{AddressSource, ConsensusAddressUpdater};
33use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
34use sui_types::crypto::NetworkPublicKey;
35use sui_types::error::{SuiErrorKind, SuiResult};
36use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
37use sui_types::node_role::NodeRole;
38use sui_types::{
39    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
40};
41use tokio::sync::{Mutex, broadcast};
42use tokio::time::{sleep, timeout};
43use tracing::{error, info};
44
45#[cfg(test)]
46#[path = "../unit_tests/consensus_manager_tests.rs"]
47pub mod consensus_manager_tests;
48
/// Lifecycle state of the consensus instance managed by `ConsensusManager`.
#[derive(PartialEq)]
enum Running {
    /// Consensus has been started; carries the epoch and protocol version it was started with.
    True(EpochId, ProtocolVersion),
    /// Consensus is not running.
    False,
}
54
/// Stores address updates that should be persisted across epoch changes.
/// Peers are keyed by the consensus `NetworkPublicKey` to avoid repeated conversions.
struct AddressOverridesMap {
    // Per peer, the override addresses reported by each `AddressSource`. The sources are kept in
    // a `BTreeMap` so they can be accessed in priority order: the first entry is treated as the
    // highest-priority source.
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}
64
65impl AddressOverridesMap {
66    pub fn new() -> Self {
67        Self {
68            map: BTreeMap::new(),
69        }
70    }
71
72    pub fn insert(
73        &mut self,
74        network_pubkey: ConsensusNetworkPublicKey,
75        source: sui_network::endpoint_manager::AddressSource,
76        addresses: Vec<Multiaddr>,
77    ) {
78        self.map
79            .entry(network_pubkey)
80            .or_default()
81            .insert(source, addresses);
82    }
83
84    pub fn remove(
85        &mut self,
86        network_pubkey: ConsensusNetworkPublicKey,
87        source: sui_network::endpoint_manager::AddressSource,
88    ) {
89        self.map
90            .entry(network_pubkey.clone())
91            .or_default()
92            .remove(&source);
93
94        // If no sources remain for this peer, remove the peer entry entirely
95        if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
96            self.map.remove(&network_pubkey);
97        }
98    }
99
100    /// Returns the highest-priority active override `(source, address)` for the
101    /// peer, or `None` when no override is installed (the on-chain committee
102    /// address is in use).
103    pub fn get_highest_priority_source_and_address(
104        &self,
105        network_pubkey: ConsensusNetworkPublicKey,
106    ) -> Option<(sui_network::endpoint_manager::AddressSource, Multiaddr)> {
107        self.map
108            .get(&network_pubkey)
109            .and_then(|sources| sources.first_key_value())
110            .and_then(|(source, addresses)| {
111                addresses.first().cloned().map(|address| (*source, address))
112            })
113    }
114
115    pub fn get_all_highest_priority_addresses(
116        &self,
117    ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
118        let mut result = Vec::new();
119
120        for (network_pubkey, sources) in self.map.iter() {
121            if let Some((_source, addresses)) = sources.first_key_value()
122                && let Some(address) = addresses.first()
123            {
124                result.push((network_pubkey.clone(), address.clone()));
125            }
126        }
127        result
128    }
129}
130
131/// Rebuilds the consensus `Committee` with Mysticeti v3 threshold parameters.
132/// `malicious_stake` and `crash_stake` come from env vars with reference-budget
133/// defaults (`f = c = 1250`); the nominal `threshold_total_stake = 5f + 3c + 1`
134/// is derived inside `Committee::new_v3`. This is a temporary iteration knob
135/// until the thresholds are promoted into `ProtocolConfig`.
136fn apply_v3_threshold_overrides(committee: Committee) -> Committee {
137    let malicious_stake: Stake = std::env::var("SUI_CONSENSUS_V3_MALICIOUS_STAKE")
138        .ok()
139        .and_then(|s| s.parse().ok())
140        .unwrap_or(1_250);
141    let crash_stake: Stake = std::env::var("SUI_CONSENSUS_V3_CRASH_STAKE")
142        .ok()
143        .and_then(|s| s.parse().ok())
144        .unwrap_or(1_250);
145    info!(
146        "consensus_manager: applying v3 committee thresholds \
147         (malicious_stake={malicious_stake}, crash_stake={crash_stake})"
148    );
149    Committee::new_v3(
150        committee.epoch(),
151        committee.authorities_slice().to_vec(),
152        malicious_stake,
153        crash_stake,
154    )
155}
156
157fn to_consensus_protocol_config(config: &ProtocolConfig) -> ConsensusProtocolConfig {
158    let chain_type = match config.chain() {
159        Chain::Mainnet => ChainType::Mainnet,
160        Chain::Testnet => ChainType::Testnet,
161        Chain::Unknown => ChainType::Unknown,
162    };
163    ConsensusProtocolConfig::new(
164        config.version.as_u64(),
165        chain_type,
166        config.max_transaction_size_bytes(),
167        config.max_transactions_in_block_bytes(),
168        config.max_num_transactions_in_block(),
169        config.gc_depth(),
170        /* transaction_voting_enabled */ true,
171        config.mysticeti_num_leaders_per_round(),
172        config.consensus_bad_nodes_stake_threshold(),
173        /* enable_v3 */ false,
174        /* leader_schedule_window_size */ 300,
175        /* leader_schedule_update_interval */ 12,
176    )
177}
178
/// Used by Sui to start consensus protocol for each epoch.
/// Supports both validator mode (with protocol keypair) and observer mode (without).
pub struct ConsensusManager {
    // Consensus configuration, cloned at construction time.
    consensus_config: ConsensusConfig,
    // Set only when the manager was constructed with a validator node role.
    protocol_keypair: Option<ProtocolKeyPair>,
    network_keypair: NetworkKeyPair,
    // Base directory of the consensus stores; each epoch uses a sub-directory named after the
    // epoch number.
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    // The running consensus authority together with the id of its metrics registry.
    // Empty while consensus is not running.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Use a shared lazy Mysticeti client so we can update the internal Mysticeti
    // client that gets created for every new epoch.
    client: Arc<LazyMysticetiClient>,
    // Externally shared client; pointed at `client` on start and cleared on shutdown.
    consensus_client: Arc<UpdatableConsensusClient>,

    // Handler consuming the committed sub dags of the current run, if any.
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    // Commit consumer monitor of the most recent run. It is swapped on every start and is also
    // broadcast through `consumer_monitor_sender` to replay waiters.
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    // Whether consensus is running and, if so, for which epoch and protocol version.
    running: Mutex<Running>,

    // Counter passed to the consensus authority on start. It is incremented only when the
    // previous run handled at least one commit.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Persistent storage for address updates across epoch changes.
    // Keyed by NetworkPublicKey and then by AddressSource.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}
214
215impl ConsensusManager {
216    pub fn new(
217        node_config: &NodeConfig,
218        consensus_config: &ConsensusConfig,
219        registry_service: &RegistryService,
220        consensus_client: Arc<UpdatableConsensusClient>,
221        node_role: NodeRole,
222    ) -> Self {
223        let metrics = Arc::new(ConsensusManagerMetrics::new(
224            &registry_service.default_registry(),
225        ));
226        let client = Arc::new(LazyMysticetiClient::new());
227        let (consumer_monitor_sender, _) = broadcast::channel(1);
228        let protocol_keypair = if node_role.is_validator() {
229            Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
230        } else {
231            None
232        };
233        Self {
234            consensus_config: consensus_config.clone(),
235            protocol_keypair,
236            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
237            storage_base_path: consensus_config.db_path().to_path_buf(),
238            metrics,
239            registry_service: registry_service.clone(),
240            authority: ArcSwapOption::empty(),
241            client,
242            consensus_client,
243            consensus_handler: Mutex::new(None),
244            consumer_monitor: ArcSwapOption::empty(),
245            consumer_monitor_sender,
246            running: Mutex::new(Running::False),
247            boot_counter: Mutex::new(0),
248            address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
249        }
250    }
251
    /// Starts consensus for the epoch of `epoch_store`.
    ///
    /// If consensus is already marked as running, an error is logged and the call returns
    /// without starting anything. Otherwise this:
    /// 1. marks consensus as running and points the shared `consensus_client` at the lazy client,
    /// 2. installs a new consensus handler and commit consumer monitor,
    /// 3. starts the `ConsensusAuthority` and registers its metrics registry,
    /// 4. re-applies the stored peer address overrides to the new authority,
    /// 5. installs the authority's transaction client and broadcasts the monitor to replay waiters.
    ///
    /// The `running` lock guard is held until the function returns.
    ///
    /// # Panics
    /// Panics if `node_config` has no consensus config, or if the consensus metrics registry
    /// cannot be created.
    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
        randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
    ) {
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();
        let consensus_protocol_config = to_consensus_protocol_config(protocol_config);
        let system_state = epoch_store.epoch_start_state();
        // With v3 enabled the committee is rebuilt with the v3 threshold parameters.
        let committee = if consensus_protocol_config.enable_v3() {
            apply_v3_threshold_overrides(system_state.get_consensus_committee())
        } else {
            system_state.get_consensus_committee()
        };

        // Ensure start() is not called twice.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        // Point the externally shared client at the lazy client. The lazy client itself only
        // receives the actual transaction client further below, once the authority has started.
        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        // Use a per-epoch store path and the configured listen address; everything else comes
        // from the configured parameters, or their defaults when none are configured.
        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            listen_address_override: consensus_config.listen_address.clone(),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        // Replay starts `num_prior_commits` before the last processed commit, saturating at zero.
        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        // Spin up the new Mysticeti consensus handler to listen for committed sub dags, before starting authority.
        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            commit_receiver,
            monitor.clone(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // If there is a previous consumer monitor, the consensus engine has been restarted due to an epoch change. On its own that
        // does not tell us whether the node participated in an active epoch or an old one. We check whether the previous run handled
        // any commits to determine this: if it did, we assume that the node participated in the previous run.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if the consensus successfully participated in the previous run.
        // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
        // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
        // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            NetworkType::Tonic,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            committee.clone(),
            parameters.clone(),
            consensus_protocol_config,
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
            randomness_signature_handler,
        )
        .await;
        let client = authority.transaction_client();

        // Keep the registry id next to the authority so shutdown() can unregister it.
        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Reapply all stored address updates to the new consensus instance.
        // The overrides are copied out first, so the lock is not held during the updates.
        let highest_priority_addresses = self
            .address_overrides
            .lock()
            .get_all_highest_priority_addresses();
        for (network_pubkey, address) in highest_priority_addresses {
            registered_authority
                .0
                .update_peer_address(network_pubkey, Some(address.clone()));
        }

        // Initialize the client to send transactions to this Mysticeti instance.
        self.client.set(client);

        // Send the consumer monitor to the replay waiter.
        // The send result is ignored on purpose: there may be no subscribed waiter.
        let _ = self.consumer_monitor_sender.send(monitor);

        // The gauge is integral, so the latency is recorded in whole seconds (fraction dropped).
        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }
396
397    pub async fn shutdown(&self) {
398        info!("Shutting down consensus ...");
399
400        // Ensure shutdown() is called on a running consensus and get the epoch/version info.
401        let start_time = Instant::now();
402        let mut running = self.running.lock().await;
403        let (shutdown_epoch, shutdown_version) = match *running {
404            Running::True(epoch, version) => {
405                tracing::info!(
406                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
407                );
408                *running = Running::False;
409                (epoch, version)
410            }
411            Running::False => {
412                error!("Consensus shutdown was called but consensus is not running");
413                return;
414            }
415        };
416
417        // Stop consensus submissions.
418        self.client.clear();
419
420        // swap with empty to ensure there is no other reference to authority and we can safely do Arc unwrap
421        let r = self.authority.swap(None).unwrap();
422        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
423            panic!("Failed to retrieve the Mysticeti authority");
424        };
425
426        // shutdown the authority and wait for it
427        authority.stop().await;
428
429        // drop the old consensus handler to force stop any underlying task running.
430        let mut consensus_handler = self.consensus_handler.lock().await;
431        if let Some(mut handler) = consensus_handler.take() {
432            handler.abort().await;
433        }
434
435        // unregister the registry id
436        self.registry_service.remove(registry_id);
437
438        self.consensus_client.clear();
439
440        let elapsed = start_time.elapsed().as_secs_f64();
441        self.metrics.shutdown_latency.set(elapsed as i64);
442
443        tracing::info!(
444            "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
445            elapsed
446        );
447    }
448
449    pub async fn is_running(&self) -> bool {
450        let running = self.running.lock().await;
451        matches!(*running, Running::True(_, _))
452    }
453
454    pub fn replay_waiter(&self) -> ReplayWaiter {
455        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
456        ReplayWaiter::new(consumer_monitor_receiver)
457    }
458
459    pub fn get_storage_base_path(&self) -> PathBuf {
460        self.consensus_config.db_path().to_path_buf()
461    }
462
463    pub fn consensus_store(&self) -> Option<Arc<RocksDBStore>> {
464        self.authority.load().as_ref().map(|a| a.0.store())
465    }
466
467    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
468        let mut store_path = self.storage_base_path.clone();
469        store_path.push(format!("{}", epoch));
470        store_path
471    }
472}
473
474// Implementing the interface so we can update the consensus peer addresses when requested.
475impl ConsensusAddressUpdater for ConsensusManager {
476    fn update_address(
477        &self,
478        network_pubkey: NetworkPublicKey,
479        source: sui_network::endpoint_manager::AddressSource,
480        addresses: Vec<Multiaddr>,
481    ) -> SuiResult<()> {
482        // Convert to consensus network public key once
483        let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
484
485        // Determine which override (if any) should be used after this update.
486        let highest_priority = {
487            let mut address_overrides = self.address_overrides.lock();
488
489            if addresses.is_empty() {
490                address_overrides.remove(network_pubkey.clone(), source);
491            } else {
492                address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
493            }
494
495            address_overrides.get_highest_priority_source_and_address(network_pubkey.clone())
496        };
497        self.metrics.set_active_address_source(
498            &Hex::encode(network_pubkey.to_bytes()),
499            highest_priority.as_ref().map(|(source, _)| *source),
500        );
501
502        // Apply the update to running consensus if it exists
503        let address_to_apply = highest_priority.map(|(_, address)| address);
504        if let Some(authority) = self.authority.load_full() {
505            authority
506                .0
507                .update_peer_address(network_pubkey, address_to_apply);
508            Ok(())
509        } else {
510            info!(
511                "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
512                network_pubkey, source
513            );
514            Err(SuiErrorKind::GenericAuthorityError {
515                error: "Consensus authority node is not running. Can not apply address update"
516                    .to_string(),
517            }
518            .into())
519        }
520    }
521}
522
/// A `ConsensusClient` whose inner client can be replaced at any time. This usually happens
/// during an epoch change, where a client is set after consensus has been started for the new
/// epoch.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // The currently installed client, or empty when none is set.
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
530
531impl UpdatableConsensusClient {
532    pub fn new() -> Self {
533        Self {
534            client: ArcSwapOption::empty(),
535        }
536    }
537
538    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
539        const START_TIMEOUT: Duration = Duration::from_secs(300);
540        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
541        if let Ok(client) = timeout(START_TIMEOUT, async {
542            loop {
543                let Some(client) = self.client.load_full() else {
544                    sleep(RETRY_INTERVAL).await;
545                    continue;
546                };
547                return client;
548            }
549        })
550        .await
551        {
552            return client;
553        }
554
555        panic!(
556            "Timed out after {:?} waiting for Consensus to start!",
557            START_TIMEOUT,
558        );
559    }
560
561    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
562        self.client.store(Some(Arc::new(client)));
563    }
564
565    pub fn clear(&self) {
566        self.client.store(None);
567    }
568}
569
570#[async_trait]
571impl ConsensusClient for UpdatableConsensusClient {
572    async fn submit(
573        &self,
574        transactions: &[ConsensusTransaction],
575        epoch_store: &Arc<AuthorityPerEpochStore>,
576    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
577        let client = self.get().await;
578        client.submit(transactions, epoch_store).await
579    }
580}
581
/// Waits for consensus to finish replaying at consensus handler.
pub struct ReplayWaiter {
    // Receives the commit consumer monitor that is broadcast when consensus starts.
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}
586
587impl ReplayWaiter {
588    pub(crate) fn new(
589        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
590    ) -> Self {
591        Self {
592            consumer_monitor_receiver,
593        }
594    }
595
596    pub(crate) async fn wait_for_replay(mut self) {
597        loop {
598            info!("Waiting for consensus to start replaying ...");
599            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
600                continue;
601            };
602            info!("Waiting for consensus handler to finish replaying ...");
603            monitor
604                .replay_to_consumer_last_processed_commit_complete()
605                .await;
606            break;
607        }
608    }
609}
610
611impl Clone for ReplayWaiter {
612    fn clone(&self) -> Self {
613        Self {
614            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
615        }
616    }
617}
618
/// Metrics reported by `ConsensusManager`.
pub struct ConsensusManagerMetrics {
    // Duration of the last consensus start, recorded in whole seconds.
    start_latency: IntGauge,
    // Duration of the last consensus shutdown, recorded in whole seconds.
    shutdown_latency: IntGauge,
    // Per-peer gauge (label `peer_id`) holding the code of the active address source.
    active_address_source: IntGaugeVec,
}
624
625impl ConsensusManagerMetrics {
626    pub fn new(registry: &Registry) -> Self {
627        Self {
628            start_latency: register_int_gauge_with_registry!(
629                "consensus_manager_start_latency",
630                "The latency of starting up consensus nodes",
631                registry,
632            )
633            .unwrap(),
634            shutdown_latency: register_int_gauge_with_registry!(
635                "consensus_manager_shutdown_latency",
636                "The latency of shutting down consensus nodes",
637                registry,
638            )
639            .unwrap(),
640            active_address_source: register_int_gauge_vec_with_registry!(
641                "consensus_active_address_source",
642                "Active consensus address source per committee peer, encoded as the gauge \
643                 value: 0=committee (no override active; the on-chain committee address is in \
644                 use), 1=admin, 2=config, 3=discovery, 4=seed, 5=chain (override priority \
645                 highest to lowest). One series per peer; `peer_id` is the full hex consensus \
646                 network public key.",
647                &["peer_id"],
648                registry,
649            )
650            .unwrap(),
651        }
652    }
653
654    /// Records the active consensus address source for a committee peer as a single
655    /// per-peer gauge whose value is the source's `metric_code`. `active =
656    /// Some(source)` means an override is installed; `None` means no override.
657    fn set_active_address_source(
658        &self,
659        peer_id: &str,
660        active: Option<sui_network::endpoint_manager::AddressSource>,
661    ) {
662        let code = active.map_or(
663            AddressSource::DEFAULT_ADDRESS_SOURCE_CODE,
664            sui_network::endpoint_manager::AddressSource::metric_code,
665        );
666        self.active_address_source
667            .with_label_values(&[peer_id])
668            .set(code);
669    }
670}