
Commit 5053fc4 (1 parent: d2772f5)

dirty data manager

File tree: 19 files changed (+231 / -113 lines)


core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 13 additions & 16 deletions
@@ -77,6 +77,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -85,6 +86,7 @@
 import java.net.URLDecoder;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -131,7 +133,8 @@ public static void main(String[] args) throws Exception {
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);

         dirtyProp = URLDecoder.decode(dirtyProp, Charsets.UTF_8.toString());
-        Map dirtyCofig = PluginUtil.jsonStrToObject(dirtyProp, Map.class);
+        // set DirtyDataManager dirtyconfig
+        DirtyConfig dirtyConfig = getDirtyDataManagerDirtyConfig(dirtyProp);

         StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
@@ -149,12 +152,11 @@ public static void main(String[] args) throws Exception {

         Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
         Map<String, Table> registerTableCache = Maps.newHashMap();
-        // set DirtyDataManager hadoopconfig
-        setDirtyDataManagerHadoopConfig(dirtyCofig);
+
         //register udf
         registerUDF(sqlTree, jarURList, tableEnv);
         //register table schema
-        registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
+        registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache, dirtyConfig);

         sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, queryConfig);

@@ -165,16 +167,9 @@ public static void main(String[] args) throws Exception {
         env.execute(name);
     }

-    private static void setDirtyDataManagerHadoopConfig(Map dirtyCofig) {
-        DirtyConfig dirtyConfig = new DirtyConfig(dirtyCofig);
-        String path = dirtyConfig.getPath();
-        if (!StringUtils.isEmpty(path)) {
-            DirtyDataManager.savePath = path;
-        }
-        Map<String, Object> hadoopConfig = dirtyConfig.getHadoopConfig();
-        if (!MapUtils.isEmpty(hadoopConfig)) {
-            DirtyDataManager.hadoopConfig = hadoopConfig;
-        }
+    private static DirtyConfig getDirtyDataManagerDirtyConfig(String dirtyProp) throws IOException {
+        Map dirtyCofig = PluginUtil.jsonStrToObject(dirtyProp, Map.class);
+        return dirtyCofig.size() == 0 ? null : new DirtyConfig(dirtyCofig);
     }

     private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
@@ -256,14 +251,16 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTabl


     private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                      String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
+                                      String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap,
+                                      Map<String, Table> registerTableCache, DirtyConfig dirtyConfig) throws Exception {
         Set<URL> classPathSet = Sets.newHashSet();
         WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
         for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {

             if (tableInfo instanceof SourceTableInfo) {

                 SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
+                sourceTableInfo.setDirtyConfig(dirtyConfig);
                 Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
                 tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
                 //Note --- parameter conversion function can not be used inside a function of the type of polymerization
@@ -293,7 +290,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
             registerTableCache.put(tableInfo.getName(), regTable);
             classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
         } else if (tableInfo instanceof TargetTableInfo) {
-
+            tableInfo.setDirtyConfig(dirtyConfig);
             TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
             TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
             tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
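
The refactor moves dirty-config parsing behind a small helper with a deliberate contract: an empty JSON object yields a null DirtyConfig, which downstream code reads as "dirty-data persistence disabled". A minimal, runnable stand-in for that logic, assuming PluginUtil.jsonStrToObject is a plain Jackson binding (anything beyond that is an assumption, not shown in this commit):

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;
    import java.util.Map;

    public class DirtyPropParseSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Mirrors getDirtyDataManagerDirtyConfig: empty JSON -> null, meaning
        // "no dirty-data persistence configured" for sources and sinks.
        static Map<?, ?> parseDirtyProp(String dirtyProp) throws IOException {
            Map<?, ?> raw = MAPPER.readValue(dirtyProp, Map.class);
            return raw.isEmpty() ? null : raw;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(parseDirtyProp("{}"));                        // null
            System.out.println(parseDirtyProp("{\"path\":\"/dirtydata\"}")); // {path=/dirtydata}
        }
    }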

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ public class ConfigConstrant {
     public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
     public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";

+    public static final String METRIC_JOB_ID = "<job_id>";

     // restart plocy
     public static final int failureRate = 3;
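
The new METRIC_JOB_ID constant is the key under which Flink's metric system exposes the job id: MetricGroup#getAllVariables() returns scope variables keyed with angle brackets ("<job_id>", "<task_id>", ...). A small sketch of the lookup this commit performs on both the source and sink paths (JobIdLookupSketch and resolveJobId are hypothetical names):

    import org.apache.flink.api.common.functions.RuntimeContext;

    import java.util.Map;

    public final class JobIdLookupSketch {
        // Pulls the running job's id out of the metric scope variables;
        // returns null when the variable is absent (e.g. in local tests).
        public static String resolveJobId(RuntimeContext ctx) {
            Map<String, String> vars = ctx.getMetricGroup().getAllVariables();
            return vars == null ? null : vars.get("<job_id>"); // ConfigConstrant.METRIC_JOB_ID
        }
    }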

core/src/main/java/com/dtstack/flink/sql/dirty/DirtyDataManager.java

Lines changed: 43 additions & 16 deletions
@@ -18,21 +18,20 @@

 package com.dtstack.flink.sql.dirty;

+import com.dtstack.flink.sql.config.DirtyConfig;
 import com.dtstack.flink.sql.exception.ParseOrWriteRecordException;
 import com.dtstack.flink.sql.util.DateUtil;
 import com.dtstack.flink.sql.util.FileSystemUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.types.Row;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.Map;
@@ -45,42 +44,70 @@
  * Company: www.dtstack.com
  * @author huyifan.zju@163.com
  */
-public class DirtyDataManager implements Serializable{
+public class DirtyDataManager {
+
     private static final EnumSet<HdfsDataOutputStream.SyncFlag> syncFlags = EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH);

     private static final String FIELD_DELIMITER = "\u0001";
     private static final String LINE_DELIMITER = "\n";

     private static final ObjectMapper objMapper = new ObjectMapper();
-    public static Map<String, Object> hadoopConfig = null;
-    public static String savePath = "/default";
+    private Optional<FSDataOutputStream> fsOutputStream;
+    private Map<String, Object> hadoopConfig;
+    private String defaultSavePath = "/dirtydata";
+
+    public DirtyDataManager(DirtyConfig dirtyConfig) {
+        String path = dirtyConfig.getPath();
+        if (!StringUtils.isEmpty(path)) {
+            this.defaultSavePath = path;
+        }

-    public static FSDataOutputStream createFileSystem(String prefix, String jobId) {
+        Map<String, Object> hadoopConfig = dirtyConfig.getHadoopConfig();
+        if (!MapUtils.isEmpty(hadoopConfig)) {
+            this.hadoopConfig = hadoopConfig;
+        }
+    }
+
+    public Optional<FSDataOutputStream> createFsOutputStream(String prefix, String jobId) {
         if (null == hadoopConfig) {
-            return null;
+            return Optional.empty();
         }

         try {
-            String location = savePath + "/" + prefix + "_" + UUID.randomUUID() + ".txt";
+            String location = defaultSavePath + "/" + jobId + "/" + prefix + "_" + UUID.randomUUID() + ".txt";
             FileSystem fs = FileSystemUtil.getFileSystem(hadoopConfig, null, jobId, "dirty");
             Path dataSavePath = new Path(location);
-            return fs.create(dataSavePath, true);
+            fsOutputStream = Optional.of(fs.create(dataSavePath, true));
+            return fsOutputStream;
         } catch (Exception e) {
             throw new RuntimeException("Open dirty manager error", e);
         }
     }


-    public static void writeData(FSDataOutputStream stream, String content, ParseOrWriteRecordException ex) throws IOException {
+    public void writeData(String content, ParseOrWriteRecordException ex) {
         try {
-            String line = StringUtils.join(new String[]{content, objMapper.writeValueAsString(ex.toString()), DateUtil.timestampToString(new Date())}, FIELD_DELIMITER);
-            stream.writeChars(line);
-            stream.writeChars(LINE_DELIMITER);
-            DFSOutputStream dfsOutputStream = (DFSOutputStream) stream.getWrappedStream();
-            dfsOutputStream.hsync(syncFlags);
+            if (fsOutputStream.isPresent()) {
+                String line = StringUtils.join(new String[]{content, objMapper.writeValueAsString(ex.toString()), DateUtil.timestampToString(new Date())}, FIELD_DELIMITER);
+                fsOutputStream.get().writeChars(line);
+                fsOutputStream.get().writeChars(LINE_DELIMITER);
+                DFSOutputStream dfsOutputStream = (DFSOutputStream) fsOutputStream.get().getWrappedStream();
+                dfsOutputStream.hsync(syncFlags);
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }

+    public void close() {
+        if (fsOutputStream.isPresent()) {
+            try {
+                fsOutputStream.get().flush();
+                fsOutputStream.get().close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
 }
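
DirtyDataManager changes from a static utility into a per-task instance: construct it with a DirtyConfig, open one HDFS file per stream, write hsync'ed lines, and close on teardown. A hypothetical driver showing that lifecycle; the "path" and "hadoopConfig" map keys are assumptions inferred from DirtyConfig's getters, and writeData only emits when createFsOutputStream actually opened a stream:

    import com.dtstack.flink.sql.config.DirtyConfig;
    import com.dtstack.flink.sql.dirty.DirtyDataManager;
    import com.dtstack.flink.sql.exception.ParseOrWriteRecordException;

    import java.util.HashMap;
    import java.util.Map;

    public class DirtyDataManagerUsageSketch {
        public static void main(String[] args) {
            Map<String, Object> raw = new HashMap<>();
            raw.put("path", "/dirtydata");                  // assumed key backing getPath()
            Map<String, Object> hadoopConfig = new HashMap<>();
            hadoopConfig.put("fs.defaultFS", "hdfs://ns1"); // assumed key backing getHadoopConfig()
            raw.put("hadoopConfig", hadoopConfig);

            DirtyDataManager manager = new DirtyDataManager(new DirtyConfig(raw));
            // Opens /dirtydata/<jobId>/source_<uuid>.txt; returns Optional.empty()
            // when no hadoopConfig was supplied.
            manager.createFsOutputStream("source", "job_42");
            manager.writeData("{\"broken\":",
                    new ParseOrWriteRecordException("parse failed", new RuntimeException("boom")));
            manager.close(); // flush + close the underlying FSDataOutputStream
        }
    }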

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 64 additions & 7 deletions
@@ -17,29 +17,86 @@
  */
 package com.dtstack.flink.sql.sink;

+import com.dtstack.flink.sql.config.DirtyConfig;
+import com.dtstack.flink.sql.constrant.ConfigConstrant;
+import com.dtstack.flink.sql.dirty.DirtyDataManager;
+import com.dtstack.flink.sql.exception.ParseOrWriteRecordException;
 import com.dtstack.flink.sql.metric.MetricConstant;
 import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;

 /**
  * Created by sishu.yss on 2018/11/28.
  */
-public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
+public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2> {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricOutputFormat.class);
+
+    private static final String SINK_DIRTYDATA_PREFIX = "sink";
+
+    protected transient Counter outRecords;
+
+    protected transient Counter outDirtyRecords;

-    protected transient Counter outRecords;
+    protected transient Meter outRecordsRate;

-    protected transient Counter outDirtyRecords;
+    protected int dirtyDataPrintFrequency = 1000;
+    protected int receiveDataPrintFrequency = 1000;
+    protected DirtyConfig dirtyConfig;
+    protected DirtyDataManager dirtyDataManager;

-    protected transient Meter outRecordsRate;
+    protected String jobId;

-    public void initMetric() {
+    public void initMetric() {
         outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
         outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
         outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
-    }
+    }
+
+
+    public void initDirtyDataOutputStream() {
+        if (null != dirtyConfig) {
+            Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
+            if (vars != null && vars.get(ConfigConstrant.METRIC_JOB_ID) != null) {
+                jobId = vars.get(ConfigConstrant.METRIC_JOB_ID);
+            }
+            dirtyDataManager = new DirtyDataManager(dirtyConfig);
+            dirtyDataManager.createFsOutputStream(SINK_DIRTYDATA_PREFIX, jobId);
+        }
+    }
+
+    /**
+     * Exception handling when JSON parsing fails
+     * @param message
+     * @param e
+     * @throws IOException
+     */
+    protected void dealInsertError(String message, Exception e) {
+        if (null != dirtyDataManager) {
+            dirtyDataManager.writeData(new String(message), new ParseOrWriteRecordException(e.getMessage(), e));
+        }
+
+        if (null == dirtyDataManager && (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0)) {
+            LOG.info("record insert failed .." + new String(message));
+            LOG.error("", e);
+        }
+        outDirtyRecords.inc();
+    }
+
+    public void closeFsDataOutputStream() {
+        if (null != dirtyDataManager) {
+            dirtyDataManager.close();
+        }
+    }

+    public void setDirtyConfig(DirtyConfig dirtyConfig) {
+        this.dirtyConfig = dirtyConfig;
+    }
 }
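
MetricOutputFormat now owns the sink-side dirty-data plumbing, but the hooks are opt-in: nothing is opened unless setDirtyConfig(...) was called before open(). A hypothetical concrete sink showing the intended call order (ExampleMetricOutputFormat and its record handling are illustrative, not part of the commit):

    import com.dtstack.flink.sql.sink.MetricOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    public class ExampleMetricOutputFormat extends MetricOutputFormat {

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) {
            initMetric();
            initDirtyDataOutputStream(); // no-op unless setDirtyConfig(...) was called
        }

        @Override
        public void writeRecord(Tuple2 record) {
            try {
                // ... translate and emit the record to the external store ...
                outRecords.inc();
            } catch (Exception e) {
                dealInsertError(record.toString(), e); // HDFS when configured, sampled log otherwise
            }
        }

        @Override
        public void close() {
            closeFsDataOutputStream();
        }
    }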

core/src/main/java/com/dtstack/flink/sql/source/AbsDeserialization.java

Lines changed: 40 additions & 10 deletions
@@ -18,14 +18,18 @@

 package com.dtstack.flink.sql.source;

+import com.dtstack.flink.sql.config.DirtyConfig;
+import com.dtstack.flink.sql.constrant.ConfigConstrant;
 import com.dtstack.flink.sql.dirty.DirtyDataManager;
+import com.dtstack.flink.sql.exception.ParseOrWriteRecordException;
 import com.dtstack.flink.sql.metric.MetricConstant;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.Map;
@@ -40,9 +44,9 @@

 public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {

+    private static final Logger LOG = LoggerFactory.getLogger(AbsDeserialization.class);
     private static final long serialVersionUID = 2176278128811784415L;

-    private static final String JOB_ID = "<job_id>";
     private static final String SOURCE_DIRTYDATA_PREFIX = "source";

     private transient RuntimeContext runtimeContext;
@@ -63,7 +67,10 @@ public abstract class AbsDeserialization<T> extends AbstractDeserializationSchem

     protected transient Meter numInBytesRate;

-    protected FSDataOutputStream fsDataOutputStream;
+    protected DirtyConfig dirtyConfig;
+    protected DirtyDataManager dirtyDataManager;
+
+    protected int dirtyDataFrequency = 1000;

     protected String jobId;

@@ -89,18 +96,41 @@ public void initMetric() {
     }

     public void initDirtyDataOutputStream() {
-        Map<String, String> vars = runtimeContext.getMetricGroup().getAllVariables();
-        if (vars != null && vars.get(JOB_ID) != null) {
-            jobId = vars.get(JOB_ID);
+        if (null != dirtyConfig) {
+            Map<String, String> vars = runtimeContext.getMetricGroup().getAllVariables();
+            if (vars != null && vars.get(ConfigConstrant.METRIC_JOB_ID) != null) {
+                jobId = vars.get(ConfigConstrant.METRIC_JOB_ID);
+            }
+            dirtyDataManager = new DirtyDataManager(dirtyConfig);
+            dirtyDataManager.createFsOutputStream(SOURCE_DIRTYDATA_PREFIX, jobId);
         }
-        fsDataOutputStream = DirtyDataManager.createFileSystem(SOURCE_DIRTYDATA_PREFIX, jobId);
     }

-    public void closefsDataOutputStream() throws IOException {
-        if (null != fsDataOutputStream) {
-            fsDataOutputStream.close();
+    /**
+     * Exception handling when JSON parsing fails
+     * @param message
+     * @param e
+     * @throws IOException
+     */
+    protected void dealParseError(byte[] message, Exception e) throws IOException {
+        if (null != dirtyDataManager) {
+            dirtyDataManager.writeData(new String(message), new ParseOrWriteRecordException(e.getMessage(), e));
+        }
+
+        if (null == dirtyDataManager && (dirtyDataCounter.getCount() % dirtyDataFrequency == 0)) {
+            LOG.info("dirtyData: " + new String(message));
+            LOG.error("", e);
         }
+        dirtyDataCounter.inc();
     }

+    public void closefsDataOutputStream() {
+        if (null != dirtyDataManager) {
+            dirtyDataManager.close();
+        }
+    }

+    public void setDirtyConfig(DirtyConfig dirtyConfig) {
+        this.dirtyConfig = dirtyConfig;
+    }
 }
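
On the source side, AbsDeserialization exposes the same pattern to concrete deserializers: wrap parsing in dealParseError(...) so bad records go to the dirty file when a DirtyConfig is set, or into a sampled log line otherwise. A hypothetical JSON schema built on the new hooks (ExampleJsonDeserialization is illustrative only):

    import com.dtstack.flink.sql.source.AbsDeserialization;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.types.Row;

    import java.io.IOException;

    public class ExampleJsonDeserialization extends AbsDeserialization<Row> {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public Row deserialize(byte[] message) throws IOException {
            try {
                JsonNode root = MAPPER.readTree(message);
                Row row = new Row(1);
                row.setField(0, root.toString());
                return row;
            } catch (Exception e) {
                dealParseError(message, e); // dirty file or sampled log, plus counter
                return null;                // drop the record downstream
            }
        }
    }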
