Skip to content

Commit fb2d63e

Browse files
authored
Merge branch 'main' into feature/optional-pyarrow-dependency-1227
2 parents 62ec045 + 6f3b1ca commit fb2d63e

File tree

23 files changed

+428
-289
lines changed

23 files changed

+428
-289
lines changed

docs/source/conf.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,13 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool: # noqa
9191
("method", "datafusion.context.SessionContext.tables"),
9292
("method", "datafusion.dataframe.DataFrame.unnest_column"),
9393
]
94+
# Explicitly skip certain members listed above. These are either
95+
# re-exports, duplicate module-level documentation, deprecated
96+
# API surfaces, or private variables that would otherwise appear
97+
# in the generated docs and cause confusing duplication.
98+
# Keeping this explicit list avoids surprising entries in the
99+
# AutoAPI output and gives us a single place to opt-out items
100+
# when we intentionally hide them from the docs.
94101
if (what, name) in skip_contents:
95102
skip = True
96103

docs/source/contributor-guide/ffi.rst

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
3434
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
3535

3636
At first glance, it may appear the best way to do this is to add the ``datafusion-python``
37-
crate as a dependency, provide a ``PyTable``, and then to register it with the
37+
crate as a dependency, provide a ``PyTable``, and then to register it with the
3838
``SessionContext``. Unfortunately, this will not work.
3939

4040
When you produce your code as a Python library and it needs to interact with the DataFusion

docs/source/user-guide/data-sources.rst

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -154,11 +154,11 @@ as Delta Lake. This will require a recent version of
154154
from deltalake import DeltaTable
155155
156156
delta_table = DeltaTable("path_to_table")
157-
ctx.register_table_provider("my_delta_table", delta_table)
157+
ctx.register_table("my_delta_table", delta_table)
158158
df = ctx.table("my_delta_table")
159159
df.show()
160160
161-
On older versions of ``deltalake`` (prior to 0.22) you can use the
161+
On older versions of ``deltalake`` (prior to 0.22) you can use the
162162
`Arrow DataSet <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
163163
interface to import to DataFusion, but this does not support features such as filter push down
164164
which can lead to a significant performance difference.

docs/source/user-guide/io/table_provider.rst

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -37,22 +37,26 @@ A complete example can be found in the `examples folder <https://github.com/apac
3737
&self,
3838
py: Python<'py>,
3939
) -> PyResult<Bound<'py, PyCapsule>> {
40-
let name = CString::new("datafusion_table_provider").unwrap();
40+
let name = cr"datafusion_table_provider".into();
4141
42-
let provider = Arc::new(self.clone())
43-
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
44-
let provider = FFI_TableProvider::new(Arc::new(provider), false);
42+
let provider = Arc::new(self.clone());
43+
let provider = FFI_TableProvider::new(provider, false, None);
4544
4645
PyCapsule::new_bound(py, provider, Some(name.clone()))
4746
}
4847
}
4948
50-
Once you have this library available, in python you can register your table provider
51-
to the ``SessionContext``.
49+
Once you have this library available, you can register your table provider
50+
with the ``SessionContext`` in Python, either directly or wrapped in a
51+
:py:class:`~datafusion.Table`.
5252

5353
.. code-block:: python
5454
55+
from datafusion import SessionContext, Table
56+
57+
ctx = SessionContext()
5558
provider = MyTableProvider()
56-
ctx.register_table_provider("my_table", provider)
5759
58-
ctx.table("my_table").show()
60+
ctx.register_table("capsule_table", provider)
61+
62+
ctx.table("capsule_table").show()

examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,9 +36,9 @@ def test_catalog_provider():
3636

3737
my_catalog_schemas = my_catalog.names()
3838
assert expected_schema_name in my_catalog_schemas
39-
my_database = my_catalog.database(expected_schema_name)
40-
assert expected_table_name in my_database.names()
41-
my_table = my_database.table(expected_table_name)
39+
my_schema = my_catalog.schema(expected_schema_name)
40+
assert expected_table_name in my_schema.names()
41+
my_table = my_schema.table(expected_table_name)
4242
assert expected_table_columns == my_table.schema.names
4343

4444
result = ctx.table(

examples/datafusion-ffi-example/python/tests/_test_table_function.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
5353
table_udtf = udtf(table_func, "my_table_func")
5454

5555
my_table = table_udtf()
56-
ctx.register_table_provider("t", my_table)
56+
ctx.register_table("t", my_table)
5757
result = ctx.table("t").collect()
5858

5959
assert len(result) == 2

examples/datafusion-ffi-example/python/tests/_test_table_provider.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
def test_table_loading():
2626
ctx = SessionContext()
2727
table = MyTableProvider(3, 2, 4)
28-
ctx.register_table_provider("t", table)
28+
ctx.register_table("t", table)
2929
result = ctx.table("t").collect()
3030

3131
assert len(result) == 4
@@ -40,3 +40,7 @@ def test_table_loading():
4040
]
4141

4242
assert result == expected
43+
44+
result = ctx.read_table(table).collect()
45+
result = [r.column(0) for r in result]
46+
assert result == expected

