Skip to content

Commit c73b937

Browse files
Added support for fetching data frames using asyncio.
1 parent 77c3797 commit c73b937

File tree

9 files changed

+752
-4
lines changed

9 files changed

+752
-4
lines changed

doc/src/api_manual/async_connection.rst

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,61 @@ AsyncConnection Methods
124124
This is a shortcut for calling :meth:`AsyncConnection.cursor()`,
125125
:meth:`AsyncCursor.executemany()`, and then :meth:`AsyncCursor.close()`.
126126

127+
.. method:: AsyncConnection.fetch_df_all(statement, parameters=None, \
128+
arraysize=None)
129+
130+
Fetches all rows of the SQL query ``statement``, returning them in an
131+
:ref:`OracleDataFrame <oracledataframeobj>` object. An empty
132+
OracleDataFrame is returned if there are no rows available.
133+
134+
The ``parameters`` parameter can be a list of tuples, where each tuple item
135+
maps to one :ref:`bind variable placeholder <bind>` in ``statement``. It
136+
can also be a list of dictionaries, where the keys match the bind variable
137+
placeholder names in ``statement``.
138+
139+
The ``arraysize`` parameter can be specified to tune performance of fetching
140+
data across the network. It defaults to :attr:`defaults.arraysize`.
141+
Internally, the ``fetch_df_all()``'s :attr:`Cursor.prefetchrows` size is
142+
always set to the value of the explicit or default ``arraysize`` parameter
143+
value.
144+
145+
See :ref:`dataframeformat` for the supported data types and examples.
146+
147+
.. note::
148+
149+
The data frame support in python-oracledb 3.0.0 is a pre-release and
150+
may change in the next version.
151+
152+
.. versionadded:: 3.0.0
153+
154+
.. method:: AsyncConnection.fetch_df_batches(statement, parameters=None, \
155+
size=None)
156+
157+
This returns an iterator yielding the next ``size`` rows of the SQL query
158+
``statement`` in each iteration as an :ref:`OracleDataFrame
159+
<oracledataframeobj>` object. An empty OracleDataFrame is returned if there
160+
are no rows available.
161+
162+
The ``parameters`` parameter can be a list of tuples, where each tuple item
163+
maps to one :ref:`bind variable placeholder <bind>` in ``statement``. It
164+
can also be a list of dictionaries, where the keys match the bind variable
165+
placeholder names in ``statement``.
166+
167+
The ``size`` parameter controls the number of records fetched in each
168+
batch. It defaults to :attr:`defaults.arraysize`. Internally, the
169+
``fetch_df_batches()``'s :attr:`Cursor.arraysize` and
170+
:attr:`Cursor.prefetchrows` sizes are always set to the value of the
171+
explicit or default ``size`` parameter value.
172+
173+
See :ref:`dataframeformat` for the supported data types and examples.
174+
175+
.. note::
176+
177+
The data frame support in python-oracledb 3.0.0 is a pre-release and
178+
may change in the next version.
179+
180+
.. versionadded:: 3.0.0
181+
127182
.. method:: AsyncConnection.fetchall(statement, parameters=None, \
128183
arraysize=None, rowfactory=None)
129184

doc/src/release_notes.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ Thick Mode Changes
9292
Common Changes
9393
++++++++++++++
9494

95-
#) Added new methods :meth:`Connection.fetch_df_all()` and
96-
:meth:`Connection.fetch_df_batches()` to fetch data as DataFrames
95+
#) Added new methods :meth:`Connection.fetch_df_all()`,
96+
:meth:`Connection.fetch_df_batches()`,
97+
:meth:`AsyncConnection.fetch_df_all()`, and
98+
:meth:`AsyncConnection.fetch_df_batches()` to fetch data as DataFrames
9799
compliant with the Python DataFrame Interchange protocol. See
98100
:ref:`dataframeformat`.
99101
#) Added support for Oracle Database 23ai SPARSE vectors.

doc/src/user_guide/asyncio.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ You can also use shortcut methods on the :ref:`asyncconnobj` object such as
171171
:meth:`AsyncConnection.execute()` or
172172
:meth:`AsyncConnection.executemany()`. Rows can be fetched using one of the
173173
shortcut methods :meth:`AsyncConnection.fetchone()`,
174-
:meth:`AsyncConnection.fetchmany()`, or :meth:`AsyncConnection.fetchall()`.
174+
:meth:`AsyncConnection.fetchmany()`, :meth:`AsyncConnection.fetchall()`,
175+
:meth:`AsyncConnection.fetch_df_all()`, or
176+
:meth:`AsyncConnection.fetch_df_batches()`.
175177

176178
An example of using :meth:`AsyncConnection.fetchall()`:
177179

doc/src/user_guide/sql_execution.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ executed. Statements are executed using one of these methods
1313
:meth:`Cursor.execute()`, :meth:`Cursor.executemany()`,
1414
:meth:`Connection.fetch_df_all()`, :meth:`Connection.fetch_df_batches()`,
1515
:meth:`AsyncCursor.execute()`, :meth:`AsyncCursor.executemany()`,
16-
:meth:`AsyncConnection.execute()`, :meth:`AsyncConnection.executemany()`, or
16+
:meth:`AsyncConnection.execute()`, :meth:`AsyncConnection.executemany()`,
17+
:meth:`AsyncConnection.fetch_df_all()`,
18+
:meth:`AsyncConnection.fetch_df_batches()`, or
1719
:meth:`AsyncConnection.run_pipeline()`.
1820

1921
This chapter discusses python-oracledb's synchronous methods. The asynchronous

