Skip to content

Commit 07c4ccd

Browse files
author
xuchao
committed
Merge branch '1.8_test_3.10.x' into 1.8_release_3.10.x_mergedTest_new
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java # core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java
2 parents 85aae47 + 9fe68ea commit 07c4ccd

File tree

42 files changed

+143
-178
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+143
-178
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
267267
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
268268
+ ",pwd:" + tableInfo.getPassword();
269269
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
270-
Thread.sleep(5 * 1000);
270+
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
271271
} catch (InterruptedException e1) {
272272
LOG.error("", e1);
273273
}

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
113113
LOG.info("Running job on local embedded Flink mini cluster");
114114
}
115115

116-
MiniCluster exec = new MiniCluster(configBuilder.build());
117-
try {
116+
try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
118117
exec.start();
119-
return exec.executeJobBlocking(jobGraph);
120-
}
121-
finally {
118+
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
122119
transformations.clear();
123-
exec.closeAsync();
120+
return jobExecutionResult;
121+
} catch (Exception e) {
122+
throw new RuntimeException(e);
124123
}
125124
}
126125
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
271271
*/
272272
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
273273
String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
274-
Set<URL> pluginClassPatshSets = Sets.newHashSet();
274+
Set<URL> pluginClassPathSets = Sets.newHashSet();
275275
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
276276
for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
277277

@@ -309,26 +309,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
309309
registerTableCache.put(tableInfo.getName(), regTable);
310310

311311
URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
312-
pluginClassPatshSets.add(sourceTablePathUrl);
312+
pluginClassPathSets.add(sourceTablePathUrl);
313313
} else if (tableInfo instanceof AbstractTargetTableInfo) {
314314

315315
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
316316
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
317317
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
318318

319319
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
320-
pluginClassPatshSets.add(sinkTablePathUrl);
320+
pluginClassPathSets.add(sinkTablePathUrl);
321321
} else if (tableInfo instanceof AbstractSideTableInfo) {
322322
String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
323323
sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);
324324

325325
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
326-
pluginClassPatshSets.add(sideTablePathUrl);
326+
pluginClassPathSets.add(sideTablePathUrl);
327327
} else {
328328
throw new RuntimeException("not support table type:" + tableInfo.getType());
329329
}
330330
}
331-
return pluginClassPatshSets;
331+
return pluginClassPathSets;
332332
}
333333

334334
/**

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.metrics.Meter;
2626
import org.apache.flink.metrics.MeterView;
2727
import org.apache.flink.table.runtime.types.CRow;
28-
import org.apache.flink.types.Row;
2928

3029

3130
/**

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.io.File;
32-
import java.io.FileInputStream;
3332
import java.net.URLEncoder;
34-
import java.util.stream.Stream;
3533

3634
import org.apache.commons.codec.Charsets;
3735
import org.apache.flink.util.FileUtils;

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24-
import org.apache.calcite.sql.*;
24+
import org.apache.calcite.config.Lex;
25+
import org.apache.calcite.sql.SqlBasicCall;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlSelect;
30+
import org.apache.calcite.sql.parser.SqlParseException;
31+
import org.apache.calcite.sql.parser.SqlParser;
2532
import com.google.common.collect.Lists;
2633
import org.apache.flink.table.calcite.FlinkPlannerImpl;
2734

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.*;
24+
import org.apache.calcite.sql.SqlBasicCall;
25+
import org.apache.calcite.sql.SqlInsert;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlMatchRecognize;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.calcite.sql.SqlOrderBy;
31+
import org.apache.calcite.sql.SqlSelect;
2532
import org.apache.calcite.sql.parser.SqlParseException;
2633
import org.apache.calcite.sql.parser.SqlParser;
2734
import org.apache.commons.lang3.StringUtils;

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5757

5858
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
5959

60-
private String cacheType = "none";//None or LRU or ALL
60+
private String cacheType = "none";
6161

6262
private int cacheSize = 10000;
6363

64-
private long cacheTimeout = 60 * 1000;//
64+
private long cacheTimeout = 60_000L;
6565

6666
private int asyncCapacity=100;
6767

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
4949

50+
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
51+
5052
protected BaseSideInfo sideInfo;
5153

5254
private ScheduledExecutorService es;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.common.base.Strings;
2828

2929
import java.io.Serializable;
30-
import java.util.Map;
3130

3231
/**
3332
* Join信息

0 commit comments

Comments
 (0)