Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 7f15203

Browse files
authored
Merge pull request #2 from inloco/hotfix/check-decoded-filled-write-failure
Fix: check if decoded slice is empty before decoding kafka records
2 parents b6c1f0d + a16a909 commit 7f15203

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4.1
1+
0.4.2

src/kafka/consumer.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,18 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan Notification) {
8686
buf[idx] = kafkaMsg
8787
idx++
8888
for idx == buffSize {
89-
for _, msg := range buf {
90-
req, err := k.consumer.Decoder(nil, msg)
91-
if err != nil {
92-
level.Error(k.consumer.Logger).Log(
93-
"message", "Error decoding message",
94-
"err", err.Error(),
95-
)
96-
continue
89+
if decoded == nil {
90+
for _, msg := range buf {
91+
req, err := k.consumer.Decoder(nil, msg)
92+
if err != nil {
93+
level.Error(k.consumer.Logger).Log(
94+
"message", "Error decoding message",
95+
"err", err.Error(),
96+
)
97+
continue
98+
}
99+
decoded = append(decoded, req)
97100
}
98-
decoded = append(decoded, req)
99101
}
100102
if res, err := k.consumer.Endpoint(context.Background(), decoded); err != nil {
101103
level.Error(k.consumer.Logger).Log("message", "error on endpoint call", "err", err.Error())

0 commit comments

Comments
 (0)