@@ -3,7 +3,8 @@ if Phoenix.Sync.sandbox_enabled?() do
33 @ moduledoc false
44
55 alias Electric.Replication.Changes . {
6- Transaction ,
6+ Begin ,
7+ Commit ,
78 NewRecord ,
89 UpdatedRecord ,
910 DeletedRecord ,
@@ -50,6 +51,12 @@ if Phoenix.Sync.sandbox_enabled?() do
5051
5152 def init ( stack_id ) do
5253 state = % { txid: 10000 , stack_id: stack_id }
54+
55+ Electric.LsnTracker . set_last_processed_lsn (
56+ stack_id ,
57+ Electric.Postgres.Lsn . from_integer ( 0 )
58+ )
59+
5360 { :ok , state }
5461 end
5562
@@ -62,7 +69,7 @@ if Phoenix.Sync.sandbox_enabled?() do
6269 :ok =
6370 txid
6471 |> transaction ( msgs )
65- |> ShapeLogCollector . store_transaction ( ShapeLogCollector . name ( stack_id ) )
72+ |> ShapeLogCollector . handle_operations ( ShapeLogCollector . name ( stack_id ) )
6673
6774 { :noreply , % { state | txid: next_txid } }
6875 end
@@ -73,21 +80,20 @@ if Phoenix.Sync.sandbox_enabled?() do
7380 :ok =
7481 state . txid
7582 |> transaction ( changes )
76- |> ShapeLogCollector . store_transaction ( ShapeLogCollector . name ( state . stack_id ) )
83+ |> ShapeLogCollector . handle_operations ( ShapeLogCollector . name ( state . stack_id ) )
7784
7885 { :noreply , % { state | txid: state . txid + 100 } }
7986 end
8087
8188 defp transaction ( txid , changes ) do
82- % Transaction {
83- xid: txid ,
84- lsn: Electric.Postgres.Lsn . from_integer ( txid ) ,
85- last_log_offset: Enum . at ( changes , - 1 ) |> Map . fetch! ( :log_offset ) ,
86- changes: changes ,
87- num_changes: length ( changes ) ,
88- commit_timestamp: DateTime . utc_now ( ) ,
89- affected_relations: Enum . into ( changes , MapSet . new ( ) , & & 1 . relation )
90- }
89+ [ % Begin { xid: txid } | changes ] ++
90+ [
91+ % Commit {
92+ lsn: Electric.Postgres.Lsn . from_integer ( txid ) ,
93+ transaction_size: 100 ,
94+ commit_timestamp: DateTime . utc_now ( )
95+ }
96+ ]
9197 end
9298
9399 defp msg_from_change ( { { :insert , schema_meta , values } , i } , lsn , txid ) do
0 commit comments