Skip to content

Commit 8882e8d

Browse files
committed
kafkasource parallelism
1 parent 7302b16 commit 8882e8d

File tree

3 files changed

+3
-3
lines changed
  • kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka

3 files changed

+3
-3
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
112112

113113
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
114114
Integer parallelism = kafkaSourceTableInfo.getParallelism();
115-
if (parallelism != null) {
115+
if (parallelism > 0) {
116116
kafkaSource.setParallelism(parallelism);
117117
}
118118
return tableEnv.fromDataStream(kafkaSource, fields);

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
112112

113113
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
114114
Integer parallelism = kafkaSourceTableInfo.getParallelism();
115-
if (parallelism != null) {
115+
if (parallelism > 0) {
116116
kafkaSource.setParallelism(parallelism);
117117
}
118118
return tableEnv.fromDataStream(kafkaSource, fields);

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
115115

116116
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
117117
Integer parallelism = kafkaSourceTableInfo.getParallelism();
118-
if (parallelism != null) {
118+
if (parallelism > 0) {
119119
kafkaSource.setParallelism(parallelism);
120120
}
121121
return tableEnv.fromDataStream(kafkaSource, fields);

0 commit comments

Comments
 (0)