1919package com .dtstack .flink .sql .sink .db ;
2020
2121import com .dtstack .flink .sql .sink .rdb .dialect .JDBCDialect ;
22+ import com .dtstack .flink .sql .util .DtStringUtil ;
23+ import org .apache .commons .lang3 .StringUtils ;
2224
25+ import java .util .Arrays ;
26+ import java .util .List ;
2327import java .util .Optional ;
28+ import java .util .stream .Collectors ;
2429
2530/**
2631 * Date: 2020/1/19
@@ -43,4 +48,79 @@ public String quoteIdentifier(String identifier) {
4348 return identifier ;
4449 }
4550
51+ @ Override
52+ public Optional <String > getUpsertStatement (String schema , String tableName , String [] fieldNames , String [] uniqueKeyFields , boolean allReplace ) {
53+ tableName = DtStringUtil .getTableFullPath (schema , tableName );
54+ StringBuilder sb = new StringBuilder ();
55+ sb .append ("MERGE INTO " + tableName + " T1 USING "
56+ + "(" + buildValuesStatement (fieldNames ) + ") T2 ("
57+ + buildFiledNameStatement (fieldNames ) +
58+ ") ON ("
59+ + buildConnectionConditions (uniqueKeyFields ) + ") " );
60+
61+ String updateSql = buildUpdateConnection (fieldNames , uniqueKeyFields , allReplace );
62+
63+ if (StringUtils .isNotEmpty (updateSql )) {
64+ sb .append (" WHEN MATCHED THEN UPDATE SET " );
65+ sb .append (updateSql );
66+ }
67+
68+ sb .append (" WHEN NOT MATCHED THEN "
69+ + "INSERT (" + Arrays .stream (fieldNames ).map (this ::quoteIdentifier ).collect (Collectors .joining ("," )) + ") VALUES ("
70+ + Arrays .stream (fieldNames ).map (col -> "T2." + quoteIdentifier (col )).collect (Collectors .joining ("," )) + ")" );
71+ return Optional .of (sb .toString ());
72+ }
73+
74+ /**
75+ * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
76+ * @param fieldNames
77+ * @param uniqueKeyFields
78+ * @param allReplace
79+ * @return
80+ */
81+ private String buildUpdateConnection (String [] fieldNames , String [] uniqueKeyFields , boolean allReplace ) {
82+ List <String > uniqueKeyList = Arrays .asList (uniqueKeyFields );
83+ return Arrays .stream (fieldNames )
84+ .filter (col -> !uniqueKeyList .contains (col ))
85+ .map (col -> buildConnectString (allReplace , col ))
86+ .collect (Collectors .joining ("," ));
87+ }
88+
89+ private String buildConnectString (boolean allReplace , String col ) {
90+ return allReplace ? quoteIdentifier ("T1" ) + "." + quoteIdentifier (col ) + " = " + quoteIdentifier ("T2" ) + "." + quoteIdentifier (col ) :
91+ quoteIdentifier ("T1" ) + "." + quoteIdentifier (col ) + " =NVL(" + quoteIdentifier ("T2" ) + "." + quoteIdentifier (col ) + ","
92+ + quoteIdentifier ("T1" ) + "." + quoteIdentifier (col ) + ")" ;
93+ }
94+
95+
96+ private String buildConnectionConditions (String [] uniqueKeyFields ) {
97+ return Arrays .stream (uniqueKeyFields ).map (col -> "T1." + quoteIdentifier (col ) + "=T2." + quoteIdentifier (col )).collect (Collectors .joining ("," ));
98+ }
99+
100+ /**
101+ * build sql part e.g: VALUES('1001','zs','sss')
102+ *
103+ * @param column destination column
104+ * @return
105+ */
106+ public String buildValuesStatement (String [] column ) {
107+ StringBuilder sb = new StringBuilder ("VALUES(" );
108+ String collect = Arrays .stream (column )
109+ .map (col -> " ? " )
110+ .collect (Collectors .joining (", " ));
111+
112+ return sb .append (collect ).append (")" ).toString ();
113+ }
114+
115+ /**
116+ * build sql part e.g: id, name, address
117+ * @param column
118+ * @return
119+ */
120+ public String buildFiledNameStatement (String [] column ) {
121+ return Arrays .stream (column )
122+ .collect (Collectors .joining (", " ));
123+ }
124+
125+
46126}
0 commit comments