From 5bca71c8f7da5971252a4395e1c8041aa8966b40 Mon Sep 17 00:00:00 2001
From: F378515
Date: Mon, 1 Dec 2025 10:03:25 +0100
Subject: [PATCH] feat(filters): support GrokFilter target and array input

---
 .../filepulse/config/GrokFilterConfig.java    |  20 +++-
 .../connect/filepulse/filter/GrokFilter.java  |  79 +++++++++++-
 .../filepulse/filter/GrokFilterTest.java      | 113 ++++++++++++++++++
 .../en/docs/Developer Guide/filters.md        |  17 +--
 4 files changed, 215 insertions(+), 14 deletions(-)

diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java
index 766a65c5e..26eacdf9c 100644
--- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java
+++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java
@@ -14,6 +14,9 @@ public class GrokFilterConfig extends CommonFilterConfig {
     private final GrokConfig grok;
 
     public static final String GROK_FILTER = "GROK_FILTER";
+
+    public static final String GROK_TARGET_CONFIG = "target";
+    public static final String GROK_TARGET_DOC = "The target field to put the extracted Grok data (optional)";
 
     /**
      * Creates a new {@link GrokFilterConfig} instance.
@@ -29,11 +32,26 @@ public GrokConfig grok() {
         return grok;
     }
 
+    public String target() {
+        return getString(GROK_TARGET_CONFIG);
+    }
+
     public static ConfigDef configDef() {
         int filterGroupCounter = 0;
         final ConfigDef def = new ConfigDef(CommonFilterConfig.configDef())
                 .define(getSourceConfigKey(GROK_FILTER, filterGroupCounter++))
-                .define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++));
+                .define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++))
+                .define(
+                        GROK_TARGET_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        ConfigDef.Importance.HIGH,
+                        GROK_TARGET_DOC,
+                        GROK_FILTER,
+                        filterGroupCounter++,
+                        ConfigDef.Width.NONE,
+                        GROK_TARGET_CONFIG
+                );
         for (ConfigDef.ConfigKey configKey : GrokConfig.configDef().configKeys().values()) {
             def.define(new ConfigDef.ConfigKey(
                     configKey.name,
diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.java
index e52a0c26d..638ecac65 100644
--- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.java
+++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.java
@@ -6,7 +6,6 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.filter;
 
-import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
@@ -17,6 +16,7 @@
 import io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +58,7 @@ public void configure(final Map<String, ?> props) {
      */
     @Override
     public ConfigDef configDef() {
-        return CommonFilterConfig.configDef();
+        return GrokFilterConfig.configDef();
     }
 
     /**
@@ -68,10 +68,56 @@ public ConfigDef configDef() {
     protected RecordsIterable<TypedStruct> apply(final FilterContext context,
                                                  final TypedStruct record) throws FilterException {
-        final String value = record.getString(config.source());
+        final TypedValue value = record.find(config.source());
+
+        if (value == null) {
+            throw new FilterException("Invalid field '" + config.source() + "', field does not exist");
+        }
+
+        final List<String> valuesToProcess = resolveValues(value);
+        final List<TypedStruct> extractedResults = new ArrayList<>();
+
+        for (String valueToProcess : valuesToProcess) {
+            extractedResults.add(applyFilterOnValue(valueToProcess));
+        }
+
+        return buildResult(extractedResults);
+    }
+
+    /**
+     * Normalizes the configured source field into the list of strings to run through the Grok patterns.
+     *
+     * @param value the typed value obtained from the record for the configured source path.
+     * @return list of string entries to parse.
+     */
+    private List<String> resolveValues(final TypedValue value) {
+        if (value.type() == Type.STRING) {
+            return List.of(value.getString());
+        }
+
+        if (value.type() == Type.ARRAY) {
+            final Collection<?> array = value.getArray();
+            final List<String> values = new ArrayList<>(array.size());
+            for (Object item : array) {
+                if (!(item instanceof String)) {
+                    throw new FilterException(
+                            "Array contains non-string element of type: " + item.getClass().getName());
+                }
+                values.add((String) item);
+            }
+            return values;
+        }
 
-        if (value == null) return null;
+        throw new FilterException("Source field must be either STRING or ARRAY type, got: " + value.type());
+    }
 
+    /**
+     * Applies all configured Grok patterns on the given value and returns the resulting struct.
+     *
+     * @param value the string payload to match.
+     * @return the struct built from captured groups.
+     */
+    private TypedStruct applyFilterOnValue(final String value) {
         final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
 
         List<Map<String, Object>> allNamedCaptured = new ArrayList<>(matchPatterns.size());
@@ -89,7 +135,30 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
         }
 
         final Schema schema = mergeToSchema(allNamedCaptured);
-        return RecordsIterable.of(mergeToStruct(allNamedCaptured, schema));
+        return mergeToStruct(allNamedCaptured, schema);
+    }
+
+    /**
+     * Builds the output payload by honoring the target field and the number of extracted results.
+     *
+     * @param results list of extracted structs for each processed value.
+     * @return iterable wrapping the final record to merge.
+     */
+    private RecordsIterable<TypedStruct> buildResult(final List<TypedStruct> results) {
+        final boolean hasTarget = config.target() != null;
+        final String targetField = hasTarget ? config.target() : config.source();
+
+        if (results.size() == 1 && !hasTarget) {
+            return RecordsIterable.of(results.get(0));
+        }
+
+        final TypedStruct result = TypedStruct.create();
+        if (results.size() == 1) {
+            result.insert(targetField, results.get(0));
+        } else {
+            result.insert(targetField, TypedValue.array(results, Type.STRUCT));
+        }
+        return RecordsIterable.of(result);
     }
 
     /**
diff --git a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java
index e200a6423..9fe41368b 100644
--- a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java
+++ b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java
@@ -9,6 +9,7 @@
 import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.transform.GrokConfig;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,4 +87,116 @@ public void testGivenPatternWithNoGroupWhenCapturedNameOnlyIsFalse() {
         Assert.assertEquals("INFO", struct.getString("LOGLEVEL"));
         Assert.assertEquals("a dummy log message", struct.getString("GREEDYDATA"));
     }
+
+    @Test
+    public void testGivenTargetField() {
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put("target", "logData");
+        filter.configure(configs, alias -> null);
+        List<TypedStruct> results = filter.apply(null, DATA, false).collect();
+
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        // Original message field should still exist
+        Assert.assertEquals(INPUT, struct.getString("message"));
+
+        // Extracted data should be in the target field
+        Assert.assertTrue(struct.exists("logData"));
+        TypedStruct logData = struct.getStruct("logData");
+        Assert.assertNotNull(logData);
+        Assert.assertEquals("1970-01-01 00:00:00,000", logData.getString("timestamp"));
+        Assert.assertEquals("INFO", logData.getString("level"));
+        // Inside the target struct, message is just the extracted value (no merging with original)
+        Assert.assertEquals("a dummy log message", logData.getString("message"));
+    }
+
+    @Test
+    public void testGivenTargetFieldWithOverwrite() {
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put("target", "parsed");
+        configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
+        filter.configure(configs, alias -> null);
+        List<TypedStruct> results = filter.apply(null, DATA, false).collect();
+
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        // Original message field should still exist
+        Assert.assertEquals(INPUT, struct.getString("message"));
+
+        // Extracted data should be in the target field with overwrite applied
+        Assert.assertTrue(struct.exists("parsed"));
+        TypedStruct parsed = struct.getStruct("parsed");
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals("1970-01-01 00:00:00,000", parsed.getString("timestamp"));
+        Assert.assertEquals("INFO", parsed.getString("level"));
+        // With overwrite, message should be a string, not an array
+        Assert.assertEquals("a dummy log message", parsed.getString("message"));
+    }
+
+    @Test
+    public void testGivenArraySourceWithNestedTarget() {
+        TypedStruct record = TypedStruct.create();
+        record.put(
+                "logData",
+                Arrays.asList(
+                        "1970-01-01 00:00:00,000 INFO first log message",
+                        "1970-01-01 00:00:01,000 WARN second log message",
+                        "1970-01-01 00:00:02,000 ERROR third log message"));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logData");
+        configs.put("target", "parsed.records");
+        filter.configure(configs, alias -> null);
+
+        List<TypedStruct> results = filter.apply(null, record, false).collect();
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        Assert.assertTrue(struct.exists("parsed"));
+        TypedStruct parsed = struct.getStruct("parsed");
+        Assert.assertNotNull(parsed);
+        Assert.assertTrue(parsed.exists("records"));
+        List<Object> records = parsed.getArray("records");
+        Assert.assertEquals(3, records.size());
+
+        TypedStruct first = (TypedStruct) records.get(0);
+        Assert.assertEquals("1970-01-01 00:00:00,000", first.getString("timestamp"));
+        Assert.assertEquals("INFO", first.getString("level"));
+        Assert.assertEquals("first log message", first.getString("message"));
+
+        TypedStruct last = (TypedStruct) records.get(2);
+        Assert.assertEquals("1970-01-01 00:00:02,000", last.getString("timestamp"));
+        Assert.assertEquals("ERROR", last.getString("level"));
+        Assert.assertEquals("third log message", last.getString("message"));
+    }
+
+    @Test(expected = FilterException.class)
+    public void testGivenArraySourceWithInvalidElementTypeShouldFail() {
+        TypedStruct record = TypedStruct.create().put(
+                "logs",
+                Arrays.asList("1970-01-01 00:00:00,000 INFO first log message", 1));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
+        filter.configure(configs, alias -> null);
+
+        filter.apply(null, record, false);
+    }
+
+    @Test(expected = FilterException.class)
+    public void testGivenArraySourceWithNonMatchingEntryShouldFail() {
+        TypedStruct record = TypedStruct.create().put(
+                "logs",
+                Arrays.asList(
+                        "1970-01-01 00:00:00,000 INFO first log message",
+                        "INVALID"));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
+        filter.configure(configs, alias -> null);
+
+        filter.apply(null, record, false);
+    }
 }
\ No newline at end of file
diff --git a/docs/content/en/docs/Developer Guide/filters.md b/docs/content/en/docs/Developer Guide/filters.md
index bfda235f0..631f222af 100644
--- a/docs/content/en/docs/Developer Guide/filters.md
+++ b/docs/content/en/docs/Developer Guide/filters.md
@@ -632,14 +632,15 @@ The `GrokFilter`is based on: https://github.com/streamthoughts/kafka-connect-tra
 
 ### Configuration
 
-| Configuration        | Description                                    | Type    | Default   | Importance |
-|----------------------|------------------------------------------------|---------|-----------|------------|
-| `namedCapturesOnly`  | If true, only store named captures from grok.  | boolean | *true*    | high       |
-| `pattern`            | The Grok pattern to match.                     | string  | *-*       | high       |
-| `overwrite`          | The fields to overwrite.                       | list    | medium    |
-| `patternDefinitions` | Custom pattern definitions.                    | list    | *-*       | low        |
-| `patternsDir`        | List of user-defined pattern directories       | string  | *-*       | low        |
-| `source`             | The input field on which to apply the filter   | string  | *message* | medium     |
+| Configuration        | Description                                               | Type           | Default   | Importance |
+|----------------------|-----------------------------------------------------------|----------------|-----------|------------|
+| `namedCapturesOnly`  | If true, only store named captures from grok.             | boolean        | *true*    | high       |
+| `pattern`            | The Grok pattern to match.                                | string         | *-*       | high       |
+| `overwrite`          | The fields to overwrite.                                  | list           |           | medium     |
+| `patternDefinitions` | Custom pattern definitions.                               | list           | *-*       | low        |
+| `patternsDir`        | List of user-defined pattern directories.                 | string         | *-*       | low        |
+| `source`             | The input field on which to apply the filter.             | string / array | *message* | medium     |
+| `target`             | (Optional) Destination field receiving extracted values.  | string         | *-*       | medium     |
 
 ### Examples
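
For reviewers, here is a minimal usage sketch of the two new behaviors, written as a FilePulse filter-chain configuration. The `filters.<alias>.*` property naming follows the existing FilePulse docs; the alias `ParseLog`, the field names, and the Grok pattern itself are illustrative assumptions, not taken from this patch:

```properties
# Declare a Grok filter in the connector's filter chain (alias is made up).
filters=ParseLog
filters.ParseLog.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
filters.ParseLog.pattern=%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}
# 'source' may now point to an array field: each string element is matched independently.
filters.ParseLog.source=logData
# New 'target' property: extracted structs are written under this (possibly nested) field
# instead of being merged into the record root.
filters.ParseLog.target=parsed.records
```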
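
Behavior summary, as implemented in `buildResult` above: with a single string source and no `target`, the extracted struct is still returned as-is and merged into the record root, preserving the filter's previous semantics. Once `target` is set, or the source resolves to more than one array element, the results are wrapped under the target field (falling back to the `source` field name when no `target` is configured), with multiple results stored as an array of structs.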