1616from contextlib import contextmanager
1717
1818from opentelemetry import trace as otel_api_trace
19+ from opentelemetry .trace .propagation .tracecontext import TraceContextTextMapPropagator
20+ from opentelemetry import metrics as otel_api_metric
21+ from opentelemetry .baggage .propagation import W3CBaggagePropagator
22+ from opentelemetry .propagators .composite import CompositePropagator
23+ from opentelemetry .propagate import set_global_textmap
1924
2025from newrelic .api .application import application_instance , register_application
26+ from newrelic .api .web_transaction import WebTransaction
2127from newrelic .api .background_task import BackgroundTask
22- from newrelic .api .datastore_trace import DatastoreTrace
23- from newrelic .api .external_trace import ExternalTrace
28+ from newrelic .api .message_transaction import MessageTransaction
2429from newrelic .api .function_trace import FunctionTrace
30+ from newrelic .api .datastore_trace import DatastoreTrace
2531from newrelic .api .message_trace import MessageTrace
26- from newrelic .api .message_transaction import MessageTransaction
32+ from newrelic .api .external_trace import ExternalTrace
2733from newrelic .api .time_trace import current_trace , notice_error
28- from newrelic .api .transaction import Sentinel , current_transaction
29- from newrelic .api .web_transaction import WebTransaction
34+ from newrelic .api .transaction import (
35+ current_transaction ,
36+ Sentinel ,
37+ )
38+ from newrelic .common .encoding_utils import (
39+ W3CTraceState ,
40+ NrTraceState ,
41+ )
3042from newrelic .core .otlp_utils import create_resource
3143
3244# Attributes that help distinguish span types are
5769 "Flask" : "wsgi" ,
5870 "Requests" : "external" ,
5971}
72+
73+ class NRTraceContextPropagator (TraceContextTextMapPropagator ):
74+ LIST_OF_TRACEPARENT_KEYS = ("traceparent" , "HTTP_TRACEPARENT" )
75+ LIST_OF_TRACESTATE_KEYS = ("tracestate" , "HTTP_TRACESTATE" )
76+
77+ def _convert_nr_to_otel (self , tracestate ):
78+ application_settings = application_instance (activate = False ).settings
79+ vendors = W3CTraceState .decode (tracestate )
80+ trusted_account_key = application_settings .trusted_account_key or (
81+ application_settings .serverless_mode .enabled and application_settings .account_id
82+ )
83+ payload = vendors .pop (f"{ trusted_account_key } @nr" , "" )
84+
85+ otel_tracestate = W3CTraceState (NrTraceState .decode (payload , trusted_account_key )).text ()
86+ return otel_tracestate
87+
88+ def _convert_otel_to_nr (self , tracestate ):
89+ tracestate_dict = W3CTraceState .decode (tracestate )
90+ # Convert sampled, priority, and timestamp data types
91+ tracestate_dict ["sa" ] = True if tracestate_dict .get ("sa" ).upper () == "TRUE" else False
92+ tracestate_dict ["pr" ] = float (tracestate_dict .get ("pr" ))
93+ tracestate_dict ["ti" ] = int (tracestate_dict .get ("ti" ))
94+
95+ nr_tracestate = NrTraceState (tracestate_dict ).text ()
96+ return nr_tracestate
97+
98+ def extract (self , carrier , context = None , getter = None ):
99+ # We need to make sure that the carrier goes out
100+ # in OTel format. However, we want to convert this to
101+ # NR to use the `accept_distributed_trace_headers` API
102+ transaction = current_transaction ()
103+ tracestate_key = None
104+ tracestate_headers = None
105+ for key in self .LIST_OF_TRACESTATE_KEYS :
106+ if key in carrier :
107+ tracestate_key = key
108+ tracestate_headers = carrier [tracestate_key ]
109+ break
110+ # If we are passing into New Relic, traceparent and/or tracestate's keys also need to be NR compatible.
111+ if tracestate_headers :
112+ # Check to see if in NR or OTel format
113+ if "@nr=" in tracestate_headers :
114+ # NR format
115+ # Reformatting DT keys in case they are in the HTTP_* format:
116+ nr_headers = carrier .copy ()
117+ for header_type in ("traceparent" , "tracestate" , "newrelic" ):
118+ if (header_type not in nr_headers ) and (f"HTTP_{ header_type .upper ()} " in nr_headers ):
119+ nr_headers [header_type ] = nr_headers .pop (f"HTTP_{ header_type .upper ()} " )
120+ transaction .accept_distributed_trace_headers (nr_headers )
121+ # Convert NR format to OTel format for OTel extract function
122+ tracestate = self ._convert_nr_to_otel (tracestate_headers )
123+ carrier [tracestate_key ] = tracestate
124+ else :
125+ # OTel format
126+ if transaction :
127+ # Convert to NR format to use the
128+ # `accept_distributed_trace_headers` API
129+ nr_tracestate = self ._convert_otel_to_nr (tracestate_headers )
130+ nr_headers = {key : value for key , value in carrier .items ()}
131+ nr_headers .pop ("HTTP_TRACESTATE" , None )
132+ nr_headers ["tracestate" ] = nr_tracestate
133+ for header_type in ("traceparent" , "newrelic" ):
134+ if header_type not in nr_headers :
135+ nr_headers [header_type ] = nr_headers .pop (f"HTTP_{ header_type .upper ()} " , None )
136+ transaction .accept_distributed_trace_headers (nr_headers )
137+ elif ("traceparent" in carrier ) and transaction :
138+ transaction .accept_distributed_trace_headers (carrier )
139+
140+ return super ().extract (carrier = carrier , context = context , getter = getter )
141+
142+
143+ def inject (self , carrier , context = None , setter = None ):
144+ transaction = current_transaction ()
145+ # Only insert headers if we have not done so already this transaction
146+ # Distributed Trace State will have the following states:
147+ # 0 if not set
148+ # 1 if already accepted
149+ # 2 if inserted but not accepted
150+
151+ if transaction and not transaction ._distributed_trace_state :
152+ try :
153+ nr_headers = [(key , value ) for key , value in carrier .items ()]
154+ transaction .insert_distributed_trace_headers (nr_headers )
155+ # Convert back, now with new headers
156+ carrier .update (dict (nr_headers ))
157+ carrier ["tracestate" ] = self ._convert_nr_to_otel (carrier ["tracestate" ])
158+
159+ except AttributeError :
160+ # Already in list form.
161+ transaction .insert_distributed_trace_headers (carrier )
162+
163+ # If it came in list form, we likely want to keep it in that format.
164+ # Convert to dict to modify NR format of tracestate to Otel's format
165+ # and then convert back to the list of tuples.
166+ otel_headers = dict (carrier )
167+ otel_headers ["tracestate" ] = self ._convert_nr_to_otel (otel_headers ["tracestate" ])
168+
169+ # This is done instead of assigning the result of a list
170+ # comprehension to preserve the ID of the carrier in
171+ # order to allow propagation.
172+ for header in otel_headers .items ():
173+ if header not in carrier :
174+ carrier .append (header )
175+
176+ elif not transaction :
177+ # Convert carrier's tracestate to Otel format if not already
178+ # This assumes that carrier is a dict but tracestate is in NR format.
179+ if ("tracestate" in carrier ) and ("@nr=" in carrier ["tracestate" ]):
180+ # Needs to be converted to OTel before running original function
181+ carrier ["tracestate" ] = self ._convert_nr_to_otel (carrier ["tracestate" ])
182+ return super ().inject (carrier = carrier , context = context , setter = setter )
183+
184+
185+ # Context and Context Propagator Setup
186+ otel_context_propagator = CompositePropagator (
187+ propagators = [
188+ NRTraceContextPropagator (),
189+ W3CBaggagePropagator (),
190+ ]
191+ )
192+ set_global_textmap (otel_context_propagator )
193+
60194# ----------------------------------------------
61195# Custom OTel Spans and Traces
62196# ----------------------------------------------
@@ -186,6 +320,16 @@ def get_span_context(self):
186320 if not (hasattr (self , "nr_trace" ) and self .nr_trace ):
187321 return otel_api_trace .INVALID_SPAN_CONTEXT
188322
323+ if self .nr_transaction .settings .distributed_tracing .enabled :
324+ nr_tracestate_headers = (
325+ self .nr_transaction ._create_distributed_trace_data ()
326+ )
327+
328+ nr_tracestate_headers ["sa" ] = self ._is_sampled ()
329+ otel_tracestate_headers = [
330+ (key , str (value )) for key , value in nr_tracestate_headers .items ()
331+ ]
332+ else :
189333 otel_tracestate_headers = None
190334
191335 return otel_api_trace .SpanContext (
@@ -234,7 +378,9 @@ def set_status(self, status, description=None):
234378 # TODO: not implemented yet
235379 pass
236380
237- def record_exception (self , exception , attributes = None , timestamp = None , escaped = False ):
381+ def record_exception (
382+ self , exception , attributes = None , timestamp = None , escaped = False
383+ ):
238384 if not hasattr (self , "nr_trace" ):
239385 if exception :
240386 notice_error ((type (exception ), exception , exception .__traceback__ ))
@@ -322,7 +468,17 @@ def start_span(
322468 transaction = current_transaction ()
323469 self .attributes = attributes or {}
324470
471+ # If parent_span_context exists, we can create traceparent
472+ # and tracestate headers
473+ _headers = {}
474+ if parent_span_context and application_instance (activate = False ).settings .distributed_tracing .enabled :
475+ parent_span_trace_id = parent_span_context .trace_id
476+ parent_span_span_id = parent_span_context .span_id
477+ parent_span_trace_flags = parent_span_context .trace_flags
478+
479+
325480 # If remote_parent, transaction must be created, regardless of kind type
481+ # Make sure we transfer DT headers when we are here, if DT is enabled
326482 if parent_span_context and parent_span_context .is_remote :
327483 if kind in (otel_api_trace .SpanKind .SERVER , otel_api_trace .SpanKind .CLIENT ):
328484 # This is a web request
@@ -332,6 +488,11 @@ def start_span(
332488 port = self .attributes .get ("net.host.port" )
333489 request_method = self .attributes .get ("http.method" )
334490 request_path = self .attributes .get ("http.route" )
491+
492+ if not headers :
493+ headers = _headers
494+ update_sampled_flag = True
495+
335496 transaction = WebTransaction (
336497 self .nr_application ,
337498 name = name ,
@@ -342,7 +503,13 @@ def start_span(
342503 request_path = request_path ,
343504 headers = headers ,
344505 )
345- elif kind in (otel_api_trace .SpanKind .PRODUCER , otel_api_trace .SpanKind .INTERNAL ):
506+
507+ if update_sampled_flag and parent_span_context :
508+ transaction ._sampled = bool (parent_span_trace_flags )
509+ elif kind in (
510+ otel_api_trace .SpanKind .PRODUCER ,
511+ otel_api_trace .SpanKind .INTERNAL ,
512+ ):
346513 transaction = BackgroundTask (self .nr_application , name = name )
347514 elif kind == otel_api_trace .SpanKind .CONSUMER :
348515 # NOTE: NR uses MessageTransaction for Pika, RabbitMQ, Kafka
@@ -356,7 +523,7 @@ def start_span(
356523 destination_name = name ,
357524 application = self .nr_application ,
358525 transport_type = self .instrumentation_library ,
359- headers = headers ,
526+ headers = _headers ,
360527 )
361528 else :
362529 transaction = BackgroundTask (self .nr_application , name = name , group = "Celery" )
@@ -381,6 +548,10 @@ def start_span(
381548 request_method = self .attributes .get ("http.method" )
382549 request_path = self .attributes .get ("http.route" )
383550
551+ if not headers :
552+ headers = _headers
553+ update_GUID_flag = True
554+
384555 transaction = WebTransaction (
385556 self .nr_application ,
386557 name = name ,
@@ -391,6 +562,11 @@ def start_span(
391562 request_path = request_path ,
392563 headers = headers ,
393564 )
565+
566+ if update_GUID_flag and parent_span_context :
567+ guid = parent_span_trace_id >> 64
568+ transaction .guid = f"{ guid :x} "
569+
394570 transaction .__enter__ ()
395571 elif kind == otel_api_trace .SpanKind .INTERNAL :
396572 if transaction :
@@ -423,7 +599,7 @@ def start_span(
423599 destination_name = name ,
424600 application = self .nr_application ,
425601 transport_type = self .instrumentation_library ,
426- headers = headers ,
602+ headers = _headers ,
427603 )
428604 else :
429605 transaction = BackgroundTask (self .nr_application , name = name , group = "Celery" )
@@ -489,3 +665,4 @@ def get_tracer(
489665 ** kwargs ,
490666 ):
491667 return Tracer (resource = self ._resource , instrumentation_library = instrumenting_module_name )
668+
0 commit comments