Skip to content

Commit d3f904d

Browse files
committed
fix
2 parents 4362a3b + 3c9ffb6 commit d3f904d

File tree

38 files changed

+556
-405
lines changed

38 files changed

+556
-405
lines changed

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void asyncInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,R
215215
connCassandraDB(cassandraSideTableInfo);
216216

217217
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
218-
System.out.println("sqlCondition:" + sqlCondition);
218+
LOG.info("sqlCondition:{}" + sqlCondition);
219219

220220
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
221221
new AsyncFunction<Session, ResultSet>() {
@@ -264,7 +264,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
264264
public void onFailure(Throwable t) {
265265
LOG.error("Failed to retrieve the data: %s%n",
266266
t.getMessage());
267-
System.out.println("Failed to retrieve the data: " + t.getMessage());
268267
cluster.closeAsync();
269268
resultFuture.completeExceptionally(t);
270269
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.calcite.sql.SqlNode;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import com.google.common.collect.Lists;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import java.util.List;
3537

@@ -42,6 +44,8 @@
4244
public class CassandraAsyncSideInfo extends BaseSideInfo {
4345

4446
private static final long serialVersionUID = -4403313049809013362L;
47+
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
48+
4549

4650
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4751
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
6367
}
6468

6569
sqlCondition = "select ${selectField} from ${tableName}";
66-
6770
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
68-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
71+
72+
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
6973
}
7074

7175

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ public void open(Configuration parameters) throws Exception {
5151
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
54-
.put("provider_class", DT_PROVIDER_CLASS);
54+
.put("provider_class", DT_PROVIDER_CLASS)
55+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
56+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
57+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
58+
5559
System.setProperty("vertx.disableFileCPResolving", "true");
5660
VertxOptions vo = new VertxOptions();
5761
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
5656

5757
@Override
5858
public void writeRecord(Tuple2 tuple2) throws IOException {
59-
System.out.println("received oriainal data:" + tuple2);
59+
LOG.info("received oriainal data:{}" + tuple2);
6060
Tuple2<Boolean, Row> tupleTrans = tuple2;
6161
Boolean retract = tupleTrans.getField(0);
6262
if (!retract) {

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public static <T> TablePrintUtil build(List<T> data) {
9595
try {
9696
value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
9797
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
98-
e.printStackTrace();
98+
LOG.error("", e);
9999
}
100100
item[j] = value == null ? "null" : value;
101101
}

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2929
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3030
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
31-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
3231
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
3334
import org.apache.flink.types.Row;
3435

3536
import java.io.IOException;

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
import java.io.File;
3232
import java.io.FileInputStream;
3333
import java.net.URLEncoder;
34+
import java.util.stream.Stream;
35+
3436
import org.apache.commons.codec.Charsets;
37+
import org.apache.flink.util.FileUtils;
3538

3639

3740
/**
@@ -92,29 +95,21 @@ public Options getOptions(){
9295
}
9396

9497
public List<String> getProgramExeArgList() throws Exception {
95-
Map<String,Object> mapConf = PluginUtil.objectToMap(properties);
98+
Map<String, Object> mapConf = PluginUtil.objectToMap(properties);
9699
List<String> args = Lists.newArrayList();
97-
for(Map.Entry<String, Object> one : mapConf.entrySet()){
100+
for (Map.Entry<String, Object> one : mapConf.entrySet()) {
98101
String key = one.getKey();
99102
Object value = one.getValue();
100-
if(value == null){
103+
if (value == null) {
101104
continue;
102-
}else if(OPTION_SQL.equalsIgnoreCase(key)){
105+
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
103106
File file = new File(value.toString());
104-
FileInputStream in = new FileInputStream(file);
105-
byte[] filecontent = new byte[(int) file.length()];
106-
in.read(filecontent);
107-
String content = new String(filecontent, Charsets.UTF_8.name());
107+
String content = FileUtils.readFile(file, "UTF-8");
108108
value = URLEncoder.encode(content, Charsets.UTF_8.name());
109109
}
110110
args.add("-" + key);
111111
args.add(value.toString());
112112
}
113113
return args;
114114
}
115-
116-
public static void main(String[] args) throws Exception {
117-
OptionParser optionParser = new OptionParser(args);
118-
System.out.println(optionParser.getOptions());
119-
}
120115
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.factory.DTThreadFactory;
24-
import org.apache.calcite.sql.JoinType;
2523
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2624
import org.apache.flink.api.java.tuple.Tuple2;
2725
import org.apache.flink.configuration.Configuration;
2826
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2927
import org.apache.flink.types.Row;
3028
import org.apache.flink.util.Collector;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import com.dtstack.flink.sql.factory.DTThreadFactory;
33+
import org.apache.calcite.sql.JoinType;
3134

3235
import java.sql.SQLException;
3336
import java.sql.Timestamp;
@@ -45,6 +48,8 @@
4548

4649
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
4750

51+
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
52+
4853
protected BaseSideInfo sideInfo;
4954

5055
private ScheduledExecutorService es;
@@ -62,7 +67,7 @@ public BaseAllReqRow(BaseSideInfo sideInfo){
6267
public void open(Configuration parameters) throws Exception {
6368
super.open(parameters);
6469
initCache();
65-
System.out.println("----- all cacheRef init end-----");
70+
LOG.info("----- all cacheRef init end-----");
6671

6772
//start reload cache thread
6873
AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424

2525
import java.io.Serializable;
26+
import java.util.Objects;
2627

2728
/**
2829
* Reason:
@@ -64,4 +65,24 @@ public TypeInformation getTypeInformation() {
6465
public void setTypeInformation(TypeInformation typeInformation) {
6566
this.typeInformation = typeInformation;
6667
}
68+
69+
@Override
70+
public boolean equals(Object o) {
71+
if (this == o) {
72+
return true;
73+
}
74+
75+
if (o == null || getClass() != o.getClass()) {
76+
return false;
77+
}
78+
79+
FieldInfo fieldInfo = (FieldInfo) o;
80+
return Objects.equals(table, fieldInfo.table) &&
81+
Objects.equals(fieldName, fieldInfo.fieldName);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(table, fieldName);
87+
}
6788
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.dtstack.flink.sql.config.CalciteConfig;
2323
import com.dtstack.flink.sql.util.TableUtils;
24+
import com.esotericsoftware.minlog.Log;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.Lists;
2627
import com.google.common.collect.Sets;
@@ -190,11 +191,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
190191
tableInfo.setJoinType(joinType);
191192
tableInfo.setCondition(joinNode.getCondition());
192193

193-
if(!needBuildTemp){
194-
return tableInfo;
195-
}
196-
197-
if(tableInfo.getLeftNode().getKind() != AS){
194+
if(tableInfo.getLeftNode().getKind() != AS && needBuildTemp){
198195
extractTemporaryQuery(tableInfo.getLeftNode(), tableInfo.getLeftTableAlias(), (SqlBasicCall) parentWhere,
199196
parentSelectList, queueInfo, joinFieldSet, tableRef);
200197
}else {
@@ -264,11 +261,10 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall
264261
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableAlias, sqlNode);
265262
queueInfo.offer(sqlBasicCall);
266263

267-
System.out.println("-------build temporary query-----------");
268-
System.out.println(tmpSelectSql);
269-
System.out.println("---------------------------------------");
270-
}catch (Exception e){
271-
e.printStackTrace();
264+
Log.info("-------build temporary query-----------\n{}", tmpSelectSql);
265+
Log.info("---------------------------------------");
266+
} catch (Exception e) {
267+
Log.error("", e);
272268
throw new RuntimeException(e);
273269
}
274270
}
@@ -389,7 +385,6 @@ private void extractSelectField(SqlNode selectNode,
389385
}
390386

391387
}else if(selectNode.getKind() == CASE){
392-
System.out.println("selectNode");
393388
SqlCase sqlCase = (SqlCase) selectNode;
394389
SqlNodeList whenOperands = sqlCase.getWhenOperands();
395390
SqlNodeList thenOperands = sqlCase.getThenOperands();

0 commit comments

Comments
 (0)