python/datafusion/__init__.py

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -28,17 +28,16 @@
2828
try:
2929
import importlib.metadata as importlib_metadata
3030
except ImportError:
31-
import importlib_metadata
31+
import importlib_metadata # type: ignore[import]
3232

33+
# Public submodules
3334
from . import functions, object_store, substrait, unparser
3435

3536
# The following imports are okay to remain as opaque to the user.
3637
from ._internal import Config
3738
from .catalog import Catalog, Database, Table
3839
from .col import col, column
39-
from .common import (
40-
DFSchema,
41-
)
40+
from .common import DFSchema
4241
from .context import (
4342
RuntimeEnvBuilder,
4443
SessionConfig,
@@ -47,10 +46,7 @@
4746
)
4847
from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
4948
from .dataframe_formatter import configure_formatter
50-
from .expr import (
51-
Expr,
52-
WindowFrame,
53-
)
49+
from .expr import Expr, WindowFrame
5450
from .io import read_avro, read_csv, read_json, read_parquet
5551
from .plan import ExecutionPlan, LogicalPlan
5652
from .record_batch import RecordBatch, RecordBatchStream

python/datafusion/catalog.py

Lines changed: 44 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -20,13 +20,16 @@
2020
from __future__ import annotations
2121

2222
from abc import ABC, abstractmethod
23-
from typing import TYPE_CHECKING, Protocol
23+
from typing import TYPE_CHECKING, Any, Protocol
2424

2525
import datafusion._internal as df_internal
2626

2727
if TYPE_CHECKING:
2828
import pyarrow as pa
2929

30+
from datafusion import DataFrame
31+
from datafusion.context import TableProviderExportable
32+
3033
try:
3134
from warnings import deprecated # Python 3.13+
3235
except ImportError:
@@ -82,7 +85,11 @@ def database(self, name: str = "public") -> Schema:
8285
"""Returns the database with the given ``name`` from this catalog."""
8386
return self.schema(name)
8487

85-
def register_schema(self, name, schema) -> Schema | None:
88+
def register_schema(
89+
self,
90+
name: str,
91+
schema: Schema | SchemaProvider | SchemaProviderExportable,
92+
) -> Schema | None:
8693
"""Register a schema with this catalog."""
8794
if isinstance(schema, Schema):
8895
return self.catalog.register_schema(name, schema._raw_schema)
@@ -122,10 +129,12 @@ def table(self, name: str) -> Table:
122129
"""Return the table with the given ``name`` from this schema."""
123130
return Table(self._raw_schema.table(name))
124131

125-
def register_table(self, name, table) -> None:
126-
"""Register a table provider in this schema."""
127-
if isinstance(table, Table):
128-
return self._raw_schema.register_table(name, table.table)
132+
def register_table(
133+
self,
134+
name: str,
135+
table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
136+
) -> None:
137+
"""Register a table in this schema."""
129138
return self._raw_schema.register_table(name, table)
130139

131140
def deregister_table(self, name: str) -> None:
@@ -139,30 +148,45 @@ class Database(Schema):
139148

140149

141150
class Table:
142-
"""DataFusion table."""
151+
"""A DataFusion table.
143152
144-
def __init__(self, table: df_internal.catalog.RawTable) -> None:
145-
"""This constructor is not typically called by the end user."""
146-
self.table = table
153+
Internally we currently support the following types of tables:
154+
155+
- Tables created using built-in DataFusion methods, such as
156+
reading from CSV or Parquet
157+
- pyarrow datasets
158+
- DataFusion DataFrames, which will be converted into a view
159+
- Externally provided tables implemented with the FFI PyCapsule
160+
interface (advanced)
161+
"""
162+
163+
__slots__ = ("_inner",)
164+
165+
def __init__(
166+
self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
167+
) -> None:
168+
"""Constructor."""
169+
self._inner = df_internal.catalog.RawTable(table)
147170

148171
def __repr__(self) -> str:
149172
"""Print a string representation of the table."""
150-
return self.table.__repr__()
173+
return repr(self._inner)
151174

152175
@staticmethod
176+
@deprecated("Use Table() constructor instead.")
153177
def from_dataset(dataset: pa.dataset.Dataset) -> Table:
154-
"""Turn a pyarrow Dataset into a Table."""
155-
return Table(df_internal.catalog.RawTable.from_dataset(dataset))
178+
"""Turn a :mod:`pyarrow.dataset` ``Dataset`` into a :class:`Table`."""
179+
return Table(dataset)
156180

157181
@property
158182
def schema(self) -> pa.Schema:
159183
"""Returns the schema associated with this table."""
160-
return self.table.schema
184+
return self._inner.schema
161185

162186
@property
163187
def kind(self) -> str:
164188
"""Returns the kind of table."""
165-
return self.table.kind
189+
return self._inner.kind
166190

167191

168192
class CatalogProvider(ABC):
@@ -219,14 +243,16 @@ def table(self, name: str) -> Table | None:
219243
"""Retrieve a specific table from this schema."""
220244
...
221245

