Skip to content

Commit 4374fc6

Browse files
committed
Added connection class
1 parent 8866f45 commit 4374fc6

File tree

6 files changed

+245
-30
lines changed

6 files changed

+245
-30
lines changed

README.md

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async def main() -> None:
5252
# rust does it instead.
5353

5454
```
55+
Please take into account that each new execute gets new connection from connection pool.
5556

5657
## Query parameters
5758
You can pass parameters into queries.
@@ -65,7 +66,37 @@ Any placeholder must be marked with `$< num>`.
6566
)
6667
```
6768

68-
Support all types from Python is in development, but there are support for all basic types. More you can find here: ...
69+
## Connection
70+
You can work with connection instead of DatabasePool.
71+
```python
72+
from typing import Any
73+
import asyncio
74+
75+
from rust_psql_driver import PSQLPool
76+
77+
78+
db_pool = PSQLPool(
79+
username="postgres",
80+
password="pg_password",
81+
host="localhost",
82+
port=5432,
83+
db_name="postgres",
84+
max_db_pool_size=2,
85+
)
86+
87+
async def main() -> None:
88+
await db_pool.startup()
89+
90+
connection = await db_pool.connection()
91+
92+
res: list[dict[str, Any]] = await connection.execute(
93+
"SELECT * FROM users",
94+
)
95+
96+
print(res)
97+
# You don't need to close connection by yourself,
98+
# rust does it instead.
99+
```
69100

70101
## Transactions
71102
Of course it's possible to use transactions with this driver.
@@ -85,7 +116,8 @@ db_pool = PSQLPool()
85116
async def main() -> None:
86117
await db_pool.startup()
87118

