File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change 99import org .apache .flink .api .java .typeutils .RowTypeInfo ;
1010import org .apache .flink .api .java .typeutils .TupleTypeInfo ;
1111import org .apache .flink .streaming .api .datastream .DataStream ;
12+ import org .apache .flink .streaming .api .datastream .DataStreamSink ;
1213import org .apache .flink .streaming .api .functions .sink .OutputFormatSinkFunction ;
1314import org .apache .flink .streaming .api .functions .sink .RichSinkFunction ;
1415import org .apache .flink .table .sinks .RetractStreamTableSink ;
@@ -63,7 +64,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
6364 .setFieldTypes (this .fieldTypes );
6465 KuduOutputFormat kuduOutputFormat = builder .finish ();
6566 RichSinkFunction richSinkFunction = new OutputFormatSinkFunction (kuduOutputFormat );
66- dataStream .addSink (richSinkFunction );
67+ DataStreamSink dataStreamSink = dataStream .addSink (richSinkFunction );
68+ dataStreamSink .name (tableName );
69+ if (parallelism > 0 ) {
70+ dataStreamSink .setParallelism (parallelism );
71+ }
6772 }
6873
6974 @ Override
You can’t perform that action at this time.
0 commit comments