Skip to content

Commit ecf7a99

Browse files
goffrieConvex, Inc.
authored andcommitted
Simplify ConvexPgPool a bit and add comments (#38240)
non-comment changes: - make use of `GenericClient` to abstract over `Client` vs `Transaction` - instead of creating a `connector` function with a complicated type, just store the `MakeRustlsConnect` object in ConvexPgPool and inline the connect code GitOrigin-RevId: 2af05f41aa431e01ea18fa51fd07939f8cba96b4
1 parent c29f1a9 commit ecf7a99

File tree

1 file changed

+48
-52
lines changed

1 file changed

+48
-52
lines changed

crates/postgres/src/connection.rs

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
//! Implements a Postgres connection pool and statement cache.
2+
//!
3+
//! Unlike deadpool-postgres, we:
4+
//! - limit the number of cached prepared statements owned by each connection in
5+
//! order to avoid high/unbounded memory usage on the Postgres server
6+
//! - automatically clean up idle connections.
7+
18
use std::{
29
collections::VecDeque,
310
sync::{
@@ -28,7 +35,6 @@ use fastrace::{
2835
Span,
2936
};
3037
use futures::{
31-
future::BoxFuture,
3238
pin_mut,
3339
select_biased,
3440
Future,
@@ -56,20 +62,17 @@ use tokio::{
5662
};
5763
use tokio_postgres::{
5864
config::TargetSessionAttrs,
59-
tls::{
60-
MakeTlsConnect,
61-
TlsConnect,
62-
},
6365
types::{
6466
BorrowToSql,
6567
ToSql,
6668
},
69+
GenericClient,
6770
Row,
6871
RowStream,
69-
Socket,
7072
Statement,
7173
Transaction,
7274
};
75+
use tokio_postgres_rustls::MakeRustlsConnect;
7376

7477
use crate::metrics::{
7578
connection_lifetime_timer,
@@ -108,6 +111,9 @@ where
108111
}
109112
}
110113

114+
/// Stores the escaped form of a Postgres [schema]
115+
///
116+
/// [schema]: https://www.postgresql.org/docs/17/ddl-schemas.html
111117
#[derive(Clone, Debug)]
112118
pub(crate) struct SchemaName {
113119
pub(crate) escaped: String,
@@ -128,21 +134,23 @@ impl SchemaName {
128134
}
129135

130136
type StatementCache = LruCache<String, tokio_postgres::Statement>;
137+
/// A Postgres connection, owned by either the connection pool
138+
/// ([`ConvexPgPool`]), or by an active connection ([`PostgresConnection`]).
131139
struct PooledConnection {
132140
client: tokio_postgres::Client,
133141
statement_cache: Mutex<StatementCache>,
134142
last_used: Instant,
135143
}
136144

137-
async fn prepare(
138-
prepare: impl AsyncFnOnce(&str) -> Result<tokio_postgres::Statement, tokio_postgres::Error>,
145+
async fn prepare_cached(
146+
client: &impl GenericClient,
139147
cache: &Mutex<StatementCache>,
140148
statement: String,
141149
) -> anyhow::Result<tokio_postgres::Statement> {
142150
if let Some(prepared) = cache.lock().get(&statement) {
143151
return Ok(prepared.clone());
144152
}
145-
let prepared = prepare(&statement).await?;
153+
let prepared = client.prepare(&statement).await?;
146154
// N.B.: if the cache is at capacity, this will drop the oldest statement,
147155
// which will send a message on the connection asking to deallocate it
148156
cache.lock().put(statement, prepared.clone());
@@ -157,18 +165,11 @@ impl PooledConnection {
157165
last_used: Instant::now(),
158166
}
159167
}
160-
161-
async fn prepare_cached(&self, query: String) -> anyhow::Result<tokio_postgres::Statement> {
162-
let client = &self.client;
163-
prepare(
164-
async |query| client.prepare(query).await,
165-
&self.statement_cache,
166-
query,
167-
)
168-
.await
169-
}
170168
}
171169

170+
/// An active Postgres connection from a [`ConvexPgPool`].
171+
///
172+
/// Returns the underlying connection to the pool when dropped.
172173
pub(crate) struct PostgresConnection<'a> {
173174
pool: &'a ConvexPgPool,
174175
_permit: SemaphorePermit<'a>,
@@ -218,7 +219,13 @@ impl PostgresConnection<'_> {
218219
}
219220

220221
pub async fn prepare_cached(&self, query: &'static str) -> anyhow::Result<Statement> {
221-
with_timeout(self.conn().prepare_cached(self.substitute_db_name(query))).await
222+
let conn = self.conn();
223+
with_timeout(prepare_cached(
224+
&conn.client,
225+
&conn.statement_cache,
226+
self.substitute_db_name(query),
227+
))
228+
.await
222229
}
223230

224231
pub async fn query_raw<P, I>(
@@ -286,6 +293,7 @@ impl Drop for PostgresConnection<'_> {
286293
}
287294
}
288295

296+
/// Represents an active transaction on a [`PostgresConnection`].
289297
pub struct PostgresTransaction<'a> {
290298
inner: Transaction<'a>,
291299
statement_cache: &'a Mutex<StatementCache>,
@@ -298,8 +306,8 @@ impl PostgresTransaction<'_> {
298306
}
299307

300308
pub async fn prepare_cached(&self, query: &'static str) -> anyhow::Result<Statement> {
301-
with_timeout(prepare(
302-
async |query| self.inner.prepare(query).await,
309+
with_timeout(prepare_cached(
310+
&self.inner,
303311
self.statement_cache,
304312
self.substitute_db_name(query),
305313
))
@@ -340,58 +348,42 @@ impl PostgresTransaction<'_> {
340348
}
341349
}
342350

351+
/// A Postgres connection pool.
352+
///
353+
/// This struct is always used behind an `Arc`.
343354
pub struct ConvexPgPool {
344355
pg_config: tokio_postgres::Config,
345-
connector: Box<
346-
dyn for<'a> Fn(
347-
&'a tokio_postgres::Config,
348-
) -> BoxFuture<'a, anyhow::Result<tokio_postgres::Client>>
349-
+ Send
350-
+ Sync,
351-
>,
356+
tls_connect: MakeRustlsConnect,
352357
/// Limits the total number of connections that can be handed out
353358
/// simultaneously.
354359
semaphore: Semaphore,
355360
/// Idle connections, ordered by `last_used` from oldest to newest
356361
connections: Mutex<VecDeque<PooledConnection>>,
357362
stats: ConnectionPoolStats,
358-
worker: JoinHandle<()>,
363+
idle_worker: JoinHandle<()>,
359364
}
360365

361366
impl ConvexPgPool {
362-
pub(crate) fn new<T: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static>(
367+
pub(crate) fn new(
363368
pg_config: tokio_postgres::Config,
364-
connect: T,
365-
) -> Arc<Self>
366-
where
367-
T::Stream: Send,
368-
T::TlsConnect: Send,
369-
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
370-
{
369+
tls_connect: MakeRustlsConnect,
370+
) -> Arc<Self> {
371371
let max_size = *POSTGRES_MAX_CONNECTIONS;
372372
tracing::info!("Postgres connection pool max size {max_size}");
373373
// The idle worker needs a (weak) reference to the created ConvexPgPool,
374374
// but the pool also wants a reference to the worker; resolve this
375375
// cyclic situation by sneaking the weak reference through a channel.
376376
let (this_tx, this_rx) = oneshot::channel();
377-
let worker = common::runtime::tokio_spawn("postgres_idle_worker", async move {
377+
let idle_worker = common::runtime::tokio_spawn("postgres_idle_worker", async move {
378378
Self::idle_worker(this_rx.await.expect("nothing sent on this_tx?")).await
379379
});
380380
let this = Arc::new(ConvexPgPool {
381381
pg_config,
382-
connector: Box::new(move |pg_config| {
383-
let f = pg_config.connect(connect.clone());
384-
async move {
385-
let (client, conn) = f.await?;
386-
common::runtime::tokio_spawn("postgres_connection", conn);
387-
Ok(client)
388-
}
389-
.boxed()
390-
}),
382+
tls_connect,
391383
semaphore: Semaphore::new(max_size),
392384
connections: Mutex::new(VecDeque::new()),
393385
stats: new_connection_pool_stats(""),
394-
worker,
386+
idle_worker,
395387
});
396388
_ = this_tx.send(Arc::downgrade(&this));
397389
this
@@ -426,9 +418,12 @@ impl ConvexPgPool {
426418
return Ok((permit, conn));
427419
}
428420
}
429-
let client = (self.connector)(&self.pg_config)
421+
let (client, conn) = self
422+
.pg_config
423+
.connect(self.tls_connect.clone())
430424
.in_span(Span::enter_with_local_parent("postgres_connect"))
431425
.await?;
426+
common::runtime::tokio_spawn("postgres_connection", conn);
432427
anyhow::Ok((permit, PooledConnection::new(client)))
433428
})
434429
.await;
@@ -445,12 +440,13 @@ impl ConvexPgPool {
445440
})
446441
}
447442

443+
/// Drops all pooled connections and prevents the creation of new ones.
448444
pub fn shutdown(&self) {
449445
// N.B.: this doesn't abort in-progress connections, but they won't be
450446
// returned to the pool on drop
451447
self.semaphore.close();
452448
self.connections.lock().clear();
453-
self.worker.abort();
449+
self.idle_worker.abort();
454450
}
455451

456452
async fn idle_worker(this: Weak<Self>) {
@@ -480,6 +476,6 @@ impl ConvexPgPool {
480476

481477
impl Drop for ConvexPgPool {
482478
fn drop(&mut self) {
483-
self.worker.abort();
479+
self.idle_worker.abort();
484480
}
485481
}

0 commit comments

Comments
 (0)