Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/ai/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func NewEvaluator(name string, opts *EvaluatorOptions, fn EvaluatorFunc) Evaluat
Type: "evaluator",
Subtype: "evaluator",
}
_, err := tracing.RunInNewSpan(ctx, spanMetadata, datapoint,
_, err := tracing.RunInNewSpan(ctx, spanMetadata, datapoint, nil,
func(ctx context.Context, input *Example) (*EvaluatorCallbackResponse, error) {
traceId := trace.SpanContextFromContext(ctx).TraceID().String()
spanId := trace.SpanContextFromContext(ctx).SpanID().String()
Expand Down
2 changes: 1 addition & 1 deletion go/ai/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func GenerateWithRequest(ctx context.Context, r api.Registry, opts *GenerateActi
Subtype: "util",
}

return tracing.RunInNewSpan(ctx, spanMetadata, req, func(ctx context.Context, req *ModelRequest) (*ModelResponse, error) {
return tracing.RunInNewSpan(ctx, spanMetadata, req, nil, func(ctx context.Context, req *ModelRequest) (*ModelResponse, error) {
var wrappedCb ModelStreamCallback
currentRole := RoleModel
currentIndex := messageIndex
Expand Down
12 changes: 6 additions & 6 deletions go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ func (a *ActionDef[In, Out, Stream]) Name() string { return a.desc.Name }

// Run executes the Action's function in a new trace span.
func (a *ActionDef[In, Out, Stream]) Run(ctx context.Context, input In, cb StreamCallback[Stream]) (output Out, err error) {
r, err := a.runWithTelemetry(ctx, input, cb)
r, err := a.runWithTelemetry(ctx, input, cb, nil)
if err != nil {
return base.Zero[Out](), err
}
return r.Result, nil
}

// Run executes the Action's function in a new trace span.
func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input In, cb StreamCallback[Stream]) (output api.ActionRunResult[Out], err error) {
func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input In, cb StreamCallback[Stream], telemetryCb func(traceID, spanID string)) (output api.ActionRunResult[Out], err error) {
inputBytes, _ := json.Marshal(input)
logger.FromContext(ctx).Debug("Action.Run",
"name", a.Name(),
Expand Down Expand Up @@ -215,7 +215,7 @@ func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input

var traceID string
var spanID string
o, err := tracing.RunInNewSpan(ctx, spanMetadata, input,
o, err := tracing.RunInNewSpan(ctx, spanMetadata, input, telemetryCb,
func(ctx context.Context, input In) (Out, error) {
traceInfo := tracing.SpanTraceInfo(ctx)
traceID = traceInfo.TraceID
Expand Down Expand Up @@ -253,15 +253,15 @@ func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input

// RunJSON runs the action with a JSON input, and returns a JSON result.
func (a *ActionDef[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (json.RawMessage, error) {
r, err := a.RunJSONWithTelemetry(ctx, input, cb)
r, err := a.RunJSONWithTelemetry(ctx, input, cb, nil)
if err != nil {
return nil, err
}
return r.Result, nil
}

// RunJSON runs the action with a JSON input, and returns a JSON result along with telemetry info.
func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error) {
func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage], telemetryCb api.TelemetryCallback) (*api.ActionRunResult[json.RawMessage], error) {
i, err := base.UnmarshalAndNormalize[In](input, a.desc.InputSchema)
if err != nil {
return nil, NewError(INVALID_ARGUMENT, err.Error())
Expand All @@ -278,7 +278,7 @@ func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, i
}
}

r, err := a.runWithTelemetry(ctx, i, scb)
r, err := a.runWithTelemetry(ctx, i, scb, telemetryCb)
if err != nil {
return &api.ActionRunResult[json.RawMessage]{
TraceId: r.TraceId,
Expand Down
7 changes: 6 additions & 1 deletion go/core/api/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type ActionRunResult[T any] struct {
SpanId string
}

// TelemetryCallback is called when telemetry information becomes available.
// It receives the trace ID and span ID as soon as the span is created.
type TelemetryCallback func(traceID, spanID string)

// Action is the interface that all Genkit primitives (e.g. flows, models, tools) have in common.
type Action interface {
Registerable
Expand All @@ -35,7 +39,8 @@ type Action interface {
// RunJSON runs the action with the given JSON input and streaming callback and returns the output as JSON.
RunJSON(ctx context.Context, input json.RawMessage, cb func(context.Context, json.RawMessage) error) (json.RawMessage, error)
// RunJSONWithTelemetry runs the action with the given JSON input and streaming callback and returns the output as JSON along with telemetry info.
RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb func(context.Context, json.RawMessage) error) (*ActionRunResult[json.RawMessage], error)
// The telemetryCb callback, if provided, is called as soon as the trace span is created with the trace ID and span ID.
RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb func(context.Context, json.RawMessage) error, telemetryCb TelemetryCallback) (*ActionRunResult[json.RawMessage], error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this is only used for reflection API? Can we pull that from the context instead so that this isn't a breaking change?

// Desc returns a descriptor of the action.
Desc() ActionDesc
}
Expand Down
6 changes: 3 additions & 3 deletions go/core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Run[Out any](ctx context.Context, name string, fn func() (Out, error)) (Out
Type: "flowStep",
Subtype: "flowStep",
}
return tracing.RunInNewSpan(ctx, spanMetadata, nil, func(ctx context.Context, _ any) (Out, error) {
return tracing.RunInNewSpan(ctx, spanMetadata, nil, nil, func(ctx context.Context, _ any) (Out, error) {
o, err := fn()
if err != nil {
return base.Zero[Out](), err
Expand All @@ -113,8 +113,8 @@ func (f *Flow[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessa
}

// RunJSON runs the flow with JSON input and streaming callback and returns the output as JSON.
func (f *Flow[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error) {
return (*ActionDef[In, Out, Stream])(f).RunJSONWithTelemetry(ctx, input, cb)
func (f *Flow[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage], telemetryCb api.TelemetryCallback) (*api.ActionRunResult[json.RawMessage], error) {
return (*ActionDef[In, Out, Stream])(f).RunJSONWithTelemetry(ctx, input, cb, telemetryCb)
}

// Desc returns the descriptor of the flow.
Expand Down
3 changes: 3 additions & 0 deletions go/core/schemas.config
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ SpanStatus omit
TimeEvent omit
TimeEventAnnotation omit
TraceData omit
SpanStartEvent omit
SpanEndEvent omit
TraceEvent omit

GenerationCommonConfig.maxOutputTokens type int
GenerationCommonConfig.topK type int
Expand Down
8 changes: 8 additions & 0 deletions go/core/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ type SpanMetadata struct {

// RunInNewSpan runs f on input in a new span with the provided metadata.
// The metadata contains all span configuration including name, type, labels, etc.
// If telemetryCb is provided, it will be called with the trace ID and span ID as soon as the span is created.
func RunInNewSpan[I, O any](
ctx context.Context,
metadata *SpanMetadata,
input I,
telemetryCb func(traceID, spanID string),
f func(context.Context, I) (O, error),
) (O, error) {
// TODO: support span links.
Expand Down Expand Up @@ -239,6 +241,12 @@ func RunInNewSpan[I, O any](
TraceID: span.SpanContext().TraceID().String(),
SpanID: span.SpanContext().SpanID().String(),
}

// Fire telemetry callback immediately if provided
if telemetryCb != nil {
telemetryCb(sm.TraceInfo.TraceID, sm.TraceInfo.SpanID)
}

defer span.End()
defer func() { span.SetAttributes(sm.attributes()...) }()
ctx = spanMetaKey.NewContext(ctx, sm)
Expand Down
18 changes: 9 additions & 9 deletions go/core/tracing/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestRunInNewSpanWithMetadata(t *testing.T) {
ctx := context.Background()
input := "test input"

output, err := RunInNewSpan(ctx, tc.metadata, input,
output, err := RunInNewSpan(ctx, tc.metadata, input, nil,
func(ctx context.Context, input string) (string, error) {
// Verify that span metadata is available in context
sm := spanMetaKey.FromContext(ctx)
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestRunInNewSpanWithTypeConvenience(t *testing.T) {
Subtype: "tool",
}

output, err := RunInNewSpan(ctx, metadata, "input",
output, err := RunInNewSpan(ctx, metadata, "input", nil,
func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
Expand Down Expand Up @@ -390,10 +390,10 @@ func TestNestedSpanPaths(t *testing.T) {
ctx := context.Background()

// Test nested spans to verify path building
_, err := RunInNewSpan(ctx, &SpanMetadata{Name: "chatFlow", IsRoot: true, Type: "action", Subtype: "flow"}, "input",
_, err := RunInNewSpan(ctx, &SpanMetadata{Name: "chatFlow", IsRoot: true, Type: "action", Subtype: "flow"}, "input", nil,
func(ctx context.Context, input string) (string, error) {
// Nested action span
return RunInNewSpan(ctx, &SpanMetadata{Name: "myTool", IsRoot: false, Type: "action", Subtype: "tool"}, input,
return RunInNewSpan(ctx, &SpanMetadata{Name: "myTool", IsRoot: false, Type: "action", Subtype: "tool"}, input, nil,
func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestIsFailureSourceOnError(t *testing.T) {
_, err := RunInNewSpan(ctx, &SpanMetadata{
Name: "failing-action",
Type: "action",
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
return "", testErr
})

Expand All @@ -446,7 +446,7 @@ func TestRootSpanAutoDetection(t *testing.T) {
Type: "action",
Subtype: "flow",
IsRoot: false, // Even when explicitly set to false, should be overridden
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand All @@ -469,7 +469,7 @@ func TestRootSpanAutoDetection(t *testing.T) {
Name: "explicitRootFlow",
Type: "action",
IsRoot: true, // Explicitly set to true
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand All @@ -492,13 +492,13 @@ func TestRootSpanAutoDetection(t *testing.T) {
Name: "parentFlow",
Type: "action",
IsRoot: true,
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
// This is a nested span - should NOT be root
_, err := RunInNewSpan(ctx, &SpanMetadata{
Name: "childAction",
Type: "action",
IsRoot: false,
}, input, func(ctx context.Context, input string) (string, error) {
}, input, nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand Down
Loading
Loading