Skip to content

Commit f59f52c

Browse files
lrafeeimergify[bot]TimPansino
authored
Asyncio loop_factory fix (#1576)
* Runner instrumentation in asyncio * Clean up asyncio instrumentation * Add asyncio tests for loop_factory * Modify uvicorn test for loop_factory * Fix linter errors * [MegaLinter] Apply linters fixes * Apply suggestions from code review --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Tim Pansino <timpansino@gmail.com>
1 parent 32215b9 commit f59f52c

File tree

5 files changed

+176
-24
lines changed

5 files changed

+176
-24
lines changed

newrelic/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2084,6 +2084,10 @@ def _process_module_builtin_defaults():
20842084
"asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events"
20852085
)
20862086

2087+
_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")
2088+
2089+
_process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners")
2090+
20872091
_process_module_definition(
20882092
"langchain_core.runnables.base",
20892093
"newrelic.hooks.mlmodel_langchain",
@@ -2671,8 +2675,6 @@ def _process_module_builtin_defaults():
26712675
"langchain_core.callbacks.manager", "newrelic.hooks.mlmodel_langchain", "instrument_langchain_callbacks_manager"
26722676
)
26732677

2674-
_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")
2675-
26762678
_process_module_definition("asgiref.sync", "newrelic.hooks.adapter_asgiref", "instrument_asgiref_sync")
26772679

26782680
_process_module_definition(

newrelic/hooks/coroutines_asyncio.py

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,73 @@
1616
from newrelic.core.trace_cache import trace_cache
1717

1818

19-
def remove_from_cache(task):
19+
def remove_from_cache_callback(task):
2020
cache = trace_cache()
2121
cache.task_stop(task)
2222

2323

24-
def propagate_task_context(task):
24+
def wrap_create_task(task):
2525
trace_cache().task_start(task)
26-
task.add_done_callback(remove_from_cache)
26+
task.add_done_callback(remove_from_cache_callback)
2727
return task
2828

2929

30-
def _bind_loop(loop, *args, **kwargs):
30+
def _instrument_event_loop(loop):
31+
if loop and hasattr(loop, "create_task") and not hasattr(loop.create_task, "__wrapped__"):
32+
wrap_out_function(loop, "create_task", wrap_create_task)
33+
34+
35+
def _bind_set_event_loop(loop, *args, **kwargs):
3136
return loop
3237

3338

34-
def wrap_create_task(wrapped, instance, args, kwargs):
35-
loop = _bind_loop(*args, **kwargs)
39+
def wrap_set_event_loop(wrapped, instance, args, kwargs):
40+
loop = _bind_set_event_loop(*args, **kwargs)
3641

37-
if loop and not hasattr(loop.create_task, "__wrapped__"):
38-
wrap_out_function(loop, "create_task", propagate_task_context)
42+
_instrument_event_loop(loop)
3943

4044
return wrapped(*args, **kwargs)
4145

4246

47+
def wrap__lazy_init(wrapped, instance, args, kwargs):
48+
result = wrapped(*args, **kwargs)
49+
# This logic can be used for uvloop, but should
50+
# work for any valid custom loop factory.
51+
52+
# A custom loop_factory will be used to create
53+
# a new event loop instance. It will then run
54+
# the main() coroutine on this event loop. Once
55+
# this coroutine is complete, the event loop will
56+
# be stopped and closed.
57+
58+
# The new loop that is created and set as the
59+
# running loop of the duration of the run() call.
60+
# When the coroutine starts, it runs in the context
61+
# that was active when run() was called. Any tasks
62+
# created within this coroutine on this new event
63+
# loop will inherit that context.
64+
65+
# Note: The loop created by loop_factory is never
66+
# set as the global current loop for the thread,
67+
# even while it is running.
68+
loop = instance._loop
69+
_instrument_event_loop(loop)
70+
71+
return result
72+
73+
4374
def instrument_asyncio_base_events(module):
44-
wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context)
75+
wrap_out_function(module, "BaseEventLoop.create_task", wrap_create_task)
4576

4677

4778
def instrument_asyncio_events(module):
4879
if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14
49-
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
50-
else: # Python <= 3.13
51-
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
80+
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)
81+
elif hasattr(module, "BaseDefaultEventLoopPolicy"): # Python <= 3.13
82+
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)
83+
84+
85+
# For Python >= 3.11
86+
def instrument_asyncio_runners(module):
87+
if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"):
88+
wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init)

