Skip to content

Commit 37d4fb8

Browse files
committed
save
1 parent e422ba5 commit 37d4fb8

File tree

33 files changed

+609
-209
lines changed

33 files changed

+609
-209
lines changed

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ pub mod timestamp_sorting;
2020
pub mod tls_enforced;
2121
pub mod tls_reload;
2222
pub mod transaction_state;
23+
pub mod unique_id;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use rust::setup::connections_sqlx;
2+
use sqlx::{Executor, Row};
3+
4+
#[tokio::test]
5+
async fn unique_id_returns_bigint() -> Result<(), Box<dyn std::error::Error>> {
6+
let conns = connections_sqlx().await;
7+
let sharded = conns.get(1).cloned().unwrap();
8+
9+
// Simple query
10+
let row = sharded.fetch_one("SELECT pgdog.unique_id() AS id").await?;
11+
let mut id: i64 = row.get("id");
12+
13+
assert!(
14+
id > 0,
15+
"unique_id should return a positive bigint, got {id}"
16+
);
17+
18+
for _ in 0..100 {
19+
// Prepared statement
20+
let row = sqlx::query("SELECT pgdog.unique_id() AS id")
21+
.fetch_one(&sharded)
22+
.await?;
23+
let prepared_id: i64 = row.get("id");
24+
assert!(
25+
prepared_id > 0,
26+
"prepared unique_id should return a positive bigint, got {prepared_id}"
27+
);
28+
29+
assert!(
30+
prepared_id > id,
31+
"prepared id should be greater than simple query id"
32+
);
33+
id = prepared_id;
34+
}
35+
36+
Ok(())
37+
}

