Commit f5a927f

Merge pull request #146 from marklogic-community/feature/more-null-handling
More StringUtils sanity checking
2 parents: 38668d2 + a77e7de

9 files changed: 22 additions & 20 deletions
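Every change in this commit swaps the hand-rolled `val != null && val.trim().length() > 0` pattern (and a few bare `!= null` checks) for Spring's `org.springframework.util.StringUtils.hasText`, which returns true only when a string is non-null, non-empty, and contains at least one non-whitespace character. A minimal standalone sketch (not connector code) showing that the two checks agree, and that `hasText` also rejects whitespace-only values where a bare null check would not:

import org.springframework.util.StringUtils;

public class HasTextDemo {

    // The inline check this commit removes from several classes
    static boolean oldHasText(String val) {
        return val != null && val.trim().length() > 0;
    }

    public static void main(String[] args) {
        String[] samples = {null, "", "   ", "Documents"};
        for (String s : samples) {
            // null, "" and "   " are rejected by both checks; only "Documents" passes.
            // A bare "s != null" check, by contrast, would accept "" and "   ".
            System.out.printf("%-12s oldHasText=%b hasText=%b%n",
                s == null ? "null" : "\"" + s + "\"",
                oldHasText(s), StringUtils.hasText(s));
        }
    }
}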

src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java

Lines changed: 3 additions & 2 deletions

@@ -7,6 +7,7 @@
 import com.marklogic.client.ext.helper.LoggingObject;
 import com.marklogic.client.ext.modulesloader.ssl.SimpleX509TrustManager;
 import org.apache.kafka.common.config.types.Password;
+import org.springframework.util.StringUtils;

 import javax.net.ssl.*;
 import java.io.InputStream;

@@ -28,11 +29,11 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsed
         clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType));

         String database = (String) parsedConfig.get(MarkLogicConfig.CONNECTION_DATABASE);
-        if (database != null && database.trim().length() > 0) {
+        if (StringUtils.hasText(database)) {
             clientConfig.setDatabase(database);
         }
         String connType = (String) parsedConfig.get(MarkLogicConfig.CONNECTION_TYPE);
-        if (connType != null && connType.trim().length() > 0) {
+        if (StringUtils.hasText(connType)) {
             clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(connType.toUpperCase()));
         }


src/main/java/com/marklogic/kafka/connect/sink/WriteBatcherSinkTask.java

Lines changed: 4 additions & 4 deletions

@@ -53,7 +53,7 @@ protected void onStart(Map<String, Object> parsedConfig) {
         configureWriteBatcher(parsedConfig, writeBatcher);

         final String flowName = (String) parsedConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_NAME);
-        if (flowName != null && flowName.trim().length() > 0) {
+        if (StringUtils.hasText(flowName)) {
             writeBatcher.onBatchSuccess(buildRunFlowListener(flowName, parsedConfig, databaseClientConfig));
         }

@@ -180,7 +180,7 @@ protected RunFlowWriteBatchListener buildRunFlowListener(String flowName, Map<St
         String logMessage = String.format("After ingesting a batch, will run flow '%s'", flowName);
         final String flowSteps = (String) parsedConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_STEPS);
         List<String> steps = null;
-        if (flowSteps != null && flowSteps.trim().length() > 0) {
+        if (StringUtils.hasText(flowSteps)) {
             steps = Arrays.asList(flowSteps.split(","));
             logMessage += String.format(" with steps '%s' constrained to the URIs in that batch", steps);
         }

@@ -205,9 +205,9 @@ protected Optional<ServerTransform> buildServerTransform(final Map<String, Objec
         if (StringUtils.hasText(transformName)) {
             ServerTransform transform = new ServerTransform(transformName);
             String params = (String) parsedConfig.get(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS);
-            if (params != null && params.trim().length() > 0) {
+            if (StringUtils.hasText(params)) {
                 String delimiter = (String) parsedConfig.get(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER);
-                if (delimiter != null && delimiter.trim().length() > 0) {
+                if (StringUtils.hasText(delimiter)) {
                     addTransformParameters(transform, params, delimiter);
                 } else {
                     logger.warn("Unable to apply transform parameters to transform: {}; please set the " +

src/main/java/com/marklogic/kafka/connect/source/AbstractPlanInvoker.java

Lines changed: 2 additions & 1 deletion

@@ -4,6 +4,7 @@
 import com.marklogic.client.ext.helper.LoggingObject;
 import com.marklogic.client.row.RowManager;
 import com.marklogic.kafka.connect.ConfigUtil;
+import org.springframework.util.StringUtils;

 import java.util.Map;

@@ -19,7 +20,7 @@ abstract class AbstractPlanInvoker extends LoggingObject {
     protected AbstractPlanInvoker(DatabaseClient client, Map<String, Object> parsedConfig) {
         this.client = client;
         String value = (String) parsedConfig.get(MarkLogicSourceConfig.KEY_COLUMN);
-        if (value != null && value.trim().length() > 0) {
+        if (StringUtils.hasText(value)) {
             this.keyColumn = value;
         } else {
             this.keyColumn = null;

src/main/java/com/marklogic/kafka/connect/source/CsvPlanInvoker.java

Lines changed: 2 additions & 1 deletion

@@ -11,6 +11,7 @@
 import com.marklogic.kafka.connect.ConfigUtil;
 import com.marklogic.kafka.connect.MarkLogicConnectorException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.springframework.util.StringUtils;

 import java.io.BufferedReader;
 import java.io.IOException;

@@ -57,7 +58,7 @@ public Results invokePlan(PlanBuilder.Plan plan, String topic) {
     }

     private Optional<Integer> getIndexOfKeyColumn(String headerLine) {
-        if (keyColumn != null) {
+        if (StringUtils.hasText(keyColumn)) {
             ArrayNode headerNames;
             try {
                 headerNames = (ArrayNode) csvMapper.readTree(headerLine);

src/main/java/com/marklogic/kafka/connect/source/DocumentWriteOperationBuilder.java

Lines changed: 5 additions & 8 deletions

@@ -5,6 +5,7 @@
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
+import org.springframework.util.StringUtils;

 public class DocumentWriteOperationBuilder {

@@ -23,17 +24,17 @@ public DocumentWriteOperation build(RecordContent recordContent) {
         if (content == null) {
             throw new NullPointerException("'content' must not be null");
         }
-        if (hasText(collections)) {
+        if (StringUtils.hasText(collections)) {
             metadata.getCollections().addAll(collections.trim().split(","));
         }
-        if (hasText(permissions)) {
+        if (StringUtils.hasText(permissions)) {
             new DefaultDocumentPermissionsParser().parsePermissions(permissions.trim(), metadata.getPermissions());
         }

-        if (hasText(uriPrefix)) {
+        if (StringUtils.hasText(uriPrefix)) {
             uri = uriPrefix + uri;
         }
-        if (hasText(uriSuffix)) {
+        if (StringUtils.hasText(uriSuffix)) {
             uri += uriSuffix;
         }

@@ -54,10 +55,6 @@ protected DocumentWriteOperation build(DocumentWriteOperation.OperationType oper
         return new DocumentWriteOperationImpl(operationType, uri, metadata, content);
     }

-    private boolean hasText(String val) {
-        return val != null && val.trim().length() > 0;
-    }
-
     public DocumentWriteOperationBuilder withUriPrefix(String uriPrefix) {
         this.uriPrefix = uriPrefix;
         return this;
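Besides switching the `build` method to Spring's utility, this file deletes the private `hasText` helper that duplicated it, so the check now lives in exactly one place; the remaining hunks in the commit make the same substitution.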

src/main/java/com/marklogic/kafka/connect/source/DslQueryHandler.java

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ protected String appendConstraintAndOrderByToQuery(String previousMaxConstraintC
         String constrainedDsl = userDslQuery;
         if (StringUtils.hasText(constraintColumnName)) {
             String constraintPhrase = "";
-            if (previousMaxConstraintColumnValue != null) {
+            if (StringUtils.hasText(previousMaxConstraintColumnValue)) {
                 String sanitizedValue = previousMaxConstraintColumnValue.replaceAll(VALUE_SANITIZATION_PATTERN, "");
                 constraintPhrase = String.format(".where(op.gt(op.col('%s'), '%s'))", constraintColumnName, sanitizedValue);
             }
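This one is a semantic tightening rather than a pure refactor: `previousMaxConstraintColumnValue` was previously only null-checked, so an empty or whitespace-only value would still have produced a degenerate `.where(op.gt(...))` constraint comparing against an empty string; with `hasText` it is now treated the same as having no previous value. `SerializedQueryHandler` below gets the matching fix.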

src/main/java/com/marklogic/kafka/connect/source/JsonPlanInvoker.java

Lines changed: 2 additions & 1 deletion

@@ -5,6 +5,7 @@
 import com.marklogic.client.expression.PlanBuilder;
 import com.marklogic.client.io.JacksonHandle;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.springframework.util.StringUtils;

 import java.util.ArrayList;
 import java.util.List;

@@ -31,7 +32,7 @@ public Results invokePlan(PlanBuilder.Plan plan, String topic) {
     }

     private String getKeyValueFromRow(JsonNode row) {
-        if (keyColumn != null && row.has(keyColumn)) {
+        if (StringUtils.hasText(keyColumn) && row.has(keyColumn)) {
             JsonNode column = row.get(keyColumn);
             return this.includeColumnTypes ? column.get("value").asText() : column.asText();
         }

src/main/java/com/marklogic/kafka/connect/source/SerializedQueryHandler.java

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ protected void appendConstraintAndOrderByToQuery(JsonNode currentSerializedQuery
         if (StringUtils.hasText(constraintColumnName)) {
             ObjectNode orderByNode = buildOrderByNode(true);
             ArrayNode rootArgsArray = (ArrayNode) currentSerializedQuery.get(OPTIC_PLAN_ROOT_NODE).get("args");
-            if (previousMaxConstraintColumnValue != null) {
+            if (StringUtils.hasText(previousMaxConstraintColumnValue)) {
                 ObjectNode constraintNode = buildConstraintNode(previousMaxConstraintColumnValue);
                 rootArgsArray.add(constraintNode);
             }

src/main/java/com/marklogic/kafka/connect/source/XmlPlanInvoker.java

Lines changed: 2 additions & 1 deletion

@@ -5,6 +5,7 @@
 import com.marklogic.client.io.DOMHandle;
 import com.marklogic.kafka.connect.MarkLogicConnectorException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.springframework.util.StringUtils;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;

@@ -65,7 +66,7 @@ private List<SourceRecord> convertRowsToSourceRecords(DOMHandle result, String t
     }

     private String getKeyFromRow(Node row) {
-        if (keyColumn != null) {
+        if (StringUtils.hasText(keyColumn)) {
             NodeList columns = row.getChildNodes();
             int len = columns.getLength();
             for (int j = 0; j < len; j++) {
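Together with `AbstractPlanInvoker`, the CSV, JSON, and XML invokers now agree that a whitespace-only `KEY_COLUMN` setting means "no key column". A sketch (hypothetical helper, not connector code) of the rule the key-lookup methods now share:

import java.util.Map;
import org.springframework.util.StringUtils;

public class KeyColumnRule {

    // Hypothetical illustration: a key is produced only when the configured
    // column has real text AND the row actually contains that column.
    static String keyFor(String keyColumn, Map<String, String> row) {
        if (StringUtils.hasText(keyColumn) && row.containsKey(keyColumn)) {
            return row.get(keyColumn);
        }
        return null; // the SourceRecord is then built without a key
    }

    public static void main(String[] args) {
        Map<String, String> row = Map.of("id", "42");
        System.out.println(keyFor("id", row));   // 42
        System.out.println(keyFor("   ", row));  // null: whitespace-only config is ignored
        System.out.println(keyFor(null, row));   // null: unset config is ignored
    }
}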
