Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit b6c1f0d

Browse files
authored
Merge pull request #1 from inloco/chore/run_utils
Add local run utils
2 parents c96b1fc + 51a6a04 commit b6c1f0d

File tree

10 files changed

+127
-11
lines changed

10 files changed

+127
-11
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ jobs:
7777
command: |
7878
count=0; \
7979
until nc -z localhost 9200 || ((count ++ >= 10)); \
80-
do echo "Retrying: Verify if Cassandra is ready"; sleep 5; done
80+
do echo "Retrying: Verify if Elasticsearch is ready"; sleep 5; done
8181
- run:
8282
name: "Run project tests (excluding vendor)"
8383
command: go test $(go list ./... | grep -v /vendor/)

Makefile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,17 @@ go/deps:
33

44
test:
55
go test $$(go list ./... | grep -v /vendor/)
6+
7+
docker/build:
8+
GOOS=linux GOARCH=386 go build -o bin/injector cmd/injector.go
9+
GOOS=linux GOARCH=386 go build -o bin/producer util/producer/producer.go
10+
docker build --rm=false -t "inlocomedia/kafka-elasticsearch-injector:local" -f cmd/Dockerfile .
11+
docker build --rm=false -t "inlocomedia/kafka-elasticsearch-injector:producer-local" -f util/producer/Dockerfile .
12+
13+
docker/run:
14+
docker-compose up -d zookeeper kafka schema-registry elasticsearch kibana
15+
count=0; \
16+
until curl localhost:9200 || ((count ++ >= 10)); \
17+
do echo "Retrying: Verify if Elasticsearch is ready"; sleep 5; done
18+
curl -XPOST "localhost:9200/_template/my-topic" --data '{"template":"my-topic-*","settings":{"refresh_interval":"30s","number_of_replicas":0},"mappings":{"_default_":{"_all":{"enabled":"false"},"_source":{"enabled":"true"},"properties":{"@timestamp":{"format":"epoch_millis","ignore_malformed":true,"type":"date"}},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"text","index":false}}}]}}}'
19+
docker-compose up -d producer app

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ go get -u github.com/golang/dep/...
113113
dep ensure -v
114114
```
115115

116-
To run tests, run `docker-compose up -d` and run `make test`.
116+
To run tests, run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` and run `make test`.
117117

118118
### Versioning
119119

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4.0
1+
0.4.1

docker-compose.yml

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ version: '2'
22
services:
33
zookeeper:
44
image: 'confluentinc/cp-zookeeper:3.1.1'
5+
container_name: zookeeper
56
environment:
67
- ZOOKEEPER_CLIENT_PORT=2181
78
kafka:
89
image: 'confluentinc/cp-kafka:3.1.1'
10+
container_name: kafka
911
environment:
1012
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
1113
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
@@ -15,6 +17,7 @@ services:
1517
- "9092:9092"
1618
schema-registry:
1719
image: 'confluentinc/cp-schema-registry:3.1.1'
20+
container_name: schema-registry
1821
environment:
1922
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
2023
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
@@ -23,5 +26,46 @@ services:
2326
- "8081:8081"
2427
elasticsearch:
2528
image: 'elasticsearch:5-alpine'
29+
container_name: elasticsearch
2630
ports:
27-
- '9200:9200'
31+
- '9200:9200'
32+
kibana:
33+
image: 'kibana:5'
34+
container_name: kibana
35+
depends_on:
36+
- elasticsearch
37+
environment:
38+
- ELASTICSEARCH_URL=http://elasticsearch:9200
39+
- SERVER_HOST=0.0.0.0
40+
ports:
41+
- "5601:5601"
42+
app:
43+
image: 'inlocomedia/kafka-elasticsearch-injector:local'
44+
container_name: app
45+
depends_on:
46+
- kafka
47+
- elasticsearch
48+
- schema-registry
49+
environment:
50+
- KAFKA_ADDRESS=kafka:9092
51+
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
52+
- KAFKA_TOPICS=my-topic
53+
- KAFKA_CONSUMER_GROUP=my-topic-es-injector
54+
- ELASTICSEARCH_HOST=http://elasticsearch:9200
55+
- PROBES_PORT=5000
56+
- K8S_LIVENESS_ROUTE=/liveness
57+
- K8S_READINESS_ROUTE=/readiness
58+
- KAFKA_CONSUMER_CONCURRENCY=10
59+
- KAFKA_CONSUMER_BATCH_SIZE=10
60+
- METRICS_PORT=9102
61+
- LOG_LEVEL=DEBUG
62+
producer:
63+
image: 'inlocomedia/kafka-elasticsearch-injector:producer-local'
64+
container_name: producer
65+
depends_on:
66+
- kafka
67+
- schema-registry
68+
environment:
69+
- KAFKA_ADDRESS=kafka:9092
70+
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
71+
- LOG_LEVEL=DEBUG

