Commit e6b3ab1

bugfix serialize
1 parent 6eb2bb5 commit e6b3ab1

File tree

4 files changed: +14 −8

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumerFactory.java
kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09Factory.java
kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010Factory.java
kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java
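The common thread in all four files: each factory hands a lag-computing lambda (or method reference) to createDeserializationMetricWrapper, and a plain Java lambda does not implement java.io.Serializable, so Flink's closure serialization fails when the job is shipped to the task managers. Casting to the intersection type (Calculate & Serializable) makes the compiler generate a lambda class that implements both interfaces. Below is a minimal, self-contained sketch of the pattern; the Calculate interface here is a hypothetical stand-in whose signature is only inferred from the call sites in this diff, and the 0L stub mirrors the kafka09 variant:

import java.io.Serializable;

public class SerializableLambdaDemo {

    // Hypothetical stand-in for this repo's Calculate interface; the real
    // one takes a SubscriptionState and a TopicPartition and returns the lag.
    @FunctionalInterface
    interface Calculate {
        Long calc(Object subscriptionState, Object topicPartition);
    }

    public static void main(String[] args) {
        // Plain lambda: the generated class implements only Calculate.
        Calculate plain = (s, tp) -> 0L;
        System.out.println(plain instanceof Serializable);  // false

        // Intersection-type cast: the generated class also implements
        // Serializable, so it survives Flink's closure serialization.
        Calculate fixed = (Calculate & Serializable) (s, tp) -> 0L;
        System.out.println(fixed instanceof Serializable);  // true
    }
}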

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumerFactory.java

Lines changed: 4 additions & 2 deletions
@@ -24,11 +24,13 @@
 import org.apache.flink.types.Row;
 import org.apache.kafka.common.requests.IsolationLevel;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
  * company: www.dtstack.com
+ *
  * @author: toutian
  * create: 2019/12/24
  */
@@ -38,9 +40,9 @@ public class KafkaConsumerFactory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
         } else {
-            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
         }
         return kafkaSrc;
     }

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09Factory.java

Lines changed: 4 additions & 2 deletions
@@ -23,11 +23,13 @@
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.types.Row;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
  * company: www.dtstack.com
+ *
  * @author: toutian
  * create: 2019/12/24
  */
@@ -37,9 +39,9 @@ public class KafkaConsumer09Factory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer09 kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer09(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> 0L), props);
+            kafkaSrc = new KafkaConsumer09(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L), props);
         } else {
-            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> 0L), props);
+            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L), props);
         }
         return kafkaSrc;
     }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010Factory.java

Lines changed: 3 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.flink.types.Row;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -38,9 +39,9 @@ public class KafkaConsumer010Factory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer010 kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer010(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, SubscriptionState::partitionLag), props);
+            kafkaSrc = new KafkaConsumer010(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag), props);
         } else {
-            kafkaSrc = new KafkaConsumer010(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, SubscriptionState::partitionLag), props);
+            kafkaSrc = new KafkaConsumer010(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) SubscriptionState::partitionLag), props);
         }
         return kafkaSrc;
     }

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java

Lines changed: 3 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.flink.types.Row;
 import org.apache.kafka.common.requests.IsolationLevel;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -38,9 +39,9 @@ public class KafkaConsumer011Factory extends AbstractKafkaConsumerFactory {
     public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
         KafkaConsumer011 kafkaSrc = null;
         if (kafkaSourceTableInfo.getTopicIsPattern()) {
-            kafkaSrc = new KafkaConsumer011(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            kafkaSrc = new KafkaConsumer011(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
         } else {
-            kafkaSrc = new KafkaConsumer011(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
+            kafkaSrc = new KafkaConsumer011(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
         }
         return kafkaSrc;
     }
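An alternative to casting at every call site, had the interface been in this commit's scope, is to declare the functional interface itself serializable, the way Flink's own Function interfaces extend Serializable. A sketch of that variant; Calculate and SubscriptionState appear in this diff, while the method name and the TopicPartition parameter are assumptions:

import java.io.Serializable;

import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

// Hypothetical variant: any lambda or method reference targeting this
// interface is automatically serializable, so the per-call-site
// (Calculate & Serializable) casts become unnecessary.
@FunctionalInterface
public interface Calculate extends Serializable {
    Long calc(SubscriptionState subscriptionState, TopicPartition topicPartition);
}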
