sui_indexer/
database.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use diesel::prelude::ConnectionError;
7use diesel_async::RunQueryDsl;
8use diesel_async::pooled_connection::AsyncDieselConnectionManager;
9use diesel_async::pooled_connection::PoolError;
10use diesel_async::pooled_connection::bb8::Pool;
11use diesel_async::pooled_connection::bb8::PooledConnection;
12use diesel_async::pooled_connection::bb8::RunError;
13use diesel_async::{AsyncConnection, AsyncPgConnection};
14use futures::FutureExt;
15use url::Url;
16
17use crate::db::ConnectionConfig;
18use crate::db::ConnectionPoolConfig;
19
20#[derive(Clone, Debug)]
21pub struct ConnectionPool {
22    database_url: Arc<Url>,
23    pool: Pool<AsyncPgConnection>,
24}
25
26impl ConnectionPool {
27    pub async fn new(database_url: Url, config: ConnectionPoolConfig) -> Result<Self, PoolError> {
28        let database_url = Arc::new(database_url);
29        let connection_config = config.connection_config();
30        let mut manager_config = diesel_async::pooled_connection::ManagerConfig::default();
31        manager_config.custom_setup =
32            Box::new(move |url| establish_connection(url, connection_config).boxed());
33        let manager =
34            AsyncDieselConnectionManager::new_with_config(database_url.as_str(), manager_config);
35
36        Pool::builder()
37            .max_size(config.pool_size)
38            .connection_timeout(config.connection_timeout)
39            .build(manager)
40            .await
41            .map(|pool| Self { database_url, pool })
42    }
43
44    /// Retrieves a connection from the pool.
45    pub async fn get(&self) -> Result<Connection<'_>, RunError> {
46        self.pool.get().await.map(Connection::PooledConnection)
47    }
48
49    /// Get a new dedicated connection that will not be managed by the pool.
50    /// An application may want a persistent connection (e.g. to do a
51    /// postgres LISTEN) that will not be closed or repurposed by the pool.
52    ///
53    /// This method allows reusing the manager's configuration but otherwise
54    /// bypassing the pool
55    pub async fn dedicated_connection(&self) -> Result<Connection<'static>, PoolError> {
56        self.pool
57            .dedicated_connection()
58            .await
59            .map(Connection::Dedicated)
60    }
61
62    /// Returns information about the current state of the pool.
63    pub fn state(&self) -> bb8::State {
64        self.pool.state()
65    }
66
67    /// Returns the database url that this pool is configured with
68    pub fn url(&self) -> &Url {
69        &self.database_url
70    }
71}
72
73pub enum Connection<'a> {
74    PooledConnection(PooledConnection<'a, AsyncPgConnection>),
75    Dedicated(AsyncPgConnection),
76}
77
78impl Connection<'static> {
79    pub async fn dedicated(database_url: &Url) -> Result<Self, ConnectionError> {
80        AsyncPgConnection::establish(database_url.as_str())
81            .await
82            .map(Connection::Dedicated)
83    }
84
85    /// Run the provided Migrations
86    pub async fn run_pending_migrations<M>(
87        self,
88        migrations: M,
89    ) -> diesel::migration::Result<Vec<diesel::migration::MigrationVersion<'static>>>
90    where
91        M: diesel::migration::MigrationSource<diesel::pg::Pg> + Send + 'static,
92    {
93        use diesel::migration::MigrationVersion;
94        use diesel_migrations::MigrationHarness;
95
96        let mut connection =
97            diesel_async::async_connection_wrapper::AsyncConnectionWrapper::<Self>::from(self);
98
99        tokio::task::spawn_blocking(move || {
100            connection
101                .run_pending_migrations(migrations)
102                .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect())
103        })
104        .await
105        .unwrap()
106    }
107}
108
109impl std::ops::Deref for Connection<'_> {
110    type Target = AsyncPgConnection;
111
112    fn deref(&self) -> &Self::Target {
113        match self {
114            Connection::PooledConnection(pooled) => pooled.deref(),
115            Connection::Dedicated(dedicated) => dedicated,
116        }
117    }
118}
119
120impl std::ops::DerefMut for Connection<'_> {
121    fn deref_mut(&mut self) -> &mut AsyncPgConnection {
122        match self {
123            Connection::PooledConnection(pooled) => pooled.deref_mut(),
124            Connection::Dedicated(dedicated) => dedicated,
125        }
126    }
127}
128
129impl ConnectionConfig {
130    async fn apply(&self, connection: &mut AsyncPgConnection) -> Result<(), diesel::result::Error> {
131        diesel::sql_query(format!(
132            "SET statement_timeout = {}",
133            self.statement_timeout.as_millis(),
134        ))
135        .execute(connection)
136        .await?;
137
138        if self.read_only {
139            diesel::sql_query("SET default_transaction_read_only = 'on'")
140                .execute(connection)
141                .await?;
142        }
143
144        Ok(())
145    }
146}
147
148/// Function used by the Connection Pool Manager to establish and setup new connections
149async fn establish_connection(
150    url: &str,
151    config: ConnectionConfig,
152) -> Result<AsyncPgConnection, ConnectionError> {
153    let mut connection = AsyncPgConnection::establish(url).await?;
154
155    config
156        .apply(&mut connection)
157        .await
158        .map_err(ConnectionError::CouldntSetupConfiguration)?;
159
160    Ok(connection)
161}