src/kafka/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88

99
"sync"
1010

11-
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1211
"github.com/Shopify/sarama"
1312
"github.com/bsm/sarama-cluster"
1413
"github.com/go-kit/kit/endpoint"
1514
"github.com/go-kit/kit/log"
1615
"github.com/go-kit/kit/log/level"
16+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1717
)
1818

1919
type Notification int32

src/kafka/consumer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ import (
1212

1313
"encoding/json"
1414

15+
"github.com/Shopify/sarama"
16+
"github.com/go-kit/kit/endpoint"
1517
"github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
1618
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
1719
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
1820
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1921
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
20-
"github.com/Shopify/sarama"
21-
"github.com/go-kit/kit/endpoint"
2222
"github.com/olivere/elastic"
2323
"github.com/stretchr/testify/assert"
2424
)
@@ -102,7 +102,7 @@ func TestKafka_Start(t *testing.T) {
102102
var msg *sarama.ProducerMessage
103103
if assert.NoError(t, err) {
104104

105-
err = producer.Publish(&rec)
105+
err = producer.Publish(rec)
106106
if assert.NoError(t, err) {
107107
msg = <-producer.GetSuccesses()
108108
} else {

src/kafka/fixtures/fixtures.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55

66
"math/rand"
77

8-
"github.com/inloco/kafka-elasticsearch-injector/src/models"
98
"github.com/inloco/goavro"
9+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1010
)
1111

1212
const DefaultTopic = "my-topic"
@@ -33,8 +33,8 @@ func (r *FixtureRecord) ToAvroSerialization() ([]byte, error) {
3333
return codec.BinaryFromNative(nil, map[string]interface{}{"id": r.Id})
3434
}
3535

36-
func NewFixtureRecord() FixtureRecord {
37-
return FixtureRecord{Id: rand.Int31()}
36+
func NewFixtureRecord() *FixtureRecord {
37+
return &FixtureRecord{Id: rand.Int31()}
3838
}
3939

4040
func NewRecord(ts time.Time) (*models.Record, int32, int32) {

util/producer/Dockerfile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
FROM alpine:latest
2+
RUN apk add --update ca-certificates
3+
COPY bin/producer /
4+
RUN chmod +x producer
5+
ENTRYPOINT ["/producer"]

util/producer/producer.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"github.com/Shopify/sarama"
5+
"github.com/go-kit/kit/log/level"
6+
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
7+
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
8+
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
9+
"os"
10+
"os/signal"
11+
"syscall"
12+
"time"
13+
)
14+
15+
func main() {
16+
logger := logger_builder.NewLogger("test-producer")
17+
registry, err := schema_registry.NewSchemaRegistry(os.Getenv("SCHEMA_REGISTRY_URL"))
18+
if err != nil {
19+
panic(err)
20+
}
21+
config := sarama.NewConfig()
22+
config.Producer.Return.Successes = true
23+
config.Producer.MaxMessageBytes = 20 * 1024 * 1024 // 20mb
24+
config.Producer.Flush.Frequency = 1 * time.Millisecond
25+
config.Version = sarama.V0_10_0_0 // This version is the same as in production
26+
producer, err := fixtures.NewProducer(os.Getenv("KAFKA_ADDRESS"), config, registry)
27+
if err != nil {
28+
panic(err)
29+
}
30+
numProducers := 1
31+
for i := 0; i < numProducers; i++ {
32+
go func() {
33+
for {
34+
rec := fixtures.NewFixtureRecord()
35+
err := producer.Publish(rec)
36+
if err != nil {
37+
level.Error(logger).Log("msg", "error publishing to kafka", "err", err)
38+
continue
39+
}
40+
msg := <-producer.GetSuccesses()
41+
level.Debug(logger).Log("msg", "produced!", "value", rec, "msg", *msg)
42+
}
43+
}()
44+
}
45+
signals := make(chan os.Signal, 1)
46+
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
47+
for {
48+
select {
49+
case <-signals:
50+
return
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)