diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 153a5d105..09bf07da1 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1912,6 +1912,10 @@ definitions: - ["data", "records"] - ["data", "{{ parameters.name }}"] - ["data", "*", "record"] + record_expander: + title: Record Expander + description: Optional component to expand records by extracting items from nested array fields. + "$ref": "#/definitions/RecordExpander" $parameters: type: object additionalProperties: true @@ -1928,6 +1932,38 @@ definitions: $parameters: type: object additionalProperties: true + RecordExpander: + title: Record Expander + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays. + type: object + required: + - type + - expand_records_from_field + properties: + type: + type: string + enum: [RecordExpander] + expand_records_from_field: + title: Expand Records From Field + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["lines", "data"] + - ["items"] + - ["nested", "array"] + - ["sections", "*", "items"] + remain_original_record: + title: Remain Original Record + description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. + type: boolean + default: false + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. diff --git a/airbyte_cdk/sources/declarative/expanders/__init__.py b/airbyte_cdk/sources/declarative/expanders/__init__.py new file mode 100644 index 000000000..c89197fc3 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander + +__all__ = ["RecordExpander"] diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py new file mode 100644 index 000000000..b4579c385 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -0,0 +1,98 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union + +import dpath + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.types import Config + + +@dataclass +class RecordExpander: + """ + Expands records by extracting items from a nested array field. + + When configured, this component extracts items from a specified nested array path + within each record and emits each item as a separate record. 
Optionally, the original + parent record can be embedded in each expanded item for context preservation. + + The expand_records_from_field path supports wildcards (*) for matching multiple arrays. + When wildcards are used, items from all matched arrays are extracted and emitted. + + Examples of instantiating this component: + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "sections" + - "*" + - "items" + remain_original_record: false + ``` + + Attributes: + expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*). + remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False. + config (Config): The user-provided configuration as specified by the source's spec + """ + + expand_records_from_field: List[Union[InterpolatedString, str]] + config: Config + parameters: InitVar[Mapping[str, Any]] + remain_original_record: bool = False + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._expand_path: Optional[List[InterpolatedString]] = [ + InterpolatedString.create(path, parameters=parameters) + for path in self.expand_records_from_field + ] + + def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: + """Expand a record by extracting items from a nested array field.""" + if not self._expand_path: + yield record + return + + parent_record = record + expand_path = [path.eval(self.config) for path in self._expand_path] + + if "*" in expand_path: + extracted: Any = dpath.values(parent_record, expand_path) + for record in extracted: + if isinstance(record, list): + for item in record: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = parent_record + yield expanded_record + else: + yield item + else: + try: + extracted = dpath.get(parent_record, expand_path) + except KeyError: + return + if not isinstance(extracted, list): + return + for item in extracted: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = parent_record + yield expanded_record + else: + yield item diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 9c97773e3..1d8831056 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -3,12 +3,13 @@ # from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, List, Mapping, MutableMapping, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath import requests from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.types import Config @@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor): If the field 
path points to an empty object, an empty array is returned. If the field path points to a non-existing path, an empty array is returned. + Optionally, records can be expanded by providing a RecordExpander component. + When record_expander is configured, each extracted record is passed through the + expander which extracts items from nested array fields and emits each item as a + separate record. + Examples of instantiating this transform: ``` extractor: @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor): field_path: [] ``` + ``` + extractor: + type: DpathExtractor + field_path: + - "data" + - "object" + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping + record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) + record_expander: Optional[RecordExpander] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - yield from extracted + if not self.record_expander: + yield from extracted + else: + for record in extracted: + yield from self.record_expander.expand_record(record) elif extracted: - yield extracted + if self.record_expander: + yield from self.record_expander.expand_record(extracted) + else: + yield extracted else: yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0f5c0f1f9..da1cc9474 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -482,24 +482,29 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( +class ResponseToFileExtractor(BaseModel): + type: Literal["ResponseToFileExtractor"] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. 
Supports wildcards (*) for matching multiple arrays.", examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], ], - title="Field Path", + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - -class ResponseToFileExtractor(BaseModel): - type: Literal["ResponseToFileExtractor"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2034,6 +2039,27 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 3ed86bf06..00b1a18ff 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -42,6 +42,7 @@ "DefaultPaginator.page_size_option": "RequestOption", # DpathExtractor "DpathExtractor.decoder": "JsonDecoder", + "DpathExtractor.record_expander": "RecordExpander", # HttpRequester "HttpRequester.error_handler": "DefaultErrorHandler", # ListPartitionRouter diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 867c93a22..f71968258 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2314,11 +2314,20 @@ def create_dpath_extractor( else: decoder_to_use = JsonDecoder(parameters={}) model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + + record_expander = None + if hasattr(model, "record_expander") and model.record_expander: + record_expander = self._create_component_from_model( + model=model.record_expander, + config=config, + ) + return DpathExtractor( decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {}, + record_expander=record_expander, ) @staticmethod diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 05e586592..affd03216 
100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -14,6 +14,7 @@ IterableDecoder, JsonDecoder, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor config = {"field": "record_array"} @@ -121,3 +122,228 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, body, expected_records", + [ + ( + ["data", "object"], + ["lines", "data"], + False, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ] + }, + } + } + }, + [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ], + ), + ( + ["data", "object"], + ["lines", "data"], + True, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + ] + }, + } + } + }, + [ + { + "id": "il_1", + "amount": 100, + "original_record": { + "id": "in_123", + "created": 1234567890, + "lines": {"data": [{"id": "il_1", "amount": 100}]}, + }, + }, + ], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": []}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1"}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [], + ), + ( + ["data"], + ["nested", "array"], + False, + { + "data": { + "id": "parent_1", + "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}, + } + }, + [{"id": "child_1"}, {"id": "child_2"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}}, + [1, 2, "string", {"id": "dict_item"}], + ), + ( + [], + ["items"], + False, + [ + {"id": "parent_1", "items": [{"id": "child_1"}]}, + {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]}, + ], + [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]}, + {"name": "section2", "items": [{"id": "item_3"}]}, + ] + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + True, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + } + }, + [ + { + "id": "item_1", + "original_record": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + }, + } + ], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": []}, + {"name": "section2", "items": []}, + ] + } + }, + [], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1"}, + {"name": "section2", "items": "not_an_array"}, + ] + } + }, + [], + ), + ( + ["data"], + ["*", "items"], + False, + { + "data": { + "group1": {"items": [{"id": "item_1"}]}, + "group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]}, + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ], + ids=[ + "test_expand_nested_array", 
+ "test_expand_with_original_record", + "test_expand_empty_array_yields_nothing", + "test_expand_missing_path_yields_nothing", + "test_expand_non_array_yields_nothing", + "test_expand_deeply_nested_path", + "test_expand_mixed_types_in_array", + "test_expand_multiple_parent_records", + "test_expand_wildcard_multiple_lists", + "test_expand_wildcard_with_original_record", + "test_expand_wildcard_all_empty_arrays", + "test_expand_wildcard_no_list_matches", + "test_expand_wildcard_dict_values", + ], +) +def test_dpath_extractor_with_expansion( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records