Skip to content

Commit 8b700be

Browse files
committed
reuse the existing opened database connection for the current query in sqlpage.run_sql instead of opening a new one
fixes #338
1 parent 3ed80cf commit 8b700be

File tree

8 files changed

+84
-52
lines changed

8 files changed

+84
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- new `tooltip` property in the button component.
77
- New `search_value` property in the shell component.
88
- Fixed a display issue in the hero component when the button text is long and the viewport is narrow.
9+
- reuse the existing opened database connection for the current query in `sqlpage.run_sql` instead of opening a new one. This makes it possible to create a temporary table in a file, and reuse it in an included script, create a SQL transaction that spans over multiple run_sql calls, and should generally make run_sql more performant.
910

1011
## 0.22.0 (2024-05-29)
1112
- **Important Security Fix:** The behavior of `SET $x` has been modified to match `SELECT $x`.

src/webserver/database/execute_queries.rs

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use super::syntax_tree::{extract_req_param, StmtParam};
1717
use super::{highlight_sql_error, Database, DbItem};
1818
use sqlx::any::{AnyArguments, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo};
1919
use sqlx::pool::PoolConnection;
20-
use sqlx::{Any, AnyConnection, Arguments, Either, Executor, Statement};
20+
use sqlx::{Any, Arguments, Either, Executor, Statement};
21+
22+
pub type DbConn = Option<PoolConnection<sqlx::Any>>;
2123

