
Commit cb33db9

Authored by sgoel-nr, bonczj, mergify[bot], TimPansino, and umaannamalai
LangChain: Fix message timestamps, add default role assignment, and Bedrock support (#1580)
* Record the request message with the time the request started for LangChain.
* Track the original timestamp of the request for input messages recorded as LlmChatCompletionMessage event types.
* First pass at preserving the LlmChatCompletionMessage timestamp for the request with Bedrock methods.
* The `kwargs` were being mapped directly to the OpenAI client, and having a timestamp in there caused a problem; as a quick fix, only add the request timestamp after the wrapped function has been invoked.
* Moved the request timestamp to its own variable instead of carrying it in kwargs.
* OpenAI async request messages were not being assigned the correct timestamp.
* Improved how the request timestamp is passed through for Bedrock, and fixed a call that was passing too many parameters.
* Set a default role on input/output messages within LangChain.
* Fix request_timestamp for the LlmChatCompletionSummary table.
* Bedrock Converse Streaming Support (#1565):
  * Add more formatting to custom event validators
  * Add streamed responses to the converse mock server
  * Add streaming fixtures for converse tests
  * Rename other Bedrock test files
  * Add tests for converse streaming
  * Instrument converse streaming
  * Move GeneratorProxy-adjacent functions to a mixin
  * Fix checking of supported models
  * Reorganize converse error tests
  * Port new converse botocore tests to aiobotocore
  * Instrument response streaming in aiobotocore converse
  * Fix suggestions from code review
  * Port in converse changes from the strands PR
  * Delete commented code
* request_timestamp is now passed across the different methods.
* Fixed a Gemini model kwargs issue.
* Update tests to validate the presence of timestamp/role and fix bugs in instrumentation.
* Update aiobotocore instrumentation to receive the request timestamp.
* [MegaLinter] Apply linters fixes.

Co-authored-by: Josh Bonczkowski <jbonczkowski@newrelic.com>
Co-authored-by: sgoel-nr <236423107+sgoel-nr@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Timothy Pansino <11214426+TimPansino@users.noreply.github.com>
Co-authored-by: Uma Annamalai <uannamalai@newrelic.com>
1 parent 4f5ef0d commit cb33db9

19 files changed (+405 / -49 lines)
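
Only three of the 19 changed files are excerpted below; the LangChain hook changes behind the "default role assignment" part of the title are not shown here. The idea, per the commit message, is that input/output messages arriving without a role are given one before the LlmChatCompletionMessage event is recorded. A purely hypothetical illustration of that fallback (not the agent's actual LangChain hook; the chosen defaults are assumptions):

    def assign_default_role(message, is_response=False):
        # Hypothetical helper: if a message dict arrives without a role, fall back
        # to a sensible default so the recorded event always carries one.
        role = message.get("role") or ("assistant" if is_response else "user")
        return {**message, "role": role}

    print(assign_default_role({"content": "Hi"}))                       # defaults to "user"
    print(assign_default_role({"content": "Hello"}, is_response=True))  # defaults to "assistant"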

newrelic/core/custom_event.py

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ def create_custom_event(event_type, params, settings=None, is_ml_event=False):
         )
         return None
 
-    intrinsics = {"type": name, "timestamp": int(1000.0 * time.time())}
+    intrinsics = {"type": name, "timestamp": params.get("timestamp") or int(1000.0 * time.time())}
 
     event = [intrinsics, attributes]
     return event
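
The effect of this one-line change: a `timestamp` supplied in `params` by upstream instrumentation (epoch milliseconds) now takes precedence over the wall-clock time at which the event is recorded. A minimal sketch of the fallback logic, using a hypothetical `params` dict for illustration:

    import time

    def resolve_event_timestamp(params):
        # Prefer a timestamp already captured by instrumentation (epoch millis);
        # otherwise fall back to the current time, matching the previous behavior.
        return params.get("timestamp") or int(1000.0 * time.time())

    assert resolve_event_timestamp({"timestamp": 1700000000000}) == 1700000000000
    assert resolve_event_timestamp({}) > 0  # no timestamp captured: falls back to "now"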

newrelic/hooks/external_aiobotocore.py

Lines changed: 5 additions & 1 deletion
@@ -98,6 +98,7 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
     response_extractor = getattr(instance, "_nr_response_extractor", None)
     stream_extractor = getattr(instance, "_nr_stream_extractor", None)
     response_streaming = getattr(instance, "_nr_response_streaming", False)
+    request_timestamp = getattr(instance, "_nr_request_timestamp", None)
     is_converse = getattr(instance, "_nr_is_converse", False)
     ft = getattr(instance, "_nr_ft", None)
 
@@ -125,6 +126,7 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
             transaction,
             bedrock_args,
             is_converse,
+            request_timestamp,
         )
         raise
 
@@ -187,7 +189,9 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
             if ft:
                 ft.__exit__(None, None, None)
                 bedrock_attrs["duration"] = ft.duration * 1000
-            run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)
+            run_bedrock_response_extractor(
+                response_extractor, response_body, bedrock_attrs, is_embedding, transaction, request_timestamp
+            )
 
         except Exception:
             _logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
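
The hand-off here follows the pattern used throughout the Bedrock instrumentation: per-call state is stashed on the client instance under `_nr_`-prefixed attributes, and the async `_make_api_call` wrapper reads it back with `getattr` defaults so missing state degrades gracefully. A rough, self-contained sketch of that idea (simplified names, not the agent's actual hooks):

    import time

    class FakeBedrockClient:
        """Stand-in for a botocore/aiobotocore client instance."""

    def outer_wrapper(client):
        # Capture the request start time (epoch millis) before the call is made.
        client._nr_request_timestamp = int(1000.0 * time.time())

    def make_api_call_wrapper(client):
        # Read the stashed state back with a safe default of None.
        return getattr(client, "_nr_request_timestamp", None)

    client = FakeBedrockClient()
    outer_wrapper(client)
    print(make_api_call_wrapper(client))               # millisecond timestamp captured above
    print(make_api_call_wrapper(FakeBedrockClient()))  # None when nothing was stashed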

newrelic/hooks/external_botocore.py

Lines changed: 79 additions & 22 deletions
@@ -17,6 +17,7 @@
 import logging
 import re
 import sys
+import time
 import uuid
 from io import BytesIO
 
@@ -193,6 +194,7 @@ def create_chat_completion_message_event(
     request_id,
     llm_metadata_dict,
     response_id=None,
+    request_timestamp=None,
 ):
     if not transaction:
         return
@@ -227,6 +229,8 @@ def create_chat_completion_message_event(
 
         if settings.ai_monitoring.record_content.enabled:
             chat_completion_message_dict["content"] = content
+        if request_timestamp:
+            chat_completion_message_dict["timestamp"] = request_timestamp
 
         chat_completion_message_dict.update(llm_metadata_dict)
 
@@ -266,6 +270,8 @@ def create_chat_completion_message_event(
 
         if settings.ai_monitoring.record_content.enabled:
             chat_completion_message_dict["content"] = content
+        if request_timestamp:
+            chat_completion_message_dict["timestamp"] = request_timestamp
 
         chat_completion_message_dict.update(llm_metadata_dict)
 
@@ -542,10 +548,22 @@ def extract_bedrock_cohere_model_streaming_response(response_body, bedrock_attrs
 
 
 def handle_bedrock_exception(
-    exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction, kwargs, is_converse
+    exc,
+    is_embedding,
+    model,
+    span_id,
+    trace_id,
+    request_extractor,
+    request_body,
+    ft,
+    transaction,
+    kwargs,
+    is_converse,
+    request_timestamp=None,
 ):
     try:
         bedrock_attrs = {"model": model, "span_id": span_id, "trace_id": trace_id}
+
         if is_converse:
             try:
                 input_message_list = [
@@ -589,12 +607,14 @@ def handle_bedrock_exception(
         if is_embedding:
             handle_embedding_event(transaction, error_attributes)
         else:
-            handle_chat_completion_event(transaction, error_attributes)
+            handle_chat_completion_event(transaction, error_attributes, request_timestamp)
     except Exception:
         _logger.warning(EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE, exc_info=True)
 
 
-def run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction):
+def run_bedrock_response_extractor(
+    response_extractor, response_body, bedrock_attrs, is_embedding, transaction, request_timestamp=None
+):
     # Run response extractor for non-streaming responses
     try:
         response_extractor(response_body, bedrock_attrs)
@@ -604,7 +624,7 @@ def run_bedrock_response_extractor(response_extractor, response_body, bedrock_at
     if is_embedding:
         handle_embedding_event(transaction, bedrock_attrs)
     else:
-        handle_chat_completion_event(transaction, bedrock_attrs)
+        handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp)
 
 
 def run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs):
@@ -628,6 +648,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
     if not settings.ai_monitoring.enabled:
         return wrapped(*args, **kwargs)
 
+    request_timestamp = int(1000.0 * time.time())
+
     transaction.add_ml_model_info("Bedrock", BOTOCORE_VERSION)
     transaction._add_agent_attribute("llm", True)
 
@@ -683,6 +705,7 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
     instance._nr_ft = ft
     instance._nr_response_streaming = response_streaming
     instance._nr_settings = settings
+    instance._nr_request_timestamp = request_timestamp
 
     # Add a bedrock flag to instance so we can determine when make_api_call instrumentation is hit from non-Bedrock paths and bypass it if so
     instance._nr_is_bedrock = True
@@ -703,6 +726,7 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
             transaction,
             kwargs,
             is_converse=False,
+            request_timestamp=request_timestamp,
         )
         raise
 
@@ -733,6 +757,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
     run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)
 
     try:
+        bedrock_attrs.pop("timestamp", None)  # The request timestamp is only needed for request extraction
+
         if response_streaming:
             # Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
             # This class is used in numerous other services in botocore, and would cause conflicts.
@@ -748,7 +774,14 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
             bedrock_attrs["duration"] = ft.duration * 1000
         response["body"] = StreamingBody(BytesIO(response_body), len(response_body))
 
-        run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)
+        run_bedrock_response_extractor(
+            response_extractor,
+            response_body,
+            bedrock_attrs,
+            is_embedding,
+            transaction,
+            request_timestamp=request_timestamp,
+        )
 
     except Exception:
         _logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
@@ -770,6 +803,8 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
     if not settings.ai_monitoring.enabled:
         return wrapped(*args, **kwargs)
 
+    request_timestamp = int(1000.0 * time.time())
+
     transaction.add_ml_model_info("Bedrock", BOTOCORE_VERSION)
     transaction._add_agent_attribute("llm", True)
 
@@ -800,6 +835,7 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
     instance._nr_ft = ft
     instance._nr_response_streaming = response_streaming
     instance._nr_settings = settings
+    instance._nr_request_timestamp = request_timestamp
     instance._nr_is_converse = True
 
     # Add a bedrock flag to instance so we can determine when make_api_call instrumentation is hit from non-Bedrock paths and bypass it if so
@@ -810,7 +846,18 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
         response = wrapped(*args, **kwargs)
     except Exception as exc:
         handle_bedrock_exception(
-            exc, False, model, span_id, trace_id, request_extractor, {}, ft, transaction, kwargs, is_converse=True
+            exc,
+            False,
+            model,
+            span_id,
+            trace_id,
+            request_extractor,
+            {},
+            ft,
+            transaction,
+            kwargs,
+            is_converse=True,
+            request_timestamp=request_timestamp,
         )
         raise
 
@@ -824,6 +871,7 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
 
     response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
     bedrock_attrs = extract_bedrock_converse_attrs(kwargs, response, response_headers, model, span_id, trace_id)
+    bedrock_attrs["timestamp"] = request_timestamp
 
     try:
         if response_streaming:
@@ -838,7 +886,9 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
 
         ft.__exit__(None, None, None)
         bedrock_attrs["duration"] = ft.duration * 1000
-        run_bedrock_response_extractor(response_extractor, {}, bedrock_attrs, False, transaction)
+        run_bedrock_response_extractor(
+            response_extractor, {}, bedrock_attrs, False, transaction, request_timestamp=request_timestamp
+        )
 
     except Exception:
         _logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
@@ -888,7 +938,7 @@ def extract_bedrock_converse_attrs(kwargs, response, response_headers, model, sp
 
 
 class BedrockRecordEventMixin:
-    def record_events_on_stop_iteration(self, transaction):
+    def record_events_on_stop_iteration(self, transaction, request_timestamp=None):
         if hasattr(self, "_nr_ft"):
             bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
             self._nr_ft.__exit__(None, None, None)
@@ -899,14 +949,14 @@ def record_events_on_stop_iteration(self, transaction):
 
             try:
                 bedrock_attrs["duration"] = self._nr_ft.duration * 1000
-                handle_chat_completion_event(transaction, bedrock_attrs)
+                handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp)
             except Exception:
                 _logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
 
             # Clear cached data as this can be very large.
             self._nr_bedrock_attrs.clear()
 
-    def record_error(self, transaction, exc):
+    def record_error(self, transaction, exc, request_timestamp=None):
         if hasattr(self, "_nr_ft"):
             try:
                 ft = self._nr_ft
@@ -929,32 +979,32 @@ def record_error(self, transaction, exc):
                 ft.__exit__(*sys.exc_info())
                 error_attributes["duration"] = ft.duration * 1000
 
-                handle_chat_completion_event(transaction, error_attributes)
+                handle_chat_completion_event(transaction, error_attributes, request_timestamp)
 
                 # Clear cached data as this can be very large.
                 error_attributes.clear()
             except Exception:
                 _logger.warning(EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE, exc_info=True)
 
-    def record_stream_chunk(self, event, transaction):
+    def record_stream_chunk(self, event, transaction, request_timestamp=None):
         if event:
             try:
                 if getattr(self, "_nr_is_converse", False):
                     return self.converse_record_stream_chunk(event, transaction)
                 else:
-                    return self.invoke_record_stream_chunk(event, transaction)
+                    return self.invoke_record_stream_chunk(event, transaction, request_timestamp)
             except Exception:
                 _logger.warning(RESPONSE_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)
 
-    def invoke_record_stream_chunk(self, event, transaction):
+    def invoke_record_stream_chunk(self, event, transaction, request_timestamp=None):
         bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
         chunk = json.loads(event["chunk"]["bytes"].decode("utf-8"))
         self._nr_model_extractor(chunk, bedrock_attrs)
         # In Langchain, the bedrock iterator exits early if type is "content_block_stop".
         # So we need to call the record events here since stop iteration will not be raised.
         _type = chunk.get("type")
         if _type == "content_block_stop":
-            self.record_events_on_stop_iteration(transaction)
+            self.record_events_on_stop_iteration(transaction, request_timestamp)
 
     def converse_record_stream_chunk(self, event, transaction):
         bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
@@ -984,6 +1034,7 @@ def __iter__(self):
 class GeneratorProxy(BedrockRecordEventMixin, ObjectProxy):
     def __init__(self, wrapped):
         super().__init__(wrapped)
+        self._nr_request_timestamp = int(1000.0 * time.time())
 
     def __iter__(self):
         return self
@@ -996,12 +1047,12 @@ def __next__(self):
         return_val = None
         try:
             return_val = self.__wrapped__.__next__()
-            self.record_stream_chunk(return_val, transaction)
+            self.record_stream_chunk(return_val, transaction, self._nr_request_timestamp)
         except StopIteration:
-            self.record_events_on_stop_iteration(transaction)
+            self.record_events_on_stop_iteration(transaction, self._nr_request_timestamp)
             raise
         except Exception as exc:
-            self.record_error(transaction, exc)
+            self.record_error(transaction, exc, self._nr_request_timestamp)
             raise
         return return_val
 
@@ -1020,6 +1071,10 @@ def __aiter__(self):
 
 
 class AsyncGeneratorProxy(BedrockRecordEventMixin, ObjectProxy):
+    def __init__(self, wrapped):
+        super().__init__(wrapped)
+        self._nr_request_timestamp = int(1000.0 * time.time())
+
     def __aiter__(self):
         return self
 
@@ -1030,12 +1085,12 @@ async def __anext__(self):
         return_val = None
         try:
             return_val = await self.__wrapped__.__anext__()
-            self.record_stream_chunk(return_val, transaction)
+            self.record_stream_chunk(return_val, transaction, self._nr_request_timestamp)
        except StopAsyncIteration:
-            self.record_events_on_stop_iteration(transaction)
+            self.record_events_on_stop_iteration(transaction, self._nr_request_timestamp)
             raise
         except Exception as exc:
-            self.record_error(transaction, exc)
+            self.record_error(transaction, exc, self._nr_request_timestamp)
             raise
         return return_val
 
@@ -1084,7 +1139,7 @@ def handle_embedding_event(transaction, bedrock_attrs):
     transaction.record_custom_event("LlmEmbedding", embedding_dict)
 
 
-def handle_chat_completion_event(transaction, bedrock_attrs):
+def handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp=None):
     chat_completion_id = str(uuid.uuid4())
     # Grab LLM-related custom attributes off of the transaction to store as metadata on LLM events
     custom_attrs_dict = transaction._custom_params
@@ -1128,6 +1183,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
         "response.number_of_messages": number_of_messages,
         "response.choices.finish_reason": bedrock_attrs.get("response.choices.finish_reason", None),
         "error": bedrock_attrs.get("error", None),
+        "timestamp": request_timestamp or None,
     }
     chat_completion_summary_dict.update(llm_metadata_dict)
     chat_completion_summary_dict = {k: v for k, v in chat_completion_summary_dict.items() if v is not None}
@@ -1144,6 +1200,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
         request_id=request_id,
         llm_metadata_dict=llm_metadata_dict,
        response_id=response_id,
+        request_timestamp=request_timestamp,
     )
 
 
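Taken together, these changes thread a single request_timestamp from the moment the Bedrock call starts down to the recorded events, so LlmChatCompletionSummary and LlmChatCompletionMessage rows carry the time the request began rather than the time the response finished, which matters most for streamed responses. A simplified, self-contained model of that flow, with hypothetical names rather than the agent's API:

    import time

    def start_request():
        # Captured once, before the wrapped Bedrock call is invoked.
        return int(1000.0 * time.time())

    def build_summary_event(bedrock_attrs, request_timestamp=None):
        event = {
            "type": "LlmChatCompletionSummary",
            "error": bedrock_attrs.get("error"),
            # Only present when instrumentation captured a request start time.
            "timestamp": request_timestamp or None,
        }
        # Drop unset keys, mirroring the dict comprehension in the hook.
        return {k: v for k, v in event.items() if v is not None}

    request_timestamp = start_request()
    # ... wrapped Bedrock call and (possibly streamed) response handling happen here ...
    print(build_summary_event({"error": None}, request_timestamp))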
