
Commit 8a961a2

Mantisus and Pijukatel authored
fix: Handle the case when error_handler returns Request (#1595)
### Description

This PR fixes the behavior of the crawler when a user-defined `error_handler` returns a `Request`. The old behavior resulted in the queue never reaching the empty state.

### Testing

- Added a new test.

Co-authored-by: Josef Procházka <jos.prochazka@post.cz>
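For context, a minimal sketch of the flow this commit fixes, modeled on the regression test added below. It assumes crawlee's public exports (`Request` from `crawlee`; `BasicCrawler` and `BasicCrawlingContext` from `crawlee.crawlers`); the `|retry` unique-key suffix and the `main()` wrapper are illustrative, not part of the commit:

import asyncio

from crawlee import Request
from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        # Let the replacement request (tagged via its unique key) succeed.
        if '|retry' in context.request.unique_key:
            return
        raise ValueError('Simulated handler failure')

    @crawler.error_handler
    async def error_handler(context: BasicCrawlingContext, error: Exception) -> Request | None:
        # Returning a Request different from the failed one now enqueues it
        # and marks the original as handled, so the queue can drain.
        return Request.from_url(
            context.request.url,
            unique_key=f'{context.request.unique_key}|retry',
        )

    # Before this fix, a run like this never reached the empty-queue state.
    await crawler.run(['https://a.placeholder.com'])


if __name__ == '__main__':
    asyncio.run(main())

With the fix, the returned request becomes a new queue entry and the failed one is marked as handled, so `crawler.run` terminates once both are processed.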
1 parent 2be9b9b commit 8a961a2

File tree

2 files changed: +58 −45 lines changed

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 21 additions & 44 deletions
@@ -1135,19 +1135,14 @@ async def _handle_request_retries(
                 except Exception as e:
                     raise UserDefinedErrorHandlerError('Exception thrown in user-defined request error handler') from e
                 else:
-                    if new_request is not None:
-                        request = new_request
+                    if new_request is not None and new_request != request:
+                        await request_manager.add_request(new_request)
+                        await self._mark_request_as_handled(request)
+                        return
 
             await request_manager.reclaim_request(request)
         else:
-            await wait_for(
-                lambda: request_manager.mark_request_as_handled(context.request),
-                timeout=self._internal_timeout,
-                timeout_message='Marking request as handled timed out after '
-                f'{self._internal_timeout.total_seconds()} seconds',
-                logger=self._logger,
-                max_retries=3,
-            )
+            await self._mark_request_as_handled(request)
             await self._handle_failed_request(context, error)
             self._statistics.record_request_processing_failure(request.unique_key)
 
@@ -1196,16 +1191,7 @@ async def _handle_skipped_request(
         self, request: Request | str, reason: SkippedReason, *, need_mark: bool = False
     ) -> None:
         if need_mark and isinstance(request, Request):
-            request_manager = await self.get_request_manager()
-
-            await wait_for(
-                lambda: request_manager.mark_request_as_handled(request),
-                timeout=self._internal_timeout,
-                timeout_message='Marking request as handled timed out after '
-                f'{self._internal_timeout.total_seconds()} seconds',
-                logger=self._logger,
-                max_retries=3,
-            )
+            await self._mark_request_as_handled(request)
             request.state = RequestState.SKIPPED
 
         url = request.url if isinstance(request, Request) else request
@@ -1417,14 +1403,8 @@ async def __run_task_function(self) -> None:
                 raise RequestHandlerError(e, context) from e
 
             await self._commit_request_handler_result(context)
-            await wait_for(
-                lambda: request_manager.mark_request_as_handled(context.request),
-                timeout=self._internal_timeout,
-                timeout_message='Marking request as handled timed out after '
-                f'{self._internal_timeout.total_seconds()} seconds',
-                logger=self._logger,
-                max_retries=3,
-            )
+
+            await self._mark_request_as_handled(request)
 
             request.state = RequestState.DONE
 
@@ -1467,29 +1447,15 @@ async def __run_task_function(self) -> None:
                 await request_manager.reclaim_request(request)
                 await self._statistics.error_tracker_retry.add(error=session_error, context=context)
             else:
-                await wait_for(
-                    lambda: request_manager.mark_request_as_handled(context.request),
-                    timeout=self._internal_timeout,
-                    timeout_message='Marking request as handled timed out after '
-                    f'{self._internal_timeout.total_seconds()} seconds',
-                    logger=self._logger,
-                    max_retries=3,
-                )
+                await self._mark_request_as_handled(request)
 
                 await self._handle_failed_request(context, session_error)
                 self._statistics.record_request_processing_failure(request.unique_key)
 
         except ContextPipelineInterruptedError as interrupted_error:
             self._logger.debug('The context pipeline was interrupted', exc_info=interrupted_error)
 
-            await wait_for(
-                lambda: request_manager.mark_request_as_handled(context.request),
-                timeout=self._internal_timeout,
-                timeout_message='Marking request as handled timed out after '
-                f'{self._internal_timeout.total_seconds()} seconds',
-                logger=self._logger,
-                max_retries=3,
-            )
+            await self._mark_request_as_handled(request)
 
         except ContextPipelineInitializationError as initialization_error:
             self._logger.debug(
@@ -1663,3 +1629,14 @@ async def _crawler_state_task(self) -> None:
             )
 
             self._previous_crawler_state = current_state
+
+    async def _mark_request_as_handled(self, request: Request) -> None:
+        request_manager = await self.get_request_manager()
+        await wait_for(
+            lambda: request_manager.mark_request_as_handled(request),
+            timeout=self._internal_timeout,
+            timeout_message='Marking request as handled timed out after '
+            f'{self._internal_timeout.total_seconds()} seconds',
+            logger=self._logger,
+            max_retries=3,
+        )
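The five call sites above previously duplicated this `wait_for(...)` invocation; the commit centralizes it in the new `_mark_request_as_handled` helper. For orientation, a hedged sketch of the retry-with-timeout semantics such a utility provides; `wait_for_sketch` is a hypothetical stand-in written against the keyword arguments visible in the diff, not crawlee's actual `wait_for`:

import asyncio
import logging
from collections.abc import Awaitable, Callable
from datetime import timedelta


async def wait_for_sketch(
    operation: Callable[[], Awaitable[None]],
    *,
    timeout: timedelta,
    timeout_message: str,
    logger: logging.Logger,
    max_retries: int = 3,
) -> None:
    # Each attempt gets a fresh coroutine and its own timeout budget.
    for attempt in range(1, max_retries + 1):
        try:
            await asyncio.wait_for(operation(), timeout=timeout.total_seconds())
        except asyncio.TimeoutError:
            if attempt == max_retries:
                raise TimeoutError(timeout_message) from None
            logger.warning('%s (attempt %d/%d), retrying', timeout_message, attempt, max_retries)
        else:
            return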

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 37 additions & 1 deletion
@@ -18,7 +18,7 @@
 import pytest
 
 from crawlee import ConcurrencySettings, Glob, service_locator
-from crawlee._request import Request
+from crawlee._request import Request, RequestState
 from crawlee._types import BasicCrawlingContext, EnqueueLinksKwargs, HttpMethod
 from crawlee._utils.robots import RobotsTxtFile
 from crawlee.configuration import Configuration
@@ -1768,3 +1768,39 @@ async def handler(_: BasicCrawlingContext) -> None:
 
     # Wait for crawler to finish
     await crawler_task
+
+
+async def test_new_request_error_handler() -> None:
+    """Test that error in new_request_handler is handled properly."""
+    queue = await RequestQueue.open()
+    crawler = BasicCrawler(
+        request_manager=queue,
+    )
+
+    request = Request.from_url('https://a.placeholder.com')
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        if '|test' in context.request.unique_key:
+            return
+        raise ValueError('This error should not be handled by error handler')
+
+    @crawler.error_handler
+    async def error_handler(context: BasicCrawlingContext, error: Exception) -> Request | None:
+        return Request.from_url(
+            context.request.url,
+            unique_key=f'{context.request.unique_key}|test',
+        )
+
+    await crawler.run([request])
+
+    original_request = await queue.get_request(request.unique_key)
+    error_request = await queue.get_request(f'{request.unique_key}|test')
+
+    assert original_request is not None
+    assert original_request.state == RequestState.ERROR_HANDLER
+    assert original_request.was_already_handled
+
+    assert error_request is not None
+    assert error_request.state == RequestState.REQUEST_HANDLER
+    assert error_request.was_already_handled
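To run just the new test, a pytest invocation along these lines should work from the repository root (`-k` selects the test by name):

pytest tests/unit/crawlers/_basic/test_basic_crawler.py -k test_new_request_error_handler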
