Skip to content

Commit 1b4c632

Browse files
Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dt-insight-engine/flinkStreamSQL into feat_elasticsearch6-sink
2 parents cfa0226 + a690319 commit 1b4c632

File tree

4 files changed

+30
-2
lines changed

4 files changed

+30
-2
lines changed

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@
121121
<artifactId>junit</artifactId>
122122
<version>4.12</version>
123123
</dependency>
124+
<dependency>
125+
<groupId>ch.qos.logback</groupId>
126+
<artifactId>logback-classic</artifactId>
127+
<version>1.1.7</version>
128+
</dependency>
124129

125130
</dependencies>
126131

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
import java.util.Set;
7878

7979
import com.dtstack.flink.sql.option.Options;
80+
import ch.qos.logback.classic.Level;
81+
import ch.qos.logback.classic.LoggerContext;
8082

8183
/**
8284
* Date: 2018/6/26
@@ -105,9 +107,9 @@ public static void main(String[] args) throws Exception {
105107
String pluginLoadMode = options.getPluginLoadMode();
106108
String deployMode = options.getMode();
107109
String confProp = options.getConfProp();
108-
109110
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
110111
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
112+
setLogLevel(options.getLogLevel());
111113

112114
List<String> addJarFileList = Lists.newArrayList();
113115
if (!Strings.isNullOrEmpty(addJarListStr)) {
@@ -308,4 +310,10 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
308310
StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
309311
return env;
310312
}
313+
private static void setLogLevel(String level){
314+
LoggerContext loggerContext= (LoggerContext) LoggerFactory.getILoggerFactory();
315+
//设置全局日志级别
316+
ch.qos.logback.classic.Logger logger = loggerContext.getLogger("root");
317+
logger.setLevel(Level.toLevel(level, Level.INFO));
318+
}
311319
}

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class Options {
6969
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
7070
private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
7171

72+
private String logLevel = "logLevel";
73+
7274
public String getMode() {
7375
return mode;
7476
}
@@ -172,4 +174,12 @@ public String getPluginLoadMode() {
172174
public void setPluginLoadMode(String pluginLoadMode) {
173175
this.pluginLoadMode = pluginLoadMode;
174176
}
177+
178+
public String getLogLevel() {
179+
return logLevel;
180+
}
181+
182+
public void setLogLevel(String logLevel) {
183+
this.logLevel = logLevel;
184+
}
175185
}

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.util.ClassUtil;
2424
import com.dtstack.flink.sql.util.DtStringUtil;
25+
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.Lists;
2627
import com.google.common.collect.Maps;
2728
import org.apache.commons.lang3.StringUtils;
@@ -45,6 +46,7 @@ public abstract class AbsTableParser {
4546

4647
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
4748
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
49+
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
4850

4951
private Map<String, Pattern> patternMap = Maps.newHashMap();
5052

@@ -131,9 +133,12 @@ public void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
131133
*/
132134
protected void dealNestField(Matcher matcher, TableInfo tableInfo) {
133135
String physicalField = matcher.group(1);
136+
Preconditions.checkArgument(!physicalFieldFunPattern.matcher(physicalField).find(),
137+
"No need to add data types when using functions, The correct way is : strLen(name) as nameSize, ");
138+
134139
String fieldType = matcher.group(3);
135140
String mappingField = matcher.group(4);
136-
Class fieldClass= dbTypeConvertToJavaType(fieldType);
141+
Class fieldClass = dbTypeConvertToJavaType(fieldType);
137142
boolean notNull = matcher.group(5) != null;
138143
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
139144
fieldExtraInfo.setNotNull(notNull);

0 commit comments

Comments
 (0)