Skip to content

Commit e383e5a

Browse files
feat: Add singleton record fetch functionality to PyAirbyte
Implements Source.get_record() and DeclarativeExecutor.fetch_record() methods to enable fetching single records by primary key value from declarative sources. Key features: - Source.get_record(stream_name, pk_value) - Public API for fetching records - DeclarativeExecutor.fetch_record() - Internal implementation using CDK components - Primary key validation and normalization (supports string, int, dict formats) - Composite primary key detection (raises NotImplementedError) - New AirbyteRecordNotFoundError exception for missing records - Comprehensive unit tests with proper mocking This implementation reuses existing CDK components (SimpleRetriever, HttpClient, RecordSelector) without monkey-patching or pinning CDK versions, providing a hybrid approach that works with the current CDK release. Related to CDK PR airbytehq/airbyte-python-cdk#846 Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent 2981b3d commit e383e5a

File tree

4 files changed

+508
-0
lines changed

4 files changed

+508
-0
lines changed

airbyte/_executors/declarative.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
1616
ConcurrentDeclarativeSource,
1717
)
18+
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
19+
ModelToComponentFactory,
20+
)
21+
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
22+
from airbyte_cdk.sources.types import StreamSlice
1823

24+
from airbyte import exceptions as exc
1925
from airbyte._executors.base import Executor
2026

2127

