Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 5172c71

Browse files
committed
feat(elasticsearch): remove types to support elasticsearch 7.0
1 parent 7e944c8 commit 5172c71

File tree

6 files changed

+31
-11
lines changed

6 files changed

+31
-11
lines changed

Gopkg.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ docker/run:
1515
count=0; \
1616
until curl localhost:9200 || ((count ++ >= 10)); \
1717
do echo "Retrying: Verify if Elasticsearch is ready"; sleep 5; done
18-
curl -XPOST "localhost:9200/_template/my-topic" --data '{"template":"my-topic-*","settings":{"refresh_interval":"30s","number_of_replicas":0},"mappings":{"_default_":{"_all":{"enabled":"false"},"_source":{"enabled":"true"},"properties":{"@timestamp":{"format":"epoch_millis","ignore_malformed":true,"type":"date"}},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"text","index":false}}}]}}}'
18+
curl -XPOST -H "Content-Type: application/json" "localhost:9200/_template/my-topic" --data '{"template":"my-topic-*","settings":{"refresh_interval":"30s","number_of_replicas":0},"mappings":{"_source":{"enabled":"true"},"properties":{"@timestamp":{"format":"epoch_millis","ignore_malformed":true,"type":"date"}},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword","index":true}}}]}}'
1919
docker-compose up -d producer app

docker-compose.yml

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,23 @@ services:
2525
ports:
2626
- "8081:8081"
2727
elasticsearch:
28-
image: 'elasticsearch:5-alpine'
28+
image: 'docker.elastic.co/elasticsearch/elasticsearch-oss:7.0.0'
2929
container_name: elasticsearch
30+
environment:
31+
- cluster.name=docker-cluster
32+
- bootstrap.memory_lock=true
33+
- discovery.type=single-node
34+
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
3035
ports:
31-
- '9200:9200'
36+
- "9200:9200"
37+
ulimits:
38+
memlock:
39+
soft: -1
40+
hard: -1
41+
volumes:
42+
- esdata1:/usr/share/elasticsearch/data
3243
kibana:
33-
image: 'kibana:5'
44+
image: 'docker.elastic.co/kibana/kibana-oss:7.0.0'
3445
container_name: kibana
3546
depends_on:
3647
- elasticsearch
@@ -70,4 +81,8 @@ services:
7081
environment:
7182
- KAFKA_ADDRESS=kafka:9092
7283
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
73-
- LOG_LEVEL=DEBUG
84+
- LOG_LEVEL=DEBUG
85+
86+
volumes:
87+
esdata1:
88+
driver: local

src/elasticsearch/codec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.El
3636

3737
elasticRecords[idx] = &models.ElasticRecord{
3838
Index: index,
39-
Type: record.Topic,
39+
Type: "_doc",
4040
ID: docID,
4141
Json: record.FilteredFieldsJSON(c.config.BlacklistedColumns),
4242
}

src/elasticsearch/elasticsearch.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package elasticsearch
22

33
import (
44
"context"
5-
65
"fmt"
76

7+
"github.com/pkg/errors"
8+
89
"net/http"
910

1011
"github.com/go-kit/kit/log"
@@ -53,7 +54,7 @@ func (d recordDatabase) CloseClient() {
5354
type InsertResponse struct {
5455
AlreadyExists []string
5556
Retry []*models.ElasticRecord
56-
Overloaded bool
57+
Backoff bool
5758
}
5859

5960
func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse, error) {
@@ -65,7 +66,10 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
6566
ctx, cancel := context.WithTimeout(context.Background(), timeout)
6667
defer cancel()
6768
res, err := bulkRequest.Do(ctx)
68-
69+
if err == elastic.ErrNoClient || errors.Cause(err) == elastic.ErrNoClient {
70+
level.Warn(d.logger).Log("message", "no elasticsearch node available", "err", err)
71+
return &InsertResponse{AlreadyExists: nil, Retry: records, Backoff: true}, nil
72+
}
6973
if err != nil {
7074
return nil, err
7175
}

src/injector/store/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ func (s basicStore) Insert(records []*models.Record) error {
3333
break
3434
}
3535
//some records failed to index, backoff(if overloaded) then retry
36-
if res.Overloaded {
36+
if res.Backoff {
3737
time.Sleep(s.backoff)
3838
}
39-
s.db.Insert(res.Retry)
39+
elasticRecords = res.Retry
4040
}
4141
return nil
4242
}

0 commit comments

Comments
 (0)