Skip to content

Commit b3877e5

Browse files
committed
optimize code
1 parent a797fee commit b3877e5

File tree

6 files changed

+16
-14
lines changed

6 files changed

+16
-14
lines changed

core/src/main/java/com/dtstack/flink/sql/util/Krb5Utils.java renamed to core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* @author: wuren
2929
* @create: 2020/09/14
3030
**/
31-
public class Krb5Utils {
31+
public class KrbUtils {
3232

3333
public static final String KRB5_CONF_KEY = "java.security.krb5.conf";
3434
public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication";

core/src/test/java/com/dtstack/flink/sql/util/Krb5UtilsTest.java renamed to core/src/test/java/com/dtstack/flink/sql/util/KrbUtilsTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,16 @@
2828
* @author: wuren
2929
* @create: 2020/09/14
3030
**/
31-
public class Krb5UtilsTest {
31+
public class KrbUtilsTest {
3232
@Test
3333
public void testGetUgi() throws IOException {
3434
String principal = "";
3535
String keytabPath = "";
3636
String krb5confPath = "";
3737
try {
38-
Krb5Utils.getUgi(principal, keytabPath, krb5confPath);
38+
KrbUtils.getUgi(principal, keytabPath, krb5confPath);
3939
} catch (IllegalArgumentException e) {
4040
Assert.assertEquals(e.getMessage(), "Can't get Kerberos realm");
4141
}
42-
4342
}
4443
}

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2525
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2626
import com.dtstack.flink.sql.util.JDBCUtils;
27-
import com.dtstack.flink.sql.util.Krb5Utils;
27+
import com.dtstack.flink.sql.util.KrbUtils;
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.hadoop.conf.Configuration;
3030
import org.apache.hadoop.security.UserGroupInformation;
@@ -72,7 +72,7 @@ public Connection getConn(String dbUrl, String userName, String password) {
7272
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
7373
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
7474
String principal = impalaSideTableInfo.getPrincipal();
75-
UserGroupInformation ugi = Krb5Utils.getUgi(principal, keyTabFilePath, krb5FilePath);
75+
UserGroupInformation ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
7676
connection = ugi.doAs(new PrivilegedAction<Connection>() {
7777
@Override
7878
public Connection run() {

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2525
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
26-
import com.dtstack.flink.sql.util.Krb5Utils;
26+
import com.dtstack.flink.sql.util.KrbUtils;
2727
import io.vertx.core.Vertx;
2828
import io.vertx.core.VertxOptions;
2929
import io.vertx.core.json.JsonObject;
@@ -72,7 +72,7 @@ public void open(Configuration parameters) throws Exception {
7272
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
7373
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
7474
String principal = impalaSideTableInfo.getPrincipal();
75-
ugi = Krb5Utils.getUgi(principal, keyTabFilePath, krb5FilePath);
75+
ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
7676
openJdbc(parameters);
7777
} else {
7878
openJdbc(parameters);

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
44
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
5-
import com.dtstack.flink.sql.util.Krb5Utils;
5+
import com.dtstack.flink.sql.util.KrbUtils;
66
import org.apache.hadoop.security.UserGroupInformation;
77

88
import java.io.IOException;
@@ -54,11 +54,15 @@ public ImpalaOutputFormat(
5454
@Override
5555
public void open(int taskNumber, int numTasks) throws IOException {
5656
if (authMech == 1) {
57-
UserGroupInformation ugi = Krb5Utils.getUgi(principal, keytabPath, krb5confPath);
57+
UserGroupInformation ugi = KrbUtils.getUgi(principal, keytabPath, krb5confPath);
5858
ugi.doAs(new PrivilegedAction<Object>() {
5959
@Override
6060
public Object run() {
61-
openJdbc();
61+
try {
62+
openJdbc();
63+
} catch (IOException e) {
64+
e.printStackTrace();
65+
}
6266
return null;
6367
}
6468
});

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,10 @@ public void open(int taskNumber, int numTasks) throws IOException {
113113
openJdbc();
114114
}
115115

116-
public void openJdbc() {
116+
public void openJdbc() throws IOException {
117117
try {
118118
establishConnection();
119119
initMetric();
120-
121120
if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || keyFields == null || keyFields.length == 0) {
122121
String insertSql = dialect.getInsertIntoStatement(schema, tableName, fieldNames, partitionFields);
123122
LOG.info("execute insert sql: {}", insertSql);
@@ -128,7 +127,7 @@ public void openJdbc() {
128127
getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(), allReplace, this);
129128
}
130129
jdbcWriter.open(connection);
131-
} catch (SQLException | IOException sqe) {
130+
} catch (SQLException sqe) {
132131
throw new IllegalArgumentException("open() failed.", sqe);
133132
} catch (ClassNotFoundException cnfe) {
134133
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);

0 commit comments

Comments
 (0)