diff --git a/src/app.cpp b/src/app.cpp index 7815ffc..16c8d9d 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -535,10 +535,10 @@ void process_binlog_event(const binsrv::events::event_view ¤t_event_v, // checking if the event needs to be written to the binlog if (!info_only) { - storage.write_event(current_event_v.get_portion(), - context.is_at_transaction_boundary(), - context.get_transaction_gtid(), - current_common_header_v.get_timestamp()); + storage.write_event( + current_event_v.get_portion(), context.is_at_transaction_boundary(), + context.get_transaction_gtid(), current_common_header_v.get_timestamp(), + context.get_transaction_sequence_number()); } // processing the very last event in the sequence - either a non-artificial diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp index e7f3c1d..e5cfc12 100644 --- a/src/binsrv/binlog_file_metadata.cpp +++ b/src/binsrv/binlog_file_metadata.cpp @@ -30,7 +30,7 @@ namespace binsrv { binlog_file_metadata::binlog_file_metadata() - : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {} + : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}, {}} {} binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp index f03e602..f46510d 100644 --- a/src/binsrv/binlog_file_metadata.hpp +++ b/src/binsrv/binlog_file_metadata.hpp @@ -26,12 +26,20 @@ #include "binsrv/gtids/gtid_set.hpp" +#include "binsrv/events/common_types.hpp" + #include "util/nv_tuple.hpp" namespace binsrv { class [[nodiscard]] binlog_file_metadata { private: + // The 'last_sequence_number' field persists the rewrite-mode GTID + // recovery state for this binlog file. It is stored here (rather + // than re-derived from the binlog file content on resume) because + // every other piece of resume state already lives in this metadata + // record - this addition simply makes recovery a single + // read-and-load step. using impl_type = util::nv_tuple< // clang-format off util::nv<"version", std::uint32_t>, @@ -39,7 +47,8 @@ class [[nodiscard]] binlog_file_metadata { util::nv<"previous_gtids", gtids::optional_gtid_set>, util::nv<"added_gtids", gtids::optional_gtid_set>, util::nv<"min_timestamp", ctime_timestamp>, - util::nv<"max_timestamp", ctime_timestamp> + util::nv<"max_timestamp", ctime_timestamp>, + util::nv<"last_sequence_number", events::seq_no_t> // clang-format on >; diff --git a/src/binsrv/events/reader_context.cpp b/src/binsrv/events/reader_context.cpp index 6c537a2..132b0a6 100644 --- a/src/binsrv/events/reader_context.cpp +++ b/src/binsrv/events/reader_context.cpp @@ -943,6 +943,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered non-empty gtid in the anonymous gtid log event"); } + transaction_sequence_number_ = current_post_header.get_sequence_number(); const auto current_body{generic_body{ current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ @@ -962,6 +963,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered an empty gtid in the gtid log event"); } + transaction_sequence_number_ = current_post_header.get_sequence_number(); const auto current_body{ generic_body{current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ @@ -981,6 +983,7 @@ void reader_context::start_transaction(const event_view ¤t_event_v) { util::exception_location().raise( "encountered an empty gtid in the gtid tagged log event"); } + transaction_sequence_number_ = current_body.get_sequence_number(); const auto expected_transaction_length_raw{ current_body.get_transaction_length_raw()}; if (!std::in_range(expected_transaction_length_raw)) { @@ -1011,6 +1014,7 @@ void reader_context::finish_transaction() { "transaction did not end at the declared length"); } transaction_gtid_ = gtids::gtid{}; + transaction_sequence_number_ = 0ULL; expected_transaction_length_ = 0U; current_transaction_length_ = 0U; } diff --git a/src/binsrv/events/reader_context.hpp b/src/binsrv/events/reader_context.hpp index b4c1149..541947a 100644 --- a/src/binsrv/events/reader_context.hpp +++ b/src/binsrv/events/reader_context.hpp @@ -25,6 +25,7 @@ #include "binsrv/gtids/gtid.hpp" #include "binsrv/events/common_header_view_fwd.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/event_fwd.hpp" #include "binsrv/events/event_view_fwd.hpp" #include "binsrv/events/protocol_traits.hpp" @@ -63,6 +64,13 @@ class [[nodiscard]] reader_context { [[nodiscard]] const gtids::gtid &get_transaction_gtid() const noexcept { return transaction_gtid_; } + [[nodiscard]] bool has_transaction_sequence_number() const noexcept { + return transaction_sequence_number_ != 0ULL; + } + [[nodiscard]] seq_no_t get_transaction_sequence_number() const noexcept { + return transaction_sequence_number_; + } + [[nodiscard]] bool is_at_transaction_boundary() const noexcept { return (state_ == state_type::any_other_expected && current_transaction_length_ == expected_transaction_length_) || @@ -112,8 +120,14 @@ class [[nodiscard]] reader_context { std::uint32_t position_; post_header_length_container post_header_lengths_{}; + // last seen transaction GTID (empty GTID if between transactions) gtids::gtid transaction_gtid_{}; + // sequence number of the last transaction seen (0 if between transactions) + seq_no_t transaction_sequence_number_{0ULL}; + // the expected length of the current transaction + // (extracted from one of the GTID_LOG events) std::uint32_t expected_transaction_length_{0U}; + // the length of the transaction seen so far std::uint32_t current_transaction_length_{0U}; bool expect_info_only_preamble_events_{false}; diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 5beb67d..faaf4e6 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "binsrv/storage_config.hpp" #include "binsrv/storage_metadata.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/gtids/gtid.hpp" @@ -184,38 +186,10 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { : storage_backend_open_stream_mode::create}; const auto open_stream_offset{backend_->open_stream(binlog_name.str(), mode)}; - if (!binlog_exists) { - // writing the magic binlog footprint only if this is a newly - // created file - backend_->write_data_to_stream(events::magic_binlog_payload); - - gtids::optional_gtid_set previous_binlog_gtids{}; - gtids::optional_gtid_set added_binlog_gtids{}; - if (is_in_gtid_replication_mode()) { - previous_binlog_gtids = get_gtids(); - added_binlog_gtids = gtids::gtid_set{}; - } - - binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset, - std::move(previous_binlog_gtids), - std::move(added_binlog_gtids), - ctime_timestamp_range{}); - save_binlog_metadata(get_current_binlog_record()); - save_binlog_index(); - result = open_binlog_status::created; + if (binlog_exists) { + result = open_existing_binlog_file_internal(open_stream_offset); } else { - assert(get_current_position() == open_stream_offset); - if (open_stream_offset == 0ULL) { - backend_->write_data_to_stream(events::magic_binlog_payload); - get_current_binlog_record().size = events::magic_binlog_offset; - result = open_binlog_status::opened_empty; - } else if (open_stream_offset == events::magic_binlog_offset) { - result = open_binlog_status::opened_at_magic_payload_offset; - } else { - // position is beyond magic payload offset - assert(open_stream_offset > events::magic_binlog_offset); - result = open_binlog_status::opened_with_data_present; - } + result = open_new_binlog_file_internal(binlog_name); } update_last_checkpoint_info(); @@ -225,6 +199,8 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { assert(gtids_in_event_buffer_.is_empty()); assert(ready_to_flush_timestamps_.is_empty()); assert(incomplete_transaction_timestamps_.is_empty()); + assert(ready_to_flush_last_sequence_number_ == 0ULL); + assert(incomplete_transaction_last_sequence_number_ == 0ULL); return result; } @@ -232,12 +208,16 @@ storage::open_binlog(const composite_binlog_name &binlog_name) { void storage::write_event(util::const_byte_span event_data, bool at_transaction_boundary, const gtids::gtid &transaction_gtid, - const ctime_timestamp &event_timestamp) { + const ctime_timestamp &event_timestamp, + events::seq_no_t transaction_sequence_number) { ensure_streaming_mode(); event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data), std::cend(event_data)); incomplete_transaction_timestamps_.add_timestamp(event_timestamp); + if (transaction_sequence_number != 0ULL) { + incomplete_transaction_last_sequence_number_ = transaction_sequence_number; + } if (at_transaction_boundary) { last_transaction_boundary_position_in_event_buffer_ = @@ -247,6 +227,10 @@ void storage::write_event(util::const_byte_span event_data, } ready_to_flush_timestamps_.add_range(incomplete_transaction_timestamps_); incomplete_transaction_timestamps_.clear(); + + ready_to_flush_last_sequence_number_ = + incomplete_transaction_last_sequence_number_; + incomplete_transaction_last_sequence_number_ = 0ULL; } // now we are writing data from the event buffer to the storage backend if @@ -303,6 +287,7 @@ void storage::discard_incomplete_transaction_events() { event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_); incomplete_transaction_timestamps_.clear(); + incomplete_transaction_last_sequence_number_ = 0ULL; } void storage::flush_event_buffer() { @@ -334,6 +319,41 @@ void storage::update_last_checkpoint_info() { } } +[[nodiscard]] open_binlog_status storage::open_new_binlog_file_internal( + const composite_binlog_name &binlog_name) { + // writing the magic binlog footprint only if this is a newly + // created file + backend_->write_data_to_stream(events::magic_binlog_payload); + + gtids::optional_gtid_set previous_binlog_gtids{}; + gtids::optional_gtid_set added_binlog_gtids{}; + if (is_in_gtid_replication_mode()) { + previous_binlog_gtids = get_gtids(); + added_binlog_gtids = gtids::gtid_set{}; + } + + binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset, + std::move(previous_binlog_gtids), + std::move(added_binlog_gtids), + ctime_timestamp_range{}); + save_binlog_metadata(get_current_binlog_record()); + save_binlog_index(); + return open_binlog_status::created; +} +[[nodiscard]] open_binlog_status +storage::open_existing_binlog_file_internal(std::uint64_t open_stream_offset) { + assert(get_current_position() == open_stream_offset); + if (open_stream_offset >= events::magic_binlog_offset) { + return open_stream_offset == events::magic_binlog_offset + ? open_binlog_status::opened_at_magic_payload_offset + : open_binlog_status::opened_with_data_present; + } + assert(open_stream_offset == 0ULL); + backend_->write_data_to_stream(events::magic_binlog_payload); + get_current_binlog_record().size = events::magic_binlog_offset; + return open_binlog_status::opened_empty; +} + void storage::flush_event_buffer_internal() { assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= @@ -354,6 +374,8 @@ void storage::flush_event_buffer_internal() { } } get_current_binlog_record().timestamps.add_range(ready_to_flush_timestamps_); + get_current_binlog_record().last_sequence_number = + ready_to_flush_last_sequence_number_; save_binlog_metadata(get_current_binlog_record()); @@ -369,6 +391,7 @@ void storage::flush_event_buffer_internal() { gtids_in_event_buffer_.clear(); } ready_to_flush_timestamps_.clear(); + ready_to_flush_last_sequence_number_ = 0ULL; } void storage::load_binlog_index() { @@ -476,13 +499,14 @@ storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const { backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; - return binlog_record{.name = binlog_name, - .size = metadata.root().get<"size">(), - .previous_gtids = - metadata.root().get<"previous_gtids">(), - .added_gtids = metadata.root().get<"added_gtids">(), - .timestamps = {metadata.root().get<"min_timestamp">(), - metadata.root().get<"max_timestamp">()}}; + return binlog_record{ + .name = binlog_name, + .size = metadata.root().get<"size">(), + .previous_gtids = metadata.root().get<"previous_gtids">(), + .added_gtids = metadata.root().get<"added_gtids">(), + .timestamps = {metadata.root().get<"min_timestamp">(), + metadata.root().get<"max_timestamp">()}, + .last_sequence_number = metadata.root().get<"last_sequence_number">()}; } void storage::validate_binlog_metadata(const binlog_record &record) const { @@ -522,6 +546,7 @@ void storage::save_binlog_metadata(const binlog_record &record) const { ctime_timestamp{record.timestamps.get_min_timestamp()}; metadata.root().get<"max_timestamp">() = ctime_timestamp{record.timestamps.get_max_timestamp()}; + metadata.root().get<"last_sequence_number">() = record.last_sequence_number; const auto content{metadata.str()}; backend_->put_object(generate_binlog_metadata_name(record.name), util::as_const_byte_span(content)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index e2ed0f1..ac34d80 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -33,6 +33,8 @@ #include "binsrv/gtids/gtid_fwd.hpp" #include "binsrv/gtids/gtid_set.hpp" +#include "binsrv/events/common_types.hpp" + #include "util/byte_span_fwd.hpp" namespace binsrv { @@ -40,11 +42,19 @@ namespace binsrv { class [[nodiscard]] storage { private: struct binlog_record { + // binlog file name composite_binlog_name name; + // binlog file size in bytes std::uint64_t size{0ULL}; + // accumulated GTIDs present in the binlog files before this one gtids::optional_gtid_set previous_gtids{}; + // GTIDs present in this binlog file gtids::optional_gtid_set added_gtids{}; + // minimum and maximum event timestamps observed in this binlog file ctime_timestamp_range timestamps{}; + // sequence_number of the last transaction seen in this file - + // used for GTID rewrite-mode resume state persistence + events::seq_no_t last_sequence_number{0ULL}; }; using binlog_record_container = std::vector; @@ -119,6 +129,11 @@ class [[nodiscard]] storage { return result; } + [[nodiscard]] events::seq_no_t + get_current_transaction_sequence_number() const noexcept { + return is_empty() ? 0ULL : get_current_binlog_record().last_sequence_number; + } + [[nodiscard]] bool is_binlog_open() const noexcept; [[nodiscard]] open_binlog_status @@ -126,7 +141,8 @@ class [[nodiscard]] storage { void write_event(util::const_byte_span event_data, bool at_transaction_boundary, const gtids::gtid &transaction_gtid, - const ctime_timestamp &event_timestamp); + const ctime_timestamp &event_timestamp, + events::seq_no_t transaction_sequence_number); void close_binlog(); void discard_incomplete_transaction_events(); @@ -156,6 +172,8 @@ class [[nodiscard]] storage { gtids::gtid_set gtids_in_event_buffer_{}; ctime_timestamp_range ready_to_flush_timestamps_{}; ctime_timestamp_range incomplete_transaction_timestamps_{}; + events::seq_no_t ready_to_flush_last_sequence_number_{0ULL}; + events::seq_no_t incomplete_transaction_last_sequence_number_{0ULL}; void ensure_streaming_mode() const; @@ -187,6 +205,11 @@ class [[nodiscard]] storage { return get_flushed_position() + last_transaction_boundary_position_in_event_buffer_; } + [[nodiscard]] open_binlog_status + open_new_binlog_file_internal(const composite_binlog_name &binlog_name); + [[nodiscard]] open_binlog_status + open_existing_binlog_file_internal(std::uint64_t open_stream_offset); + void flush_event_buffer_internal(); void load_binlog_index();