tests/adapter_uvicorn/test_uvicorn.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ def app(request):
5656
return request.param
5757

5858

59-
@pytest.fixture
60-
def port(app):
59+
@pytest.fixture(params=["asyncio", "uvloop", "none"], ids=["asyncio", "uvloop", "none"])
60+
def port(app, request):
6161
port = get_open_port()
6262

6363
loops = []
@@ -72,7 +72,7 @@ def on_tick_sync():
7272
async def on_tick():
7373
on_tick_sync()
7474

75-
config = Config(app, host="127.0.0.1", port=port, loop="asyncio")
75+
config = Config(app, host="127.0.0.1", port=port, loop=request.param)
7676
config.callback_notify = on_tick
7777
config.log_config = {"version": 1}
7878
config.disable_lifespan = True

tests/coroutines_asyncio/test_context_propagation.py

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,31 @@
3636
import uvloop
3737

3838
loop_policies = (pytest.param(None, id="asyncio"), pytest.param(uvloop.EventLoopPolicy(), id="uvloop"))
39+
uvloop_factory = (pytest.param(uvloop.new_event_loop, id="uvloop"), pytest.param(None, id="None"))
3940
except ImportError:
4041
loop_policies = (pytest.param(None, id="asyncio"),)
42+
uvloop_factory = (pytest.param(None, id="None"),)
43+
44+
45+
def loop_factories():
46+
import asyncio
47+
48+
if sys.platform == "win32":
49+
return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"), *uvloop_factory)
50+
else:
51+
return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"), *uvloop_factory)
4152

4253

4354
@pytest.fixture(autouse=True)
4455
def reset_event_loop():
45-
from asyncio import set_event_loop, set_event_loop_policy
56+
try:
57+
from asyncio import set_event_loop, set_event_loop_policy
58+
59+
# Remove the loop policy to avoid side effects
60+
set_event_loop_policy(None)
61+
except ImportError:
62+
from asyncio import set_event_loop
4663

47-
# Remove the loop policy to avoid side effects
48-
set_event_loop_policy(None)
4964
set_event_loop(None)
5065

5166

@@ -102,6 +117,7 @@ async def _test(asyncio, schedule, nr_enabled=True):
102117
return trace
103118

104119

120+
@pytest.mark.skipif(sys.version_info >= (3, 16), reason="loop_policy is not available")
105121
@pytest.mark.parametrize("loop_policy", loop_policies)
106122
@pytest.mark.parametrize("schedule", ("create_task", "ensure_future"))
107123
@validate_transaction_metrics(
@@ -166,10 +182,12 @@ def handle_exception(loop, context):
166182
memcache_trace("cmd"),
167183
],
168184
)
169-
def test_two_transactions(event_loop, trace):
185+
def test_two_transactions_with_global_event_loop(event_loop, trace):
170186
"""
171187
Instantiate a coroutine in one transaction and await it in
172188
another. This should not cause any errors.
189+
This uses the global event loop policy, which has been deprecated
190+
since Python 3.11 and is scheduled for removal in Python 3.16.
173191
"""
174192
import asyncio
175193

@@ -211,6 +229,99 @@ async def await_task():
211229
event_loop.run_until_complete(asyncio.gather(afut, bfut))
212230

213231

