From ebc4a013177fbb7be5456e243d2e082e219ca7d4 Mon Sep 17 00:00:00 2001 From: Kamil Holubicki Date: Mon, 18 May 2026 21:45:54 +0200 Subject: [PATCH] PS-11080 fix: Spoiled logical clock information in rewritten binlog (part 2) https://perconadev.atlassian.net/browse/PS-11080 This is a prerequisite commit required to implement rewriting 'sequence_number' / 'last_committed' / 'transaction_length' in GTID events. Along with last seen transaction GTID, 'binstv::events::reader_context' now also keeps track of the last seen GTID event sequence_number. At the same time 'binsrv::storage' expects one extra 'transaction_sequence_number' argument passed to its 'write_event()' method (which is expected to be taken from the 'binstv::events::reader_context::get_transaction_sequence_number()'). It implements the same logic of storing transaction's sequence_number for both "ready to be flushed" buffered event data and for the "incomplete transaction" buffered event data as for 'timestamps' and 'gtids'. Binlog metadata file extended with one more value 'last_sequence_number' that holds the 'sequence_number' value of the last GTID event (either GTID_LOG, ANONYMOUS_GTID_LOG or GTID_TAGGED_LOG events) written to the binlog file. Co-authored-by: Yura Sorokin --- src/app.cpp | 8 +-- src/binsrv/binlog_file_metadata.cpp | 2 +- src/binsrv/binlog_file_metadata.hpp | 11 ++- src/binsrv/events/reader_context.cpp | 4 ++ src/binsrv/events/reader_context.hpp | 14 ++++ src/binsrv/storage.cpp | 103 +++++++++++++++++---------- src/binsrv/storage.hpp | 25 ++++++- 7 files changed, 121 insertions(+), 46 deletions(-) diff --git a/src/app.cpp b/src/app.cpp index 7815ffc..16c8d9d 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -535,10 +535,10 @@ void process_binlog_event(const binsrv::events::event_view ¤t_event_v, // checking if the event needs to be written to the binlog if (!info_only) { - storage.write_event(current_event_v.get_portion(), - context.is_at_transaction_boundary(), - context.get_transaction_gtid(), - current_common_header_v.get_timestamp()); + storage.write_event( + current_event_v.get_portion(), context.is_at_transaction_boundary(), + context.get_transaction_gtid(), current_common_header_v.get_timestamp(), + context.get_transaction_sequence_number()); } // processing the very last event in the sequence - either a non-artificial diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp index e7f3c1d..e5cfc12 100644 --- a/src/binsrv/binlog_file_metadata.cpp +++ b/src/binsrv/binlog_file_metadata.cpp @@ -30,7 +30,7 @@ namespace binsrv { binlog_file_metadata::binlog_file_metadata() - : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {} + : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}, {}} {} binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp index f03e602..f46510d 100644 --- a/src/binsrv/binlog_file_metadata.hpp +++ b/src/binsrv/binlog_file_metadata.hpp @@ -26,12 +26,20 @@ #include "binsrv/gtids/gtid_set.hpp" +#include "binsrv/events/common_types.hpp" + #include "util/nv_tuple.hpp" namespace binsrv { class [[nodiscard]] binlog_file_metadata { private: + // The 'last_sequence_number' field persists the rewrite-mode GTID + // recovery state for this binlog file. It is stored here (rather + // than re-derived from the binlog file content on resume) because + // every other piece of resume state already lives in this metadata + // record - this addition simply makes recovery a single + // read-and-load step. using impl_type = util::nv_tuple< // clang-format off util::nv<"version", std::uint32_t>, @@ -39,7 +47,8 @@ class [[nodiscard]] binlog_file_metadata { util::nv<"previous_gtids", gtids::optional_gtid_set>, util::nv<"added_gtids", gtids::optional_gtid_set>, util::nv<"min_timestamp", ctime_timestamp>, - util::nv<"max_timestamp", ctime_timestamp> + util::nv<"max_timestamp", ctime_timestamp>, + util::nv<"last_sequence_number", events::seq_no_t> // clang-format on >; diff --git a/src/binsrv/events/reader_context.cpp b/src/binsrv/events/reader_context.cpp index 6c537a2..132b0a6 100644 --- a/src/binsrv/events/reader_context.cpp +++ b/src/binsrv/events/reader_context.cpp @@ -943,6 +943,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered non-empty gtid in the anonymous gtid log event"); } + transaction_sequence_number_ = current_post_header.get_sequence_number(); const auto current_body{generic_body{ current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ @@ -962,6 +963,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered an empty gtid in the gtid log event"); } + transaction_sequence_number_ = current_post_header.get_sequence_number(); const auto current_body{ generic_body{current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ @@ -981,6 +983,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered an empty gtid in the gtid tagged log event"); } + transaction_sequence_number_ = current_body.get_sequence_number(); const auto expected_transaction_length_raw{ current_body.get_transaction_length_raw()}; if (!std::in_range(expected_transaction_length_raw)) { @@ -1011,6 +1014,7 @@ void reader_context::finish_transaction() { "transaction did not end at the declared length"); } transaction_gtid_ = gtids::gtid{}; + transaction_sequence_number_ = 0ULL; expected_transaction_length_ = 0U; current_transaction_length_ = 0U; } diff --git a/src/binsrv/events/reader_context.hpp b/src/binsrv/events/reader_context.hpp index b4c1149..541947a 100644 --- a/src/binsrv/events/reader_context.hpp +++ b/src/binsrv/events/reader_context.hpp @@ -25,6 +25,7 @@ #include "binsrv/gtids/gtid.hpp" #include "binsrv/events/common_header_view_fwd.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/event_fwd.hpp" #include "binsrv/events/event_view_fwd.hpp" #include "binsrv/events/protocol_traits.hpp" @@ -63,6 +64,13 @@ class [[nodiscard]] reader_context { [[nodiscard]] const gtids::gtid &get_transaction_gtid() const noexcept { return transaction_gtid_; } + [[nodiscard]] bool has_transaction_sequence_number() const noexcept { + return transaction_sequence_number_ != 0ULL; + } + [[nodiscard]] seq_no_t get_transaction_sequence_number() const noexcept { + return transaction_sequence_number_; + } + [[nodiscard]] bool is_at_transaction_boundary() const noexcept { return (state_ == state_type::any_other_expected && current_transaction_length_ == expected_transaction_length_) || @@ -112,8 +120,14 @@ class [[nodiscard]] reader_context { std::uint32_t position_; post_header_length_container post_header_lengths_{}; + // last seen transaction GTID (empty GTID if between transactions) gtids::gtid transaction_gtid_{}; + // sequence number of the last transaction seen (0 if between transactions) + seq_no_t transaction_sequence_number_{0ULL}; + // the expected length of the current transaction + // (extracted from one of the GTID_LOG events) std::uint32_t expected_transaction_length_{0U}; + // the length of the transaction seen so far std::uint32_t current_transaction_length_{0U}; bool expect_info_only_preamble_events_{false}; diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 5beb67d..faaf4e6 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "binsrv/storage_config.hpp" #include "binsrv/storage_metadata.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/gtids/gtid.hpp" @@ -184,38 +186,10 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { : storage_backend_open_stream_mode::create}; const auto open_stream_offset{backend_->open_stream(binlog_name.str(), mode)}; - if (!binlog_exists) { - // writing the magic binlog footprint only if this is a newly - // created file - backend_->write_data_to_stream(events::magic_binlog_payload); - - gtids::optional_gtid_set previous_binlog_gtids{}; - gtids::optional_gtid_set added_binlog_gtids{}; - if (is_in_gtid_replication_mode()) { - previous_binlog_gtids = get_gtids(); - added_binlog_gtids = gtids::gtid_set{}; - } - - binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset, - std::move(previous_binlog_gtids), - std::move(added_binlog_gtids), - ctime_timestamp_range{}); - save_binlog_metadata(get_current_binlog_record()); - save_binlog_index(); - result = open_binlog_status::created; + if (binlog_exists) { + result = open_existing_binlog_file_internal(open_stream_offset); } else { - assert(get_current_position() == open_stream_offset); - if (open_stream_offset == 0ULL) { - backend_->write_data_to_stream(events::magic_binlog_payload); - get_current_binlog_record().size = events::magic_binlog_offset; - result = open_binlog_status::opened_empty; - } else if (open_stream_offset == events::magic_binlog_offset) { - result = open_binlog_status::opened_at_magic_payload_offset; - } else { - // position is beyond magic payload offset - assert(open_stream_offset > events::magic_binlog_offset); - result = open_binlog_status::opened_with_data_present; - } + result = open_new_binlog_file_internal(binlog_name); } update_last_checkpoint_info(); @@ -225,6 +199,8 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { assert(gtids_in_event_buffer_.is_empty()); assert(ready_to_flush_timestamps_.is_empty()); assert(incomplete_transaction_timestamps_.is_empty()); + assert(ready_to_flush_last_sequence_number_ == 0ULL); + assert(incomplete_transaction_last_sequence_number_ == 0ULL); return result; } @@ -232,12 +208,16 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { void storage::write_event(util::const_byte_span event_data, bool at_transaction_boundary, const gtids::gtid &transaction_gtid, - const ctime_timestamp &event_timestamp) { + const ctime_timestamp &event_timestamp, + events::seq_no_t transaction_sequence_number) { ensure_streaming_mode(); event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data), std::cend(event_data)); incomplete_transaction_timestamps_.add_timestamp(event_timestamp); + if (transaction_sequence_number != 0ULL) { + incomplete_transaction_last_sequence_number_ = transaction_sequence_number; + } if (at_transaction_boundary) { last_transaction_boundary_position_in_event_buffer_ = @@ -247,6 +227,10 @@ void storage::write_event(util::const_byte_span event_data, } ready_to_flush_timestamps_.add_range(incomplete_transaction_timestamps_); incomplete_transaction_timestamps_.clear(); + + ready_to_flush_last_sequence_number_ = + incomplete_transaction_last_sequence_number_; + incomplete_transaction_last_sequence_number_ = 0ULL; } // now we are writing data from the event buffer to the storage backend if @@ -303,6 +287,7 @@ void storage::discard_incomplete_transaction_events() { event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_); incomplete_transaction_timestamps_.clear(); + incomplete_transaction_last_sequence_number_ = 0ULL; } void storage::flush_event_buffer() { @@ -334,6 +319,41 @@ void storage::update_last_checkpoint_info() { } } +[[nodiscard]] open_binlog_status storage::open_new_binlog_file_internal( + const composite_binlog_name &binlog_name) { + // writing the magic binlog footprint only if this is a newly + // created file + backend_->write_data_to_stream(events::magic_binlog_payload); + + gtids::optional_gtid_set previous_binlog_gtids{}; + gtids::optional_gtid_set added_binlog_gtids{}; + if (is_in_gtid_replication_mode()) { + previous_binlog_gtids = get_gtids(); + added_binlog_gtids = gtids::gtid_set{}; + } + + binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset, + std::move(previous_binlog_gtids), + std::move(added_binlog_gtids), + ctime_timestamp_range{}); + save_binlog_metadata(get_current_binlog_record()); + save_binlog_index(); + return open_binlog_status::created; +} +[[nodiscard]] open_binlog_status +storage::open_existing_binlog_file_internal(std::uint64_t open_stream_offset) { + assert(get_current_position() == open_stream_offset); + if (open_stream_offset >= events::magic_binlog_offset) { + return open_stream_offset == events::magic_binlog_offset + ? open_binlog_status::opened_at_magic_payload_offset + : open_binlog_status::opened_with_data_present; + } + assert(open_stream_offset == 0ULL); + backend_->write_data_to_stream(events::magic_binlog_payload); + get_current_binlog_record().size = events::magic_binlog_offset; + return open_binlog_status::opened_empty; +} + void storage::flush_event_buffer_internal() { assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= @@ -354,6 +374,8 @@ void storage::flush_event_buffer_internal() { } } get_current_binlog_record().timestamps.add_range(ready_to_flush_timestamps_); + get_current_binlog_record().last_sequence_number = + ready_to_flush_last_sequence_number_; save_binlog_metadata(get_current_binlog_record()); @@ -369,6 +391,7 @@ void storage::flush_event_buffer_internal() { gtids_in_event_buffer_.clear(); } ready_to_flush_timestamps_.clear(); + ready_to_flush_last_sequence_number_ = 0ULL; } void storage::load_binlog_index() { @@ -476,13 +499,14 @@ storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const { backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; - return binlog_record{.name = binlog_name, - .size = metadata.root().get<"size">(), - .previous_gtids = - metadata.root().get<"previous_gtids">(), - .added_gtids = metadata.root().get<"added_gtids">(), - .timestamps = {metadata.root().get<"min_timestamp">(), - metadata.root().get<"max_timestamp">()}}; + return binlog_record{ + .name = binlog_name, + .size = metadata.root().get<"size">(), + .previous_gtids = metadata.root().get<"previous_gtids">(), + .added_gtids = metadata.root().get<"added_gtids">(), + .timestamps = {metadata.root().get<"min_timestamp">(), + metadata.root().get<"max_timestamp">()}, + .last_sequence_number = metadata.root().get<"last_sequence_number">()}; } void storage::validate_binlog_metadata(const binlog_record &record) const { @@ -522,6 +546,7 @@ void storage::save_binlog_metadata(const binlog_record &record) const { ctime_timestamp{record.timestamps.get_min_timestamp()}; metadata.root().get<"max_timestamp">() = ctime_timestamp{record.timestamps.get_max_timestamp()}; + metadata.root().get<"last_sequence_number">() = record.last_sequence_number; const auto content{metadata.str()}; backend_->put_object(generate_binlog_metadata_name(record.name), util::as_const_byte_span(content)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index e2ed0f1..ac34d80 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -33,6 +33,8 @@ #include "binsrv/gtids/gtid_fwd.hpp" #include "binsrv/gtids/gtid_set.hpp" +#include "binsrv/events/common_types.hpp" + #include "util/byte_span_fwd.hpp" namespace binsrv { @@ -40,11 +42,19 @@ namespace binsrv { class [[nodiscard]] storage { private: struct binlog_record { + // binlog file name composite_binlog_name name; + // binlog file size in bytes std::uint64_t size{0ULL}; + // accumulated GTIDs present in the binlog files before this one gtids::optional_gtid_set previous_gtids{}; + // GTIDs present in this binlog file gtids::optional_gtid_set added_gtids{}; + // minimum and maximum event timestamps observed in this binlog file ctime_timestamp_range timestamps{}; + // sequence_number of the last transaction seen in this file - + // used for GTID rewrite-mode resume state persistence + events::seq_no_t last_sequence_number{0ULL}; }; using binlog_record_container = std::vector; @@ -119,6 +129,11 @@ class [[nodiscard]] storage { return result; } + [[nodiscard]] events::seq_no_t + get_current_transaction_sequence_number() const noexcept { + return is_empty() ? 0ULL : get_current_binlog_record().last_sequence_number; + } + [[nodiscard]] bool is_binlog_open() const noexcept; [[nodiscard]] open_binlog_status @@ -126,7 +141,8 @@ class [[nodiscard]] storage { void write_event(util::const_byte_span event_data, bool at_transaction_boundary, const gtids::gtid &transaction_gtid, - const ctime_timestamp &event_timestamp); + const ctime_timestamp &event_timestamp, + events::seq_no_t transaction_sequence_number); void close_binlog(); void discard_incomplete_transaction_events(); @@ -156,6 +172,8 @@ class [[nodiscard]] storage { gtids::gtid_set gtids_in_event_buffer_{}; ctime_timestamp_range ready_to_flush_timestamps_{}; ctime_timestamp_range incomplete_transaction_timestamps_{}; + events::seq_no_t ready_to_flush_last_sequence_number_{0ULL}; + events::seq_no_t incomplete_transaction_last_sequence_number_{0ULL}; void ensure_streaming_mode() const; @@ -187,6 +205,11 @@ class [[nodiscard]] storage { return get_flushed_position() + last_transaction_boundary_position_in_event_buffer_; } + [[nodiscard]] open_binlog_status + open_new_binlog_file_internal(const composite_binlog_name &binlog_name); + [[nodiscard]] open_binlog_status + open_existing_binlog_file_internal(std::uint64_t open_stream_offset); + void flush_event_buffer_internal(); void load_binlog_index();