Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 940f907

Browse files
committed
fix(elasticsearch): retry only failed bulk items
1 parent e836f3a commit 940f907

File tree

6 files changed

+100
-19
lines changed

6 files changed

+100
-19
lines changed

Gopkg.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/elasticsearch/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Config struct {
1313
DocIDColumn string
1414
BlacklistedColumns []string
1515
BulkTimeout time.Duration
16+
Backoff time.Duration
1617
}
1718

1819
func NewConfig() Config {
@@ -24,12 +25,21 @@ func NewConfig() Config {
2425
timeout = d
2526
}
2627
}
28+
backoffStr, exists := os.LookupEnv("ES_BULK_BACKOFF")
29+
backoff := 1 * time.Second
30+
if exists {
31+
d, err := time.ParseDuration(backoffStr)
32+
if err == nil {
33+
backoff = d
34+
}
35+
}
2736
return Config{
2837
Host: os.Getenv("ELASTICSEARCH_HOST"),
2938
Index: os.Getenv("ES_INDEX"),
3039
IndexColumn: os.Getenv("ES_INDEX_COLUMN"),
3140
DocIDColumn: os.Getenv("ES_DOC_ID_COLUMN"),
3241
BlacklistedColumns: strings.Split(os.Getenv("ES_BLACKLISTED_COLUMNS"), ","),
3342
BulkTimeout: timeout,
43+
Backoff: backoff,
3444
}
3545
}

src/elasticsearch/elasticsearch.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55

66
"fmt"
77

8+
"net/http"
9+
810
"github.com/go-kit/kit/log"
911
"github.com/go-kit/kit/log/level"
1012
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1113
"github.com/olivere/elastic"
12-
"github.com/pkg/errors"
1314
)
1415

1516
var esClient *elastic.Client
@@ -21,7 +22,7 @@ type basicDatabase interface {
2122

2223
type RecordDatabase interface {
2324
basicDatabase
24-
Insert(records []*models.ElasticRecord) error
25+
Insert(records []*models.ElasticRecord) (*InsertResponse, error)
2526
ReadinessCheck() bool
2627
}
2728

@@ -49,24 +50,52 @@ func (d recordDatabase) CloseClient() {
4950
}
5051
}
5152

