
Commit 5296857

Author: gituser
Merge branch 'hotfix_1.8_3.10.x_26255' into 1.8_release_3.10.x
2 parents f7d04be + 8098435 · commit 5296857

File tree

8 files changed: 173 additions & 29 deletions


core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 95 additions & 13 deletions
@@ -522,10 +522,53 @@ private void extractJoinField(SqlNode condition, Set<Tuple2<String, String>> joinFieldSet)
         }

         SqlKind joinKind = condition.getKind();
-        if( joinKind == AND || joinKind == EQUALS ){
-            extractJoinField(((SqlBasicCall)condition).operands[0], joinFieldSet);
-            extractJoinField(((SqlBasicCall)condition).operands[1], joinFieldSet);
-        }else{
+        if ( AGGREGATE.contains(condition.getKind())
+                || AVG_AGG_FUNCTIONS.contains(joinKind)
+                || COMPARISON.contains(joinKind)
+                || joinKind == OTHER_FUNCTION
+                || joinKind == DIVIDE
+                || joinKind == CAST
+                || joinKind == TRIM
+                || joinKind == TIMES
+                || joinKind == PLUS
+                || joinKind == NOT_IN
+                || joinKind == OR
+                || joinKind == AND
+                || joinKind == MINUS
+                || joinKind == TUMBLE
+                || joinKind == TUMBLE_START
+                || joinKind == TUMBLE_END
+                || joinKind == SESSION
+                || joinKind == SESSION_START
+                || joinKind == SESSION_END
+                || joinKind == HOP
+                || joinKind == HOP_START
+                || joinKind == HOP_END
+                || joinKind == BETWEEN
+                || joinKind == IS_NULL
+                || joinKind == IS_NOT_NULL
+                || joinKind == CONTAINS
+                || joinKind == TIMESTAMP_ADD
+                || joinKind == TIMESTAMP_DIFF
+                || joinKind == LIKE
+                || joinKind == COALESCE
+                || joinKind == EQUALS ){
+
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) condition;
+            for(int i=0; i<sqlBasicCall.getOperands().length; i++){
+                SqlNode sqlNode = sqlBasicCall.getOperands()[i];
+                if(sqlNode instanceof SqlLiteral){
+                    continue;
+                }
+
+                if(sqlNode instanceof SqlDataTypeSpec){
+                    continue;
+                }
+
+                extractJoinField(sqlNode, joinFieldSet);
+            }
+
+        } else if (condition.getKind() == IDENTIFIER){
             Preconditions.checkState(((SqlIdentifier)condition).names.size() == 2, "join condition must be format table.field");
             Tuple2<String, String> tuple2 = Tuple2.of(((SqlIdentifier)condition).names.get(0), ((SqlIdentifier)condition).names.get(1));
             joinFieldSet.add(tuple2);
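The whitelist above reappears almost verbatim in checkAndReplaceJoinCondition (next hunk) and in TableUtils.replaceJoinFieldRefTableName further down. A minimal, self-contained sketch of the same traversal with the kinds hoisted into one shared EnumSet is shown here; the class and constant names (JoinConditionFields, DECOMPOSABLE_KINDS) are illustrative, the kind list is abbreviated, and this is not code from the commit:

import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.api.java.tuple.Tuple2;

public final class JoinConditionFields {

    // Hypothetical shared whitelist; abbreviated compared with the full list in the commit.
    private static final EnumSet<SqlKind> DECOMPOSABLE_KINDS = EnumSet.of(
            SqlKind.AND, SqlKind.OR, SqlKind.EQUALS, SqlKind.NOT_IN, SqlKind.BETWEEN,
            SqlKind.PLUS, SqlKind.MINUS, SqlKind.TIMES, SqlKind.DIVIDE,
            SqlKind.CAST, SqlKind.TRIM, SqlKind.LIKE,
            SqlKind.IS_NULL, SqlKind.IS_NOT_NULL, SqlKind.OTHER_FUNCTION);

    static {
        DECOMPOSABLE_KINDS.addAll(SqlKind.AGGREGATE);
        DECOMPOSABLE_KINDS.addAll(SqlKind.AVG_AGG_FUNCTIONS);
        DECOMPOSABLE_KINDS.addAll(SqlKind.COMPARISON);
    }

    /** Collects every table.field reference that appears under a join condition. */
    public static void extract(SqlNode node, Set<Tuple2<String, String>> out) {
        if (DECOMPOSABLE_KINDS.contains(node.getKind())) {
            for (SqlNode operand : ((SqlBasicCall) node).getOperands()) {
                // Constants ('x', 1) and type specs (the VARCHAR in CAST(... AS VARCHAR))
                // carry no field reference, so they are skipped.
                if (operand instanceof SqlLiteral || operand instanceof SqlDataTypeSpec) {
                    continue;
                }
                extract(operand, out);
            }
        } else if (node.getKind() == SqlKind.IDENTIFIER && ((SqlIdentifier) node).names.size() == 2) {
            SqlIdentifier id = (SqlIdentifier) node;
            out.add(Tuple2.of(id.names.get(0), id.names.get(1)));
        }
    }

    public static void main(String[] args) throws Exception {
        // Default Calcite parser; unquoted identifiers are upper-cased.
        SqlNode condition = SqlParser.create(
                "CAST(t1.id AS VARCHAR) = t2.id AND t1.name <> 'x'").parseExpression();

        Set<Tuple2<String, String>> fields = new LinkedHashSet<>();
        extract(condition, fields);
        System.out.println(fields); // [(T1,ID), (T2,ID), (T1,NAME)]
    }
}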
@@ -833,20 +876,57 @@ public SqlBasicCall buildEmptyCondition(){
     private SqlIdentifier checkAndReplaceJoinCondition(SqlNode node, Map<String, String> tableMap){

         SqlKind joinKind = node.getKind();
-        if( joinKind == AND || joinKind == EQUALS ){
-            SqlIdentifier leftNode = checkAndReplaceJoinCondition(((SqlBasicCall)node).operands[0], tableMap);
-            SqlIdentifier rightNode = checkAndReplaceJoinCondition(((SqlBasicCall)node).operands[1], tableMap);
+        if( AGGREGATE.contains(joinKind)
+                || AVG_AGG_FUNCTIONS.contains(joinKind)
+                || COMPARISON.contains(joinKind)
+                || joinKind == OTHER_FUNCTION
+                || joinKind == DIVIDE
+                || joinKind == CAST
+                || joinKind == TRIM
+                || joinKind == TIMES
+                || joinKind == PLUS
+                || joinKind == NOT_IN
+                || joinKind == OR
+                || joinKind == AND
+                || joinKind == MINUS
+                || joinKind == TUMBLE
+                || joinKind == TUMBLE_START
+                || joinKind == TUMBLE_END
+                || joinKind == SESSION
+                || joinKind == SESSION_START
+                || joinKind == SESSION_END
+                || joinKind == HOP
+                || joinKind == HOP_START
+                || joinKind == HOP_END
+                || joinKind == BETWEEN
+                || joinKind == IS_NULL
+                || joinKind == IS_NOT_NULL
+                || joinKind == CONTAINS
+                || joinKind == TIMESTAMP_ADD
+                || joinKind == TIMESTAMP_DIFF
+                || joinKind == LIKE
+                || joinKind == COALESCE
+                || joinKind == EQUALS ){
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) node;
+            for(int i=0; i<sqlBasicCall.getOperands().length; i++){
+                SqlNode sqlNode = sqlBasicCall.getOperands()[i];
+                if(sqlNode instanceof SqlLiteral){
+                    continue;
+                }

-            if(leftNode != null){
-                ((SqlBasicCall)node).setOperand(0, leftNode);
-            }
+                if(sqlNode instanceof SqlDataTypeSpec){
+                    continue;
+                }
+
+                SqlIdentifier replaceNode = checkAndReplaceJoinCondition(sqlNode, tableMap);
+                if(replaceNode != null){
+                    ((SqlBasicCall)node).setOperand(i, replaceNode);
+                }

-            if(rightNode != null){
-                ((SqlBasicCall)node).setOperand(1, leftNode);
             }

             return null;
-        } else {
+        } else if (node.getKind() == IDENTIFIER) {
             //replace table
             Preconditions.checkState(((SqlIdentifier)node).names.size() == 2, "join condition must be format table.field");
             String tbName = ((SqlIdentifier) node).names.get(0);
@@ -857,6 +937,8 @@ private SqlIdentifier checkAndReplaceJoinCondition(SqlNode node, Map<String, String> tableMap){

             return null;
         }
+
+        return null;
     }

     /**
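Note the fix hidden in this hunk: the old branch wrote the right-hand result back with setOperand(1, leftNode) — a copy-paste slip — while the new loop writes each replacement back at its own operand index. A minimal sketch of that in-place rewrite on a hand-built condition follows; the identifiers and the "t2 was renamed to t2_0" mapping are illustrative only, not taken from the commit:

import java.util.Arrays;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;

public class SetOperandSketch {
    public static void main(String[] args) {
        SqlIdentifier left = new SqlIdentifier(Arrays.asList("t1", "id"), SqlParserPos.ZERO);
        SqlIdentifier right = new SqlIdentifier(Arrays.asList("t2", "id"), SqlParserPos.ZERO);
        SqlBasicCall eqCall = (SqlBasicCall) SqlStdOperatorTable.EQUALS.createCall(
                SqlParserPos.ZERO, left, right);

        // Suppose table t2 was replaced by a temporary side-join table t2_0:
        // rewrite only the operand that references it, at its own index.
        SqlNode[] operands = eqCall.getOperands();
        for (int i = 0; i < operands.length; i++) {
            SqlIdentifier id = (SqlIdentifier) operands[i];
            if ("t2".equals(id.names.get(0))) {
                eqCall.setOperand(i, new SqlIdentifier(
                        Arrays.asList("t2_0", id.names.get(1)), SqlParserPos.ZERO));
            }
        }

        // Prints the rewritten condition, e.g. t1.id = t2_0.id (quoting depends on the dialect).
        System.out.println(eqCall);
    }
}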

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 47 additions & 4 deletions
@@ -439,10 +439,53 @@ public static void replaceJoinFieldRefTableName(SqlNode condition, Map<String, String> oldTabFieldRefNew)
             return;
         }
         SqlKind joinKind = condition.getKind();
-        if( joinKind == AND || joinKind == EQUALS ){
-            replaceJoinFieldRefTableName(((SqlBasicCall)condition).operands[0], oldTabFieldRefNew);
-            replaceJoinFieldRefTableName(((SqlBasicCall)condition).operands[1], oldTabFieldRefNew);
-        }else{
+        if( AGGREGATE.contains(joinKind)
+                || AVG_AGG_FUNCTIONS.contains(joinKind)
+                || COMPARISON.contains(joinKind)
+                || joinKind == OTHER_FUNCTION
+                || joinKind == DIVIDE
+                || joinKind == CAST
+                || joinKind == TRIM
+                || joinKind == TIMES
+                || joinKind == PLUS
+                || joinKind == NOT_IN
+                || joinKind == OR
+                || joinKind == AND
+                || joinKind == MINUS
+                || joinKind == TUMBLE
+                || joinKind == TUMBLE_START
+                || joinKind == TUMBLE_END
+                || joinKind == SESSION
+                || joinKind == SESSION_START
+                || joinKind == SESSION_END
+                || joinKind == HOP
+                || joinKind == HOP_START
+                || joinKind == HOP_END
+                || joinKind == BETWEEN
+                || joinKind == IS_NULL
+                || joinKind == IS_NOT_NULL
+                || joinKind == CONTAINS
+                || joinKind == TIMESTAMP_ADD
+                || joinKind == TIMESTAMP_DIFF
+                || joinKind == LIKE
+                || joinKind == COALESCE
+                || joinKind == EQUALS ){
+
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) condition;
+            for(int i=0; i<sqlBasicCall.getOperands().length; i++){
+                SqlNode sqlNode = sqlBasicCall.getOperands()[i];
+                if(sqlNode instanceof SqlLiteral){
+                    continue;
+                }
+
+                if(sqlNode instanceof SqlDataTypeSpec){
+                    continue;
+                }
+
+                replaceJoinFieldRefTableName(sqlNode, oldTabFieldRefNew);
+            }
+
+        } else if (condition.getKind() == IDENTIFIER) {
             Preconditions.checkState(((SqlIdentifier)condition).names.size() == 2, "join condition must be format table.field");
             String fieldRefTable = ((SqlIdentifier)condition).names.get(0);


kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 0 additions & 3 deletions
@@ -26,9 +26,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroRowSerializationSchema;
-import org.apache.flink.formats.csv.CsvRowSerializationSchema;
-import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.runtime.types.CRow;

kafka-base/kafka-base-sink/src/main/java/org/apache/flink/formats/json/DTJsonRowSerializationSchema.java renamed to kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/DTJsonRowSerializationSchema.java

Lines changed: 2 additions & 1 deletion
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.flink.formats.json;
+package com.dtstack.flink.sql.sink.kafka.serialization;

 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSchemaConverter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java

Lines changed: 8 additions & 8 deletions
@@ -159,34 +159,34 @@ private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
     }

     private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
-        if (info == Types.VOID || object == null) {
+        if (info.equals(Types.VOID) || object == null) {
             return container.nullNode();
-        } else if (info == Types.BOOLEAN) {
+        } else if (info.equals(Types.BOOLEAN)) {
             return container.booleanNode((Boolean) object);
-        } else if (info == Types.STRING) {
+        } else if (info.equals(Types.STRING)) {
             return container.textNode((String) object);
-        } else if (info == Types.BIG_DEC) {
+        } else if (info.equals(Types.BIG_DEC)) {
             // convert decimal if necessary
             if (object instanceof BigDecimal) {
                 return container.numberNode((BigDecimal) object);
             }
             return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
-        } else if (info == Types.BIG_INT) {
+        } else if (info.equals(Types.BIG_INT)) {
             // convert integer if necessary
             if (object instanceof BigInteger) {
                 return container.numberNode((BigInteger) object);
             }
             return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
-        } else if (info == Types.SQL_DATE) {
+        } else if (info.equals(Types.SQL_DATE)) {
             return container.textNode(object.toString());
-        } else if (info == Types.SQL_TIME) {
+        } else if (info.equals(Types.SQL_TIME)) {
             final Time time = (Time) object;
             // strip milliseconds if possible
             if (time.getTime() % 1000 > 0) {
                 return container.textNode(timeFormatWithMillis.format(time));
             }
             return container.textNode(timeFormat.format(time));
-        } else if (info == Types.SQL_TIMESTAMP) {
+        } else if (info.equals(Types.SQL_TIMESTAMP)) {
             return container.textNode(timestampFormat.format((Timestamp) object));
         } else if (info instanceof RowTypeInfo) {
             if (reuse != null && reuse instanceof ObjectNode) {
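Why identity checks were replaced by .equals() here: the serialization schema, together with the TypeInformation it holds, is shipped to the task managers through Java serialization, so the field types seen at runtime are typically copies rather than the Types.* singletons. A minimal sketch of that effect (not from the commit; it assumes only Flink's InstantiationUtil utility):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.InstantiationUtil;

public class EqualsVsIdentity {
    public static void main(String[] args) throws Exception {
        // Round-trip a TypeInformation the way Flink ships user code inside the job graph.
        byte[] bytes = InstantiationUtil.serializeObject(Types.SQL_TIMESTAMP);
        TypeInformation<?> shipped = InstantiationUtil.deserializeObject(
                bytes, EqualsVsIdentity.class.getClassLoader());

        System.out.println(shipped == Types.SQL_TIMESTAMP);      // false: a different instance
        System.out.println(shipped.equals(Types.SQL_TIMESTAMP)); // true: value equality still holds
    }
}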

launcher/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -29,6 +29,12 @@
             <version>1.2.17</version>
         </dependency>

+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.6.1</version>
+        </dependency>
+
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 8 additions & 0 deletions
@@ -41,6 +41,8 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
@@ -57,6 +59,8 @@
  */

 public class LauncherMain {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
     private static final String CORE_JAR = "core";

     private static String SP = File.separator;
@@ -68,9 +72,13 @@ private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception
     }

     public static void main(String[] args) throws Exception {
+
+        LOG.info("----start----");
+
         if (args.length == 1 && args[0].endsWith(".json")){
             args = parseJson(args);
         }
+
         OptionParser optionParser = new OptionParser(args);
         Options launcherOptions = optionParser.getOptions();
         String mode = launcherOptions.getMode();
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# Console output:
+log4j.rootLogger = INFO,consoleAppender
+log4j.appender.consoleAppender = org.apache.log4j.ConsoleAppender
+log4j.appender.console.Target = System.out
+log4j.appender.consoleAppender.layout = org.apache.log4j.PatternLayout
+log4j.appender.consoleAppender.layout.ConversionPattern =%d %-5p %m %n
+log4j.appender.consoleAppender.ImmediateFlush = true
