@@ -7,10 +7,13 @@ use anyhow::Context as _;
77use bottomless:: replicator:: Options ;
88use bytes:: Bytes ;
99use enclose:: enclose;
10+ use fallible_iterator:: FallibleIterator ;
1011use futures:: Stream ;
1112use libsql_sys:: EncryptionConfig ;
1213use rusqlite:: hooks:: { AuthAction , AuthContext , Authorization } ;
13- use tokio:: io:: AsyncBufReadExt as _;
14+ use sqlite3_parser:: ast:: { Cmd , Stmt } ;
15+ use sqlite3_parser:: lexer:: sql:: { Parser , ParserError } ;
16+ use tokio:: io:: AsyncReadExt ;
1417use tokio:: task:: JoinSet ;
1518use tokio_util:: io:: StreamReader ;
1619
@@ -33,9 +36,6 @@ use crate::{StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT
3336
3437use super :: { BaseNamespaceConfig , PrimaryConfig } ;
3538
36- const WASM_TABLE_CREATE : & str =
37- "CREATE TABLE libsql_wasm_func_table (name text PRIMARY KEY, body text) WITHOUT ROWID;" ;
38-
3939#[ tracing:: instrument( skip_all) ]
4040pub ( super ) async fn make_primary_connection_maker (
4141 primary_config : & PrimaryConfig ,
@@ -295,84 +295,89 @@ where
295295 S : Stream < Item = std:: io:: Result < Bytes > > + Unpin ,
296296{
297297 let mut reader = tokio:: io:: BufReader :: new ( StreamReader :: new ( dump) ) ;
298- let mut curr = String :: new ( ) ;
299- let mut line = String :: new ( ) ;
298+ let mut dump_content = String :: new ( ) ;
299+ reader
300+ . read_to_string ( & mut dump_content)
301+ . await
302+ . map_err ( |e| LoadDumpError :: Internal ( format ! ( "Failed to read dump content: {}" , e) ) ) ?;
303+
304+ if dump_content. to_lowercase ( ) . contains ( "attach" ) {
305+ return Err ( LoadDumpError :: InvalidSqlInput (
306+ "attach statements are not allowed in dumps" . to_string ( ) ,
307+ ) ) ;
308+ }
309+
310+ let mut parser = Box :: new ( Parser :: new ( dump_content. as_bytes ( ) ) ) ;
300311 let mut skipped_wasm_table = false ;
301312 let mut n_stmt = 0 ;
302- let mut line_id = 0 ;
303313
304- while let Ok ( n) = reader. read_line ( & mut curr) . await {
305- line_id += 1 ;
306- if n == 0 {
307- break ;
308- }
309- let trimmed = curr. trim ( ) ;
310- if trimmed. is_empty ( ) || trimmed. starts_with ( "--" ) {
311- curr. clear ( ) ;
312- continue ;
313- }
314- // FIXME: it's well known bug that comment ending with semicolon will be handled incorrectly by currend dump processing code
315- let statement_end = trimmed. ends_with ( ';' ) ;
316-
317- // we want to concat original(non-trimmed) lines as trimming will join all them in one
318- // single-line statement which is incorrect if comments in the end are present
319- line. push_str ( & curr) ;
320- curr. clear ( ) ;
321-
322- // This is a hack to ignore the libsql_wasm_func_table table because it is already created
323- // by the system.
324- if !skipped_wasm_table && line. trim ( ) == WASM_TABLE_CREATE {
325- skipped_wasm_table = true ;
326- line. clear ( ) ;
327- continue ;
328- }
314+ loop {
315+ match parser. next ( ) {
316+ Ok ( Some ( cmd) ) => {
317+ n_stmt += 1 ;
318+
319+ if !skipped_wasm_table {
320+ if let Cmd :: Stmt ( Stmt :: CreateTable { tbl_name, .. } ) = & cmd {
321+ if tbl_name. name . 0 == "libsql_wasm_func_table" {
322+ skipped_wasm_table = true ;
323+ tracing:: debug!( "Skipping WASM table creation" ) ;
324+ continue ;
325+ }
326+ }
327+ }
328+
329+ if n_stmt > 2 && conn. is_autocommit ( ) . await . unwrap ( ) {
330+ return Err ( LoadDumpError :: NoTxn ) ;
331+ }
329332
330- if statement_end {
331- n_stmt += 1 ;
332- // dump must be performd within a txn
333- if n_stmt > 2 && conn. is_autocommit ( ) . await . unwrap ( ) {
334- return Err ( LoadDumpError :: NoTxn ) ;
333+ let stmt_sql = cmd. to_string ( ) ;
334+ tokio:: task:: spawn_blocking ( {
335+ let conn = conn. clone ( ) ;
336+ move || -> crate :: Result < ( ) , LoadDumpError > {
337+ conn. with_raw ( |conn| {
338+ conn. authorizer ( Some ( |auth : AuthContext < ' _ > | match auth. action {
339+ AuthAction :: Attach { filename : _ } => Authorization :: Deny ,
340+ _ => Authorization :: Allow ,
341+ } ) ) ;
342+ conn. execute ( & stmt_sql, ( ) )
343+ } )
344+ . map_err ( |e| match e {
345+ rusqlite:: Error :: SqlInputError {
346+ msg, sql, offset, ..
347+ } => LoadDumpError :: InvalidSqlInput ( format ! (
348+ "msg: {}, sql: {}, offset: {}" ,
349+ msg, sql, offset
350+ ) ) ,
351+ e => LoadDumpError :: Internal ( format ! (
352+ "statement: {}, error: {}" ,
353+ n_stmt, e
354+ ) ) ,
355+ } ) ?;
356+ Ok ( ( ) )
357+ }
358+ } )
359+ . await ??;
335360 }
361+ Ok ( None ) => break ,
362+ Err ( e) => {
363+ let error_msg = match e {
364+ sqlite3_parser:: lexer:: sql:: Error :: ParserError (
365+ ParserError :: SyntaxError { token_type, found } ,
366+ Some ( ( line, col) ) ,
367+ ) => {
368+ let near_token = found. as_deref ( ) . unwrap_or ( & token_type) ;
369+ format ! (
370+ "syntax error near '{}' at line {}, column {}" ,
371+ near_token, line, col
372+ )
373+ }
374+ _ => format ! ( "parse error: {}" , e) ,
375+ } ;
336376
337- line = tokio:: task:: spawn_blocking ( {
338- let conn = conn. clone ( ) ;
339- move || -> crate :: Result < String , LoadDumpError > {
340- conn. with_raw ( |conn| {
341- conn. authorizer ( Some ( |auth : AuthContext < ' _ > | match auth. action {
342- AuthAction :: Attach { filename : _ } => Authorization :: Deny ,
343- _ => Authorization :: Allow ,
344- } ) ) ;
345- conn. execute ( & line, ( ) )
346- } )
347- . map_err ( |e| match e {
348- rusqlite:: Error :: SqlInputError {
349- msg, sql, offset, ..
350- } => {
351- let msg = if sql. to_lowercase ( ) . contains ( "attach" ) {
352- format ! (
353- "attach statements are not allowed in dumps, msg: {}, sql: {}, offset: {}" ,
354- msg,
355- sql,
356- offset
357- )
358- } else {
359- format ! ( "msg: {}, sql: {}, offset: {}" , msg, sql, offset)
360- } ;
361-
362- LoadDumpError :: InvalidSqlInput ( msg)
363- }
364- e => LoadDumpError :: Internal ( format ! ( "line: {}, error: {}" , line_id, e) ) ,
365- } ) ?;
366- Ok ( line)
367- }
368- } )
369- . await ??;
370- line. clear ( ) ;
371- } else {
372- line. push ( ' ' ) ;
377+ return Err ( LoadDumpError :: InvalidSqlInput ( error_msg) ) ;
378+ }
373379 }
374380 }
375- tracing:: debug!( "loaded {} lines from dump" , line_id) ;
376381
377382 if !conn. is_autocommit ( ) . await . unwrap ( ) {
378383 tokio:: task:: spawn_blocking ( {
0 commit comments