Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 1d57f80

Browse files
author
Mateus Moury
committed
feat(elastic_codecs): adding codecs that transforms a kafka record into a record to be inserted in elasticsearch
1 parent 7a0019d commit 1d57f80

File tree

2 files changed

+160
-0
lines changed

2 files changed

+160
-0
lines changed

src/elasticsearch/codec.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package elasticsearch
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/go-kit/kit/log"
7+
"github.com/go-kit/kit/log/level"
8+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
9+
)
10+
11+
type Codec interface {
12+
EncodeElasticRecords(records []*models.Record) ([]*models.ElasticRecord, error)
13+
}
14+
15+
type basicCodec struct {
16+
config Config
17+
logger log.Logger
18+
}
19+
20+
func NewCodec(logger log.Logger, config Config) Codec {
21+
return basicCodec{logger: logger, config: config}
22+
}
23+
24+
func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.ElasticRecord, error) {
25+
elasticRecords := make([]*models.ElasticRecord, len(records))
26+
for idx, record := range records {
27+
index, err := c.getDatabaseIndex(record)
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
docID, err := c.getDatabaseDocID(record)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
elasticRecords[idx] = &models.ElasticRecord{
38+
Index: index,
39+
Type: record.Topic,
40+
ID: docID,
41+
Json: record.FilteredFieldsJSON(c.config.BlacklistedColumns),
42+
}
43+
}
44+
45+
return elasticRecords, nil
46+
}
47+
48+
func (c basicCodec) getDatabaseIndex(record *models.Record) (string, error) {
49+
indexPrefix := c.config.Index
50+
if indexPrefix == "" {
51+
indexPrefix = record.Topic
52+
}
53+
54+
indexColumn := c.config.IndexColumn
55+
indexSuffix := record.FormatTimestamp()
56+
if indexColumn != "" {
57+
newIndexSuffix, err := record.GetValueForField(indexColumn)
58+
if err != nil {
59+
level.Error(c.logger).Log("err", err, "message", "Could not get column value from record.")
60+
return "", err
61+
}
62+
indexSuffix = newIndexSuffix
63+
}
64+
65+
return fmt.Sprintf("%s-%s", indexPrefix, indexSuffix), nil
66+
}
67+
68+
func (c basicCodec) getDatabaseDocID(record *models.Record) (string, error) {
69+
docID := record.GetId()
70+
71+
docIDColumn := c.config.DocIDColumn
72+
if docIDColumn != "" {
73+
newDocID, err := record.GetValueForField(docIDColumn)
74+
if err != nil {
75+
level.Error(c.logger).Log("err", err, "message", "Could not get doc id value from record.")
76+
return "", err
77+
}
78+
docID = newDocID
79+
}
80+
return docID, nil
81+
}

src/elasticsearch/codec_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package elasticsearch
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
11+
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
12+
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
13+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
14+
)
15+
16+
var codecLogger = logger_builder.NewLogger("elasticsearch-test")
17+
18+
func TestCodec_EncodeElasticRecords(t *testing.T) {
19+
codec := &basicCodec{
20+
config: Config{},
21+
logger: codecLogger,
22+
}
23+
record, id, value := fixtures.NewRecord(time.Now())
24+
25+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
26+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
27+
elasticRecord := elasticRecords[0]
28+
assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestamp()), elasticRecord.Index)
29+
assert.Equal(t, record.Topic, elasticRecord.Type)
30+
assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID)
31+
assert.Equal(t, id, elasticRecord.Json["id"])
32+
assert.Equal(t, value, elasticRecord.Json["value"])
33+
}
34+
}
35+
36+
func TestCodec_EncodeElasticRecords_ColumnsBlacklist(t *testing.T) {
37+
codec := &basicCodec{
38+
config: Config{BlacklistedColumns: []string{"value"}},
39+
logger: codecLogger,
40+
}
41+
record, _, _ := fixtures.NewRecord(time.Now())
42+
43+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
44+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
45+
elasticRecord := elasticRecords[0]
46+
assert.Contains(t, elasticRecord.Json, "id")
47+
assert.NotContains(t, elasticRecord.Json, "value")
48+
}
49+
}
50+
51+
func TestCodec_EncodeElasticRecords_IndexColumn(t *testing.T) {
52+
indexPrefix := "prefix"
53+
54+
codec := &basicCodec{
55+
config: Config{Index: indexPrefix, IndexColumn: "id"},
56+
logger: codecLogger,
57+
}
58+
record, id, _ := fixtures.NewRecord(time.Now())
59+
60+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
61+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
62+
elasticRecord := elasticRecords[0]
63+
assert.Equal(t, fmt.Sprintf("%v-%v", indexPrefix, id), elasticRecord.Index)
64+
}
65+
}
66+
67+
func TestCodec_EncodeElasticRecords_DocIDColumn(t *testing.T) {
68+
codec := &basicCodec{
69+
config: Config{DocIDColumn: "id"},
70+
logger: codecLogger,
71+
}
72+
record, id, _ := fixtures.NewRecord(time.Now())
73+
74+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
75+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
76+
elasticRecord := elasticRecords[0]
77+
assert.Equal(t, strconv.Itoa(int(id)), elasticRecord.ID)
78+
}
79+
}

0 commit comments

Comments
 (0)