52-
func (d recordDatabase) Insert(records []*models.ElasticRecord) error {
53+
type InsertResponse struct {
54+
AlreadyExists []string
55+
Retry []*models.ElasticRecord
56+
}
57+
58+
func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse, error) {
5359
bulkRequest, err := d.buildBulkRequest(records)
5460
if err != nil {
55-
return err
61+
return nil, err
5662
}
5763
timeout := d.config.BulkTimeout
5864
ctx, cancel := context.WithTimeout(context.Background(), timeout)
5965
defer cancel()
6066
res, err := bulkRequest.Do(ctx)
61-
if err == nil {
62-
if res.Errors {
63-
for _, f := range res.Failed() {
64-
return errors.New(fmt.Sprintf("%v", f.Error))
67+
68+
if err != nil {
69+
return nil, err
70+
}
71+
if res.Errors {
72+
created := res.Created()
73+
var alreadyExistsIds []string
74+
for _, c := range created {
75+
if c.Status == http.StatusConflict {
76+
alreadyExistsIds = append(alreadyExistsIds, c.Id)
77+
level.Warn(d.logger).Log("message", "document already exists", "id", c.Id)
78+
}
79+
}
80+
failed := res.Failed()
81+
var retry []*models.ElasticRecord
82+
if len(failed) > 0 {
83+
recordMap := make(map[string]*models.ElasticRecord)
84+
for _, rec := range records {
85+
recordMap[rec.ID] = rec
86+
}
87+
for _, f := range failed {
88+
if f.Status == http.StatusTooManyRequests {
89+
//es is overloaded, backoff
90+
level.Warn(d.logger).Log("message", "insert failed: elasticsearch is overloaded", "failed_id", f.Id)
91+
retry = append(retry, recordMap[f.Id])
92+
}
6593
}
6694
}
95+
return &InsertResponse{alreadyExistsIds, retry}, nil
6796
}
6897

69-
return err
98+
return &InsertResponse{[]string{}, []*models.ElasticRecord{}}, nil
7099
}
71100

72101
func (d recordDatabase) ReadinessCheck() bool {
@@ -82,7 +111,8 @@ func (d recordDatabase) ReadinessCheck() bool {
82111
func (d recordDatabase) buildBulkRequest(records []*models.ElasticRecord) (*elastic.BulkService, error) {
83112
bulkRequest := d.GetClient().Bulk()
84113
for _, record := range records {
85-
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(record.Index).
114+
bulkRequest.Add(elastic.NewBulkIndexRequest().OpType("create").
115+
Index(record.Index).
86116
Type(record.Type).
87117
Id(record.ID).
88118
Doc(record.Json))

src/elasticsearch/elasticsearch_test.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"encoding/json"
1313

14+
"strconv"
15+
1416
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
1517
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
1618
"github.com/inloco/kafka-elasticsearch-injector/src/models"
@@ -72,8 +74,30 @@ func TestRecordDatabase_ReadinessCheck(t *testing.T) {
7274

7375
func TestRecordDatabase_Insert(t *testing.T) {
7476
record, id := fixtures.NewElasticRecord()
75-
err := db.Insert([]*models.ElasticRecord{record})
77+
_, err := db.Insert([]*models.ElasticRecord{record})
78+
db.GetClient().Refresh("_all").Do(context.Background())
79+
var recordFromES fixtures.FixtureRecord
80+
if assert.NoError(t, err) {
81+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
82+
if assert.NoError(t, err) {
83+
assert.Equal(t, int64(1), count)
84+
}
85+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
86+
if assert.NoError(t, err) {
87+
json.Unmarshal(*res.Source, &recordFromES)
88+
}
89+
assert.Equal(t, recordFromES.Id, id)
90+
}
91+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
92+
}
93+
94+
func TestRecordDatabase_Insert_RepeatedId(t *testing.T) {
95+
record, id := fixtures.NewElasticRecord()
96+
_, err := db.Insert([]*models.ElasticRecord{record})
7697
db.GetClient().Refresh("_all").Do(context.Background())
98+
res, err := db.Insert([]*models.ElasticRecord{record})
99+
assert.Len(t, res.AlreadyExists, 1)
100+
assert.Contains(t, res.AlreadyExists, strconv.Itoa(int(id)))
77101
var recordFromES fixtures.FixtureRecord
78102
if assert.NoError(t, err) {
79103
count, err := db.GetClient().Count(record.Index).Do(context.Background())
@@ -91,7 +115,7 @@ func TestRecordDatabase_Insert(t *testing.T) {
91115

92116
func TestRecordDatabase_Insert_Multiple(t *testing.T) {
93117
record, id := fixtures.NewElasticRecord()
94-
err := db.Insert([]*models.ElasticRecord{record, record})
118+
_, err := db.Insert([]*models.ElasticRecord{record, record})
95119
db.GetClient().Refresh("_all").Do(context.Background())
96120
var recordFromES fixtures.FixtureRecord
97121
if assert.NoError(t, err) {

src/injector/store/store.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package store
22

33
import (
4+
"time"
5+
46
"github.com/go-kit/kit/log"
57
"github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
68
"github.com/inloco/kafka-elasticsearch-injector/src/models"
@@ -12,16 +14,29 @@ type Store interface {
1214
}
1315

1416
type basicStore struct {
15-
db elasticsearch.RecordDatabase
16-
codec elasticsearch.Codec
17+
db elasticsearch.RecordDatabase
18+
codec elasticsearch.Codec
19+
backoff time.Duration
1720
}
1821

1922
func (s basicStore) Insert(records []*models.Record) error {
2023
elasticRecords, err := s.codec.EncodeElasticRecords(records)
2124
if err != nil {
2225
return err
2326
}
24-
return s.db.Insert(elasticRecords)
27+
for {
28+
res, err := s.db.Insert(elasticRecords)
29+
if err != nil {
30+
return err
31+
}
32+
if len(res.Retry) == 0 {
33+
break
34+
}
35+
//some records failed to index, backoff then retry
36+
time.Sleep(s.backoff)
37+
s.db.Insert(res.Retry)
38+
}
39+
return nil
2540
}
2641

2742
func (s basicStore) ReadinessCheck() bool {
@@ -31,7 +46,8 @@ func (s basicStore) ReadinessCheck() bool {
3146
func NewStore(logger log.Logger) Store {
3247
config := elasticsearch.NewConfig()
3348
return basicStore{
34-
db: elasticsearch.NewDatabase(logger, config),
35-
codec: elasticsearch.NewCodec(logger, config),
49+
db: elasticsearch.NewDatabase(logger, config),
50+
codec: elasticsearch.NewCodec(logger, config),
51+
backoff: config.Backoff,
3652
}
3753
}

src/kafka/consumer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func (s fixtureService) Insert(records []*models.Record) error {
3434
if err != nil {
3535
return err
3636
}
37-
return s.db.Insert(elasticRecords)
37+
_, err = s.db.Insert(elasticRecords)
38+
return err
3839
}
3940

4041
func (s fixtureService) ReadinessCheck() bool {

0 commit comments

Comments
 (0)