Skip to content

Commit 57f5a71

Browse files
committed
implement bulk csv import
1 parent b83d68e commit 57f5a71

File tree

8 files changed

+330
-1
lines changed

8 files changed

+330
-1
lines changed

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,36 @@ insert into files (content) values (sqlpage.read_file_as_data_url(sqlpage.upload
3030
returning 'text' as component, 'Uploaded new file with id: ' || id as contents;
3131
```
3232

33+
#### Parsing CSV files
34+
35+
SQLPage can also parse uploaded CSV files and insert them directly into a database table.
36+
SQLPage re-uses PostgreSQL's `COPY` statement to import the CSV file into the database, but makes it work with any database, by emulating the same behavior with simple `INSERT` statements.
37+
38+
`user_file_upload.sql` :
39+
```sql
40+
select 'form' as component, 'bulk_user_import.sql' as action;
41+
select 'user_file' as name, 'file' as type, 'text/csv' as accept;
42+
```
43+
44+
`bulk_user_import.sql` :
45+
```sql
46+
-- create a temporary table to preprocess the data
47+
create temporary table if not exists csv_import(name text, age text);
48+
delete from csv_import; -- empty the table
49+
-- If you don't have any preprocessing to do, you can skip the temporary table and use the target table directly
50+
51+
copy csv_import(name, age) from 'user_file'
52+
with (header true, delimiter ',', quote '"', null 'NaN'); -- all the options are optional
53+
-- since header is true, the first line of the file will be used to find the "name" and "age" columns
54+
-- if you don't have a header line, the first column in the CSV will be interpreted as the first column of the table, etc
55+
56+
-- run any preprocessing you want on the data here
57+
58+
-- insert the data into the users table
59+
insert into users (name, age)
60+
select upper(name), cast(age as int) from csv_import;
61+
```
62+
3363
#### New functions
3464

3565
##### Handle uploaded files

Cargo.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ actix-multipart = "0.6.1"
4949
base64 = "0.21.5"
5050
rustls-acme = "0.7.7"
5151
dotenvy = "0.15.7"
52+
csv-async = { version = "1.2.6", features = ["tokio"] }
5253

5354
[build-dependencies]
5455
awc = { version = "3", features = ["rustls"] }

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,4 @@ fn init_logging() {
6363
),
6464
Err(e) => log::error!("Error loading .env file: {}", e),
6565
}
66-
}
66+
}

src/webserver/database/csv_import.rs

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
use std::collections::HashMap;
2+
3+
use anyhow::Context;
4+
use futures_util::StreamExt;
5+
use sqlparser::ast::{
6+
CopyLegacyCsvOption, CopyLegacyOption, CopyOption, CopySource, CopyTarget, Statement,
7+
};
8+
use sqlx::{any::AnyArguments, AnyConnection, Arguments, Executor};
9+
10+
use crate::webserver::http_request_info::RequestInfo;
11+
12+
use super::make_placeholder;
13+
14+
/// Parsed form of a `COPY <table>(<columns>) FROM '<file>'` statement,
/// holding everything needed to bulk-import an uploaded CSV file.
#[derive(Debug, PartialEq)]
pub(super) struct CsvImport {
    /// The re-serialized COPY statement itself.
    /// Used only in postgres
    pub query: String,
    /// Target table for the inserted rows.
    pub table_name: String,
    /// Target columns, in the order they appear in the COPY statement.
    pub columns: Vec<String>,
    /// CSV field delimiter; defaults to ',' downstream when `None`.
    pub delimiter: Option<char>,
    /// CSV quote character; defaults to '"' downstream when `None`.
    pub quote: Option<char>,
    // If true, the first line of the CSV file will be interpreted as a header
    // If false, then the column order will be determined by the order of the columns in the table
    pub header: Option<bool>,
    // A string that will be interpreted as null
    pub null_str: Option<String>,
    /// CSV escape character, if one was specified.
    pub escape: Option<char>,
    /// Reference to the uploaded file name (the key under which the upload
    /// is looked up in the request's uploaded files).
    pub uploaded_file: String,
}
31+
32+
/// Wraps the different AST nodes in which sqlparser stores COPY options,
/// so that all three syntaxes can be queried through the accessors below.
enum CopyCsvOption<'a> {
    // "legacy" option syntax, as classified by sqlparser
    Legacy(&'a sqlparser::ast::CopyLegacyOption),
    // "legacy" CSV-specific option syntax (nested inside a CSV option list)
    CopyLegacyCsvOption(&'a sqlparser::ast::CopyLegacyCsvOption),
    // modern `WITH (...)` option syntax
    New(&'a sqlparser::ast::CopyOption),
}
37+
38+
impl<'a> CopyCsvOption<'a> {
39+
fn delimiter(&self) -> Option<char> {
40+
match self {
41+
CopyCsvOption::Legacy(CopyLegacyOption::Delimiter(c)) => Some(*c),
42+
CopyCsvOption::New(CopyOption::Delimiter(c)) => Some(*c),
43+
_ => None,
44+
}
45+
}
46+
47+
fn quote(&self) -> Option<char> {
48+
match self {
49+
CopyCsvOption::CopyLegacyCsvOption(CopyLegacyCsvOption::Quote(c)) => Some(*c),
50+
CopyCsvOption::New(CopyOption::Quote(c)) => Some(*c),
51+
_ => None,
52+
}
53+
}
54+
55+
fn header(&self) -> Option<bool> {
56+
match self {
57+
CopyCsvOption::CopyLegacyCsvOption(CopyLegacyCsvOption::Header) => Some(true),
58+
CopyCsvOption::New(CopyOption::Header(b)) => Some(*b),
59+
_ => None,
60+
}
61+
}
62+
63+
fn null(&self) -> Option<String> {
64+
match self {
65+
CopyCsvOption::New(CopyOption::Null(s)) => Some(s.clone()),
66+
_ => None,
67+
}
68+
}
69+
70+
fn escape(&self) -> Option<char> {
71+
match self {
72+
CopyCsvOption::New(CopyOption::Escape(c)) => Some(*c),
73+
CopyCsvOption::CopyLegacyCsvOption(CopyLegacyCsvOption::Escape(c)) => Some(*c),
74+
_ => None,
75+
}
76+
}
77+
}
78+
79+
/// Detects whether `stmt` is a `COPY <table>(<columns>) FROM '<file>'`
/// statement that SQLPage can run, and if so extracts all the information
/// needed for the import.
///
/// Returns `None` (after logging a warning) for COPY statements that are
/// not supported: `COPY ... TO`, statements carrying inline values, or a
/// source that is not a file.
pub fn extract_csv_import(stmt: &mut Statement) -> Option<CsvImport> {
    if let Statement::Copy {
        source: CopySource::Table {
            table_name,
            columns,
        },
        to: false,
        target: source,
        options,
        legacy_options,
        values,
    } = stmt
    {
        // Inline data attached to the statement is not supported.
        if !values.is_empty() {
            log::warn!("COPY ... VALUES not compatible with SQLPage: {stmt}");
            return None;
        }
        // Swap the file target for STDIN *before* re-serializing the
        // statement below, so that `query` reads `COPY ... FROM STDIN`.
        // NOTE(review): if the target is not a file we return None here,
        // but `stmt` has already been mutated to FROM STDIN — confirm
        // callers do not reuse the statement after a failed extraction.
        let uploaded_file = match std::mem::replace(source, CopyTarget::Stdin) {
            CopyTarget::File { filename } => filename,
            other => {
                log::warn!("COPY from {other} not compatible with SQLPage: {stmt}");
                return None;
            }
        };

        // Flatten legacy options (including those nested inside a legacy
        // CSV option list) and modern options into one uniform list.
        let all_options: Vec<CopyCsvOption> = legacy_options
            .iter()
            .flat_map(|o| match o {
                CopyLegacyOption::Csv(o) => {
                    o.iter().map(CopyCsvOption::CopyLegacyCsvOption).collect()
                }
                o => vec![CopyCsvOption::Legacy(o)],
            })
            .chain(options.iter().map(CopyCsvOption::New))
            .collect();

        // Convert borrowed AST fields to owned values; the first matching
        // option wins for each setting.
        let table_name = table_name.to_string();
        let columns = columns.iter().map(|ident| ident.value.clone()).collect();
        let delimiter = all_options.iter().find_map(CopyCsvOption::delimiter);
        let quote = all_options.iter().find_map(CopyCsvOption::quote);
        let header = all_options.iter().find_map(CopyCsvOption::header);
        let null = all_options.iter().find_map(CopyCsvOption::null);
        let escape = all_options.iter().find_map(CopyCsvOption::escape);
        // Serialized after the STDIN swap above, so this is the form a
        // database that supports COPY natively can execute directly.
        let query = stmt.to_string();

        Some(CsvImport {
            query,
            table_name,
            columns,
            delimiter,
            quote,
            header,
            null_str: null,
            escape,
            uploaded_file,
        })
    } else {
        log::warn!("COPY statement not compatible with SQLPage: {stmt}");
        None
    }
}
140+
141+
/// Runs a parsed COPY statement by reading the uploaded CSV file referenced
/// by `csv_import` and inserting its records into the target table, one
/// parameterized INSERT per CSV record.
///
/// NOTE(review): rows are inserted individually with no visible transaction
/// here; a failure mid-file leaves the earlier rows inserted unless a caller
/// wraps this call in a transaction — confirm.
pub(super) async fn run_csv_import(
    db: &mut AnyConnection,
    csv_import: &CsvImport,
    request: &RequestInfo,
) -> anyhow::Result<()> {
    // The COPY source name is the key of an uploaded file in this request.
    let file_path = request
        .uploaded_files
        .get(&csv_import.uploaded_file)
        .ok_or_else(|| anyhow::anyhow!("File not found"))?
        .file
        .path();
    let file = tokio::fs::File::open(file_path)
        .await
        .with_context(|| "opening csv")?;
    let insert_stmt = create_insert_stmt(db, csv_import);
    log::debug!("CSV data insert statement: {insert_stmt}");
    let mut reader = make_csv_reader(csv_import, file);
    // Map each target column to its field index within a CSV record.
    let col_idxs = compute_column_indices(&mut reader, csv_import).await?;
    let mut records = reader.into_records();
    // Stream the records: abort the whole import on the first bad record
    // or failed insert.
    while let Some(record) = records.next().await {
        let r = record.with_context(|| "reading csv record")?;
        process_csv_record(r, db, &insert_stmt, csv_import, &col_idxs).await?;
    }
    Ok(())
}
166+
167+
async fn compute_column_indices(
168+
reader: &mut csv_async::AsyncReader<tokio::fs::File>,
169+
csv_import: &CsvImport,
170+
) -> anyhow::Result<Vec<usize>> {
171+
let mut col_idxs = Vec::with_capacity(csv_import.columns.len());
172+
if csv_import.header.unwrap_or(true) {
173+
let headers = reader
174+
.headers()
175+
.await?
176+
.iter()
177+
.enumerate()
178+
.map(|(i, h)| (h, i))
179+
.collect::<HashMap<&str, usize>>();
180+
for column in &csv_import.columns {
181+
let &idx = headers
182+
.get(column.as_str())
183+
.ok_or_else(|| anyhow::anyhow!("CSV Column not found: {column}"))?;
184+
col_idxs.push(idx);
185+
}
186+
} else {
187+
col_idxs.extend(0..csv_import.columns.len());
188+
}
189+
Ok(col_idxs)
190+
}
191+
192+
fn create_insert_stmt(db: &mut AnyConnection, csv_import: &CsvImport) -> String {
193+
let kind = db.kind();
194+
let columns = csv_import.columns.join(", ");
195+
let placeholders = csv_import
196+
.columns
197+
.iter()
198+
.enumerate()
199+
.map(|(i, _)| make_placeholder(kind, i))
200+
.fold(String::new(), |mut acc, f| {
201+
acc.push_str(", ");
202+
acc.push_str(&f);
203+
acc
204+
});
205+
let table_name = &csv_import.table_name;
206+
format!("INSERT INTO {table_name} ({columns}) VALUES ({placeholders})")
207+
}
208+
209+
/// Binds the values of one CSV record as arguments to the prepared INSERT
/// statement and executes it.
///
/// `column_indices[j]` is the position, within the CSV record, of the value
/// feeding the j-th target column.
async fn process_csv_record(
    record: csv_async::StringRecord,
    db: &mut AnyConnection,
    insert_stmt: &str,
    csv_import: &CsvImport,
    column_indices: &[usize],
) -> anyhow::Result<()> {
    let mut arguments = AnyArguments::default();
    // When no NULL string was specified this defaults to "", so empty CSV
    // fields are bound as SQL NULL by default.
    let null_str = csv_import.null_str.as_deref().unwrap_or_default();
    for (&i, column) in column_indices.iter().zip(csv_import.columns.iter()) {
        // A missing field (short record) is treated like an empty string.
        let value = record.get(i).unwrap_or_default();
        let value = if value == null_str { None } else { Some(value) };
        log::trace!("CSV value: {column}={value:?}");
        arguments.add(value);
    }
    db.execute((insert_stmt, Some(arguments))).await?;
    Ok(())
}
227+
228+
/// Configures an async CSV reader over the uploaded file according to the
/// options extracted from the COPY statement.
///
/// Characters that do not fit in a single byte are ignored and the csv
/// defaults apply instead: delimiter ',', quote '"', header present,
/// no escape character.
fn make_csv_reader(
    csv_import: &CsvImport,
    file: tokio::fs::File,
) -> csv_async::AsyncReader<tokio::fs::File> {
    // The csv reader works on bytes; keep only single-byte characters.
    let to_single_byte = |c: char| u8::try_from(c).ok();
    csv_async::AsyncReaderBuilder::new()
        .delimiter(csv_import.delimiter.and_then(to_single_byte).unwrap_or(b','))
        .quote(csv_import.quote.and_then(to_single_byte).unwrap_or(b'"'))
        .has_headers(csv_import.header.unwrap_or(true))
        .escape(csv_import.escape.and_then(to_single_byte))
        .create_reader(file)
}

src/webserver/database/execute_queries.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures_util::StreamExt;
44
use std::borrow::Cow;
55
use std::collections::HashMap;
66

7+
use super::csv_import::run_csv_import;
78
use super::sql::{ParsedSqlFile, ParsedStatement, StmtWithParams};
89
use crate::webserver::database::sql_pseudofunctions::extract_req_param;
910
use crate::webserver::database::sql_to_json::row_to_string;
@@ -40,6 +41,10 @@ pub fn stream_query_results<'a>(
4041
let mut connection_opt = None;
4142
for res in &sql_file.statements {
4243
match res {
44+
ParsedStatement::CsvImport(csv_import) => {
45+
let connection = take_connection(db, &mut connection_opt).await?;
46+
run_csv_import(connection, csv_import, request).await?;
47+
},
4348
ParsedStatement::StmtWithParams(stmt) => {
4449
let query = bind_parameters(stmt, request).await?;
4550
let connection = take_connection(db, &mut connection_opt).await?;

src/webserver/database/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod connect;
2+
mod csv_import;
23
pub mod execute_queries;
34
pub mod migrations;
45
mod sql;

0 commit comments

Comments
 (0)