This repository was archived by the owner on Oct 7, 2022. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +13
-4
lines changed
Expand file tree Collapse file tree 1 file changed +13
-4
lines changed Original file line number Diff line number Diff line change 11package store
22
33import (
4+ "github.com/go-kit/kit/log"
45 "github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
56 "github.com/inloco/kafka-elasticsearch-injector/src/models"
6- "github.com/go-kit/kit/log"
77)
88
99type Store interface {
@@ -12,17 +12,26 @@ type Store interface {
1212}
1313
1414type basicStore struct {
15- db elasticsearch.RecordDatabase
15+ db elasticsearch.RecordDatabase
16+ codec elasticsearch.Codec
1617}
1718
1819func (s basicStore ) Insert (records []* models.Record ) error {
19- return s .db .Insert (records )
20+ elasticRecords , err := s .codec .EncodeElasticRecords (records )
21+ if err != nil {
22+ return err
23+ }
24+ return s .db .Insert (elasticRecords )
2025}
2126
2227func (s basicStore ) ReadinessCheck () bool {
2328 return s .db .ReadinessCheck ()
2429}
2530
2631func NewStore (logger log.Logger ) Store {
27- return basicStore {elasticsearch .NewDatabase (logger , elasticsearch .NewConfig ())}
32+ config := elasticsearch .NewConfig ()
33+ return basicStore {
34+ db : elasticsearch .NewDatabase (logger , config ),
35+ codec : elasticsearch .NewCodec (logger , config ),
36+ }
2837}
You can’t perform that action at this time.
0 commit comments