From 324dd16c96725f446f9faa0d07cf2331cef79aa3 Mon Sep 17 00:00:00 2001 From: Kamil Holubicki Date: Thu, 7 May 2026 15:41:17 +0200 Subject: [PATCH] PS-11080: Spoiled logical clock information in rewritten binlog https://perconadev.atlassian.net/browse/PS-11080 Rewrite mode strips source binlog framing events and merges transactions into local binlog files of configurable size. MySQL's logical clock (sequence_number, last_committed) is defined per physical binlog file on the source. Once those boundaries are removed on the consumer, the raw values from the source are no longer valid: sequence numbers can repeat after each source rotation, and last_committed=0 at a source file boundary no longer implies the same ordering guarantee when everything lives in one local file. Introduce a rewrite-mode GTID renumberer that assigns a monotonic local sequence_number and translates last_committed so dependencies stay within the current local file and cross-source-file ordering is preserved. Support both classic GTID events and GTID_TAGGED_LOG_EVENT, including re-encoding tagged bodies when logical-clock fields change encoded size. Persist enough renumberer state in per-binlog JSON metadata so that after a process restart or a new binsrv invocation the stream can continue appending to the same local file without colliding with sequence numbers already written. Track committed vs in-flight renumberer state so that after a mid-transaction disconnect the discarded partial transaction does not leave recovery metadata pointing past the last completed transaction on disk. Add MTR coverage for coalesced source rotations, local rotation by file size, resume across invocations, and resume after a partial transaction with a deterministic source-side stall. --- CMakeLists.txt | 4 + .../r/gtid_renumbering.result | 47 +++ .../r/gtid_renumbering_local_rotation.result | 34 ++ .../r/gtid_renumbering_resume.result | 50 +++ ...id_renumbering_resume_after_partial.result | 78 +++++ .../t/gtid_renumbering-master.opt | 2 + mtr/binlog_streaming/t/gtid_renumbering.test | 206 ++++++++++++ ...gtid_renumbering_local_rotation-master.opt | 2 + .../t/gtid_renumbering_local_rotation.test | 142 +++++++++ .../t/gtid_renumbering_resume-master.opt | 2 + .../t/gtid_renumbering_resume.test | 202 ++++++++++++ ...enumbering_resume_after_partial-master.opt | 2 + ...gtid_renumbering_resume_after_partial.test | 221 +++++++++++++ src/app.cpp | 173 +++++++++- src/binsrv/binlog_file_metadata.cpp | 9 +- src/binsrv/binlog_file_metadata.hpp | 16 +- src/binsrv/events/code_type.hpp | 10 + src/binsrv/events/event_view.hpp | 20 ++ src/binsrv/events/gtid_log_post_header.cpp | 15 + src/binsrv/events/gtid_log_post_header.hpp | 19 ++ src/binsrv/events/gtid_renumberer.cpp | 298 +++++++++++++++++ src/binsrv/events/gtid_renumberer.hpp | 280 ++++++++++++++++ src/binsrv/events/gtid_renumberer_fwd.hpp | 25 ++ .../events/gtid_tagged_log_body_impl.cpp | 299 ++++++++++++++++-- .../events/gtid_tagged_log_body_impl.hpp | 81 ++++- src/binsrv/storage.cpp | 41 ++- src/binsrv/storage.hpp | 33 ++ 27 files changed, 2253 insertions(+), 58 deletions(-) create mode 100644 mtr/binlog_streaming/r/gtid_renumbering.result create mode 100644 mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result create mode 100644 mtr/binlog_streaming/r/gtid_renumbering_resume.result create mode 100644 mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result create mode 100644 mtr/binlog_streaming/t/gtid_renumbering-master.opt create mode 100644 mtr/binlog_streaming/t/gtid_renumbering.test create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_resume.test create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt create mode 100644 mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test create mode 100644 src/binsrv/events/gtid_renumberer.cpp create mode 100644 src/binsrv/events/gtid_renumberer.hpp create mode 100644 src/binsrv/events/gtid_renumberer_fwd.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 932cfeb..c208b28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -164,6 +164,10 @@ set(source_files src/binsrv/events/gtid_log_post_header_impl_fwd.hpp src/binsrv/events/gtid_log_post_header_impl.hpp + src/binsrv/events/gtid_renumberer_fwd.hpp + src/binsrv/events/gtid_renumberer.hpp + src/binsrv/events/gtid_renumberer.cpp + src/binsrv/events/gtid_tagged_log_body_impl_fwd.hpp src/binsrv/events/gtid_tagged_log_body_impl.hpp src/binsrv/events/gtid_tagged_log_body_impl.cpp diff --git a/mtr/binlog_streaming/r/gtid_renumbering.result b/mtr/binlog_streaming/r/gtid_renumbering.result new file mode 100644 index 0000000..4fb6649 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering.result @@ -0,0 +1,47 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Building deterministic binlog content with two source-binlog +*** rotations between transaction groups. + +*** Source binlog A: CREATE TABLE + 9 INSERTs (10 GTIDs total). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Flushing source binary log (introduces the first +*** source-binlog rotation that the rewriter must absorb). +FLUSH BINARY LOGS; + +*** Source binlog B: 5 INSERTs. + +*** Flushing source binary log one more time (second +*** source-binlog rotation absorbed inside the same local file). +FLUSH BINARY LOGS; + +*** Source binlog C: 3 INSERTs. + +*** Executing the Binlog Server utility in rewrite mode. + +*** Materializing the local binlog file produced by the rewriter +*** and dumping it via mysqlbinlog for textual inspection. + +*** Validating (sequence_number, last_committed) of every GTID +*** event in the local binlog file. Aborts with --die on any +*** invariant violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result b/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result new file mode 100644 index 0000000..abd2418 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_local_rotation.result @@ -0,0 +1,34 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Building a single-source-binlog workload that is large enough +*** to span multiple 1K local binlog files after rewrite. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, payload VARCHAR(100) NOT NULL, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Executing the Binlog Server utility in rewrite mode. + +*** Asserting that the rewriter actually rotated locally +*** (bnlg.000002 must exist - otherwise the test does not +*** exercise the local-rotation reset code path). + +*** Validating per-local-file sequence_number reset and +*** last_committed range. Aborts with --die on violation; +*** produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/gtid_renumbering_resume.result b/mtr/binlog_streaming/r/gtid_renumbering_resume.result new file mode 100644 index 0000000..107c847 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_resume.result @@ -0,0 +1,50 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Phase 1: CREATE TABLE + 4 INSERTs in source binlog A +*** (5 GTID events). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** First binsrv invocation: writes 5 GTID events with +*** sequence_number 1..5 to bnlg.000001 and persists the +*** renumberer recovery snapshot (next_local_seq=5) in the +*** per-binlog metadata file. + +*** Flushing source binary log between invocations so the +*** second binsrv run also has to absorb a source-side +*** rotation right after resuming. +FLUSH BINARY LOGS; + +*** Phase 2: 5 INSERTs in source binlog B (5 more GTID events). + +*** Second binsrv invocation: must resume the renumberer from +*** the persisted snapshot (next_local_seq=5) and append +*** sequence_number 6..10 to bnlg.000001. + +*** Materializing the local binlog file produced across the two +*** invocations and dumping it via mysqlbinlog for textual +*** inspection. + +*** Validating that sequence_number is gap-free across the +*** binsrv-invocation boundary and that the post-resume +*** source-binlog-boundary event has last_committed = +*** sequence_number - 1. Aborts with --die on any invariant +*** violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result b/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result new file mode 100644 index 0000000..d868470 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_renumbering_resume_after_partial.result @@ -0,0 +1,78 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Phase 1: T_first = CREATE TABLE t1 (1 GTID event, no +*** Write_rows). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** First binsrv invocation: writes T_first with +*** sequence_number=1 to bnlg.000001 and persists +*** next_local_seq=1 in the per-binlog metadata. + +*** Phase 2: arm the source dump thread to pause AFTER +*** sending the first WRITE_ROWS event of any transaction. +*** Combined with the short binsrv read_timeout, this freezes +*** the second binsrv invocation mid-T_third (after T_third's +*** GTID has been processed by binsrv but before T_third's +*** XID can be received). +SET @old_global_debug = @@global.debug; +SET GLOBAL DEBUG = '+d,dump_thread_wait_after_send_write_rows'; + +*** Workload that the second binsrv invocation will read: +*** T_second (CREATE TABLE t2, no Write_rows; will be +*** absorbed in full) and T_third (INSERT INTO t1, has +*** Write_rows; binsrv will receive its GTID and prefix +*** events and then time out waiting for the rest). +CREATE TABLE t2(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +INSERT INTO t1 VALUES(DEFAULT); + +*** Second binsrv invocation: expected to exit with non-zero +*** status after read_timeout. The storage destructor flushes +*** the COMMITTED transaction (T_second) and persists the +*** per-binlog metadata. The renumberer recovery snapshot +*** persisted here MUST be next_local_seq=2 (committed, in +*** lockstep with the flushed bytes) and not the speculative +*** next_local_seq=3 produced by T_third's GTID rewrite. + +*** Releasing the paused source dump thread: SIGNAL wakes it +*** from the debug_sync wait, after which its first attempt +*** to send the next event fails on the half-closed socket +*** and the dump thread exits. DEBUG flag is then restored so +*** the new dump thread spawned by binsrv 3 does not pause. +SET DEBUG_SYNC = 'now SIGNAL signal.continue'; +SET GLOBAL DEBUG = @old_global_debug; +SET DEBUG_SYNC = 'RESET'; + +*** Third binsrv invocation: the source resends T_third +*** (binsrv never acknowledged it), and the renumberer +*** continues from the persisted committed snapshot. T_third +*** must land as sequence_number=3 so bnlg.000001 ends up +*** gap-free. + +*** Materializing bnlg.000001 produced across all three +*** invocations and dumping it via mysqlbinlog for textual +*** inspection. + +*** Validating that bnlg.000001 contains exactly 3 GTID +*** events with contiguous sequence_numbers 1, 2, 3 and +*** that every last_committed value references an in-file +*** sequence_number. Aborts with --die on any invariant +*** violation; produces no output otherwise. + +*** Removing temporary files. + +*** Dropping the tables. +DROP TABLE t1, t2; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/t/gtid_renumbering-master.opt b/mtr/binlog_streaming/t/gtid_renumbering-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering.test b/mtr/binlog_streaming/t/gtid_renumbering.test new file mode 100644 index 0000000..b0fd2f2 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering.test @@ -0,0 +1,206 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# This test exercises the rewrite-mode GTID renumberer. In rewrite mode +# the binlog server discards the source's ROTATE / FORMAT_DESCRIPTION / +# PREVIOUS_GTIDS / STOP events and coalesces transactions into local +# binlog files of a configurable size. Two consequences: +# 1. Multiple source binlogs may be folded into a single local file. +# The source resets sequence_number to 1 at every rotation, which +# would produce duplicate / non-monotone sequence_number values in +# our local file unless the rewriter renumbers them. +# 2. The first event of each new source-file segment carries +# last_committed=0 from the source, relying on the implicit +# synchronization guarantee that every transaction of the previous +# source file commits before any transaction of the next file +# starts. After coalescing, that physical boundary is gone, so the +# rewriter must replace last_committed=0 with sequence_number-1 to +# preserve the cross-file commit ordering on the consumer side. +# +# We craft three source binlogs (10 + 5 + 3 GTIDs) coalesced into a +# single local file (1G rewrite_file_size) and verify that: +# (a) sequence_number starts at 1 and advances by exactly 1 per event +# (renumbering covers all source-side rotations the rewriter +# absorbed - without the renumberer we would see duplicate +# sequence_number=1 values from each source file); +# (b) last_committed is in [0, sequence_number - 1] for every event; +# (c) the cross-source-file boundary events (events #11 and #16 - +# the first transactions from source binlogs B and C) carry +# a non-zero last_committed (specifically, sequence_number - 1). +# Without the rewriter's segment-boundary fix these would inherit +# the source's verbatim last_committed=0 and let the local +# applier reorder them ahead of the previous source file's +# commits. This assertion is robust regardless of the source's +# binlog_transaction_dependency_tracking mode. + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +--let $binsrv_checkpoint_size = 1 +# rewrite_file_size set high enough that all transactions land in a +# single local file - the point of this test is to exercise multiple +# source-file boundaries WITHIN the same local file +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Building deterministic binlog content with two source-binlog +--echo *** rotations between transaction groups. +--echo +--echo *** Source binlog A: CREATE TABLE + 9 INSERTs (10 GTIDs total). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +--disable_query_log +--let $i = 1 +while ($i <= 9) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Flushing source binary log (introduces the first +--echo *** source-binlog rotation that the rewriter must absorb). +FLUSH BINARY LOGS; + +--echo +--echo *** Source binlog B: 5 INSERTs. +--disable_query_log +--let $i = 1 +while ($i <= 5) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Flushing source binary log one more time (second +--echo *** source-binlog rotation absorbed inside the same local file). +FLUSH BINARY LOGS; + +--echo +--echo *** Source binlog C: 3 INSERTs. +--disable_query_log +--let $i = 1 +while ($i <= 3) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility in rewrite mode. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing the local binlog file produced by the rewriter +--echo *** and dumping it via mysqlbinlog for textual inspection. +--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating (sequence_number, last_committed) of every GTID +--echo *** event in the local binlog file. Aborts with --die on any +--echo *** invariant violation; produces no output otherwise. + +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # Source A contributes 10 GTIDs (CREATE TABLE + 9 INSERTs), source B + # contributes 5, source C contributes 3 - 18 events in total. + my $expected_count = 18; + die "expected exactly $expected_count GTID events in the local binlog " + . "file, got " . scalar @events + unless @events == $expected_count; + + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + + # Property (a): per-local-file sequence_number must restart at 1 + # and advance gap-free by exactly 1 per event - this is what + # protects us from the source restarting its sequence_number at + # every rotation that we absorb. + die "sequence_number gap or duplicate at event index $i: " + . "expected sequence_number=$expected_sn, got $events[$i]{sn}" + unless $events[$i]{sn} == $expected_sn; + + # Property (b): last_committed must reference an in-file + # sequence_number, i.e. lie in [0, sequence_number - 1]. A value + # >= sequence_number would point to the future; a negative value + # is impossible on the wire and would indicate a corrupt rewrite. + die "last_committed out of range at sequence_number=$events[$i]{sn}: " + . "got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } + + # Property (c): every cross-source-file boundary event must carry + # last_committed = sequence_number - 1. By protocol the first event + # of each source binlog has source-side last_committed=0; if the + # rewriter's segment-boundary fix is missing, that 0 propagates + # verbatim into the local file. With the fix the value is replaced + # with the immediately preceding local sequence_number, restoring + # the source's "rotation = synchronization point" semantics that + # would otherwise be lost when we coalesce A, B, C into one local + # file. + # + # Source A contributes 10 GTIDs (CREATE TABLE + 9 INSERTs), source B + # contributes 5, source C contributes 3 - boundary events are + # therefore at sequence_number 11 (first of B) and 16 (first of C). + for my $boundary_seq (11, 16) { + my $idx = $boundary_seq - 1; + my $expected_lc = $boundary_seq - 1; + die "boundary-fix violation at sequence_number=$boundary_seq: " + . "expected last_committed=$expected_lc, got " + . "last_committed=$events[$idx]{lc}" + unless $events[$idx]{lc} == $expected_lc; + } +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test new file mode 100644 index 0000000..ba30a26 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_local_rotation.test @@ -0,0 +1,142 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion test to gtid_renumbering.test exercising the OPPOSITE +# side of the rewrite-mode renumberer: a single source binlog whose +# transactions get split across multiple local binlog files because +# the configured rewrite_file_size is much smaller than the total +# stream size. The renumberer must: +# 1. restart sequence_number at 1 in every new local file (otherwise +# sequence_number would keep growing across local files, leaving +# the consumer with a logical clock the local PREVIOUS_GTIDS +# header cannot describe); +# 2. only ever emit last_committed values that reference an event +# already present in the SAME local file (i.e. < sequence_number). +# +# We verify (1) and (2) on the first two local files; observing local +# rotation at all is what makes this test meaningful, so we also +# require that bnlg.000002 actually got created. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +--source ../include/identify_storage_backend.inc + +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +--let $binsrv_checkpoint_size = 1 +# small rewrite_file_size forces frequent local rotations +--let $binsrv_rewrite_file_size = 1K +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Building a single-source-binlog workload that is large enough +--echo *** to span multiple 1K local binlog files after rewrite. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, payload VARCHAR(100) NOT NULL, PRIMARY KEY(id)) ENGINE=InnoDB; + +--disable_query_log +--let $i = 1 +while ($i <= 60) +{ + INSERT INTO t1 (payload) VALUES (REPEAT('x', 80)); + --inc $i +} +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility in rewrite mode. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Asserting that the rewriter actually rotated locally +--echo *** (bnlg.000002 must exist - otherwise the test does not +--echo *** exercise the local-rotation reset code path). +--let $first_local_path = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.bnlg.000001 +--let $second_local_path = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.bnlg.000002 +if ($storage_backend == file) +{ + --file_exists $binsrv_storage_path/bnlg.000002 + --copy_file $binsrv_storage_path/bnlg.000001 $first_local_path + --copy_file $binsrv_storage_path/bnlg.000002 $second_local_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $first_local_path > /dev/null + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000002 $second_local_path > /dev/null +} + +--echo +--echo *** Validating per-local-file sequence_number reset and +--echo *** last_committed range. Aborts with --die on violation; +--echo *** produces no output otherwise. + +--let FIRST_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.first.dump +--let SECOND_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_local_rotation.second.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $first_local_path > $FIRST_DUMP +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $second_local_path > $SECOND_DUMP + +--perl + use strict; + use warnings; + + sub parse_gtid_dump { + my ($path) = @_; + open(my $fh, '<', $path) or die "Failed to open $path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + return @events; + } + + sub validate_local_file { + my ($label, $events_ref) = @_; + my @events = @$events_ref; + die "$label: no GTID events found in dump" unless @events > 0; + + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + + # sequence_number must restart at 1 and grow gap-free in EACH + # local file + die "$label: sequence_number gap or duplicate at event index $i: " + . "expected $expected_sn, got $events[$i]{sn}" + unless $events[$i]{sn} == $expected_sn; + + # last_committed must always reference an in-file + # sequence_number + die "$label: last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 + && $events[$i]{lc} < $events[$i]{sn}; + } + } + + my @first = parse_gtid_dump($ENV{'FIRST_DUMP'}); + my @second = parse_gtid_dump($ENV{'SECOND_DUMP'}); + + validate_local_file("bnlg.000001", \@first); + validate_local_file("bnlg.000002", \@second); +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $FIRST_DUMP +--remove_file $SECOND_DUMP +--remove_file $first_local_path +--remove_file $second_local_path + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume.test b/mtr/binlog_streaming/t/gtid_renumbering_resume.test new file mode 100644 index 0000000..3c3b841 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume.test @@ -0,0 +1,202 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion test to gtid_renumbering.test exercising the +# RESUME / RESTART path of the rewrite-mode renumberer. +# +# In rewrite mode the renumberer maintains an in-memory +# next_local_seq counter that allocates sequence_number values inside +# the current local binlog file. When the binsrv utility exits while +# a local file is still open and is later restarted (or when a +# reconnect drops the in-process state), the counter must continue +# forward instead of resetting to 1 - otherwise the post-resume +# events would collide with the sequence_number values already +# emitted into the same local file. +# +# The implementation persists next_local_seq (and last_emitted_offset) +# in the per-binlog .json metadata file at every checkpoint flush. +# On the next binsrv invocation, the storage layer hands that +# snapshot back to the renumberer via resume_in_existing_local_file() +# right after storage construction. +# +# This test verifies the property end-to-end: +# 1. Run binsrv against a partially-populated source, producing N +# GTID events with sequence_number 1..N in bnlg.000001. +# 2. Generate more transactions on the source, with a FLUSH BINARY +# LOGS in between so the second invocation also has to absorb +# a source-side rotation right after resume. +# 3. Run binsrv a second time. Expected: bnlg.000001 now contains +# N+M events with sequence_number 1..N+M (gap-free, no +# duplicates). +# 4. Cross-source-file boundary event at index N must carry +# last_committed = N (not 0): the segment-boundary fix has to +# keep working across a resume. +# +# Without the persisted-renumberer-state recovery, step 3 would +# instead produce sequence_number 1..N followed by 1..M - which the +# perl validator catches as a sequence_number collision. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +# checkpoint_size = 1 forces a metadata flush after every transaction +# boundary, so the renumberer recovery snapshot persisted to the +# metadata file is always in lockstep with the binlog content. With a +# larger checkpoint_size the test would still work for graceful exits +# but would be racy under abrupt termination - we want the strongest +# guarantee here. +--let $binsrv_checkpoint_size = 1 +# rewrite_file_size set high enough that all transactions land in a +# single local file - the point of this test is that the renumberer's +# counter MUST continue forward across binsrv invocations within the +# same local file, not that we exercise local rotation +# (gtid_renumbering_local_rotation.test covers that). +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Phase 1: CREATE TABLE + 4 INSERTs in source binlog A +--echo *** (5 GTID events). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +--disable_query_log +--let $i = 1 +while ($i <= 4) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** First binsrv invocation: writes 5 GTID events with +--echo *** sequence_number 1..5 to bnlg.000001 and persists the +--echo *** renumberer recovery snapshot (next_local_seq=5) in the +--echo *** per-binlog metadata file. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing source binary log between invocations so the +--echo *** second binsrv run also has to absorb a source-side +--echo *** rotation right after resuming. +FLUSH BINARY LOGS; + +--echo +--echo *** Phase 2: 5 INSERTs in source binlog B (5 more GTID events). +--disable_query_log +--let $i = 1 +while ($i <= 5) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $i +} +--enable_query_log + +--echo +--echo *** Second binsrv invocation: must resume the renumberer from +--echo *** the persisted snapshot (next_local_seq=5) and append +--echo *** sequence_number 6..10 to bnlg.000001. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing the local binlog file produced across the two +--echo *** invocations and dumping it via mysqlbinlog for textual +--echo *** inspection. +--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering_resume.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_resume.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating that sequence_number is gap-free across the +--echo *** binsrv-invocation boundary and that the post-resume +--echo *** source-binlog-boundary event has last_committed = +--echo *** sequence_number - 1. Aborts with --die on any invariant +--echo *** violation; produces no output otherwise. + +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # Phase 1 contributes 5 GTIDs (CREATE TABLE + 4 INSERTs); phase 2 + # contributes 5 INSERTs - 10 events in total, all in bnlg.000001. + my $expected_count = 10; + die "expected exactly $expected_count GTID events in the local " + . "binlog file, got " . scalar @events + unless @events == $expected_count; + + # Property 1: sequence_number must be gap-free across the + # invocation boundary - the renumberer in the second invocation + # must start at 6, not at 1. A regression of the persisted-state + # recovery would surface as duplicate sequence_number values + # (1..5 from each invocation). + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + die "sequence_number gap or duplicate at event index $i: " + . "expected sequence_number=$expected_sn, got " + . "$events[$i]{sn} (resume regression: renumberer most " + . "likely restarted at 1 instead of resuming from 6)" + unless $events[$i]{sn} == $expected_sn; + + # Property 2: last_committed must reference an in-file + # sequence_number, i.e. lie in [0, sequence_number - 1]. + die "last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } + + # Property 3: the very first event of source binlog B (the first + # event the SECOND binsrv invocation writes) is the cross-source- + # file boundary; its source-side last_committed is 0 by protocol + # and the renumberer must replace it with sequence_number - 1 to + # preserve the cross-rotation synchronization point. The boundary + # event lies at sequence_number = 6 (first event of phase 2). + my $boundary_seq = 6; + my $idx = $boundary_seq - 1; + my $expected_lc = $boundary_seq - 1; + die "boundary-fix violation at sequence_number=$boundary_seq " + . "(post-resume): expected last_committed=$expected_lc, got " + . "last_committed=$events[$idx]{lc}" + unless $events[$idx]{lc} == $expected_lc; +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test new file mode 100644 index 0000000..2cbbf48 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_renumbering_resume_after_partial.test @@ -0,0 +1,221 @@ +--source ../include/have_binsrv.inc + +--source include/have_debug_sync.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# Companion to gtid_renumbering_resume.test exercising the +# DURABILITY of the persisted renumberer recovery snapshot under a +# mid-transaction disconnect in rewrite mode. +# +# In rewrite mode the renumberer's per-file sequence_number counter +# is bumped speculatively the moment a GTID event is rewritten, but +# it is only "committed" (and mirrored into the per-binlog .json +# metadata) at the corresponding transaction boundary. If a connection +# drops between a GTID rewrite and the matching transaction boundary, +# the partial transaction is discarded and the renumberer must be +# rolled back to the previously committed state - otherwise the +# snapshot persisted alongside the durable bytes would point past the +# end of those bytes, and the next binsrv invocation would re-allocate +# a sequence_number for the resent transaction, leaving a permanent +# gap in the local sequence_number stream. +# +# The scenario: +# 1. binsrv processes T_first (CREATE TABLE t1) cleanly. Disk: +# bnlg.000001 contains the GTID event with sequence_number=1 +# and the metadata records next_local_seq=1. +# 2. The source-side dump thread is armed with +# 'dump_thread_wait_after_send_write_rows'. From now on it +# will pause right after sending the first WRITE_ROWS event +# of any transaction. +# 3. The source produces T_second (CREATE TABLE t2, no +# Write_rows) and T_third (INSERT INTO t1, has Write_rows). +# 4. The second binsrv invocation absorbs T_second in full but +# freezes mid-T_third (after T_third's GTID has already +# advanced the renumberer's speculative counter from 2 to 3). +# It then hits read_timeout and exits with a non-zero code. +# The storage destructor flushes whatever was complete in the +# in-memory buffer - i.e. T_second - and writes the per-binlog +# .json metadata. +# 5. The persisted snapshot at this point MUST carry +# next_local_seq=2 (the committed value, matching T_second +# which is the last GTID actually flushed to disk). With a +# regression of the commit/rollback semantics the snapshot +# would instead carry the speculative next_local_seq=3 - a +# value that points past the durable bytes. +# 6. The third binsrv invocation re-reads the metadata, seeds +# the renumberer accordingly and re-receives T_third (which +# the source resends because binsrv never acknowledged it). +# With the fix T_third lands as sequence_number=3 and +# bnlg.000001 ends up with the contiguous sequence +# 1, 2, 3. Without the fix it lands as sequence_number=4 and +# the perl validator at the end of this test catches the gap. + +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +# Short read_timeout to make the second binsrv invocation exit +# quickly once the source dump thread freezes mid-transaction. +--let $binsrv_read_timeout = 5 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +# Large checkpoint_size so flushes happen only at end-of-stream +# (storage destructor). With this configuration multiple complete +# transactions can stay buffered alongside an in-flight one - the +# layout that exposes the stale-snapshot bug when the disconnect +# happens mid-transaction. +--let $binsrv_checkpoint_size = 1G +# Single local file for the entire test - the property under test +# is the in-file recovery snapshot, not local rotation. +--let $binsrv_rewrite_file_size = 1G +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** Phase 1: T_first = CREATE TABLE t1 (1 GTID event, no +--echo *** Write_rows). +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** First binsrv invocation: writes T_first with +--echo *** sequence_number=1 to bnlg.000001 and persists +--echo *** next_local_seq=1 in the per-binlog metadata. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Phase 2: arm the source dump thread to pause AFTER +--echo *** sending the first WRITE_ROWS event of any transaction. +--echo *** Combined with the short binsrv read_timeout, this freezes +--echo *** the second binsrv invocation mid-T_third (after T_third's +--echo *** GTID has been processed by binsrv but before T_third's +--echo *** XID can be received). +SET @old_global_debug = @@global.debug; +SET GLOBAL DEBUG = '+d,dump_thread_wait_after_send_write_rows'; + +--echo +--echo *** Workload that the second binsrv invocation will read: +--echo *** T_second (CREATE TABLE t2, no Write_rows; will be +--echo *** absorbed in full) and T_third (INSERT INTO t1, has +--echo *** Write_rows; binsrv will receive its GTID and prefix +--echo *** events and then time out waiting for the rest). +CREATE TABLE t2(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +INSERT INTO t1 VALUES(DEFAULT); + +--echo +--echo *** Second binsrv invocation: expected to exit with non-zero +--echo *** status after read_timeout. The storage destructor flushes +--echo *** the COMMITTED transaction (T_second) and persists the +--echo *** per-binlog metadata. The renumberer recovery snapshot +--echo *** persisted here MUST be next_local_seq=2 (committed, in +--echo *** lockstep with the flushed bytes) and not the speculative +--echo *** next_local_seq=3 produced by T_third's GTID rewrite. +--error 1 +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Releasing the paused source dump thread: SIGNAL wakes it +--echo *** from the debug_sync wait, after which its first attempt +--echo *** to send the next event fails on the half-closed socket +--echo *** and the dump thread exits. DEBUG flag is then restored so +--echo *** the new dump thread spawned by binsrv 3 does not pause. +SET DEBUG_SYNC = 'now SIGNAL signal.continue'; +SET GLOBAL DEBUG = @old_global_debug; + +let $wait_condition = SELECT COUNT(*) = 0 FROM information_schema.processlist WHERE COMMAND IN ('Binlog Dump', 'Binlog Dump GTID'); +--source include/wait_condition.inc + +SET DEBUG_SYNC = 'RESET'; + +--echo +--echo *** Third binsrv invocation: the source resends T_third +--echo *** (binsrv never acknowledged it), and the renumberer +--echo *** continues from the persisted committed snapshot. T_third +--echo *** must land as sequence_number=3 so bnlg.000001 ends up +--echo *** gap-free. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Materializing bnlg.000001 produced across all three +--echo *** invocations and dumping it via mysqlbinlog for textual +--echo *** inspection. +--let $local_binlog_path = $MYSQL_TMP_DIR/gtid_renumbering_resume_after_partial.bnlg.000001 +if ($storage_backend == file) +{ + --copy_file $binsrv_storage_path/bnlg.000001 $local_binlog_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 cp s3://$aws_s3_bucket$binsrv_storage_path/bnlg.000001 $local_binlog_path > /dev/null +} + +--let GTID_DUMP = $MYSQL_TMP_DIR/gtid_renumbering_resume_after_partial.dump +--exec $MYSQL_BINLOG --base64-output=DECODE-ROWS $local_binlog_path > $GTID_DUMP + +--echo +--echo *** Validating that bnlg.000001 contains exactly 3 GTID +--echo *** events with contiguous sequence_numbers 1, 2, 3 and +--echo *** that every last_committed value references an in-file +--echo *** sequence_number. Aborts with --die on any invariant +--echo *** violation; produces no output otherwise. + +--perl + use strict; + use warnings; + + my $dump_path = $ENV{'GTID_DUMP'}; + open(my $fh, '<', $dump_path) or die "Failed to open $dump_path: $!"; + my @events; + while (my $line = <$fh>) { + if ($line =~ /last_committed=(\d+).*?sequence_number=(\d+)/) { + push @events, { lc => $1 + 0, sn => $2 + 0 }; + } + } + close($fh); + + # T_first (CREATE TABLE t1), T_second (CREATE TABLE t2), and the + # resent T_third (INSERT INTO t1) - 3 GTID events in total, all + # in bnlg.000001. + my $expected_count = 3; + die "expected exactly $expected_count GTID events in the local " + . "binlog file, got " . scalar @events + unless @events == $expected_count; + + # Property: sequence_number must be gap-free across the + # mid-transaction disconnect. A regression of the commit/rollback + # of the renumberer's recovery snapshot would surface as + # sequence_number=4 (instead of 3) at index 2 - the persisted + # snapshot would have advanced past the discarded T_third and + # caused the resent T_third to be allocated next_local_seq+1 + # twice. + for my $i (0 .. $#events) { + my $expected_sn = $i + 1; + die "sequence_number gap or duplicate at event index $i: " + . "expected sequence_number=$expected_sn, got " + . "$events[$i]{sn} (regression: persisted recovery info " + . "advanced past the discarded transaction's bytes)" + unless $events[$i]{sn} == $expected_sn; + + die "last_committed out of range at sequence_number=" + . "$events[$i]{sn}: got last_committed=$events[$i]{lc}" + unless $events[$i]{lc} >= 0 && $events[$i]{lc} < $events[$i]{sn}; + } +EOF + +--echo +--echo *** Removing temporary files. +--remove_file $GTID_DUMP +--remove_file $local_binlog_path + +--echo +--echo *** Dropping the tables. +DROP TABLE t1, t2; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index c47bffc..4a30122 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -64,6 +64,7 @@ #include "binsrv/events/common_header_flag_type.hpp" #include "binsrv/events/event.hpp" #include "binsrv/events/event_view.hpp" +#include "binsrv/events/gtid_renumberer.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" @@ -285,6 +286,26 @@ void log_storage_info(binsrv::basic_logger &logger, logger.log(binsrv::log_severity::info, msg); } +// Seeds `renumberer` from the recovery snapshot persisted in the +// current binlog file's metadata, if applicable. No-op when not in +// rewrite mode or when storage is empty (nothing to resume from). +// Extracted out of main() to keep its cognitive complexity within +// the project-wide threshold. +void seed_renumberer_from_recovery_snapshot( + binsrv::events::gtid_renumberer &renumberer, const binsrv::storage &storage, + const binsrv::optional_rewrite_config &optional_rewrite_config, + binsrv::basic_logger &logger) { + if (!optional_rewrite_config.has_value() || storage.is_empty()) { + return; + } + const auto recovery_info{storage.get_current_renumberer_recovery_info()}; + renumberer.resume_in_existing_local_file(recovery_info.next_local_seq, + recovery_info.last_emitted_offset); + logger.log(binsrv::log_severity::info, + "rewrite: resuming renumberer at next sequence_number=" + + std::to_string(recovery_info.next_local_seq + 1)); +} + void log_library_info(binsrv::basic_logger &logger, const easymysql::library &mysql_lib) { std::string msg{}; @@ -486,10 +507,22 @@ void process_rotate_or_stop_event(binsrv::basic_logger &logger, "storage: closed binlog file: " + old_binlog_name); } -void process_binlog_event(const binsrv::events::event_view ¤t_event_v, - binsrv::basic_logger &logger, - binsrv::events::reader_context &context, - binsrv::storage &storage) { +// `renumberer`, if non-null, lets process_binlog_event() promote the +// renumberer's speculative state to committed (and mirror it into +// storage's per-binlog recovery info) right before storage.write_event +// is called. This is the moment the streaming pipeline detects a +// transaction boundary AND the upcoming write_event call may trigger a +// flush + save_binlog_metadata - i.e. the only spot where the .json +// recovery snapshot is durably advanced. Synthetic events generated in +// rewrite mode (artificial ROTATE, FDE, PREVIOUS_GTIDS, ...) and the +// non-rewrite-mode call from receive_binlog_events() pass nullptr; the +// renumberer's speculative counter is left untouched until the next +// genuine GTID transaction reaches its boundary. +void process_binlog_event( + const binsrv::events::event_view ¤t_event_v, + binsrv::basic_logger &logger, binsrv::events::reader_context &context, + binsrv::storage &storage, + binsrv::events::gtid_renumberer *renumberer = nullptr) { const auto current_common_header_v{current_event_v.get_common_header_view()}; const auto readable_flags{current_common_header_v.get_readable_flags()}; logger.log(binsrv::log_severity::info, @@ -533,6 +566,26 @@ void process_binlog_event(const binsrv::events::event_view ¤t_event_v, process_artificial_rotate_event(current_event_v, logger, storage); } + // In rewrite mode, this is the only point where the renumberer's + // committed snapshot is advanced and mirrored into storage's + // per-binlog recovery info. Doing it BEFORE storage.write_event is + // critical: write_event may trigger a checkpoint flush which calls + // save_binlog_metadata(), so the snapshot has to be in lockstep + // with the bytes about to be flushed. Doing it ONLY at transaction + // boundaries (instead of on every GTID event) is what keeps the + // persisted snapshot consistent under mid-transaction disconnects: + // if the connection drops between a GTID event and its commit, the + // renumberer's speculative bump is rolled back during the discard + // path (see rollback_to_committed() at the discard call site), and + // no flush in between had a chance to persist that speculative + // bump. + if (renumberer != nullptr && context.is_at_transaction_boundary()) { + renumberer->commit_pending_changes(); + storage.update_renumberer_recovery_info(binsrv::renumberer_recovery_info{ + .next_local_seq = renumberer->peek_next_local_seq(), + .last_emitted_offset = renumberer->peek_last_emitted_offset()}); + } + // checking if the event needs to be written to the binlog if (!info_only) { storage.write_event(current_event_v.get_portion(), @@ -640,14 +693,21 @@ generate_previous_gtids_log_event(binsrv::events::event_storage &event_buffer, void rewrite_and_process_binlog_event( const binsrv::events::event_view ¤t_event_v, binsrv::basic_logger &logger, binsrv::events::reader_context &context, - binsrv::storage &storage, std::uint32_t server_id, - std::string_view base_file_name, std::uint64_t file_size) { + binsrv::events::gtid_renumberer &renumberer, binsrv::storage &storage, + std::uint32_t server_id, std::string_view base_file_name, + std::uint64_t file_size) { const auto current_common_header_v = current_event_v.get_common_header_view(); const auto code = current_common_header_v.get_type_code(); // for ROTATE (both artificial and non-artificial), FORMAT_DESCRIPTION, // PREVIOUS_GTIDS_LOG, and STOP events we don't have to do anything - - // simply return early from this function + // simply return early from this function. Note we deliberately do not + // notify the renumberer about source-side rotations or FDEs: per the + // MySQL protocol last_committed only references sequence_numbers from + // the same source file, and the renumberer translates last_committed + // through the CURRENT event's offset (new_seq - source_seq), which + // automatically jumps in lockstep with the source's own + // sequence_number reset at file boundaries. if (code == binsrv::events::code_type::format_description || code == binsrv::events::code_type::previous_gtids_log || code == binsrv::events::code_type::rotate || @@ -680,8 +740,23 @@ void rewrite_and_process_binlog_event( // 2. FORMAT_DESCRIPTION // 3. PREVIOUS_GTIDS_LOG - if (context.is_fresh() || (context.is_at_transaction_boundary() && - storage.get_current_position() >= file_size)) { + const bool will_rotate{context.is_fresh() || + (context.is_at_transaction_boundary() && + storage.get_current_position() >= file_size)}; + // We reset the renumberer's sequence_number counter only when we are + // ACTUALLY starting a new local file (i.e. storage was empty, or we + // hit the configured file_size). When `is_fresh()` is true on a + // non-empty storage we are merely resuming the existing local file + // after a reconnect - in that case the renumberer is shared with + // the caller in main() and either still holds the in-memory state + // from before the reconnect, or (on a process restart) was re-seeded + // once from the persisted recovery snapshot right after storage + // construction. Either way, we deliberately do NOT call + // on_local_rotation() here and the counter keeps advancing from + // where the previous connection / process left off. + const bool will_open_fresh_local_file{ + will_rotate && (context.is_fresh() ? storage.is_empty() : true)}; + if (will_rotate) { binsrv::events::event_storage event_buffer; std::uint32_t offset{0U}; @@ -749,14 +824,40 @@ void rewrite_and_process_binlog_event( "rewrite: generated previous gtids log event in the rewrite mode"); process_binlog_event(generated_previous_gtids_log_event_v, logger, context, storage); + + if (will_open_fresh_local_file) { + renumberer.on_local_rotation(); + } } // in rewrite mode we need to update next_event_position (and optional // checksum in the footer) in the received event data portion binsrv::events::event_storage buffer{}; - const auto event_copy_uv{binsrv::events::materialize( + auto event_copy_uv{binsrv::events::materialize( current_event_v, buffer, binsrv::events::materialization_type::force_add_checksum)}; + + // For GTID events we have to overwrite (sequence_number, + // last_committed) so they index into our local file's logical clock + // namespace rather than the source's. The tagged variant may grow or + // shrink the buffer, in which case `event_copy_uv` is reseated onto + // the resized buffer. + // + // The rewrite advances the renumberer's SPECULATIVE counter (its + // committed snapshot is unchanged at this point); the snapshot is + // promoted later, inside process_binlog_event(), once we know the + // transaction has actually reached its boundary. That ordering is + // what keeps the persisted recovery info from getting ahead of the + // durable binlog content: if the connection drops between this + // rewrite and the upcoming transaction boundary, the speculative + // increment is rolled back during the discard path before any + // flush has a chance to capture it (see rollback_to_committed() + // wired up next to discard_incomplete_transaction_events() in + // receive_binlog_events()). + if (binsrv::events::is_gtid_log_event(code)) { + event_copy_uv = renumberer.rewrite_if_gtid_event(event_copy_uv, buffer); + } + { // TODO: optimize redundant checksum recalculation const auto proxy{event_copy_uv.get_write_proxy()}; @@ -764,7 +865,7 @@ void rewrite_and_process_binlog_event( static_cast(storage.get_current_position() + event_copy_uv.get_total_size())); } - process_binlog_event(event_copy_uv, logger, context, storage); + process_binlog_event(event_copy_uv, logger, context, storage, &renumberer); } bool open_connection_and_switch_to_replication( @@ -848,6 +949,7 @@ void receive_binlog_events( binsrv::basic_logger &logger, const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage, + binsrv::events::gtid_renumberer &renumberer, const binsrv::optional_rewrite_config &optional_rewrite_config) { easymysql::connection connection{}; if (!open_connection_and_switch_to_replication( @@ -867,6 +969,17 @@ void receive_binlog_events( storage.get_replication_mode(), storage.get_current_binlog_name().str(), static_cast(storage.get_current_position())}; + // The renumberer is owned by the caller and outlives this function, + // so its in-memory state survives reconnects within a single + // process. We deliberately do NOT touch its counter here: a + // reconnect to the same source merely resumes appending to the + // current local file, and the next allocated sequence_number must + // continue forward from where the previous connection left off. If + // the very first event triggers a will_open_fresh_local_file + // rotation (e.g. file_size threshold reached while we were idle), + // rewrite_and_process_binlog_event() calls on_local_rotation() + // explicitly at the right moment. + bool fetch_result{}; while (!termination_flag.test() && @@ -886,7 +999,7 @@ void receive_binlog_events( // FORMAT_DESCRIPTION, PREVIOUS_GTIDS_LOG, ROTATE (non-artificial), // and STOP events rewrite_and_process_binlog_event( - current_event_v, logger, context, storage, server_id, + current_event_v, logger, context, renumberer, storage, server_id, optional_rewrite_config->get<"base_file_name">(), optional_rewrite_config->get<"file_size">().get_value()); } else { @@ -917,6 +1030,15 @@ void receive_binlog_events( // transaction boundary if (storage.is_in_gtid_replication_mode()) { storage.discard_incomplete_transaction_events(); + // Symmetric rollback: storage just dropped any partial-transaction + // bytes from its event buffer, so we also have to rewind the + // renumberer's speculative state (which may have been bumped by a + // GTID event whose transaction never reached its boundary) back to + // the most recent committed snapshot. Without this rollback, the + // next reconnect would re-process the same source GTID and bump + // next_local_seq AGAIN, leaving a permanent gap in the local + // sequence_number stream. + renumberer.rollback_to_committed(); } // connection termination is a good place to flush any remaining data @@ -1202,6 +1324,29 @@ int main(int argc, char *argv[]) { replication_mode}; log_storage_info(*logger, storage); + // Rewrite-mode GTID renumberer state. Lives alongside `storage` + // so its in-memory counter survives reconnects: every local + // rotation resets the per-file sequence_number counter (via + // on_local_rotation()) and every incoming GTID event's + // logical-clock fields are rewritten before the event is appended + // to the local binlog. Idle (default-constructed) when we are not + // in rewrite mode. + // + // We seed the renumberer ONCE here, immediately after storage is + // loaded, from the recovery snapshot persisted in the current + // binlog file's metadata. This is the only place where the + // snapshot is consulted: subsequent reconnects within the same + // process keep the in-memory state running. The snapshot was last + // written at the previous transaction-boundary checkpoint and is + // in lockstep with `binlog_record.size` (and therefore with the + // bytes already on disk) by construction. For an empty storage + // there is nothing to resume; for legacy metadata without the + // renumberer fields the snapshot defaults to "no prior emissions + // in this file" - documented limitation. + binsrv::events::gtid_renumberer renumberer{}; + seed_renumberer_from_recovery_snapshot(renumberer, storage, + optional_rewrite_config, *logger); + const easymysql::library mysql_lib; logger->log(binsrv::log_severity::info, "initialized mysql client library"); @@ -1209,7 +1354,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, verify_checksum, - storage, optional_rewrite_config); + storage, renumberer, optional_rewrite_config); if (operation_mode == binsrv::operation_mode_type::pull) { std::size_t iteration_number{1U}; @@ -1230,7 +1375,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, - verify_checksum, storage, + verify_checksum, storage, renumberer, optional_rewrite_config); ++iteration_number; } diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp index e7f3c1d..48977e7 100644 --- a/src/binsrv/binlog_file_metadata.cpp +++ b/src/binsrv/binlog_file_metadata.cpp @@ -30,7 +30,14 @@ namespace binsrv { binlog_file_metadata::binlog_file_metadata() - : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {} + : impl_{{expected_binlog_file_metadata_version}, + {}, + {}, + {}, + {}, + {}, + {}, + {}} {} binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp index f03e602..2f70fb9 100644 --- a/src/binsrv/binlog_file_metadata.hpp +++ b/src/binsrv/binlog_file_metadata.hpp @@ -19,6 +19,7 @@ #include "binsrv/binlog_file_metadata_fwd.hpp" // IWYU pragma: export #include +#include #include #include @@ -32,6 +33,16 @@ namespace binsrv { class [[nodiscard]] binlog_file_metadata { private: + // The two `*_renumberer_*` fields persist the rewrite-mode GTID + // renumberer's recovery state for this binlog file. They are stored + // here (rather than re-derived from the binlog file content on + // resume) because every other piece of resume state already lives in + // this metadata record - keeping the renumberer state alongside it + // makes recovery a single read-and-load step. Both are + // std::optional<> so that legacy metadata files written by older + // binlog-server builds (which lacked these fields) load cleanly with + // both values defaulting to std::nullopt; on next save the up-to-date + // values are written back. using impl_type = util::nv_tuple< // clang-format off util::nv<"version", std::uint32_t>, @@ -39,7 +50,10 @@ class [[nodiscard]] binlog_file_metadata { util::nv<"previous_gtids", gtids::optional_gtid_set>, util::nv<"added_gtids", gtids::optional_gtid_set>, util::nv<"min_timestamp", ctime_timestamp>, - util::nv<"max_timestamp", ctime_timestamp> + util::nv<"max_timestamp", ctime_timestamp>, + util::nv<"renumberer_next_local_seq", std::optional>, + util::nv<"renumberer_last_emitted_offset", + std::optional> // clang-format on >; diff --git a/src/binsrv/events/code_type.hpp b/src/binsrv/events/code_type.hpp index e72ea40..e04518b 100644 --- a/src/binsrv/events/code_type.hpp +++ b/src/binsrv/events/code_type.hpp @@ -84,6 +84,16 @@ enum class code_type : std::uint8_t { }; #undef BINSRV_EVENTS_CODE_TYPE_XY_MACRO +// returns true for GTID_LOG, ANONYMOUS_GTID_LOG and GTID_TAGGED_LOG events, +// i.e. all three event types that carry GTID logical-clock fields +// (sequence_number / last_committed) which need to be rewritten when the +// rewrite mode causes our local binlog file boundaries to differ from the +// source's +[[nodiscard]] constexpr bool is_gtid_log_event(code_type code) noexcept { + return code == code_type::gtid_log || code == code_type::anonymous_gtid_log || + code == code_type::gtid_tagged_log; +} + inline std::string_view to_string_view(code_type code) noexcept { using namespace std::string_view_literals; using nv_pair = std::pair; diff --git a/src/binsrv/events/event_view.hpp b/src/binsrv/events/event_view.hpp index ada6f72..e3effd1 100644 --- a/src/binsrv/events/event_view.hpp +++ b/src/binsrv/events/event_view.hpp @@ -201,23 +201,43 @@ class [[nodiscard]] event_updatable_view : private event_view_base { event_updatable_view(const reader_context &context, util::byte_span portion) : event_view_base{context, portion} {} + // Construct a writable view directly from a byte span with explicit + // post_header / footer sizes. Bypasses the reader_context-driven + // validation; used by event reconstructors that materialize events + // from scratch (e.g. the rewrite-mode GTID renumberer, which has to + // emit a re-encoded GTID_TAGGED_LOG_EVENT possibly of a different + // size than the input). + [[nodiscard]] static event_updatable_view + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + from_raw_unchecked(util::byte_span portion, std::size_t post_header_size, + std::size_t footer_size) noexcept { + return event_updatable_view{portion, post_header_size, footer_size}; + } + // clang-format off using event_view_base::get_portion; + using event_view_base::get_updatable_portion; using event_view_base::get_total_size; using event_view_base::calculate_crc; // common header section using event_view_base::get_common_header_size; + using event_view_base::get_common_header_raw; + using event_view_base::get_common_header_view; // post header section using event_view_base::get_post_header_size; + using event_view_base::get_post_header_raw; // body section using event_view_base::get_body_size; + using event_view_base::get_body_raw; // footer section using event_view_base::get_footer_size; using event_view_base::has_footer; + using event_view_base::get_footer_raw; + using event_view_base::get_footer_view; // clang-format on [[nodiscard]] write_proxy get_write_proxy() const { diff --git a/src/binsrv/events/gtid_log_post_header.cpp b/src/binsrv/events/gtid_log_post_header.cpp index 46b56ae..4f4126e 100644 --- a/src/binsrv/events/gtid_log_post_header.cpp +++ b/src/binsrv/events/gtid_log_post_header.cpp @@ -33,6 +33,7 @@ #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" #include "util/exception_location_helpers.hpp" #include "util/flag_set.hpp" @@ -141,6 +142,20 @@ gtid_log_post_header::get_flags() const noexcept { return {get_uuid(), get_gno()}; } +void overwrite_logical_clock_in_post_header_raw(util::byte_span post_header_raw, + std::int64_t last_committed, + std::int64_t sequence_number) { + if (std::size(post_header_raw) != gtid_log_post_header::size_in_bytes) { + util::exception_location().raise( + "invalid gtid_log post header length for in-place logical clock " + "overwrite"); + } + auto remainder = post_header_raw.subspan( + gtid_log_post_header::last_committed_offset_in_bytes); + util::insert_fixed_int_to_byte_span(remainder, last_committed); + util::insert_fixed_int_to_byte_span(remainder, sequence_number); +} + std::ostream &operator<<(std::ostream &output, const gtid_log_post_header &obj) { return output << "flags: " << obj.get_readable_flags() diff --git a/src/binsrv/events/gtid_log_post_header.hpp b/src/binsrv/events/gtid_log_post_header.hpp index f50c9c0..0b13c41 100644 --- a/src/binsrv/events/gtid_log_post_header.hpp +++ b/src/binsrv/events/gtid_log_post_header.hpp @@ -35,6 +35,17 @@ class [[nodiscard]] gtid_log_post_header { public: static constexpr std::size_t size_in_bytes{42U}; + // byte offsets (within the 42-byte post header) of the two logical-clock + // fields that the rewrite-mode renumberer has to overwrite in place + // flags 1B @ 0 + // SID 16B @ 1 + // GNO 8B @ 17 + // lt_type 1B @ 25 + // last_committed 8B @ 26 + // sequence_number 8B @ 34 + static constexpr std::size_t last_committed_offset_in_bytes{26U}; + static constexpr std::size_t sequence_number_offset_in_bytes{34U}; + // https://github.com/mysql/mysql-server/blob/mysql-8.0.43/libbinlogevents/include/control_events.h#L1091 // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1202 static constexpr std::uint8_t expected_logical_ts_code{2U}; @@ -83,6 +94,14 @@ class [[nodiscard]] gtid_log_post_header { std::int64_t sequence_number_{}; // 5 }; +// Overwrites the (last_committed, sequence_number) pair in a raw 42-byte +// gtid_log post-header byte span. Used by the rewrite-mode GTID renumberer +// to retarget the parallel-applier dependency graph onto our local binlog +// file boundaries. The overall event size is unchanged. +void overwrite_logical_clock_in_post_header_raw(util::byte_span post_header_raw, + std::int64_t last_committed, + std::int64_t sequence_number); + } // namespace binsrv::events #endif // BINSRV_EVENTS_GTID_LOG_POST_HEADER_HPP diff --git a/src/binsrv/events/gtid_renumberer.cpp b/src/binsrv/events/gtid_renumberer.cpp new file mode 100644 index 0000000..ff69c04 --- /dev/null +++ b/src/binsrv/events/gtid_renumberer.cpp @@ -0,0 +1,298 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/events/gtid_renumberer.hpp" + +#include +#include +#include +#include +#include +#include +#include + +// `event_storage` (used by `rewrite_tagged_gtid_event()` below via the +// `assign()` member) is an alias for boost::container::small_vector<>, +// declared in event_fwd.hpp via container_fwd.hpp - so we need the full +// definition here in the .cpp. +#include // IWYU pragma: keep + +#include "binsrv/events/code_type.hpp" +#include "binsrv/events/common_header_view.hpp" +#include "binsrv/events/event_fwd.hpp" +#include "binsrv/events/event_view.hpp" +#include "binsrv/events/gtid_log_post_header.hpp" +#include "binsrv/events/gtid_tagged_log_body_impl.hpp" + +#include "util/byte_span.hpp" +#include "util/exception_location_helpers.hpp" + +namespace binsrv::events { + +namespace { + +// Defensive cap on the number of fixpoint iterations needed to converge +// on the tagged-event transaction_length. Each iteration changes the +// encoded width of either serializable_field_size or transaction_length, +// both of which are bounded to {1,2,3,4,5,9} bytes; in practice 2-3 +// iterations are sufficient. The cap is set high enough that it can only +// trigger in genuine logic bugs. +constexpr int max_tagged_fixpoint_iterations{16}; + +} // namespace + +void gtid_renumberer::on_local_rotation() noexcept { + next_local_seq_ = 0; + last_emitted_offset_.reset(); + committed_next_local_seq_ = 0; + committed_last_emitted_offset_.reset(); +} + +void gtid_renumberer::resume_in_existing_local_file( + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + std::int64_t next_local_seq, + std::optional last_emitted_offset) noexcept { + next_local_seq_ = next_local_seq; + last_emitted_offset_ = last_emitted_offset; + committed_next_local_seq_ = next_local_seq; + committed_last_emitted_offset_ = last_emitted_offset; +} + +void gtid_renumberer::commit_pending_changes() noexcept { + committed_next_local_seq_ = next_local_seq_; + committed_last_emitted_offset_ = last_emitted_offset_; +} + +void gtid_renumberer::rollback_to_committed() noexcept { + next_local_seq_ = committed_next_local_seq_; + last_emitted_offset_ = committed_last_emitted_offset_; +} + +[[nodiscard]] std::pair +// NOLINTNEXTLINE(bugprone-easily-swappable-parameters) +gtid_renumberer::translate_logical_clock(std::int64_t source_seq, + std::int64_t source_lc) noexcept { + // Allocate a fresh local sequence_number for this transaction. + ++next_local_seq_; + const std::int64_t new_seq{next_local_seq_}; + const std::int64_t offset{new_seq - source_seq}; + + // Translate last_committed. + // + // The general rule is "candidate = source_lc + offset", which works + // because within a single source file the offset stays constant + // (both source_seq and new_seq advance by exactly 1 per event) and + // last_committed always references a sequence_number from the same + // source file. So adding the current event's offset to source_lc + // lands exactly on the local_seq we previously issued for the + // dependency target. + // + // There are two cases where that translation is not appropriate: + // + // 1) `last_emitted_offset_` differs from `offset` (or is empty). + // A different offset means the previous event and this one + // belong to different source-file segments in the local file + // (offset is invariant within a source file, so a change is + // proof of a boundary). On the source side that boundary is a + // synchronization point - all of the previous file's + // transactions must commit before any of the new file's start + // applying - and the source encodes that by RESETTING its + // logical clock so the first event has source_lc == 0. In the + // local file we no longer have a physical boundary, so emitting + // last_committed == 0 verbatim would let the local applier + // reorder this txn before the previous segment's last commit. + // To restore the barrier we set new_lc = (new_seq - 1), which + // transitively chains through prior local txns and forces this + // one to commit after everything that came before it. For the + // very first txn in the local file (new_seq == 1) the formula + // yields 0, i.e. parallel-safe - which is correct because there + // is no prior local txn to serialize against. + // The "empty last_emitted_offset_" case (fresh local file or + // just after on_local_rotation()) is handled identically; from + // the local file's point of view its first event is by + // definition the start of a new segment. + // + // 2) source_lc != 0 inside a segment but the candidate falls out + // of range (candidate <= 0). The dependency target is in an + // earlier (now-closed) local file or in the source's pre-attach + // history; we have no local_seq to point at, so we fall back to + // over-serializing against (new_seq - 1). candidate < new_seq + // is guaranteed by the protocol invariant source_lc < + // source_seq, so we only need to bound-check the lower edge. + std::int64_t new_lc{0}; + const bool segment_boundary{!last_emitted_offset_.has_value() || + *last_emitted_offset_ != offset}; + if (segment_boundary) { + new_lc = new_seq - 1; + } else if (source_lc != 0) { + const std::int64_t candidate{source_lc + offset}; + new_lc = candidate > 0 ? candidate : new_seq - 1; + } + + last_emitted_offset_ = offset; + return {new_seq, new_lc}; +} + +void gtid_renumberer::rewrite_untagged_in_place( + const event_updatable_view &event_uv) { + // Untagged GTID events keep ALL the renumber-relevant data in the + // 42-byte post header; the body holds timestamps / commit-ticket / + // transaction_length but none of those depend on sequence_number or + // last_committed. The event size therefore stays the same. + const gtid_log_post_header decoded_post_header{ + event_uv.get_post_header_raw()}; + const auto [new_seq, new_lc]{ + translate_logical_clock(decoded_post_header.get_sequence_number_raw(), + decoded_post_header.get_last_committed_raw())}; + + // The write_proxy automatically recomputes the footer CRC at scope + // exit, so we patch under it. + const auto proxy{event_uv.get_write_proxy()}; + overwrite_logical_clock_in_post_header_raw( + proxy.get_post_header_updatable_raw(), new_lc, new_seq); +} + +[[nodiscard]] event_updatable_view +gtid_renumberer::rewrite_tagged(const event_updatable_view &event_uv, + event_storage &buffer) { + using tagged_body_type = generic_body_impl; + + const std::size_t common_header_size{ + event_view_base::get_common_header_size()}; + const std::size_t footer_size{event_uv.get_footer_size()}; + const std::size_t old_event_size{event_uv.get_total_size()}; + + // Sanity: tagged GTID events have a zero-byte post header. + if (event_uv.get_post_header_size() != 0U) { + util::exception_location().raise( + "gtid_tagged_log event unexpectedly has a non-empty post header"); + } + + // Decode the body. The view (and therefore tagged_body_type's source + // span) lives in `buffer`; we must finish reading before resizing. + tagged_body_type body{event_uv.get_body_raw()}; + + const auto [new_seq, new_lc]{translate_logical_clock( + body.get_sequence_number_raw(), body.get_last_committed_raw())}; + body.set_sequence_number_raw(new_seq); + body.set_last_committed_raw(new_lc); + + // Snapshot the original common header bytes BEFORE we touch the + // buffer; the input view is invalidated by the resize below. + std::array + original_common_header{}; + const auto original_common_header_raw{event_uv.get_common_header_raw()}; + if (std::size(original_common_header_raw) != + std::size(original_common_header)) { + util::exception_location().raise( + "common header size mismatch while rewriting gtid_tagged_log event"); + } + std::memcpy(std::data(original_common_header), + std::data(original_common_header_raw), + std::size(original_common_header)); + + // Solve for transaction_length and the new event size in tandem. + // trx_minus_event = total transaction size - this event's old size, + // which is independent of how this event ends up encoded; the only + // dependency that varies is the new event's size itself. + const std::uint64_t old_transaction_length{body.get_transaction_length_raw()}; + if (old_transaction_length < old_event_size) { + util::exception_location().raise( + "transaction_length in gtid_tagged_log event is smaller than the " + "event itself"); + } + const std::uint64_t trx_minus_event{old_transaction_length - old_event_size}; + + std::uint64_t guess_event_size{old_event_size}; + std::size_t new_body_size{0U}; + for (int attempt{0}; attempt < max_tagged_fixpoint_iterations; ++attempt) { + body.set_transaction_length_raw(trx_minus_event + guess_event_size); + new_body_size = body.calculate_encoded_size(); + const std::uint64_t candidate_event_size{common_header_size + + new_body_size + footer_size}; + if (candidate_event_size == guess_event_size) { + break; + } + guess_event_size = candidate_event_size; + } + // If we didn't converge, body's transaction_length is now stale; the + // last loop iteration's calculate_encoded_size() does not match the + // value embedded in the body. That would only happen on a logic bug + // (e.g. calculate_varlen_int_size disagreeing with the actual encoded + // width); refuse to emit a malformed event. + if (body.get_transaction_length_raw() != trx_minus_event + guess_event_size) { + util::exception_location().raise( + "gtid_tagged_log transaction_length fixpoint did not converge"); + } + + const std::size_t new_event_size{static_cast(guess_event_size)}; + + // Resize the destination buffer; this invalidates `event_uv` (and the + // body's source span, which we no longer need). + buffer.assign(new_event_size, std::byte{0}); + const util::byte_span destination{std::data(buffer), std::size(buffer)}; + + // 1) Common header: copy the original 19 bytes verbatim, then patch + // event_size to reflect the new total. next_event_position is set + // by the caller (which knows the storage cursor) AFTER this call + // returns; we leave the original value here and let the caller + // overwrite it as it does for the non-rewritten case. + std::memcpy(std::data(destination), std::data(original_common_header), + std::size(original_common_header)); + + // 2) Body: encode_to() advances `body_dest` past the bytes it wrote. + util::byte_span body_dest{ + destination.subspan(common_header_size, new_body_size)}; + body.encode_to(body_dest); + if (!body_dest.empty()) { + util::exception_location().raise( + "gtid_tagged_log body encoder wrote fewer bytes than reported by " + "calculate_encoded_size()"); + } + + // 3) Footer (4 bytes if present): leave zero-filled; the caller's + // write_proxy will recalculate the CRC. + + // Build a fresh writable view; bypass the reader_context-driven + // validation because we know the structure first-hand. + auto result{event_updatable_view::from_raw_unchecked( + destination, /*post_header_size=*/0U, footer_size)}; + + // Update event_size in the freshly-laid-out common header. + { + const auto proxy{result.get_write_proxy()}; + proxy.get_common_header_updatable_view().set_event_size_raw( + static_cast(new_event_size)); + } + return result; +} + +[[nodiscard]] event_updatable_view +gtid_renumberer::rewrite_if_gtid_event(const event_updatable_view &event_uv, + event_storage &buffer) { + const auto code{event_uv.get_common_header_view().get_type_code()}; + switch (code) { + case code_type::gtid_log: + case code_type::anonymous_gtid_log: + rewrite_untagged_in_place(event_uv); + return event_uv; + case code_type::gtid_tagged_log: + return rewrite_tagged(event_uv, buffer); + default: + return event_uv; + } +} + +} // namespace binsrv::events diff --git a/src/binsrv/events/gtid_renumberer.hpp b/src/binsrv/events/gtid_renumberer.hpp new file mode 100644 index 0000000..70851ad --- /dev/null +++ b/src/binsrv/events/gtid_renumberer.hpp @@ -0,0 +1,280 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_EVENTS_GTID_RENUMBERER_HPP +#define BINSRV_EVENTS_GTID_RENUMBERER_HPP + +#include "binsrv/events/gtid_renumberer_fwd.hpp" // IWYU pragma: export + +#include +#include +#include + +#include "binsrv/events/event_fwd.hpp" +#include "binsrv/events/event_view_fwd.hpp" + +namespace binsrv::events { + +// Rewrites GTID_LOG_EVENT, ANONYMOUS_GTID_LOG_EVENT and +// GTID_TAGGED_LOG_EVENT events so that their (sequence_number, +// last_committed) "logical clock" pair is consistent with the local +// binlog file boundaries produced by the rewrite-mode storage layer. +// +// Background: +// In MySQL, the per-transaction (sequence_number, last_committed) +// tuple drives the multi-threaded applier's intra-file parallelism. +// The values are PER-FILE: every binlog file restarts sequence_number +// from 1 and dependencies in last_committed reference sequence_numbers +// from the same file only. +// +// In rewrite mode the binlog server discards the source's ROTATE, +// FORMAT_DESCRIPTION_EVENT, PREVIOUS_GTIDS_LOG, and STOP events and +// coalesces transactions into local files at a configured size. Two +// issues result: +// 1. A source-side rotation we discard does NOT reset our local +// numbering, but the source has already restarted sequence_number +// from 1 and re-used small numbers as last_committed; we'd end up +// with collisions/duplicates that confuse the applier. +// 2. A local rotation we initiate splits the source's (still-running) +// sequence_number sequence across two of OUR files; the second +// file would start with sequence_number > 1, which is illegal. +// +// Approach: +// For every GTID event we compute a per-event offset +// offset = new_local_seq - source_seq +// and use it to translate last_committed: +// new_local_seq = next monotonically growing local counter +// new_last_committed = source_lc + offset (in range) +// | new_local_seq - 1 (out of range, OR +// | first event of a +// | source-file +// | segment in this +// | local file) +// | 0 (source_lc == 0 +// | AND not a segment +// | boundary) +// +// Why the offset trick works for in-segment events: +// (a) source_seq is monotonic and gap-free within a single source +// binlog file (incremented by 1 per binlogged transaction). +// (b) last_committed in any GTID event only references a +// sequence_number from the SAME source binlog file as the +// event itself (never across files). +// Plus by construction new_local_seq is also monotonic and gap-free. +// From (a) the offset stays constant for the duration of a single +// source file, and from (b) source_lc is guaranteed to live in that +// same source file's seq space - so adding the current event's +// offset to source_lc lands exactly on the local_seq we already +// issued for the dependency target. +// +// Why the FIRST event of each source-file segment needs special +// handling: +// The MySQL applier treats binlog file boundaries as implicit +// synchronization points: when the source rotates from file A to +// file B, all of A's transactions must commit before any of B's +// start applying. The source encodes that by RESETTING its +// logical clock at the boundary, so B's first event has +// last_committed == 0 ("no in-file dependency"). That zero is +// correct only as long as the file boundary is physically +// preserved on the consumer side. In rewrite mode we coalesce +// A's and B's events into a single local file, eliminating that +// physical boundary - so emitting last_committed == 0 verbatim +// would let the local applier reorder B's first commit BEFORE +// A's last commit, violating source ordering. To restore the +// barrier we set the first event of each new source-file segment +// to last_committed = new_local_seq - 1, which transitively +// chains through prior local txns and forces the segment to +// commit after everything that came before it in the local file. +// Subsequent events in the segment translate normally via the +// offset. +// +// Detecting "first event of a new segment": +// We track the offset of the most recently emitted event. A new +// event whose offset DIFFERS from that tracked offset is by +// invariant (a) the start of a new source-file segment (or, if +// the tracked offset is absent because of a local rotation or +// fresh start, simply the start of the local file - which is +// also the first event of a segment). No FDE / source-ROTATE +// observation is required. +// +// Out-of-range dependencies (candidate <= 0 inside a segment) come +// from a dependency target that lived in an earlier (now-closed) +// local file, or in the source's pre-attach history. Same fallback +// - over-serialize against new_local_seq - 1 - which is always +// conservative. +// +// No (source_seq -> local_seq) history map is needed; the state is +// the high-water mark `next_local_seq_` plus the most recently +// tracked offset. +// +// Untagged GTID events have a fixed-layout 42-byte post header; the two +// 8-byte fields are patched in place and the event size does not change. +// +// Tagged GTID events store the same fields as variable-length integers +// inside a TLV body. Mutating the values may change the encoded body +// size, which in turn requires updating transaction_length (also a +// varlen integer in the same body, and self-referential because it +// names the current event's own size). The fixpoint converges in a +// handful of iterations (varlen widths are bounded to {1,2,3,4,5,9}). +class gtid_renumberer { +public: + gtid_renumberer() = default; + + gtid_renumberer(const gtid_renumberer &) = delete; + gtid_renumberer(gtid_renumberer &&) = delete; + gtid_renumberer &operator=(const gtid_renumberer &) = delete; + gtid_renumberer &operator=(gtid_renumberer &&) = delete; + ~gtid_renumberer() = default; + + // Resets the per-file sequence_number counter to 0 so the next GTID + // event we observe is renumbered starting from local_seq == 1. Call + // this whenever a fresh local binlog file is opened (matching the + // moment the storage layer emits its own ROTATE_EVENT + + // FORMAT_DESCRIPTION_EVENT pair). + void on_local_rotation() noexcept; + + // Restores the renumberer state when resuming an existing local + // binlog file across a reconnect or process restart. Sets + // next_local_seq_ to `next_local_seq` (the highest sequence_number + // already emitted into that file) so the next GTID event we observe + // is allocated as `next_local_seq + 1`. `last_emitted_offset` is the + // tracked offset at the time of the last persisted snapshot; passing + // std::nullopt forces the next event to be treated as a segment + // boundary, which is conservative (forces last_committed = new_seq - + // 1 once for the first post-resume transaction). Must be called + // before any rewrite_if_gtid_event() call on this instance, i.e. + // while the renumberer is still in its default-constructed state. + // Also seeds the committed snapshot to the same value, so that an + // immediate rollback_to_committed() (e.g. on a connection drop + // before any user transaction is processed) restores the resumed + // state rather than the default-constructed one. + void resume_in_existing_local_file( + std::int64_t next_local_seq, + std::optional last_emitted_offset) noexcept; + + // Promotes the current (speculative) state to the committed + // snapshot. Must be called at every transaction boundary - after a + // GTID event has been rewritten and the rest of its transaction has + // been buffered, but before the storage layer flushes anything to + // disk - so that the persisted recovery snapshot (read back from + // peek_*()) is in lockstep with the bytes that hit storage. + // Idempotent: with no advancement since the last commit it is a + // no-op. + void commit_pending_changes() noexcept; + + // Reverts the speculative state to the most recent committed + // snapshot. Used by the streaming pipeline when the storage layer + // discards an in-flight transaction whose GTID event already + // advanced the renumberer (e.g. after a mid-transaction + // disconnect): without this rollback the next reconnect would + // re-allocate a sequence_number for the same source GTID, + // skipping over the speculative one and breaking gap-freedom of + // the local sequence_number stream. + void rollback_to_committed() noexcept; + + // If `event_uv` is one of the GTID event types, rewrites it in place + // (or via a buffer-resize for GTID_TAGGED_LOG_EVENT) so the logical + // clock matches our local file boundaries; otherwise returns + // `event_uv` unchanged. + // + // For the tagged variant the call may resize `buffer` (the event's + // backing storage). The returned view always references `buffer`. + // For non-tagged events the buffer is untouched and the returned + // view is the same one passed in. + [[nodiscard]] event_updatable_view + rewrite_if_gtid_event(const event_updatable_view &event_uv, + event_storage &buffer); + + // Snapshot accessors used by the storage layer to persist the + // renumberer state alongside other per-binlog metadata. Together + // they describe enough state to seed + // resume_in_existing_local_file() on a subsequent process + // start / reconnect. + // + // Both accessors return the COMMITTED snapshot (i.e. the state at + // the most recently completed transaction boundary) rather than + // the speculative in-flight state. This is intentional: if a + // mid-transaction disconnect causes the storage layer to discard + // the partial transaction, the persisted recovery info must + // describe the durable bytes - not the optimistic increment that + // was about to land before the drop. The translation logic + // continues to operate on the speculative pair + // (next_local_seq_, last_emitted_offset_) and is reconciled with + // the committed snapshot via commit_pending_changes() / + // rollback_to_committed(). + [[nodiscard]] std::int64_t peek_next_local_seq() const noexcept { + return committed_next_local_seq_; + } + [[nodiscard]] std::optional + peek_last_emitted_offset() const noexcept { + return committed_last_emitted_offset_; + } + +private: + // Allocates a fresh local sequence_number for the incoming GTID + // event and computes the corresponding last_committed translation. + // See class-level comment for the offset-based derivation; the + // method does not retain any per-event state beyond bumping + // next_local_seq_. + [[nodiscard]] std::pair + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + translate_logical_clock(std::int64_t source_seq, + std::int64_t source_lc) noexcept; + + // Patches sequence_number / last_committed in place inside an + // already-materialized GTID_LOG_EVENT or ANONYMOUS_GTID_LOG_EVENT. + // The event size and overall buffer size are unchanged. Checksum + // recalculation happens via the existing write_proxy mechanism. + void rewrite_untagged_in_place(const event_updatable_view &event_uv); + + // Decodes the tagged GTID body, mutates the renumbered fields and + // re-encodes into `buffer` (which is resized as needed). Returns a + // fresh event_updatable_view referencing `buffer`. + [[nodiscard]] event_updatable_view + rewrite_tagged(const event_updatable_view &event_uv, event_storage &buffer); + + // Monotonic counter representing the highest local sequence_number + // emitted so far in the current local binlog file. Initialized to 0 + // (no transactions yet) so that pre-increment yields 1 for the very + // first transaction (matching MySQL's "sequence_number starts at 1 + // in every binlog file" invariant). Reset to 0 by + // on_local_rotation(). + std::int64_t next_local_seq_{0}; + + // Offset (= new_local_seq - source_seq) that was active for the most + // recently emitted GTID event in the current local binlog file. If + // the upcoming event has a different offset we are crossing a + // source-file segment boundary and must serialize the event against + // the most recent local txn (see class-level comment). std::nullopt + // means "no event has been emitted in the current local file yet"; + // it is reset to nullopt by on_local_rotation() and indistinguishable + // from a segment boundary for translation purposes. + std::optional last_emitted_offset_{}; + + // Committed snapshot of the (next_local_seq_, last_emitted_offset_) + // pair, advanced only at transaction boundaries via + // commit_pending_changes(). It is the value persisted into the + // per-binlog .json metadata (see peek_*()) and the value + // restored by rollback_to_committed() when the storage layer drops + // an incomplete transaction. Both fields share the same lifecycle + // as their speculative counterparts: zeroed by on_local_rotation() + // and seeded by resume_in_existing_local_file(). + std::int64_t committed_next_local_seq_{0}; + std::optional committed_last_emitted_offset_{}; +}; + +} // namespace binsrv::events + +#endif // BINSRV_EVENTS_GTID_RENUMBERER_HPP diff --git a/src/binsrv/events/gtid_renumberer_fwd.hpp b/src/binsrv/events/gtid_renumberer_fwd.hpp new file mode 100644 index 0000000..3c6e2b0 --- /dev/null +++ b/src/binsrv/events/gtid_renumberer_fwd.hpp @@ -0,0 +1,25 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP +#define BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP + +namespace binsrv::events { + +class gtid_renumberer; + +} // namespace binsrv::events + +#endif // BINSRV_EVENTS_GTID_RENUMBERER_FWD_HPP diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.cpp b/src/binsrv/events/gtid_tagged_log_body_impl.cpp index 88a668d..47d849b 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.cpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.cpp @@ -45,6 +45,7 @@ #include "util/bounded_string_storage.hpp" #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" #include "util/conversion_helpers.hpp" #include "util/exception_location_helpers.hpp" #include "util/flag_set.hpp" @@ -52,21 +53,60 @@ namespace binsrv::events { +namespace { + +// Field identifiers in the order they are emitted by upstream's +// define_fields() (libs/mysql/binlog/event/control_events.h:1111). +// Both the decoder (process_field_data) and the encoder (encode_to / +// calculate_encoded_size) use these values; keeping a single definition +// avoids drift. +enum class field_id_type : std::uint8_t { + flags, + uuid, + gno, + tag, + last_committed, + sequence_number, + immediate_commit_timestamp, + original_commit_timestamp, + transaction_length, + immediate_server_version, + original_server_version, + commit_group_ticket, + + delimiter +}; + +[[nodiscard]] constexpr std::uint8_t +field_id_byte(field_id_type field_id) noexcept { + return util::to_underlying(field_id); +} + +// matches upstream serialization_format_version: ALWAYS 1 (see +// libs/mysql/serialization/readme.md and Gtid_event::Decoder_type) +constexpr std::uint8_t serialization_version_number_value{1U}; + +} // namespace + generic_body_impl::generic_body_impl( util::const_byte_span portion) { // TODO: rework with direct member initialization // make sure we did OK with data members reordering + // (summands listed in the same order as the declarations in the .hpp) static_assert( sizeof *this == boost::alignment::align_up( - sizeof flags_ + sizeof uuid_ + sizeof gno_ + sizeof tag_ + - sizeof last_committed_ + sizeof sequence_number_ + + sizeof gno_ + sizeof last_committed_ + sizeof sequence_number_ + sizeof immediate_commit_timestamp_ + sizeof original_commit_timestamp_ + - sizeof transaction_length_ + sizeof original_server_version_ + - sizeof immediate_server_version_ + - sizeof commit_group_ticket_, + sizeof transaction_length_ + sizeof commit_group_ticket_ + + sizeof uuid_ + sizeof tag_ + sizeof original_server_version_ + + sizeof immediate_server_version_ + sizeof flags_ + + sizeof last_non_ignorable_field_id_ + + sizeof has_original_commit_timestamp_ + + sizeof has_original_server_version_ + + sizeof has_commit_group_ticket_, alignof(decltype(*this))), "inefficient data member reordering in gtid_log event body"); @@ -75,7 +115,6 @@ generic_body_impl::generic_body_impl( // ::= // Extracting - static constexpr std::uint8_t expected_serialization_version_number{1U}; std::uint8_t serialization_version_number{}; if (!util::extract_varlen_int_from_byte_span_checked( remainder, serialization_version_number)) { @@ -83,7 +122,7 @@ generic_body_impl::generic_body_impl( "gtid_tagged_log event body is too short to extract " "serialization_version_number"); } - if (serialization_version_number != expected_serialization_version_number) { + if (serialization_version_number != serialization_version_number_value) { util::exception_location().raise( "unexpected serialization_version_number in the gtid_tagged_log event " "body"); @@ -107,9 +146,8 @@ generic_body_impl::generic_body_impl( } // Extracting - std::uint8_t last_non_ignorable_field_id{}; if (!util::extract_varlen_int_from_byte_span_checked( - remainder, last_non_ignorable_field_id)) { + remainder, last_non_ignorable_field_id_)) { util::exception_location().raise( "gtid_tagged_log event body is too short to extract " "last_non_ignorable_field_id"); @@ -126,7 +164,7 @@ generic_body_impl::generic_body_impl( util::exception_location().raise( "broken field_id sequence in the gtid_tagged_log event body"); } - if (field_id <= last_non_ignorable_field_id) { + if (field_id <= last_non_ignorable_field_id_) { if (field_id != 0 && field_id != last_seen_field_id + 1U) { util::exception_location().raise( "violated last_non_ignorable_field_id rule in the gtid_tagged_log " @@ -252,23 +290,6 @@ operator<<(std::ostream &output, void generic_body_impl::process_field_data( std::uint8_t field_id, util::const_byte_span &remainder) { // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1111 - enum class field_id_type : std::uint8_t { - flags, - uuid, - gno, - tag, - last_committed, - sequence_number, - immediate_commit_timestamp, - original_commit_timestamp, - transaction_length, - immediate_server_version, - original_server_version, - commit_group_ticket, - - delimiter - }; - const auto varlen_int_extractor{ [](util::const_byte_span &source, auto &target, std::string_view label) { if (!util::extract_varlen_int_from_byte_span_checked(source, target)) { @@ -286,11 +307,7 @@ void generic_body_impl::process_field_data( // Extracting a fixed-size (16 byte) array of varlen bytes std::uint8_t extracted_uuid_byte{}; for (auto &uuid_byte : uuid_) { - if (!util::extract_varlen_int_from_byte_span_checked( - remainder, extracted_uuid_byte)) { - util::exception_location().raise( - "gtid_tagged_log event body is too short to extract uuid"); - } + varlen_int_extractor(remainder, extracted_uuid_byte, "uuid"); uuid_byte = util::from_underlying(extracted_uuid_byte); } } break; @@ -323,6 +340,7 @@ void generic_body_impl::process_field_data( case field_id_type::original_commit_timestamp: varlen_int_extractor(remainder, original_commit_timestamp_, "original_commit_timestamp"); + has_original_commit_timestamp_ = true; break; case field_id_type::transaction_length: varlen_int_extractor(remainder, transaction_length_, "transaction_length"); @@ -334,10 +352,12 @@ void generic_body_impl::process_field_data( case field_id_type::original_server_version: varlen_int_extractor(remainder, original_server_version_, "original_server_version"); + has_original_server_version_ = true; break; case field_id_type::commit_group_ticket: varlen_int_extractor(remainder, commit_group_ticket_, "commit_group_ticket"); + has_commit_group_ticket_ = true; break; default: util::exception_location().raise( @@ -345,4 +365,219 @@ void generic_body_impl::process_field_data( } } +[[nodiscard]] std::size_t +generic_body_impl::calculate_tlv_section_size() + const noexcept { + std::size_t total{0U}; + + // field: flags + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::flags)); + total += util::calculate_varlen_int_size(flags_); + + // field: uuid (16 separate varlen-encoded bytes, one per UUID byte) + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::uuid)); + for (auto uuid_byte : uuid_) { + total += util::calculate_varlen_int_size(util::to_underlying(uuid_byte)); + } + + // field: gno + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::gno)); + total += util::calculate_varlen_int_size(gno_); + + // field: tag (varlen length + raw bytes) + total += util::calculate_varlen_int_size(field_id_byte(field_id_type::tag)); + total += util::calculate_varlen_int_size(std::size(tag_)); + total += std::size(tag_); + + // field: last_committed + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::last_committed)); + total += util::calculate_varlen_int_size(last_committed_); + + // field: sequence_number + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::sequence_number)); + total += util::calculate_varlen_int_size(sequence_number_); + + // field: immediate_commit_timestamp + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::immediate_commit_timestamp)); + total += util::calculate_varlen_int_size(immediate_commit_timestamp_); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp_) { + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::original_commit_timestamp)); + total += util::calculate_varlen_int_size(original_commit_timestamp_); + } + + // field: transaction_length + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::transaction_length)); + total += util::calculate_varlen_int_size(transaction_length_); + + // field: immediate_server_version + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::immediate_server_version)); + total += util::calculate_varlen_int_size(immediate_server_version_); + + // field: original_server_version (optional) + if (has_original_server_version_) { + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::original_server_version)); + total += util::calculate_varlen_int_size(original_server_version_); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket_) { + total += util::calculate_varlen_int_size( + field_id_byte(field_id_type::commit_group_ticket)); + total += util::calculate_varlen_int_size(commit_group_ticket_); + } + + return total; +} + +[[nodiscard]] std::size_t +generic_body_impl::calculate_encoded_size() const { + // body layout: + // [varlen: serialization_version_number == 1 ] + // [varlen: serializable_field_size (== total body size) ] + // [varlen: last_non_ignorable_field_id ] + // [TLV section: calculate_tlv_section_size() bytes ] + // + // serializable_field_size encodes the entire body length INCLUDING + // itself, which makes it self-referential: the value depends on its + // own varlen-encoded width. Solve via a tiny iteration: increasing + // the value monotonically grows its width (1->2->3->...->9), so the + // loop always terminates in at most 9 steps. + const std::size_t framing_misc_size{ + util::calculate_varlen_int_size(serialization_version_number_value) + + util::calculate_varlen_int_size(last_non_ignorable_field_id_)}; + const std::size_t tlv_size{calculate_tlv_section_size()}; + const std::size_t tail{framing_misc_size + tlv_size}; + + std::size_t total{tail + 1U}; + for (;;) { + const std::size_t width{util::calculate_varlen_int_size(total)}; + const std::size_t new_total{tail + width}; + if (new_total == total) { + return total; + } + total = new_total; + } +} + +void generic_body_impl::encode_tlv_section_to( + util::byte_span &destination) const { + const auto check_inserted{[](bool inserted) { + if (!inserted) { + util::exception_location().raise( + "destination is too small to encode the gtid_tagged_log event " + "body"); + } + }}; + const auto emit_field_id{[&](field_id_type field_id) { + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, field_id_byte(field_id))); + }}; + + // field: flags + emit_field_id(field_id_type::flags); + check_inserted( + util::insert_varlen_int_to_byte_span_checked(destination, flags_)); + + // field: uuid (16 separate varlen-encoded bytes) + emit_field_id(field_id_type::uuid); + for (auto uuid_byte : uuid_) { + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, util::to_underlying(uuid_byte))); + } + + // field: gno + emit_field_id(field_id_type::gno); + check_inserted( + util::insert_varlen_int_to_byte_span_checked(destination, gno_)); + + // field: tag (varlen length + raw bytes) + emit_field_id(field_id_type::tag); + check_inserted(util::insert_varlen_int_to_byte_span_checked(destination, + std::size(tag_))); + const util::const_byte_span tag_span{tag_}; + check_inserted( + util::insert_byte_span_to_byte_span_checked(destination, tag_span)); + + // field: last_committed + emit_field_id(field_id_type::last_committed); + check_inserted(util::insert_varlen_int_to_byte_span_checked(destination, + last_committed_)); + + // field: sequence_number + emit_field_id(field_id_type::sequence_number); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, sequence_number_)); + + // field: immediate_commit_timestamp + emit_field_id(field_id_type::immediate_commit_timestamp); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, immediate_commit_timestamp_)); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp_) { + emit_field_id(field_id_type::original_commit_timestamp); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, original_commit_timestamp_)); + } + + // field: transaction_length + emit_field_id(field_id_type::transaction_length); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, transaction_length_)); + + // field: immediate_server_version + emit_field_id(field_id_type::immediate_server_version); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, immediate_server_version_)); + + // field: original_server_version (optional) + if (has_original_server_version_) { + emit_field_id(field_id_type::original_server_version); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, original_server_version_)); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket_) { + emit_field_id(field_id_type::commit_group_ticket); + check_inserted(util::insert_varlen_int_to_byte_span_checked( + destination, commit_group_ticket_)); + } +} + +void generic_body_impl::encode_to( + util::byte_span &destination) const { + // Contract: the caller MUST size `destination` to exactly the value + // returned by calculate_encoded_size(). That value is also what we + // emit on the wire as serializable_field_size (size of the entire + // body), and we read it back from std::size(destination) here - + // avoiding a redundant inner fixpoint. Discrepancies are caught by + // the caller observing whether `destination` was fully consumed + // post-encode. + const std::uint64_t encoded_size{std::size(destination)}; + + // The framing header: serialization_version_number, + // serializable_field_size, last_non_ignorable_field_id. + if (!util::insert_varlen_int_to_byte_span_checked( + destination, serialization_version_number_value) || + !util::insert_varlen_int_to_byte_span_checked(destination, + encoded_size) || + !util::insert_varlen_int_to_byte_span_checked( + destination, last_non_ignorable_field_id_)) { + util::exception_location().raise( + "failed to encode gtid_tagged_log event body framing header"); + } + + encode_tlv_section_to(destination); +} + } // namespace binsrv::events diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.hpp b/src/binsrv/events/gtid_tagged_log_body_impl.hpp index db465db..1a4be00 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.hpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.hpp @@ -87,7 +87,7 @@ template <> class [[nodiscard]] generic_body_impl { [[nodiscard]] std::string get_readable_immediate_commit_timestamp() const; [[nodiscard]] bool has_original_commit_timestamp() const noexcept { - return original_commit_timestamp_ != unset_commit_timestamp; + return has_original_commit_timestamp_; } [[nodiscard]] std::uint64_t get_original_commit_timestamp_raw() const noexcept { @@ -102,7 +102,7 @@ template <> class [[nodiscard]] generic_body_impl { } [[nodiscard]] bool has_original_server_version() const noexcept { - return original_server_version_ != unset_server_version; + return has_original_server_version_; } [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { return original_server_version_; @@ -123,12 +123,45 @@ template <> class [[nodiscard]] generic_body_impl { [[nodiscard]] std::string get_readable_immediate_server_version() const; [[nodiscard]] bool has_commit_group_ticket() const noexcept { - return commit_group_ticket_ != unset_commit_group_ticket; + return has_commit_group_ticket_; } [[nodiscard]] std::uint64_t get_commit_group_ticket_raw() const noexcept { return commit_group_ticket_; } + // Mutators used by the rewrite-mode GTID renumberer. They modify only + // the in-memory representation; serializing back to bytes requires + // calling encode_to() on a freshly sized buffer. + void set_last_committed_raw(std::int64_t value) noexcept { + last_committed_ = value; + } + void set_sequence_number_raw(std::int64_t value) noexcept { + sequence_number_ = value; + } + void set_transaction_length_raw(std::uint64_t value) noexcept { + transaction_length_ = value; + } + + // Returns the total number of bytes that encode_to() will emit for the + // current in-memory state, including the 3-field framing header + // (serialization_version_number, serializable_field_size, + // last_non_ignorable_field_id) and every TLV field. The value of the + // serializable_field_size field is computed self-consistently. + [[nodiscard]] std::size_t calculate_encoded_size() const; + + // Writes the body to *destination* using the same TLV layout as the + // input. The set of optional fields actually emitted matches the set + // observed during construction (decoding); only the values of + // last_committed / sequence_number / transaction_length may have been + // mutated through the corresponding setters above. + // + // Precondition: std::size(destination) == calculate_encoded_size(). + // The destination span size is read back as the on-wire + // serializable_field_size, so passing a wrong size produces a + // wrong-but-self-consistent encoding. Caller is expected to verify + // post-encode that `destination` was fully consumed. + void encode_to(util::byte_span &destination) const; + friend bool operator==(const generic_body_impl & /* first */, const generic_body_impl & /* second */) = default; @@ -142,22 +175,52 @@ template <> class [[nodiscard]] generic_body_impl { static constexpr std::uint64_t unset_commit_group_ticket{ std::numeric_limits::max()}; - // the members are deliberately reordered for better packing - std::uint8_t flags_{}; // 0 - gtids::uuid_storage uuid_{}; // 1 + // The protocol fields below are deliberately reordered for better + // packing (largest first, then 32-bit, then 8-bit/bool last). The + // trailing "// N" annotation is the protocol field_id_type value + // upstream assigns to the field (see field_id_type enum in + // gtid_tagged_log_body_impl.cpp / define_fields() in upstream + // control_events.h), so the deviation from protocol order is + // visible at a glance. std::int64_t gno_{}; // 2 - gtids::tag_storage tag_{}; // 3 std::int64_t last_committed_{}; // 4 std::int64_t sequence_number_{}; // 5 std::uint64_t immediate_commit_timestamp_{unset_commit_timestamp}; // 6 std::uint64_t original_commit_timestamp_{unset_commit_timestamp}; // 7 std::uint64_t transaction_length_{unset_transaction_length}; // 8 - std::uint32_t original_server_version_{unset_server_version}; // 9 - std::uint32_t immediate_server_version_{unset_server_version}; // 10 std::uint64_t commit_group_ticket_{unset_commit_group_ticket}; // 11 + gtids::uuid_storage uuid_{}; // 1 + gtids::tag_storage tag_{}; // 3 + std::uint32_t original_server_version_{unset_server_version}; // 10 + std::uint32_t immediate_server_version_{unset_server_version}; // 9 + std::uint8_t flags_{}; // 0 + + // Echoed verbatim on encode_to() so the framing header matches the + // original. Reading and re-emitting it preserves forward compatibility + // with newer servers that may add ignorable fields above the current + // last non-ignorable id. + std::uint8_t last_non_ignorable_field_id_{0U}; + + // Recorded during decoding so that encode_to() emits exactly the same + // set of optional fields as was observed in the input event. This makes + // re-serialization byte-stable for unchanged member values and avoids + // having to reverse-engineer upstream's encode predicates from sentinel + // values (which would be brittle if a real timestamp/version ever + // happened to coincide with a sentinel). + bool has_original_commit_timestamp_{false}; + bool has_original_server_version_{false}; + bool has_commit_group_ticket_{false}; void process_field_data(std::uint8_t field_id, util::const_byte_span &remainder); + + // Helpers for calculate_encoded_size() / encode_to(). + // Returns the size in bytes of the TLV section (every pair after the framing header). + [[nodiscard]] std::size_t calculate_tlv_section_size() const noexcept; + // Writes only the TLV section to *destination*; assumes destination has + // at least calculate_tlv_section_size() bytes available. + void encode_tlv_section_to(util::byte_span &destination) const; }; } // namespace binsrv::events diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 5beb67d..16804b6 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -318,6 +318,23 @@ storage::get_binlog_uri(const composite_binlog_name &binlog_name) const { return backend_->get_object_uri(binlog_name.str()); } +void storage::update_renumberer_recovery_info( + const renumberer_recovery_info &info) { + if (is_empty()) { + return; + } + get_current_binlog_record().renumberer_state = info; +} + +[[nodiscard]] renumberer_recovery_info +storage::get_current_renumberer_recovery_info() const { + if (is_empty()) { + util::exception_location().raise( + "cannot read renumberer recovery info from an empty storage"); + } + return get_current_binlog_record().renumberer_state; +} + void storage::ensure_streaming_mode() const { if (construction_mode_ != storage_construction_mode_type::streaming) { util::exception_location().raise( @@ -476,13 +493,31 @@ storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const { backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; + // Both renumberer fields are missing in legacy metadata files; their + // optionals come back as std::nullopt and the recovery info is left + // at its struct-default value. From the renumberer's point of view + // that is indistinguishable from "this file has no prior emissions" + // - safe for files that genuinely have none, but for legacy files + // with prior emissions the first post-resume transaction will collide + // with sequence_number 1 (documented limitation of the legacy + // metadata format). + renumberer_recovery_info renumberer_state{}; + const auto &optional_next_local_seq{ + metadata.root().get<"renumberer_next_local_seq">()}; + if (optional_next_local_seq.has_value()) { + renumberer_state.next_local_seq = *optional_next_local_seq; + } + renumberer_state.last_emitted_offset = + metadata.root().get<"renumberer_last_emitted_offset">(); + return binlog_record{.name = binlog_name, .size = metadata.root().get<"size">(), .previous_gtids = metadata.root().get<"previous_gtids">(), .added_gtids = metadata.root().get<"added_gtids">(), .timestamps = {metadata.root().get<"min_timestamp">(), - metadata.root().get<"max_timestamp">()}}; + metadata.root().get<"max_timestamp">()}, + .renumberer_state = renumberer_state}; } void storage::validate_binlog_metadata(const binlog_record &record) const { @@ -522,6 +557,10 @@ void storage::save_binlog_metadata(const binlog_record &record) const { ctime_timestamp{record.timestamps.get_min_timestamp()}; metadata.root().get<"max_timestamp">() = ctime_timestamp{record.timestamps.get_max_timestamp()}; + metadata.root().get<"renumberer_next_local_seq">() = + record.renumberer_state.next_local_seq; + metadata.root().get<"renumberer_last_emitted_offset">() = + record.renumberer_state.last_emitted_offset; const auto content{metadata.str()}; backend_->put_object(generate_binlog_metadata_name(record.name), util::as_const_byte_span(content)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index e2ed0f1..33f85fb 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -19,6 +19,8 @@ #include "binsrv/storage_fwd.hpp" // IWYU pragma: export #include +#include +#include #include #include #include @@ -37,6 +39,16 @@ namespace binsrv { +// Snapshot of the rewrite-mode GTID renumberer's recovery state, as +// persisted alongside other per-binlog metadata. Used by the storage +// layer to round-trip the state across reconnects and process +// restarts; the renumberer-side semantics live in +// `binsrv::events::gtid_renumberer`. +struct [[nodiscard]] renumberer_recovery_info { + std::int64_t next_local_seq{0}; + std::optional last_emitted_offset{}; +}; + class [[nodiscard]] storage { private: struct binlog_record { @@ -45,6 +57,7 @@ class [[nodiscard]] storage { gtids::optional_gtid_set previous_gtids{}; gtids::optional_gtid_set added_gtids{}; ctime_timestamp_range timestamps{}; + renumberer_recovery_info renumberer_state{}; }; using binlog_record_container = std::vector; @@ -129,6 +142,26 @@ class [[nodiscard]] storage { const ctime_timestamp &event_timestamp); void close_binlog(); + // Rewrite-mode hook: keeps the in-memory renumberer recovery state + // for the current binlog file in sync with the actual renumberer. + // The persisted snapshot is written out by the next + // save_binlog_metadata() call (i.e. on the next checkpoint flush). + // Caller is expected to invoke this whenever the renumberer's + // observable state (next_local_seq / last_emitted_offset) changes, + // typically right after rewrite_if_gtid_event(). No-op if the + // storage has no current binlog yet. + void update_renumberer_recovery_info(const renumberer_recovery_info &info); + + // Returns the renumberer recovery snapshot persisted for the current + // binlog file (i.e. read back from the per-binlog .json metadata at + // storage construction). If the current metadata predates the + // persisted-renumberer-state feature, both fields hold their default + // values (next_local_seq = 0, last_emitted_offset = std::nullopt) - + // which the renumberer treats as "no prior emissions in this file". + // Precondition: the storage must be non-empty. + [[nodiscard]] renumberer_recovery_info + get_current_renumberer_recovery_info() const; + void discard_incomplete_transaction_events(); void flush_event_buffer();