Skip to content

Commit 4362a3b

Browse files
committed
merge 1.8
1 parent 3f30625 commit 4362a3b

File tree

20 files changed

+226
-700
lines changed

20 files changed

+226
-700
lines changed

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public TypeInformation<?>[] getFieldTypes() {
7373
@Override
7474
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
7575
// Flink 1.9 uses consumeDataStream instead of emitDataStream
76+
consumeDataStream(dataStream);
7677
}
7778

7879
@Override

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.flink.api.java.typeutils.RowTypeInfo;
6161
import org.apache.flink.streaming.api.datastream.DataStream;
6262
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
63+
import org.apache.flink.table.api.EnvironmentSettings;
6364
import org.apache.flink.table.api.Table;
6465
import org.apache.flink.table.api.TableEnvironment;
6566
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -149,7 +150,7 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin
149150

150151
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
151152
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
152-
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
153+
StreamTableEnvironment tableEnv = getStreamTableEnv(env,paramsInfo.getConfProp());
153154

154155
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
155156
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
@@ -284,11 +285,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
284285
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
285286

286287
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
287-
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
288-
.map((Tuple2<Boolean, Row> f0) -> {
289-
return f0.f1;
290-
})
291-
.returns(typeInfo);
288+
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
292289

293290
String fields = String.join(",", typeInfo.getFieldNames());
294291

@@ -352,6 +349,19 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
352349
return env;
353350
}
354351

352+
private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
353+
// use the Blink planner in streaming mode
354+
EnvironmentSettings settings = EnvironmentSettings.newInstance()
355+
.useBlinkPlanner()
356+
.inStreamingMode()
357+
.build();
358+
359+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
360+
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
361+
return tableEnv;
362+
}
363+
364+
355365
public static void setLogLevel(ParamsInfo paramsInfo){
356366
String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
357367
if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){

core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ protected void beforeDeserialize() throws IOException {
117117
protected void afterDeserialize() throws IOException {
118118
}
119119

120+
public DeserializationSchema<Row> getDeserializationSchema() {
121+
return deserializationSchema;
122+
}
123+
120124
public RuntimeContext getRuntimeContext() {
121125
return runtimeContext;
122126
}

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,10 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5858
private final String[] fieldNames;
5959
private final TypeInformation<?>[] fieldTypes;
6060
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
61+
private TypeInformation<Row> typeInfo;
6162

6263
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
64+
this.typeInfo = typeInfo;
6365
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6466
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6567
this.rowAndFieldMapping = rowAndFieldMapping;
@@ -178,5 +180,8 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
178180
}
179181
}
180182

181-
183+
@Override
184+
public TypeInformation<Row> getProducedType() {
185+
return typeInfo;
186+
}
182187
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,17 @@ private RichSinkFunction createEsSinkFunction() {
120120

121121
@Override
122122
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
123+
consumeDataStream(dataStream);
124+
}
125+
126+
@Override
127+
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
123128
RichSinkFunction richSinkFunction = createEsSinkFunction();
124129
DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
125130
if (parallelism > 0) {
126131
streamSink.setParallelism(parallelism);
127132
}
133+
return streamSink;
128134
}
129135

130136
@Override

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.dtstack.flink.sql.format.FormatType;
2121
import com.dtstack.flink.sql.format.SerializationMetricWrapper;
22-
import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
2322
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
2423
import org.apache.commons.lang3.StringUtils;
2524
import org.apache.flink.api.common.serialization.SerializationSchema;

0 commit comments

Comments
 (0)