@@ -140,3 +146,152 @@ def install(self) -> None:
140146
def uninstall(self) -> None:
141147
"""No-op. The declarative source is included with PyAirbyte."""
142148
pass
149+
150+
def fetch_record( # noqa: PLR0914
151+
self,
152+
stream_name: str,
153+
primary_key_value: str,
154+
config: dict[str, Any] | None = None,
155+
) -> dict[str, Any]:
156+
"""Fetch a single record by primary key from a declarative stream.
157+
158+
This method constructs an HTTP GET request to fetch a single record by appending
159+
the primary key value to the stream's base path (e.g., /users/123).
160+
161+
Args:
162+
stream_name: The name of the stream to fetch from.
163+
primary_key_value: The primary key value as a string.
164+
config: Optional config overrides to merge with the executor's config.
165+
166+
Returns:
167+
The fetched record as a dictionary.
168+
169+
Raises:
170+
exc.AirbyteStreamNotFoundError: If the stream is not found in the manifest.
171+
exc.AirbyteRecordNotFoundError: If the record is not found (empty response).
172+
NotImplementedError: If the stream does not use SimpleRetriever.
173+
"""
174+
merged_config = {**self._config_dict, **(config or {})}
175+
176+
stream_configs = self._manifest_dict.get("streams", [])
177+
stream_config = None
178+
for config_item in stream_configs:
179+
if config_item.get("name") == stream_name:
180+
stream_config = config_item
181+
break
182+
183+
if stream_config is None:
184+
available_streams = [s.get("name") for s in stream_configs]
185+
raise exc.AirbyteStreamNotFoundError(
186+
stream_name=stream_name,
187+
connector_name=self.name,
188+
available_streams=available_streams,
189+
message=f"Stream '{stream_name}' not found in manifest.",
190+
)
191+
192+
factory = ModelToComponentFactory()
193+
194+
retriever_config = stream_config.get("retriever")
195+
if retriever_config is None:
196+
raise NotImplementedError(
197+
f"Stream '{stream_name}' does not have a retriever configuration. "
198+
"fetch_record() is only supported for streams with retrievers."
199+
)
200+
201+
try:
202+
retriever = factory.create_component(
203+
model_type=type(retriever_config), # type: ignore[arg-type]
204+
component_definition=retriever_config,
205+
config=merged_config,
206+
)
207+
except Exception as e:
208+
raise NotImplementedError(
209+
f"Failed to create retriever for stream '{stream_name}': {e}"
210+
) from e
211+
212+
if not isinstance(retriever, SimpleRetriever):
213+
raise NotImplementedError(
214+
f"Stream '{stream_name}' uses {type(retriever).__name__}, but fetch_record() "
215+
"only supports SimpleRetriever."
216+
)
217+
218+
empty_slice = StreamSlice(partition={}, cursor_slice={})
219+
base_path = retriever.requester.get_path(
220+
stream_state={},
221+
stream_slice=empty_slice,
222+
next_page_token=None,
223+
)
224+
225+
fetch_path = f"{base_path}/{primary_key_value}".lstrip("/")
226+
227+
response = retriever.requester.send_request(
228+
path=fetch_path,
229+
stream_state={},
230+
stream_slice=empty_slice,
231+
next_page_token=None,
232+
request_headers=retriever._request_headers( # noqa: SLF001
233+
stream_slice=empty_slice,
234+
next_page_token=None,
235+
),
236+
request_params=retriever._request_params( # noqa: SLF001
237+
stream_slice=empty_slice,
238+
next_page_token=None,
239+
),
240+
request_body_data=retriever._request_body_data( # noqa: SLF001
241+
stream_slice=empty_slice,
242+
next_page_token=None,
243+
),
244+
request_body_json=retriever._request_body_json( # noqa: SLF001
245+
stream_slice=empty_slice,
246+
next_page_token=None,
247+
),
248+
)
249+
250+
if response is None:
251+
msg = (
252+
f"No response received when fetching record with primary key "
253+
f"'{primary_key_value}' from stream '{stream_name}'."
254+
)
255+
raise exc.AirbyteRecordNotFoundError(
256+
stream_name=stream_name,
257+
primary_key_value=primary_key_value,
258+
connector_name=self.name,
259+
message=msg,
260+
)
261+
262+
schema = stream_config.get("schema_loader", {})
263+
records_schema = schema if isinstance(schema, dict) else {}
264+
265+
records = list(
266+
retriever.record_selector.select_records(
267+
response=response,
268+
stream_state={},
269+
records_schema=records_schema,
270+
stream_slice=empty_slice,
271+
next_page_token=None,
272+
)
273+
)
274+
275+
if not records:
276+
try:
277+
response_json = response.json()
278+
if isinstance(response_json, dict) and response_json:
279+
return response_json
280+
except Exception:
281+
pass
282+
283+
msg = (
284+
f"Record with primary key '{primary_key_value}' "
285+
f"not found in stream '{stream_name}'."
286+
)
287+
raise exc.AirbyteRecordNotFoundError(
288+
stream_name=stream_name,
289+
primary_key_value=primary_key_value,
290+
connector_name=self.name,
291+
message=msg,
292+
)
293+
294+
first_record = records[0]
295+
if hasattr(first_record, "data"):
296+
return dict(first_record.data) # type: ignore[arg-type]
297+
return dict(first_record) # type: ignore[arg-type]

airbyte/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,14 @@ class AirbyteStateNotFoundError(AirbyteConnectorError, KeyError):
412412
available_streams: list[str] | None = None
413413

414414

415+
@dataclass
416+
class AirbyteRecordNotFoundError(AirbyteConnectorError):
417+
"""Record not found in stream."""
418+
419+
stream_name: str | None = None
420+
primary_key_value: str | None = None
421+
422+
415423
@dataclass
416424
class PyAirbyteSecretNotFoundError(PyAirbyteError):
417425
"""Secret not found."""

airbyte/sources/base.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from airbyte import exceptions as exc
3232
from airbyte._connector_base import ConnectorBase
33+
from airbyte._executors.declarative import DeclarativeExecutor
3334
from airbyte._message_iterators import AirbyteMessageIterator
3435
from airbyte._util.temp_files import as_temp_files
3536
from airbyte.caches.util import get_default_cache
@@ -601,6 +602,117 @@ def get_documents(
601602
render_metadata=render_metadata,
602603
)
603604

