Skip to content

Commit c71a7cb

Browse files
committed
Implemented fetch_row for transaction
1 parent ef7e770 commit c71a7cb

File tree

4 files changed

+203
-16
lines changed

4 files changed

+203
-16
lines changed

python/psqlpy/_internal/__init__.pyi

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ class QueryResult:
1010
def result(self: Self) -> list[dict[Any, Any]]:
1111
"""Return result from database as a list of dicts."""
1212

13+
class SingleQueryResult:
14+
"""Single result."""
15+
16+
def result(self: Self) -> dict[Any, Any]:
17+
"""Return result from database as a dict."""
18+
1319
class IsolationLevel(Enum):
1420
"""Class for Isolation Level for transactions."""
1521

@@ -320,6 +326,57 @@ class Transaction:
320326
# This way transaction begins and commits by itself.
321327
```
322328
"""
329+
async def fetch_row(
330+
self: Self,
331+
querystring: str,
332+
parameters: list[Any] | None = None,
333+
) -> SingleQueryResult:
334+
"""Execute the query and return first row.
335+
336+
Querystring can contain `$<number>` parameters
337+
for converting them in the driver side.
338+
339+
### Parameters:
340+
- `querystring`: querystring to execute.
341+
- `parameters`: list of parameters to pass in the query.
342+
343+
### Example:
344+
```python
345+
import asyncio
346+
347+
from psqlpy import PSQLPool, QueryResult
348+
349+
350+
async def main() -> None:
351+
db_pool = PSQLPool()
352+
await db_pool.startup()
353+
354+
transaction = await db_pool.transaction()
355+
await transaction.begin()
356+
query_result: SingleQueryResult = await transaction.execute(
357+
"SELECT username FROM users WHERE id = $1",
358+
[100],
359+
)
360+
dict_result: Dict[Any, Any] = query_result.result()
361+
# You must call commit manually
362+
await transaction.commit()
363+
364+
# Or you can transaction as a async context manager
365+
366+
async def main() -> None:
367+
db_pool = PSQLPool()
368+
await psqlpy.startup()
369+
370+
transaction = await db_pool.transaction()
371+
async with transaction:
372+
query_result: SingleQueryResult = await transaction.execute(
373+
"SELECT username FROM users WHERE id = $1",
374+
[100],
375+
)
376+
dict_result: Dict[Any, Any] = query_result.result()
377+
# This way transaction begins and commits by itself.
378+
```
379+
"""
323380
async def savepoint(self: Self, savepoint_name: str) -> None:
324381
"""Create new savepoint.
325382

python/tests/test_transaction.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,19 @@ async def test_transaction_execute_many(
234234
table_name,
235235
transaction,
236236
) - number_database_records == len(insert_values)
237+
238+
239+
async def test_transaction_fetch_row(
240+
psql_pool: PSQLPool,
241+
table_name: str,
242+
) -> None:
243+
connection = await psql_pool.connection()
244+
async with connection.transaction() as transaction:
245+
database_single_query_result: typing.Final = (
246+
await transaction.fetch_row(
247+
f"SELECT * FROM {table_name}",
248+
[],
249+
)
250+
)
251+
result = database_single_query_result.result()
252+
assert isinstance(result, dict)

src/driver/transaction.rs

Lines changed: 90 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
1-
use deadpool_postgres::Object;
2-
use pyo3::{
3-
pyclass, pymethods,
4-
types::{PyList, PyString},
5-
Py, PyAny, PyObject, PyRef, PyRefMut, Python,
1+
use super::{
2+
cursor::Cursor,
3+
transaction_options::{IsolationLevel, ReadVariant},
64
};
7-
use std::{collections::HashSet, sync::Arc, vec};
8-
use tokio_postgres::types::ToSql;
9-
105
use crate::{
116
common::rustengine_future,
127
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
13-
query_result::PSQLDriverPyQueryResult,
8+
query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
149
value_converter::{convert_parameters, PythonDTO},
1510
};
16-
17-
use super::{
18-
cursor::Cursor,
19-
transaction_options::{IsolationLevel, ReadVariant},
11+
use deadpool_postgres::Object;
12+
use pyo3::{
13+
pyclass, pymethods,
14+
types::{PyList, PyString},
15+
Py, PyAny, PyErr, PyObject, PyRef, PyRefMut, Python,
2016
};
21-
17+
use std::{collections::HashSet, sync::Arc, vec};
18+
use tokio_postgres::types::ToSql;
2219
/// Transaction for internal use only.
2320
///
2421
/// It is not exposed to python.
@@ -166,7 +163,56 @@ impl RustTransaction {
166163

167164
Ok(())
168165
}
166+
/// Fetch single row from query.
167+
///
168+
/// Method doesn't acquire lock on any structure fields.
169+
/// It prepares and caches querystring in the inner Object object.
170+
///
171+
/// Then execute the query.
172+
///
173+
/// # Errors
174+
/// May return Err Result if:
175+
/// 1) Transaction is not started
176+
/// 2) Transaction is done already
177+
/// 3) Can not create/retrieve prepared statement
178+
/// 4) Can not execute statement
179+
pub async fn inner_fetch_row(
180+
&self,
181+
querystring: String,
182+
parameters: Vec<PythonDTO>,
183+
) -> RustPSQLDriverPyResult<PSQLDriverSinglePyQueryResult> {
184+
let db_client_arc = self.db_client.clone();
185+
let is_started_arc = self.is_started.clone();
186+
let is_done_arc = self.is_done.clone();
169187

188+
let db_client_guard = db_client_arc.read().await;
189+
let is_started_guard = is_started_arc.read().await;
190+
let is_done_guard = is_done_arc.read().await;
191+
192+
if !*is_started_guard {
193+
return Err(RustPSQLDriverError::DataBaseTransactionError(
194+
"Transaction is not started, please call begin() on transaction".into(),
195+
));
196+
}
197+
if *is_done_guard {
198+
return Err(RustPSQLDriverError::DataBaseTransactionError(
199+
"Transaction is already committed or rolled back".into(),
200+
));
201+
}
202+
203+
let mut vec_parameters: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(parameters.len());
204+
for param in &parameters {
205+
vec_parameters.push(param);
206+
}
207+
208+
let statement = db_client_guard.prepare_cached(&querystring).await?;
209+
210+
let result = db_client_guard
211+
.query(&statement, &vec_parameters.into_boxed_slice())
212+
.await?;
213+
214+
Ok(PSQLDriverSinglePyQueryResult::new(result))
215+
}
170216
/// Start transaction
171217
/// Set up isolation level if specified
172218
/// Set up deferable if specified
@@ -599,7 +645,8 @@ impl Transaction {
599645
let transaction_arc = slf.transaction.clone();
600646
let transaction_arc2 = slf.transaction.clone();
601647
let is_no_exc = exception.is_none();
602-
let exc_message = format!("{exception}");
648+
let py_err = PyErr::from_value(exception);
649+
603650
rustengine_future(py, async move {
604651
let transaction_guard = transaction_arc.read().await;
605652
if is_no_exc {
@@ -609,7 +656,7 @@ impl Transaction {
609656
})
610657
} else {
611658
transaction_guard.inner_rollback().await?;
612-
Err(RustPSQLDriverError::DataBaseTransactionError(exc_message))
659+
Err(RustPSQLDriverError::PyError(py_err))
613660
}
614661
})
615662
}
@@ -673,6 +720,33 @@ impl Transaction {
673720
.await
674721
})
675722
}
723+
/// Execute querystring with parameters and return first row.
724+
///
725+
/// It converts incoming parameters to rust readable,
726+
/// executes query with them and returns first row of response.
727+
///
728+
/// # Errors
729+
///
730+
/// May return Err Result if:
731+
/// 1) Cannot convert python parameters
732+
/// 2) Cannot execute querystring.
733+
pub fn fetch_row<'a>(
734+
&'a self,
735+
py: Python<'a>,
736+
querystring: String,
737+
parameters: Option<&'a PyList>,
738+
) -> RustPSQLDriverPyResult<&PyAny> {
739+
let transaction_arc = self.transaction.clone();
740+
let mut params: Vec<PythonDTO> = vec![];
741+
if let Some(parameters) = parameters {
742+
params = convert_parameters(parameters)?;
743+
}
744+
745+
rustengine_future(py, async move {
746+
let transaction_guard = transaction_arc.read().await;
747+
transaction_guard.inner_fetch_row(querystring, params).await
748+
})
749+
}
676750

677751
/// Start the transaction.
678752
///

src/query_result.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,43 @@ impl PSQLDriverPyQueryResult {
4343
Ok(result.to_object(py))
4444
}
4545
}
46+
47+
#[pyclass(name = "SingleQueryResult")]
48+
#[allow(clippy::module_name_repetitions)]
49+
pub struct PSQLDriverSinglePyQueryResult {
50+
inner: Vec<Row>,
51+
}
52+
53+
impl PSQLDriverSinglePyQueryResult {
54+
#[must_use]
55+
pub fn new(database_row: Vec<Row>) -> Self {
56+
PSQLDriverSinglePyQueryResult {
57+
inner: database_row,
58+
}
59+
}
60+
}
61+
62+
#[pymethods]
63+
impl PSQLDriverSinglePyQueryResult {
64+
/// Return result as a Python list of dicts.
65+
///
66+
/// It's a common variant how to return a result for the future
67+
/// processing.
68+
///
69+
/// # Errors
70+
///
71+
/// May return Err Result if can not convert
72+
/// postgres type to python or set new key-value pair
73+
/// in python dict.
74+
pub fn result(&self, py: Python<'_>) -> RustPSQLDriverPyResult<Py<PyAny>> {
75+
if let Some(row) = self.inner.first() {
76+
let python_dict = PyDict::new(py);
77+
for (column_idx, column) in row.columns().iter().enumerate() {
78+
let python_type = postgres_to_py(py, row, column, column_idx)?;
79+
python_dict.set_item(column.name().to_object(py), python_type)?;
80+
}
81+
return Ok(python_dict.to_object(py));
82+
}
83+
return Ok(PyDict::new(py).to_object(py));
84+
}
85+
}

0 commit comments

Comments
 (0)