
Commit 6eb2bb5

opt all over

1 parent 3cfe939

44 files changed: +433 additions, -2395 deletions


core/src/main/java/com/dtstack/flink/sql/format/JsonDataParser.java

Lines changed: 0 additions & 173 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 12 additions & 5 deletions
@@ -18,11 +18,13 @@
 
 package com.dtstack.flink.sql.format;
 
+import com.dtstack.flink.sql.metric.MetricConstant;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -34,20 +36,24 @@
  */
 public class SerializationMetricWrapper implements SerializationSchema<Row> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SerializationMetricWrapper.class);
-
     private SerializationSchema<Row> serializationSchema;
 
     private transient RuntimeContext runtimeContext;
 
+    protected transient Counter dtNumRecordsOut;
+
+    protected transient Meter dtNumRecordsOutRate;
+
+
     public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
         this.serializationSchema = serializationSchema;
     }
 
     public void initMetric() {
+        dtNumRecordsOut = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
+        dtNumRecordsOutRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(dtNumRecordsOut, 20));
     }
 
-
     @Override
     public byte[] serialize(Row element) {
         beforeSerialize();
@@ -60,6 +66,7 @@ protected void beforeSerialize() {
     }
 
     protected void afterSerialize() {
+        dtNumRecordsOut.inc();
     }
 
     public RuntimeContext getRuntimeContext() {
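
For reference, a minimal usage sketch of the wrapper above. The setRuntimeContext setter is assumed to pair with the getRuntimeContext accessor visible in the diff, and the helper name is illustrative, not part of the commit.

import com.dtstack.flink.sql.format.SerializationMetricWrapper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;

public class MetricWrapperSketch {

    // Typical wiring, e.g. from a rich sink's open(): set the runtime context
    // first, register the metrics, then serialize as usual.
    public static byte[] serializeWithMetrics(SerializationSchema<Row> inner,
                                              RuntimeContext ctx, Row row) {
        SerializationMetricWrapper wrapper = new SerializationMetricWrapper(inner);
        wrapper.setRuntimeContext(ctx); // assumed setter; must precede initMetric()
        wrapper.initMetric();           // registers dtNumRecordsOut plus its MeterView rate
        return wrapper.serialize(row);  // afterSerialize() increments dtNumRecordsOut
    }
}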

core/src/main/java/com/dtstack/flink/sql/table/TargetTableInfo.java

Lines changed: 3 additions & 1 deletion
@@ -20,6 +20,8 @@
 
 package com.dtstack.flink.sql.table;
 
+import com.dtstack.flink.sql.format.FormatType;
+
 /**
  * Reason:
  * Date: 2018/6/25
@@ -33,7 +35,7 @@ public abstract class TargetTableInfo extends TableInfo {
 
     public static final String SINK_DATA_TYPE = "sinkdatatype";
 
-    private String sinkDataType = "json";
+    private String sinkDataType = FormatType.JSON.name();
 
     public String getSinkDataType() {
         return sinkDataType;
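
The default thus changes from the literal "json" to FormatType.JSON.name(), i.e. "JSON"; the factory below compares with equalsIgnoreCase, so existing lowercase configurations keep working. A minimal sketch of the FormatType enum, inferred from the JSON/CSV/AVRO branches this commit exercises (the real enum may define more members):

package com.dtstack.flink.sql.format;

// Inferred sketch: only the members used by this commit are listed.
public enum FormatType {
    JSON,
    CSV,
    AVRO
}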
kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 80 additions & 0 deletions
This file was added.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.format.SerializationMetricWrapper;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.formats.csv.CsvRowSerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

import java.util.Optional;
import java.util.Properties;

/**
 * company: www.dtstack.com
 *
 * @author: toutian
 * create: 2019/12/26
 */
public abstract class AbstractKafkaProducerFactory {

    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner);

    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
        return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation));
    }

    private SerializationSchema<Row> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
        SerializationSchema<Row> serializationSchema = null;
        if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
            if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
                serializationSchema = new JsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
            } else if (typeInformation != null && typeInformation.getArity() != 0) {
                serializationSchema = new JsonRowSerializationSchema(typeInformation);
            } else {
                throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
            }
        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
            if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
                throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
            }
            final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInformation);
            serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
            serializationSchema = serSchemaBuilder.build();
        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
            if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
                throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
            }
            serializationSchema = new AvroRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
        }

        if (null == serializationSchema) {
            throw new UnsupportedOperationException("FormatType:" + kafkaSinkTableInfo.getSinkDataType());
        }
        return serializationSchema;
    }

}
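
To show how the factory is meant to be consumed, here is a hypothetical concrete subclass targeting the Kafka 0.11 connector. FlinkKafkaProducer011 and its (topic, schema, properties, partitioner) constructor are stock Flink API, but the subclass name and the getTopic() accessor on KafkaSinkTableInfo are assumptions, not part of this commit.

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.format.SerializationMetricWrapper;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

import java.util.Optional;
import java.util.Properties;

// Hypothetical subclass: builds the metric-wrapped serializer once and hands
// it to a version-specific Kafka producer.
public class Kafka011ProducerFactorySketch extends AbstractKafkaProducerFactory {

    @Override
    public RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo,
                                                     TypeInformation<Row> typeInformation,
                                                     Properties properties,
                                                     Optional<FlinkKafkaPartitioner<Row>> partitioner) {
        SerializationMetricWrapper wrapper =
                createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation);
        // getTopic() is assumed here; FlinkKafkaProducer011 extends
        // RichSinkFunction, so it can be returned directly.
        return new FlinkKafkaProducer011<>(kafkaSinkTableInfo.getTopic(), wrapper, properties, partitioner);
    }
}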

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java

Lines changed: 0 additions & 67 deletions
This file was deleted.