605+
def _get_stream_primary_key(self, stream_name: str) -> list[str]:
606+
"""Get the primary key for a stream.
607+
608+
Returns the primary key as a flat list of field names.
609+
Handles the Airbyte protocol's nested list structure.
610+
"""
611+
catalog = self.configured_catalog
612+
for configured_stream in catalog.streams:
613+
if configured_stream.stream.name == stream_name:
614+
pk = configured_stream.primary_key
615+
if not pk:
616+
return []
617+
if isinstance(pk, list) and len(pk) > 0:
618+
if isinstance(pk[0], list):
619+
return [field[0] if isinstance(field, list) else field for field in pk]
620+
return list(pk) # type: ignore[arg-type]
621+
return []
622+
raise exc.AirbyteStreamNotFoundError(
623+
stream_name=stream_name,
624+
connector_name=self.name,
625+
available_streams=self.get_available_streams(),
626+
)
627+
628+
def _normalize_and_validate_pk_value(
629+
self,
630+
stream_name: str,
631+
pk_value: Any, # noqa: ANN401
632+
) -> str:
633+
"""Normalize and validate a primary key value.
634+
635+
Accepts:
636+
- A string or int (converted to string)
637+
- A dict with a single entry matching the stream's primary key field
638+
639+
Returns the PK value as a string.
640+
"""
641+
primary_key_fields = self._get_stream_primary_key(stream_name)
642+
643+
if not primary_key_fields:
644+
raise exc.PyAirbyteInputError(
645+
message=f"Stream '{stream_name}' does not have a primary key defined.",
646+
input_value=str(pk_value),
647+
)
648+
649+
if len(primary_key_fields) > 1:
650+
raise NotImplementedError(
651+
f"Stream '{stream_name}' has a composite primary key {primary_key_fields}. "
652+
"Fetching by composite primary key is not yet supported."
653+
)
654+
655+
pk_field = primary_key_fields[0]
656+
657+
if isinstance(pk_value, dict):
658+
if len(pk_value) != 1:
659+
raise exc.PyAirbyteInputError(
660+
message="When providing pk_value as a dict, it must contain exactly one entry.",
661+
input_value=str(pk_value),
662+
)
663+
provided_key = next(iter(pk_value.keys()))
664+
if provided_key != pk_field:
665+
msg = (
666+
f"Primary key field '{provided_key}' does not match "
667+
f"stream's primary key '{pk_field}'."
668+
)
669+
raise exc.PyAirbyteInputError(
670+
message=msg,
671+
input_value=str(pk_value),
672+
)
673+
return str(pk_value[provided_key])
674+
675+
return str(pk_value)
676+
677+
def get_record(
678+
self,
679+
stream_name: str,
680+
*,
681+
pk_value: Any, # noqa: ANN401
682+
) -> dict[str, Any]:
683+
"""Fetch a single record by primary key value.
684+
685+
This method is currently only supported for declarative (YAML-based) sources.
686+
687+
Args:
688+
stream_name: The name of the stream to fetch from.
689+
pk_value: The primary key value. Can be:
690+
- A string or integer value (e.g., "123" or 123)
691+
- A dict with a single entry (e.g., {"id": "123"})
692+
693+
Returns:
694+
The fetched record as a dictionary.
695+
696+
Raises:
697+
exc.AirbyteStreamNotFoundError: If the stream does not exist.
698+
exc.AirbyteRecordNotFoundError: If the record is not found.
699+
exc.PyAirbyteInputError: If the pk_value format is invalid.
700+
NotImplementedError: If the source is not declarative or uses composite keys.
701+
"""
702+
if not isinstance(self.executor, DeclarativeExecutor):
703+
raise NotImplementedError(
704+
f"get_record() is only supported for declarative sources. "
705+
f"This source uses {type(self.executor).__name__}."
706+
)
707+
708+
pk_value_str = self._normalize_and_validate_pk_value(stream_name, pk_value)
709+
710+
return self.executor.fetch_record(
711+
stream_name=stream_name,
712+
primary_key_value=pk_value_str,
713+
config=self._config_dict,
714+
)
715+
604716
def get_samples(
605717
self,
606718
streams: list[str] | Literal["*"] | None = None,

0 commit comments

Comments
 (0)