Skip to content

Commit 31e7068

Browse files
committed
use rpad deal char
1 parent 93f9d79 commit 31e7068

File tree

3 files changed

+41
-17
lines changed

3 files changed

+41
-17
lines changed

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,9 @@
3838

3939
public class OracleAsyncSideInfo extends RdbAsyncSideInfo {
4040

41-
private final static String SQL_DEFAULT_PLACEHOLDER = " ? ";
42-
private final static String DEAL_CHAR_KEY = "char";
43-
44-
private static String rpadFormat = "rpad(?, %d, ' ')";
45-
41+
private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
42+
private final String DEAL_CHAR_KEY = "char";
43+
private String RPAD_FORMAT = "rpad(?, %d, ' ')";
4644

4745

4846
public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
@@ -66,9 +64,9 @@ public String wrapperPlaceholder(String fieldName) {
6664

6765
if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
6866
TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
69-
int charLength = fieldExtraInfo.getLength();
70-
if (fieldExtraInfo.getLength() > 0) {
71-
return String.format(rpadFormat, charLength);
67+
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
68+
if (charLength > 0) {
69+
return String.format(RPAD_FORMAT, charLength);
7270
}
7371
}
7472
return SQL_DEFAULT_PLACEHOLDER;

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2222
import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat;
2323
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
24+
import com.dtstack.flink.sql.table.TableInfo;
2425
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import org.apache.commons.lang3.StringUtils;
2627
import com.google.common.collect.Lists;
2728

2829
import java.util.ArrayList;
30+
import java.util.Arrays;
2931
import java.util.Iterator;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.stream.Collectors;
3235

3336
/**
3437
* Reason:
@@ -40,6 +43,10 @@
4043
public class OracleSink extends RdbSink implements IStreamSinkGener<RdbSink> {
4144
private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
4245

46+
private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
47+
private final String DEAL_CHAR_KEY = "char";
48+
private String RPAD_FORMAT = " rpad(?, %d, ' ') ";
49+
4350
@Override
4451
public String getDriverName() {
4552
return ORACLE_DRIVER;
@@ -193,16 +200,35 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
193200
*/
194201
public String makeValues(List<String> column) {
195202
StringBuilder sb = new StringBuilder("SELECT ");
196-
for (int i = 0; i < column.size(); ++i) {
197-
if (i != 0) {
198-
sb.append(",");
203+
String collect = column.stream()
204+
.map(col -> wrapperPlaceholder(col) + DtStringUtil.addQuoteForStr(col))
205+
.collect(Collectors.joining(", "));
206+
207+
sb.append(collect).append(" FROM DUAL");
208+
return sb.toString();
209+
}
210+
211+
/**
212+
* char type is wrapped with rpad
213+
* @param fieldName
214+
* @return
215+
*/
216+
public String wrapperPlaceholder(String fieldName) {
217+
int pos = rdbTableInfo.getFieldList().indexOf(fieldName);
218+
String type = rdbTableInfo.getFieldTypeList().get(pos);
219+
220+
if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
221+
TableInfo.FieldExtraInfo fieldExtraInfo = rdbTableInfo.getFieldExtraInfoList().get(pos);
222+
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
223+
if (charLength > 0) {
224+
return String.format(RPAD_FORMAT, charLength);
199225
}
200-
sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i)));
201226
}
202-
sb.append(" FROM DUAL");
203-
return sb.toString();
227+
return SQL_DEFAULT_PLACEHOLDER;
204228
}
205229

230+
231+
206232
public boolean containsIgnoreCase(List<String> l, String s) {
207233
Iterator<String> it = l.iterator();
208234
while (it.hasNext()) {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
8383

8484
private String schema;
8585

86+
protected RdbTableInfo rdbTableInfo;
87+
8688
public RichSinkFunction createJdbcSinkFunc() {
8789
if (driverName == null || dbURL == null || userName == null
8890
|| password == null || sqlTypes == null || tableName == null) {
@@ -111,8 +113,7 @@ public RichSinkFunction createJdbcSinkFunc() {
111113

112114
@Override
113115
public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
114-
RdbTableInfo rdbTableInfo = (RdbTableInfo) targetTableInfo;
115-
116+
this.rdbTableInfo = (RdbTableInfo) targetTableInfo;
116117
String tmpDbURL = rdbTableInfo.getUrl();
117118
String tmpUserName = rdbTableInfo.getUserName();
118119
String tmpPassword = rdbTableInfo.getPassword();
@@ -263,7 +264,6 @@ public void setDbType(String dbType) {
263264
/**
264265
* sqlserver and oracle maybe implement
265266
*
266-
* @param tableName
267267
* @param fieldNames
268268
* @param realIndexes
269269
* @return

0 commit comments

Comments
 (0)