Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 7a0019d

Browse files
author
Mateus Moury
committed
refactor(elasticsearch): refactoring elasticsearch logic out of writer and its tests
1 parent 836b924 commit 7a0019d

File tree

2 files changed

+17
-111
lines changed

2 files changed

+17
-111
lines changed

src/elasticsearch/elasticsearch.go

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type basicDatabase interface {
2121

2222
type RecordDatabase interface {
2323
basicDatabase
24-
Insert(records []*models.Record) error
24+
Insert(records []*models.ElasticRecord) error
2525
ReadinessCheck() bool
2626
}
2727

@@ -49,7 +49,7 @@ func (d recordDatabase) CloseClient() {
4949
}
5050
}
5151

52-
func (d recordDatabase) Insert(records []*models.Record) error {
52+
func (d recordDatabase) Insert(records []*models.ElasticRecord) error {
5353
bulkRequest, err := d.buildBulkRequest(records)
5454
if err != nil {
5555
return err
@@ -79,62 +79,17 @@ func (d recordDatabase) ReadinessCheck() bool {
7979
return true
8080
}
8181

82-
func (d recordDatabase) buildBulkRequest(records []*models.Record) (*elastic.BulkService, error) {
82+
func (d recordDatabase) buildBulkRequest(records []*models.ElasticRecord) (*elastic.BulkService, error) {
8383
bulkRequest := d.GetClient().Bulk()
8484
for _, record := range records {
85-
index, err := d.getDatabaseIndex(record)
86-
if err != nil {
87-
return nil, err
88-
}
89-
90-
docID, err := d.getDatabaseDocID(record)
91-
if err != nil {
92-
return nil, err
93-
}
94-
95-
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(index).
96-
Type(record.Topic).
97-
Id(docID).
98-
Doc(record.FilteredFieldsJSON(d.config.BlacklistedColumns)))
85+
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(record.Index).
86+
Type(record.Type).
87+
Id(record.ID).
88+
Doc(record.Json))
9989
}
10090
return bulkRequest, nil
10191
}
10292

103-
func (d recordDatabase) getDatabaseIndex(record *models.Record) (string, error) {
104-
indexPrefix := d.config.Index
105-
if indexPrefix == "" {
106-
indexPrefix = record.Topic
107-
}
108-
109-
indexColumn := d.config.IndexColumn
110-
indexSuffix := record.FormatTimestamp()
111-
if indexColumn != "" {
112-
newIndexSuffix, err := record.GetValueForField(indexColumn)
113-
if err != nil {
114-
level.Error(d.logger).Log("err", err, "message", "Could not get column value from record.")
115-
return "", err
116-
}
117-
indexSuffix = newIndexSuffix
118-
}
119-
120-
return fmt.Sprintf("%s-%s", indexPrefix, indexSuffix), nil
121-
}
122-
123-
func (d recordDatabase) getDatabaseDocID(record *models.Record) (string, error) {
124-
docID := record.GetId()
125-
126-
docIDColumn := d.config.DocIDColumn
127-
if docIDColumn != "" {
128-
newDocID, err := record.GetValueForField(docIDColumn)
129-
if err != nil {
130-
level.Error(d.logger).Log("err", err, "message", "Could not get doc id value from record.")
131-
return "", err
132-
}
133-
docID = newDocID
134-
}
135-
return docID, nil
136-
}
137-
13893
func NewDatabase(logger log.Logger, config Config) RecordDatabase {
13994
return recordDatabase{logger: logger, config: config}
14095
}

src/elasticsearch/elasticsearch_test.go

Lines changed: 10 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package elasticsearch
22

33
import (
4-
"strconv"
54
"testing"
65

76
"os"
87

98
"context"
109

11-
"fmt"
1210
"time"
1311

1412
"encoding/json"
@@ -92,88 +90,41 @@ func TestRecordDatabase_ReadinessCheck(t *testing.T) {
9290
}
9391

9492
func TestRecordDatabase_Insert(t *testing.T) {
95-
now := time.Now()
96-
record, id, _ := fixtures.NewRecord(now)
97-
index := fmt.Sprintf("%s-%s", config.Index, record.FormatTimestamp())
98-
err := db.Insert([]*models.Record{record})
93+
record, id := fixtures.NewElasticRecord()
94+
err := db.Insert([]*models.ElasticRecord{record})
9995
db.GetClient().Refresh("_all").Do(context.Background())
10096
var recordFromES fixtures.FixtureRecord
10197
if assert.NoError(t, err) {
102-
count, err := db.GetClient().Count(index).Do(context.Background())
98+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
10399
if assert.NoError(t, err) {
104100
assert.Equal(t, int64(1), count)
105101
}
106-
res, err := db.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
102+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
107103
if assert.NoError(t, err) {
108104
json.Unmarshal(*res.Source, &recordFromES)
109105
}
110106
assert.Equal(t, recordFromES.Id, id)
111107
}
112-
db.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
108+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
113109
}
114110

115111
func TestRecordDatabase_Insert_Multiple(t *testing.T) {
116-
now := time.Now()
117-
record, id, _ := fixtures.NewRecord(now)
118-
index := fmt.Sprintf("%s-%s", config.Index, record.FormatTimestamp())
119-
err := db.Insert([]*models.Record{record, record})
112+
record, id := fixtures.NewElasticRecord()
113+
err := db.Insert([]*models.ElasticRecord{record, record})
120114
db.GetClient().Refresh("_all").Do(context.Background())
121115
var recordFromES fixtures.FixtureRecord
122116
if assert.NoError(t, err) {
123-
count, err := db.GetClient().Count(index).Do(context.Background())
117+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
124118
if assert.NoError(t, err) {
125119
assert.Equal(t, int64(1), count)
126120
}
127-
res, err := db.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
121+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
128122
if assert.NoError(t, err) {
129123
json.Unmarshal(*res.Source, &recordFromES)
130124
}
131125
assert.Equal(t, recordFromES.Id, id)
132126
}
133-
db.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
134-
}
135-
136-
func TestRecordDatabase_Insert_IndexColumnBlacklist(t *testing.T) {
137-
now := time.Now()
138-
record, id, value := fixtures.NewRecord(now)
139-
index := fmt.Sprintf("%s-%d", config.Index, id)
140-
err := dbIndexColumnBlacklist.Insert([]*models.Record{record})
141-
dbIndexColumnBlacklist.GetClient().Refresh("_all").Do(context.Background())
142-
var recordFromES fixtures.FixtureRecord
143-
if assert.NoError(t, err) {
144-
count, err := dbIndexColumnBlacklist.GetClient().Count(index).Do(context.Background())
145-
if assert.NoError(t, err) {
146-
assert.Equal(t, int64(1), count)
147-
}
148-
res, err := dbIndexColumnBlacklist.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
149-
if assert.NoError(t, err) {
150-
json.Unmarshal(*res.Source, &recordFromES)
151-
}
152-
assert.Empty(t, recordFromES.Id)
153-
assert.Equal(t, value, recordFromES.Value)
154-
}
155-
dbIndexColumnBlacklist.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
156-
}
157-
158-
func TestRecordDatabase_Insert_DocIDColumn(t *testing.T) {
159-
now := time.Now()
160-
record, id, _ := fixtures.NewRecord(now)
161-
index := fmt.Sprintf("%s-%s", config.Index, record.FormatTimestamp())
162-
err := dbDocIDColumn.Insert([]*models.Record{record})
163-
dbDocIDColumn.GetClient().Refresh("_all").Do(context.Background())
164-
var recordFromES fixtures.FixtureRecord
165-
if assert.NoError(t, err) {
166-
count, err := dbDocIDColumn.GetClient().Count(index).Do(context.Background())
167-
if assert.NoError(t, err) {
168-
assert.Equal(t, int64(1), count)
169-
}
170-
res, err := dbDocIDColumn.GetClient().Get().Index(index).Type(record.Topic).Id(strconv.Itoa(int(id))).Do(context.Background())
171-
if assert.NoError(t, err) {
172-
json.Unmarshal(*res.Source, &recordFromES)
173-
}
174-
assert.Equal(t, recordFromES.Id, id)
175-
}
176-
dbDocIDColumn.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
127+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
177128
}
178129

179130
func setupDB(d RecordDatabase) {

0 commit comments

Comments
 (0)