Skip to content

Commit 02246f5

Browse files
committed
Pool deadlocked until timeout?
1 parent 353e821 commit 02246f5

File tree

3 files changed

+14
-10
lines changed

3 files changed

+14
-10
lines changed

src/crawlee/_autoscaling/autoscaled_pool.py

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -237,20 +237,21 @@ async def _worker_task_orchestrator(self, run: _AutoscaledPoolRun) -> None:
237237

238238
current_status = self._system_status.get_current_system_info()
239239
if not current_status.is_system_idle:
240-
logger.debug('Not scheduling new tasks - system is overloaded')
240+
logger.info('Not scheduling new tasks - system is overloaded')
241241
elif self._is_paused:
242-
logger.debug('Not scheduling new tasks - the autoscaled pool is paused')
242+
logger.info('Not scheduling new tasks - the autoscaled pool is paused')
243243
elif self.current_concurrency >= self.desired_concurrency:
244-
logger.debug('Not scheduling new tasks - already running at desired concurrency')
244+
logger.info('Not scheduling new tasks - already running at desired concurrency')
245245
elif not await self._is_task_ready_function():
246246
logger.debug('Not scheduling new task - no task is ready')
247247
else:
248-
logger.debug('Scheduling a new task')
248+
logger.info('Scheduling a new task')
249249
worker_task = asyncio.create_task(self._worker_task(), name='autoscaled pool worker task')
250250
worker_task.add_done_callback(lambda task: self._reap_worker_task(task, run))
251251
run.worker_tasks.append(worker_task)
252252

253253
if math.isfinite(self._max_tasks_per_minute):
254+
logger.info('Deadlock sleep????')
254255
await asyncio.sleep(60 / self._max_tasks_per_minute)
255256

256257
continue

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1369,7 +1369,10 @@ async def __is_task_ready_function(self) -> bool:
13691369
return False
13701370

13711371
request_manager = await self.get_request_manager()
1372-
return not await request_manager.is_empty()
1372+
is_ready = not await request_manager.is_empty()
1373+
if is_ready:
1374+
self.log.info('There is a request to process')
1375+
return is_ready
13731376

13741377
async def __run_task_function(self) -> None:
13751378
request_manager = await self.get_request_manager()

tests/unit/events/test_local_event_manager.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -17,15 +17,15 @@ def listener() -> AsyncMock:
1717
async def async_listener(payload: Any) -> None:
1818
pass
1919

20-
al = AsyncMock()
21-
update_wrapper(al, async_listener)
22-
return al
20+
return AsyncMock(target=async_listener)
2321

2422

2523
async def test_emit_system_info_event(listener: AsyncMock) -> None:
26-
async with LocalEventManager(system_info_interval=timedelta(milliseconds=50)) as event_manager:
24+
system_info_interval = timedelta(milliseconds=50)
25+
test_tolerance_coefficient = 10
26+
async with LocalEventManager(system_info_interval=system_info_interval) as event_manager:
2727
event_manager.on(event=Event.SYSTEM_INFO, listener=listener)
28-
await asyncio.sleep(0.2)
28+
await asyncio.sleep(system_info_interval.total_seconds() * test_tolerance_coefficient)
2929

3030
assert listener.call_count >= 1
3131
assert isinstance(listener.call_args[0][0], EventSystemInfoData)

0 commit comments

Comments
 (0)