Skip to content

Commit 4540f4a

Browse files
author
sishu@dtstack.com
committed
Merge branch 'v1.4.0'
ysq
2 parents 1796e20 + c55b933 commit 4540f4a

22 files changed

+38
-43
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ public static void main(String[] args) throws Exception {
116116
String deployMode = cl.getOptionValue("mode");
117117
String confProp = cl.getOptionValue("confProp");
118118

119-
Preconditions.checkNotNull(sql, "it requires input parameters sql");
120-
Preconditions.checkNotNull(name, "it requires input parameters name");
121-
Preconditions.checkNotNull(localSqlPluginPath, "it requires input parameters localSqlPluginPath");
119+
Preconditions.checkNotNull(sql, "parameters of sql is required");
120+
Preconditions.checkNotNull(name, "parameters of name is required");
121+
Preconditions.checkNotNull(localSqlPluginPath, "parameters of localSqlPluginPath is required");
122122

123123
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
124124
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
@@ -133,7 +133,7 @@ public static void main(String[] args) throws Exception {
133133
DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
134134
Thread.currentThread().setContextClassLoader(dtClassLoader);
135135

136-
URLClassLoader parentClassloader = null;
136+
URLClassLoader parentClassloader;
137137
if(!LOCAL_MODE.equals(deployMode)){
138138
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
139139
}else{

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,8 @@
4444
import java.util.Map;
4545

4646
/**
47-
* 所有的继承接口命名规则:类型 + "AsyncReqRow" 比如==》MysqlAsyncReqRow
48-
* 当前只支持Left join / inner join(join)
49-
* FIXME 不支持right join
47+
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
48+
* only support Left join / inner join(join),not support right join
5049
* Date: 2018/7/9
5150
* Company: www.dtstack.com
5251
* @author xuchao
@@ -72,14 +71,13 @@ public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
7271

7372
protected JoinType joinType;
7473

75-
//key:返回值位置,返回值在输入数据中的索引位置
74+
//key:Returns the value of the position, returns the index values ​​in the input data
7675
protected Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
7776

7877
protected Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
7978

8079
protected SideTableInfo sideTableInfo;
8180

82-
//TODO 需要指定类型
8381
protected AbsSideCache sideCache;
8482

8583
public AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
@@ -202,7 +200,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
202200

203201
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
204202
if(joinType == JoinType.LEFT){
205-
//保留left 表数据
203+
//Reserved left table data
206204
Row row = fillData(input, null);
207205
resultFuture.complete(Collections.singleton(row));
208206
}else{

core/src/main/java/com/dtstack/flink/sql/side/CacheMissVal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.dtstack.flink.sql.side.cache.CacheObj;
2525

2626
/**
27-
* 仅仅用来标记未命中的维表数据
27+
* Only the data marked to dimension table miss
2828
* Date: 2018/8/28
2929
* Company: www.dtstack.com
3030
* @author xuchao

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.List;
3434

3535
/**
36-
*FIXME 需要考虑是直接返回所有的字段然后在外层再包裹原先的查询?
36+
* Need to consider is the direct return to the fields and then all wrapped in the outer layer of the original query?
3737
* Date: 2018/7/20
3838
* Company: www.dtstack.com
3939
* @author xuchao
@@ -42,7 +42,7 @@
4242
public class ParserJoinField {
4343

4444
/**
45-
* 需要解析出selectlist和where中的字段信息
45+
* Need to parse the fields of information and where selectlist
4646
* @return
4747
*/
4848
public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import static org.apache.calcite.sql.SqlKind.*;
4444

4545
/**
46-
* 解析sql,获取维表的执行信息
46+
* Parsing sql, obtain execution information dimension table
4747
* Date: 2018/7/24
4848
* Company: www.dtstack.com
4949
* @author xuchao
@@ -169,27 +169,27 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
169169

170170

171171
private void dealSelectResultWithJoinInfo(JoinInfo joinInfo, SqlSelect sqlNode, Queue<Object> queueInfo){
172-
//SideJoinInfo重命名
172+
//SideJoinInfo rename
173173
if(joinInfo.checkIsSide()){
174174
joinInfo.setSelectFields(sqlNode.getSelectList());
175175
joinInfo.setSelectNode(sqlNode);
176176
if(joinInfo.isRightIsSideTable()){
177-
//判断left是不是一个简单表
177+
//Analyzing left is not a simple table
178178
if(joinInfo.getLeftNode().toString().contains("SELECT")){
179179
queueInfo.offer(joinInfo.getLeftNode());
180180
}
181181

182182
queueInfo.offer(joinInfo);
183183
}else{
184-
//判断right是不是一个简单表
184+
//Determining right is not a simple table
185185
if(joinInfo.getRightNode().getKind() == SELECT){
186186
queueInfo.offer(joinInfo.getLeftNode());
187187
}
188188

189189
queueInfo.offer(joinInfo);
190190
}
191191

192-
//更新from 节点
192+
//Update from node
193193
SqlOperator operator = new SqlAsOperator();
194194
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
195195
String joinLeftTableName = joinInfo.getLeftTableName();

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
7979
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
8080
Object pollObj = null;
8181

82-
//TODO 需要清理
82+
//need clean
8383
boolean preIsSideJoin = false;
8484
List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();
8585

@@ -150,7 +150,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
150150
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
151151
DataStream adaptStream = tableEnv.toAppendStream(targetTable, org.apache.flink.types.Row.class);
152152

153-
//join side table 之前先 keyby ===>减少 维表在各个async 的缓存大小
153+
//join side table before keyby ===> Reducing the size of each dimension table cache of async
154154
if(sideTableInfo.isPartitionedJoin()){
155155
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());
156156
String[] leftJoinColArr = new String[leftJoinColList.size()];
@@ -159,7 +159,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
159159
}
160160

161161
AsyncReqRow asyncDbReq = loadAsyncReq(sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
162-
//TODO 并行度应该设置为多少?超时时间设置? capacity设置?
162+
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
163163
DataStream dsOut = AsyncDataStream.orderedWait(adaptStream, asyncDbReq, 10000, TimeUnit.MILLISECONDS, 10)
164164
.setParallelism(sideTableInfo.getParallelism());
165165

@@ -526,7 +526,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
526526
}
527527

528528
/**
529-
* 判断维表的join 条件十分包含全部的等值条件(即是维表定义中的主键)
529+
* Analyzing conditions are very join the dimension tables include all equivalent conditions (i.e., dimension table is the primary key definition
530530
* @return
531531
*/
532532
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, List<String> primaryKeys){

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
4949

5050
private int cacheSize = 10000;
5151

52-
private long cacheTimeout = 60 * 1000;//默认1分钟
52+
private long cacheTimeout = 60 * 1000;//
5353

5454
private boolean partitionedJoin = false;
5555

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.flink.table.sinks.TableSink;
2828

2929
/**
30-
* 根据指定的sink type 加载jar,并初始化对象
30+
* Loads jar and initializes the object according to the specified sink type
3131
* Date: 2017/3/10
3232
* Company: www.dtstack.com
3333
* @author xuchao

core/src/main/java/com/dtstack/flink/sql/source/IStreamSourceGener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
public interface IStreamSourceGener<T> {
3434

3535
/**
36-
* 获取输入源
3736
* @param sourceTableInfo
3837
* @param env
3938
* @param tableEnv

core/src/main/java/com/dtstack/flink/sql/source/StreamSourceFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.flink.table.api.java.StreamTableEnvironment;
3131

3232
/**
33-
* 创建streamTableSource
33+
* streamTableSource
3434
* Date: 2017/3/10
3535
* Company: www.dtstack.com
3636
* @author xuchao
@@ -58,7 +58,7 @@ public static AbsSourceParser getSqlParser(String resultType, String sqlRootDir)
5858
}
5959

6060
/**
61-
* 根据指定的类型构造数据源
61+
* The configuration of the type specified data source
6262
* @param sourceTableInfo
6363
* @return
6464
*/

0 commit comments

Comments
 (0)