@@ -8,6 +8,7 @@ use hyper::Body;
88use std:: path:: Path ;
99use tokio:: io:: AsyncWriteExt as _;
1010use uuid:: Uuid ;
11+ use zerocopy:: big_endian;
1112
1213#[ cfg( test) ]
1314mod test;
@@ -953,18 +954,24 @@ pub async fn try_pull(
953954 sync_ctx : & mut SyncContext ,
954955 conn : & Connection ,
955956) -> Result < crate :: database:: Replicated > {
956- let insert_handle = conn. wal_insert_handle ( ) ?;
957+ // note, that updates of durable_frame_num are valid only after SQLite commited the WAL
958+ // (because if WAL has uncommited suffix - it will be omitted by any other SQLite connection - for example after restart)
959+ // so, try_pull maintains local next_frame_no during the pull operation and update durable_frame_num when it's appropriate
960+ let mut next_frame_no = sync_ctx. durable_frame_num + 1 ;
957961
958- let mut err = None ;
962+ // libsql maintain consistent state about WAL sync session locally in the insert_handle
963+ // note, that insert_handle will always close the session on drop - so we never keep active WAL session after we exit from the method
964+ let insert_handle = conn. wal_insert_handle ( ) ?;
959965
960966 loop {
967+ // get current generation (it may be updated multiple times during execution)
961968 let generation = sync_ctx. durable_generation ( ) ;
962- let frame_no = sync_ctx . durable_frame_num ( ) + 1 ;
963- match sync_ctx. pull_frames ( generation, frame_no ) . await {
969+
970+ match sync_ctx. pull_frames ( generation, next_frame_no ) . await {
964971 Ok ( PullResult :: Frames ( frames) ) => {
965972 tracing:: debug!(
966973 "pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}" ,
967- generation, frame_no , frame_no + sync_ctx. pull_batch_size, sync_ctx. pull_batch_size, frames. len( ) ,
974+ generation, next_frame_no , next_frame_no + sync_ctx. pull_batch_size, sync_ctx. pull_batch_size, frames. len( ) ,
968975 ) ;
969976 if frames. len ( ) % FRAME_SIZE != 0 {
970977 tracing:: error!(
@@ -974,63 +981,71 @@ pub async fn try_pull(
974981 ) ;
975982 return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
976983 }
977- for ( i, chunk) in frames. chunks ( FRAME_SIZE ) . enumerate ( ) {
978- let r = insert_handle. insert_at ( frame_no + i as u32 , & chunk) ;
979- if let Err ( e) = r {
980- tracing:: error!(
981- "insert error (frame= {}) : {:?}" ,
982- sync_ctx. durable_frame_num + 1 ,
983- e
984+ for chunk in frames. chunks ( FRAME_SIZE ) {
985+ let mut size_after_buf = [ 0u8 ; 4 ] ;
986+ size_after_buf. copy_from_slice ( & chunk[ 4 ..8 ] ) ;
987+ let size_after = big_endian:: U32 :: from_bytes ( size_after_buf) ;
988+ // start WAL sync session if it was closed
989+ // (this can happen if on previous iteration client received commit frame)
990+ if !insert_handle. in_session ( ) {
991+ tracing:: debug!(
992+ "pull_frames: generation={}, frame={}, start wal transaction session" ,
993+ generation,
994+ next_frame_no
984995 ) ;
996+ insert_handle. begin ( ) ?;
997+ }
998+ let result = insert_handle. insert_at ( next_frame_no, & chunk) ;
999+ if let Err ( e) = result {
1000+ tracing:: error!( "insert error (frame={}) : {:?}" , next_frame_no, e) ;
9851001 return Err ( e) ;
9861002 }
987- sync_ctx. durable_frame_num += 1 ;
1003+ // if this is commit frame - we can close WAL sync session and update durable_frame_num
1004+ if size_after. get ( ) > 0 {
1005+ tracing:: debug!(
1006+ "pull_frames: generation={}, frame={}, finish wal transaction session, size_after={}" ,
1007+ generation,
1008+ next_frame_no,
1009+ size_after. get( )
1010+ ) ;
1011+ insert_handle. end ( ) ?;
1012+ sync_ctx. durable_frame_num = next_frame_no;
1013+ sync_ctx. write_metadata ( ) . await ?;
1014+ }
1015+
1016+ next_frame_no += 1 ;
9881017 }
9891018 }
9901019 Ok ( PullResult :: EndOfGeneration { max_generation } ) => {
9911020 // If there are no more generations to pull, we're done.
9921021 if generation >= max_generation {
9931022 break ;
9941023 }
995- insert_handle. end ( ) ?;
996- sync_ctx. write_metadata ( ) . await ?;
1024+ assert ! (
1025+ !insert_handle. in_session( ) ,
1026+ "WAL transaction must be finished"
1027+ ) ;
9971028
1029+ tracing:: debug!(
1030+ "pull_frames: generation={}, frame={}, checkpoint in order to move to next generation" ,
1031+ generation,
1032+ next_frame_no
1033+ ) ;
9981034 // TODO: Make this crash-proof.
9991035 conn. wal_checkpoint ( true ) ?;
10001036
10011037 sync_ctx. next_generation ( ) ;
10021038 sync_ctx. write_metadata ( ) . await ?;
1003-
1004- insert_handle. begin ( ) ?;
1005- }
1006- Err ( e) => {
1007- tracing:: debug!( "pull_frames error: {:?}" , e) ;
1008- err. replace ( e) ;
1009- break ;
1039+ next_frame_no = 1 ;
10101040 }
1041+ Err ( e) => return Err ( e) ,
10111042 }
10121043 }
1013- // This is crash-proof because we:
1014- //
1015- // 1. Write WAL frame first
1016- // 2. Write new max frame to temporary metadata
1017- // 3. Atomically rename the temporary metadata to the real metadata
1018- //
1019- // If we crash before metadata rename completes, the old metadata still
1020- // points to last successful frame, allowing safe retry from that point.
1021- // If we happen to have the frame already in the WAL, it's fine to re-pull
1022- // because append locally is idempotent.
1023- insert_handle. end ( ) ?;
1024- sync_ctx. write_metadata ( ) . await ?;
10251044
1026- if let Some ( err) = err {
1027- Err ( err)
1028- } else {
1029- Ok ( crate :: database:: Replicated {
1030- frame_no : None ,
1031- frames_synced : 1 ,
1032- } )
1033- }
1045+ Ok ( crate :: database:: Replicated {
1046+ frame_no : None ,
1047+ frames_synced : 1 ,
1048+ } )
10341049}
10351050
10361051fn check_if_file_exists ( path : & str ) -> core:: result:: Result < bool , SyncError > {
0 commit comments