pgdog/src/backend/databases.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,9 @@ impl Databases {
359359
for cluster in self.all().values() {
360360
cluster.launch();
361361

362-
if cluster.pooler_mode() == PoolerMode::Session && cluster.router_needed() {
362+
if cluster.pooler_mode() == PoolerMode::Session && cluster.use_parser() {
363363
warn!(
364-
r#"user "{}" for database "{}" requires transaction mode to route queries"#,
364+
r#"user "{}" for database "{}" requires transaction mode to parse and route queries"#,
365365
cluster.user(),
366366
cluster.name()
367367
);

pgdog/src/backend/pool/cluster.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,14 @@ impl Cluster {
413413
self.stats.clone()
414414
}
415415

416-
/// We'll need the query router to figure out
417-
/// where a query should go.
418-
pub fn router_needed(&self) -> bool {
416+
/// We need to parse the query using pg_query.
417+
pub fn use_parser(&self) -> bool {
419418
!(self.shards().len() == 1 && (self.read_only() || self.write_only()))
419+
|| self.query_parser_enabled
420+
|| self.multi_tenant.is_some()
421+
|| self.pub_sub_enabled()
422+
|| self.prepared_statements() == &PreparedStatements::Full
423+
|| self.dry_run
420424
}
421425

422426
/// Multi-tenant config.

pgdog/src/backend/replication/logical/subscriber/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl<'a> StreamContext<'a> {
5757
&self.params,
5858
None,
5959
1,
60+
None,
6061
)?)
6162
}
6263
}

pgdog/src/frontend/client/query_engine/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
backend::pool::{connection::mirror::Mirror, stats::MemoryStats},
55
frontend::{
66
client::{timeouts::Timeouts, TransactionType},
7+
router::parser::cache::CachedAst,
78
Client, ClientRequest, PreparedStatements,
89
},
910
net::{BackendKeyData, Parameters, Stream},
@@ -38,6 +39,8 @@ pub struct QueryEngineContext<'a> {
3839
pub(super) rollback: bool,
3940
/// Omnisharded modulo.
4041
pub(super) omni_sticky_index: usize,
42+
/// Query AST.
43+
pub(super) ast: Option<CachedAst>,
4144
}
4245

4346
impl<'a> QueryEngineContext<'a> {
@@ -58,6 +61,7 @@ impl<'a> QueryEngineContext<'a> {
5861
requests_left: 0,
5962
rollback: false,
6063
omni_sticky_index: client.omni_sticky_index,
64+
ast: None,
6165
}
6266
}
6367

@@ -83,6 +87,7 @@ impl<'a> QueryEngineContext<'a> {
8387
requests_left: 0,
8488
rollback: false,
8589
omni_sticky_index: thread_rng().gen_range(1..usize::MAX),
90+
ast: None,
8691
}
8792
}
8893

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ use crate::{
33
config::config,
44
frontend::{
55
client::query_engine::hooks::QueryEngineHooks,
6-
router::{parser::Shard, Route},
6+
router::{
7+
parser::Shard,
8+
rewrite::{self, RewriteRequest},
9+
Route,
10+
},
711
BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats,
812
},
913
net::{BackendKeyData, ErrorResponse, Message, Parameters},
@@ -114,6 +118,28 @@ impl QueryEngine {
114118
&mut self,
115119
context: &mut QueryEngineContext<'_>,
116120
) -> Result<QueryEngineOutput, Error> {
121+
// Check that we have the latest version of the config.
122+
if let Some(error) = self.ensure_cluster(context.in_transaction()).await {
123+
self.error_response(context, error).await?;
124+
return Ok(QueryEngineOutput::Executed);
125+
}
126+
127+
if let Ok(cluster) = self.backend.cluster() {
128+
if cluster.use_parser() && context.ast.is_none() {
129+
// Execute request rewrite, if needed.
130+
let mut rewrite = RewriteRequest::new(
131+
context.client_request,
132+
self.backend.cluster()?,
133+
context.prepared_statements,
134+
);
135+
match rewrite.execute() {
136+
Ok(ast) => context.ast = Some(ast),
137+
Err(rewrite::Error::EmptyQuery) => (),
138+
Err(err) => return Err(err.into()),
139+
}
140+
}
141+
}
142+
117143
self.stats
118144
.received(context.client_request.total_message_len());
119145
self.set_state(State::Active); // Client is active.

pgdog/src/frontend/client/query_engine/route_query.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,41 @@
11
use pgdog_config::PoolerMode;
22
use tracing::trace;
33

4+
use crate::backend::Cluster;
5+
46
use super::*;
57

68
impl QueryEngine {
9+
pub fn cluster(&self) -> Result<&Cluster, Error> {
10+
Ok(self.backend.cluster()?)
11+
}
12+
13+
/// Check that cluster still exists.
14+
pub async fn ensure_cluster(&mut self, in_transaction: bool) -> Option<ErrorResponse> {
15+
if let Ok(cluster) = self.backend.cluster() {
16+
let identifier = cluster.identifier();
17+
18+
if !in_transaction && !cluster.online() {
19+
// Reload cluster config.
20+
if let Err(_) = self.backend.safe_reload().await {
21+
return Some(ErrorResponse::connection(
22+
&identifier.user,
23+
&identifier.database,
24+
));
25+
}
26+
27+
if let Err(_) = self.backend.cluster() {
28+
return Some(ErrorResponse::connection(
29+
&identifier.user,
30+
&identifier.database,
31+
));
32+
}
33+
}
34+
}
35+
36+
None
37+
}
38+
739
pub(super) async fn route_transaction(
840
&mut self,
941
context: &mut QueryEngineContext<'_>,
@@ -18,28 +50,7 @@ impl QueryEngine {
1850

1951
// Admin doesn't have a cluster.
2052
let cluster = if let Ok(cluster) = self.backend.cluster() {
21-
if !context.in_transaction() && !cluster.online() {
22-
let identifier = cluster.identifier();
23-
24-
// Reload cluster config.
25-
self.backend.safe_reload().await?;
26-
27-
match self.backend.cluster() {
28-
Ok(cluster) => cluster,
29-
Err(_) => {
30-
// Cluster is gone.
31-
self.error_response(
32-
context,
33-
ErrorResponse::connection(&identifier.user, &identifier.database),
34-
)
35-
.await?;
36-
37-
return Ok(false);
38-
}
39-
}
40-
} else {
41-
cluster
42-
}
53+
cluster
4354
} else {
4455
return Ok(true);
4556
};
@@ -51,6 +62,7 @@ impl QueryEngine {
5162
context.params,
5263
context.transaction,
5364
context.omni_sticky_index,
65+
context.ast.as_ref(),
5466
)?;
5567
match self.router.query(router_context) {
5668
Ok(cmd) => {

pgdog/src/frontend/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ pub enum Error {
5050

5151
#[error("unique id: {0}")]
5252
UniqueId(#[from] unique_id::Error),
53+
54+
#[error("rewrite: {0}")]
55+
Rewrite(#[from] crate::frontend::router::rewrite::Error),
5356
}
5457

5558
impl Error {

pgdog/src/frontend/prepared_statements/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tracing::debug;
99

1010
use crate::{
1111
config::{config, PreparedStatements as PreparedStatementsLevel},
12-
frontend::router::parser::RewritePlan,
12+
frontend::router::parser::{cache::CachedAst, RewritePlan},
1313
net::{Parse, ProtocolMessage},
1414
stats::memory::MemoryUsage,
1515
};
@@ -31,6 +31,7 @@ pub struct PreparedStatements {
3131
pub(super) local: HashMap<String, String>,
3232
pub(super) level: PreparedStatementsLevel,
3333
pub(super) memory_used: usize,
34+
pub(super) rewrite: HashMap<String, CachedAst>,
3435
}
3536

3637
impl MemoryUsage for PreparedStatements {
@@ -49,6 +50,7 @@ impl Default for PreparedStatements {
4950
local: HashMap::default(),
5051
level: PreparedStatementsLevel::Extended,
5152
memory_used: 0,
53+
rewrite: HashMap::new(),
5254
}
5355
}
5456
}
@@ -97,15 +99,15 @@ impl PreparedStatements {
9799
parse.rename_fast(&name)
98100
}
99101

100-
/// Store a rewritten statement in the global cache forever.
101-
pub fn cache_rewritten(parse: &Parse) -> String {
102-
let exists = Self::global().read().name(parse);
103-
if let Some(exists) = exists {
104-
exists
105-
} else {
106-
let (_, name) = Self::global().write().insert(parse);
107-
name
108-
}
102+
/// Get original AST for a prepared statement
103+
/// we have rewritten.
104+
pub fn get_original_ast(&self, name: &str) -> Option<&CachedAst> {
105+
self.rewrite.get(name)
106+
}
107+
108+
/// Save original AST for re-use by subsequent Bind messages.
109+
pub fn save_original_ast(&mut self, name: &str, ast: &CachedAst) {
110+
self.rewrite.insert(name.to_string(), ast.clone());
109111
}
110112

111113
/// Retrieve stored rewrite plan for a prepared statement, if any.

0 commit comments

Comments
 (0)