Skip to content

Commit 3736d6b

Browse files
committed
Merge branch '1.5_3.6.1_union' into '1.5_v3.6.1'
union parse See merge request !61
2 parents f4ee386 + 4c7d979 commit 3736d6b

File tree

5 files changed

+39
-8
lines changed

5 files changed

+39
-8
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class MetricConstant {
4545

4646
public static final String DT_NUM_RECORDS_OUT = "dtNumRecordsOut";
4747

48+
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
49+
4850
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
4951

5052
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
110110
aliasInfo.setAlias(alias.toString());
111111

112112
return aliasInfo;
113+
114+
case UNION:
115+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
116+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
117+
118+
parseSql(unionLeft, sideTableSet, queueInfo);
119+
120+
parseSql(unionRight, sideTableSet, queueInfo);
121+
122+
break;
113123
}
114124

115125
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,17 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
232232
throw new RuntimeException("---not deal type:" + sqlNode);
233233
}
234234

235+
break;
236+
237+
case UNION:
238+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
239+
240+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
241+
242+
replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
243+
244+
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
245+
235246
break;
236247
default:
237248
break;

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
3434

3535
protected transient Meter outRecordsRate;
3636

37+
protected transient Counter outDirtyRecords;
38+
3739
public void initMetric() {
3840
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
41+
outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
3942
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
4043
}
4144

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
4949

5050
private static final Logger LOG = LoggerFactory.getLogger(RetractJDBCOutputFormat.class);
5151

52+
private static int dirtyDataPrintFrequency = 1000;
53+
private static int receiveDataPrintFrequency = 1000;
54+
5255
private String username;
5356
private String password;
5457
private String drivername;
@@ -164,8 +167,11 @@ public void writeRecord(Tuple2 tuple2) {
164167
}
165168

166169
if (retract) {
167-
insertWrite(row);
168170
outRecords.inc();
171+
if (outRecords.getCount() % receiveDataPrintFrequency == 0) {
172+
LOG.info("Receive data : {}", row);
173+
}
174+
insertWrite(row);
169175
} else {
170176
//do nothing
171177
}
@@ -174,7 +180,6 @@ public void writeRecord(Tuple2 tuple2) {
174180

175181

176182
private void insertWrite(Row row) {
177-
System.out.println("接受到数据row:" +row );
178183
checkConnectionOpen(dbConn);
179184
try {
180185
if (batchInterval == 1) {
@@ -196,20 +201,20 @@ private void writeSingleRecord(Row row) {
196201
try {
197202
updatePreparedStmt(row, upload);
198203
upload.execute();
199-
System.out.println("单条插入成功:" + row);
204+
dbConn.commit();
200205
} catch (SQLException e) {
201-
System.out.println("单条插入失败:" + row);
202-
LOG.error("record insert failed ..", row.toString());
203-
LOG.error("", e);
206+
outDirtyRecords.inc();
207+
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0) {
208+
LOG.error("record insert failed ..", row.toString());
209+
LOG.error("", e);
210+
}
204211
}
205212
}
206213

207214
private synchronized void submitExecuteBatch() {
208215
try {
209-
LOG.info("submitExecuteBatch start......");
210216
this.upload.executeBatch();
211217
dbConn.commit();
212-
rows.forEach(row -> System.out.println("批量插入成功:"+ row));
213218
} catch (SQLException e) {
214219
try {
215220
dbConn.rollback();

0 commit comments

Comments
 (0)