From fce756bbd9f61a906559ecd0cf252e586df1d92c Mon Sep 17 00:00:00 2001 From: bb7133 Date: Tue, 16 Sep 2025 16:48:37 -0700 Subject: [PATCH] support buffered logger --- config.go | 6 ++++ log.go | 12 ++++++- zap_log_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/config.go b/config.go index b8421d3..c802295 100644 --- a/config.go +++ b/config.go @@ -38,6 +38,12 @@ type FileLogConfig struct { // Compression function for rotated files. // Currently only `gzip` and empty are supported, empty means compression disabled. Compression string `toml:"compression" json:"compression"` + // IsBuffered is true means use buffered logger. + IsBuffered bool `toml:"is-buffered" json:"is-buffered"` + // BufferSize is the size of the buffer. + BufferSize int `toml:"buffer-size" json:"buffer-size"` + // BufferFlushInterval is the interval of buffer flush. + BufferFlushInterval time.Duration `toml:"buffer-flush-interval" json:"buffer-flush-interval"` } // Config serializes log related config in toml/json. diff --git a/log.go b/log.go index 6788631..00db327 100644 --- a/log.go +++ b/log.go @@ -49,7 +49,15 @@ func InitLogger(cfg *Config, opts ...zap.Option) (*zap.Logger, *ZapProperties, e if err != nil { return nil, nil, err } - output = zapcore.AddSync(lg) + if cfg.File.IsBuffered { + output = &zapcore.BufferedWriteSyncer{ + WS: zapcore.AddSync(lg), + Size: cfg.File.BufferSize, + FlushInterval: cfg.File.BufferFlushInterval, + } + } else { + output = zapcore.AddSync(lg) + } } else { stdOut, _, err := zap.Open([]string{"stdout"}...) if err != nil { @@ -258,6 +266,8 @@ func S() *zap.SugaredLogger { // ReplaceGlobals replaces the global Logger and SugaredLogger, and returns a // function to restore the original values. It's safe for concurrent use. +// Be careful when using this with buffered logger, the flush goroutine in +// https://pkg.go.dev/go.uber.org/zap/zapcore#BufferedWriteSyncer will not be stopped. func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) func() { // TODO: This globalMu can be replaced by atomic.Swap(), available since go1.17. globalMu.Lock() diff --git a/zap_log_test.go b/zap_log_test.go index c37a9fd..643f7d9 100644 --- a/zap_log_test.go +++ b/zap_log_test.go @@ -197,6 +197,91 @@ func TestRotateLog(t *testing.T) { _ = os.RemoveAll(tempDir) } +func TestBufferedLog(t *testing.T) { + tempDir, _ := os.MkdirTemp("/tmp", "buffered-tests-log") + conf := &Config{ + Level: "info", + File: FileLogConfig{ + Filename: tempDir + "/test.log", + MaxSize: 10, // no rotate + IsBuffered: true, + BufferSize: 1024, + BufferFlushInterval: 3 * time.Second, + }, + } + + bufferedLogger, _, err := InitLogger(conf) + require.NoError(t, err) + + logFile := filepath.Join(tempDir, "test.log") + + bufferedLogger.Info("first message") + content, err := os.ReadFile(logFile) + if err == nil { + require.Empty(t, content, "expected empty before time-based flush") + } else { + require.True(t, os.IsNotExist(err), "expected file not exist before time-based flush") + } + + require.Eventually(t, func() bool { + data, err := os.ReadFile(logFile) + return err == nil && strings.Contains(string(data), "first message") + }, 5*time.Second, 100*time.Millisecond, "expected data flushed after interval") + + longMsg := strings.Repeat("x", 200) + for i := 0; i < 20; i++ { // Larger than buffer size + bufferedLogger.Info("big message", zap.String("msg", longMsg)) + } + + require.Eventually(t, func() bool { + data, err := os.ReadFile(logFile) + return err == nil && strings.Contains(string(data), "big message") + }, 1*time.Second, 100*time.Millisecond, "expected data flushed after buffer size exceeded") + + _ = os.RemoveAll(tempDir) +} + +func TestBufferedLogWithRotate(t *testing.T) { + tempDir, _ := os.MkdirTemp("/tmp", "buffered-rotate-tests-log") + conf := &Config{ + Level: "info", + File: FileLogConfig{ + Filename: filepath.Join(tempDir, "test.log"), + MaxSize: 1, // 1 MB + IsBuffered: true, + BufferSize: 4 * 1024, + BufferFlushInterval: 5 * time.Second, + }, + } + + bufferedLogger, _, err := InitLogger(conf) + require.NoError(t, err) + + largeMsg := strings.Repeat("x", 200*1024) + + for i := 0; i < 10; i++ { + bufferedLogger.Info("rotating message", zap.String("msg", largeMsg)) + } + + require.Eventually(t, func() bool { + files, err := os.ReadDir(tempDir) + if err != nil { + return false + } + return len(files) >= 2 + }, 5*time.Second, 200*time.Millisecond, "expected log rotation after buffer flush") + + files, _ := os.ReadDir(tempDir) + var totalSize int64 + for _, f := range files { + info, _ := f.Info() + totalSize += info.Size() + } + require.Greater(t, totalSize, int64(1*1024*1024), "expected total written logs > 1MB") + + _ = os.RemoveAll(tempDir) +} + func TestErrorLog(t *testing.T) { ts := newTestLogSpy(t) conf := &Config{Level: "debug", DisableTimestamp: true} @@ -232,8 +317,8 @@ func TestLogJSON(t *testing.T) { "backoff", time.Second, ) logger.With(zap.String("connID", "1"), zap.String("traceID", "dse1121")).Info("new connection") - ts.assertMessages("{\"level\":\"INFO\",\"caller\":\"zap_log_test.go:229\",\"message\":\"failed to fetch URL\",\"url\":\"http://example.com\",\"attempt\":3,\"backoff\":\"1s\"}", - "{\"level\":\"INFO\",\"caller\":\"zap_log_test.go:234\",\"message\":\"new connection\",\"connID\":\"1\",\"traceID\":\"dse1121\"}") + ts.assertMessages("{\"level\":\"INFO\",\"caller\":\"zap_log_test.go:314\",\"message\":\"failed to fetch URL\",\"url\":\"http://example.com\",\"attempt\":3,\"backoff\":\"1s\"}", + "{\"level\":\"INFO\",\"caller\":\"zap_log_test.go:319\",\"message\":\"new connection\",\"connID\":\"1\",\"traceID\":\"dse1121\"}") } func TestRotateLogWithCompress(t *testing.T) { @@ -279,8 +364,8 @@ func TestCompressError(t *testing.T) { conf := &Config{ Level: "info", File: FileLogConfig{ - Filename: tempDir + "/test.log", - MaxSize: 1, + Filename: tempDir + "/test.log", + MaxSize: 1, Compression: "xxx", }, }