232+
@pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.Runner is not available")
233+
@validate_transaction_metrics("await_task", background_task=True)
234+
@validate_transaction_metrics("create_coro", background_task=True, index=-2)
235+
@pytest.mark.parametrize("loop_factory", loop_factories())
236+
@pytest.mark.parametrize(
237+
"trace",
238+
[
239+
function_trace(name="simple_gen"),
240+
external_trace(library="lib", url="http://foo.com"),
241+
database_trace("select * from foo"),
242+
datastore_trace("lib", "foo", "bar"),
243+
message_trace("lib", "op", "typ", "name"),
244+
memcache_trace("cmd"),
245+
],
246+
)
247+
def test_two_transactions_with_loop_factory(trace, loop_factory):
248+
"""
249+
Instantiate a coroutine in one transaction and await it in
250+
another. This should not cause any errors.
251+
Starting in Python 3.11, the asyncio.Runner class was added
252+
as well as the loop_factory parameter. The loop_factory
253+
parameter provides a replacement for loop policies (which
254+
are scheduled for removal in Python 3.16).
255+
"""
256+
import asyncio
257+
258+
@trace
259+
async def task():
260+
pass
261+
262+
@background_task(name="create_coro")
263+
async def create_coro():
264+
return asyncio.create_task(task())
265+
266+
@background_task(name="await_task")
267+
async def await_task(task_to_await):
268+
return await task_to_await
269+
270+
async def _main():
271+
_task = await create_coro()
272+
return await await_task(_task)
273+
274+
with asyncio.Runner(loop_factory=loop_factory) as runner:
275+
runner.run(_main())
276+
277+
278+
@pytest.mark.skipif(sys.version_info < (3, 11), reason="loop_factory/asyncio.Runner is not available")
279+
@pytest.mark.parametrize("loop_factory", loop_factories())
280+
@validate_transaction_metrics(
281+
"test_context_propagation:test_context_propagation_with_loop_factory",
282+
background_task=True,
283+
scoped_metrics=(("Function/waiter2", 2), ("Function/waiter3", 2)),
284+
)
285+
@background_task()
286+
def test_context_propagation_with_loop_factory(loop_factory):
287+
import asyncio
288+
289+
exceptions = []
290+
291+
def handle_exception(loop, context):
292+
exceptions.append(context)
293+
294+
# Call default handler for standard logging
295+
loop.default_exception_handler(context)
296+
297+
async def subtask():
298+
with FunctionTrace(name="waiter2", terminal=True):
299+
pass
300+
301+
await child()
302+
303+
async def _task(trace):
304+
assert current_trace() == trace
305+
306+
await subtask()
307+
308+
trace = current_trace()
309+
310+
with asyncio.Runner(loop_factory=loop_factory) as runner:
311+
assert trace == current_trace()
312+
runner._loop.set_exception_handler(handle_exception)
313+
runner.run(_task(trace))
314+
runner.run(_task(trace))
315+
316+
# The agent should have removed all traces from the cache since
317+
# run_until_complete has terminated (all callbacks scheduled inside the
318+
# task have run)
319+
assert len(trace_cache()) == 1 # Sentinel is all that remains
320+
321+
# # Assert that no exceptions have occurred
322+
assert not exceptions, exceptions
323+
324+
214325
# Sentinel left in cache transaction exited
215326
async def sentinel_in_cache_txn_exited(asyncio, bg):
216327
event = asyncio.Event()

tox.ini

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ envlist =
116116
python-adapter_hypercorn-{py310,py311,py312,py313,py314}-hypercornlatest,
117117
python-adapter_hypercorn-{py38,py39}-hypercorn{0010,0011,0012,0013},
118118
python-adapter_mcp-{py310,py311,py312,py313,py314},
119-
python-adapter_uvicorn-{py38,py39,py310,py311,py312,py313,py314}-uvicornlatest,
120-
python-adapter_uvicorn-py38-uvicorn014,
119+
python-adapter_uvicorn-{py39,py310,py311,py312,py313,py314}-uvicornlatest,
120+
python-adapter_uvicorn-py38-uvicorn020,
121121
python-adapter_waitress-{py38,py39,py310,py311,py312,py313,py314}-waitresslatest,
122122
python-application_celery-{py38,py39,py310,py311,py312,py313,py314,pypy311}-celerylatest,
123123
python-application_celery-py311-celery{0504,0503,0502},
@@ -239,9 +239,11 @@ deps =
239239
adapter_hypercorn-hypercorn0010: hypercorn[h3]<0.11
240240
adapter_hypercorn: niquests
241241
adapter_mcp: fastmcp
242-
adapter_uvicorn-uvicorn014: uvicorn<0.15
242+
adapter_uvicorn-uvicorn020: uvicorn<0.21
243+
adapter_uvicorn-uvicorn020: uvloop<0.20
243244
adapter_uvicorn-uvicornlatest: uvicorn
244245
adapter_uvicorn: typing-extensions
246+
adapter_uvicorn: uvloop
245247
adapter_waitress: WSGIProxy2
246248
adapter_waitress-waitresslatest: waitress
247249
agent_features: beautifulsoup4

0 commit comments

Comments
 (0)