@@ -8,6 +8,7 @@ use super::{
88 ConnectionRequestReceiver ,
99 ConnectionRequestResult ,
1010 ConnectionRequester ,
11+ WeakConnectionRequester ,
1112 } ,
1213 establish:: ConnectionEstablisher ,
1314 manager,
@@ -110,6 +111,10 @@ pub(crate) struct ConnectionPoolWorker {
110111 /// sender ends of this receiver drop, this worker will be notified and drop too.
111112 handle_listener : WorkerHandleListener ,
112113
114+ /// Sender for connection check out requests. Does not keep the worker alive the way
115+ /// a `ConnectionRequeter` would since it doesn't hold a `WorkerHandle`.
116+ weak_requester : WeakConnectionRequester ,
117+
113118 /// Receiver for incoming connection check out requests.
114119 request_receiver : ConnectionRequestReceiver ,
115120
@@ -218,6 +223,7 @@ impl ConnectionPoolWorker {
218223 service_connection_count : HashMap :: new ( ) ,
219224 available_connections : VecDeque :: new ( ) ,
220225 max_pool_size,
226+ weak_requester : connection_requester. weak ( ) ,
221227 request_receiver,
222228 wait_queue : Default :: default ( ) ,
223229 management_receiver,
@@ -312,6 +318,12 @@ impl ConnectionPoolWorker {
312318 shutdown_ack = Some ( ack) ;
313319 break ;
314320 }
321+ BroadcastMessage :: FillPool => {
322+ crate :: runtime:: execute ( fill_pool (
323+ self . weak_requester . clone ( ) ,
324+ ack,
325+ ) ) ;
326+ }
315327 #[ cfg( test) ]
316328 BroadcastMessage :: SyncWorkers => {
317329 ack. acknowledge ( ( ) ) ;
@@ -363,30 +375,39 @@ impl ConnectionPoolWorker {
363375 }
364376
365377 fn check_out ( & mut self , request : ConnectionRequest ) {
366- // first attempt to check out an available connection
367- while let Some ( mut conn) = self . available_connections . pop_back ( ) {
368- // Close the connection if it's stale.
369- if conn. generation . is_stale ( & self . generation ) {
370- self . close_connection ( conn, ConnectionClosedReason :: Stale ) ;
371- continue ;
378+ if request. is_warm_pool ( ) {
379+ if self . total_connection_count >= self . min_pool_size . unwrap_or ( 0 ) {
380+ let _ = request. fulfill ( ConnectionRequestResult :: PoolWarmed ) ;
381+ return ;
372382 }
383+ } else {
384+ // first attempt to check out an available connection
385+ while let Some ( mut conn) = self . available_connections . pop_back ( ) {
386+ // Close the connection if it's stale.
387+ if conn. generation . is_stale ( & self . generation ) {
388+ self . close_connection ( conn, ConnectionClosedReason :: Stale ) ;
389+ continue ;
390+ }
373391
374- // Close the connection if it's idle.
375- if conn. is_idle ( self . max_idle_time ) {
376- self . close_connection ( conn, ConnectionClosedReason :: Idle ) ;
377- continue ;
378- }
392+ // Close the connection if it's idle.
393+ if conn. is_idle ( self . max_idle_time ) {
394+ self . close_connection ( conn, ConnectionClosedReason :: Idle ) ;
395+ continue ;
396+ }
379397
380- conn. mark_as_in_use ( self . manager . clone ( ) ) ;
381- if let Err ( request) = request. fulfill ( ConnectionRequestResult :: Pooled ( Box :: new ( conn) ) ) {
382- // checking out thread stopped listening, indicating it hit the WaitQueue
383- // timeout, so we put connection back into pool.
384- let mut connection = request. unwrap_pooled_connection ( ) ;
385- connection. mark_as_available ( ) ;
386- self . available_connections . push_back ( connection) ;
387- }
398+ conn. mark_as_in_use ( self . manager . clone ( ) ) ;
399+ if let Err ( request) =
400+ request. fulfill ( ConnectionRequestResult :: Pooled ( Box :: new ( conn) ) )
401+ {
402+ // checking out thread stopped listening, indicating it hit the WaitQueue
403+ // timeout, so we put connection back into pool.
404+ let mut connection = request. unwrap_pooled_connection ( ) ;
405+ connection. mark_as_available ( ) ;
406+ self . available_connections . push_back ( connection) ;
407+ }
388408
389- return ;
409+ return ;
410+ }
390411 }
391412
392413 // otherwise, attempt to create a connection.
@@ -669,6 +690,30 @@ async fn establish_connection(
669690 establish_result. map_err ( |e| e. cause )
670691}
671692
693+ async fn fill_pool (
694+ requester : WeakConnectionRequester ,
695+ ack : crate :: runtime:: AcknowledgmentSender < ( ) > ,
696+ ) {
697+ let mut establishing = vec ! [ ] ;
698+ loop {
699+ let result = requester. request_warm_pool ( ) . await ;
700+ match result {
701+ None => break ,
702+ Some ( ConnectionRequestResult :: Establishing ( handle) ) => {
703+ // Let connections finish establishing in parallel.
704+ establishing. push ( crate :: runtime:: spawn ( async move {
705+ let _ = handle. await ;
706+ // The connection is dropped here, returning it to the pool.
707+ } ) ) ;
708+ }
709+ _ => break ,
710+ } ;
711+ }
712+ // Wait for all connections to finish establishing before reporting completion.
713+ futures_util:: future:: join_all ( establishing) . await ;
714+ ack. acknowledge ( ( ) ) ;
715+ }
716+
672717/// Enum modeling the possible pool states as described in the CMAP spec.
673718///
674719/// The "closed" state is omitted here because the pool considered closed only
0 commit comments