2121import com .dtstack .flink .sql .sink .rdb .RdbSink ;
2222import com .dtstack .flink .sql .sink .rdb .format .ExtendOutputFormat ;
2323import com .dtstack .flink .sql .sink .rdb .format .RetractJDBCOutputFormat ;
24+ import com .dtstack .flink .sql .util .DtStringUtil ;
2425import org .apache .commons .lang3 .StringUtils ;
2526import org .apache .flink .shaded .guava18 .com .google .common .collect .Lists ;
2627
@@ -56,11 +57,11 @@ public void buildSql(String tableName, List<String> fields) {
5657
5758 private void buildInsertSql (String tableName , List <String > fields ) {
5859
59- tableName = quoteTable (tableName );
60+ tableName = DtStringUtil . addQuoteForTableName (tableName );
6061 String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})" ;
6162
6263 List <String > adaptFields = Lists .newArrayList ();
63- fields .forEach (field -> adaptFields .add (quoteColumn (field )));
64+ fields .forEach (field -> adaptFields .add (DtStringUtil . addQuoteForColumn (field )));
6465
6566 String fieldsStr = StringUtils .join (adaptFields , "," );
6667 String placeholder = "" ;
@@ -83,7 +84,7 @@ private void buildInsertSql(String tableName, List<String> fields) {
8384 */
8485 @ Override
8586 public String buildUpdateSql (String tableName , List <String > fieldNames , Map <String , List <String >> realIndexes , List <String > fullField ) {
86- tableName = quoteTable (tableName );
87+ tableName = DtStringUtil . addQuoteForTableName (tableName );
8788 StringBuilder sb = new StringBuilder ();
8889
8990 sb .append ("MERGE INTO " + tableName + " T1 USING "
@@ -111,10 +112,10 @@ public String quoteColumns(List<String> column) {
111112 }
112113
113114 public String quoteColumns (List <String > column , String table ) {
114- String prefix = StringUtils .isBlank (table ) ? "" : quoteTable (table ) + "." ;
115+ String prefix = StringUtils .isBlank (table ) ? "" : DtStringUtil . addQuoteForTableName (table ) + "." ;
115116 List <String > list = new ArrayList <>();
116117 for (String col : column ) {
117- list .add (prefix + quoteColumn (col ));
118+ list .add (prefix + DtStringUtil . addQuoteForColumn (col ));
118119 }
119120 return StringUtils .join (list , "," );
120121 }
@@ -147,34 +148,23 @@ protected List<String> keyColList(Map<String, List<String>> realIndexes) {
147148 * @return
148149 */
149150 public String getUpdateSql (List <String > updateColumn , List <String > fullColumn , String leftTable , String rightTable , List <String > indexCols ) {
150- String prefixLeft = StringUtils .isBlank (leftTable ) ? "" : quoteTable (leftTable ) + "." ;
151- String prefixRight = StringUtils .isBlank (rightTable ) ? "" : quoteTable (rightTable ) + "." ;
151+ String prefixLeft = StringUtils .isBlank (leftTable ) ? "" : DtStringUtil . addQuoteForTableName (leftTable ) + "." ;
152+ String prefixRight = StringUtils .isBlank (rightTable ) ? "" : DtStringUtil . addQuoteForTableName (rightTable ) + "." ;
152153 List <String > list = new ArrayList <>();
153154 for (String col : fullColumn ) {
154155 // filter index column
155156 if (indexCols == null || indexCols .size () == 0 || containsIgnoreCase (indexCols ,col )) {
156157 continue ;
157158 }
158159 if (containsIgnoreCase (updateColumn ,col )) {
159- list .add (prefixLeft + col + "=" + prefixRight + col );
160+ list .add (prefixLeft + DtStringUtil . addQuoteForColumn ( col ) + "=" + prefixRight + DtStringUtil . addQuoteForColumn ( col ) );
160161 } else {
161- list .add (prefixLeft + col + "=null" );
162+ list .add (prefixLeft + DtStringUtil . addQuoteForColumn ( col ) + "=null" );
162163 }
163164 }
164165 return StringUtils .join (list , "," );
165166 }
166167
167- public String quoteTable (String table ) {
168- String [] parts = table .split ("\\ ." );
169- StringBuilder sb = new StringBuilder ();
170- for (int i = 0 ; i < parts .length ; ++i ) {
171- if (i != 0 ) {
172- sb .append ("." );
173- }
174- sb .append (getStartQuote () + parts [i ] + getEndQuote ());
175- }
176- return sb .toString ();
177- }
178168
179169 /**
180170 * build connect sql by index column, such as T1."A"=T2."A"
@@ -186,7 +176,7 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
186176 for (Map .Entry <String , List <String >> entry : updateKey .entrySet ()) {
187177 List <String > colList = new ArrayList <>();
188178 for (String col : entry .getValue ()) {
189- colList .add ("T1." + quoteColumn (col ) + "=T2." + quoteColumn (col ));
179+ colList .add ("T1." + DtStringUtil . addQuoteForColumn (col ) + "=T2." + DtStringUtil . addQuoteForColumn (col ));
190180 }
191181 exprList .add (StringUtils .join (colList , " AND " ));
192182 }
@@ -205,7 +195,7 @@ public String makeValues(List<String> column) {
205195 if (i != 0 ) {
206196 sb .append ("," );
207197 }
208- sb .append ("? " + quoteColumn (column .get (i )));
198+ sb .append ("? " + DtStringUtil . addQuoteForColumn (column .get (i )));
209199 }
210200 sb .append (" FROM DUAL" );
211201 return sb .toString ();
@@ -220,17 +210,6 @@ public boolean containsIgnoreCase(List<String> l, String s) {
220210 return false ;
221211 }
222212
223- public String quoteColumn (String column ) {
224- return getStartQuote () + column + getEndQuote ();
225- }
226-
227- public String getStartQuote () {
228- return "\" " ;
229- }
230-
231- public String getEndQuote () {
232- return "\" " ;
233- }
234213
235214
236215}
0 commit comments