1use std::sync::Arc;
5
6use diesel::prelude::ConnectionError;
7use diesel_async::RunQueryDsl;
8use diesel_async::pooled_connection::AsyncDieselConnectionManager;
9use diesel_async::pooled_connection::PoolError;
10use diesel_async::pooled_connection::bb8::Pool;
11use diesel_async::pooled_connection::bb8::PooledConnection;
12use diesel_async::pooled_connection::bb8::RunError;
13use diesel_async::{AsyncConnection, AsyncPgConnection};
14use futures::FutureExt;
15use url::Url;
16
17use crate::db::ConnectionConfig;
18use crate::db::ConnectionPoolConfig;
19
20#[derive(Clone, Debug)]
21pub struct ConnectionPool {
22 database_url: Arc<Url>,
23 pool: Pool<AsyncPgConnection>,
24}
25
26impl ConnectionPool {
27 pub async fn new(database_url: Url, config: ConnectionPoolConfig) -> Result<Self, PoolError> {
28 let database_url = Arc::new(database_url);
29 let connection_config = config.connection_config();
30 let mut manager_config = diesel_async::pooled_connection::ManagerConfig::default();
31 manager_config.custom_setup =
32 Box::new(move |url| establish_connection(url, connection_config).boxed());
33 let manager =
34 AsyncDieselConnectionManager::new_with_config(database_url.as_str(), manager_config);
35
36 Pool::builder()
37 .max_size(config.pool_size)
38 .connection_timeout(config.connection_timeout)
39 .build(manager)
40 .await
41 .map(|pool| Self { database_url, pool })
42 }
43
44 pub async fn get(&self) -> Result<Connection<'_>, RunError> {
46 self.pool.get().await.map(Connection::PooledConnection)
47 }
48
49 pub async fn dedicated_connection(&self) -> Result<Connection<'static>, PoolError> {
56 self.pool
57 .dedicated_connection()
58 .await
59 .map(Connection::Dedicated)
60 }
61
62 pub fn state(&self) -> bb8::State {
64 self.pool.state()
65 }
66
67 pub fn url(&self) -> &Url {
69 &self.database_url
70 }
71}
72
73pub enum Connection<'a> {
74 PooledConnection(PooledConnection<'a, AsyncPgConnection>),
75 Dedicated(AsyncPgConnection),
76}
77
78impl Connection<'static> {
79 pub async fn dedicated(database_url: &Url) -> Result<Self, ConnectionError> {
80 AsyncPgConnection::establish(database_url.as_str())
81 .await
82 .map(Connection::Dedicated)
83 }
84
85 pub async fn run_pending_migrations<M>(
87 self,
88 migrations: M,
89 ) -> diesel::migration::Result<Vec<diesel::migration::MigrationVersion<'static>>>
90 where
91 M: diesel::migration::MigrationSource<diesel::pg::Pg> + Send + 'static,
92 {
93 use diesel::migration::MigrationVersion;
94 use diesel_migrations::MigrationHarness;
95
96 let mut connection =
97 diesel_async::async_connection_wrapper::AsyncConnectionWrapper::<Self>::from(self);
98
99 tokio::task::spawn_blocking(move || {
100 connection
101 .run_pending_migrations(migrations)
102 .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect())
103 })
104 .await
105 .unwrap()
106 }
107}
108
109impl std::ops::Deref for Connection<'_> {
110 type Target = AsyncPgConnection;
111
112 fn deref(&self) -> &Self::Target {
113 match self {
114 Connection::PooledConnection(pooled) => pooled.deref(),
115 Connection::Dedicated(dedicated) => dedicated,
116 }
117 }
118}
119
120impl std::ops::DerefMut for Connection<'_> {
121 fn deref_mut(&mut self) -> &mut AsyncPgConnection {
122 match self {
123 Connection::PooledConnection(pooled) => pooled.deref_mut(),
124 Connection::Dedicated(dedicated) => dedicated,
125 }
126 }
127}
128
129impl ConnectionConfig {
130 async fn apply(&self, connection: &mut AsyncPgConnection) -> Result<(), diesel::result::Error> {
131 diesel::sql_query(format!(
132 "SET statement_timeout = {}",
133 self.statement_timeout.as_millis(),
134 ))
135 .execute(connection)
136 .await?;
137
138 if self.read_only {
139 diesel::sql_query("SET default_transaction_read_only = 'on'")
140 .execute(connection)
141 .await?;
142 }
143
144 Ok(())
145 }
146}
147
148async fn establish_connection(
150 url: &str,
151 config: ConnectionConfig,
152) -> Result<AsyncPgConnection, ConnectionError> {
153 let mut connection = AsyncPgConnection::establish(url).await?;
154
155 config
156 .apply(&mut connection)
157 .await
158 .map_err(ConnectionError::CouldntSetupConfiguration)?;
159
160 Ok(connection)
161}