Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Release Notes.
* Add recover to goroutine to prevent unexpected panics.
* Add mutex to fix some data race.
* Replace external `goapi` dependency with in-repo generated protocols.
* Support pprof profiling.
#### Plugins

#### Documentation
Expand Down
5 changes: 5 additions & 0 deletions agent/core/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ package core

import (
//go:nolint
_ "bytes"
_ "encoding/base64"
_ "fmt"
_ "io"
_ "log"
_ "math"
_ "math/rand"
_ "net"
_ "os"
_ "path/filepath"
_ "reflect"
_ "runtime"
_ "runtime/debug"
_ "runtime/metrics"
_ "runtime/pprof"
_ "sort"
_ "strconv"
_ "strings"
Expand All @@ -55,4 +59,5 @@ import (
_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
)
1 change: 1 addition & 0 deletions agent/reporter/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,6 @@ import (
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
)
246 changes: 246 additions & 0 deletions plugins/core/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package core

import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"

"github.com/apache/skywalking-go/plugins/core/operator"
"github.com/apache/skywalking-go/plugins/core/reporter"
)

const (
// Pprof event types
PprofEventsTypeCPU = "cpu"
PprofEventsTypeHeap = "heap"
PprofEventsTypeAllocs = "allocs"
PprofEventsTypeBlock = "block"
PprofEventsTypeMutex = "mutex"
PprofEventsTypeThread = "threadcreate"
PprofEventsTypeGoroutine = "goroutine"
)

// CPU profiling state to ensure only one CPU profiling task runs at a time
var profilingIsRunning atomic.Bool

func init() {
reporter.NewPprofTaskCommand = NewPprofTaskCommand
}

type PprofTaskCommandImpl struct {
// Pprof Task ID
taskID string
// Type of profiling (CPU/Heap/Block/Mutex/Goroutine/Threadcreate/Allocs)
events string
// Unit is minute, required for CPU, Block and Mutex events
duration time.Duration
// Unix timestamp in milliseconds when the task was created
createTime int64
// Define the period of the pprof dump, required for Block and Mutex events
dumpPeriod int

// for pprof task service
pprofFilePath string
logger operator.LogOperator
manager reporter.PprofReporter
}

func NewPprofTaskCommand(taskID, events string, duration time.Duration,
createTime int64, dumpPeriod int, pprofFilePath string,
logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
return &PprofTaskCommandImpl{
taskID: taskID,
events: events,
duration: duration,
createTime: createTime,
dumpPeriod: dumpPeriod,
pprofFilePath: pprofFilePath,
logger: logger,
manager: manager,
}
}

func (c *PprofTaskCommandImpl) GetTaskID() string {
return c.taskID
}

func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
return c.createTime
}

func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
return c.duration
}

func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
return c.dumpPeriod
}

func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
return !(c.events == PprofEventsTypeHeap ||
c.events == PprofEventsTypeAllocs ||
c.events == PprofEventsTypeGoroutine ||
c.events == PprofEventsTypeThread ||
c.events == PprofEventsTypeCPU ||
c.events == PprofEventsTypeBlock ||
c.events == PprofEventsTypeMutex)
}

func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
return c.events == PprofEventsTypeHeap ||
c.events == PprofEventsTypeAllocs ||
c.events == PprofEventsTypeGoroutine ||
c.events == PprofEventsTypeThread
}

func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
return c.events == PprofEventsTypeBlock ||
c.events == PprofEventsTypeMutex
}

func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
if file, ok := writer.(*os.File); ok {
if err := file.Close(); err != nil {
c.logger.Errorf("failed to close pprof file: %v", err)
}
}
}

func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
// sample data to buffer
if c.pprofFilePath == "" {
return &bytes.Buffer{}, nil
}

// sample data to file
pprofFileName := filepath.Join(c.taskID, ".pprof")
pprofFilePath := filepath.Join(c.pprofFilePath, pprofFileName)
if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
return nil, err
}

