Skip to content

Commit 160ab83

Browse files
committed
bugfix sink initMetric
1 parent e6b3ab1 commit 160ab83

File tree

5 files changed

+4
-1
lines changed

5 files changed

+4
-1
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
3939
kafkaSinkTableInfo.setName(tableName);
4040
kafkaSinkTableInfo.setType(MathUtil.getString(props.get(KafkaSinkTableInfo.TYPE_KEY.toLowerCase())));
4141
parseFieldsInfo(fieldsInfo, kafkaSinkTableInfo);
42-
kafkaSinkTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase())));
4342

4443
if (props.get(KafkaSinkTableInfo.SINK_DATA_TYPE) != null) {
4544
kafkaSinkTableInfo.setSinkDataType(props.get(KafkaSinkTableInfo.SINK_DATA_TYPE).toString());

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public KafkaProducer(String topicId, SerializationSchema<Row> serializationSchem
4848
public void open(Configuration configuration) throws Exception {
4949
RuntimeContext runtimeContext = getRuntimeContext();
5050
serializationMetricWrapper.setRuntimeContext(runtimeContext);
51+
serializationMetricWrapper.initMetric();
5152
super.open(configuration);
5253
}
5354

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public KafkaProducer09(String topicId, SerializationSchema<Row> serializationSch
4848
public void open(Configuration configuration) {
4949
RuntimeContext runtimeContext = getRuntimeContext();
5050
serializationMetricWrapper.setRuntimeContext(runtimeContext);
51+
serializationMetricWrapper.initMetric();
5152
super.open(configuration);
5253
}
5354

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public KafkaProducer010(String topicId, SerializationSchema<Row> serializationSc
4848
public void open(Configuration configuration) {
4949
RuntimeContext runtimeContext = getRuntimeContext();
5050
serializationMetricWrapper.setRuntimeContext(runtimeContext);
51+
serializationMetricWrapper.initMetric();
5152
super.open(configuration);
5253
}
5354

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public KafkaProducer011(String topicId, SerializationSchema<Row> serializationSc
4949
public void open(Configuration configuration) throws Exception {
5050
RuntimeContext runtimeContext = getRuntimeContext();
5151
serializationMetricWrapper.setRuntimeContext(runtimeContext);
52+
serializationMetricWrapper.initMetric();
5253
super.open(configuration);
5354
}
5455

0 commit comments

Comments
 (0)