Skip to content

Commit 93f9d79

Browse files
committed
deal char length
1 parent 0e612c3 commit 93f9d79

File tree

4 files changed

+77
-7
lines changed

4 files changed

+77
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ public abstract class AbsTableParser {
4242

4343
private static final String PRIMARY_KEY = "primaryKey";
4444
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
45+
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4546

4647
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
4748
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
49+
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
4850

4951
private Map<String, Pattern> patternMap = Maps.newHashMap();
5052

@@ -105,13 +107,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
105107
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
106108
String fieldName = String.join(" ", filedNameArr);
107109
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
108-
Class fieldClass = dbTypeConvertToJavaType(fieldType);
110+
111+
112+
Class fieldClass = null;
113+
TableInfo.FieldExtraInfo fieldExtraInfo = null;
114+
115+
Matcher matcher = charTypePattern.matcher(fieldType);
116+
if (matcher.find()) {
117+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
118+
fieldExtraInfo = new TableInfo.FieldExtraInfo();
119+
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
120+
} else {
121+
fieldClass = dbTypeConvertToJavaType(fieldType);
122+
}
109123

110124
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
111125
tableInfo.addField(fieldName);
112126
tableInfo.addFieldClass(fieldClass);
113127
tableInfo.addFieldType(fieldType);
114-
tableInfo.addFieldExtraInfo(null);
128+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
115129
}
116130

117131
tableInfo.finish();

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable {
194194
* default false:allow field is null
195195
*/
196196
boolean notNull = false;
197+
/**
198+
* field length,eg.char(4)
199+
*/
200+
int length;
201+
202+
public int getLength() {
203+
return length;
204+
}
205+
206+
public void setLength(int length) {
207+
this.length = length;
208+
}
197209

198210
public boolean getNotNull() {
199211
return notNull;
@@ -202,5 +214,13 @@ public boolean getNotNull() {
202214
public void setNotNull(boolean notNull) {
203215
this.notNull = notNull;
204216
}
217+
218+
@Override
219+
public String toString() {
220+
return "FieldExtraInfo{" +
221+
"notNull=" + notNull +
222+
", length=" + length +
223+
'}';
224+
}
205225
}
206226
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import com.dtstack.flink.sql.util.DtStringUtil;
2728
import com.dtstack.flink.sql.util.ParseUtils;
29+
import com.mchange.lang.CharUtils;
2830
import org.apache.calcite.sql.SqlNode;
31+
import org.apache.commons.lang3.StringUtils;
2932
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3033
import com.google.common.collect.Lists;
3134

@@ -35,6 +38,13 @@
3538

3639
public class OracleAsyncSideInfo extends RdbAsyncSideInfo {
3740

41+
private final static String SQL_DEFAULT_PLACEHOLDER = " ? ";
42+
private final static String DEAL_CHAR_KEY = "char";
43+
44+
private static String rpadFormat = "rpad(?, %d, ' ')";
45+
46+
47+
3848
public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
3949
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4050
}
@@ -49,4 +59,18 @@ public String quoteIdentifier(String identifier) {
4959
return "\"" + identifier + "\"";
5060
}
5161

62+
@Override
63+
public String wrapperPlaceholder(String fieldName) {
64+
int pos = sideTableInfo.getFieldList().indexOf(fieldName);
65+
String type = sideTableInfo.getFieldTypeList().get(pos);
66+
67+
if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
68+
TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
69+
int charLength = fieldExtraInfo.getLength();
70+
if (fieldExtraInfo.getLength() > 0) {
71+
return String.format(rpadFormat, charLength);
72+
}
73+
}
74+
return SQL_DEFAULT_PLACEHOLDER;
75+
}
5276
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,26 @@ public String getAdditionalWhereClause() {
136136

137137
public String getSelectFromStatement(String tableName, List<String> selectFields, List<String> conditionFields, List<String> sqlJoinCompareOperate,
138138
List<PredicateInfo> predicateInfoes) {
139-
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
140-
String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ")
139+
String fromClause = selectFields.stream()
140+
.map(this::quoteIdentifier)
141+
.collect(Collectors.joining(", "));
142+
143+
String whereClause = conditionFields.stream()
144+
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
145+
.collect(Collectors.joining(" AND "));
146+
147+
String predicateClause = predicateInfoes.stream()
148+
.map(this::buildFilterCondition)
141149
.collect(Collectors.joining(" AND "));
142-
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
143150

144-
String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
151+
String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
145152
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause();
146-
return sql;
153+
154+
return dimQuerySql;
155+
}
156+
157+
public String wrapperPlaceholder(String fieldName) {
158+
return " ? ";
147159
}
148160

149161
public String buildFilterCondition(PredicateInfo info) {

0 commit comments

Comments
 (0)