Skip to content

Commit 09d5f92

Browse files
committed
kafka parse timestamp
1 parent 86a0e2a commit 09d5f92

File tree

4 files changed

+79
-12
lines changed

4 files changed

+79
-12
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323

2424
import com.dtstack.flink.sql.source.AbsDeserialization;
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import org.apache.flink.api.common.typeinfo.TypeInformation;
28+
import org.apache.flink.api.common.typeinfo.Types;
2729
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2830
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2931
import org.apache.flink.metrics.MetricGroup;
3032
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3134
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3235
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3336
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
@@ -43,7 +46,11 @@
4346

4447
import java.io.IOException;
4548
import java.lang.reflect.Field;
49+
import java.sql.Date;
50+
import java.sql.Time;
51+
import java.sql.Timestamp;
4652
import java.util.Iterator;
53+
import java.util.List;
4754
import java.util.Map;
4855
import java.util.Set;
4956

@@ -62,7 +69,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6269

6370
private static final long serialVersionUID = 2385115520960444192L;
6471

65-
private static int rowLenth = 1000;
72+
private static int dirtyDataFrequency = 1000;
6673

6774
private final ObjectMapper objectMapper = new ObjectMapper();
6875

@@ -112,7 +119,7 @@ public Row deserialize(byte[] message) throws IOException {
112119

113120
try {
114121
JsonNode root = objectMapper.readTree(message);
115-
if (numInRecord.getCount()%rowLenth == 0){
122+
if (numInRecord.getCount() % dirtyDataFrequency == 0) {
116123
LOG.info(root.toString());
117124
}
118125

@@ -134,17 +141,18 @@ public Row deserialize(byte[] message) throws IOException {
134141
}
135142
} else {
136143
// Read the value as specified type
137-
Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
144+
Object value = convert(node, fieldTypes[i]);
138145
row.setField(i, value);
139146
}
140147
}
141148

142149
numInResolveRecord.inc();
143150
return row;
144-
} catch (Throwable t) {
151+
} catch (Exception e) {
145152
//add metric of dirty data
146-
if (dirtyDataCounter.getCount()%rowLenth == 0){
153+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0 || LOG.isDebugEnabled()) {
147154
LOG.info("dirtyData: " + new String(message));
155+
LOG.error(" ", e);
148156
}
149157
dirtyDataCounter.inc();
150158
return null;
@@ -243,4 +251,31 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
243251
private static String partitionLagMetricName(TopicPartition tp) {
244252
return tp + ".records-lag";
245253
}
254+
255+
private Object convert(JsonNode node, TypeInformation<?> info) {
256+
if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
257+
return node.asBoolean();
258+
} else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
259+
return node.asText();
260+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
261+
return Date.valueOf(node.asText());
262+
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
263+
// local zone
264+
return Time.valueOf(node.asText());
265+
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
266+
// local zone
267+
return Timestamp.valueOf(node.asText());
268+
} else {
269+
// for types that were specified without JSON schema
270+
// e.g. POJOs
271+
try {
272+
return objectMapper.treeToValue(node, info.getTypeClass());
273+
} catch (JsonProcessingException e) {
274+
throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
275+
}
276+
}
277+
}
278+
279+
280+
246281
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323

2424
import com.dtstack.flink.sql.source.AbsDeserialization;
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import org.apache.flink.api.common.typeinfo.TypeInformation;
28+
import org.apache.flink.api.common.typeinfo.Types;
2729
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2830
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2931
import org.apache.flink.metrics.MetricGroup;
3032
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3134
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3235
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3336
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
@@ -43,7 +46,11 @@
4346

4447
import java.io.IOException;
4548
import java.lang.reflect.Field;
49+
import java.sql.Date;
50+
import java.sql.Time;
51+
import java.sql.Timestamp;
4652
import java.util.Iterator;
53+
import java.util.List;
4754
import java.util.Map;
4855
import java.util.Set;
4956

@@ -62,7 +69,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6269

6370
private static final long serialVersionUID = 2385115520960444192L;
6471

65-
private static int rowLenth = 1000;
72+
private static int dirtyDataFrequency = 1000;
6673

6774
private final ObjectMapper objectMapper = new ObjectMapper();
6875

@@ -112,7 +119,7 @@ public Row deserialize(byte[] message) throws IOException {
112119
try {
113120
JsonNode root = objectMapper.readTree(message);
114121

115-
if (numInRecord.getCount()%rowLenth == 0){
122+
if (numInRecord.getCount() % dirtyDataFrequency == 0) {
116123
LOG.info(root.toString());
117124
}
118125

@@ -134,17 +141,18 @@ public Row deserialize(byte[] message) throws IOException {
134141
}
135142
} else {
136143
// Read the value as specified type
137-
Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
144+
Object value = convert(node, fieldTypes[i]);
138145
row.setField(i, value);
139146
}
140147
}
141148

142149
numInResolveRecord.inc();
143150
return row;
144-
} catch (Throwable t) {
151+
} catch (Exception e) {
145152
//add metric of dirty data
146-
if (dirtyDataCounter.getCount()%rowLenth == 0){
153+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0 || LOG.isDebugEnabled()) {
147154
LOG.info("dirtyData: " + new String(message));
155+
LOG.error(" ", e);
148156
}
149157
dirtyDataCounter.inc();
150158
return null;
@@ -244,4 +252,28 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
244252
private static String partitionLagMetricName(TopicPartition tp) {
245253
return tp + ".records-lag";
246254
}
255+
256+
private Object convert(JsonNode node, TypeInformation<?> info) {
257+
if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
258+
return node.asBoolean();
259+
} else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
260+
return node.asText();
261+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
262+
return Date.valueOf(node.asText());
263+
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
264+
// local zone
265+
return Time.valueOf(node.asText());
266+
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
267+
// local zone
268+
return Timestamp.valueOf(node.asText());
269+
} else {
270+
// for types that were specified without JSON schema
271+
// e.g. POJOs
272+
try {
273+
return objectMapper.treeToValue(node, info.getTypeClass());
274+
} catch (JsonProcessingException e) {
275+
throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
276+
}
277+
}
278+
}
247279
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public Row deserialize(byte[] message) throws IOException {
155155
return row;
156156
} catch (Exception e) {
157157
//add metric of dirty data
158-
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
158+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0 || LOG.isDebugEnabled()) {
159159
LOG.info("dirtyData: " + new String(message));
160160
LOG.error("" , e);
161161
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ private void writeSingleRecord(Row row) {
204204
dbConn.commit();
205205
} catch (SQLException e) {
206206
outDirtyRecords.inc();
207-
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0) {
207+
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0 || LOG.isDebugEnabled()) {
208208
LOG.error("record insert failed ..", row.toString());
209209
LOG.error("", e);
210210
}

0 commit comments

Comments
 (0)