Skip to content

Commit 658a640

Browse files
committed
Merge remote-tracking branch 'origin/hotfix_1.8_3.9.x_23442' into hotfix_1.8_3.10.x_merged23442
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java # oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java
2 parents 6159bb9 + 7c3f0a2 commit 658a640

File tree

8 files changed

+80
-17
lines changed

8 files changed

+80
-17
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ public abstract class AbsTableParser {
4343

4444
private static final String PRIMARY_KEY = "primaryKey";
4545
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
46+
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4647

4748
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
4849
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4950
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
51+
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5052

5153
private Map<String, Pattern> patternMap = Maps.newHashMap();
5254

@@ -107,13 +109,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
107109
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
108110
String fieldName = String.join(" ", filedNameArr);
109111
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
110-
Class fieldClass = dbTypeConvertToJavaType(fieldType);
112+
113+
114+
Class fieldClass = null;
115+
TableInfo.FieldExtraInfo fieldExtraInfo = null;
116+
117+
Matcher matcher = charTypePattern.matcher(fieldType);
118+
if (matcher.find()) {
119+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
120+
fieldExtraInfo = new TableInfo.FieldExtraInfo();
121+
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
122+
} else {
123+
fieldClass = dbTypeConvertToJavaType(fieldType);
124+
}
111125

112126
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
113127
tableInfo.addField(fieldName);
114128
tableInfo.addFieldClass(fieldClass);
115129
tableInfo.addFieldType(fieldType);
116-
tableInfo.addFieldExtraInfo(null);
130+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
117131
}
118132

119133
tableInfo.finish();

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable {
194194
* default false:allow field is null
195195
*/
196196
boolean notNull = false;
197+
/**
198+
* field length,eg.char(4)
199+
*/
200+
int length;
201+
202+
public int getLength() {
203+
return length;
204+
}
205+
206+
public void setLength(int length) {
207+
this.length = length;
208+
}
197209

198210
public boolean getNotNull() {
199211
return notNull;
@@ -202,5 +214,13 @@ public boolean getNotNull() {
202214
public void setNotNull(boolean notNull) {
203215
this.notNull = notNull;
204216
}
217+
218+
@Override
219+
public String toString() {
220+
return "FieldExtraInfo{" +
221+
"notNull=" + notNull +
222+
", length=" + length +
223+
'}';
224+
}
205225
}
206226
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,10 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import com.dtstack.flink.sql.util.DtStringUtil;
27-
import com.dtstack.flink.sql.util.ParseUtils;
28-
import org.apache.calcite.sql.SqlNode;
28+
import org.apache.commons.lang3.StringUtils;
2929
import org.apache.flink.api.java.typeutils.RowTypeInfo;
30-
import com.google.common.collect.Lists;
31-
32-
import java.util.Arrays;
3330
import java.util.List;
3431

3532

@@ -49,4 +46,21 @@ public String quoteIdentifier(String identifier) {
4946
return "\"" + identifier + "\"";
5047
}
5148

49+
@Override
50+
public String wrapperPlaceholder(String fieldName) {
51+
int pos = sideTableInfo.getFieldList().indexOf(fieldName);
52+
String type = sideTableInfo.getFieldTypeList().get(pos);
53+
54+
String sqlDefaultPlaceholder = " ? ";
55+
String rpadFormat = "rpad(?, %d, ' ')";
56+
57+
if (StringUtils.contains(type.toLowerCase(), "char")) {
58+
TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
59+
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
60+
if (charLength > 0) {
61+
return String.format(rpadFormat, charLength);
62+
}
63+
}
64+
return sqlDefaultPlaceholder;
65+
}
5266
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
*/
3232
public class OracleSink extends RdbSink implements IStreamSinkGener<RdbSink> {
3333

34+
private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
35+
private final String DEAL_CHAR_KEY = "char";
36+
private String RPAD_FORMAT = " rpad(?, %d, ' ') ";
37+
3438
public OracleSink() {
3539
super(new OracleDialect());
3640
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,26 @@ public String getAdditionalWhereClause() {
137137

138138
public String getSelectFromStatement(String tableName, List<String> selectFields, List<String> conditionFields, List<String> sqlJoinCompareOperate,
139139
List<PredicateInfo> predicateInfoes) {
140-
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
141-
String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ")
140+
String fromClause = selectFields.stream()
141+
.map(this::quoteIdentifier)
142+
.collect(Collectors.joining(", "));
143+
144+
String whereClause = conditionFields.stream()
145+
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
146+
.collect(Collectors.joining(" AND "));
147+
148+
String predicateClause = predicateInfoes.stream()
149+
.map(this::buildFilterCondition)
142150
.collect(Collectors.joining(" AND "));
143-
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
144151

145-
String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
152+
String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
146153
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause();
147-
return sql;
154+
155+
return dimQuerySql;
156+
}
157+
158+
public String wrapperPlaceholder(String fieldName) {
159+
return " ? ";
148160
}
149161

150162
public String buildFilterCondition(PredicateInfo info) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public boolean check() {
4747
Preconditions.checkNotNull(tableName, "rdb of tableName is required");
4848
Preconditions.checkNotNull(userName, "rdb of userName is required");
4949
Preconditions.checkNotNull(password, "rdb of password is required");
50+
Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
51+
"fields and fieldExtraInfoList attributes must be the same length");
5052
return true;
5153
}
5254

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2222
import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo;
2323
import com.dtstack.flink.sql.table.TargetTableInfo;
24-
import org.apache.commons.lang3.StringUtils;
2524
import org.apache.flink.api.common.typeinfo.TypeInformation;
2625
import org.apache.flink.api.java.tuple.Tuple2;
2726
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -34,10 +33,6 @@
3433
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
3534

3635
import java.io.Serializable;
37-
import java.math.BigDecimal;
38-
import java.sql.Date;
39-
import java.sql.Timestamp;
40-
import java.sql.Types;
4136
import java.util.Arrays;
4237
import java.util.List;
4338

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ public boolean check() {
183183
}
184184

185185

186+
Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
187+
"fields and fieldExtraInfoList attributes must be the same length");
186188
return true;
187189
}
188190

0 commit comments

Comments
 (0)