Skip to content

Commit db437c9

Browse files
committed
Continue implementing new realization. Implemented Connection. pre-commit disabled.
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent 77ca759 commit db437c9

File tree

3 files changed

+206
-16
lines changed

3 files changed

+206
-16
lines changed

src/driver/connection.rs

Lines changed: 201 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
use deadpool_postgres::Object;
2-
use pyo3::{pyclass, pymethods, types::PyList, PyAny, Python};
3-
use std::{collections::HashSet, sync::Arc, vec};
2+
use pyo3::{pyclass, pymethods, Py, PyAny, Python};
3+
use std::{sync::Arc, vec};
44

55
use crate::{
6-
exceptions::rust_errors::RustPSQLDriverPyResult,
6+
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
77
query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
8-
runtime::tokio,
98
value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter},
109
};
11-
use tokio_postgres::Row;
1210

1311
// #[allow(clippy::module_name_repetitions)]
1412
// pub struct RustConnection {
@@ -411,30 +409,221 @@ impl Connection {
411409
parameters: Option<pyo3::Py<PyAny>>,
412410
) -> RustPSQLDriverPyResult<PSQLDriverPyQueryResult> {
413411
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
412+
414413
let mut params: Vec<PythonDTO> = vec![];
415414
if let Some(parameters) = parameters {
416415
params = convert_parameters(parameters)?;
417416
}
418417
let prepared = prepared.unwrap_or(true);
419418

420-
let vec_parameters: Vec<&QueryParameter> = params
421-
.iter()
422-
.map(|param| param as &QueryParameter)
423-
.collect();
424-
425419
let result = if prepared {
426420
db_client
427421
.query(
428422
&db_client.prepare_cached(&querystring).await?,
429-
&vec_parameters.into_boxed_slice(),
423+
&params
424+
.iter()
425+
.map(|param| param as &QueryParameter)
426+
.collect::<Vec<&QueryParameter>>()
427+
.into_boxed_slice(),
430428
)
431429
.await?
432430
} else {
433431
db_client
434-
.query(&querystring, &vec_parameters.into_boxed_slice())
432+
.query(
433+
&querystring,
434+
&params
435+
.iter()
436+
.map(|param| param as &QueryParameter)
437+
.collect::<Vec<&QueryParameter>>()
438+
.into_boxed_slice(),
439+
)
435440
.await?
436441
};
437442

438443
Ok(PSQLDriverPyQueryResult::new(result))
439444
}
445+
446+
/// Execute querystring with parameters.
447+
///
448+
/// It converts incoming parameters to rust readable
449+
/// and then execute the query with them.
450+
///
451+
/// # Errors
452+
///
453+
/// May return Err Result if:
454+
/// 1) Cannot convert python parameters
455+
/// 2) Cannot execute querystring.
456+
pub async fn execute_many<'a>(
457+
self_: pyo3::Py<Self>,
458+
querystring: String,
459+
prepared: Option<bool>,
460+
parameters: Option<Vec<Py<PyAny>>>,
461+
) -> RustPSQLDriverPyResult<()> {
462+
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
463+
let mut params: Vec<Vec<PythonDTO>> = vec![];
464+
if let Some(parameters) = parameters {
465+
for vec_of_py_any in parameters {
466+
params.push(convert_parameters(vec_of_py_any)?);
467+
}
468+
}
469+
let prepared = prepared.unwrap_or(true);
470+
471+
db_client.batch_execute("BEGIN;").await.map_err(|err| {
472+
RustPSQLDriverError::DataBaseTransactionError(format!(
473+
"Cannot start transaction to run execute_many: {err}"
474+
))
475+
})?;
476+
for param in params {
477+
let querystring_result = if prepared {
478+
let prepared_stmt = &db_client.prepare_cached(&querystring).await;
479+
if let Err(error) = prepared_stmt {
480+
return Err(RustPSQLDriverError::DataBaseTransactionError(format!(
481+
"Cannot prepare statement in execute_many, operation rolled back {error}",
482+
)));
483+
}
484+
db_client
485+
.query(
486+
&db_client.prepare_cached(&querystring).await?,
487+
&param
488+
.iter()
489+
.map(|param| param as &QueryParameter)
490+
.collect::<Vec<&QueryParameter>>()
491+
.into_boxed_slice(),
492+
)
493+
.await
494+
} else {
495+
db_client
496+
.query(
497+
&querystring,
498+
&param
499+
.iter()
500+
.map(|param| param as &QueryParameter)
501+
.collect::<Vec<&QueryParameter>>()
502+
.into_boxed_slice(),
503+
)
504+
.await
505+
};
506+
507+
if let Err(error) = querystring_result {
508+
db_client.batch_execute("ROLLBACK;").await?;
509+
return Err(RustPSQLDriverError::DataBaseTransactionError(format!(
510+
"Error occured in `execute_many` statement, transaction is rolled back: {error}"
511+
)));
512+
}
513+
}
514+
515+
db_client.batch_execute("COMMIT;").await?;
516+
517+
Ok(())
518+
}
519+
520+
/// Fetch exaclty single row from query.
521+
///
522+
/// Method doesn't acquire lock on any structure fields.
523+
/// It prepares and caches querystring in the inner Object object.
524+
///
525+
/// Then execute the query.
526+
///
527+
/// # Errors
528+
/// May return Err Result if:
529+
/// 1) Transaction is not started
530+
/// 2) Transaction is done already
531+
/// 3) Can not create/retrieve prepared statement
532+
/// 4) Can not execute statement
533+
/// 5) Query returns more than one row
534+
pub async fn fetch_row(
535+
self_: pyo3::Py<Self>,
536+
querystring: String,
537+
prepared: Option<bool>,
538+
parameters: Option<pyo3::Py<PyAny>>,
539+
) -> RustPSQLDriverPyResult<PSQLDriverSinglePyQueryResult> {
540+
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
541+
542+
let mut params: Vec<PythonDTO> = vec![];
543+
if let Some(parameters) = parameters {
544+
params = convert_parameters(parameters)?;
545+
}
546+
let prepared = prepared.unwrap_or(true);
547+
548+
let result = if prepared {
549+
db_client
550+
.query_one(
551+
&db_client.prepare_cached(&querystring).await?,
552+
&params
553+
.iter()
554+
.map(|param| param as &QueryParameter)
555+
.collect::<Vec<&QueryParameter>>()
556+
.into_boxed_slice(),
557+
)
558+
.await?
559+
} else {
560+
db_client
561+
.query_one(
562+
&querystring,
563+
&params
564+
.iter()
565+
.map(|param| param as &QueryParameter)
566+
.collect::<Vec<&QueryParameter>>()
567+
.into_boxed_slice(),
568+
)
569+
.await?
570+
};
571+
572+
Ok(PSQLDriverSinglePyQueryResult::new(result))
573+
}
574+
575+
/// Execute querystring with parameters and return first value in the first row.
576+
///
577+
/// It converts incoming parameters to rust readable,
578+
/// executes query with them and returns first row of response.
579+
///
580+
/// # Errors
581+
///
582+
/// May return Err Result if:
583+
/// 1) Cannot convert python parameters
584+
/// 2) Cannot execute querystring.
585+
/// 3) Query returns more than one row
586+
pub async fn fetch_val<'a>(
587+
self_: pyo3::Py<Self>,
588+
querystring: String,
589+
prepared: Option<bool>,
590+
parameters: Option<pyo3::Py<PyAny>>,
591+
) -> RustPSQLDriverPyResult<Py<PyAny>> {
592+
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
593+
594+
let mut params: Vec<PythonDTO> = vec![];
595+
if let Some(parameters) = parameters {
596+
params = convert_parameters(parameters)?;
597+
}
598+
let prepared = prepared.unwrap_or(true);
599+
600+
let result = if prepared {
601+
db_client
602+
.query_one(
603+
&db_client.prepare_cached(&querystring).await?,
604+
&params
605+
.iter()
606+
.map(|param| param as &QueryParameter)
607+
.collect::<Vec<&QueryParameter>>()
608+
.into_boxed_slice(),
609+
)
610+
.await?
611+
} else {
612+
db_client
613+
.query_one(
614+
&querystring,
615+
&params
616+
.iter()
617+
.map(|param| param as &QueryParameter)
618+
.collect::<Vec<&QueryParameter>>()
619+
.into_boxed_slice(),
620+
)
621+
.await?
622+
};
623+
624+
Python::with_gil(|gil| match result.columns().first() {
625+
Some(first_column) => postgres_to_py(gil, &result, first_column, 0),
626+
None => Ok(gil.None()),
627+
})
628+
}
440629
}

src/driver/connection_pool.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,9 +481,10 @@ impl ConnectionPool {
481481
pub async fn connection(self_: pyo3::Py<Self>) -> RustPSQLDriverPyResult<Connection> {
482482
let db_pool = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_pool.clone().unwrap());
483483
let db_connection = tokio()
484-
.spawn(async move { db_pool.get().await.unwrap() })
485-
.await
486-
.unwrap();
484+
.spawn(async move {
485+
Ok::<deadpool_postgres::Object, RustPSQLDriverError>(db_pool.get().await?)
486+
})
487+
.await??;
487488

488489
Ok(Connection::new(db_connection))
489490
}

src/value_converter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ pub fn convert_parameters(parameters: Py<PyAny>) -> RustPSQLDriverPyResult<Vec<P
238238
)
239239
})?;
240240
for parameter in params {
241-
result_vec.push(py_to_rust(parameter.bind(gil)).unwrap());
241+
result_vec.push(py_to_rust(parameter.bind(gil))?);
242242
}
243243
Ok::<Vec<PythonDTO>, RustPSQLDriverError>(result_vec)
244244
})?;

0 commit comments

Comments
 (0)