diff --git a/CMakeLists.txt b/CMakeLists.txt index 932cfeb..c208b28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -164,6 +164,10 @@ set(source_files src/binsrv/events/gtid_log_post_header_impl_fwd.hpp src/binsrv/events/gtid_log_post_header_impl.hpp + src/binsrv/events/gtid_renumberer_fwd.hpp + src/binsrv/events/gtid_renumberer.hpp + src/binsrv/events/gtid_renumberer.cpp + src/binsrv/events/gtid_tagged_log_body_impl_fwd.hpp src/binsrv/events/gtid_tagged_log_body_impl.hpp src/binsrv/events/gtid_tagged_log_body_impl.cpp diff --git a/mtr/binlog_streaming/r/gtid_renumbering.result b/mtr/binlog_streaming/r/gtid_renumbering.result new file mode 100644 index 0000000..4fb6649 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering.result @@ -0,0 +1,47 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Building deterministic binlog content with two source-binlog +*** rotations between transaction groups. + +*** Source binlog A: CREATE TABLE + 9 INSERTs (10 GTIDs total). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Flushing source binary log (introduces the first +*** source-binlog rotation that the rewriter must absorb). +FLUSH BINARY LOGS; + +*** Source binlog B: 5 INSERTs. + +*** Flushing source binary log one more time (second +*** source-binlog rotation absorbed inside the same local file). +FLUSH BINARY LOGS; + +*** Source binlog C: 3 INSERTs. + +*** Executing the Binlog Server utility in rewrite mode. + +*** Materializing the local binlog file produced by the rewriter +*** and dumping it via mysqlbinlog for textual inspection. + +*** Validating (sequence_number, last_committed) of every GTID +*** event in the local binlog file. 
Aborts with --die on any +*** invariant violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result b/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result new file mode 100644 index 0000000..abd2418 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result @@ -0,0 +1,34 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Building a single-source-binlog workload that is large enough +*** to span multiple 1K local binlog files after rewrite. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, payload VARCHAR(100) NOT NULL, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Executing the Binlog Server utility in rewrite mode. + +*** Asserting that the rewriter actually rotated locally +*** (bnlg.000002 must exist - otherwise the test does not +*** exercise the local-rotation reset code path). + +*** Validating per-local-file sequence_number reset and +*** last_committed range. Aborts with --die on violation; +*** produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. 
diff --git a/mtr/binlog_streaming/r/gtid_renumbering_resume.result b/mtr/binlog_streaming/r/gtid_renumbering_resume.result new file mode 100644 index 0000000..107c847 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_resume.result @@ -0,0 +1,50 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Phase 1: CREATE TABLE + 4 INSERTs in source binlog A +*** (5 GTID events). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** First binsrv invocation: writes 5 GTID events with +*** sequence_number 1..5 to bnlg.000001 and persists the +*** renumberer recovery snapshot (next_local_seq=5) in the +*** per-binlog metadata file. + +*** Flushing source binary log between invocations so the +*** second binsrv run also has to absorb a source-side +*** rotation right after resuming. +FLUSH BINARY LOGS; + +*** Phase 2: 5 INSERTs in source binlog B (5 more GTID events). + +*** Second binsrv invocation: must resume the renumberer from +*** the persisted snapshot (next_local_seq=5) and append +*** sequence_number 6..10 to bnlg.000001. + +*** Materializing the local binlog file produced across the two +*** invocations and dumping it via mysqlbinlog for textual +*** inspection. + +*** Validating that sequence_number is gap-free across the +*** binsrv-invocation boundary and that the post-resume +*** source-binlog-boundary event has last_committed = +*** sequence_number - 1. Aborts with --die on any invariant +*** violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. 
+ +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result b/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result new file mode 100644 index 0000000..d868470 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result @@ -0,0 +1,78 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Phase 1: T_first = CREATE TABLE t1 (1 GTID event, no +*** Write_rows). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** First binsrv invocation: writes T_first with +*** sequence_number=1 to bnlg.000001 and persists +*** next_local_seq=1 in the per-binlog metadata. + +*** Phase 2: arm the source dump thread to pause AFTER +*** sending the first WRITE_ROWS event of any transaction. +*** Combined with the short binsrv read_timeout, this freezes +*** the second binsrv invocation mid-T_third (after T_third's +*** GTID has been processed by binsrv but before T_third's +*** XID can be received). +SET @old_global_debug = @@global.debug; +SET GLOBAL DEBUG = '+d,dump_thread_wait_after_send_write_rows'; + +*** Workload that the second binsrv invocation will read: +*** T_second (CREATE TABLE t2, no Write_rows; will be +*** absorbed in full) and T_third (INSERT INTO t1, has +*** Write_rows; binsrv will receive its GTID and prefix +*** events and then time out waiting for the rest). +CREATE TABLE t2(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +INSERT INTO t1 VALUES(DEFAULT); + +*** Second binsrv invocation: expected to exit with non-zero +*** status after read_timeout. 
The storage destructor flushes +*** the COMMITTED transaction (T_second) and persists the +*** per-binlog metadata. The renumberer recovery snapshot +*** persisted here MUST be next_local_seq=2 (committed, in +*** lockstep with the flushed bytes) and not the speculative +*** next_local_seq=3 produced by T_third's GTID rewrite. + +*** Releasing the paused source dump thread: SIGNAL wakes it +*** from the debug_sync wait, after which its first attempt +*** to send the next event fails on the half-closed socket +*** and the dump thread exits. DEBUG flag is then restored so +*** the new dump thread spawned by binsrv 3 does not pause. +SET DEBUG_SYNC = 'now SIGNAL signal.continue'; +SET GLOBAL DEBUG = @old_global_debug; +SET DEBUG_SYNC = 'RESET'; + +*** Third binsrv invocation: the source resends T_third +*** (binsrv never acknowledged it), and the renumberer +*** continues from the persisted committed snapshot. T_third +*** must land as sequence_number=3 so bnlg.000001 ends up +*** gap-free. + +*** Materializing bnlg.000001 produced across all three +*** invocations and dumping it via mysqlbinlog for textual +*** inspection. + +*** Validating that bnlg.000001 contains exactly 3 GTID +*** events with contiguous sequence_numbers 1, 2, 3 and +*** that every last_committed value references an in-file +*** sequence_number. Aborts with --die on any invariant +*** violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the tables. +DROP TABLE t1, t2; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. 
diff --git a/mtr/binlog_streaming/t/gtid_renumbering-master.opt b/mtr/binlog_streaming/t/gtid_renumbering-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering.test b/mtr/binlog_streaming/t/gtid_renumbering.test new file mode 100644 index 0000000..b0fd2f2 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering.test @@ -0,0 +1,206 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# This test exercises the rewrite-mode GTID renumberer. In rewrite mode +# the binlog server discards the source's ROTATE / FORMAT_DESCRIPTION / +# PREVIOUS_GTIDS / STOP events and coalesces transactions into local +# binlog files of a configurable size. Two consequences: +# 1. Multiple source binlogs may be folded into a single local file. +# The source resets sequence_number to 1 at every rotation, which +# would produce duplicate / non-monotone sequence_number values in +# our local file unless the rewriter renumbers them. +# 2. The first event of each new source-file segment carries +# last_committed=0 from the source, relying on the implicit +# synchronization guarantee that every transaction of the previous +# source file commits before any transaction of the next file +# starts. After coalescing, that physical boundary is gone, so the +# rewriter must replace last_committed=0 with sequence_number-1 to +# preserve the cross-file commit ordering on the consumer side. 
+# +# We craft three source binlogs (10 + 5 + 3 GTIDs) coalesced into a +# single local file (1G rewrite_file_size) and verify that: +# (a) sequence_number starts at 1 and advances by exactly 1 per event +# (renumbering covers all source-side rotations the rewriter +# absorbed - without the renumberer we would see duplicate +# sequence_number=1 values from each source file); +# (b) last_committed is in [0, sequence_number - 1] for every event; +# (c) the cross-source-file boundary events (events #11 and #16 - +# the first transactions from source binlogs B and C) carry +# a non-zero last_committed (specifically, sequence_number - 1). +# Without the rewriter's segment-boundary fix these would inherit +# the source's verbatim last_committed=0 and let the local +# applier reorder them ahead of the previous source file's +# commits. This assertion is robust regardless of the source's +# binlog_transaction_dependency_tracking mode. + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +--let $binsrv_checkpoint_size = 1 +# rewrite_file_size set high enough that all transactions land in a +# single local file - the point of this test is to exercise multiple +# source-file boundaries WITHIN the same local file +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Building deterministic binlog content with two source-binlog +--echo *** rotations between transaction groups. 
+--echo +--echo *** Source binlog A: CREATE TABLE + 9 INSERTs (10 GTIDs total). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +--disable_query_log +--let $i = 1 +while ($i <= 9) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Flushing source binary log (introduces the first +--echo *** source-binlog rotation that the rewriter must absorb). +FLUSH BINARY LOGS; + +--echo +--echo *** Source binlog B: 5 INSERTs. +--disable_query_log +--let $i = 1 +while ($i <= 5) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Flushing source binary log one more time (second +--echo *** source-binlog rotation absorbed inside the same local file). +FLUSH BINARY LOGS; + +--echo +--echo *** Source binlog C: 3 INSERTs. +--disable_query_log +--let $i = 1 +while ($i <= 3) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility in rewrite mode. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing the local binlog file produced by the rewriter +--echo *** and dumping it via mysqlbinlog for textual inspection. +--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating (sequence_number, last_committed) of every GTID +--echo *** event in the local binlog file. Aborts with --die on any +--echo *** invariant violation; produces no output otherwise. 
+ +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # Source A contributes 10 GTIDs (CREATE TABLE + 9 INSERTs), source B + # contributes 5, source C contributes 3 - 18 events in total. + my $expected_count = 18; + die "expected exactly $expected_count GTID events in the local binlog " + . "file, got " . scalar @events + unless @events == $expected_count; + + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + + # Property (a): per-local-file sequence_number must restart at 1 + # and advance gap-free by exactly 1 per event - this is what + # protects us from the source restarting its sequence_number at + # every rotation that we absorb. + die "sequence_number gap or duplicate at event index $i: " + . "expected sequence_number=$expected_sn, got $events[$i]{sn}" + unless $events[$i]{sn} == $expected_sn; + + # Property (b): last_committed must reference an in-file + # sequence_number, i.e. lie in [0, sequence_number - 1]. A value + # >= sequence_number would point to the future; a negative value + # is impossible on the wire and would indicate a corrupt rewrite. + die "last_committed out of range at sequence_number=$events[$i]{sn}: " + . "got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } + + # Property (c): every cross-source-file boundary event must carry + # last_committed = sequence_number - 1. By protocol the first event + # of each source binlog has source-side last_committed=0; if the + # rewriter's segment-boundary fix is missing, that 0 propagates + # verbatim into the local file. 
With the fix the value is replaced + # with the immediately preceding local sequence_number, restoring + # the source's "rotation = synchronization point" semantics that + # would otherwise be lost when we coalesce A, B, C into one local + # file. + # + # Source A contributes 10 GTIDs (CREATE TABLE + 9 INSERTs), source B + # contributes 5, source C contributes 3 - boundary events are + # therefore at sequence_number 11 (first of B) and 16 (first of C). + for my $boundary_seq (11, 16) { + my $idx = $boundary_seq - 1; + my $expected_lc = $boundary_seq - 1; + die "boundary-fix violation at sequence_number=$boundary_seq: " + . "expected last_committed=$expected_lc, got " + . "last_committed=$events[$idx]{lc}" + unless $events[$idx]{lc} == $expected_lc; + } +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test new file mode 100644 index 0000000..ba30a26 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test @@ -0,0 +1,142 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion test to gtid_renumbering.test exercising the OPPOSITE +# side of the rewrite-mode renumberer: a single source binlog whose +# transactions get split across multiple local binlog files because +# the configured rewrite_file_size is much smaller than the total +# stream 
size. The renumberer must: +# 1. restart sequence_number at 1 in every new local file (otherwise +# sequence_number would keep growing across local files, leaving +# the consumer with a logical clock the local PREVIOUS_GTIDS +# header cannot describe); +# 2. only ever emit last_committed values that reference an event +# already present in the SAME local file (i.e. < sequence_number). +# +# We verify (1) and (2) on the first two local files; observing local +# rotation at all is what makes this test meaningful, so we also +# require that bnlg.000002 actually got created. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +--source ../include/identify_storage_backend.inc + +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +--let $binsrv_checkpoint_size = 1 +# small rewrite_file_size forces frequent local rotations +--let $binsrv_rewrite_file_size = 1K +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Building a single-source-binlog workload that is large enough +--echo *** to span multiple 1K local binlog files after rewrite. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, payload VARCHAR(100) NOT NULL, PRIMARY KEY(id)) ENGINE=InnoDB; + +--disable_query_log +--let $i = 1 +while ($i <= 60) +{ + INSERT INTO t1 (payload) VALUES (REPEAT('x', 80)); + --inc $i +} +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility in rewrite mode. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Asserting that the rewriter actually rotated locally +--echo *** (bnlg.000002 must exist - otherwise the test does not +--echo *** exercise the local-rotation reset code path). 
+--let $first_local_path = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.bnlg.000001 +--let $second_local_path = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.bnlg.000002 +if ($storage_backend == file) +{ + --file_exists $binsrv_storage_path/bnlg.000002 + --copy_file $binsrv_storage_path/bnlg.000001 $first_local_path + --copy_file $binsrv_storage_path/bnlg.000002 $second_local_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $first_local_path > /dev/null + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000002 $second_local_path > /dev/null +} + +--echo +--echo *** Validating per-local-file sequence_number reset and +--echo *** last_committed range. Aborts with --die on violation; +--echo *** produces no output otherwise. + +--let FIRST_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.first.dump +--let SECOND_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.second.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $first_local_path > $FIRST_DUMP +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $second_local_path > $SECOND_DUMP + +--perl + use strict; + use warnings; + + sub parse_gtid_dump { + my ($path) = @_; + open(my $fh, '<', $path) or die "Failed to open $path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + return @events; + } + + sub validate_local_file { + my ($label, $events_ref) = @_; + my @events = @$events_ref; + die "$label: no GTID events found in dump" unless @events > 0; + + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + + # sequence_number must restart at 1 and grow gap-free in EACH + # local file + die "$label: sequence_number gap or duplicate at event index $i: " + . 
"expected $expected_sn, got $events[$i]{sn}" + unless $events[$i]{sn} == $expected_sn; + + # last_committed must always reference an in-file + # sequence_number + die "$label: last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 + && $events[$i]{lc} < $events[$i]{sn}; + } + } + + my @first = parse_gtid_dump($ENV{'FIRST_DUMP'}); + my @second = parse_gtid_dump($ENV{'SECOND_DUMP'}); + + validate_local_file("bnlg.000001", \@first); + validate_local_file("bnlg.000002", \@second); +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $FIRST_DUMP +--remove_file $SECOND_DUMP +--remove_file $first_local_path +--remove_file $second_local_path + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume.test b/mtr/binlog_streaming/t/gtid_renumbering_resume.test new file mode 100644 index 0000000..3c3b841 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume.test @@ -0,0 +1,202 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion test to gtid_renumbering.test exercising the +# RESUME / RESTART path of the rewrite-mode renumberer. +# +# In rewrite mode the renumberer maintains an in-memory +# next_local_seq counter that allocates sequence_number values inside +# the current local binlog file. 
When the binsrv utility exits while +# a local file is still open and is later restarted (or when a +# reconnect drops the in-process state), the counter must continue +# forward instead of resetting to 1 - otherwise the post-resume +# events would collide with the sequence_number values already +# emitted into the same local file. +# +# The implementation persists next_local_seq (and last_emitted_offset) +# in the per-binlog .json metadata file at every checkpoint flush. +# On the next binsrv invocation, the storage layer hands that +# snapshot back to the renumberer via resume_in_existing_local_file() +# right after storage construction. +# +# This test verifies the property end-to-end: +# 1. Run binsrv against a partially-populated source, producing N +# GTID events with sequence_number 1..N in bnlg.000001. +# 2. Generate more transactions on the source, with a FLUSH BINARY +# LOGS in between so the second invocation also has to absorb +# a source-side rotation right after resume. +# 3. Run binsrv a second time. Expected: bnlg.000001 now contains +# N+M events with sequence_number 1..N+M (gap-free, no +# duplicates). +# 4. Cross-source-file boundary event at 0-based index N (i.e. +# sequence_number N+1) must carry last_committed = N (not 0): +# the segment-boundary fix has to keep working across a resume. +# +# Without the persisted-renumberer-state recovery, step 3 would +# instead produce sequence_number 1..N followed by 1..M - which the +# perl validator catches as a sequence_number collision. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc.
+--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +# checkpoint_size = 1 forces a metadata flush after every transaction +# boundary, so the renumberer recovery snapshot persisted to the +# metadata file is always in lockstep with the binlog content. With a +# larger checkpoint_size the test would still work for graceful exits +# but would be racy under abrupt termination - we want the strongest +# guarantee here. +--let $binsrv_checkpoint_size = 1 +# rewrite_file_size set high enough that all transactions land in a +# single local file - the point of this test is that the renumberer's +# counter MUST continue forward across binsrv invocations within the +# same local file, not that we exercise local rotation +# (gtid_renumbering_local_rotation.test covers that). +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Phase 1: CREATE TABLE + 4 INSERTs in source binlog A +--echo *** (5 GTID events). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +--disable_query_log +--let $i = 1 +while ($i <= 4) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** First binsrv invocation: writes 5 GTID events with +--echo *** sequence_number 1..5 to bnlg.000001 and persists the +--echo *** renumberer recovery snapshot (next_local_seq=5) in the +--echo *** per-binlog metadata file. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing source binary log between invocations so the +--echo *** second binsrv run also has to absorb a source-side +--echo *** rotation right after resuming. +FLUSH BINARY LOGS; + +--echo +--echo *** Phase 2: 5 INSERTs in source binlog B (5 more GTID events). 
+--disable_query_log +--let $i = 1 +while ($i <= 5) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Second binsrv invocation: must resume the renumberer from +--echo *** the persisted snapshot (next_local_seq=5) and append +--echo *** sequence_number 6..10 to bnlg.000001. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing the local binlog file produced across the two +--echo *** invocations and dumping it via mysqlbinlog for textual +--echo *** inspection. +--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering_resume.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_resume.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating that sequence_number is gap-free across the +--echo *** binsrv-invocation boundary and that the post-resume +--echo *** source-binlog-boundary event has last_committed = +--echo *** sequence_number - 1. Aborts with --die on any invariant +--echo *** violation; produces no output otherwise. + +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # Phase 1 contributes 5 GTIDs (CREATE TABLE + 4 INSERTs); phase 2 + # contributes 5 INSERTs - 10 events in total, all in bnlg.000001. + my $expected_count = 10; + die "expected exactly $expected_count GTID events in the local " + . "binlog file, got " . 
scalar @events + unless @events == $expected_count; + + # Property 1: sequence_number must be gap-free across the + # invocation boundary - the renumberer in the second invocation + # must start at 6, not at 1. A regression of the persisted-state + # recovery would surface as duplicate sequence_number values + # (1..5 from each invocation). + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + die "sequence_number gap or duplicate at event index $i: " + . "expected sequence_number=$expected_sn, got " + . "$events[$i]{sn} (resume regression: renumberer most " + . "likely restarted at 1 instead of resuming from 6)" + unless $events[$i]{sn} == $expected_sn; + + # Property 2: last_committed must reference an in-file + # sequence_number, i.e. lie in [0, sequence_number - 1]. + die "last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } + + # Property 3: the very first event of source binlog B (the first + # event the SECOND binsrv invocation writes) is the cross-source- + # file boundary; its source-side last_committed is 0 by protocol + # and the renumberer must replace it with sequence_number - 1 to + # preserve the cross-rotation synchronization point. The boundary + # event lies at sequence_number = 6 (first event of phase 2). + my $boundary_seq = 6; + my $idx = $boundary_seq - 1; + my $expected_lc = $boundary_seq - 1; + die "boundary-fix violation at sequence_number=$boundary_seq " + . "(post-resume): expected last_committed=$expected_lc, got " + . "last_committed=$events[$idx]{lc}" + unless $events[$idx]{lc} == $expected_lc; +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the table. 
+DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test new file mode 100644 index 0000000..2cbbf48 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test @@ -0,0 +1,221 @@ +--source ../include/have_binsrv.inc + +--source include/have_debug_sync.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion to gtid_renumbering_resume.test exercising the +# DURABILITY of the persisted renumberer recovery snapshot under a +# mid-transaction disconnect in rewrite mode. +# +# In rewrite mode the renumberer's per-file sequence_number counter +# is bumped speculatively the moment a GTID event is rewritten, but +# it is only "committed" (and mirrored into the per-binlog .json +# metadata) at the corresponding transaction boundary. If a connection +# drops between a GTID rewrite and the matching transaction boundary, +# the partial transaction is discarded and the renumberer must be +# rolled back to the previously committed state - otherwise the +# snapshot persisted alongside the durable bytes would point past the +# end of those bytes, and the next binsrv invocation would re-allocate +# a sequence_number for the resent transaction, leaving a permanent +# gap in the local sequence_number stream. +# +# The scenario: +# 1. binsrv processes T_first (CREATE TABLE t1) cleanly. Disk: +# bnlg.000001 contains the GTID event with sequence_number=1 +# and the metadata records next_local_seq=1. +# 2. 
The source-side dump thread is armed with +# 'dump_thread_wait_after_send_write_rows'. From now on it +# will pause right after sending the first WRITE_ROWS event +# of any transaction. +# 3. The source produces T_second (CREATE TABLE t2, no +# Write_rows) and T_third (INSERT INTO t1, has Write_rows). +# 4. The second binsrv invocation absorbs T_second in full but +# freezes mid-T_third (after T_third's GTID has already +# advanced the renumberer's speculative counter from 2 to 3). +# It then hits read_timeout and exits with a non-zero code. +# The storage destructor flushes whatever was complete in the +# in-memory buffer - i.e. T_second - and writes the per-binlog +# .json metadata. +# 5. The persisted snapshot at this point MUST carry +# next_local_seq=2 (the committed value, matching T_second +# which is the last GTID actually flushed to disk). With a +# regression of the commit/rollback semantics the snapshot +# would instead carry the speculative next_local_seq=3 - a +# value that points past the durable bytes. +# 6. The third binsrv invocation re-reads the metadata, seeds +# the renumberer accordingly and re-receives T_third (which +# the source resends because binsrv never acknowledged it). +# With the fix T_third lands as sequence_number=3 and +# bnlg.000001 ends up with the contiguous sequence +# 1, 2, 3. Without the fix it lands as sequence_number=4 and +# the perl validator at the end of this test catches the gap. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +# Short read_timeout to make the second binsrv invocation exit +# quickly once the source dump thread freezes mid-transaction. 
+--let $binsrv_read_timeout = 5 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +# Large checkpoint_size so flushes happen only at end-of-stream +# (storage destructor). With this configuration multiple complete +# transactions can stay buffered alongside an in-flight one - the +# layout that exposes the stale-snapshot bug when the disconnect +# happens mid-transaction. +--let $binsrv_checkpoint_size = 1G +# Single local file for the entire test - the property under test +# is the in-file recovery snapshot, not local rotation. +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Phase 1: T_first = CREATE TABLE t1 (1 GTID event, no +--echo *** Write_rows). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** First binsrv invocation: writes T_first with +--echo *** sequence_number=1 to bnlg.000001 and persists +--echo *** next_local_seq=1 in the per-binlog metadata. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Phase 2: arm the source dump thread to pause AFTER +--echo *** sending the first WRITE_ROWS event of any transaction. +--echo *** Combined with the short binsrv read_timeout, this freezes +--echo *** the second binsrv invocation mid-T_third (after T_third's +--echo *** GTID has been processed by binsrv but before T_third's +--echo *** XID can be received). +SET @old_global_debug = @@global.debug; +SET GLOBAL DEBUG = '+d,dump_thread_wait_after_send_write_rows'; + +--echo +--echo *** Workload that the second binsrv invocation will read: +--echo *** T_second (CREATE TABLE t2, no Write_rows; will be +--echo *** absorbed in full) and T_third (INSERT INTO t1, has +--echo *** Write_rows; binsrv will receive its GTID and prefix +--echo *** events and then time out waiting for the rest). 
+CREATE TABLE t2(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +INSERT INTO t1 VALUES(DEFAULT); + +--echo +--echo *** Second binsrv invocation: expected to exit with non-zero +--echo *** status after read_timeout. The storage destructor flushes +--echo *** the COMMITTED transaction (T_second) and persists the +--echo *** per-binlog metadata. The renumberer recovery snapshot +--echo *** persisted here MUST be next_local_seq=2 (committed, in +--echo *** lockstep with the flushed bytes) and not the speculative +--echo *** next_local_seq=3 produced by T_third's GTID rewrite. +--error 1 +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Releasing the paused source dump thread: SIGNAL wakes it +--echo *** from the debug_sync wait, after which its first attempt +--echo *** to send the next event fails on the half-closed socket +--echo *** and the dump thread exits. DEBUG flag is then restored so +--echo *** the new dump thread spawned by binsrv 3 does not pause. +SET DEBUG_SYNC = 'now SIGNAL signal.continue'; +SET GLOBAL DEBUG = @old_global_debug; + +let $wait_condition = SELECT COUNT(*) = 0 FROM information_schema.processlist WHERE COMMAND IN ('Binlog Dump', 'Binlog Dump GTID'); +--source include/wait_condition.inc + +SET DEBUG_SYNC = 'RESET'; + +--echo +--echo *** Third binsrv invocation: the source resends T_third +--echo *** (binsrv never acknowledged it), and the renumberer +--echo *** continues from the persisted committed snapshot. T_third +--echo *** must land as sequence_number=3 so bnlg.000001 ends up +--echo *** gap-free. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing bnlg.000001 produced across all three +--echo *** invocations and dumping it via mysqlbinlog for textual +--echo *** inspection. 
+--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering_resume_after_partial.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_resume_after_partial.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating that bnlg.000001 contains exactly 3 GTID +--echo *** events with contiguous sequence_numbers 1, 2, 3 and +--echo *** that every last_committed value references an in-file +--echo *** sequence_number. Aborts with --die on any invariant +--echo *** violation; produces no output otherwise. + +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # T_first (CREATE TABLE t1), T_second (CREATE TABLE t2), and the + # resent T_third (INSERT INTO t1) - 3 GTID events in total, all + # in bnlg.000001. + my $expected_count = 3; + die "expected exactly $expected_count GTID events in the local " + . "binlog file, got " . scalar @events + unless @events == $expected_count; + + # Property: sequence_number must be gap-free across the + # mid-transaction disconnect. A regression of the commit/rollback + # of the renumberer's recovery snapshot would surface as + # sequence_number=4 (instead of 3) at index 2 - the persisted + # snapshot would have advanced past the discarded T_third and + # caused the resent T_third to be allocated next_local_seq+1 + # twice. + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + die "sequence_number gap or duplicate at event index $i: " + . 
"expected sequence_number=$expected_sn, got " + . "$events[$i]{sn} (regression: persisted recovery info " + . "advanced past the discarded transaction's bytes)" + unless $events[$i]{sn} == $expected_sn; + + die "last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the tables. +DROP TABLE t1, t2; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index c47bffc..4a30122 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -64,6 +64,7 @@ #include "binsrv/events/common_header_flag_type.hpp" #include "binsrv/events/event.hpp" #include "binsrv/events/event_view.hpp" +#include "binsrv/events/gtid_renumberer.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" @@ -285,6 +286,26 @@ void log_storage_info(binsrv::basic_logger &logger, logger.log(binsrv::log_severity::info, msg); } +// Seeds `renumberer` from the recovery snapshot persisted in the +// current binlog file's metadata, if applicable. No-op when not in +// rewrite mode or when storage is empty (nothing to resume from). +// Extracted out of main() to keep its cognitive complexity within +// the project-wide threshold. 
+void seed_renumberer_from_recovery_snapshot( + binsrv::events::gtid_renumberer &renumberer, const binsrv::storage &storage, + const binsrv::optional_rewrite_config &optional_rewrite_config, + binsrv::basic_logger &logger) { + if (!optional_rewrite_config.has_value() || storage.is_empty()) { + return; + } + const auto recovery_info{storage.get_current_renumberer_recovery_info()}; + renumberer.resume_in_existing_local_file(recovery_info.next_local_seq, + recovery_info.last_emitted_offset); + logger.log(binsrv::log_severity::info, + "rewrite: resuming renumberer at next sequence_number=" + + std::to_string(recovery_info.next_local_seq + 1)); +} + void log_library_info(binsrv::basic_logger &logger, const easymysql::library &mysql_lib) { std::string msg{}; @@ -486,10 +507,22 @@ void process_rotate_or_stop_event(binsrv::basic_logger &logger, "storage: closed binlog file: " + old_binlog_name); } -void process_binlog_event(const binsrv::events::event_view &current_event_v, - binsrv::basic_logger &logger, - binsrv::events::reader_context &context, - binsrv::storage &storage) { +// `renumberer`, if non-null, lets process_binlog_event() promote the +// renumberer's speculative state to committed (and mirror it into +// storage's per-binlog recovery info) right before storage.write_event +// is called. This is the moment the streaming pipeline detects a +// transaction boundary AND the upcoming write_event call may trigger a +// flush + save_binlog_metadata - i.e. the only spot where the .json +// recovery snapshot is durably advanced. Synthetic events generated in +// rewrite mode (artificial ROTATE, FDE, PREVIOUS_GTIDS, ...) and the +// non-rewrite-mode call from receive_binlog_events() pass nullptr; the +// renumberer's speculative counter is left untouched until the next +// genuine GTID transaction reaches its boundary.
+void process_binlog_event( + const binsrv::events::event_view &current_event_v, + binsrv::basic_logger &logger, binsrv::events::reader_context &context, + binsrv::storage &storage, + binsrv::events::gtid_renumberer *renumberer = nullptr) { const auto current_common_header_v{current_event_v.get_common_header_view()}; const auto readable_flags{current_common_header_v.get_readable_flags()}; logger.log(binsrv::log_severity::info, @@ -533,6 +566,26 @@ void process_binlog_event(const binsrv::events::event_view &current_event_v, process_artificial_rotate_event(current_event_v, logger, storage); } + // In rewrite mode, this is the only point where the renumberer's + // committed snapshot is advanced and mirrored into storage's + // per-binlog recovery info. Doing it BEFORE storage.write_event is + // critical: write_event may trigger a checkpoint flush which calls + // save_binlog_metadata(), so the snapshot has to be in lockstep + // with the bytes about to be flushed. Doing it ONLY at transaction + // boundaries (instead of on every GTID event) is what keeps the + // persisted snapshot consistent under mid-transaction disconnects: + // if the connection drops between a GTID event and its commit, the + // renumberer's speculative bump is rolled back during the discard + // path (see rollback_to_committed() at the discard call site), and + // no flush in between had a chance to persist that speculative + // bump.
+ if (renumberer != nullptr && context.is_at_transaction_boundary()) { + renumberer->commit_pending_changes(); + storage.update_renumberer_recovery_info(binsrv::renumberer_recovery_info{ + .next_local_seq = renumberer->peek_next_local_seq(), + .last_emitted_offset = renumberer->peek_last_emitted_offset()}); + } + // checking if the event needs to be written to the binlog if (!info_only) { storage.write_event(current_event_v.get_portion(), @@ -640,14 +693,21 @@ generate_previous_gtids_log_event(binsrv::events::event_storage &event_buffer, void rewrite_and_process_binlog_event( const binsrv::events::event_view &current_event_v, binsrv::basic_logger &logger, binsrv::events::reader_context &context, - binsrv::storage &storage, std::uint32_t server_id, - std::string_view base_file_name, std::uint64_t file_size) { + binsrv::events::gtid_renumberer &renumberer, binsrv::storage &storage, + std::uint32_t server_id, std::string_view base_file_name, + std::uint64_t file_size) { const auto current_common_header_v = current_event_v.get_common_header_view(); const auto code = current_common_header_v.get_type_code(); // for ROTATE (both artificial and non-artificial), FORMAT_DESCRIPTION, // PREVIOUS_GTIDS_LOG, and STOP events we don't have to do anything - - // simply return early from this function + // simply return early from this function. Note we deliberately do not + // notify the renumberer about source-side rotations or FDEs: per the + // MySQL protocol last_committed only references sequence_numbers from + // the same source file, and the renumberer translates last_committed + // through the CURRENT event's offset (new_seq - source_seq), which + // automatically jumps in lockstep with the source's own + // sequence_number reset at file boundaries. if (code == binsrv::events::code_type::format_description || code == binsrv::events::code_type::previous_gtids_log || code == binsrv::events::code_type::rotate || @@ -680,8 +740,23 @@ void rewrite_and_process_binlog_event( // 2.
FORMAT_DESCRIPTION // 3. PREVIOUS_GTIDS_LOG - if (context.is_fresh() || (context.is_at_transaction_boundary() && - storage.get_current_position() >= file_size)) { + const bool will_rotate{context.is_fresh() || + (context.is_at_transaction_boundary() && + storage.get_current_position() >= file_size)}; + // We reset the renumberer's sequence_number counter only when we are + // ACTUALLY starting a new local file (i.e. storage was empty, or we + // hit the configured file_size). When `is_fresh()` is true on a + // non-empty storage we are merely resuming the existing local file + // after a reconnect - in that case the renumberer is shared with + // the caller in main() and either still holds the in-memory state + // from before the reconnect, or (on a process restart) was re-seeded + // once from the persisted recovery snapshot right after storage + // construction. Either way, we deliberately do NOT call + // on_local_rotation() here and the counter keeps advancing from + // where the previous connection / process left off. + const bool will_open_fresh_local_file{ + will_rotate && (context.is_fresh() ? 
storage.is_empty() : true)}; + if (will_rotate) { binsrv::events::event_storage event_buffer; std::uint32_t offset{0U}; @@ -749,14 +824,40 @@ void rewrite_and_process_binlog_event( "rewrite: generated previous gtids log event in the rewrite mode"); process_binlog_event(generated_previous_gtids_log_event_v, logger, context, storage); + + if (will_open_fresh_local_file) { + renumberer.on_local_rotation(); + } } // in rewrite mode we need to update next_event_position (and optional // checksum in the footer) in the received event data portion binsrv::events::event_storage buffer{}; - const auto event_copy_uv{binsrv::events::materialize( + auto event_copy_uv{binsrv::events::materialize( current_event_v, buffer, binsrv::events::materialization_type::force_add_checksum)}; + + // For GTID events we have to overwrite (sequence_number, + // last_committed) so they index into our local file's logical clock + // namespace rather than the source's. The tagged variant may grow or + // shrink the buffer, in which case `event_copy_uv` is reseated onto + // the resized buffer. + // + // The rewrite advances the renumberer's SPECULATIVE counter (its + // committed snapshot is unchanged at this point); the snapshot is + // promoted later, inside process_binlog_event(), once we know the + // transaction has actually reached its boundary. That ordering is + // what keeps the persisted recovery info from getting ahead of the + // durable binlog content: if the connection drops between this + // rewrite and the upcoming transaction boundary, the speculative + // increment is rolled back during the discard path before any + // flush has a chance to capture it (see rollback_to_committed() + // wired up next to discard_incomplete_transaction_events() in + // receive_binlog_events()). 
+ if (binsrv::events::is_gtid_log_event(code)) { + event_copy_uv = renumberer.rewrite_if_gtid_event(event_copy_uv, buffer); + } + { // TODO: optimize redundant checksum recalculation const auto proxy{event_copy_uv.get_write_proxy()}; @@ -764,7 +865,7 @@ void rewrite_and_process_binlog_event( static_cast(storage.get_current_position() + event_copy_uv.get_total_size())); } - process_binlog_event(event_copy_uv, logger, context, storage); + process_binlog_event(event_copy_uv, logger, context, storage, &renumberer); } bool open_connection_and_switch_to_replication( @@ -848,6 +949,7 @@ void receive_binlog_events( binsrv::basic_logger &logger, const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage, + binsrv::events::gtid_renumberer &renumberer, const binsrv::optional_rewrite_config &optional_rewrite_config) { easymysql::connection connection{}; if (!open_connection_and_switch_to_replication( @@ -867,6 +969,17 @@ void receive_binlog_events( storage.get_replication_mode(), storage.get_current_binlog_name().str(), static_cast(storage.get_current_position())}; + // The renumberer is owned by the caller and outlives this function, + // so its in-memory state survives reconnects within a single + // process. We deliberately do NOT touch its counter here: a + // reconnect to the same source merely resumes appending to the + // current local file, and the next allocated sequence_number must + // continue forward from where the previous connection left off. If + // the very first event triggers a will_open_fresh_local_file + // rotation (e.g. file_size threshold reached while we were idle), + // rewrite_and_process_binlog_event() calls on_local_rotation() + // explicitly at the right moment. 
+ bool fetch_result{}; while (!termination_flag.test() && @@ -886,7 +999,7 @@ void receive_binlog_events( // FORMAT_DESCRIPTION, PREVIOUS_GTIDS_LOG, ROTATE (non-artificial), // and STOP events rewrite_and_process_binlog_event( - current_event_v, logger, context, storage, server_id, + current_event_v, logger, context, renumberer, storage, server_id, optional_rewrite_config->get<"base_file_name">(), optional_rewrite_config->get<"file_size">().get_value()); } else { @@ -917,6 +1030,15 @@ void receive_binlog_events( // transaction boundary if (storage.is_in_gtid_replication_mode()) { storage.discard_incomplete_transaction_events(); + // Symmetric rollback: storage just dropped any partial-transaction + // bytes from its event buffer, so we also have to rewind the + // renumberer's speculative state (which may have been bumped by a + // GTID event whose transaction never reached its boundary) back to + // the most recent committed snapshot. Without this rollback, the + // next reconnect would re-process the same source GTID and bump + // next_local_seq AGAIN, leaving a permanent gap in the local + // sequence_number stream. + renumberer.rollback_to_committed(); } // connection termination is a good place to flush any remaining data @@ -1202,6 +1324,29 @@ int main(int argc, char *argv[]) { replication_mode}; log_storage_info(*logger, storage); + // Rewrite-mode GTID renumberer state. Lives alongside `storage` + // so its in-memory counter survives reconnects: every local + // rotation resets the per-file sequence_number counter (via + // on_local_rotation()) and every incoming GTID event's + // logical-clock fields are rewritten before the event is appended + // to the local binlog. Idle (default-constructed) when we are not + // in rewrite mode. + // + // We seed the renumberer ONCE here, immediately after storage is + // loaded, from the recovery snapshot persisted in the current + // binlog file's metadata. 
This is the only place where the + // snapshot is consulted: subsequent reconnects within the same + // process keep the in-memory state running. The snapshot was last + // written at the previous transaction-boundary checkpoint and is + // in lockstep with `binlog_record.size` (and therefore with the + // bytes already on disk) by construction. For an empty storage + // there is nothing to resume; for legacy metadata without the + // renumberer fields the snapshot defaults to "no prior emissions + // in this file" - documented limitation. + binsrv::events::gtid_renumberer renumberer{}; + seed_renumberer_from_recovery_snapshot(renumberer, storage, + optional_rewrite_config, *logger); + const easymysql::library mysql_lib; logger->log(binsrv::log_severity::info, "initialized mysql client library"); @@ -1209,7 +1354,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, verify_checksum, - storage, optional_rewrite_config); + storage, renumberer, optional_rewrite_config); if (operation_mode == binsrv::operation_mode_type::pull) { std::size_t iteration_number{1U}; @@ -1230,7 +1375,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, - verify_checksum, storage, + verify_checksum, storage, renumberer, optional_rewrite_config); ++iteration_number; } diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp index e7f3c1d..48977e7 100644 --- a/src/binsrv/binlog_file_metadata.cpp +++ b/src/binsrv/binlog_file_metadata.cpp @@ -30,7 +30,14 @@ namespace binsrv { binlog_file_metadata::binlog_file_metadata() - : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {} + : impl_{{expected_binlog_file_metadata_version}, + {}, + {}, + {}, + {}, + {}, + {}, + {}} {} binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { auto json_value = 
boost::json::parse(data); diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp index f03e602..2f70fb9 100644 --- a/src/binsrv/binlog_file_metadata.hpp +++ b/src/binsrv/binlog_file_metadata.hpp @@ -19,6 +19,7 @@ #include "binsrv/binlog_file_metadata_fwd.hpp" // IWYU pragma: export #include +#include #include #include @@ -32,6 +33,16 @@ namespace binsrv { class [[nodiscard]] binlog_file_metadata { private: + // The two `*_renumberer_*` fields persist the rewrite-mode GTID + // renumberer's recovery state for this binlog file. They are stored + // here (rather than re-derived from the binlog file content on + // resume) because every other piece of resume state already lives in + // this metadata record - keeping the renumberer state alongside it + // makes recovery a single read-and-load step. Both are + // std::optional<> so that legacy metadata files written by older + // binlog-server builds (which lacked these fields) load cleanly with + // both values defaulting to std::nullopt; on next save the up-to-date + // values are written back. using impl_type = util::nv_tuple< // clang-format off util::nv<"version", std::uint32_t>, @@ -39,7 +50,10 @@ class [[nodiscard]] binlog_file_metadata { util::nv<"previous_gtids", gtids::optional_gtid_set>, util::nv<"added_gtids", gtids::optional_gtid_set>, util::nv<"min_timestamp", ctime_timestamp>, - util::nv<"max_timestamp", ctime_timestamp> + util::nv<"max_timestamp", ctime_timestamp>, + util::nv<"renumberer_next_local_seq", std::optional>, + util::nv<"renumberer_last_emitted_offset", + std::optional> // clang-format on >; diff --git a/src/binsrv/events/code_type.hpp b/src/binsrv/events/code_type.hpp index e72ea40..e04518b 100644 --- a/src/binsrv/events/code_type.hpp +++ b/src/binsrv/events/code_type.hpp @@ -84,6 +84,16 @@ enum class code_type : std::uint8_t { }; #undef BINSRV_EVENTS_CODE_TYPE_XY_MACRO +// returns true for GTID_LOG, ANONYMOUS_GTID_LOG and GTID_TAGGED_LOG events, +// i.e. 
all three event types that carry GTID logical-clock fields +// (sequence_number / last_committed) which need to be rewritten when the +// rewrite mode causes our local binlog file boundaries to differ from the +// source's +[[nodiscard]] constexpr bool is_gtid_log_event(code_type code) noexcept { + return code == code_type::gtid_log || code == code_type::anonymous_gtid_log || + code == code_type::gtid_tagged_log; +} + inline std::string_view to_string_view(code_type code) noexcept { using namespace std::string_view_literals; using nv_pair = std::pair; diff --git a/src/binsrv/events/event_view.hpp b/src/binsrv/events/event_view.hpp index ada6f72..e3effd1 100644 --- a/src/binsrv/events/event_view.hpp +++ b/src/binsrv/events/event_view.hpp @@ -201,23 +201,43 @@ class [[nodiscard]] event_updatable_view : private event_view_base { event_updatable_view(const reader_context &context, util::byte_span portion) : event_view_base{context, portion} {} + // Construct a writable view directly from a byte span with explicit + // post_header / footer sizes. Bypasses the reader_context-driven + // validation; used by event reconstructors that materialize events + // from scratch (e.g. the rewrite-mode GTID renumberer, which has to + // emit a re-encoded GTID_TAGGED_LOG_EVENT possibly of a different + // size than the input). 
+ [[nodiscard]] static event_updatable_view + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + from_raw_unchecked(util::byte_span portion, std::size_t post_header_size, + std::size_t footer_size) noexcept { + return event_updatable_view{portion, post_header_size, footer_size}; + } + // clang-format off using event_view_base::get_portion; + using event_view_base::get_updatable_portion; using event_view_base::get_total_size; using event_view_base::calculate_crc; // common header section using event_view_base::get_common_header_size; + using event_view_base::get_common_header_raw; + using event_view_base::get_common_header_view; // post header section using event_view_base::get_post_header_size; + using event_view_base::get_post_header_raw; // body section using event_view_base::get_body_size; + using event_view_base::get_body_raw; // footer section using event_view_base::get_footer_size; using event_view_base::has_footer; + using event_view_base::get_footer_raw; + using event_view_base::get_footer_view; // clang-format on [[nodiscard]] write_proxy get_write_proxy() const { diff --git a/src/binsrv/events/gtid_log_post_header.cpp b/src/binsrv/events/gtid_log_post_header.cpp index 46b56ae..4f4126e 100644 --- a/src/binsrv/events/gtid_log_post_header.cpp +++ b/src/binsrv/events/gtid_log_post_header.cpp @@ -33,6 +33,7 @@ #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" #include "util/exception_location_helpers.hpp" #include "util/flag_set.hpp" @@ -141,6 +142,20 @@ gtid_log_post_header::get_flags() const noexcept { return {get_uuid(), get_gno()}; } +void overwrite_logical_clock_in_post_header_raw(util::byte_span post_header_raw, + std::int64_t last_committed, + std::int64_t sequence_number) { + if (std::size(post_header_raw) != gtid_log_post_header::size_in_bytes) { + util::exception_location().raise( + "invalid gtid_log post header length for in-place logical clock " + "overwrite"); + } + auto 
remainder = post_header_raw.subspan( + gtid_log_post_header::last_committed_offset_in_bytes); + util::insert_fixed_int_to_byte_span(remainder, last_committed); + util::insert_fixed_int_to_byte_span(remainder, sequence_number); +} + std::ostream &operator<<(std::ostream &output, const gtid_log_post_header &obj) { return output << "flags: " << obj.get_readable_flags() diff --git a/src/binsrv/events/gtid_log_post_header.hpp b/src/binsrv/events/gtid_log_post_header.hpp index f50c9c0..0b13c41 100644 --- a/src/binsrv/events/gtid_log_post_header.hpp +++ b/src/binsrv/events/gtid_log_post_header.hpp @@ -35,6 +35,17 @@ class [[nodiscard]] gtid_log_post_header { public: static constexpr std::size_t size_in_bytes{42U}; + // byte offsets (within the 42-byte post header) of the two logical-clock + // fields that the rewrite-mode renumberer has to overwrite in place + // flags 1B @ 0 + // SID 16B @ 1 + // GNO 8B @ 17 + // lt_type 1B @ 25 + // last_committed 8B @ 26 + // sequence_number 8B @ 34 + static constexpr std::size_t last_committed_offset_in_bytes{26U}; + static constexpr std::size_t sequence_number_offset_in_bytes{34U}; + // https://github.com/mysql/mysql-server/blob/mysql-8.0.43/libbinlogevents/include/control_events.h#L1091 // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1202 static constexpr std::uint8_t expected_logical_ts_code{2U}; @@ -83,6 +94,14 @@ class [[nodiscard]] gtid_log_post_header { std::int64_t sequence_number_{}; // 5 }; +// Overwrites the (last_committed, sequence_number) pair in a raw 42-byte +// gtid_log post-header byte span. Used by the rewrite-mode GTID renumberer +// to retarget the parallel-applier dependency graph onto our local binlog +// file boundaries. The overall event size is unchanged. 
+void overwrite_logical_clock_in_post_header_raw(util::byte_span post_header_raw, + std::int64_t last_committed, + std::int64_t sequence_number); + } // namespace binsrv::events #endif // BINSRV_EVENTS_GTID_LOG_POST_HEADER_HPP diff --git a/src/binsrv/events/gtid_renumberer.cpp b/src/binsrv/events/gtid_renumberer.cpp new file mode 100644 index 0000000..ff69c04 --- /dev/null +++ b/src/binsrv/events/gtid_renumberer.cpp @@ -0,0 +1,298 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/events/gtid_renumberer.hpp" + +#include +#include +#include +#include +#include +#include +#include + +// `event_storage` (used by `rewrite_tagged_gtid_event()` below via the +// `assign()` member) is an alias for boost::container::small_vector<>, +// declared in event_fwd.hpp via container_fwd.hpp - so we need the full +// definition here in the .cpp. 
+#include // IWYU pragma: keep
+
+#include "binsrv/events/code_type.hpp"
+#include "binsrv/events/common_header_view.hpp"
+#include "binsrv/events/event_fwd.hpp"
+#include "binsrv/events/event_view.hpp"
+#include "binsrv/events/gtid_log_post_header.hpp"
+#include "binsrv/events/gtid_tagged_log_body_impl.hpp"
+
+#include "util/byte_span.hpp"
+#include "util/exception_location_helpers.hpp"
+
+namespace binsrv::events {
+
+namespace {
+
+// Defensive cap on the number of fixpoint iterations needed to converge
+// on the tagged-event transaction_length. Each iteration changes the
+// encoded width of either serializable_field_size or transaction_length,
+// both of which are bounded to {1,2,3,4,5,9} bytes; in practice 2-3
+// iterations are sufficient. The cap is set high enough that it can only
+// trigger in genuine logic bugs.
+constexpr int max_tagged_fixpoint_iterations{16};
+
+} // namespace
+
+// Starts a fresh local binlog file: clears both the speculative and the
+// committed (sequence counter, tracked offset) pairs.
+void gtid_renumberer::on_local_rotation() noexcept {
+  next_local_seq_ = 0;
+  last_emitted_offset_.reset();
+  committed_next_local_seq_ = 0;
+  committed_last_emitted_offset_.reset();
+}
+
+// Seeds both the speculative and the committed state from a previously
+// persisted snapshot, so that an immediate rollback_to_committed()
+// restores the resumed values.
+void gtid_renumberer::resume_in_existing_local_file(
+    // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
+    std::int64_t next_local_seq,
+    std::optional last_emitted_offset) noexcept {
+  next_local_seq_ = next_local_seq;
+  last_emitted_offset_ = last_emitted_offset;
+  committed_next_local_seq_ = next_local_seq;
+  committed_last_emitted_offset_ = last_emitted_offset;
+}
+
+// Promotes the speculative state to the committed snapshot.
+void gtid_renumberer::commit_pending_changes() noexcept {
+  committed_next_local_seq_ = next_local_seq_;
+  committed_last_emitted_offset_ = last_emitted_offset_;
+}
+
+// Discards the speculative state in favour of the committed snapshot.
+void gtid_renumberer::rollback_to_committed() noexcept {
+  next_local_seq_ = committed_next_local_seq_;
+  last_emitted_offset_ = committed_last_emitted_offset_;
+}
+
+// Allocates the next local sequence_number and translates the source
+// last_committed into the local numbering. Returns {new_seq, new_lc}.
+// Mutates next_local_seq_ and last_emitted_offset_ (speculative state
+// only; the committed snapshot is untouched).
+[[nodiscard]] std::pair
+// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
+gtid_renumberer::translate_logical_clock(std::int64_t source_seq,
+                                         std::int64_t source_lc) noexcept {
+  // Allocate a fresh local sequence_number for this transaction.
+  ++next_local_seq_;
+  const std::int64_t new_seq{next_local_seq_};
+  const std::int64_t offset{new_seq - source_seq};
+
+  // Translate last_committed.
+  //
+  // The general rule is "candidate = source_lc + offset", which works
+  // because within a single source file the offset stays constant
+  // (both source_seq and new_seq advance by exactly 1 per event) and
+  // last_committed always references a sequence_number from the same
+  // source file. So adding the current event's offset to source_lc
+  // lands exactly on the local_seq we previously issued for the
+  // dependency target.
+  //
+  // There are two cases where that translation is not appropriate:
+  //
+  //   1) `last_emitted_offset_` differs from `offset` (or is empty).
+  //      A different offset means the previous event and this one
+  //      belong to different source-file segments in the local file
+  //      (offset is invariant within a source file, so a change is
+  //      proof of a boundary). On the source side that boundary is a
+  //      synchronization point - all of the previous file's
+  //      transactions must commit before any of the new file's start
+  //      applying - and the source encodes that by RESETTING its
+  //      logical clock so the first event has source_lc == 0. In the
+  //      local file we no longer have a physical boundary, so emitting
+  //      last_committed == 0 verbatim would let the local applier
+  //      reorder this txn before the previous segment's last commit.
+  //      To restore the barrier we set new_lc = (new_seq - 1), which
+  //      transitively chains through prior local txns and forces this
+  //      one to commit after everything that came before it. For the
+  //      very first txn in the local file (new_seq == 1) the formula
+  //      yields 0, i.e. parallel-safe - which is correct because there
+  //      is no prior local txn to serialize against.
+  //      The "empty last_emitted_offset_" case (fresh local file or
+  //      just after on_local_rotation()) is handled identically; from
+  //      the local file's point of view its first event is by
+  //      definition the start of a new segment.
+  //
+  //   2) source_lc != 0 inside a segment but the candidate falls out
+  //      of range. candidate <= 0 means the dependency target is in an
+  //      earlier (now-closed) local file or in the source's pre-attach
+  //      history; we have no local_seq to point at, so we fall back to
+  //      over-serializing against (new_seq - 1). candidate < new_seq
+  //      holds whenever the source honours the protocol invariant
+  //      source_lc < source_seq; it is still checked here so that a
+  //      malformed source clock can never make us emit
+  //      last_committed >= sequence_number - the same conservative
+  //      fallback applies.
+  std::int64_t new_lc{0};
+  const bool segment_boundary{!last_emitted_offset_.has_value() ||
+                              *last_emitted_offset_ != offset};
+  if (segment_boundary) {
+    new_lc = new_seq - 1;
+  } else if (source_lc != 0) {
+    const std::int64_t candidate{source_lc + offset};
+    const bool candidate_in_range{candidate > 0 && candidate < new_seq};
+    new_lc = candidate_in_range ? candidate : new_seq - 1;
+  }
+
+  last_emitted_offset_ = offset;
+  return {new_seq, new_lc};
+}
+
+void gtid_renumberer::rewrite_untagged_in_place(
+    const event_updatable_view &event_uv) {
+  // Untagged GTID events keep ALL the renumber-relevant data in the
+  // 42-byte post header; the body holds timestamps / commit-ticket /
+  // transaction_length but none of those depend on sequence_number or
+  // last_committed. The event size therefore stays the same.
+  const gtid_log_post_header decoded_post_header{
+      event_uv.get_post_header_raw()};
+  const auto [new_seq, new_lc]{
+      translate_logical_clock(decoded_post_header.get_sequence_number_raw(),
+                              decoded_post_header.get_last_committed_raw())};
+
+  // The write_proxy automatically recomputes the footer CRC at scope
+  // exit, so we patch under it.
+  const auto proxy{event_uv.get_write_proxy()};
+  overwrite_logical_clock_in_post_header_raw(
+      proxy.get_post_header_updatable_raw(), new_lc, new_seq);
+}
+
+// Re-encodes a GTID_TAGGED_LOG_EVENT with renumbered sequence_number /
+// last_committed into `buffer` and returns a view over the new bytes.
+// Raises (via util::exception_location) when the event is structurally
+// inconsistent or the transaction_length fixpoint does not converge.
+[[nodiscard]] event_updatable_view
+gtid_renumberer::rewrite_tagged(const event_updatable_view &event_uv,
+                                event_storage &buffer) {
+  using tagged_body_type = generic_body_impl;
+
+  const std::size_t common_header_size{
+      event_view_base::get_common_header_size()};
+  const std::size_t footer_size{event_uv.get_footer_size()};
+  const std::size_t old_event_size{event_uv.get_total_size()};
+
+  // Sanity: tagged GTID events have a zero-byte post header.
+  if (event_uv.get_post_header_size() != 0U) {
+    util::exception_location().raise(
+        "gtid_tagged_log event unexpectedly has a non-empty post header");
+  }
+
+  // Decode the body. The view (and therefore tagged_body_type's source
+  // span) lives in `buffer`; we must finish reading before resizing.
+  tagged_body_type body{event_uv.get_body_raw()};
+
+  // Snapshot the original common header bytes BEFORE we touch the
+  // buffer; the input view is invalidated by the resize below.
+  std::array
+      original_common_header{};
+  const auto original_common_header_raw{event_uv.get_common_header_raw()};
+  if (std::size(original_common_header_raw) !=
+      std::size(original_common_header)) {
+    util::exception_location().raise(
+        "common header size mismatch while rewriting gtid_tagged_log event");
+  }
+  std::memcpy(std::data(original_common_header),
+              std::data(original_common_header_raw),
+              std::size(original_common_header));
+
+  // Solve for transaction_length and the new event size in tandem.
+  // trx_minus_event = total transaction size - this event's old size,
+  // which is independent of how this event ends up encoded; the only
+  // dependency that varies is the new event's size itself.
+  const std::uint64_t old_transaction_length{body.get_transaction_length_raw()};
+  if (old_transaction_length < old_event_size) {
+    util::exception_location().raise(
+        "transaction_length in gtid_tagged_log event is smaller than the "
+        "event itself");
+  }
+  const std::uint64_t trx_minus_event{old_transaction_length - old_event_size};
+
+  // Only now, with every input check that does not depend on the new
+  // values behind us, advance the renumberer: an event rejected above
+  // must not consume a local sequence_number.
+  const auto [new_seq, new_lc]{translate_logical_clock(
+      body.get_sequence_number_raw(), body.get_last_committed_raw())};
+  body.set_sequence_number_raw(new_seq);
+  body.set_last_committed_raw(new_lc);
+
+  std::uint64_t guess_event_size{old_event_size};
+  std::size_t new_body_size{0U};
+  for (int attempt{0}; attempt < max_tagged_fixpoint_iterations; ++attempt) {
+    body.set_transaction_length_raw(trx_minus_event + guess_event_size);
+    new_body_size = body.calculate_encoded_size();
+    const std::uint64_t candidate_event_size{common_header_size +
+                                             new_body_size + footer_size};
+    if (candidate_event_size == guess_event_size) {
+      break;
+    }
+    guess_event_size = candidate_event_size;
+  }
+  // If we didn't converge, body's transaction_length is now stale; the
+  // last loop iteration's calculate_encoded_size() does not match the
+  // value embedded in the body. That would only happen on a logic bug
+  // (e.g. calculate_varlen_int_size disagreeing with the actual encoded
+  // width); refuse to emit a malformed event.
+  if (body.get_transaction_length_raw() != trx_minus_event + guess_event_size) {
+    util::exception_location().raise(
+        "gtid_tagged_log transaction_length fixpoint did not converge");
+  }
+
+  const std::size_t new_event_size{static_cast(guess_event_size)};
+
+  // Resize the destination buffer; this invalidates `event_uv` (and the
+  // body's source span, which we no longer need).
+  buffer.assign(new_event_size, std::byte{0});
+  const util::byte_span destination{std::data(buffer), std::size(buffer)};
+
+  // 1) Common header: copy the original 19 bytes verbatim, then patch
+  //    event_size to reflect the new total. next_event_position is set
+  //    by the caller (which knows the storage cursor) AFTER this call
+  //    returns; we leave the original value here and let the caller
+  //    overwrite it as it does for the non-rewritten case.
+  std::memcpy(std::data(destination), std::data(original_common_header),
+              std::size(original_common_header));
+
+  // 2) Body: encode_to() advances `body_dest` past the bytes it wrote.
+  util::byte_span body_dest{
+      destination.subspan(common_header_size, new_body_size)};
+  body.encode_to(body_dest);
+  if (!body_dest.empty()) {
+    util::exception_location().raise(
+        "gtid_tagged_log body encoder wrote fewer bytes than reported by "
+        "calculate_encoded_size()");
+  }
+
+  // 3) Footer (4 bytes if present): leave zero-filled; the caller's
+  //    write_proxy will recalculate the CRC.
+
+  // Build a fresh writable view; bypass the reader_context-driven
+  // validation because we know the structure first-hand.
+  auto result{event_updatable_view::from_raw_unchecked(
+      destination, /*post_header_size=*/0U, footer_size)};
+
+  // Update event_size in the freshly-laid-out common header.
+  {
+    const auto proxy{result.get_write_proxy()};
+    proxy.get_common_header_updatable_view().set_event_size_raw(
+        static_cast(new_event_size));
+  }
+  return result;
+}
+
+// Dispatches on the event type code: untagged GTID events are patched
+// in place, tagged ones are re-encoded into `buffer`, anything else is
+// returned untouched.
+[[nodiscard]] event_updatable_view
+gtid_renumberer::rewrite_if_gtid_event(const event_updatable_view &event_uv,
+                                       event_storage &buffer) {
+  const auto code{event_uv.get_common_header_view().get_type_code()};
+  switch (code) {
+  case code_type::gtid_log:
+  case code_type::anonymous_gtid_log:
+    rewrite_untagged_in_place(event_uv);
+    return event_uv;
+  case code_type::gtid_tagged_log:
+    return rewrite_tagged(event_uv, buffer);
+  default:
+    return event_uv;
+  }
+}
+
+} // namespace binsrv::events
diff --git a/src/binsrv/events/gtid_renumberer.hpp b/src/binsrv/events/gtid_renumberer.hpp
new file mode 100644
index 0000000..70851ad
--- /dev/null
+++ b/src/binsrv/events/gtid_renumberer.hpp
@@ -0,0 +1,280 @@
+// Copyright (c) 2023-2024 Percona and/or its affiliates.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License, version 2.0,
+// as published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License, version 2.0, for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+#ifndef BINSRV_EVENTS_GTID_RENUMBERER_HPP
+#define BINSRV_EVENTS_GTID_RENUMBERER_HPP
+
+#include "binsrv/events/gtid_renumberer_fwd.hpp" // IWYU pragma: export
+
+#include
+#include
+#include
+
+#include "binsrv/events/event_fwd.hpp"
+#include "binsrv/events/event_view_fwd.hpp"
+
+namespace binsrv::events {
+
+// Rewrites GTID_LOG_EVENT, ANONYMOUS_GTID_LOG_EVENT and
+// GTID_TAGGED_LOG_EVENT events so that their (sequence_number,
+// last_committed) "logical clock" pair is consistent with the local
+// binlog file boundaries produced by the rewrite-mode storage layer.
+//
+// Background:
+//   In MySQL, the per-transaction (sequence_number, last_committed)
+//   tuple drives the multi-threaded applier's intra-file parallelism.
+//   The values are PER-FILE: every binlog file restarts sequence_number
+//   from 1 and dependencies in last_committed reference sequence_numbers
+//   from the same file only.
+//
+//   In rewrite mode the binlog server discards the source's ROTATE,
+//   FORMAT_DESCRIPTION_EVENT, PREVIOUS_GTIDS_LOG, and STOP events and
+//   coalesces transactions into local files at a configured size. Two
+//   issues result:
+//     1. A source-side rotation we discard does NOT reset our local
+//        numbering, but the source has already restarted sequence_number
+//        from 1 and re-used small numbers as last_committed; we'd end up
+//        with collisions/duplicates that confuse the applier.
+//     2. A local rotation we initiate splits the source's (still-running)
+//        sequence_number sequence across two of OUR files; the second
+//        file would start with sequence_number > 1, which is illegal.
+//
+// Approach:
+//   For every GTID event we compute a per-event offset
+//     offset = new_local_seq - source_seq
+//   and use it to translate last_committed:
+//     new_local_seq      = next monotonically growing local counter
+//     new_last_committed = source_lc + offset    (in range)
+//                        | new_local_seq - 1     (out of range, OR
+//                        |                        first event of a
+//                        |                        source-file
+//                        |                        segment in this
+//                        |                        local file)
+//                        | 0                     (source_lc == 0
+//                        |                        AND not a segment
+//                        |                        boundary)
+//
+// Why the offset trick works for in-segment events:
+//   (a) source_seq is monotonic and gap-free within a single source
+//       binlog file (incremented by 1 per binlogged transaction).
+//   (b) last_committed in any GTID event only references a
+//       sequence_number from the SAME source binlog file as the
+//       event itself (never across files).
+//   Plus by construction new_local_seq is also monotonic and gap-free.
+//   From (a) the offset stays constant for the duration of a single
+//   source file, and from (b) source_lc is guaranteed to live in that
+//   same source file's seq space - so adding the current event's
+//   offset to source_lc lands exactly on the local_seq we already
+//   issued for the dependency target.
+//
+// Why the FIRST event of each source-file segment needs special
+// handling:
+//   The MySQL applier treats binlog file boundaries as implicit
+//   synchronization points: when the source rotates from file A to
+//   file B, all of A's transactions must commit before any of B's
+//   start applying. The source encodes that by RESETTING its
+//   logical clock at the boundary, so B's first event has
+//   last_committed == 0 ("no in-file dependency"). That zero is
+//   correct only as long as the file boundary is physically
+//   preserved on the consumer side. In rewrite mode we coalesce
+//   A's and B's events into a single local file, eliminating that
+//   physical boundary - so emitting last_committed == 0 verbatim
+//   would let the local applier reorder B's first commit BEFORE
+//   A's last commit, violating source ordering. To restore the
+//   barrier we set the first event of each new source-file segment
+//   to last_committed = new_local_seq - 1, which transitively
+//   chains through prior local txns and forces the segment to
+//   commit after everything that came before it in the local file.
+//   Subsequent events in the segment translate normally via the
+//   offset.
+//
+// Detecting "first event of a new segment":
+//   We track the offset of the most recently emitted event. A new
+//   event whose offset DIFFERS from that tracked offset is by
+//   invariant (a) the start of a new source-file segment (or, if
+//   the tracked offset is absent because of a local rotation or
+//   fresh start, simply the start of the local file - which is
+//   also the first event of a segment). No FDE / source-ROTATE
+//   observation is required.
+//
+// Out-of-range dependencies (candidate <= 0 inside a segment) come
+// from a dependency target that lived in an earlier (now-closed)
+// local file, or in the source's pre-attach history. Same fallback
+// - over-serialize against new_local_seq - 1 - which is always
+// conservative.
+//
+// No (source_seq -> local_seq) history map is needed; the state is
+// the high-water mark `next_local_seq_` plus the most recently
+// tracked offset.
+//
+// Untagged GTID events have a fixed-layout 42-byte post header; the two
+// 8-byte fields are patched in place and the event size does not change.
+//
+// Tagged GTID events store the same fields as variable-length integers
+// inside a TLV body. Mutating the values may change the encoded body
+// size, which in turn requires updating transaction_length (also a
+// varlen integer in the same body, and self-referential because it
+// names the current event's own size). The fixpoint converges in a
+// handful of iterations (varlen widths are bounded to {1,2,3,4,5,9}).
+class gtid_renumberer {
+public:
+  gtid_renumberer() = default;
+
+  gtid_renumberer(const gtid_renumberer &) = delete;
+  gtid_renumberer(gtid_renumberer &&) = delete;
+  gtid_renumberer &operator=(const gtid_renumberer &) = delete;
+  gtid_renumberer &operator=(gtid_renumberer &&) = delete;
+  ~gtid_renumberer() = default;
+
+  // Resets the per-file sequence_number counter to 0 (so the next GTID
+  // event is renumbered starting from local_seq == 1) and clears the
+  // tracked offset, in both the speculative and the committed state.
+  // Call this whenever a fresh local binlog file is opened (when the
+  // storage layer emits its ROTATE_EVENT + FORMAT_DESCRIPTION_EVENT).
+  void on_local_rotation() noexcept;
+
+  // Restores the renumberer state when resuming an existing local
+  // binlog file across a reconnect or process restart. Sets
+  // next_local_seq_ to `next_local_seq` (the highest sequence_number
+  // already emitted into that file) so the next GTID event we observe
+  // is allocated as `next_local_seq + 1`. `last_emitted_offset` is the
+  // tracked offset at the time of the last persisted snapshot; passing
+  // std::nullopt forces the next event to be treated as a segment
+  // boundary, which is conservative (forces last_committed = new_seq -
+  // 1 once for the first post-resume transaction). Must be called
+  // before any rewrite_if_gtid_event() call on this instance, i.e.
+  // while the renumberer is still in its default-constructed state.
+  // Also seeds the committed snapshot to the same value, so that an
+  // immediate rollback_to_committed() (e.g. on a connection drop
+  // before any user transaction is processed) restores the resumed
+  // state rather than the default-constructed one.
+  void resume_in_existing_local_file(
+      std::int64_t next_local_seq,
+      std::optional last_emitted_offset) noexcept;
+
+  // Promotes the current (speculative) state to the committed
+  // snapshot. Must be called at every transaction boundary - after a
+  // GTID event has been rewritten and the rest of its transaction has
+  // been buffered, but before the storage layer flushes anything to
+  // disk - so that the persisted recovery snapshot (read back from
+  // peek_*()) is in lockstep with the bytes that hit storage.
+  // Idempotent: with no advancement since the last commit it is a
+  // no-op.
+  void commit_pending_changes() noexcept;
+
+  // Reverts the speculative state to the most recent committed
+  // snapshot. Used by the streaming pipeline when the storage layer
+  // discards an in-flight transaction whose GTID event already
+  // advanced the renumberer (e.g. after a mid-transaction
+  // disconnect): without this rollback the next reconnect would
+  // re-allocate a sequence_number for the same source GTID,
+  // skipping over the speculative one and breaking gap-freedom of
+  // the local sequence_number stream.
+  void rollback_to_committed() noexcept;
+
+  // If `event_uv` is one of the GTID event types, rewrites it in place
+  // (or via a buffer-resize for GTID_TAGGED_LOG_EVENT) so the logical
+  // clock matches our local file boundaries; otherwise returns
+  // `event_uv` unchanged.
+  //
+  // For the tagged variant the call may resize `buffer` (the event's
+  // backing storage). The returned view always references `buffer`.
+  // For non-tagged events the buffer is untouched and the returned
+  // view is the same one passed in.
+  [[nodiscard]] event_updatable_view
+  rewrite_if_gtid_event(const event_updatable_view &event_uv,
+                        event_storage &buffer);
+
+  // Snapshot accessors used by the storage layer to persist the
+  // renumberer state alongside other per-binlog metadata. Together
+  // they describe enough state to seed
+  // resume_in_existing_local_file() on a subsequent process
+  // start / reconnect.
+  //
+  // Both accessors return the COMMITTED snapshot (i.e. the state at
+  // the most recently completed transaction boundary) rather than
+  // the speculative in-flight state. This is intentional: if a
+  // mid-transaction disconnect causes the storage layer to discard
+  // the partial transaction, the persisted recovery info must
+  // describe the durable bytes - not the optimistic increment that
+  // was about to land before the drop. The translation logic
+  // continues to operate on the speculative pair
+  // (next_local_seq_, last_emitted_offset_) and is reconciled with
+  // the committed snapshot via commit_pending_changes() /
+  // rollback_to_committed().
+  [[nodiscard]] std::int64_t peek_next_local_seq() const noexcept {
+    return committed_next_local_seq_;
+  }
+  [[nodiscard]] std::optional
+  peek_last_emitted_offset() const noexcept {
+    return committed_last_emitted_offset_;
+  }
+
+private:
+  // Allocates a fresh local sequence_number for the incoming GTID
+  // event and computes the corresponding last_committed translation
+  // (see the class-level comment for the offset-based derivation).
+  // Mutates speculative state only: bumps next_local_seq_ and records
+  // the event's offset in last_emitted_offset_. Returns {seq, lc}.
+  [[nodiscard]] std::pair
+  // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
+  translate_logical_clock(std::int64_t source_seq,
+                          std::int64_t source_lc) noexcept;
+
+  // Patches sequence_number / last_committed in place inside an
+  // already-materialized GTID_LOG_EVENT or ANONYMOUS_GTID_LOG_EVENT.
+  // The event size and overall buffer size are unchanged. Checksum
+  // recalculation happens via the existing write_proxy mechanism.
+  void rewrite_untagged_in_place(const event_updatable_view &event_uv);
+
+  // Decodes the tagged GTID body, mutates the renumbered fields and
+  // re-encodes into `buffer` (which is resized as needed). Returns a
+  // fresh event_updatable_view referencing `buffer`.
+  [[nodiscard]] event_updatable_view
+  rewrite_tagged(const event_updatable_view &event_uv, event_storage &buffer);
+
+  // Monotonic counter representing the highest local sequence_number
+  // emitted so far in the current local binlog file. Initialized to 0
+  // (no transactions yet) so that pre-increment yields 1 for the very
+  // first transaction (matching MySQL's "sequence_number starts at 1
+  // in every binlog file" invariant). Reset to 0 by
+  // on_local_rotation().
+  std::int64_t next_local_seq_{0};
+
+  // Offset (= new_local_seq - source_seq) that was active for the most
+  // recently emitted GTID event in the current local binlog file. If
+  // the upcoming event has a different offset we are crossing a
+  // source-file segment boundary and must serialize the event against
+  // the most recent local txn (see class-level comment). std::nullopt
+  // means "no event has been emitted in the current local file yet";
+  // it is reset to nullopt by on_local_rotation() and indistinguishable
+  // from a segment boundary for translation purposes.
+  std::optional last_emitted_offset_{};
+
+  // Committed snapshot of the (next_local_seq_, last_emitted_offset_)
+  // pair, advanced only at transaction boundaries via
+  // commit_pending_changes(). It is the value persisted into the
+  // per-binlog .json metadata (see peek_*()) and the value
+  // restored by rollback_to_committed() when the storage layer drops
+  // an incomplete transaction. Both fields share the same lifecycle
+  // as their speculative counterparts: zeroed by on_local_rotation()
+  // and seeded by resume_in_existing_local_file().
+  std::int64_t committed_next_local_seq_{0};
+  std::optional committed_last_emitted_offset_{};
+};
+
+} // namespace binsrv::events
+
+#endif // BINSRV_EVENTS_GTID_RENUMBERER_HPP
diff --git a/src/binsrv/events/gtid_renumberer_fwd.hpp b/src/binsrv/events/gtid_renumberer_fwd.hpp
new file mode 100644
index 0000000..3c6e2b0
--- /dev/null
+++ b/src/binsrv/events/gtid_renumberer_fwd.hpp
@@ -0,0 +1,25 @@
+// Copyright (c) 2023-2024 Percona and/or its affiliates.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License, version 2.0,
+// as published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License, version 2.0, for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+#ifndef BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP
+#define BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP
+
+namespace binsrv::events {
+
+class gtid_renumberer;
+
+} // namespace binsrv::events
+
+#endif // BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP
diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.cpp b/src/binsrv/events/gtid_tagged_log_body_impl.cpp
index 88a668d..47d849b 100644
--- a/src/binsrv/events/gtid_tagged_log_body_impl.cpp
+++ b/src/binsrv/events/gtid_tagged_log_body_impl.cpp
@@ -45,6 +45,7 @@
 #include "util/bounded_string_storage.hpp"
 #include "util/byte_span.hpp"
 #include "util/byte_span_extractors.hpp"
+#include "util/byte_span_inserters.hpp"
 #include "util/conversion_helpers.hpp"
 #include "util/exception_location_helpers.hpp"
 #include "util/flag_set.hpp"
@@ -52,21 +53,60 @@
 namespace binsrv::events {
+namespace {
+
+// Field identifiers in
the order they are emitted by upstream's +// define_fields() (libs/mysql/binlog/event/control_events.h:1111). +// Both the decoder (process_field_data) and the encoder (encode_to / +// calculate_encoded_size) use these values; keeping a single definition +// avoids drift. +enum class field_id_type : std::uint8_t { + flags, + uuid, + gno, + tag, + last_committed, + sequence_number, + immediate_commit_timestamp, + original_commit_timestamp, + transaction_length, + immediate_server_version, + original_server_version, + commit_group_ticket, + + delimiter +}; + +[[nodiscard]] constexpr std::uint8_t +field_id_byte(field_id_type field_id) noexcept { + return util::to_underlying(field_id); +} + +// matches upstream serialization_format_version: ALWAYS 1 (see +// libs/mysql/serialization/readme.md and Gtid_event::Decoder_type) +constexpr std::uint8_t serialization_version_number_value{1U}; + +} // namespace + generic_body_impl::generic_body_impl( util::const_byte_span portion) { // TODO: rework with direct member initialization // make sure we did OK with data members reordering + // (summands listed in the same order as the declarations in the .hpp) static_assert( sizeof *this == boost::alignment::align_up( - sizeof flags_ + sizeof uuid_ + sizeof gno_ + sizeof tag_ + - sizeof last_committed_ + sizeof sequence_number_ + + sizeof gno_ + sizeof last_committed_ + sizeof sequence_number_ + sizeof immediate_commit_timestamp_ + sizeof original_commit_timestamp_ + - sizeof transaction_length_ + sizeof original_server_version_ + - sizeof immediate_server_version_ + - sizeof commit_group_ticket_, + sizeof transaction_length_ + sizeof commit_group_ticket_ + + sizeof uuid_ + sizeof tag_ + sizeof original_server_version_ + + sizeof immediate_server_version_ + sizeof flags_ + + sizeof last_non_ignorable_field_id_ + + sizeof has_original_commit_timestamp_ + + sizeof has_original_server_version_ + + sizeof has_commit_group_ticket_, alignof(decltype(*this))), "inefficient data member 
reordering in gtid_log event body"); @@ -75,7 +115,6 @@ generic_body_impl::generic_body_impl( // ::= // Extracting - static constexpr std::uint8_t expected_serialization_version_number{1U}; std::uint8_t serialization_version_number{}; if (!util::extract_varlen_int_from_byte_span_checked( remainder, serialization_version_number)) { @@ -83,7 +122,7 @@ generic_body_impl::generic_body_impl( "gtid_tagged_log event body is too short to extract " "serialization_version_number"); } - if (serialization_version_number != expected_serialization_version_number) { + if (serialization_version_number != serialization_version_number_value) { util::exception_location().raise( "unexpected serialization_version_number in the gtid_tagged_log event " "body"); @@ -107,9 +146,8 @@ generic_body_impl::generic_body_impl( } // Extracting - std::uint8_t last_non_ignorable_field_id{}; if (!util::extract_varlen_int_from_byte_span_checked( - remainder, last_non_ignorable_field_id)) { + remainder, last_non_ignorable_field_id_)) { util::exception_location().raise( "gtid_tagged_log event body is too short to extract " "last_non_ignorable_field_id"); @@ -126,7 +164,7 @@ generic_body_impl::generic_body_impl( util::exception_location().raise( "broken field_id sequence in the gtid_tagged_log event body"); } - if (field_id <= last_non_ignorable_field_id) { + if (field_id <= last_non_ignorable_field_id_) { if (field_id != 0 && field_id != last_seen_field_id + 1U) { util::exception_location().raise( "violated last_non_ignorable_field_id rule in the gtid_tagged_log " @@ -252,23 +290,6 @@ operator<<(std::ostream &output, void generic_body_impl::process_field_data( std::uint8_t field_id, util::const_byte_span &remainder) { // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1111 - enum class field_id_type : std::uint8_t { - flags, - uuid, - gno, - tag, - last_committed, - sequence_number, - immediate_commit_timestamp, - original_commit_timestamp, - 
transaction_length, - immediate_server_version, - original_server_version, - commit_group_ticket, - - delimiter - }; - const auto varlen_int_extractor{ [](util::const_byte_span &source, auto &target, std::string_view label) { if (!util::extract_varlen_int_from_byte_span_checked(source, target)) { @@ -286,11 +307,7 @@ void generic_body_impl::process_field_data( // Extracting a fixed-size (16 byte) array of varlen bytes std::uint8_t extracted_uuid_byte{}; for (auto &uuid_byte : uuid_) { - if (!util::extract_varlen_int_from_byte_span_checked( - remainder, extracted_uuid_byte)) { - util::exception_location().raise( - "gtid_tagged_log event body is too short to extract uuid"); - } + varlen_int_extractor(remainder, extracted_uuid_byte, "uuid"); uuid_byte = util::from_underlying(extracted_uuid_byte); } } break; @@ -323,6 +340,7 @@ void generic_body_impl::process_field_data( case field_id_type::original_commit_timestamp: varlen_int_extractor(remainder, original_commit_timestamp_, "original_commit_timestamp"); + has_original_commit_timestamp_ = true; break; case field_id_type::transaction_length: varlen_int_extractor(remainder, transaction_length_, "transaction_length"); @@ -334,10 +352,12 @@ void generic_body_impl::process_field_data( case field_id_type::original_server_version: varlen_int_extractor(remainder, original_server_version_, "original_server_version"); + has_original_server_version_ = true; break; case field_id_type::commit_group_ticket: varlen_int_extractor(remainder, commit_group_ticket_, "commit_group_ticket"); + has_commit_group_ticket_ = true; break; default: util::exception_location().raise( @@ -345,4 +365,219 @@ void generic_body_impl::process_field_data( } } +[[nodiscard]] std::size_t +generic_body_impl::calculate_tlv_section_size() + const noexcept { + std::size_t total{0U}; + + // field: flags + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::flags)); + total += util::calculate_varlen_int_size(flags_); + + // field: uuid (16 
separate varlen-encoded bytes, one per UUID byte) + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::uuid)); + for (auto uuid_byte : uuid_) { + total += util::calculate_varlen_int_size(util::to_underlying(uuid_byte)); + } + + // field: gno + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::gno)); + total += util::calculate_varlen_int_size(gno_); + + // field: tag (varlen length + raw bytes) + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::tag)); + total += util::calculate_varlen_int_size(std::size(tag_)); + total += std::size(tag_); + + // field: last_committed + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::last_committed)); + total += util::calculate_varlen_int_size(last_committed_); + + // field: sequence_number + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::sequence_number)); + total += util::calculate_varlen_int_size(sequence_number_); + + // field: immediate_commit_timestamp + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::immediate_commit_timestamp)); + total += util::calculate_varlen_int_size(immediate_commit_timestamp_); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp_) { + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::original_commit_timestamp)); + total += util::calculate_varlen_int_size(original_commit_timestamp_); + } + + // field: transaction_length + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::transaction_length)); + total += util::calculate_varlen_int_size(transaction_length_); + + // field: immediate_server_version + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::immediate_server_version)); + total += util::calculate_varlen_int_size(immediate_server_version_); + + // field: original_server_version (optional) + if (has_original_server_version_) { + total += 
util::calculate_varlen_int_size( + field_id_byte(field_id_type::original_server_version)); + total += util::calculate_varlen_int_size(original_server_version_); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket_) { + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::commit_group_ticket)); + total += util::calculate_varlen_int_size(commit_group_ticket_); + } + + return total; +} + +[[nodiscard]] std::size_t +generic_body_impl::calculate_encoded_size() const { + // body layout: + // [varlen: serialization_version_number == 1 ] + // [varlen: serializable_field_size (== total body size) ] + // [varlen: last_non_ignorable_field_id ] + // [TLV section: calculate_tlv_section_size() bytes ] + // + // serializable_field_size encodes the entire body length INCLUDING + // itself, which makes it self-referential: the value depends on its + // own varlen-encoded width. Solve via a tiny iteration: increasing + // the value monotonically grows its width (1->2->3->...->9), so the + // loop always terminates in at most 9 steps. 
+ const std::size_t framing_misc_size{ + util::calculate_varlen_int_size(serialization_version_number_value) + + util::calculate_varlen_int_size(last_non_ignorable_field_id_)}; + const std::size_t tlv_size{calculate_tlv_section_size()}; + const std::size_t tail{framing_misc_size + tlv_size}; + + std::size_t total{tail + 1U}; + for (;;) { + const std::size_t width{util::calculate_varlen_int_size(total)}; + const std::size_t new_total{tail + width}; + if (new_total == total) { + return total; + } + total = new_total; + } +} + +void generic_body_impl::encode_tlv_section_to( + util::byte_span &destination) const { + const auto check_inserted{[](bool inserted) { + if (!inserted) { + util::exception_location().raise( + "destination is too small to encode the gtid_tagged_log event " + "body"); + } + }}; + const auto emit_field_id{[&](field_id_type field_id) { + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, field_id_byte(field_id))); + }}; + + // field: flags + emit_field_id(field_id_type::flags); + check_inserted( + util::insert_varlen_int_to_byte_span_checked(destination, flags_)); + + // field: uuid (16 separate varlen-encoded bytes) + emit_field_id(field_id_type::uuid); + for (auto uuid_byte : uuid_) { + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, util::to_underlying(uuid_byte))); + } + + // field: gno + emit_field_id(field_id_type::gno); + check_inserted( + util::insert_varlen_int_to_byte_span_checked(destination, gno_)); + + // field: tag (varlen length + raw bytes) + emit_field_id(field_id_type::tag); + check_inserted(util::insert_varlen_int_to_byte_span_checked(destination, + std::size(tag_))); + const util::const_byte_span tag_span{tag_}; + check_inserted( + util::insert_byte_span_to_byte_span_checked(destination, tag_span)); + + // field: last_committed + emit_field_id(field_id_type::last_committed); + check_inserted(util::insert_varlen_int_to_byte_span_checked(destination, + last_committed_)); + + // 
field: sequence_number + emit_field_id(field_id_type::sequence_number); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, sequence_number_)); + + // field: immediate_commit_timestamp + emit_field_id(field_id_type::immediate_commit_timestamp); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, immediate_commit_timestamp_)); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp_) { + emit_field_id(field_id_type::original_commit_timestamp); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, original_commit_timestamp_)); + } + + // field: transaction_length + emit_field_id(field_id_type::transaction_length); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, transaction_length_)); + + // field: immediate_server_version + emit_field_id(field_id_type::immediate_server_version); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, immediate_server_version_)); + + // field: original_server_version (optional) + if (has_original_server_version_) { + emit_field_id(field_id_type::original_server_version); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, original_server_version_)); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket_) { + emit_field_id(field_id_type::commit_group_ticket); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, commit_group_ticket_)); + } +} + +void generic_body_impl::encode_to( + util::byte_span &destination) const { + // Contract: the caller MUST size `destination` to exactly the value + // returned by calculate_encoded_size(). That value is also what we + // emit on the wire as serializable_field_size (size of the entire + // body), and we read it back from std::size(destination) here - + // avoiding a redundant inner fixpoint. 
Discrepancies are caught by + // the caller observing whether `destination` was fully consumed + // post-encode. + const std::uint64_t encoded_size{std::size(destination)}; + + // The framing header: serialization_version_number, + // serializable_field_size, last_non_ignorable_field_id. + if (!util::insert_varlen_int_to_byte_span_checked( + destination, serialization_version_number_value) || + !util::insert_varlen_int_to_byte_span_checked(destination, + encoded_size) || + !util::insert_varlen_int_to_byte_span_checked( + destination, last_non_ignorable_field_id_)) { + util::exception_location().raise( + "failed to encode gtid_tagged_log event body framing header"); + } + + encode_tlv_section_to(destination); +} + } // namespace binsrv::events diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.hpp b/src/binsrv/events/gtid_tagged_log_body_impl.hpp index db465db..1a4be00 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.hpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.hpp @@ -87,7 +87,7 @@ template <> class [[nodiscard]] generic_body_impl { [[nodiscard]] std::string get_readable_immediate_commit_timestamp() const; [[nodiscard]] bool has_original_commit_timestamp() const noexcept { - return original_commit_timestamp_ != unset_commit_timestamp; + return has_original_commit_timestamp_; } [[nodiscard]] std::uint64_t get_original_commit_timestamp_raw() const noexcept { @@ -102,7 +102,7 @@ template <> class [[nodiscard]] generic_body_impl { } [[nodiscard]] bool has_original_server_version() const noexcept { - return original_server_version_ != unset_server_version; + return has_original_server_version_; } [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { return original_server_version_; @@ -123,12 +123,45 @@ template <> class [[nodiscard]] generic_body_impl { [[nodiscard]] std::string get_readable_immediate_server_version() const; [[nodiscard]] bool has_commit_group_ticket() const noexcept { - return commit_group_ticket_ != 
unset_commit_group_ticket; + return has_commit_group_ticket_; } [[nodiscard]] std::uint64_t get_commit_group_ticket_raw() const noexcept { return commit_group_ticket_; } + // Mutators used by the rewrite-mode GTID renumberer. They modify only + // the in-memory representation; serializing back to bytes requires + // calling encode_to() on a freshly sized buffer. + void set_last_committed_raw(std::int64_t value) noexcept { + last_committed_ = value; + } + void set_sequence_number_raw(std::int64_t value) noexcept { + sequence_number_ = value; + } + void set_transaction_length_raw(std::uint64_t value) noexcept { + transaction_length_ = value; + } + + // Returns the total number of bytes that encode_to() will emit for the + // current in-memory state, including the 3-field framing header + // (serialization_version_number, serializable_field_size, + // last_non_ignorable_field_id) and every TLV field. The value of the + // serializable_field_size field is computed self-consistently. + [[nodiscard]] std::size_t calculate_encoded_size() const; + + // Writes the body to *destination* using the same TLV layout as the + // input. The set of optional fields actually emitted matches the set + // observed during construction (decoding); only the values of + // last_committed / sequence_number / transaction_length may have been + // mutated through the corresponding setters above. + // + // Precondition: std::size(destination) == calculate_encoded_size(). + // The destination span size is read back as the on-wire + // serializable_field_size, so passing a wrong size produces a + // wrong-but-self-consistent encoding. Caller is expected to verify + // post-encode that `destination` was fully consumed. 
+ void encode_to(util::byte_span &destination) const; + friend bool operator==(const generic_body_impl & /* first */, const generic_body_impl & /* second */) = default; @@ -142,22 +175,52 @@ template <> class [[nodiscard]] generic_body_impl { static constexpr std::uint64_t unset_commit_group_ticket{ std::numeric_limits::max()}; - // the members are deliberately reordered for better packing - std::uint8_t flags_{}; // 0 - gtids::uuid_storage uuid_{}; // 1 + // The protocol fields below are deliberately reordered for better + // packing (largest first, then 32-bit, then 8-bit/bool last). The + // trailing "// N" annotation is the protocol field_id_type value + // upstream assigns to the field (see field_id_type enum in + // gtid_tagged_log_body_impl.cpp / define_fields() in upstream + // control_events.h), so the deviation from protocol order is + // visible at a glance. std::int64_t gno_{}; // 2 - gtids::tag_storage tag_{}; // 3 std::int64_t last_committed_{}; // 4 std::int64_t sequence_number_{}; // 5 std::uint64_t immediate_commit_timestamp_{unset_commit_timestamp}; // 6 std::uint64_t original_commit_timestamp_{unset_commit_timestamp}; // 7 std::uint64_t transaction_length_{unset_transaction_length}; // 8 - std::uint32_t original_server_version_{unset_server_version}; // 9 - std::uint32_t immediate_server_version_{unset_server_version}; // 10 std::uint64_t commit_group_ticket_{unset_commit_group_ticket}; // 11 + gtids::uuid_storage uuid_{}; // 1 + gtids::tag_storage tag_{}; // 3 + std::uint32_t original_server_version_{unset_server_version}; // 10 + std::uint32_t immediate_server_version_{unset_server_version}; // 9 + std::uint8_t flags_{}; // 0 + + // Echoed verbatim on encode_to() so the framing header matches the + // original. Reading and re-emitting it preserves forward compatibility + // with newer servers that may add ignorable fields above the current + // last non-ignorable id. 
+ std::uint8_t last_non_ignorable_field_id_{0U}; + + // Recorded during decoding so that encode_to() emits exactly the same + // set of optional fields as was observed in the input event. This makes + // re-serialization byte-stable for unchanged member values and avoids + // having to reverse-engineer upstream's encode predicates from sentinel + // values (which would be brittle if a real timestamp/version ever + // happened to coincide with a sentinel). + bool has_original_commit_timestamp_{false}; + bool has_original_server_version_{false}; + bool has_commit_group_ticket_{false}; void process_field_data(std::uint8_t field_id, util::const_byte_span &remainder); + + // Helpers for calculate_encoded_size() / encode_to(). + // Returns the size in bytes of the TLV section (every (field_id, + // value) pair after the framing header). + [[nodiscard]] std::size_t calculate_tlv_section_size() const noexcept; + // Writes only the TLV section to *destination*; assumes destination has + // at least calculate_tlv_section_size() bytes available. 
+ void encode_tlv_section_to(util::byte_span &destination) const; }; } // namespace binsrv::events diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 5beb67d..16804b6 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -318,6 +318,23 @@ storage::get_binlog_uri(const composite_binlog_name &binlog_name) const { return backend_->get_object_uri(binlog_name.str()); } +void storage::update_renumberer_recovery_info( + const renumberer_recovery_info &info) { + if (is_empty()) { + return; + } + get_current_binlog_record().renumberer_state = info; +} + +[[nodiscard]] renumberer_recovery_info +storage::get_current_renumberer_recovery_info() const { + if (is_empty()) { + util::exception_location().raise( + "cannot read renumberer recovery info from an empty storage"); + } + return get_current_binlog_record().renumberer_state; +} + void storage::ensure_streaming_mode() const { if (construction_mode_ != storage_construction_mode_type::streaming) { util::exception_location().raise( @@ -476,13 +493,31 @@ storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const { backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; + // Both renumberer fields are missing in legacy metadata files; their + // optionals come back as std::nullopt and the recovery info is left + // at its struct-default value. From the renumberer's point of view + // that is indistinguishable from "this file has no prior emissions" + // - safe for files that genuinely have none, but for legacy files + // with prior emissions the first post-resume transaction will collide + // with sequence_number 1 (documented limitation of the legacy + // metadata format). 
+ renumberer_recovery_info renumberer_state{}; + const auto &optional_next_local_seq{ + metadata.root().get<"renumberer_next_local_seq">()}; + if (optional_next_local_seq.has_value()) { + renumberer_state.next_local_seq = *optional_next_local_seq; + } + renumberer_state.last_emitted_offset = + metadata.root().get<"renumberer_last_emitted_offset">(); + return binlog_record{.name = binlog_name, .size = metadata.root().get<"size">(), .previous_gtids = metadata.root().get<"previous_gtids">(), .added_gtids = metadata.root().get<"added_gtids">(), .timestamps = {metadata.root().get<"min_timestamp">(), - metadata.root().get<"max_timestamp">()}}; + metadata.root().get<"max_timestamp">()}, + .renumberer_state = renumberer_state}; } void storage::validate_binlog_metadata(const binlog_record &record) const { @@ -522,6 +557,10 @@ void storage::save_binlog_metadata(const binlog_record &record) const { ctime_timestamp{record.timestamps.get_min_timestamp()}; metadata.root().get<"max_timestamp">() = ctime_timestamp{record.timestamps.get_max_timestamp()}; + metadata.root().get<"renumberer_next_local_seq">() = + record.renumberer_state.next_local_seq; + metadata.root().get<"renumberer_last_emitted_offset">() = + record.renumberer_state.last_emitted_offset; const auto content{metadata.str()}; backend_->put_object(generate_binlog_metadata_name(record.name), util::as_const_byte_span(content)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index e2ed0f1..33f85fb 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -19,6 +19,8 @@ #include "binsrv/storage_fwd.hpp" // IWYU pragma: export #include +#include +#include #include #include #include @@ -37,6 +39,16 @@ namespace binsrv { +// Snapshot of the rewrite-mode GTID renumberer's recovery state, as +// persisted alongside other per-binlog metadata. 
Used by the storage +// layer to round-trip the state across reconnects and process +// restarts; the renumberer-side semantics live in +// `binsrv::events::gtid_renumberer`. +struct [[nodiscard]] renumberer_recovery_info { + std::int64_t next_local_seq{0}; + std::optional last_emitted_offset{}; +}; + class [[nodiscard]] storage { private: struct binlog_record { @@ -45,6 +57,7 @@ class [[nodiscard]] storage { gtids::optional_gtid_set previous_gtids{}; gtids::optional_gtid_set added_gtids{}; ctime_timestamp_range timestamps{}; + renumberer_recovery_info renumberer_state{}; }; using binlog_record_container = std::vector; @@ -129,6 +142,26 @@ class [[nodiscard]] storage { const ctime_timestamp &event_timestamp); void close_binlog(); + // Rewrite-mode hook: keeps the in-memory renumberer recovery state + // for the current binlog file in sync with the actual renumberer. + // The persisted snapshot is written out by the next + // save_binlog_metadata() call (i.e. on the next checkpoint flush). + // Caller is expected to invoke this whenever the renumberer's + // observable state (next_local_seq / last_emitted_offset) changes, + // typically right after rewrite_if_gtid_event(). No-op if the + // storage has no current binlog yet. + void update_renumberer_recovery_info(const renumberer_recovery_info &info); + + // Returns the renumberer recovery snapshot persisted for the current + // binlog file (i.e. read back from the per-binlog .json metadata at + // storage construction). If the current metadata predates the + // persisted-renumberer-state feature, both fields hold their default + // values (next_local_seq = 0, last_emitted_offset = std::nullopt) - + // which the renumberer treats as "no prior emissions in this file". + // Precondition: the storage must be non-empty. + [[nodiscard]] renumberer_recovery_info + get_current_renumberer_recovery_info() const; + void discard_incomplete_transaction_events(); void flush_event_buffer();