Skip to content

Commit acd9335

Browse files
committed
kafka parallelism
1 parent 1764f98 commit acd9335

File tree

9 files changed

+60
-6
lines changed

9 files changed

+60
-6
lines changed

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5353
/** Serialization schema for encoding records to Kafka. */
5454
protected SerializationSchema serializationSchema;
5555

56+
protected int parallelism;
57+
58+
5659
@Override
5760
public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
5861
KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
@@ -64,6 +67,11 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
6467
}
6568
this.fieldTypes = types;
6669

70+
Integer parallelism = kafka09SinkTableInfo.getParallelism();
71+
if (parallelism != null) {
72+
this.parallelism = parallelism;
73+
}
74+
6775
properties = new Properties();
6876
for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
6977
properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
@@ -90,7 +98,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
9098

9199
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
92100
return record.f1;
93-
}).returns(getOutputType().getTypeAt(1));
101+
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
94102

95103
kafkaTableSink.emitDataStream(ds);
96104
}

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka09SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka09SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
46+
Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
47+
kafka09SinkTableInfo.setParallelism(parallelism);
48+
4549
for (String key : props.keySet()) {
4650
if (!key.isEmpty() && key.startsWith("kafka.")) {
4751
kafka09SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.api.common.functions.RuntimeContext;
3131
import org.apache.flink.api.common.typeinfo.TypeInformation;
3232
import org.apache.flink.api.java.typeutils.RowTypeInfo;
33+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
3334
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3435
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
3536
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -115,6 +116,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
115116

116117
String fields = StringUtils.join(kafka09SourceTableInfo.getFields(), ",");
117118
String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
118-
return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
119+
120+
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
121+
Integer parallelism = kafka09SourceTableInfo.getParallelism();
122+
if (parallelism != null) {
123+
kafkaSource.setParallelism(parallelism);
124+
}
125+
return tableEnv.fromDataStream(kafkaSource, fields);
119126
}
120127
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5555

5656
protected Properties properties;
5757

58+
protected int parallelism;
59+
5860
/** Serialization schema for encoding records to Kafka. */
5961
protected SerializationSchema serializationSchema;
6062

@@ -69,6 +71,11 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
6971
}
7072
this.fieldTypes = types;
7173

74+
Integer parallelism = kafka10SinkTableInfo.getParallelism();
75+
if (parallelism != null) {
76+
this.parallelism = parallelism;
77+
}
78+
7279
properties = new Properties();
7380
for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
7481
properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
@@ -95,7 +102,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
95102

96103
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
97104
return record.f1;
98-
}).returns(getOutputType().getTypeAt(1));
105+
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
99106

100107
kafkaTableSink.emitDataStream(ds);
101108
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka10SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka10SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
46+
Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
47+
kafka10SinkTableInfo.setParallelism(parallelism);
48+
4549
for (String key : props.keySet()) {
4650
if (!key.isEmpty() && key.startsWith("kafka.")) {
4751
kafka10SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.commons.lang3.StringUtils;
2929
import org.apache.flink.api.common.typeinfo.TypeInformation;
3030
import org.apache.flink.api.java.typeutils.RowTypeInfo;
31+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
3132
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3233
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
3334
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -116,6 +117,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
116117

117118
String fields = StringUtils.join(kafka010SourceTableInfo.getFields(), ",");
118119
String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
119-
return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
120+
121+
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
122+
Integer parallelism = kafka010SourceTableInfo.getParallelism();
123+
if (parallelism != null) {
124+
kafkaSource.setParallelism(parallelism);
125+
}
126+
return tableEnv.fromDataStream(kafkaSource, fields);
120127
}
121128
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
5151

5252
protected String topic;
5353

54+
protected int parallelism;
55+
5456
protected Properties properties;
5557

5658
/** Serialization schema for encoding records to Kafka. */
@@ -67,6 +69,11 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
6769
}
6870
this.fieldTypes = types;
6971

72+
Integer parallelism = kafka11SinkTableInfo.getParallelism();
73+
if (parallelism != null) {
74+
this.parallelism = parallelism;
75+
}
76+
7077
properties = new Properties();
7178
for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
7279
properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
@@ -89,7 +96,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
8996
serializationSchema
9097
);
9198

92-
9399
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
94100
return record.f1;
95101
}).returns(getOutputType().getTypeAt(1));

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka11SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka11SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
46+
Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
47+
kafka11SinkTableInfo.setParallelism(parallelism);
48+
4549
for (String key : props.keySet()) {
4650
if (!key.isEmpty() && key.startsWith("kafka.")) {
4751
kafka11SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.commons.lang3.StringUtils;
2929
import org.apache.flink.api.common.typeinfo.TypeInformation;
3030
import org.apache.flink.api.java.typeutils.RowTypeInfo;
31+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
3132
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3233
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
3334
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -117,6 +118,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
117118

118119
String fields = StringUtils.join(kafka011SourceTableInfo.getFields(), ",");
119120
String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
120-
return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
121+
122+
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
123+
Integer parallelism = kafka011SourceTableInfo.getParallelism();
124+
if (parallelism != null) {
125+
kafkaSource.setParallelism(parallelism);
126+
}
127+
return tableEnv.fromDataStream(kafkaSource, fields);
121128
}
122129
}

0 commit comments

Comments (0)