Skip to content

Commit 77ca759

Browse files
committed
Continue implementing new realization. pre-commit disabled.
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent 639594e commit 77ca759

File tree

11 files changed

+224
-152
lines changed

11 files changed

+224
-152
lines changed

python/psqlpy/_internal/__init__.pyi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,3 +1017,9 @@ class ConnectionPool:
10171017
# it will be dropped on Rust side.
10181018
```
10191019
"""
1020+
1021+
async def connection(self: Self) -> Connection:
1022+
"""Create new connection.
1023+
1024+
It acquires new connection from the database pool.
1025+
"""

src/common.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use std::future::Future;
2-
3-
use pyo3::{types::PyModule, IntoPy, PyAny, PyObject, PyResult, Python};
4-
5-
use crate::exceptions::rust_errors::RustPSQLDriverPyResult;
1+
use pyo3::{types::PyModule, PyResult, Python};
62

73
/// Add new module to the parent one.
84
///
@@ -27,20 +23,3 @@ pub fn add_module(
2723
)?;
2824
Ok(())
2925
}
30-
31-
// /// Simple wrapper for pyo3 `pyo3_asyncio::tokio::future_into_py`.
32-
// ///
33-
// /// It wraps incoming Future and return internal Result.
34-
// ///
35-
// /// # Errors
36-
// ///
37-
// /// May return Err Result if future acts incorrect.
38-
// pub fn rustdriver_future<F, T>(py: Python<'_>, future: F) -> RustPSQLDriverPyResult<&PyAny>
39-
// where
40-
// F: Future<Output = RustPSQLDriverPyResult<T>> + Send + 'static,
41-
// T: IntoPy<PyObject>,
42-
// {
43-
// let res = pyo3_asyncio::tokio::future_into_py(py, async { future.await.map_err(Into::into) })
44-
// .map(Into::into)?;
45-
// Ok(res)
46-
// }

src/driver/connection.rs

Lines changed: 98 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
1-
// use deadpool_postgres::Object;
2-
// use pyo3::{pyclass, pymethods, types::PyList, PyAny, Python};
3-
// use std::{collections::HashSet, sync::Arc, vec};
4-
5-
// use crate::{
6-
// common::rustdriver_future,
7-
// exceptions::rust_errors::RustPSQLDriverPyResult,
8-
// query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
9-
// value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter},
10-
// };
11-
// use tokio_postgres::Row;
12-
13-
// use super::{
14-
// transaction::{RustTransaction, Transaction},
15-
// transaction_options::{IsolationLevel, ReadVariant},
16-
// };
1+
use deadpool_postgres::Object;
2+
use pyo3::{pyclass, pymethods, types::PyList, PyAny, Python};
3+
use std::{collections::HashSet, sync::Arc, vec};
4+
5+
use crate::{
6+
exceptions::rust_errors::RustPSQLDriverPyResult,
7+
query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
8+
runtime::tokio,
9+
value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter},
10+
};
11+
use tokio_postgres::Row;
1712

1813
// #[allow(clippy::module_name_repetitions)]
1914
// pub struct RustConnection {
@@ -294,37 +289,37 @@
294289
// })
295290
// }
296291

297-
// /// Execute querystring with parameters and return first row.
298-
// ///
299-
// /// It converts incoming parameters to rust readable,
300-
// /// executes query with them and returns first row of response.
301-
// ///
302-
// /// # Errors
303-
// ///
304-
// /// May return Err Result if:
305-
// /// 1) Cannot convert python parameters
306-
// /// 2) Cannot execute querystring.
307-
// /// 3) Query returns more than one row.
308-
// pub fn fetch_row<'a>(
309-
// &'a self,
310-
// py: Python<'a>,
311-
// querystring: String,
312-
// parameters: Option<&'a PyList>,
313-
// prepared: Option<bool>,
314-
// ) -> RustPSQLDriverPyResult<&PyAny> {
315-
// let transaction_arc = self.inner_connection.clone();
316-
// let mut params: Vec<PythonDTO> = vec![];
317-
// if let Some(parameters) = parameters {
318-
// params = convert_parameters(parameters)?;
319-
// }
320-
321-
// rustdriver_future(py, async move {
322-
// transaction_arc
323-
// .inner_fetch_row(querystring, params, prepared.unwrap_or(true))
324-
// .await
325-
// })
292+
// / Execute querystring with parameters and return first row.
293+
// /
294+
// / It converts incoming parameters to rust readable,
295+
// / executes query with them and returns first row of response.
296+
// /
297+
// / # Errors
298+
// /
299+
// / May return Err Result if:
300+
// / 1) Cannot convert python parameters
301+
// / 2) Cannot execute querystring.
302+
// / 3) Query returns more than one row.
303+
// pub fn fetch_row<'a>(
304+
// &'a self,
305+
// py: Python<'a>,
306+
// querystring: String,
307+
// parameters: Option<&'a PyList>,
308+
// prepared: Option<bool>,
309+
// ) -> RustPSQLDriverPyResult<&PyAny> {
310+
// let transaction_arc = self.inner_connection.clone();
311+
// let mut params: Vec<PythonDTO> = vec![];
312+
// if let Some(parameters) = parameters {
313+
// params = convert_parameters(parameters)?;
326314
// }
327315

316+
// rustdriver_future(py, async move {
317+
// transaction_arc
318+
// .inner_fetch_row(querystring, params, prepared.unwrap_or(true))
319+
// .await
320+
// })
321+
// }
322+
328323
// /// Execute querystring with parameters and return first value in the first row.
329324
// ///
330325
// /// It converts incoming parameters to rust readable,
@@ -385,3 +380,61 @@
385380
// )
386381
// }
387382
// }
383+
384+
#[pyclass]
385+
pub struct Connection {
386+
pub db_client: Arc<Object>,
387+
}
388+
389+
impl Connection {
390+
pub fn new(db_client: Object) -> Self {
391+
Connection {
392+
db_client: Arc::new(db_client),
393+
}
394+
}
395+
}
396+
397+
#[pymethods]
398+
impl Connection {
399+
/// Execute statement with or witout parameters.
400+
///
401+
/// # Errors
402+
///
403+
/// May return Err Result if
404+
/// 1) Cannot convert incoming parameters
405+
/// 2) Cannot prepare statement
406+
/// 3) Cannot execute query
407+
pub async fn execute(
408+
self_: pyo3::Py<Self>,
409+
querystring: String,
410+
prepared: Option<bool>,
411+
parameters: Option<pyo3::Py<PyAny>>,
412+
) -> RustPSQLDriverPyResult<PSQLDriverPyQueryResult> {
413+
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
414+
let mut params: Vec<PythonDTO> = vec![];
415+
if let Some(parameters) = parameters {
416+
params = convert_parameters(parameters)?;
417+
}
418+
let prepared = prepared.unwrap_or(true);
419+
420+
let vec_parameters: Vec<&QueryParameter> = params
421+
.iter()
422+
.map(|param| param as &QueryParameter)
423+
.collect();
424+
425+
let result = if prepared {
426+
db_client
427+
.query(
428+
&db_client.prepare_cached(&querystring).await?,
429+
&vec_parameters.into_boxed_slice(),
430+
)
431+
.await?
432+
} else {
433+
db_client
434+
.query(&querystring, &vec_parameters.into_boxed_slice())
435+
.await?
436+
};
437+
438+
Ok(PSQLDriverPyQueryResult::new(result))
439+
}
440+
}

src/driver/connection_pool.rs

Lines changed: 75 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::driver::runtime::tokio;
1+
use crate::runtime::tokio;
22
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
3-
use pyo3::{pyclass, pymethods, types::PyAnyMethods, Bound, PyAny, PyObject, Python};
4-
use std::{str::FromStr, sync::Arc, vec};
5-
use tokio_postgres::NoTls;
3+
use pyo3::{pyclass, pymethods, PyAny};
4+
use std::{str::FromStr, vec};
5+
use tokio_postgres::{NoTls, Row};
66

77
use crate::{
88
// common::rustdriver_future,
@@ -13,6 +13,7 @@ use crate::{
1313

1414
use super::{
1515
common_options::ConnRecyclingMethod,
16+
connection::Connection,
1617
// connection::{Connection, RustConnection},
1718
};
1819

@@ -311,38 +312,13 @@ pub struct ConnectionPool {
311312
db_pool: Option<Pool>,
312313
}
313314

314-
#[pymethods]
315315
impl ConnectionPool {
316-
#[new]
317-
pub fn new(
318-
dsn: Option<String>,
319-
username: Option<String>,
320-
password: Option<String>,
321-
host: Option<String>,
322-
port: Option<u16>,
323-
db_name: Option<String>,
324-
max_db_pool_size: Option<usize>,
325-
conn_recycling_method: Option<ConnRecyclingMethod>,
326-
) -> Self {
327-
ConnectionPool {
328-
dsn,
329-
username,
330-
password,
331-
host,
332-
port,
333-
db_name,
334-
max_db_pool_size,
335-
conn_recycling_method,
336-
db_pool: None,
337-
}
338-
}
339-
340316
/// Create new Database pool.
341317
///
342318
/// # Errors
343319
/// May return Err Result if Database pool is already initialized,
344320
/// `max_db_pool_size` is less than 2 or it's impossible to build db pool.
345-
pub fn startup(&mut self) -> RustPSQLDriverPyResult<()> {
321+
pub fn startup(mut self) -> RustPSQLDriverPyResult<Self> {
346322
let dsn = self.dsn.clone();
347323
let password = self.password.clone();
348324
let username = self.username.clone();
@@ -406,7 +382,35 @@ impl ConnectionPool {
406382
}
407383

408384
self.db_pool = Some(db_pool_builder.build()?);
409-
Ok(())
385+
Ok(self)
386+
}
387+
}
388+
389+
#[pymethods]
390+
impl ConnectionPool {
391+
#[new]
392+
pub fn new(
393+
dsn: Option<String>,
394+
username: Option<String>,
395+
password: Option<String>,
396+
host: Option<String>,
397+
port: Option<u16>,
398+
db_name: Option<String>,
399+
max_db_pool_size: Option<usize>,
400+
conn_recycling_method: Option<ConnRecyclingMethod>,
401+
) -> RustPSQLDriverPyResult<Self> {
402+
let conn_pool = ConnectionPool {
403+
dsn,
404+
username,
405+
password,
406+
host,
407+
port,
408+
db_name,
409+
max_db_pool_size,
410+
conn_recycling_method,
411+
db_pool: None,
412+
};
413+
conn_pool.startup()
410414
}
411415

412416
/// Execute querystring with parameters.
@@ -435,35 +439,52 @@ impl ConnectionPool {
435439
let result = if prepared {
436440
tokio()
437441
.spawn(async move {
438-
let vec_parameters: Vec<&QueryParameter> = params
439-
.iter()
440-
.map(|param| param as &QueryParameter)
441-
.collect();
442-
db_pool_manager
443-
.query(
444-
&db_pool_manager.prepare_cached(&querystring).await.unwrap(),
445-
&vec_parameters.into_boxed_slice(),
446-
)
447-
.await
448-
.unwrap()
442+
Ok::<Vec<Row>, RustPSQLDriverError>(
443+
db_pool_manager
444+
.query(
445+
&db_pool_manager.prepare_cached(&querystring).await.unwrap(),
446+
&params
447+
.iter()
448+
.map(|param| param as &QueryParameter)
449+
.collect::<Vec<&QueryParameter>>()
450+
.into_boxed_slice(),
451+
)
452+
.await?,
453+
)
449454
})
450-
.await
451-
.unwrap()
455+
.await??
452456
} else {
453457
tokio()
454458
.spawn(async move {
455-
let vec_parameters: Vec<&QueryParameter> = params
456-
.iter()
457-
.map(|param| param as &QueryParameter)
458-
.collect();
459-
db_pool_manager
460-
.query(&querystring, &vec_parameters.into_boxed_slice())
461-
.await
462-
.unwrap()
459+
Ok::<Vec<Row>, RustPSQLDriverError>(
460+
db_pool_manager
461+
.query(
462+
&querystring,
463+
&params
464+
.iter()
465+
.map(|param| param as &QueryParameter)
466+
.collect::<Vec<&QueryParameter>>()
467+
.into_boxed_slice(),
468+
)
469+
.await?,
470+
)
463471
})
464-
.await
465-
.unwrap()
472+
.await??
466473
};
467474
Ok(PSQLDriverPyQueryResult::new(result))
468475
}
476+
477+
/// Return new single connection.
478+
///
479+
/// # Errors
480+
/// May return Err Result if cannot get new connection from the pool.
481+
pub async fn connection(self_: pyo3::Py<Self>) -> RustPSQLDriverPyResult<Connection> {
482+
let db_pool = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_pool.clone().unwrap());
483+
let db_connection = tokio()
484+
.spawn(async move { db_pool.get().await.unwrap() })
485+
.await
486+
.unwrap();
487+
488+
Ok(Connection::new(db_connection))
489+
}
469490
}

src/driver/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@ pub mod common_options;
22
pub mod connection;
33
pub mod connection_pool;
44
pub mod cursor;
5-
pub mod runtime;
65
pub mod transaction;
76
pub mod transaction_options;

0 commit comments

Comments
 (0)