222-
def register_table(self, name: str, table: Table) -> None: # noqa: B027
223-
"""Add a table from this schema.
246+
def register_table( # noqa: B027
247+
self, name: str, table: Table | TableProviderExportable | Any
248+
) -> None:
249+
"""Add a table to this schema.
224250
225251
This method is optional. If your schema provides a fixed list of tables, you do
226252
not need to implement this method.
227253
"""
228254

229-
def deregister_table(self, name, cascade: bool) -> None: # noqa: B027
255+
def deregister_table(self, name: str, cascade: bool) -> None: # noqa: B027
230256
"""Remove a table from this schema.
231257
232258
This method is optional. If your schema provides a fixed list of tables, you do

python/datafusion/context.py

Lines changed: 34 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,12 @@
2727
except ImportError:
2828
from typing_extensions import deprecated # Python 3.12
2929

30-
from datafusion.catalog import Catalog, CatalogProvider, Table
30+
import pyarrow as pa
31+
32+
from datafusion.catalog import Catalog
3133
from datafusion.dataframe import DataFrame
32-
from datafusion.expr import SortKey, sort_list_to_raw_sort_list
34+
from datafusion.expr import sort_list_to_raw_sort_list
3335
from datafusion.record_batch import RecordBatchStream
34-
from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
3536

3637
from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
3738
from ._internal import SessionConfig as SessionConfigInternal
@@ -47,7 +48,15 @@
4748
import polars as pl # type: ignore[import]
4849
import pyarrow as pa # Optional: only needed for type hints
4950

51+
from datafusion.catalog import CatalogProvider, Table
52+
from datafusion.expr import SortKey
5053
from datafusion.plan import ExecutionPlan, LogicalPlan
54+
from datafusion.user_defined import (
55+
AggregateUDF,
56+
ScalarUDF,
57+
TableFunction,
58+
WindowUDF,
59+
)
5160

5261

5362
class ArrowSchemaExportable(Protocol):
@@ -746,7 +755,7 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
746755
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
747756
# is the discussion on how we arrived at adding register_view
748757
def register_view(self, name: str, df: DataFrame) -> None:
749-
"""Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
758+
"""Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.
750759
751760
Args:
752761
name (str): The name to register the view under.
@@ -755,16 +764,21 @@ def register_view(self, name: str, df: DataFrame) -> None:
755764
view = df.into_view()
756765
self.ctx.register_table(name, view)
757766

758-
def register_table(self, name: str, table: Table) -> None:
759-
"""Register a :py:class: `~datafusion.catalog.Table` as a table.
767+
def register_table(
768+
self,
769+
name: str,
770+
table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
771+
) -> None:
772+
"""Register a :py:class:`~datafusion.Table` with this context.
760773
761-
The registered table can be referenced from SQL statement executed against.
774+
The registered table can be referenced from SQL statements executed against
775+
this context.
762776
763777
Args:
764778
name: Name of the resultant table.
765-
table: DataFusion table to add to the session context.
779+
table: Any object that can be converted into a :class:`Table`.
766780
"""
767-
self.ctx.register_table(name, table.table)
781+
self.ctx.register_table(name, table)
768782

769783
def deregister_table(self, name: str) -> None:
770784
"""Remove a table from the session."""
@@ -783,15 +797,17 @@ def register_catalog_provider(
783797
else:
784798
self.ctx.register_catalog_provider(name, provider)
785799

800+
@deprecated("Use register_table() instead.")
786801
def register_table_provider(
787-
self, name: str, provider: TableProviderExportable
802+
self,
803+
name: str,
804+
provider: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
788805
) -> None:
789806
"""Register a table provider.
790807
791-
This table provider must have a method called ``__datafusion_table_provider__``
792-
which returns a PyCapsule that exposes a ``FFI_TableProvider``.
808+
Deprecated: use :meth:`register_table` instead.
793809
"""
794-
self.ctx.register_table_provider(name, provider)
810+
self.register_table(name, provider)
795811

796812
def register_udtf(self, func: TableFunction) -> None:
797813
"""Register a user defined table function."""
@@ -1176,14 +1192,11 @@ def read_avro(
11761192
self.ctx.read_avro(str(path), schema, file_partition_cols, file_extension)
11771193
)
11781194

1179-
def read_table(self, table: Table) -> DataFrame:
1180-
"""Creates a :py:class:`~datafusion.dataframe.DataFrame` from a table.
1181-
1182-
For a :py:class:`~datafusion.catalog.Table` such as a
1183-
:py:class:`~datafusion.catalog.ListingTable`, create a
1184-
:py:class:`~datafusion.dataframe.DataFrame`.
1185-
"""
1186-
return DataFrame(self.ctx.read_table(table.table))
1195+
def read_table(
1196+
self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
1197+
) -> DataFrame:
1198+
"""Creates a :py:class:`~datafusion.dataframe.DataFrame` from a table."""
1199+
return DataFrame(self.ctx.read_table(table))
11871200

11881201
def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11891202
"""Execute the ``plan`` and return the results."""

0 commit comments

Comments
 (0)