88-
transaction = await db_pool.transaction(
119+
connection = await db_pool.connection()
120+
transaction = await connection.transaction(
89121
isolation_level=IsolationLevel.Serializable,
90122
)
91123

@@ -112,7 +144,8 @@ db_pool = PSQLPool()
112144
async def main() -> None:
113145
await db_pool.startup()
114146

115-
transaction = await db_pool.transaction(
147+
connection = await db_pool.connection()
148+
transaction = await connection.transaction(
116149
isolation_level=IsolationLevel.Serializable,
117150
)
118151

@@ -146,7 +179,8 @@ db_pool = PSQLPool()
146179
async def main() -> None:
147180
await db_pool.startup()
148181

149-
transaction = await db_pool.transaction(
182+
connection = await db_pool.connection()
183+
transaction = await connection.transaction(
150184
isolation_level=IsolationLevel.Serializable,
151185
)
152186

@@ -173,7 +207,8 @@ db_pool = PSQLPool()
173207
async def main() -> None:
174208
await db_pool.startup()
175209

176-
transaction = await db_pool.transaction(
210+
connection = await db_pool.connection()
211+
transaction = await connection.transaction(
177212
isolation_level=IsolationLevel.Serializable,
178213
)
179214

@@ -204,7 +239,8 @@ db_pool = PSQLPool()
204239
async def main() -> None:
205240
await db_pool.startup()
206241

207-
transaction = await db_pool.transaction(
242+
connection = await db_pool.connection()
243+
transaction = await connection.transaction(
208244
isolation_level=IsolationLevel.Serializable,
209245
)
210246

python/psql_rust_driver/_internal/__init__.pyi

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,57 @@ class Transaction:
206206
await transaction.release_savepoint
207207
```
208208
"""
209+
210+
211+
class Connection:
212+
"""Connection from Database Connection Pool.
213+
214+
It can be created only from connection pool.
215+
"""
216+
217+
async def execute(
218+
self: Self,
219+
querystring: str,
220+
parameters: List[Any] | None = None,
221+
) -> QueryResult:
222+
"""Execute the query.
223+
224+
Querystring can contain `$<number>` parameters
225+
for converting them in the driver side.
226+
227+
### Parameters:
228+
- `querystring`: querystring to execute.
229+
- `parameters`: list of parameters to pass in the query.
230+
231+
### Example:
232+
```python
233+
import asyncio
234+
235+
from psql_rust_driver import PSQLPool, QueryResult
236+
237+
238+
async def main() -> None:
239+
db_pool = PSQLPool()
240+
await db_pool.startup()
241+
242+
connection = await db_pool.connection()
243+
query_result: QueryResult = await connection.execute(
244+
"SELECT username FROM users WHERE id = $1",
245+
[100],
246+
)
247+
dict_result: List[Dict[Any, Any]] = query_result.result()
248+
```
249+
"""
250+
251+
async def transaction(
252+
self,
253+
isolation_level: IsolationLevel | None = IsolationLevel.ReadCommitted,
254+
) -> Transaction:
255+
"""Create new transaction.
256+
257+
### Parameters:
258+
- `isolation_level`: configure isolation level of the transaction.
259+
"""
209260

210261

211262
class PSQLPool:
@@ -284,15 +335,8 @@ class PSQLPool:
284335
"""
285336
...
286337

287-
async def transaction(
288-
self,
289-
isolation_level: IsolationLevel | None = IsolationLevel.ReadCommitted,
290-
) -> Transaction:
291-
"""Create new transaction.
338+
async def connection(self: Self) -> Connection:
339+
"""Create new connection.
292340
293-
It acquires new connection from the database pool
294-
and make it acts as transaction.
295-
296-
### Parameters:
297-
- `isolation_level`: configure isolation level of the transaction.
341+
It acquires new connection from the database pool.
298342
"""

src/driver/connection.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use deadpool_postgres::Object;
2+
use pyo3::{pyclass, pymethods, PyAny, Python};
3+
use std::{collections::HashSet, sync::Arc, vec};
4+
use tokio_postgres::types::ToSql;
5+
6+
use crate::{
7+
common::rustengine_future,
8+
exceptions::rust_errors::RustPSQLDriverPyResult,
9+
query_result::PSQLDriverPyQueryResult,
10+
value_converter::{convert_parameters, PythonDTO},
11+
};
12+
13+
use super::{
14+
transaction::{RustTransaction, Transaction},
15+
transaction_options::IsolationLevel,
16+
};
17+
18+
pub struct RustConnection {
19+
pub db_client: Arc<tokio::sync::RwLock<Object>>,
20+
}
21+
22+
impl RustConnection {
23+
/// Execute querystring with parameters.
24+
///
25+
/// Method doesn't acquire lock on database connection.
26+
/// It prepares and caches querystring in the inner Object object.
27+
///
28+
/// Then execute the query.
29+
///
30+
/// # Errors:
31+
/// May return Err Result if:
32+
/// 1) Can not create/retrieve prepared statement
33+
/// 2) Can not execute statement
34+
pub async fn inner_execute<'a>(
35+
&'a self,
36+
querystring: String,
37+
parameters: Vec<PythonDTO>,
38+
) -> RustPSQLDriverPyResult<PSQLDriverPyQueryResult> {
39+
let db_client_arc = self.db_client.clone();
40+
41+
let db_client_guard = db_client_arc.read().await;
42+
43+
let mut vec_parameters: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(parameters.len());
44+
for param in parameters.iter() {
45+
vec_parameters.push(param);
46+
}
47+
48+
let statement: tokio_postgres::Statement =
49+
db_client_guard.prepare_cached(&querystring).await?;
50+
51+
let result = db_client_guard
52+
.query(&statement, &vec_parameters.into_boxed_slice())
53+
.await?;
54+
55+
Ok(PSQLDriverPyQueryResult::new(result))
56+
}
57+
58+
pub fn inner_transaction<'a>(&'a self, isolation_level: Option<IsolationLevel>) -> Transaction {
59+
let inner_transaction = RustTransaction {
60+
db_client: self.db_client.clone(),
61+
is_started: Arc::new(tokio::sync::RwLock::new(false)),
62+
is_done: Arc::new(tokio::sync::RwLock::new(false)),
63+
rollback_savepoint: Arc::new(tokio::sync::RwLock::new(HashSet::new())),
64+
isolation_level: isolation_level,
65+
};
66+
67+
Transaction {
68+
transaction: Arc::new(tokio::sync::RwLock::new(inner_transaction)),
69+
}
70+
}
71+
}
72+
73+
#[pyclass]
74+
pub struct Connection(pub Arc<tokio::sync::RwLock<RustConnection>>);
75+
76+
#[pymethods]
77+
impl Connection {
78+
/// Execute querystring with parameters.
79+
///
80+
/// It converts incoming parameters to rust readable
81+
/// and then execute the query with them.
82+
///
83+
/// # Errors:
84+
///
85+
/// May return Err Result if:
86+
/// 1) Cannot convert python parameters
87+
/// 2) Cannot execute querystring.
88+
pub fn execute<'a>(
89+
&'a self,
90+
py: Python<'a>,
91+
querystring: String,
92+
parameters: Option<&'a PyAny>,
93+
) -> RustPSQLDriverPyResult<&PyAny> {
94+
let connection_arc = self.0.clone();
95+
let mut params: Vec<PythonDTO> = vec![];
96+
if let Some(parameters) = parameters {
97+
params = convert_parameters(parameters)?
98+
}
99+
100+
rustengine_future(py, async move {
101+
let connection_guard = connection_arc.read().await;
102+
Ok(connection_guard.inner_execute(querystring, params).await?)
103+
})
104+
}
105+
106+
pub fn transaction<'a>(
107+
&'a self,
108+
py: Python<'a>,
109+
isolation_level: Option<IsolationLevel>,
110+
) -> RustPSQLDriverPyResult<&PyAny> {
111+
let connection_arc = self.0.clone();
112+
113+
rustengine_future(py, async move {
114+
let connection_guard = connection_arc.read().await;
115+
Ok(connection_guard.inner_transaction(isolation_level))
116+
})
117+
}
118+
}

src/driver/connection_pool.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
value_converter::{convert_parameters, PythonDTO},
1212
};
1313

14-
use super::transaction_options::IsolationLevel;
14+
use super::{
15+
connection::{Connection, RustConnection},
16+
transaction_options::IsolationLevel,
17+
};
1518

1619
/// PSQLPool for internal use only.
1720
///
@@ -49,6 +52,27 @@ impl RustPSQLPool {
4952
}
5053

5154
impl RustPSQLPool {
55+
pub async fn inner_connection<'a>(&'a self) -> RustPSQLDriverPyResult<Connection> {
56+
let db_pool_arc = self.db_pool.clone();
57+
58+
let db_pool_guard = db_pool_arc.read().await;
59+
60+
let db_pool_manager = db_pool_guard
61+
.as_ref()
62+
.ok_or(RustPSQLDriverError::DatabasePoolError(
63+
"Database pool is not initialized".into(),
64+
))?
65+
.get()
66+
.await?;
67+
68+
let inner_connection = RustConnection {
69+
db_client: Arc::new(tokio::sync::RwLock::new(db_pool_manager)),
70+
};
71+
72+
Ok(Connection(Arc::new(tokio::sync::RwLock::new(
73+
inner_connection,
74+
))))
75+
}
5276
/// Execute querystring with parameters.
5377
///
5478
/// Prepare statement and cache it, then execute.
@@ -223,23 +247,16 @@ impl PSQLPool {
223247
})
224248
}
225249

226-
/// Return python transaction.
250+
/// Return single connection.
227251
///
228252
/// # Errors:
229-
/// May return Err Result if `inner_transaction` returns error.
230-
pub fn transaction<'a>(
231-
&'a self,
232-
py: Python<'a>,
233-
isolation_level: Option<IsolationLevel>,
234-
) -> RustPSQLDriverPyResult<&'a PyAny> {
253+
/// May return Err Result if `inner_connection` returns error.
254+
pub fn connection<'a>(&'a self, py: Python<'a>) -> RustPSQLDriverPyResult<&'a PyAny> {
235255
let psql_pool_arc = self.rust_psql_pool.clone();
236256

237257
rustengine_future(py, async move {
238258
let psql_pool_guard = psql_pool_arc.write().await;
239-
240-
let transaction = psql_pool_guard.inner_transaction(isolation_level).await?;
241-
242-
Ok(transaction)
259+
Ok(psql_pool_guard.inner_connection().await?)
243260
})
244261
}
245262

src/driver/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod connection;
12
pub mod connection_pool;
23
pub mod transaction;
34
pub mod transaction_options;

src/driver/transaction.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ impl RustTransaction {
6767
vec_parameters.push(param);
6868
}
6969

70-
let statement: tokio_postgres::Statement =
71-
db_client_guard.prepare_cached(&querystring).await?;
70+
let statement = db_client_guard.prepare_cached(&querystring).await?;
7271

7372
let result = db_client_guard
7473
.query(&statement, &vec_parameters.into_boxed_slice())

0 commit comments

Comments
 (0)