2224
impl Database {
2325
pub(crate) async fn prepare_with(
@@ -32,23 +34,23 @@ impl Database {
3234
.map_err(|e| highlight_sql_error("Failed to prepare SQL statement", query, e))
3335
}
3436
}
35-
pub fn stream_query_results<'a>(
36-
db: &'a Database,
37+
38+
pub fn stream_query_results_with_conn<'a>(
3739
sql_file: &'a ParsedSqlFile,
3840
request: &'a mut RequestInfo,
41+
db_connection: &'a mut DbConn,
3942
) -> impl Stream<Item = DbItem> + 'a {
4043
async_stream::try_stream! {
41-
let mut connection_opt = None;
4244
for res in &sql_file.statements {
4345
match res {
4446
ParsedStatement::CsvImport(csv_import) => {
45-
let connection = take_connection(db, &mut connection_opt).await?;
47+
let connection = take_connection(&request.app_state.db, db_connection).await?;
4648
log::debug!("Executing CSV import: {:?}", csv_import);
4749
run_csv_import(connection, csv_import, request).await?;
4850
},
4951
ParsedStatement::StmtWithParams(stmt) => {
50-
let query = bind_parameters(stmt, request).await?;
51-
let connection = take_connection(db, &mut connection_opt).await?;
52+
let query = bind_parameters(stmt, request, db_connection).await?;
53+
let connection = take_connection(&request.app_state.db, db_connection).await?;
5254
log::trace!("Executing query {:?}", query.sql);
5355
let mut stream = connection.fetch_many(query);
5456
while let Some(elem) = stream.next().await {
@@ -62,13 +64,13 @@ pub fn stream_query_results<'a>(
6264
}
6365
},
6466
ParsedStatement::SetVariable { variable, value} => {
65-
execute_set_variable_query(db, &mut connection_opt, request, variable, value).await
67+
execute_set_variable_query(db_connection, request, variable, value).await
6668
.with_context(||
6769
format!("Failed to set the {variable} variable to {value:?}")
6870
)?;
6971
},
7072
ParsedStatement::StaticSimpleSelect(value) => {
71-
for i in parse_dynamic_rows(DbItem::Row(exec_static_simple_select(value, request).await?)) {
73+
for i in parse_dynamic_rows(DbItem::Row(exec_static_simple_select(value, request, db_connection).await?)) {
7274
yield i;
7375
}
7476
}
@@ -83,12 +85,15 @@ pub fn stream_query_results<'a>(
8385
async fn exec_static_simple_select(
8486
columns: &[(String, SimpleSelectValue)],
8587
req: &RequestInfo,
88+
db_connection: &mut DbConn,
8689
) -> anyhow::Result<serde_json::Value> {
8790
let mut map = serde_json::Map::with_capacity(columns.len());
8891
for (name, value) in columns {
8992
let value = match value {
9093
SimpleSelectValue::Static(s) => s.clone(),
91-
SimpleSelectValue::Dynamic(p) => extract_req_param_as_json(p, req).await?,
94+
SimpleSelectValue::Dynamic(p) => {
95+
extract_req_param_as_json(p, req, db_connection).await?
96+
}
9297
};
9398
map = add_value_to_map(map, (name.clone(), value));
9499
}
@@ -100,8 +105,9 @@ async fn exec_static_simple_select(
100105
async fn extract_req_param_as_json(
101106
param: &StmtParam,
102107
request: &RequestInfo,
108+
db_connection: &mut DbConn,
103109
) -> anyhow::Result<serde_json::Value> {
104-
if let Some(val) = extract_req_param(param, request).await? {
110+
if let Some(val) = extract_req_param(param, request, db_connection).await? {
105111
Ok(serde_json::Value::String(val.into_owned()))
106112
} else {
107113
Ok(serde_json::Value::Null)
@@ -111,22 +117,25 @@ async fn extract_req_param_as_json(
111117
/// This function is used to create a pinned boxed stream of query results.
112118
/// This allows recursive calls.
113119
pub fn stream_query_results_boxed<'a>(
114-
db: &'a Database,
115120
sql_file: &'a ParsedSqlFile,
116121
request: &'a mut RequestInfo,
122+
db_connection: &'a mut DbConn,
117123
) -> Pin<Box<dyn Stream<Item = DbItem> + 'a>> {
118-
Box::pin(stream_query_results(db, sql_file, request))
124+
Box::pin(stream_query_results_with_conn(
125+
sql_file,
126+
request,
127+
db_connection,
128+
))
119129
}
120130

121131
async fn execute_set_variable_query<'a>(
122-
db: &'a Database,
123-
connection_opt: &mut Option<PoolConnection<sqlx::Any>>,
132+
db_connection: &'a mut DbConn,
124133
request: &'a mut RequestInfo,
125134
variable: &StmtParam,
126135
statement: &StmtWithParams,
127136
) -> anyhow::Result<()> {
128-
let query = bind_parameters(statement, request).await?;
129-
let connection = take_connection(db, connection_opt).await?;
137+
let query = bind_parameters(statement, request, db_connection).await?;
138+
let connection = take_connection(&request.app_state.db, db_connection).await?;
130139
log::debug!(
131140
"Executing query to set the {variable:?} variable: {:?}",
132141
query.sql
@@ -169,21 +178,21 @@ fn vars_and_name<'a, 'b>(
169178

170179
async fn take_connection<'a, 'b>(
171180
db: &'a Database,
172-
conn: &'b mut Option<PoolConnection<sqlx::Any>>,
173-
) -> anyhow::Result<&'b mut AnyConnection> {
174-
match conn {
175-
Some(c) => Ok(c),
176-
None => match db.connection.acquire().await {
177-
Ok(c) => {
178-
log::debug!("Acquired a database connection");
179-
*conn = Some(c);
180-
Ok(conn.as_mut().unwrap())
181-
}
182-
Err(e) => {
183-
let err_msg = format!("Unable to acquire a database connection to execute the SQL file. All of the {} {:?} connections are busy.", db.connection.size(), db.connection.any_kind());
184-
Err(anyhow::Error::new(e).context(err_msg))
185-
}
186-
},
181+
conn: &'b mut DbConn,
182+
) -> anyhow::Result<&'b mut PoolConnection<sqlx::Any>> {
183+
if let Some(c) = conn {
184+
return Ok(c);
185+
}
186+
match db.connection.acquire().await {
187+
Ok(c) => {
188+
log::debug!("Acquired a database connection");
189+
*conn = Some(c);
190+
Ok(conn.as_mut().unwrap())
191+
}
192+
Err(e) => {
193+
let err_msg = format!("Unable to acquire a database connection to execute the SQL file. All of the {} {:?} connections are busy.", db.connection.size(), db.connection.any_kind());
194+
Err(anyhow::Error::new(e).context(err_msg))
195+
}
187196
}
188197
}
189198

@@ -211,16 +220,17 @@ fn clone_anyhow_err(err: &anyhow::Error) -> anyhow::Error {
211220
e
212221
}
213222

214-
async fn bind_parameters<'a>(
223+
async fn bind_parameters<'a, 'b>(
215224
stmt: &'a StmtWithParams,
216225
request: &'a RequestInfo,
226+
db_connection: &'b mut DbConn,
217227
) -> anyhow::Result<StatementWithParams<'a>> {
218228
let sql = stmt.query.as_str();
219229
log::debug!("Preparing statement: {}", sql);
220230
let mut arguments = AnyArguments::default();
221231
for (param_idx, param) in stmt.params.iter().enumerate() {
222232
log::trace!("\tevaluating parameter {}: {}", param_idx + 1, param);
223-
let argument = extract_req_param(param, request).await?;
233+
let argument = extract_req_param(param, request, db_connection).await?;
224234
log::debug!(
225235
"\tparameter {}: {}",
226236
param_idx + 1,

src/webserver/database/sqlpage_functions/function_definition_macro.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
/// Defines all sqlpage functions
22
#[macro_export]
33
macro_rules! sqlpage_functions {
4-
($($func_name:ident($(($request:ty)$(,)?)? $($param_name:ident : $param_type:ty),*);)*) => {
4+
($($func_name:ident(
5+
$(($request:ty $(, $db_conn:ty)?))?
6+
$(,)?
7+
$($param_name:ident : $param_type:ty),*
8+
);
9+
)*) => {
510
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
611
pub enum SqlPageFunctionName {
712
$( #[allow(non_camel_case_types)] $func_name ),*
@@ -47,10 +52,11 @@ macro_rules! sqlpage_functions {
4752
}
4853
}
4954
impl SqlPageFunctionName {
50-
pub(crate) async fn evaluate<'a>(
55+
pub(crate) async fn evaluate<'a, 'b>(
5156
&self,
5257
#[allow(unused_variables)]
5358
request: &'a RequestInfo,
59+
db_connection: &'b mut Option<sqlx::pool::PoolConnection<sqlx::Any>>,
5460
params: Vec<Option<Cow<'a, str>>>
5561
) -> anyhow::Result<Option<Cow<'a, str>>> {
5662
use $crate::webserver::database::sqlpage_functions::function_traits::*;
@@ -66,7 +72,10 @@ macro_rules! sqlpage_functions {
6672
anyhow::bail!("Too many arguments. Remove extra argument {}", as_sql(extraneous_param));
6773
}
6874
let result = $func_name(
69-
$(<$request>::from(request),)*
75+
$(
76+
<$request>::from(request),
77+
$(<$db_conn>::from(db_connection),)*
78+
)*
7079
$($param_name.into()),*
7180
).await;
7281
result.into_cow_result()

src/webserver/database/sqlpage_functions/functions.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::RequestInfo;
2-
use crate::webserver::{http::SingleOrVec, ErrorWithStatus};
2+
use crate::webserver::{database::execute_queries::DbConn, http::SingleOrVec, ErrorWithStatus};
33
use anyhow::{anyhow, Context};
44
use futures_util::StreamExt;
55
use std::{borrow::Cow, ffi::OsStr, str::FromStr};
@@ -27,7 +27,7 @@ super::function_definition_macro::sqlpage_functions! {
2727
read_file_as_data_url((&RequestInfo), file_path: Option<Cow<str>>);
2828
read_file_as_text((&RequestInfo), file_path: Option<Cow<str>>);
2929
request_method((&RequestInfo));
30-
run_sql((&RequestInfo), sql_file_path: Option<Cow<str>>);
30+
run_sql((&RequestInfo, &mut DbConn), sql_file_path: Option<Cow<str>>);
3131

3232
uploaded_file_mime_type((&RequestInfo), upload_name: Cow<str>);
3333
uploaded_file_path((&RequestInfo), upload_name: Cow<str>);
@@ -347,6 +347,7 @@ async fn request_method(request: &RequestInfo) -> String {
347347

348348
async fn run_sql<'a>(
349349
request: &'a RequestInfo,
350+
db_connection: &mut DbConn,
350351
sql_file_path: Option<Cow<'a, str>>,
351352
) -> anyhow::Result<Option<Cow<'a, str>>> {
352353
use serde::ser::{SerializeSeq, Serializer};
@@ -373,9 +374,9 @@ async fn run_sql<'a>(
373374
}
374375
let mut results_stream =
375376
crate::webserver::database::execute_queries::stream_query_results_boxed(
376-
&request.app_state.db,
377377
&sql_file,
378378
&mut tmp_req,
379+
db_connection,
379380
);
380381
let mut json_results_bytes = Vec::new();
381382
let mut json_encoder = serde_json::Serializer::new(&mut json_results_bytes);

src/webserver/database/syntax_tree.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::webserver::database::sql::function_arg_to_stmt_param;
2020
use crate::webserver::http::SingleOrVec;
2121
use crate::webserver::http_request_info::RequestInfo;
2222

23-
use super::sqlpage_functions::functions::SqlPageFunctionName;
23+
use super::{execute_queries::DbConn, sqlpage_functions::functions::SqlPageFunctionName};
2424
use anyhow::{anyhow, Context as _};
2525

2626
/// Represents a parameter to a SQL statement.
@@ -100,13 +100,16 @@ impl SqlPageFunctionCall {
100100
})
101101
}
102102

103-
pub async fn evaluate<'a>(
103+
pub async fn evaluate<'a, 'b>(
104104
&self,
105105
request: &'a RequestInfo,
106+
db_connection: &'b mut DbConn,
106107
) -> anyhow::Result<Option<Cow<'a, str>>> {
107-
let evaluated_args = self.arguments.iter().map(|x| extract_req_param(x, request));
108-
let evaluated_args = futures_util::future::try_join_all(evaluated_args).await?;
109-
self.function.evaluate(request, evaluated_args).await
108+
let mut params = Vec::with_capacity(self.arguments.len());
109+
for param in &self.arguments {
110+
params.push(Box::pin(extract_req_param(param, request, db_connection)).await?);
111+
}
112+
self.function.evaluate(request, db_connection, params).await
110113
}
111114
}
112115

@@ -127,9 +130,10 @@ impl std::fmt::Display for SqlPageFunctionCall {
127130

128131
/// Extracts the value of a parameter from the request.
129132
/// Returns `Ok(None)` when NULL should be used as the parameter value.
130-
pub(super) async fn extract_req_param<'a>(
133+
pub(super) async fn extract_req_param<'a, 'b>(
131134
param: &StmtParam,
132135
request: &'a RequestInfo,
136+
db_connection: &'b mut DbConn,
133137
) -> anyhow::Result<Option<Cow<'a, str>>> {
134138
Ok(match param {
135139
// sync functions
@@ -145,8 +149,8 @@ pub(super) async fn extract_req_param<'a>(
145149
StmtParam::Error(x) => anyhow::bail!("{}", x),
146150
StmtParam::Literal(x) => Some(Cow::Owned(x.to_string())),
147151
StmtParam::Null => None,
148-
StmtParam::Concat(args) => concat_params(&args[..], request).await?,
149-
StmtParam::FunctionCall(func) => func.evaluate(request).await.with_context(|| {
152+
StmtParam::Concat(args) => concat_params(&args[..], request, db_connection).await?,
153+
StmtParam::FunctionCall(func) => func.evaluate(request, db_connection).await.with_context(|| {
150154
format!(
151155
"Error in function call {func}.\nExpected {:#}",
152156
func.function
@@ -155,13 +159,14 @@ pub(super) async fn extract_req_param<'a>(
155159
})
156160
}
157161

158-
async fn concat_params<'a>(
162+
async fn concat_params<'a, 'b>(
159163
args: &[StmtParam],
160164
request: &'a RequestInfo,
165+
db_connection: &'b mut DbConn,
161166
) -> anyhow::Result<Option<Cow<'a, str>>> {
162167
let mut result = String::new();
163168
for arg in args {
164-
let Some(arg) = Box::pin(extract_req_param(arg, request)).await? else {
169+
let Some(arg) = Box::pin(extract_req_param(arg, request, db_connection)).await? else {
165170
return Ok(None);
166171
};
167172
result.push_str(&arg);

src/webserver/http.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::render::{HeaderContext, PageContext, RenderContext};
2-
use crate::webserver::database::{execute_queries::stream_query_results, DbItem};
2+
use crate::webserver::database::{execute_queries::stream_query_results_with_conn, DbItem};
33
use crate::webserver::http_request_info::extract_request_info;
44
use crate::webserver::ErrorWithStatus;
55
use crate::{app_config, AppConfig, AppState, ParsedSqlFile};
@@ -229,8 +229,9 @@ async fn render_sql(
229229
let layout_context = &LayoutContext {
230230
is_embedded: req_param.get_variables.contains_key("_sqlpage_embed"),
231231
};
232+
let mut conn = None;
232233
let database_entries_stream =
233-
stream_query_results(&app_state.db, &sql_file, &mut req_param);
234+
stream_query_results_with_conn(&sql_file, &mut req_param, &mut conn);
234235
let response_with_writer = build_response_header_and_stream(
235236
Arc::clone(&app_state),
236237
database_entries_stream,

tests/select_temp_t.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- see tests/sql_test_files/it_works_temp_table_accessible_in_run_sql.sql
2+
select 'text' as component, x as contents from temp_t;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
create temporary table temp_t(x text);
2+
insert into temp_t(x) values ('It works !');
3+
select 'dynamic' as component, sqlpage.run_sql('tests/select_temp_t.sql') AS properties;

0 commit comments

Comments
 (0)