
Commit 6b20f12

committed
kafka source parallelism
1 parent 834f835 commit 6b20f12

File tree

5 files changed: +13 -8 lines changed
  • kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka


kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 7 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
@@ -79,6 +80,12 @@ protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaS
         return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
     }
 
+    protected void setParallelism(Integer parallelism, DataStreamSource kafkaSource) {
+        if (parallelism > 0) {
+            kafkaSource.setParallelism(parallelism);
+        }
+    }
+
     protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
         if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
             kafkaSrc.setStartFromEarliest();
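
For context, the sketch below (not part of this commit) illustrates how the new guard is intended to behave: a non-positive parallelism such as -1 or 0 leaves the environment's default in place, while a positive value overrides the source operator's parallelism. The class name ParallelismGuardSketch and the DummySource stand-in are hypothetical; in the real code the DataStreamSource comes from the FlinkKafkaConsumer built in genStreamSource.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelismGuardSketch {

    // Same guard as the new AbstractKafkaSource#setParallelism: only apply an
    // explicit parallelism when the configured value is positive.
    static void setParallelism(Integer parallelism, DataStreamSource<?> kafkaSource) {
        if (parallelism > 0) {
            kafkaSource.setParallelism(parallelism);
        }
    }

    // Hypothetical parallel stand-in for the FlinkKafkaConsumer built from the table definition.
    static class DummySource extends RichParallelSourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            for (int i = 0; i < 3 && running; i++) {
                ctx.collect("msg-" + i);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(new DummySource(), "dummy-source");

        setParallelism(-1, source);   // not positive: environment default is kept
        System.out.println("after -1: " + source.getParallelism());

        setParallelism(2, source);    // positive: override is applied
        System.out.println("after  2: " + source.getParallelism());

        source.print();
        env.execute("parallelism-guard-sketch");
    }
}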

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
@@ -49,8 +49,8 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
 
         String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName);
         DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
-        kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism());
 
+        setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
         setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
         String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
 

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 5 deletions
@@ -21,18 +21,14 @@
 
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
-import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -54,11 +50,13 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
 
         String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName);
         DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
-        kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism());
 
+        setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
         setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
         String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
 
         return tableEnv.fromDataStream(kafkaSource, fields);
     }
+
+
 }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
@@ -53,8 +53,8 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
 
         String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName);
         DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
-        kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism());
 
+        setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
         setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
         String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
 
kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
@@ -52,8 +52,8 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
 
         String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName);
         DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
-        kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism());
 
+        setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
         setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
         String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
 