Skip to content

Commit ad272de

Browse files
committed
fix sonarqube scan
1 parent d124305 commit ad272de

File tree

24 files changed

+71
-82
lines changed

24 files changed

+71
-82
lines changed

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
216216
connCassandraDB(cassandraSideTableInfo);
217217

218218
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
219-
System.out.println("sqlCondition:" + sqlCondition);
219+
LOG.info("sqlCondition:{}" + sqlCondition);
220220

221221
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
222222
new AsyncFunction<Session, ResultSet>() {
@@ -265,7 +265,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
265265
public void onFailure(Throwable t) {
266266
LOG.error("Failed to retrieve the data: %s%n",
267267
t.getMessage());
268-
System.out.println("Failed to retrieve the data: " + t.getMessage());
269268
cluster.closeAsync();
270269
resultFuture.completeExceptionally(t);
271270
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@
3030
import org.apache.calcite.sql.SqlNode;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import com.google.common.collect.Lists;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import java.util.List;
3537

@@ -42,6 +44,8 @@
4244
public class CassandraAsyncSideInfo extends BaseSideInfo {
4345

4446
private static final long serialVersionUID = -4403313049809013362L;
47+
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
48+
4549

4650
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4751
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
6367
}
6468

6569
sqlCondition = "select ${selectField} from ${tableName}";
66-
6770
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
68-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
71+
72+
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
6973
}
7074

7175

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
5656

5757
@Override
5858
public void writeRecord(Tuple2 tuple2) throws IOException {
59-
System.out.println("received oriainal data:" + tuple2);
59+
LOG.info("received oriainal data:{}" + tuple2);
6060
Tuple2<Boolean, Row> tupleTrans = tuple2;
6161
Boolean retract = tupleTrans.getField(0);
6262
if (!retract) {

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -95,7 +95,7 @@ public static <T> TablePrintUtil build(List<T> data) {
9595
try {
9696
value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
9797
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
98-
e.printStackTrace();
98+
LOG.error("", e);
9999
}
100100
item[j] = value == null ? "null" : value;
101101
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -95,11 +95,8 @@ public class ExecuteProcessHelper {
9595

9696
public static ParamsInfo parseParams(String[] args) throws Exception {
9797
LOG.info("------------program params-------------------------");
98-
System.out.println("------------program params-------------------------");
9998
Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
100-
Arrays.stream(args).forEach(System.out::println);
10199
LOG.info("-------------------------------------------");
102-
System.out.println("----------------------------------------");
103100

104101
OptionParser optionParser = new OptionParser(args);
105102
Options options = optionParser.getOptions();
@@ -228,12 +225,10 @@ private static void sqlTranslation(String localSqlPluginPath,
228225
//sql-dimensional table contains the dimension table of execution
229226
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
230227
} else {
231-
System.out.println("----------exec sql without dimension join-----------");
232-
System.out.println("----------real sql exec is--------------------------");
233-
System.out.println(result.getExecSql());
228+
LOG.info("----------exec sql without dimension join-----------");
229+
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
234230
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
235231
if (LOG.isInfoEnabled()) {
236-
System.out.println();
237232
LOG.info("exec sql: " + result.getExecSql());
238233
}
239234
}

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,9 @@
2828
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2929
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3030
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
31-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
3231
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
3334
import org.apache.flink.types.Row;
3435

3536
import java.io.IOException;

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,10 @@
3131
import java.io.File;
3232
import java.io.FileInputStream;
3333
import java.net.URLEncoder;
34+
import java.util.stream.Stream;
35+
3436
import org.apache.commons.codec.Charsets;
37+
import org.apache.flink.util.FileUtils;
3538

3639

3740
/**
@@ -92,29 +95,21 @@ public Options getOptions(){
9295
}
9396

9497
public List<String> getProgramExeArgList() throws Exception {
95-
Map<String,Object> mapConf = PluginUtil.objectToMap(properties);
98+
Map<String, Object> mapConf = PluginUtil.objectToMap(properties);
9699
List<String> args = Lists.newArrayList();
97-
for(Map.Entry<String, Object> one : mapConf.entrySet()){
100+
for (Map.Entry<String, Object> one : mapConf.entrySet()) {
98101
String key = one.getKey();
99102
Object value = one.getValue();
100-
if(value == null){
103+
if (value == null) {
101104
continue;
102-
}else if(OPTION_SQL.equalsIgnoreCase(key)){
105+
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
103106
File file = new File(value.toString());
104-
FileInputStream in = new FileInputStream(file);
105-
byte[] filecontent = new byte[(int) file.length()];
106-
in.read(filecontent);
107-
String content = new String(filecontent, Charsets.UTF_8.name());
107+
String content = FileUtils.readFile(file, "UTF-8");
108108
value = URLEncoder.encode(content, Charsets.UTF_8.name());
109109
}
110110
args.add("-" + key);
111111
args.add(value.toString());
112112
}
113113
return args;
114114
}
115-
116-
public static void main(String[] args) throws Exception {
117-
OptionParser optionParser = new OptionParser(args);
118-
System.out.println(optionParser.getOptions());
119-
}
120115
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.table.runtime.types.CRow;
2828
import org.apache.flink.types.Row;
2929
import org.apache.flink.util.Collector;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
import java.sql.SQLException;
3234
import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +44,8 @@
4244

4345
public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> implements ISideReqRow {
4446

47+
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
48+
4549
protected BaseSideInfo sideInfo;
4650

4751
private ScheduledExecutorService es;
@@ -59,7 +63,7 @@ public BaseAllReqRow(BaseSideInfo sideInfo){
5963
public void open(Configuration parameters) throws Exception {
6064
super.open(parameters);
6165
initCache();
62-
System.out.println("----- all cacheRef init end-----");
66+
LOG.info("----- all cacheRef init end-----");
6367

6468
//start reload cache thread
6569
AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121

2222
import com.dtstack.flink.sql.config.CalciteConfig;
2323
import com.dtstack.flink.sql.util.TableUtils;
24+
import com.esotericsoftware.minlog.Log;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.Lists;
2627
import com.google.common.collect.Sets;
@@ -264,11 +265,10 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall
264265
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableAlias, sqlNode);
265266
queueInfo.offer(sqlBasicCall);
266267

267-
System.out.println("-------build temporary query-----------");
268-
System.out.println(tmpSelectSql);
269-
System.out.println("---------------------------------------");
270-
}catch (Exception e){
271-
e.printStackTrace();
268+
Log.info("-------build temporary query-----------\n{}", tmpSelectSql);
269+
Log.info("---------------------------------------");
270+
} catch (Exception e) {
271+
Log.error("", e);
272272
throw new RuntimeException(e);
273273
}
274274
}
@@ -389,7 +389,6 @@ private void extractSelectField(SqlNode selectNode,
389389
}
390390

391391
}else if(selectNode.getKind() == CASE){
392-
System.out.println("selectNode");
393392
SqlCase sqlCase = (SqlCase) selectNode;
394393
SqlNodeList whenOperands = sqlCase.getWhenOperands();
395394
SqlNodeList thenOperands = sqlCase.getThenOperands();

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 1 addition & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -22,46 +22,32 @@
2222

2323
import com.dtstack.flink.sql.config.CalciteConfig;
2424
import com.dtstack.flink.sql.util.TableUtils;
25-
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.Lists;
2725
import com.google.common.collect.Maps;
2826
import com.google.common.collect.Queues;
2927
import com.google.common.collect.Sets;
30-
import org.apache.calcite.sql.JoinType;
31-
import org.apache.calcite.sql.SqlAsOperator;
3228
import org.apache.calcite.sql.SqlBasicCall;
33-
import org.apache.calcite.sql.SqlBinaryOperator;
34-
import org.apache.calcite.sql.SqlDataTypeSpec;
3529
import org.apache.calcite.sql.SqlIdentifier;
3630
import org.apache.calcite.sql.SqlInsert;
3731
import org.apache.calcite.sql.SqlJoin;
3832
import org.apache.calcite.sql.SqlKind;
39-
import org.apache.calcite.sql.SqlLiteral;
4033
import org.apache.calcite.sql.SqlNode;
4134
import org.apache.calcite.sql.SqlNodeList;
42-
import org.apache.calcite.sql.SqlOperator;
4335
import org.apache.calcite.sql.SqlOrderBy;
4436
import org.apache.calcite.sql.SqlSelect;
4537
import org.apache.calcite.sql.SqlWith;
4638
import org.apache.calcite.sql.SqlWithItem;
47-
import org.apache.calcite.sql.fun.SqlCase;
48-
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
4939
import org.apache.calcite.sql.parser.SqlParseException;
5040
import org.apache.calcite.sql.parser.SqlParser;
51-
import org.apache.calcite.sql.parser.SqlParserPos;
52-
import org.apache.commons.collections.CollectionUtils;
53-
import org.apache.commons.lang3.StringUtils;
5441
import org.apache.flink.api.java.tuple.Tuple2;
5542
import org.apache.flink.table.api.Table;
5643
import org.slf4j.Logger;
5744
import org.slf4j.LoggerFactory;
5845

59-
import java.util.List;
6046
import java.util.Map;
6147
import java.util.Queue;
6248
import java.util.Set;
6349

64-
import static org.apache.calcite.sql.SqlKind.*;
50+
import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
6551

6652
/**
6753
* Parsing sql, obtain execution information dimension table
@@ -76,8 +62,6 @@ public class SideSQLParser {
7662
private Map<String, Table> localTableCache = Maps.newHashMap();
7763

7864
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
79-
System.out.println("----------exec original Sql----------");
80-
System.out.println(exeSql);
8165
LOG.info("----------exec original Sql----------");
8266
LOG.info(exeSql);
8367

0 commit comments

Comments
 (0)