@@ -205,7 +205,6 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
205205 or request.cclass == conn_impl._cclass:
206206 request.conn_impl = conn_impl
207207 request.completed = True
208- self ._busy_conn_impls.append(conn_impl)
209208 self ._requests.remove(request)
210209 self ._condition.notify_all()
211210 break
@@ -225,20 +224,16 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
225224 list depending on whether the waiter is still waiting for the request
226225 to be satisfied!
227226 """
227+ request.completed = True
228+ request.in_progress = False
229+ request.bg_processing = False
228230 if request.conn_impl is not None :
229231 if not request.is_replacing and not request.requires_ping:
230232 self ._open_count += 1
231233 if self ._num_to_create > 0 :
232234 self ._num_to_create -= 1
233- if request.waiting:
234- self ._busy_conn_impls.append(request.conn_impl)
235- elif request.conn_impl._is_pool_extra:
236- request.conn_impl._is_pool_extra = False
237- self ._conn_impls_to_drop.append(request.conn_impl)
238- elif request.conn_impl.invoke_session_callback:
239- self ._free_new_conn_impls.append(request.conn_impl)
240- else :
241- self ._free_used_conn_impls.append(request.conn_impl)
235+ if not request.waiting:
236+ request.reject()
242237 elif request.requires_ping:
243238 self ._open_count -= 1
244239 if self ._num_to_create == 0 and self ._open_count < self .min:
@@ -286,6 +281,7 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
286281 BaseThinDbObjectTypeCache type_cache
287282 PooledConnRequest request
288283 int cache_num
284+ self ._busy_conn_impls.remove(conn_impl)
289285 if conn_impl._dbobject_type_cache_num > 0 :
290286 cache_num = conn_impl._dbobject_type_cache_num
291287 type_cache = get_dbobject_type_cache(cache_num)
@@ -316,7 +312,6 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
316312 self ._condition.notify_all()
317313 return 0
318314 self ._free_used_conn_impls.append(conn_impl)
319- self ._busy_conn_impls.remove(conn_impl)
320315 self ._check_timeout()
321316
322317 cdef int _shutdown(self ) except - 1 :
@@ -561,7 +556,6 @@ cdef class ThinPoolImpl(BaseThinPoolImpl):
561556 request.conn_impl.set_call_timeout(self ._ping_timeout)
562557 request.conn_impl.ping()
563558 request.conn_impl.set_call_timeout(0 )
564- request.completed = True
565559 except exceptions.Error:
566560 request.conn_impl._force_close()
567561 request.conn_impl = None
@@ -571,12 +565,8 @@ cdef class ThinPoolImpl(BaseThinPoolImpl):
571565 request.conn_impl._force_close()
572566 request.conn_impl = conn_impl
573567 request.conn_impl._is_pool_extra = request.is_extra
574- request.completed = True
575568 except Exception as e:
576569 request.exception = e
577- finally :
578- request.in_progress = False
579- request.bg_processing = False
580570
581571 cdef int _return_connection(self , BaseThinConnImpl conn_impl) except - 1 :
582572 """
@@ -615,10 +605,15 @@ cdef class ThinPoolImpl(BaseThinPoolImpl):
615605 with self ._condition:
616606 try :
617607 self ._condition.wait_for(request.fulfill, self ._wait_timeout)
608+ except :
609+ if not request.bg_processing:
610+ request.reject()
611+ raise
618612 finally :
619613 request.waiting = False
620614 if not request.completed:
621615 errors._raise_err(errors.ERR_POOL_NO_CONNECTION_AVAILABLE)
616+ self ._busy_conn_impls.append(request.conn_impl)
622617 return request.conn_impl
623618
624619 def close (self , bint force ):
@@ -661,11 +656,12 @@ cdef class AsyncThinPoolImpl(BaseThinPoolImpl):
661656 async with self ._condition:
662657 try :
663658 await self ._condition.wait_for(request.fulfill)
659+ except :
660+ if not request.bg_processing:
661+ request.reject()
662+ raise
664663 finally :
665664 request.waiting = False
666- if not request.completed:
667- errors._raise_err(errors.ERR_POOL_NO_CONNECTION_AVAILABLE)
668- return request.conn_impl
669665
670666 async def _bg_task_func(self ):
671667 """
@@ -757,7 +753,6 @@ cdef class AsyncThinPoolImpl(BaseThinPoolImpl):
757753 request.conn_impl.set_call_timeout(self ._ping_timeout)
758754 await request.conn_impl.ping()
759755 request.conn_impl.set_call_timeout(0 )
760- request.completed = True
761756 except exceptions.Error:
762757 request.conn_impl._force_close()
763758 request.conn_impl = None
@@ -767,12 +762,8 @@ cdef class AsyncThinPoolImpl(BaseThinPoolImpl):
767762 request.conn_impl._force_close()
768763 request.conn_impl = conn_impl
769764 request.conn_impl._is_pool_extra = request.is_extra
770- request.completed = True
771765 except Exception as e:
772766 request.exception = e
773- finally :
774- request.in_progress = False
775- request.bg_processing = False
776767
777768 async def _return_connection(self , BaseThinConnImpl conn_impl):
778769 """
@@ -810,10 +801,13 @@ cdef class AsyncThinPoolImpl(BaseThinPoolImpl):
810801 # condition variables do not have that capability directly
811802 request = self ._create_request(params)
812803 try :
813- return await asyncio.wait_for(self ._acquire_helper(request),
814- self ._wait_timeout)
804+ await asyncio.wait_for(
805+ self ._acquire_helper(request), self ._wait_timeout
806+ )
815807 except asyncio.TimeoutError:
816808 errors._raise_err(errors.ERR_POOL_NO_CONNECTION_AVAILABLE)
809+ self ._busy_conn_impls.append(request.conn_impl)
810+ return request.conn_impl
817811
818812 async def close(self , bint force):
819813 """
@@ -884,7 +878,6 @@ cdef class PooledConnRequest:
884878 if self .requires_ping:
885879 self .pool_impl._add_request(self )
886880 else :
887- self .pool_impl._busy_conn_impls.append(conn_impl)
888881 self .completed = True
889882
890883 def fulfill (self ):
@@ -970,6 +963,26 @@ cdef class PooledConnRequest:
970963 pool._add_request(self )
971964 return False
972965
966+ cdef int reject(self ) except - 1 :
967+ """
968+ Called when a request has been rejected for any reason (such as when a
969+ wait timeout has been exceeded). Any connection that is associated with
970+ the request is returned to the pool or destroyed, depending on the
971+ request.
972+ """
973+ cdef:
974+ BaseThinPoolImpl pool_impl = self .pool_impl
975+ BaseThinConnImpl conn_impl = self .conn_impl
976+ if conn_impl is not None :
977+ self .conn_impl = None
978+ if conn_impl._is_pool_extra:
979+ conn_impl._is_pool_extra = False
980+ pool_impl._conn_impls_to_drop.append(conn_impl)
981+ elif conn_impl.invoke_session_callback:
982+ pool_impl._free_new_conn_impls.append(conn_impl)
983+ else :
984+ pool_impl._free_used_conn_impls.append(conn_impl)
985+
973986
974987# keep track of which pools need to be closed and ensure that they are closed
975988# gracefully when the main thread finishes its work
0 commit comments