writer, err := os.Create(pprofFilePath)
if err != nil {
return nil, err
}

return writer, nil
}

func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
c.logger.Infof("start pprof task %s", c.taskID)
// For CPU profiling, check global state first
if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
return nil, fmt.Errorf("CPU profiling is already running")
}

writer, err := c.getWriter()
if err != nil {
if c.events == PprofEventsTypeCPU {
profilingIsRunning.Store(false)
}
return nil, err
}

switch c.events {
case PprofEventsTypeCPU:
if err = pprof.StartCPUProfile(writer); err != nil {
profilingIsRunning.Store(false)
if c.pprofFilePath != "" {
c.closeFileWriter(writer)
}
return nil, err
}
case PprofEventsTypeBlock:
runtime.SetBlockProfileRate(c.dumpPeriod)
case PprofEventsTypeMutex:
runtime.SetMutexProfileFraction(c.dumpPeriod)
}

return writer, nil
}

func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
c.logger.Infof("stop pprof task %s", c.taskID)
switch c.events {
case PprofEventsTypeCPU:
pprof.StopCPUProfile()
profilingIsRunning.Store(false)
case PprofEventsTypeBlock:
if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Block profile error %v", err)
}
runtime.SetBlockProfileRate(0)
case PprofEventsTypeMutex:
if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Mutex profile error %v", err)
}
runtime.SetMutexProfileFraction(0)
case PprofEventsTypeHeap:
if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Heap profile error %v", err)
}
case PprofEventsTypeAllocs:
if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Alloc profile error %v", err)
}
case PprofEventsTypeGoroutine:
if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Goroutine profile error %v", err)
}
case PprofEventsTypeThread:
if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Thread profile error %v", err)
}
}

if c.pprofFilePath != "" {
c.closeFileWriter(writer)
}
c.readPprofData(c.taskID, writer)
}

func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
var data []byte
if c.pprofFilePath == "" {
if buf, ok := writer.(*bytes.Buffer); ok {
data = buf.Bytes()
}
} else {
if file, ok := writer.(*os.File); ok {
filePath := file.Name()
fileData, err := os.ReadFile(filePath)
if err != nil {
c.logger.Errorf("failed to read pprof file %s: %v", filePath, err)
}
data = fileData
if err := os.Remove(filePath); err != nil {
c.logger.Errorf("failed to remove pprof file %s: %v", filePath, err)
}
}
}
c.manager.ReportPprof(taskID, data)
}
28 changes: 16 additions & 12 deletions plugins/core/reporter/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ func NewGRPCReporter(logger operator.LogOperator,
checkInterval time.Duration,
connManager *reporter.ConnectionManager,
cdsManager *reporter.CDSManager,
pprofTaskManager *reporter.PprofTaskManager,
opts ...ReporterOption,
) (reporter.Reporter, error) {
r := &gRPCReporter{
logger: logger,
serverAddr: serverAddr,
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
checkInterval: checkInterval,
connManager: connManager,
cdsManager: cdsManager,
logger: logger,
serverAddr: serverAddr,
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
checkInterval: checkInterval,
connManager: connManager,
cdsManager: cdsManager,
pprofTaskManager: pprofTaskManager,
}
for _, o := range opts {
o(r)
Expand Down Expand Up @@ -83,10 +85,11 @@ type gRPCReporter struct {
checkInterval time.Duration

// bootFlag is set if Boot be executed
bootFlag bool
transform *reporter.Transform
connManager *reporter.ConnectionManager
cdsManager *reporter.CDSManager
bootFlag bool
transform *reporter.Transform
connManager *reporter.ConnectionManager
cdsManager *reporter.CDSManager
pprofTaskManager *reporter.PprofTaskManager
}

func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
Expand All @@ -95,6 +98,7 @@ func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.Agen
r.initSendPipeline()
r.check()
r.cdsManager.InitCDS(entity, cdsWatchers)
r.pprofTaskManager.InitPprofTask(entity)
r.bootFlag = true
}

Expand Down
Loading
Loading