diff --git a/CHANGES.md b/CHANGES.md
index 10524a60..08257502 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 * Add recover to goroutine to prevent unexpected panics.
 * Add mutex to fix some data race.
 * Replace external `goapi` dependency with in-repo generated protocols.
+* Support pprof profiling.
 
 #### Plugins
 
 #### Documentation
diff --git a/agent/core/compile.go b/agent/core/compile.go
index c511ac02..baefdb10 100644
--- a/agent/core/compile.go
+++ b/agent/core/compile.go
@@ -19,17 +19,21 @@ package core
 
 import (
 	//go:nolint
+	_ "bytes"
 	_ "encoding/base64"
 	_ "fmt"
+	_ "io"
 	_ "log"
 	_ "math"
 	_ "math/rand"
 	_ "net"
 	_ "os"
+	_ "path/filepath"
 	_ "reflect"
 	_ "runtime"
 	_ "runtime/debug"
 	_ "runtime/metrics"
+	_ "runtime/pprof"
 	_ "sort"
 	_ "strconv"
 	_ "strings"
@@ -55,4 +59,5 @@ import (
 	_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+	_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
 )
diff --git a/agent/reporter/imports.go b/agent/reporter/imports.go
index e1c6fd69..90c7e8a3 100644
--- a/agent/reporter/imports.go
+++ b/agent/reporter/imports.go
@@ -71,5 +71,6 @@ import (
 	_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
 	_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
+	_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
 	_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
 )
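The compile.go and imports.go changes above rely on Go's blank-import side effect. A minimal sketch of that pattern (hypothetical file, not part of this change):

```go
// Illustrative only: a blank import compiles and links the package (and runs
// its init functions) even though no identifier from it is referenced here.
package main

import (
	_ "runtime/pprof" // linked so code injected at build time can start/stop profiles
)

func main() {}
```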
diff --git a/plugins/core/pprof.go b/plugins/core/pprof.go
new file mode 100644
index 00000000..8c836898
--- /dev/null
+++ b/plugins/core/pprof.go
@@ -0,0 +1,246 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	"github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+const (
+	// Pprof event types
+	PprofEventsTypeCPU       = "cpu"
+	PprofEventsTypeHeap      = "heap"
+	PprofEventsTypeAllocs    = "allocs"
+	PprofEventsTypeBlock     = "block"
+	PprofEventsTypeMutex     = "mutex"
+	PprofEventsTypeThread    = "threadcreate"
+	PprofEventsTypeGoroutine = "goroutine"
+)
+
+// CPU profiling state to ensure only one CPU profiling task runs at a time
+var profilingIsRunning atomic.Bool
+
+func init() {
+	reporter.NewPprofTaskCommand = NewPprofTaskCommand
+}
+
+type PprofTaskCommandImpl struct {
+	// Pprof Task ID
+	taskID string
+	// Type of profiling (CPU/Heap/Block/Mutex/Goroutine/Threadcreate/Allocs)
+	events string
+	// Sampling duration (converted from minutes), required for CPU, Block and Mutex events
+	duration time.Duration
+	// Unix timestamp in milliseconds when the task was created
+	createTime int64
+	// Defines the period of the pprof dump, required for Block and Mutex events
+	dumpPeriod int
+
+	// for pprof task service
+	pprofFilePath string
+	logger        operator.LogOperator
+	manager       reporter.PprofReporter
+}
+
+func NewPprofTaskCommand(taskID, events string, duration time.Duration,
+	createTime int64, dumpPeriod int, pprofFilePath string,
+	logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
+	return &PprofTaskCommandImpl{
+		taskID:        taskID,
+		events:        events,
+		duration:      duration,
+		createTime:    createTime,
+		dumpPeriod:    dumpPeriod,
+		pprofFilePath: pprofFilePath,
+		logger:        logger,
+		manager:       manager,
+	}
+}
+
+func (c *PprofTaskCommandImpl) GetTaskID() string {
+	return c.taskID
+}
+
+func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
+	return c.createTime
+}
+
+func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
+	return c.duration
+}
+
+func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
+	return c.dumpPeriod
+}
+
+func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
+	return !(c.events == PprofEventsTypeHeap ||
+		c.events == PprofEventsTypeAllocs ||
+		c.events == PprofEventsTypeGoroutine ||
+		c.events == PprofEventsTypeThread ||
+		c.events == PprofEventsTypeCPU ||
+		c.events == PprofEventsTypeBlock ||
+		c.events == PprofEventsTypeMutex)
+}
+
+func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
+	return c.events == PprofEventsTypeHeap ||
+		c.events == PprofEventsTypeAllocs ||
+		c.events == PprofEventsTypeGoroutine ||
+		c.events == PprofEventsTypeThread
+}
+
+func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
+	return c.events == PprofEventsTypeBlock ||
+		c.events == PprofEventsTypeMutex
+}
+
+func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
+	if file, ok := writer.(*os.File); ok {
+		if err := file.Close(); err != nil {
+			c.logger.Errorf("failed to close pprof file: %v", err)
+		}
+	}
+}
+
+func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
+	// sample data to buffer
+	if c.pprofFilePath == "" {
+		return &bytes.Buffer{}, nil
+	}
+
+	// sample data to a file named after the task ID
+	pprofFileName := c.taskID + ".pprof"
+	pprofFilePath := filepath.Join(c.pprofFilePath, pprofFileName)
+	if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	writer, err := os.Create(pprofFilePath)
+	if err != nil {
+		return nil, err
+	}
+
+	return writer, nil
+}
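getWriter chooses between an in-memory buffer and a file. A standalone sketch of the buffer path using only the standard library; the profile name matches the Lookup calls in StopTask below:

```go
// A minimal sketch (not part of the change) of the in-memory path taken when
// pprof_file_path is empty: the profile is written to a bytes.Buffer and the
// raw bytes are later handed to the reporter.
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
)

func main() {
	var buf bytes.Buffer
	// "heap" is one of the direct sampling types; it can be dumped immediately.
	if err := pprof.Lookup("heap").WriteTo(&buf, 0); err != nil {
		panic(err)
	}
	fmt.Printf("captured %d bytes of heap profile\n", buf.Len())
}
```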
+func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
+	c.logger.Infof("start pprof task %s", c.taskID)
+	// For CPU profiling, check global state first
+	if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
+		return nil, fmt.Errorf("CPU profiling is already running")
+	}
+
+	writer, err := c.getWriter()
+	if err != nil {
+		if c.events == PprofEventsTypeCPU {
+			profilingIsRunning.Store(false)
+		}
+		return nil, err
+	}
+
+	switch c.events {
+	case PprofEventsTypeCPU:
+		if err = pprof.StartCPUProfile(writer); err != nil {
+			profilingIsRunning.Store(false)
+			if c.pprofFilePath != "" {
+				c.closeFileWriter(writer)
+			}
+			return nil, err
+		}
+	case PprofEventsTypeBlock:
+		runtime.SetBlockProfileRate(c.dumpPeriod)
+	case PprofEventsTypeMutex:
+		runtime.SetMutexProfileFraction(c.dumpPeriod)
+	}
+
+	return writer, nil
+}
+
+func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
+	c.logger.Infof("stop pprof task %s", c.taskID)
+	switch c.events {
+	case PprofEventsTypeCPU:
+		pprof.StopCPUProfile()
+		profilingIsRunning.Store(false)
+	case PprofEventsTypeBlock:
+		if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Block profile error %v", err)
+		}
+		runtime.SetBlockProfileRate(0)
+	case PprofEventsTypeMutex:
+		if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Mutex profile error %v", err)
+		}
+		runtime.SetMutexProfileFraction(0)
+	case PprofEventsTypeHeap:
+		if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Heap profile error %v", err)
+		}
+	case PprofEventsTypeAllocs:
+		if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Allocs profile error %v", err)
+		}
+	case PprofEventsTypeGoroutine:
+		if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Goroutine profile error %v", err)
+		}
+	case PprofEventsTypeThread:
+		if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
+			c.logger.Errorf("write Thread profile error %v", err)
+		}
+	}
+
+	if c.pprofFilePath != "" {
+		c.closeFileWriter(writer)
+	}
+	c.readPprofData(c.taskID, writer)
+}
+
+func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
+	var data []byte
+	if c.pprofFilePath == "" {
+		if buf, ok := writer.(*bytes.Buffer); ok {
+			data = buf.Bytes()
+		}
+	} else {
+		if file, ok := writer.(*os.File); ok {
+			filePath := file.Name()
+			fileData, err := os.ReadFile(filePath)
+			if err != nil {
+				c.logger.Errorf("failed to read pprof file %s: %v", filePath, err)
+			}
+			data = fileData
+			if err := os.Remove(filePath); err != nil {
+				c.logger.Errorf("failed to remove pprof file %s: %v", filePath, err)
+			}
+		}
+	}
+	c.manager.ReportPprof(taskID, data)
+}
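For the duration-bound event types, the manager (pprof_manager.go below) starts the task and schedules StopTask with time.AfterFunc. A self-contained sketch of that lifecycle against runtime/pprof directly, with an assumed 2-second duration:

```go
// Standalone sketch (hypothetical values) of the start → timed stop → collect
// pattern applied to CPU tasks.
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"time"
)

func main() {
	var buf bytes.Buffer
	if err := pprof.StartCPUProfile(&buf); err != nil {
		panic(err) // e.g. another CPU profile is already running
	}
	done := make(chan struct{})
	time.AfterFunc(2*time.Second, func() { // the real task uses the command duration
		pprof.StopCPUProfile()
		close(done)
	})
	<-done
	fmt.Printf("captured %d bytes of CPU profile\n", buf.Len())
}
```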
diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go
index 6734ea7c..9d2ba5d3 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -42,17 +42,19 @@ func NewGRPCReporter(logger operator.LogOperator,
 	checkInterval time.Duration,
 	connManager *reporter.ConnectionManager,
 	cdsManager *reporter.CDSManager,
+	pprofTaskManager *reporter.PprofTaskManager,
 	opts ...ReporterOption,
 ) (reporter.Reporter, error) {
 	r := &gRPCReporter{
-		logger:        logger,
-		serverAddr:    serverAddr,
-		tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
-		metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
-		logSendCh:     make(chan *logv3.LogData, maxSendQueueSize),
-		checkInterval: checkInterval,
-		connManager:   connManager,
-		cdsManager:    cdsManager,
+		logger:           logger,
+		serverAddr:       serverAddr,
+		tracingSendCh:    make(chan *agentv3.SegmentObject, maxSendQueueSize),
+		metricsSendCh:    make(chan []*agentv3.MeterData, maxSendQueueSize),
+		logSendCh:        make(chan *logv3.LogData, maxSendQueueSize),
+		checkInterval:    checkInterval,
+		connManager:      connManager,
+		cdsManager:       cdsManager,
+		pprofTaskManager: pprofTaskManager,
 	}
 	for _, o := range opts {
 		o(r)
@@ -83,10 +85,11 @@ type gRPCReporter struct {
 	checkInterval time.Duration
 
 	// bootFlag is set if Boot be executed
-	bootFlag    bool
-	transform   *reporter.Transform
-	connManager *reporter.ConnectionManager
-	cdsManager  *reporter.CDSManager
+	bootFlag         bool
+	transform        *reporter.Transform
+	connManager      *reporter.ConnectionManager
+	cdsManager       *reporter.CDSManager
+	pprofTaskManager *reporter.PprofTaskManager
 }
 
 func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
@@ -95,6 +98,7 @@ func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.Agen
 	r.initSendPipeline()
 	r.check()
 	r.cdsManager.InitCDS(entity, cdsWatchers)
+	r.pprofTaskManager.InitPprofTask(entity)
 	r.bootFlag = true
 }
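For reference, a hedged sketch of how the pieces fit together. The constructor signatures match this diff (and the generated templates further below), but the surrounding function, backend address, and intervals are assumptions:

```go
// Hedged wiring sketch: build the connection, CDS, and pprof task managers,
// then hand them to the gRPC reporter; Boot later calls InitPprofTask to
// start the polling and upload goroutines.
package example

import (
	"time"

	"github.com/apache/skywalking-go/plugins/core/operator"
	"github.com/apache/skywalking-go/plugins/core/reporter"
	grpcreporter "github.com/apache/skywalking-go/plugins/core/reporter/grpc"
)

func buildReporter(logger operator.LogOperator) (reporter.Reporter, error) {
	backend := "127.0.0.1:11800" // assumed backend address
	checkInterval := 20 * time.Second

	connManager, err := reporter.NewConnectionManager(logger, checkInterval, backend, "", nil)
	if err != nil {
		return nil, err
	}
	cdsManager, err := reporter.NewCDSManager(logger, backend, 20*time.Second, connManager)
	if err != nil {
		return nil, err
	}
	// New in this change: the pprof task manager shares the connection manager.
	pprofTaskManager, err := reporter.NewPprofTaskManager(logger, backend, 20*time.Second, connManager, "")
	if err != nil {
		return nil, err
	}
	return grpcreporter.NewGRPCReporter(logger, backend, checkInterval, connManager, cdsManager, pprofTaskManager)
}
```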
diff --git a/plugins/core/reporter/pprof_manager.go b/plugins/core/reporter/pprof_manager.go
new file mode 100644
index 00000000..e3d5d9d2
--- /dev/null
+++ b/plugins/core/reporter/pprof_manager.go
@@ -0,0 +1,349 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reporter
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strconv"
+	"time"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+	pprofv10 "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
+)
+
+const (
+	// max chunk size for pprof data
+	maxChunkSize = 1 * 1024 * 1024
+	// max send queue size for pprof data
+	maxPprofSendQueueSize = 30000
+	// max duration for pprof task
+	pprofTaskDurationMaxMinute = 15 * time.Minute
+)
+
+type PprofTaskCommand interface {
+	GetTaskID() string
+	GetCreateTime() int64
+	GetDuration() time.Duration
+	GetDumpPeriod() int
+	StartTask() (io.Writer, error)
+	StopTask(io.Writer)
+	IsDirectSamplingType() bool
+	IsInvalidEvent() bool
+	HasDumpPeriod() bool
+}
+
+type PprofReporter interface {
+	ReportPprof(taskID string, content []byte)
+}
+
+var NewPprofTaskCommand func(taskID, events string, duration time.Duration,
+	createTime int64, dumpPeriod int, pprofFilePath string,
+	logger operator.LogOperator, manager PprofReporter) PprofTaskCommand
+
+type PprofTaskManager struct {
+	logger         operator.LogOperator
+	serverAddr     string
+	pprofInterval  time.Duration
+	PprofClient    pprofv10.PprofTaskClient // for grpc
+	connManager    *ConnectionManager
+	entity         *Entity
+	pprofFilePath  string
+	LastUpdateTime int64
+	commands       PprofTaskCommand
+	pprofSendCh    chan *pprofv10.PprofData
+}
+
+func NewPprofTaskManager(logger operator.LogOperator, serverAddr string,
+	pprofInterval time.Duration, connManager *ConnectionManager,
+	pprofFilePath string) (*PprofTaskManager, error) {
+	if pprofInterval <= 0 {
+		logger.Errorf("pprof fetch interval must be positive, pprof profiling is disabled")
+		return nil, fmt.Errorf("pprof fetch interval must be positive, pprof profiling is disabled")
+	}
+	pprofManager := &PprofTaskManager{
+		logger:        logger,
+		serverAddr:    serverAddr,
+		pprofInterval: pprofInterval,
+		connManager:   connManager,
+		pprofFilePath: pprofFilePath,
+		pprofSendCh:   make(chan *pprofv10.PprofData, maxPprofSendQueueSize),
+	}
+	conn, err := connManager.GetConnection(serverAddr)
+	if err != nil {
+		return nil, err
+	}
+	pprofManager.PprofClient = pprofv10.NewPprofTaskClient(conn)
+	pprofManager.commands = nil
+	return pprofManager, nil
+}
+
+func (r *PprofTaskManager) InitPprofTask(entity *Entity) {
+	r.entity = entity
+	r.initPprofSendPipeline()
+	go func() {
+		for {
+			switch r.connManager.GetConnectionStatus(r.serverAddr) {
+			case ConnectionStatusShutdown:
+				return
+			case ConnectionStatusDisconnect:
+				time.Sleep(r.pprofInterval)
+				continue
+			}
+			pprofCommand, err := r.PprofClient.GetPprofTaskCommands(context.Background(), &pprofv10.PprofTaskCommandQuery{
+				Service:         r.entity.ServiceName,
+				ServiceInstance: r.entity.ServiceInstanceName,
+				LastCommandTime: r.LastUpdateTime,
+			})
+			if err != nil {
+				r.logger.Errorf("fetch pprof task commands error %v", err)
+				time.Sleep(r.pprofInterval)
+				continue
+			}
+
+			if len(pprofCommand.GetCommands()) > 0 && pprofCommand.GetCommands()[0].Command == "PprofTaskQuery" {
+				rawCommand := pprofCommand.GetCommands()[0]
+				r.HandleCommand(rawCommand)
+			}
+
+			time.Sleep(r.pprofInterval)
+		}
+	}()
+}
+
+func (r *PprofTaskManager) HandleCommand(rawCommand *commonv3.Command) {
+	command := r.deserializePprofTaskCommand(rawCommand)
+	// ignore commands that are not newer than the last handled one
+	if command.GetCreateTime() > r.LastUpdateTime {
+		r.LastUpdateTime = command.GetCreateTime()
+	} else {
+		return
+	}
+	if err := r.checkCommand(command); err != nil {
+		r.logger.Errorf("check command error, cannot process this pprof task. reason: %v", err)
+		return
+	}
+
+	if command.IsDirectSamplingType() {
+		// direct sampling of Heap, Allocs, Goroutine, Thread
+		writer, err := command.StartTask()
+		if err != nil {
+			r.logger.Errorf("start %s pprof task error %v", command.GetTaskID(), err)
+			return
+		}
+		command.StopTask(writer)
+	} else {
+		// CPU, Block and Mutex sampling lasts for the task duration and then stops
+		writer, err := command.StartTask()
+		if err != nil {
+			r.logger.Errorf("start %s pprof task error %v", command.GetTaskID(), err)
+			return
+		}
+		time.AfterFunc(command.GetDuration(), func() {
+			command.StopTask(writer)
+		})
+	}
+}
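HandleCommand consumes a "PprofTaskQuery" command whose arguments arrive as key-value pairs. An illustrative payload in that shape; all values are made up, and the keys are the ones deserializePprofTaskCommand (below) reads:

```go
// Hypothetical example of a backend command, assuming the KeyStringValuePair
// type from the generated common/v3 protocol package.
package example

import (
	commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
)

func buildExampleCommand() *commonv3.Command {
	return &commonv3.Command{
		Command: "PprofTaskQuery",
		Args: []*commonv3.KeyStringValuePair{
			{Key: "TaskId", Value: "task-42"},
			{Key: "Events", Value: "cpu"},               // one of the PprofEventsType* values
			{Key: "Duration", Value: "5"},               // minutes, capped at 15 by checkCommand
			{Key: "DumpPeriod", Value: "0"},             // only meaningful for block/mutex events
			{Key: "CreateTime", Value: "1700000000000"}, // unix milliseconds
		},
	}
}
```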
+func (r *PprofTaskManager) deserializePprofTaskCommand(command *commonv3.Command) PprofTaskCommand {
+	args := command.Args
+	taskID := ""
+	events := ""
+	duration := 0
+	dumpPeriod := 0
+	var createTime int64
+	for _, pair := range args {
+		switch pair.GetKey() {
+		case "TaskId":
+			taskID = pair.GetValue()
+		case "Events":
+			events = pair.GetValue()
+		case "Duration":
+			if val, err := strconv.Atoi(pair.GetValue()); err == nil && val > 0 {
+				duration = val
+			}
+		case "DumpPeriod":
+			if val, err := strconv.Atoi(pair.GetValue()); err == nil && val >= 0 {
+				dumpPeriod = val
+			}
+		case "CreateTime":
+			createTime, _ = strconv.ParseInt(pair.GetValue(), 10, 64)
+		}
+	}
+
+	return NewPprofTaskCommand(
+		taskID,
+		events,
+		time.Duration(duration)*time.Minute,
+		createTime,
+		dumpPeriod,
+		r.pprofFilePath,
+		r.logger,
+		r,
+	)
+}
+
+func (r *PprofTaskManager) checkCommand(command PprofTaskCommand) error {
+	if command.GetTaskID() == "" {
+		return fmt.Errorf("pprof task id cannot be empty")
+	}
+	if command.IsInvalidEvent() {
+		return fmt.Errorf("pprof task event is invalid, task id is %s", command.GetTaskID())
+	}
+	if !command.IsDirectSamplingType() {
+		if command.GetDuration() <= 0 || command.GetDuration() > pprofTaskDurationMaxMinute {
+			return fmt.Errorf("pprof task duration must be greater than 0 and no more than %v, task id is %s", pprofTaskDurationMaxMinute, command.GetTaskID())
+		}
+	}
+	if command.HasDumpPeriod() && command.GetDumpPeriod() <= 0 {
+		return fmt.Errorf("pprof task dump period must be greater than 0, task id is %s", command.GetTaskID())
+	}
+	return nil
+}
+
+func (r *PprofTaskManager) ReportPprof(taskID string, content []byte) {
+	metaData := &pprofv10.PprofMetaData{
+		Service:         r.entity.ServiceName,
+		ServiceInstance: r.entity.ServiceInstanceName,
+		TaskId:          taskID,
+		Type:            pprofv10.PprofProfilingStatus_PPROF_PROFILING_SUCCESS,
+		ContentSize:     int32(len(content)),
+	}
+
+	pprofData := &pprofv10.PprofData{
+		Metadata: metaData,
+		Result: &pprofv10.PprofData_Content{
+			Content: content,
+		},
+	}
+
+	select {
+	case r.pprofSendCh <- pprofData:
+	default:
+		r.logger.Errorf("reach max pprof send buffer")
+	}
+}
+
+func (r *PprofTaskManager) initPprofSendPipeline() {
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				r.logger.Errorf("PprofTaskManager initPprofSendPipeline panic err %v", err)
+			}
+		}()
+	StreamLoop:
+		for {
+			switch r.connManager.GetConnectionStatus(r.serverAddr) {
+			case ConnectionStatusShutdown:
+				return
+			case ConnectionStatusDisconnect:
+				time.Sleep(5 * time.Second)
+				continue StreamLoop
+			}
+
+			for pprofData := range r.pprofSendCh {
+				r.uploadPprofData(pprofData)
+			}
+			break
+		}
+	}()
+}
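ReportPprof enqueues with a non-blocking select, so a full queue drops the profile instead of stalling the profiled application. A minimal standalone sketch of that pattern:

```go
// Standalone sketch of the non-blocking enqueue used by ReportPprof.
package main

import "fmt"

func main() {
	queue := make(chan []byte, 2) // tiny buffer to force a drop
	for i := 0; i < 3; i++ {
		data := []byte{byte(i)}
		select {
		case queue <- data:
			fmt.Println("enqueued", i)
		default:
			fmt.Println("queue full, dropping", i) // mirrors "reach max pprof send buffer"
		}
	}
}
```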
+func (r *PprofTaskManager) uploadPprofData(pprofData *pprofv10.PprofData) {
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	stream, err := r.PprofClient.Collect(ctx)
+	if err != nil {
+		r.logger.Errorf("failed to start collect stream: %v", err)
+		return
+	}
+
+	// Send metadata first
+	metadataMsg := &pprofv10.PprofData{
+		Metadata: pprofData.Metadata,
+	}
+	if err = stream.Send(metadataMsg); err != nil {
+		r.logger.Errorf("failed to send metadata: %v", err)
+		return
+	}
+
+	resp, err := stream.Recv()
+	if err != nil {
+		r.logger.Errorf("failed to receive server response: %v", err)
+		return
+	}
+
+	switch resp.Status {
+	case pprofv10.PprofProfilingStatus_PPROF_TERMINATED_BY_OVERSIZE:
+		r.logger.Errorf("pprof data is too large to be received by the oap server")
+		return
+	case pprofv10.PprofProfilingStatus_PPROF_EXECUTION_TASK_ERROR:
+		r.logger.Errorf("server rejected pprof upload due to execution task error")
+		return
+	}
+
+	// Upload content in chunks
+	content := pprofData.GetContent()
+	chunkCount := 0
+	contentSize := len(content)
+
+	for offset := 0; offset < contentSize; offset += maxChunkSize {
+		end := offset + maxChunkSize
+		if end > contentSize {
+			end = contentSize
+		}
+
+		chunkData := &pprofv10.PprofData{
+			Result: &pprofv10.PprofData_Content{
+				Content: content[offset:end],
+			},
+		}
+
+		if err := stream.Send(chunkData); err != nil {
+			r.logger.Errorf("failed to send pprof chunk %d: %v", chunkCount, err)
+			return
+		}
+		chunkCount++
+		// Check context timeout
+		select {
+		case <-ctx.Done():
+			r.logger.Errorf("context timeout during chunk upload for task %s", pprofData.Metadata.TaskId)
+			return
+		default:
+		}
+	}
+
+	r.closePprofStream(stream)
+}
+
+func (r *PprofTaskManager) closePprofStream(stream pprofv10.PprofTask_CollectClient) {
+	if err := stream.CloseSend(); err != nil {
+		r.logger.Errorf("failed to close send stream: %v", err)
+		return
+	}
+
+	for {
+		_, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			r.logger.Errorf("error receiving final response %v", err)
+			break
+		}
+	}
+}
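uploadPprofData streams the profile in maxChunkSize pieces. A standalone sketch of just the chunking arithmetic:

```go
// Standalone sketch: the payload is sliced into maxChunkSize pieces, with the
// final piece truncated to the remaining bytes.
package main

import "fmt"

const maxChunkSize = 1 * 1024 * 1024

func main() {
	content := make([]byte, 2*maxChunkSize+123) // 2 full chunks + a remainder
	for offset := 0; offset < len(content); offset += maxChunkSize {
		end := offset + maxChunkSize
		if end > len(content) {
			end = len(content)
		}
		fmt.Printf("chunk [%d:%d] = %d bytes\n", offset, end, end-offset)
	}
}
```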
diff --git a/test/plugins/scenarios/logrus/config/excepted.yml b/test/plugins/scenarios/logrus/config/excepted.yml
index 983c4aa0..88bfca1a 100644
--- a/test/plugins/scenarios/logrus/config/excepted.yml
+++ b/test/plugins/scenarios/logrus/config/excepted.yml
@@ -18,14 +18,23 @@ segmentItems: []
 meterItems: []
 logItems:
   - serviceName: logrus
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
-            desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/test/plugins/scenarios/zap/config/excepted.yml b/test/plugins/scenarios/zap/config/excepted.yml
index 0b927f96..dd1b2d82 100644
--- a/test/plugins/scenarios/zap/config/excepted.yml
+++ b/test/plugins/scenarios/zap/config/excepted.yml
@@ -18,14 +18,23 @@ segmentItems: []
 meterItems: []
 logItems:
   - serviceName: zap
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code = Unimplemented
-            desc = Method not found: skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/tools/go-agent/config/agent.default.yaml b/tools/go-agent/config/agent.default.yaml
index f07f22d8..02d351f4 100644
--- a/tools/go-agent/config/agent.default.yaml
+++ b/tools/go-agent/config/agent.default.yaml
@@ -54,6 +54,12 @@ reporter:
     authentication: ${SW_AGENT_REPORTER_GRPC_AUTHENTICATION:}
     # The interval(s) of fetching dynamic configuration from backend.
    cds_fetch_interval: ${SW_AGENT_REPORTER_GRPC_CDS_FETCH_INTERVAL:20}
+    pprof:
+      # The interval(s) of fetching pprof tasks from the backend.
+      pprof_fetch_interval: ${SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL:20}
+      # The directory where pprof files are written while executing a profiling task.
+      pprof_file_path: ${SW_AGENT_REPORTER_GRPC_PROFILE_PPROF_FILE_PATH:}
+
     tls:
       # Whether to enable TLS with backend.
       enable: ${SW_AGENT_REPORTER_GRPC_TLS_ENABLE:false}
diff --git a/tools/go-agent/config/loader.go b/tools/go-agent/config/loader.go
index adc2a118..8d843df0 100644
--- a/tools/go-agent/config/loader.go
+++ b/tools/go-agent/config/loader.go
@@ -83,11 +83,17 @@ type Meter struct {
 }
 
 type GRPCReporter struct {
-	BackendService   StringValue     `yaml:"backend_service"`
-	MaxSendQueue     StringValue     `yaml:"max_send_queue"`
-	Authentication   StringValue     `yaml:"authentication"`
-	CDSFetchInterval StringValue     `yaml:"cds_fetch_interval"`
-	TLS              GRPCReporterTLS `yaml:"tls"`
+	BackendService   StringValue       `yaml:"backend_service"`
+	MaxSendQueue     StringValue       `yaml:"max_send_queue"`
+	Authentication   StringValue       `yaml:"authentication"`
+	CDSFetchInterval StringValue       `yaml:"cds_fetch_interval"`
+	TLS              GRPCReporterTLS   `yaml:"tls"`
+	Pprof            GRPCReporterPprof `yaml:"pprof"`
+}
+
+type GRPCReporterPprof struct {
+	PprofFetchInterval StringValue `yaml:"pprof_fetch_interval"`
+	PprofFilePath      StringValue `yaml:"pprof_file_path"`
 }
 
 type GRPCReporterTLS struct {
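The new settings follow the agent's `${ENV_NAME:default}` placeholder convention visible in agent.default.yaml. A hedged sketch of how such a placeholder resolves (an illustrative parser, not the agent's own loader):

```go
// The environment variable wins; otherwise the default after the colon applies.
package main

import (
	"fmt"
	"os"
	"strings"
)

func resolve(placeholder string) string {
	trimmed := strings.TrimSuffix(strings.TrimPrefix(placeholder, "${"), "}")
	name, def, _ := strings.Cut(trimmed, ":")
	if v, ok := os.LookupEnv(name); ok {
		return v
	}
	return def
}

func main() {
	os.Setenv("SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL", "30")
	fmt.Println(resolve("${SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL:20}")) // 30
	fmt.Println(resolve("${SW_AGENT_REPORTER_GRPC_PROFILE_PPROF_FILE_PATH:}"))     // ""
}
```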
diff --git a/tools/go-agent/instrument/reporter/instrument.go b/tools/go-agent/instrument/reporter/instrument.go
index 68409e61..43bb06ae 100644
--- a/tools/go-agent/instrument/reporter/instrument.go
+++ b/tools/go-agent/instrument/reporter/instrument.go
@@ -160,7 +160,7 @@ func (i *Instrument) generateReporterInitFile(dir, reporterType string) (string,
 	reporterInitTemplate := baseReporterInitTemplate
 	if reporterType == consts.KafkaReporter {
 		reporterInitTemplate += `
-	_, cdsManager, err := initManager(logger, checkInterval)
+	_, cdsManager, _, err := initManager(logger, checkInterval)
 	if err != nil {
 		return nil, err
 	}
@@ -169,11 +169,11 @@ func (i *Instrument) generateReporterInitFile(dir, reporterType string) (string,
 		reporterInitTemplate += kafkaReporterInitFunc
 	} else {
 		reporterInitTemplate += `
-	connManager, cdsManager, err := initManager(logger, checkInterval)
+	connManager, cdsManager, pprofTaskManager, err := initManager(logger, checkInterval)
 	if err != nil {
 		return nil, err
 	}
-	return initGRPCReporter(logger, checkInterval, connManager, cdsManager)
+	return initGRPCReporter(logger, checkInterval, connManager, cdsManager, pprofTaskManager)
 }`
 		reporterInitTemplate += grpcReporterInitFunc
 	}
@@ -208,7 +208,7 @@ func {{.InitFuncName}}(logger operator.LogOperator) (Reporter, error) {
 
 const initManagerFunc = `
-func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, error) {
+func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, *PprofTaskManager, error) {
 	authenticationVal := {{.Config.Reporter.GRPC.Authentication.ToGoStringValue}}
 	backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
 
@@ -229,16 +229,25 @@ func initManager(logger operator.LogOperator, checkInterval time.Duration) (*Con
 		connManager, err = NewConnectionManager(logger, checkInterval, backendServiceVal, authenticationVal, nil)
 	}
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
 	cdsFetchIntervalVal := {{.Config.Reporter.GRPC.CDSFetchInterval.ToGoIntValue "the cds fetch interval must be number"}}
 	cdsFetchInterval := time.Second * time.Duration(cdsFetchIntervalVal)
 	cdsManager, err := NewCDSManager(logger, backendServiceVal, cdsFetchInterval, connManager)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
-	return connManager, cdsManager, nil
+
+	pprofFetchIntervalVal := {{.Config.Reporter.GRPC.Pprof.PprofFetchInterval.ToGoIntValue "the pprof fetch interval must be number"}}
+	pprofFetchInterval := time.Second * time.Duration(pprofFetchIntervalVal)
+	pprofFilePath := {{.Config.Reporter.GRPC.Pprof.PprofFilePath.ToGoStringValue}}
+	pprofTaskManager, err := NewPprofTaskManager(logger, backendServiceVal, pprofFetchInterval, connManager, pprofFilePath)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	return connManager, cdsManager, pprofTaskManager, nil
 }
 `
 
@@ -247,13 +256,14 @@
 const grpcReporterInitFunc = `
 func initGRPCReporter(logger operator.LogOperator,
 	checkInterval time.Duration,
 	connManager *ConnectionManager,
-	cdsManager *CDSManager) (Reporter, error) {
+	cdsManager *CDSManager,
+	pprofTaskManager *PprofTaskManager) (Reporter, error) {
 	var opts []ReporterOption
 	maxSendQueueVal := {{.Config.Reporter.GRPC.MaxSendQueue.ToGoIntValue "the GRPC reporter max queue size must be number"}}
 	opts = append(opts, WithMaxSendQueueSize(maxSendQueueVal))
 
 	backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
-	return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, opts...)
+	return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, pprofTaskManager, opts...)
 }
 `