samples/dataframe_pandas_async.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# -----------------------------------------------------------------------------
2+
# Copyright (c) 2025, Oracle and/or its affiliates.
3+
#
4+
# This software is dual-licensed to you under the Universal Permissive License
5+
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
6+
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
# either license.
8+
#
9+
# If you elect to accept the software under the Apache License, Version 2.0,
10+
# the following applies:
11+
#
12+
# Licensed under the Apache License, Version 2.0 (the "License");
13+
# you may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# https://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
# -----------------------------------------------------------------------------
24+
25+
# -----------------------------------------------------------------------------
26+
# dataframe_pandas_async.py
27+
#
28+
# An asynchronous version of dataframe_pandas.py
29+
#
30+
# Shows how to use AsyncConnection.fetch_df_all() and
31+
# AsyncConnection.fetch_df_batches(). This example then creates Pandas
32+
# dataframes. Alternative dataframe libraries could be used similar to the
33+
# other, synchronous, data frame samples.
34+
# -----------------------------------------------------------------------------
35+
36+
import asyncio
37+
38+
import pandas
39+
import oracledb
40+
import sample_env
41+
42+
43+
async def main():
44+
connection = await oracledb.connect_async(
45+
user=sample_env.get_main_user(),
46+
password=sample_env.get_main_password(),
47+
dsn=sample_env.get_connect_string(),
48+
params=sample_env.get_connect_params(),
49+
)
50+
51+
SQL = "select id, name from SampleQueryTab order by id"
52+
53+
# Get an OracleDataFrame.
54+
# Adjust arraysize to tune the query fetch performance
55+
odf = await connection.fetch_df_all(statement=SQL, arraysize=100)
56+
57+
# Get a Pandas DataFrame from the data.
58+
# This is a zero copy call
59+
df = pandas.api.interchange.from_dataframe(odf)
60+
61+
# Perform various Pandas operations on the DataFrame
62+
63+
print("Columns:")
64+
print(df.columns)
65+
66+
print("\nDataframe description:")
67+
print(df.describe())
68+
69+
print("\nLast three rows:")
70+
print(df.tail(3))
71+
72+
print("\nTransform:")
73+
print(df.T)
74+
75+
# -------------------------------------------------------------------------
76+
77+
# An example of batch fetching
78+
#
79+
# Note that since this particular example ends up with all query rows being
80+
# held in memory, it would be more efficient to use fetch_df_all() as shown
81+
# above.
82+
83+
print("\nFetching in batches:")
84+
df = pandas.DataFrame()
85+
86+
# Tune 'size' for your data set. Here it is small to show the batch fetch
87+
# behavior on the sample table.
88+
async for odf in connection.fetch_df_batches(statement=SQL, size=10):
89+
df_b = pandas.api.interchange.from_dataframe(odf)
90+
print(f"Appending {df_b.shape[0]} rows")
91+
df = pandas.concat([df, df_b], ignore_index=True)
92+
93+
print("\nLast three rows:")
94+
print(df.tail(3))
95+
96+
97+
asyncio.run(main())

src/oracledb/connection.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,44 @@ async def fetchall(
17811781
cursor.rowfactory = rowfactory
17821782
return await cursor.fetchall()
17831783

1784+
async def fetch_df_all(
1785+
self,
1786+
statement: str,
1787+
parameters: Optional[Union[list, tuple, dict]] = None,
1788+
arraysize: Optional[int] = None,
1789+
):
1790+
"""
1791+
Fetch all data as OracleDataFrame.
1792+
"""
1793+
cursor = self.cursor()
1794+
cursor._impl.fetching_arrow = True
1795+
if arraysize is not None:
1796+
cursor.arraysize = arraysize
1797+
cursor.prefetchrows = cursor.arraysize
1798+
await cursor.execute(statement, parameters)
1799+
return await cursor._impl.fetch_df_all(cursor)
1800+
1801+
async def fetch_df_batches(
1802+
self,
1803+
statement: str,
1804+
parameters: Optional[Union[list, tuple, dict]] = None,
1805+
size: Optional[int] = None,
1806+
):
1807+
"""
1808+
Fetch data in batches. Each batch is an OracleDataFrame
1809+
"""
1810+
cursor = self.cursor()
1811+
cursor._impl.fetching_arrow = True
1812+
if size is not None:
1813+
cursor.arraysize = size
1814+
cursor.prefetchrows = cursor.arraysize
1815+
await cursor.execute(statement, parameters)
1816+
if size is None:
1817+
yield await cursor._impl.fetch_df_all(cursor)
1818+
else:
1819+
async for df in cursor._impl.fetch_df_batches(cursor, size):
1820+
yield df
1821+
17841822
async def fetchmany(
17851823
self,
17861824
statement: str,

src/oracledb/impl/thin/cursor.pyx

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,27 @@ cdef class AsyncThinCursorImpl(BaseThinCursorImpl):
340340
await protocol._process_single_message(message)
341341
self.warning = message.warning
342342

343+
async def fetch_df_all(self, cursor):
344+
"""
345+
Internal method used for fetching all data as OracleDataFrame
346+
"""
347+
while self._more_rows_to_fetch:
348+
await self._fetch_rows_async(cursor)
349+
return self._finish_building_arrow_arrays()
350+
351+
async def fetch_df_batches(self, cursor, int batch_size):
352+
"""
353+
Internal method used for fetching next batch as OracleDataFrame.
354+
"""
355+
# Return the prefetched batch
356+
yield self._finish_building_arrow_arrays()
357+
358+
while self._more_rows_to_fetch:
359+
self._create_arrow_arrays()
360+
await self._fetch_rows_async(cursor)
361+
if self._buffer_rowcount > 0:
362+
yield self._finish_building_arrow_arrays()
363+
343364
async def fetch_next_row(self, cursor):
344365
"""
345366
Internal method used for fetching the next row from a cursor.

0 commit comments

Comments
 (0)