@@ -14,6 +14,9 @@ public class GrokFilterConfig extends CommonFilterConfig {

private final GrokConfig grok;
public static final String GROK_FILTER = "GROK_FILTER";

public static final String GROK_TARGET_CONFIG = "target";
public static final String GROK_TARGET_DOC = "The target field into which to put the extracted Grok data (optional)";

/**
* Creates a new {@link GrokFilterConfig} instance.
@@ -29,11 +32,26 @@ public GrokConfig grok() {
return grok;
}

public String target() {
return getString(GROK_TARGET_CONFIG);
}

public static ConfigDef configDef() {
int filterGroupCounter = 0;
final ConfigDef def = new ConfigDef(CommonFilterConfig.configDef())
.define(getSourceConfigKey(GROK_FILTER, filterGroupCounter++))
.define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++));
.define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++))
.define(
GROK_TARGET_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH,
GROK_TARGET_DOC,
GROK_FILTER,
filterGroupCounter++,
ConfigDef.Width.NONE,
GROK_TARGET_CONFIG
);
for (ConfigDef.ConfigKey configKey : GrokConfig.configDef().configKeys().values()) {
def.define(new ConfigDef.ConfigKey(
configKey.name,
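For reference, a minimal sketch of how the new option surfaces through the config class. The map-based constructor is an assumption following the usual `AbstractConfig` convention, and all property values are illustrative:

```java
import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
import java.util.HashMap;
import java.util.Map;

public class GrokTargetConfigSketch {
    public static void main(String[] args) {
        // Illustrative property values; "pattern" is the required Grok pattern option.
        Map<String, Object> props = new HashMap<>();
        props.put("pattern", "%{LOGLEVEL:level} %{GREEDYDATA:message}");
        props.put("target", "logData"); // the new optional "target" option

        // Assumed constructor: AbstractConfig subclasses conventionally accept the originals map.
        GrokFilterConfig config = new GrokFilterConfig(props);
        System.out.println(config.target()); // prints "logData"; returns null when the option is unset
    }
}
```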
@@ -6,7 +6,6 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
@@ -17,6 +16,7 @@
import io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +58,7 @@ public void configure(final Map<String, ?> props) {
*/
@Override
public ConfigDef configDef() {
return CommonFilterConfig.configDef();
return GrokFilterConfig.configDef();
}

/**
@@ -68,10 +68,56 @@ public ConfigDef configDef() {
protected RecordsIterable<TypedStruct> apply(final FilterContext context,
final TypedStruct record) throws FilterException {

final String value = record.getString(config.source());
final TypedValue value = record.find(config.source());

if (value == null) {
throw new FilterException("Invalid field '" + config.source() + "', field does not exist");
}

final List<String> valuesToProcess = resolveValues(value);
final List<TypedStruct> extractedResults = new ArrayList<>();

for (String valueToProcess : valuesToProcess) {
extractedResults.add(applyFilterOnValue(valueToProcess));
}

return buildResult(extractedResults);
}

/**
* Normalizes the configured source field into a list of strings to run through the Grok patterns.
*
* @param value the typed value obtained from the record for the configured source path.
* @return list of string entries to parse.
*/
private List<String> resolveValues(final TypedValue value) {
if (value.type() == Type.STRING) {
return List.of(value.getString());
}

if (value.type() == Type.ARRAY) {
final Collection<Object> array = value.getArray();
final List<String> values = new ArrayList<>(array.size());
for (Object item : array) {
if (!(item instanceof String)) {
throw new FilterException(
"Array contains non-string element of type: " + item.getClass().getName());
}
values.add((String) item);
}
return values;
}

throw new FilterException("Source field must be either STRING or ARRAY type, got: " + value.type());
}

/**
* Applies all configured Grok patterns on the given value and returns the resulting struct.
*
* @param value the string payload to match.
* @return the struct built from captured groups.
*/
private TypedStruct applyFilterOnValue(final String value) {
final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

List<SchemaAndNamedCaptured> allNamedCaptured = new ArrayList<>(matchPatterns.size());
@@ -89,7 +135,30 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
}

final Schema schema = mergeToSchema(allNamedCaptured);
return RecordsIterable.of(mergeToStruct(allNamedCaptured, schema));
return mergeToStruct(allNamedCaptured, schema);
}

/**
* Builds the output payload, taking into account the configured target field and the number of extracted results.
*
* @param results list of extracted structs for each processed value.
* @return iterable wrapping the final record to merge.
*/
private RecordsIterable<TypedStruct> buildResult(final List<TypedStruct> results) {
final boolean hasTarget = config.target() != null;
final String targetField = hasTarget ? config.target() : config.source();

if (results.size() == 1 && !hasTarget) {
return RecordsIterable.of(results.get(0));
}

final TypedStruct result = TypedStruct.create();
if (results.size() == 1) {
result.insert(targetField, results.get(0));
} else {
result.insert(targetField, TypedValue.array(results, Type.STRUCT));
}
return RecordsIterable.of(result);
}

/**
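Taken together, the reworked `apply` path first resolves the source into one or more strings, runs the Grok patterns on each, and then shapes the output according to `target`. A minimal sketch under the same setup as the tests below; the option keys mirror the documented names, the pattern is illustrative, and `RecordsIterable`'s package is inferred:

```java
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; // assumed location
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GrokFilterUsageSketch {
    public static void main(String[] args) {
        // Keys mirror the documented option names; the pattern is illustrative.
        Map<String, Object> props = new HashMap<>();
        props.put("pattern", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
        props.put("source", "logs");   // may point at a STRING or an ARRAY of strings
        props.put("target", "parsed"); // optional: results are nested here instead of merged at top level

        GrokFilter filter = new GrokFilter();
        filter.configure(props);

        TypedStruct record = TypedStruct.create().put("logs", List.of(
                "1970-01-01 00:00:00,000 INFO first log message",
                "1970-01-01 00:00:01,000 WARN second log message"));

        // With an array source and a target, "parsed" holds an array of structs,
        // each carrying the captured timestamp/level/message fields.
        RecordsIterable<TypedStruct> output = filter.apply(null, record, false);
        System.out.println(output.collect());
    }
}
```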
@@ -9,6 +9,7 @@
import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.transform.GrokConfig;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,4 +87,116 @@ public void testGivenPatternWithNoGroupWhenCapturedNameOnlyIsFalse() {
Assert.assertEquals("INFO", struct.getString("LOGLEVEL"));
Assert.assertEquals("a dummy log message", struct.getString("GREEDYDATA"));
}

@Test
public void testGivenTargetField() {
configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put("target", "logData");
filter.configure(configs, alias -> null);
List<TypedStruct> results = filter.apply(null, DATA, false).collect();

Assert.assertEquals(1, results.size());
TypedStruct struct = results.get(0);

// Original message field should still exist
Assert.assertEquals(INPUT, struct.getString("message"));

// Extracted data should be in the target field
Assert.assertTrue(struct.exists("logData"));
TypedStruct logData = struct.getStruct("logData");
Assert.assertNotNull(logData);
Assert.assertEquals("1970-01-01 00:00:00,000", logData.getString("timestamp"));
Assert.assertEquals("INFO", logData.getString("level"));
// Inside the target struct, message is just the extracted value (no merging with original)
Assert.assertEquals("a dummy log message", logData.getString("message"));
}

@Test
public void testGivenTargetFieldWithOverwrite() {
configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put("target", "parsed");
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
filter.configure(configs, alias -> null);
List<TypedStruct> results = filter.apply(null, DATA, false).collect();

Assert.assertEquals(1, results.size());
TypedStruct struct = results.get(0);

// Original message field should still exist
Assert.assertEquals(INPUT, struct.getString("message"));

// Extracted data should be in the target field with overwrite applied
Assert.assertTrue(struct.exists("parsed"));
TypedStruct parsed = struct.getStruct("parsed");
Assert.assertNotNull(parsed);
Assert.assertEquals("1970-01-01 00:00:00,000", parsed.getString("timestamp"));
Assert.assertEquals("INFO", parsed.getString("level"));
// With overwrite, message should be a string, not an array
Assert.assertEquals("a dummy log message", parsed.getString("message"));
}

@Test
public void testGivenArraySourceWithNestedTarget() {
TypedStruct record = TypedStruct.create();
record.put(
"logData",
Arrays.asList(
"1970-01-01 00:00:00,000 INFO first log message",
"1970-01-01 00:00:01,000 WARN second log message",
"1970-01-01 00:00:02,000 ERROR third log message"));

configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logData");
configs.put("target", "parsed.records");
filter.configure(configs, alias -> null);

List<TypedStruct> results = filter.apply(null, record, false).collect();
Assert.assertEquals(1, results.size());
TypedStruct struct = results.get(0);

Assert.assertTrue(struct.exists("parsed"));
TypedStruct parsed = struct.getStruct("parsed");
Assert.assertNotNull(parsed);
Assert.assertTrue(parsed.exists("records"));
List<Object> records = parsed.getArray("records");
Assert.assertEquals(3, records.size());

TypedStruct first = (TypedStruct) records.get(0);
Assert.assertEquals("1970-01-01 00:00:00,000", first.getString("timestamp"));
Assert.assertEquals("INFO", first.getString("level"));
Assert.assertEquals("first log message", first.getString("message"));

TypedStruct last = (TypedStruct) records.get(2);
Assert.assertEquals("1970-01-01 00:00:02,000", last.getString("timestamp"));
Assert.assertEquals("ERROR", last.getString("level"));
Assert.assertEquals("third log message", last.getString("message"));
}

@Test(expected = FilterException.class)
public void testGivenArraySourceWithInvalidElementTypeShouldFail() {
TypedStruct record = TypedStruct.create().put(
"logs",
Arrays.asList("1970-01-01 00:00:00,000 INFO first log message", 1));

configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
filter.configure(configs, alias -> null);

filter.apply(null, record, false);
}

@Test(expected = FilterException.class)
public void testGivenArraySourceWithNonMatchingEntryShouldFail() {
TypedStruct record = TypedStruct.create().put(
"logs",
Arrays.asList(
"1970-01-01 00:00:00,000 INFO first log message",
"INVALID"));

configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
filter.configure(configs, alias -> null);

filter.apply(null, record, false);
}
}
docs/content/en/docs/Developer Guide/filters.md (17 changes: 9 additions & 8 deletions)
@@ -632,14 +632,15 @@ The `GrokFilter` is based on: https://github.com/streamthoughts/kafka-connect-tra

### Configuration

| Configuration | Description | Type | Default | Importance |
|----------------------|-----------------------------------------------|---------|-----------|------------|
| `namedCapturesOnly` | If true, only store named captures from grok. | boolean | *true* | high |
| `pattern` | The Grok pattern to match. | string | *-* | high |
| `overwrite` | The fields to overwrite. | list | medium |
| `patternDefinitions` | Custom pattern definitions. | list | *-* | low |
| `patternsDir` | List of user-defined pattern directories | string | *-* | low |
| `source` | The input field on which to apply the filter | string | *message* | medium |
| Configuration        | Description                                                                    | Type                   | Default   | Importance |
|----------------------|--------------------------------------------------------------------------------|------------------------|-----------|------------|
| `namedCapturesOnly`  | If true, only store named captures from grok.                                  | boolean                | *true*    | high       |
| `pattern`            | The Grok pattern to match.                                                     | string                 | *-*       | high       |
| `overwrite`          | The fields to overwrite.                                                       | list                   | *-*       | medium     |
| `patternDefinitions` | Custom pattern definitions.                                                    | list                   | *-*       | low        |
| `patternsDir`        | List of user-defined pattern directories.                                      | string                 | *-*       | low        |
| `source`             | The input field on which to apply the filter.                                  | string / array<string> | *message* | medium     |
| `target`             | (Optional) Destination field receiving the extracted data; see example below.  | string                 | *-*       | medium     |

### Examples

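For instance, the following configuration (the `ParseLog` alias and field names are illustrative) parses each entry of a `logs` array and stores the resulting structs under the nested `parsed.records` field:

```properties
filters=ParseLog
filters.ParseLog.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
filters.ParseLog.pattern=%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}
filters.ParseLog.source=logs
filters.ParseLog.target=parsed.records
```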