diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 2bc7c29..ba41050 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -321,7 +321,7 @@ jobs: working-directory: ${{github.workspace}}/src-build-${{matrix.config.label}} # Execute tests defined by the CMake configuration. # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest --parallel + run: ctest --parallel --output-on-failure - name: Info Build artefacts run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 932cfeb..42f035c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,6 +109,8 @@ set(source_files src/binsrv/events/common_header_flag_type_fwd.hpp src/binsrv/events/common_header_flag_type.hpp + src/binsrv/events/common_types.hpp + src/binsrv/events/empty_body_fwd.hpp src/binsrv/events/empty_body.hpp src/binsrv/events/empty_body.cpp @@ -389,6 +391,10 @@ set(source_files src/util/semantic_version.hpp src/util/semantic_version.cpp + src/util/timestamp_types.hpp + src/util/timestamp_helpers.hpp + src/util/timestamp_helpers.cpp + # mysql wrapper library files src/easymysql/core_error_helpers_private.hpp src/easymysql/core_error_helpers_private.cpp diff --git a/src/binsrv/events/common_types.hpp b/src/binsrv/events/common_types.hpp new file mode 100644 index 0000000..901bcbb --- /dev/null +++ b/src/binsrv/events/common_types.hpp @@ -0,0 +1,27 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_EVENTS_COMMON_TYPES_HPP +#define BINSRV_EVENTS_COMMON_TYPES_HPP + +#include + +namespace binsrv::events { + +using seq_no_t = std::uint64_t; + +} // namespace binsrv::events + +#endif // BINSRV_EVENTS_COMMON_TYPES_HPP diff --git a/src/binsrv/events/event.hpp b/src/binsrv/events/event.hpp index d9aed11..a3ae041 100644 --- a/src/binsrv/events/event.hpp +++ b/src/binsrv/events/event.hpp @@ -120,10 +120,10 @@ class [[nodiscard]] event { template static event - create_event(std::uint32_t offset, const ctime_timestamp ×tamp, - std::uint32_t server_id, common_header_flag_set flags, - const generic_post_header &post_header, - const generic_body &body, bool include_checksum) { + create_event_simple(std::uint32_t offset, const ctime_timestamp ×tamp, + std::uint32_t server_id, common_header_flag_set flags, + const generic_post_header &post_header, + const generic_body &body, bool include_checksum) { return event{Code, offset, timestamp, server_id, flags, post_header, body, include_checksum, nullptr}; } diff --git a/src/binsrv/events/gtid_log_body.cpp b/src/binsrv/events/gtid_log_body.cpp index 4dc8394..e2bf3e5 100644 --- a/src/binsrv/events/gtid_log_body.cpp +++ b/src/binsrv/events/gtid_log_body.cpp @@ -17,23 +17,48 @@ #include #include +#include #include #include #include #include -#include -#include -#include - #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" +#include "util/common_optional_types.hpp" #include "util/exception_location_helpers.hpp" #include "util/semantic_version.hpp" +#include "util/timestamp_helpers.hpp" namespace binsrv::events { +gtid_log_body::gtid_log_body( + const util::high_resolution_time_point &immediate_commit_timestamp, + const util::optional_high_resolution_time_point &original_commit_timestamp, + std::uint64_t transaction_length, + const util::optional_semantic_version &immediate_server_version, + const util::optional_semantic_version &original_server_version, + const util::optional_uint64_t &commit_group_ticket) noexcept + : immediate_commit_timestamp_( + util::high_resolution_time_point_to_microseconds( + immediate_commit_timestamp)), + original_commit_timestamp_{ + original_commit_timestamp + ? util::high_resolution_time_point_to_microseconds( + *original_commit_timestamp) + : unset_commit_timestamp}, + transaction_length_{transaction_length}, + immediate_server_version_{immediate_server_version + ? immediate_server_version->get_encoded() + : unset_server_version}, + original_server_version_{original_server_version + ? original_server_version->get_encoded() + : unset_server_version}, + commit_group_ticket_{commit_group_ticket ? *commit_group_ticket + : unset_commit_group_ticket} {} + gtid_log_body::gtid_log_body(util::const_byte_span portion) { // TODO: rework with direct member initialization @@ -42,8 +67,8 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) { sizeof immediate_commit_timestamp_ + sizeof original_commit_timestamp_ + sizeof transaction_length_ + - sizeof original_server_version_ + sizeof immediate_server_version_ + + sizeof original_server_version_ + sizeof commit_group_ticket_, alignof(decltype(*this))), "inefficient data member reordering in gtid_log event body"); @@ -59,12 +84,12 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) { "gtid_log event body is too short to extract immediate commit " "timestamp"); } - const std::uint64_t commit_timestamp_mask{ + static constexpr std::uint64_t commit_timestamp_mask{ 1ULL << (commit_timestamp_field_length * 8ULL - 1ULL)}; if ((immediate_commit_timestamp_ & commit_timestamp_mask) != 0ULL) { // clearing the most significant bit in the // commit_timestamp_field_length-byte sequence - original_commit_timestamp_ &= ~commit_timestamp_mask; + immediate_commit_timestamp_ &= ~commit_timestamp_mask; if (!util::extract_fixed_int_from_byte_span_checked( remainder, original_commit_timestamp_, commit_timestamp_field_length)) { @@ -81,15 +106,16 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) { } if (std::size(remainder) >= server_version_field_length) { - util::extract_fixed_int_from_byte_span(remainder, original_server_version_); - const std::uint32_t server_version_mask{ - 1ULL << (server_version_field_length * 8ULL - 1ULL)}; - if ((original_server_version_ & server_version_mask) != 0UL) { - original_server_version_ &= ~server_version_mask; + util::extract_fixed_int_from_byte_span(remainder, + immediate_server_version_); + static constexpr std::uint32_t server_version_mask{ + 1U << (server_version_field_length * 8U - 1U)}; + if ((immediate_server_version_ & server_version_mask) != 0U) { + immediate_server_version_ &= ~server_version_mask; if (!util::extract_fixed_int_from_byte_span_checked( - remainder, immediate_server_version_)) { + remainder, original_server_version_)) { util::exception_location().raise( - "gtid_log event body is too short to extract immediate server " + "gtid_log event body is too short to extract original server " "version"); } } @@ -104,34 +130,110 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) { } [[nodiscard]] std::string gtid_log_body::get_readable_immediate_commit_timestamp() const { - // threre is still no way to get string representationof the - // std::chrono::high_resolution_clock::time_point using standard stdlib means, - // so using boost::posix_time::ptime here - boost::posix_time::ptime timestamp{ - boost::posix_time::from_time_t(std::time_t{})}; - timestamp += - boost::posix_time::microseconds{get_immediate_commit_timestamp_raw()}; - return boost::posix_time::to_iso_extended_string(timestamp); + return util::microseconds_to_iso_extended_string( + get_immediate_commit_timestamp_raw()); +} + +[[nodiscard]] std::string +gtid_log_body::get_readable_original_commit_timestamp() const { + if (!has_original_commit_timestamp()) { + return {}; + } + return util::microseconds_to_iso_extended_string( + get_original_commit_timestamp_raw()); +} + +[[nodiscard]] util::optional_semantic_version +gtid_log_body::get_immediate_server_version() const noexcept { + if (!has_immediate_server_version()) { + return std::nullopt; + } + return util::semantic_version{get_immediate_server_version_raw()}; } -[[nodiscard]] util::semantic_version +[[nodiscard]] std::string +gtid_log_body::get_readable_immediate_server_version() const { + const auto immediate_server_version{get_immediate_server_version()}; + if (!immediate_server_version) { + return {}; + } + return immediate_server_version->get_string(); +} + +[[nodiscard]] util::optional_semantic_version gtid_log_body::get_original_server_version() const noexcept { + if (!has_original_server_version()) { + return std::nullopt; + } return util::semantic_version{get_original_server_version_raw()}; } [[nodiscard]] std::string gtid_log_body::get_readable_original_server_version() const { - return get_original_server_version().get_string(); + const auto original_server_version{get_original_server_version()}; + if (!original_server_version) { + return {}; + } + return original_server_version->get_string(); } -[[nodiscard]] util::semantic_version -gtid_log_body::get_immediate_server_version() const noexcept { - return util::semantic_version{get_immediate_server_version_raw()}; +[[nodiscard]] std::size_t +gtid_log_body::calculate_encoded_size() const noexcept { + std::size_t result{0U}; + result += commit_timestamp_field_length; + result += + (has_original_commit_timestamp() ? commit_timestamp_field_length : 0U); + result += util::calculate_packed_int_size(transaction_length_); + if (has_immediate_server_version()) { + result += server_version_field_length; + result += + (has_original_server_version() ? server_version_field_length : 0U); + result += + (has_commit_group_ticket() ? commit_group_ticket_field_length : 0U); + } + + return result; } -[[nodiscard]] std::string -gtid_log_body::get_readable_immediate_server_version() const { - return get_immediate_server_version().get_string(); +void gtid_log_body::encode_to(util::byte_span &destination) const { + if (std::size(destination) < calculate_encoded_size()) { + util::exception_location().raise( + "cannot encode rotate event body"); + } + + static constexpr std::uint64_t commit_timestamp_mask{ + 1ULL << (commit_timestamp_field_length * 8ULL - 1ULL)}; + + const std::uint64_t patched_immediate_commit_timestamp{ + immediate_commit_timestamp_ | + (has_original_commit_timestamp() ? commit_timestamp_mask : 0ULL)}; + util::insert_fixed_int_to_byte_span(destination, + patched_immediate_commit_timestamp, + commit_timestamp_field_length); + if (has_original_commit_timestamp()) { + util::insert_fixed_int_to_byte_span(destination, original_commit_timestamp_, + commit_timestamp_field_length); + } + [[maybe_unused]] const auto insertion_result{ + util::insert_packed_int_to_byte_span_checked(destination, + transaction_length_)}; + + if (has_immediate_server_version()) { + static constexpr std::uint32_t server_version_mask{ + 1U << (server_version_field_length * 8U - 1U)}; + const std::uint32_t patched_immediate_server_version{ + immediate_server_version_ | + (has_original_server_version() ? server_version_mask : 0U)}; + util::insert_fixed_int_to_byte_span(destination, + patched_immediate_server_version); + if (has_original_server_version()) { + util::insert_fixed_int_to_byte_span(destination, + original_server_version_); + } + if (has_commit_group_ticket()) { + util::insert_fixed_int_to_byte_span(destination, commit_group_ticket_); + } + } } std::ostream &operator<<(std::ostream &output, const gtid_log_body &obj) { @@ -139,19 +241,17 @@ std::ostream &operator<<(std::ostream &output, const gtid_log_body &obj) { << obj.get_readable_immediate_commit_timestamp(); if (obj.has_original_commit_timestamp()) { output << ", original_commit_timestamp: " - << obj.get_original_commit_timestamp_raw(); + << obj.get_readable_original_commit_timestamp(); } - if (obj.has_transaction_length()) { - output << ", transaction_length: " << obj.get_transaction_length_raw(); + output << ", transaction_length: " << obj.get_transaction_length_raw(); + if (obj.has_immediate_server_version()) { + output << ", immediate_server_version: " + << obj.get_readable_immediate_server_version(); } if (obj.has_original_server_version()) { output << ", original_server_version: " << obj.get_readable_original_server_version(); } - if (obj.has_immediate_server_version()) { - output << ", immediate_server_version: " - << obj.get_readable_immediate_server_version(); - } if (obj.has_commit_group_ticket()) { output << ", commit_group_ticket: " << obj.get_commit_group_ticket_raw(); } diff --git a/src/binsrv/events/gtid_log_body.hpp b/src/binsrv/events/gtid_log_body.hpp index 2a81684..2e2e1ed 100644 --- a/src/binsrv/events/gtid_log_body.hpp +++ b/src/binsrv/events/gtid_log_body.hpp @@ -27,23 +27,33 @@ #include "util/byte_span_fwd.hpp" #include "util/common_optional_types.hpp" #include "util/semantic_version_fwd.hpp" +#include "util/timestamp_helpers.hpp" namespace binsrv::events { class [[nodiscard]] gtid_log_body { public: + gtid_log_body( + const util::high_resolution_time_point &immediate_commit_timestamp, + const util::optional_high_resolution_time_point + &original_commit_timestamp, + std::uint64_t transaction_length, + const util::optional_semantic_version &immediate_server_version, + const util::optional_semantic_version &original_server_version, + const util::optional_uint64_t &commit_group_ticket) noexcept; explicit gtid_log_body(util::const_byte_span portion); [[nodiscard]] std::uint64_t get_immediate_commit_timestamp_raw() const noexcept { return immediate_commit_timestamp_; } - [[nodiscard]] std::chrono::high_resolution_clock::time_point + [[nodiscard]] util::high_resolution_time_point get_immediate_commit_timestamp() const noexcept { - return std::chrono::high_resolution_clock::time_point{ - std::chrono::microseconds(get_immediate_commit_timestamp_raw())}; + return util::microseconds_to_high_resolution_time_point( + get_immediate_commit_timestamp_raw()); } [[nodiscard]] std::string get_readable_immediate_commit_timestamp() const; + [[nodiscard]] bool has_original_commit_timestamp() const noexcept { return original_commit_timestamp_ != unset_commit_timestamp; } @@ -51,24 +61,20 @@ class [[nodiscard]] gtid_log_body { get_original_commit_timestamp_raw() const noexcept { return original_commit_timestamp_; } - - [[nodiscard]] bool has_transaction_length() const noexcept { - return transaction_length_ != unset_transaction_length; + [[nodiscard]] util::optional_high_resolution_time_point + get_original_commit_timestamp() const noexcept { + if (!has_original_commit_timestamp()) { + return std::nullopt; + } + return util::microseconds_to_high_resolution_time_point( + get_original_commit_timestamp_raw()); } + [[nodiscard]] std::string get_readable_original_commit_timestamp() const; + [[nodiscard]] std::uint64_t get_transaction_length_raw() const noexcept { return transaction_length_; } - [[nodiscard]] bool has_original_server_version() const noexcept { - return original_server_version_ != unset_server_version; - } - [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { - return original_server_version_; - } - [[nodiscard]] util::semantic_version - get_original_server_version() const noexcept; - [[nodiscard]] std::string get_readable_original_server_version() const; - [[nodiscard]] bool has_immediate_server_version() const noexcept { return immediate_server_version_ != unset_server_version; } @@ -76,16 +82,36 @@ class [[nodiscard]] gtid_log_body { get_immediate_server_version_raw() const noexcept { return immediate_server_version_; } - [[nodiscard]] util::semantic_version + [[nodiscard]] util::optional_semantic_version get_immediate_server_version() const noexcept; [[nodiscard]] std::string get_readable_immediate_server_version() const; + [[nodiscard]] bool has_original_server_version() const noexcept { + return original_server_version_ != unset_server_version; + } + [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { + return original_server_version_; + } + [[nodiscard]] util::optional_semantic_version + get_original_server_version() const noexcept; + [[nodiscard]] std::string get_readable_original_server_version() const; + [[nodiscard]] bool has_commit_group_ticket() const noexcept { return commit_group_ticket_ != unset_commit_group_ticket; } [[nodiscard]] std::uint64_t get_commit_group_ticket_raw() const noexcept { return commit_group_ticket_; } + [[nodiscard]] util::optional_uint64_t + get_commit_group_ticket() const noexcept { + if (!has_commit_group_ticket()) { + return std::nullopt; + } + return commit_group_ticket_; + } + + [[nodiscard]] std::size_t calculate_encoded_size() const noexcept; + void encode_to(util::byte_span &destination) const; friend bool operator==(const gtid_log_body & /* first */, const gtid_log_body & /* second */) = default; @@ -102,13 +128,13 @@ class [[nodiscard]] gtid_log_body { static constexpr std::size_t commit_timestamp_field_length{7U}; static constexpr std::size_t server_version_field_length{4U}; - static constexpr std::size_t commit_group_ticket_field_length{8}; + static constexpr std::size_t commit_group_ticket_field_length{8U}; std::uint64_t immediate_commit_timestamp_{unset_commit_timestamp}; // 0 std::uint64_t original_commit_timestamp_{unset_commit_timestamp}; // 1 std::uint64_t transaction_length_{unset_transaction_length}; // 2 - std::uint32_t original_server_version_{unset_server_version}; // 3 - std::uint32_t immediate_server_version_{unset_server_version}; // 4 + std::uint32_t immediate_server_version_{unset_server_version}; // 3 + std::uint32_t original_server_version_{unset_server_version}; // 4 std::uint64_t commit_group_ticket_{unset_commit_group_ticket}; // 5 }; diff --git a/src/binsrv/events/gtid_log_post_header.cpp b/src/binsrv/events/gtid_log_post_header.cpp index 46b56ae..af756ee 100644 --- a/src/binsrv/events/gtid_log_post_header.cpp +++ b/src/binsrv/events/gtid_log_post_header.cpp @@ -25,6 +25,7 @@ #include +#include "binsrv/events/common_types.hpp" #include "binsrv/events/gtid_log_flag_type.hpp" #include "binsrv/gtids/common_types.hpp" @@ -33,6 +34,7 @@ #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" #include "util/exception_location_helpers.hpp" #include "util/flag_set.hpp" @@ -109,7 +111,7 @@ gtid_log_post_header::gtid_log_post_header(util::const_byte_span portion) { // TODO: for gtid_log (not anonymous gtid_log) add validation of gno - // (gno >= gtid::min_gno && gno_ < gtid::max_gno) util::extract_fixed_int_from_byte_span(remainder, logical_ts_code_); - if (logical_ts_code_ != expected_logical_ts_code) { + if (logical_ts_code_ != known_logical_ts_code) { util::exception_location().raise( "unsupported logical timestamp code in gtid_log post header"); } @@ -117,6 +119,16 @@ gtid_log_post_header::gtid_log_post_header(util::const_byte_span portion) { util::extract_fixed_int_from_byte_span(remainder, sequence_number_); } +gtid_log_post_header::gtid_log_post_header( + const gtid_log_flag_set &flags, const gtids::uuid &uuid, + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + gtids::gno_t gno, std::uint8_t logical_ts_code, + events::seq_no_t last_committed, events::seq_no_t sequence_number) noexcept + : flags_{flags.get_bits()}, logical_ts_code_{logical_ts_code}, + uuid_{uuid.get_raw()}, gno_{static_cast(gno)}, + last_committed_{static_cast(last_committed)}, + sequence_number_{static_cast(sequence_number)} {} + [[nodiscard]] gtid_log_flag_set gtid_log_post_header::get_flags() const noexcept { return gtid_log_flag_set{get_flags_raw()}; @@ -141,6 +153,19 @@ gtid_log_post_header::get_flags() const noexcept { return {get_uuid(), get_gno()}; } +void gtid_log_post_header::encode_to(util::byte_span &destination) const { + if (std::size(destination) < calculate_encoded_size()) { + util::exception_location().raise( + "cannot encode rotate event post header"); + } + util::insert_fixed_int_to_byte_span(destination, flags_); + util::insert_byte_array_to_byte_span(destination, uuid_); + util::insert_fixed_int_to_byte_span(destination, gno_); + util::insert_fixed_int_to_byte_span(destination, logical_ts_code_); + util::insert_fixed_int_to_byte_span(destination, last_committed_); + util::insert_fixed_int_to_byte_span(destination, sequence_number_); +} + std::ostream &operator<<(std::ostream &output, const gtid_log_post_header &obj) { return output << "flags: " << obj.get_readable_flags() diff --git a/src/binsrv/events/gtid_log_post_header.hpp b/src/binsrv/events/gtid_log_post_header.hpp index f50c9c0..154d08f 100644 --- a/src/binsrv/events/gtid_log_post_header.hpp +++ b/src/binsrv/events/gtid_log_post_header.hpp @@ -21,6 +21,7 @@ #include #include +#include "binsrv/events/common_types.hpp" #include "binsrv/events/gtid_log_flag_type_fwd.hpp" #include "binsrv/gtids/common_types.hpp" @@ -37,9 +38,13 @@ class [[nodiscard]] gtid_log_post_header { // https://github.com/mysql/mysql-server/blob/mysql-8.0.43/libbinlogevents/include/control_events.h#L1091 // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1202 - static constexpr std::uint8_t expected_logical_ts_code{2U}; + static constexpr std::uint8_t known_logical_ts_code{2U}; explicit gtid_log_post_header(util::const_byte_span portion); + gtid_log_post_header(const gtid_log_flag_set &flags, const gtids::uuid &uuid, + gtids::gno_t gno, std::uint8_t logical_ts_code, + events::seq_no_t last_committed, + events::seq_no_t sequence_number) noexcept; [[nodiscard]] std::uint8_t get_flags_raw() const noexcept { return flags_; } [[nodiscard]] gtid_log_flag_set get_flags() const noexcept; @@ -65,10 +70,27 @@ class [[nodiscard]] gtid_log_post_header { [[nodiscard]] std::int64_t get_last_committed_raw() const noexcept { return last_committed_; } + [[nodiscard]] events::seq_no_t get_last_committed() const noexcept { + return static_cast(last_committed_); + } + void set_last_committed_raw(std::int64_t last_committed) noexcept { + last_committed_ = last_committed; + } [[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept { return sequence_number_; } + [[nodiscard]] events::seq_no_t get_sequence_number() const noexcept { + return static_cast(sequence_number_); + } + void set_sequence_number_raw(std::int64_t sequence_number) noexcept { + sequence_number_ = sequence_number; + } + + [[nodiscard]] static std::size_t calculate_encoded_size() noexcept { + return size_in_bytes; + } + void encode_to(util::byte_span &destination) const; friend bool operator==(const gtid_log_post_header & /* first */, const gtid_log_post_header & /* second */) = default; diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.cpp b/src/binsrv/events/gtid_tagged_log_body_impl.cpp index 88a668d..cab5246 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.cpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.cpp @@ -15,12 +15,14 @@ #include "binsrv/events/gtid_tagged_log_body_impl.hpp" +#include #include #include #include #include #include #include +#include #include #include #include @@ -31,13 +33,11 @@ #include -#include -#include -#include - #include "binsrv/events/code_type.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/gtid_log_flag_type.hpp" +#include "binsrv/gtids/common_types.hpp" #include "binsrv/gtids/gtid.hpp" #include "binsrv/gtids/tag.hpp" #include "binsrv/gtids/uuid.hpp" @@ -45,37 +45,108 @@ #include "util/bounded_string_storage.hpp" #include "util/byte_span.hpp" #include "util/byte_span_extractors.hpp" +#include "util/byte_span_inserters.hpp" +#include "util/common_optional_types.hpp" #include "util/conversion_helpers.hpp" #include "util/exception_location_helpers.hpp" #include "util/flag_set.hpp" #include "util/semantic_version.hpp" +#include "util/timestamp_helpers.hpp" namespace binsrv::events { +namespace { + +// Field identifiers in the order they are emitted by upstream's +// define_fields() (libs/mysql/binlog/event/control_events.h:1111). +// Both the decoder (process_field_data) and the encoder (encode_to / +// calculate_encoded_size) use these values; keeping a single definition +// avoids drift. +enum class field_id_type : std::uint8_t { + flags, + uuid, + gno, + tag, + last_committed, + sequence_number, + immediate_commit_timestamp, + original_commit_timestamp, + transaction_length, + immediate_server_version, + original_server_version, + commit_group_ticket, + + delimiter +}; + +// matches upstream serialization_format_version: ALWAYS 1 (see +// libs/mysql/serialization/readme.md and Gtid_event::Decoder_type) +constexpr std::uint8_t known_serialization_version_number{1U}; + +// GTID_TAGGED_LOG events ALWAYS have last_non_ignorable_field_id in their +// TLV serialization header set to 0 - this is why we do not store it inside +// the class members and declare it as a constant for both encoding and +// decoding +constexpr std::uint8_t known_last_non_ignorable_field_id{0U}; + +} // namespace + +generic_body_impl::generic_body_impl( + const gtid_log_flag_set &flags, const gtids::uuid &uuid, gtids::gno_t gno, + const gtids::tag &tag, + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + events::seq_no_t last_committed, events::seq_no_t sequence_number, + const util::high_resolution_time_point &immediate_commit_timestamp, + const util::optional_high_resolution_time_point &original_commit_timestamp, + std::uint64_t transaction_length, + const util::semantic_version &immediate_server_version, + const util::optional_semantic_version &original_server_version, + const util::optional_uint64_t &commit_group_ticket) + : gno_{static_cast(gno)}, + last_committed_{static_cast(last_committed)}, + sequence_number_{static_cast(sequence_number)}, + immediate_commit_timestamp_{ + util::high_resolution_time_point_to_microseconds( + immediate_commit_timestamp)}, + original_commit_timestamp_{ + original_commit_timestamp + ? util::high_resolution_time_point_to_microseconds( + *original_commit_timestamp) + : unset_commit_timestamp}, + transaction_length_{transaction_length}, + immediate_server_version_{immediate_server_version.get_encoded()}, + original_server_version_{original_server_version + ? original_server_version->get_encoded() + : unset_server_version}, + commit_group_ticket_{commit_group_ticket ? *commit_group_ticket + : unset_commit_group_ticket}, + uuid_{uuid.get_raw()}, tag_{tag.get_raw()}, flags_{flags.get_bits()} {} + generic_body_impl::generic_body_impl( util::const_byte_span portion) { // TODO: rework with direct member initialization // make sure we did OK with data members reordering - static_assert( - sizeof *this == - boost::alignment::align_up( - sizeof flags_ + sizeof uuid_ + sizeof gno_ + sizeof tag_ + - sizeof last_committed_ + sizeof sequence_number_ + - sizeof immediate_commit_timestamp_ + - sizeof original_commit_timestamp_ + - sizeof transaction_length_ + sizeof original_server_version_ + - sizeof immediate_server_version_ + - sizeof commit_group_ticket_, - alignof(decltype(*this))), - "inefficient data member reordering in gtid_log event body"); + // (summands listed in the same order as the declarations in the .hpp) + static_assert(sizeof *this == + boost::alignment::align_up( + sizeof gno_ + sizeof last_committed_ + + sizeof sequence_number_ + + sizeof immediate_commit_timestamp_ + + sizeof original_commit_timestamp_ + + sizeof transaction_length_ + + sizeof immediate_server_version_ + + sizeof original_server_version_ + + sizeof commit_group_ticket_ + sizeof uuid_ + + sizeof tag_ + sizeof flags_, + alignof(decltype(*this))), + "inefficient data member reordering in gtid_log event body"); auto remainder = portion; // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/serialization/readme.md?plain=1#L102 // ::= // Extracting - static constexpr std::uint8_t expected_serialization_version_number{1U}; std::uint8_t serialization_version_number{}; if (!util::extract_varlen_int_from_byte_span_checked( remainder, serialization_version_number)) { @@ -83,7 +154,7 @@ generic_body_impl::generic_body_impl( "gtid_tagged_log event body is too short to extract " "serialization_version_number"); } - if (serialization_version_number != expected_serialization_version_number) { + if (serialization_version_number != known_serialization_version_number) { util::exception_location().raise( "unexpected serialization_version_number in the gtid_tagged_log event " "body"); @@ -114,6 +185,26 @@ generic_body_impl::generic_body_impl( "gtid_tagged_log event body is too short to extract " "last_non_ignorable_field_id"); } + if (last_non_ignorable_field_id != known_last_non_ignorable_field_id) { + util::exception_location().raise( + "unexpected last_non_ignorable_field_id in the gtid_tagged_log event " + "body"); + } + + // Creating a bitset for tracking which fields we have seen in the field_id + // sequence. At the beginning we add all ids of the non-optional fields + using field_bitset_type = + std::bitset; + field_bitset_type remaining_field_ids{}; + // setting every bit except for original_commit_timestamp, + // original_server_version and commit_group_ticket, which are optional + remaining_field_ids.flip(); + remaining_field_ids.reset( + util::enum_to_index(field_id_type::original_commit_timestamp)); + remaining_field_ids.reset( + util::enum_to_index(field_id_type::original_server_version)); + remaining_field_ids.reset( + util::enum_to_index(field_id_type::commit_group_ticket)); // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/serialization/readme.md?plain=1#L114 // The sequence of @@ -126,7 +217,9 @@ generic_body_impl::generic_body_impl( util::exception_location().raise( "broken field_id sequence in the gtid_tagged_log event body"); } - if (field_id <= last_non_ignorable_field_id) { + // although known_last_non_ignorable_field_id is currently 0, the logic + // below still checks for ignorable fields rule violations + if (field_id <= known_last_non_ignorable_field_id) { if (field_id != 0 && field_id != last_seen_field_id + 1U) { util::exception_location().raise( "violated last_non_ignorable_field_id rule in the gtid_tagged_log " @@ -134,9 +227,15 @@ generic_body_impl::generic_body_impl( } } process_field_data(field_id, remainder); + remaining_field_ids.reset(static_cast(field_id)); last_seen_field_id = field_id; } + if (remaining_field_ids.any()) { + util::exception_location().raise( + "missing required non-optional fields in the gtid_tagged_log event " + "body"); + } if (!remainder.empty()) { util::exception_location().raise( "extra bytes in the gtid_log event body"); @@ -181,25 +280,17 @@ generic_body_impl::get_gtid() const { [[nodiscard]] std::string generic_body_impl:: get_readable_immediate_commit_timestamp() const { - // threre is still no way to get string representationof the - // std::chrono::high_resolution_clock::time_point using standard stdlib means, - // so using boost::posix_time::ptime here - boost::posix_time::ptime timestamp{ - boost::posix_time::from_time_t(std::time_t{})}; - timestamp += - boost::posix_time::microseconds{get_immediate_commit_timestamp_raw()}; - return boost::posix_time::to_iso_extended_string(timestamp); -} - -[[nodiscard]] util::semantic_version -generic_body_impl::get_original_server_version() - const noexcept { - return util::semantic_version{get_original_server_version_raw()}; + return util::microseconds_to_iso_extended_string( + get_immediate_commit_timestamp_raw()); } -[[nodiscard]] std::string generic_body_impl< - code_type::gtid_tagged_log>::get_readable_original_server_version() const { - return get_original_server_version().get_string(); +[[nodiscard]] std::string generic_body_impl:: + get_readable_original_commit_timestamp() const { + if (!has_original_commit_timestamp()) { + return {}; + } + return util::microseconds_to_iso_extended_string( + get_original_commit_timestamp_raw()); } [[nodiscard]] util::semantic_version @@ -213,35 +304,45 @@ generic_body_impl::get_immediate_server_version() return get_immediate_server_version().get_string(); } +[[nodiscard]] util::optional_semantic_version +generic_body_impl::get_original_server_version() + const noexcept { + if (!has_original_server_version()) { + return std::nullopt; + } + return util::semantic_version{get_original_server_version_raw()}; +} + +[[nodiscard]] std::string generic_body_impl< + code_type::gtid_tagged_log>::get_readable_original_server_version() const { + const auto original_server_version{get_original_server_version()}; + if (!original_server_version) { + return {}; + } + return original_server_version->get_string(); +} + std::ostream & operator<<(std::ostream &output, const generic_body_impl &obj) { output << "flags: " << obj.get_readable_flags() - << ", uuid: " << obj.get_readable_uuid() - << ", gno: " << obj.get_gno_raw() + << ", uuid: " << obj.get_readable_uuid() << ", gno: " << obj.get_gno() << ", tag: " << obj.get_readable_tag() - << ", last_committed: " << obj.get_last_committed_raw() - << ", sequence_number: " << obj.get_sequence_number_raw(); - - if (obj.has_immediate_commit_timestamp()) { - output << ", immediate_commit_timestamp: " - << obj.get_readable_immediate_commit_timestamp(); - } + << ", last_committed: " << obj.get_last_committed() + << ", sequence_number: " << obj.get_sequence_number() + << ", immediate_commit_timestamp: " + << obj.get_readable_immediate_commit_timestamp(); if (obj.has_original_commit_timestamp()) { output << ", original_commit_timestamp: " - << obj.get_original_commit_timestamp_raw(); - } - if (obj.has_transaction_length()) { - output << ", transaction_length: " << obj.get_transaction_length_raw(); + << obj.get_readable_original_commit_timestamp(); } + output << ", transaction_length: " << obj.get_transaction_length_raw() + << ", immediate_server_version: " + << obj.get_readable_immediate_server_version(); if (obj.has_original_server_version()) { output << ", original_server_version: " << obj.get_readable_original_server_version(); } - if (obj.has_immediate_server_version()) { - output << ", immediate_server_version: " - << obj.get_readable_immediate_server_version(); - } if (obj.has_commit_group_ticket()) { output << ", commit_group_ticket: " << obj.get_commit_group_ticket_raw(); } @@ -252,23 +353,6 @@ operator<<(std::ostream &output, void generic_body_impl::process_field_data( std::uint8_t field_id, util::const_byte_span &remainder) { // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1111 - enum class field_id_type : std::uint8_t { - flags, - uuid, - gno, - tag, - last_committed, - sequence_number, - immediate_commit_timestamp, - original_commit_timestamp, - transaction_length, - immediate_server_version, - original_server_version, - commit_group_ticket, - - delimiter - }; - const auto varlen_int_extractor{ [](util::const_byte_span &source, auto &target, std::string_view label) { if (!util::extract_varlen_int_from_byte_span_checked(source, target)) { @@ -286,11 +370,7 @@ void generic_body_impl::process_field_data( // Extracting a fixed-size (16 byte) array of varlen bytes std::uint8_t extracted_uuid_byte{}; for (auto &uuid_byte : uuid_) { - if (!util::extract_varlen_int_from_byte_span_checked( - remainder, extracted_uuid_byte)) { - util::exception_location().raise( - "gtid_tagged_log event body is too short to extract uuid"); - } + varlen_int_extractor(remainder, extracted_uuid_byte, "uuid"); uuid_byte = util::from_underlying(extracted_uuid_byte); } } break; @@ -345,4 +425,221 @@ void generic_body_impl::process_field_data( } } +[[nodiscard]] std::size_t +generic_body_impl::calculate_tlv_section_size() + const noexcept { + std::size_t total{0U}; + + // field: flags + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::flags)); + total += util::calculate_varlen_int_size(flags_); + + // field: uuid (16 separate varlen-encoded bytes, one per UUID byte) + total += + util::calculate_varlen_int_size(util::to_underlying(field_id_type::uuid)); + for (auto uuid_byte : uuid_) { + total += util::calculate_varlen_int_size(util::to_underlying(uuid_byte)); + } + + // field: gno + total += + util::calculate_varlen_int_size(util::to_underlying(field_id_type::gno)); + total += util::calculate_varlen_int_size(gno_); + + // field: tag (varlen length + raw bytes) + total += + util::calculate_varlen_int_size(util::to_underlying(field_id_type::tag)); + total += util::calculate_varlen_int_size(std::size(tag_)); + total += std::size(tag_); + + // field: last_committed + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::last_committed)); + total += util::calculate_varlen_int_size(last_committed_); + + // field: sequence_number + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::sequence_number)); + total += util::calculate_varlen_int_size(sequence_number_); + + // field: immediate_commit_timestamp + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::immediate_commit_timestamp)); + total += util::calculate_varlen_int_size(immediate_commit_timestamp_); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp()) { + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::original_commit_timestamp)); + total += util::calculate_varlen_int_size(original_commit_timestamp_); + } + + // field: transaction_length + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::transaction_length)); + total += util::calculate_varlen_int_size(transaction_length_); + + // field: immediate_server_version + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::immediate_server_version)); + total += util::calculate_varlen_int_size(immediate_server_version_); + + // field: original_server_version (optional) + if (has_original_server_version()) { + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::original_server_version)); + total += util::calculate_varlen_int_size(original_server_version_); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket()) { + total += util::calculate_varlen_int_size( + util::to_underlying(field_id_type::commit_group_ticket)); + total += util::calculate_varlen_int_size(commit_group_ticket_); + } + + return total; +} + +[[nodiscard]] std::size_t +generic_body_impl::calculate_encoded_size() const { + // body layout: + // [varlen: serialization_version_number == 1 ] + // [varlen: serializable_field_size (== total body size) ] + // [varlen: last_non_ignorable_field_id ] + // [TLV section: calculate_tlv_section_size() bytes ] + // + // serializable_field_size encodes the entire body length INCLUDING + // itself, which makes it self-referential: the value depends on its + // own varlen-encoded width. Solve via a tiny iteration: increasing + // the value monotonically grows its width (1->2->3->...->9), so the + // loop always terminates in at most 9 steps. + const std::size_t framing_misc_size{ + util::calculate_varlen_int_size(known_serialization_version_number) + + util::calculate_varlen_int_size(known_last_non_ignorable_field_id)}; + const std::size_t tlv_size{calculate_tlv_section_size()}; + const std::size_t tail{framing_misc_size + tlv_size}; + + // initial guess: the encoded size of the total size varlen integer is + // minimal (1 byte) + std::size_t width{1U}; + std::size_t new_width{}; + std::size_t iteration{0U}; + static constexpr std::size_t max_number_of_iterations{9U}; + while ( + ((new_width = util::calculate_varlen_int_size(tail + width)) != width) && + (iteration < max_number_of_iterations)) { + width = new_width; + ++iteration; + } + assert(iteration < max_number_of_iterations); + return tail + width; +} + +void generic_body_impl::encode_tlv_section_to( + util::byte_span &destination) const { + const auto emit_field_id{[&destination](field_id_type field_id) { + [[maybe_unused]] auto result{util::insert_varlen_int_to_byte_span_checked( + destination, util::to_underlying(field_id))}; + }}; + + // It is OK to ignore the result here as we already ensured + // that destination is large enough. + [[maybe_unused]] bool insert_result{}; + + // field: flags + emit_field_id(field_id_type::flags); + insert_result = + util::insert_varlen_int_to_byte_span_checked(destination, flags_); + + // field: uuid (16 separate varlen-encoded bytes) + emit_field_id(field_id_type::uuid); + for (auto uuid_byte : uuid_) { + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, util::to_underlying(uuid_byte)); + } + + // field: gno + emit_field_id(field_id_type::gno); + insert_result = + util::insert_varlen_int_to_byte_span_checked(destination, gno_); + + // field: tag (varlen length + raw bytes) + emit_field_id(field_id_type::tag); + insert_result = util::insert_varlen_int_to_byte_span_checked(destination, + std::size(tag_)); + const util::const_byte_span tag_span{tag_}; + util::insert_byte_span_to_byte_span(destination, tag_span); + + // field: last_committed + emit_field_id(field_id_type::last_committed); + insert_result = util::insert_varlen_int_to_byte_span_checked(destination, + last_committed_); + + // field: sequence_number + emit_field_id(field_id_type::sequence_number); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, sequence_number_); + + // field: immediate_commit_timestamp + emit_field_id(field_id_type::immediate_commit_timestamp); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, immediate_commit_timestamp_); + + // field: original_commit_timestamp (optional) + if (has_original_commit_timestamp()) { + emit_field_id(field_id_type::original_commit_timestamp); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, original_commit_timestamp_); + } + + // field: transaction_length + emit_field_id(field_id_type::transaction_length); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, transaction_length_); + + // field: immediate_server_version + emit_field_id(field_id_type::immediate_server_version); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, immediate_server_version_); + + // field: original_server_version (optional) + if (has_original_server_version()) { + emit_field_id(field_id_type::original_server_version); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, original_server_version_); + } + + // field: commit_group_ticket (optional) + if (has_commit_group_ticket()) { + emit_field_id(field_id_type::commit_group_ticket); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, commit_group_ticket_); + } +} + +void generic_body_impl::encode_to( + util::byte_span &destination) const { + const std::uint64_t encoded_size{calculate_encoded_size()}; + if (std::size(destination) < encoded_size) { + util::exception_location().raise( + "cannot encode gtid_tagged_log event body"); + } + + // The framing header: serialization_version_number, + // serializable_field_size, last_non_ignorable_field_id. + // It is OK to ignore the result here as we already ensured + // that destination is large enough. + [[maybe_unused]] bool insert_result{}; + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, known_serialization_version_number); + insert_result = + util::insert_varlen_int_to_byte_span_checked(destination, encoded_size); + insert_result = util::insert_varlen_int_to_byte_span_checked( + destination, known_last_non_ignorable_field_id); + + encode_tlv_section_to(destination); +} + } // namespace binsrv::events diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.hpp b/src/binsrv/events/gtid_tagged_log_body_impl.hpp index db465db..9718caa 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.hpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.hpp @@ -22,6 +22,7 @@ #include #include +#include "binsrv/events/common_types.hpp" #include "binsrv/events/gtid_log_flag_type_fwd.hpp" #include "binsrv/gtids/common_types.hpp" @@ -31,7 +32,9 @@ #include "util/bounded_string_storage.hpp" #include "util/byte_span_fwd.hpp" +#include "util/common_optional_types.hpp" #include "util/semantic_version_fwd.hpp" +#include "util/timestamp_helpers.hpp" namespace binsrv::events { @@ -39,6 +42,17 @@ template <> class [[nodiscard]] generic_body_impl { public: // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/binlog/event/control_events.h#L1111 + generic_body_impl( + const gtid_log_flag_set &flags, const gtids::uuid &uuid, gtids::gno_t gno, + const gtids::tag &tag, events::seq_no_t last_committed, + events::seq_no_t sequence_number, + const util::high_resolution_time_point &immediate_commit_timestamp, + const util::optional_high_resolution_time_point + &original_commit_timestamp, + std::uint64_t transaction_length, + const util::semantic_version &immediate_server_version, + const util::optional_semantic_version &original_server_version, + const util::optional_uint64_t &commit_group_ticket); explicit generic_body_impl(util::const_byte_span portion); [[nodiscard]] std::uint8_t get_flags_raw() const noexcept { return flags_; } @@ -67,22 +81,31 @@ template <> class [[nodiscard]] generic_body_impl { [[nodiscard]] std::int64_t get_last_committed_raw() const noexcept { return last_committed_; } + [[nodiscard]] events::seq_no_t get_last_committed() const noexcept { + return static_cast(last_committed_); + } + void set_last_committed_raw(std::int64_t last_committed) noexcept { + last_committed_ = last_committed; + } [[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept { return sequence_number_; } - - [[nodiscard]] bool has_immediate_commit_timestamp() const noexcept { - return immediate_commit_timestamp_ != unset_commit_timestamp; + [[nodiscard]] events::seq_no_t get_sequence_number() const noexcept { + return static_cast(sequence_number_); } + void set_sequence_number_raw(std::int64_t sequence_number) noexcept { + sequence_number_ = sequence_number; + } + [[nodiscard]] std::uint64_t get_immediate_commit_timestamp_raw() const noexcept { return immediate_commit_timestamp_; } - [[nodiscard]] std::chrono::high_resolution_clock::time_point + [[nodiscard]] util::high_resolution_time_point get_immediate_commit_timestamp() const noexcept { - return std::chrono::high_resolution_clock::time_point{ - std::chrono::microseconds(get_immediate_commit_timestamp_raw())}; + return util::microseconds_to_high_resolution_time_point( + get_immediate_commit_timestamp_raw()); } [[nodiscard]] std::string get_readable_immediate_commit_timestamp() const; @@ -93,27 +116,23 @@ template <> class [[nodiscard]] generic_body_impl { get_original_commit_timestamp_raw() const noexcept { return original_commit_timestamp_; } - - [[nodiscard]] bool has_transaction_length() const noexcept { - return transaction_length_ != unset_transaction_length; + [[nodiscard]] util::optional_high_resolution_time_point + get_original_commit_timestamp() const noexcept { + if (!has_original_commit_timestamp()) { + return std::nullopt; + } + return util::microseconds_to_high_resolution_time_point( + get_original_commit_timestamp_raw()); } + [[nodiscard]] std::string get_readable_original_commit_timestamp() const; + [[nodiscard]] std::uint64_t get_transaction_length_raw() const noexcept { return transaction_length_; } - - [[nodiscard]] bool has_original_server_version() const noexcept { - return original_server_version_ != unset_server_version; + void set_transaction_length_raw(std::uint64_t transaction_length) noexcept { + transaction_length_ = transaction_length; } - [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { - return original_server_version_; - } - [[nodiscard]] util::semantic_version - get_original_server_version() const noexcept; - [[nodiscard]] std::string get_readable_original_server_version() const; - [[nodiscard]] bool has_immediate_server_version() const noexcept { - return immediate_server_version_ != unset_server_version; - } [[nodiscard]] std::uint32_t get_immediate_server_version_raw() const noexcept { return immediate_server_version_; @@ -122,12 +141,45 @@ template <> class [[nodiscard]] generic_body_impl { get_immediate_server_version() const noexcept; [[nodiscard]] std::string get_readable_immediate_server_version() const; + [[nodiscard]] bool has_original_server_version() const noexcept { + return original_server_version_ != unset_server_version; + } + [[nodiscard]] std::uint32_t get_original_server_version_raw() const noexcept { + return original_server_version_; + } + [[nodiscard]] util::optional_semantic_version + get_original_server_version() const noexcept; + [[nodiscard]] std::string get_readable_original_server_version() const; + [[nodiscard]] bool has_commit_group_ticket() const noexcept { return commit_group_ticket_ != unset_commit_group_ticket; } [[nodiscard]] std::uint64_t get_commit_group_ticket_raw() const noexcept { return commit_group_ticket_; } + [[nodiscard]] util::optional_uint64_t + get_commit_group_ticket() const noexcept { + if (!has_commit_group_ticket()) { + return std::nullopt; + } + return commit_group_ticket_; + } + + // Returns the total number of bytes that encode_to() will emit for the + // current in-memory state, including the 3-field framing header + // (serialization_version_number, serializable_field_size, + // last_non_ignorable_field_id) and every TLV field. The value of the + // serializable_field_size field is computed self-consistently. + [[nodiscard]] std::size_t calculate_encoded_size() const; + + // Writes the body to *destination* using the same TLV layout as the + // input. The set of optional fields actually emitted matches the set + // observed during construction (decoding); only the values of + // last_committed / sequence_number / transaction_length may have been + // mutated through the corresponding setters above. + // + // Precondition: std::size(destination) >= calculate_encoded_size(). + void encode_to(util::byte_span &destination) const; friend bool operator==(const generic_body_impl & /* first */, const generic_body_impl & /* second */) = default; @@ -142,22 +194,35 @@ template <> class [[nodiscard]] generic_body_impl { static constexpr std::uint64_t unset_commit_group_ticket{ std::numeric_limits::max()}; - // the members are deliberately reordered for better packing - std::uint8_t flags_{}; // 0 - gtids::uuid_storage uuid_{}; // 1 + // The protocol fields below are deliberately reordered for better + // packing. The trailing "// N" annotation is the protocol + // field_id_type value upstream assigns to the field + // (see field_id_type enum in gtid_tagged_log_body_impl.cpp / + // define_fields() in upstream control_events.h), so the deviation + // from protocol order is visible at a glance. std::int64_t gno_{}; // 2 - gtids::tag_storage tag_{}; // 3 std::int64_t last_committed_{}; // 4 std::int64_t sequence_number_{}; // 5 std::uint64_t immediate_commit_timestamp_{unset_commit_timestamp}; // 6 std::uint64_t original_commit_timestamp_{unset_commit_timestamp}; // 7 std::uint64_t transaction_length_{unset_transaction_length}; // 8 - std::uint32_t original_server_version_{unset_server_version}; // 9 - std::uint32_t immediate_server_version_{unset_server_version}; // 10 + std::uint32_t immediate_server_version_{unset_server_version}; // 9 + std::uint32_t original_server_version_{unset_server_version}; // 10 std::uint64_t commit_group_ticket_{unset_commit_group_ticket}; // 11 + gtids::uuid_storage uuid_{}; // 1 + gtids::tag_storage tag_{}; // 3 + std::uint8_t flags_{}; // 0 void process_field_data(std::uint8_t field_id, util::const_byte_span &remainder); + + // Helpers for calculate_encoded_size() / encode_to(). + // Returns the size in bytes of the TLV section (every pair after the framing header). + [[nodiscard]] std::size_t calculate_tlv_section_size() const noexcept; + // Writes only the TLV section to *destination*; assumes destination has + // at least calculate_tlv_section_size() bytes available. + void encode_tlv_section_to(util::byte_span &destination) const; }; } // namespace binsrv::events diff --git a/src/binsrv/gtids/tag.hpp b/src/binsrv/gtids/tag.hpp index d720c35..fc5f8be 100644 --- a/src/binsrv/gtids/tag.hpp +++ b/src/binsrv/gtids/tag.hpp @@ -41,6 +41,8 @@ class tag { [[nodiscard]] bool is_empty() const noexcept { return data_.empty(); } + [[nodiscard]] const tag_storage &get_raw() const noexcept { return data_; } + [[nodiscard]] std::size_t get_size() const noexcept { return std::size(data_); } diff --git a/src/binsrv/gtids/uuid.cpp b/src/binsrv/gtids/uuid.cpp index ffdf172..38f6c43 100644 --- a/src/binsrv/gtids/uuid.cpp +++ b/src/binsrv/gtids/uuid.cpp @@ -55,6 +55,15 @@ uuid::uuid(const uuid_storage &data) { boost::uuids::uuid::static_size(), std::begin(data_)); } +[[nodiscard]] uuid_storage uuid::get_raw() const noexcept { + uuid_storage result; + std::copy_n( + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + reinterpret_cast(std::cbegin(data_)), + boost::uuids::uuid::static_size(), std::begin(result)); + return result; +} + [[nodiscard]] std::string uuid::str() const { return boost::uuids::to_string(data_); } diff --git a/src/binsrv/gtids/uuid.hpp b/src/binsrv/gtids/uuid.hpp index 53c4f93..8fe58c5 100644 --- a/src/binsrv/gtids/uuid.hpp +++ b/src/binsrv/gtids/uuid.hpp @@ -41,6 +41,7 @@ class uuid { explicit uuid(const uuid_storage &data); [[nodiscard]] bool is_empty() const noexcept { return data_.is_nil(); } + [[nodiscard]] uuid_storage get_raw() const noexcept; [[nodiscard]] std::string str() const; diff --git a/src/util/byte_span_inserters.hpp b/src/util/byte_span_inserters.hpp index 1185cef..469fbb4 100644 --- a/src/util/byte_span_inserters.hpp +++ b/src/util/byte_span_inserters.hpp @@ -164,6 +164,7 @@ template return false; } insert_fixed_int_to_byte_span(remainder, value, 1); + return true; } /* 251 is reserved for NULL */ if (value < packed_int_double_boundary) { @@ -172,6 +173,7 @@ template } insert_fixed_int_to_byte_span(remainder, packed_int_double_marker); insert_fixed_int_to_byte_span(remainder, value, packed_int_double_size); + return true; } if (value < packed_int_triple_boundary) { if (std::size(remainder) < 1U + packed_int_triple_size) { @@ -179,13 +181,14 @@ template } insert_fixed_int_to_byte_span(remainder, packed_int_triple_marker); insert_fixed_int_to_byte_span(remainder, value, packed_int_triple_size); - } else { - if (std::size(remainder) < 1U + packed_int_octuple_size) { - return false; - } - insert_fixed_int_to_byte_span(remainder, packed_int_octuple_marker); - insert_fixed_int_to_byte_span(remainder, value, packed_int_octuple_size); + return true; + } + + if (std::size(remainder) < 1U + packed_int_octuple_size) { + return false; } + insert_fixed_int_to_byte_span(remainder, packed_int_octuple_marker); + insert_fixed_int_to_byte_span(remainder, value, packed_int_octuple_size); return true; } diff --git a/src/util/semantic_version_fwd.hpp b/src/util/semantic_version_fwd.hpp index eed0ce2..5c4da15 100644 --- a/src/util/semantic_version_fwd.hpp +++ b/src/util/semantic_version_fwd.hpp @@ -16,10 +16,14 @@ #ifndef UTIL_SEMANTIC_VERSION_FWD_HPP #define UTIL_SEMANTIC_VERSION_FWD_HPP +#include + namespace util { class semantic_version; +using optional_semantic_version = std::optional; + } // namespace util #endif // UTIL_SEMANTIC_VERSION_FWD_HPP diff --git a/src/util/timestamp_helpers.cpp b/src/util/timestamp_helpers.cpp new file mode 100644 index 0000000..b17946b --- /dev/null +++ b/src/util/timestamp_helpers.cpp @@ -0,0 +1,56 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "util/timestamp_helpers.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace util { + +[[nodiscard]] std::uint64_t high_resolution_time_point_to_microseconds( + const high_resolution_time_point ×tamp) noexcept { + return static_cast( + std::chrono::duration_cast( + timestamp.time_since_epoch()) + .count()); +} + +[[nodiscard]] high_resolution_time_point +microseconds_to_high_resolution_time_point( + std::uint64_t microseconds) noexcept { + return high_resolution_time_point{std::chrono::microseconds{microseconds}}; +} + +[[nodiscard]] std::string +microseconds_to_iso_extended_string(std::uint64_t microseconds) { + // there is still no way to get string representation of the + // std::chrono::high_resolution_clock::time_point using standard stdlib means, + // so using boost::posix_time::ptime here + + // TODO: consider switching to std::format() + boost::posix_time::ptime timestamp{ + boost::posix_time::from_time_t(std::time_t{})}; + timestamp += boost::posix_time::microseconds{microseconds}; + return boost::posix_time::to_iso_extended_string(timestamp); +} + +} // namespace util diff --git a/src/util/timestamp_helpers.hpp b/src/util/timestamp_helpers.hpp new file mode 100644 index 0000000..96d6801 --- /dev/null +++ b/src/util/timestamp_helpers.hpp @@ -0,0 +1,37 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef UTIL_TIMESTAMP_HELPERS_HPP +#define UTIL_TIMESTAMP_HELPERS_HPP + +#include +#include + +#include "util/timestamp_types.hpp" // IWYU pragma: export + +namespace util { + +[[nodiscard]] std::uint64_t high_resolution_time_point_to_microseconds( + const high_resolution_time_point ×tamp) noexcept; + +[[nodiscard]] high_resolution_time_point +microseconds_to_high_resolution_time_point(std::uint64_t microseconds) noexcept; + +[[nodiscard]] std::string +microseconds_to_iso_extended_string(std::uint64_t microseconds); + +} // namespace util + +#endif // UTIL_TIMESTAMP_HELPERS_HPP diff --git a/src/util/timestamp_types.hpp b/src/util/timestamp_types.hpp new file mode 100644 index 0000000..8b89ba7 --- /dev/null +++ b/src/util/timestamp_types.hpp @@ -0,0 +1,31 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef UTIL_TIMESTAMP_TYPES_HPP +#define UTIL_TIMESTAMP_TYPES_HPP + +#include +#include + +namespace util { + +using high_resolution_time_point = + std::chrono::high_resolution_clock::time_point; +using optional_high_resolution_time_point = + std::optional; + +} // namespace util + +#endif // UTIL_TIMESTAMP_TYPES_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c577eec..9b15daf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -177,8 +177,9 @@ set(event_set_source_files ${PROJECT_SOURCE_DIR}/src/binsrv/events/unknown_post_header.cpp - ${PROJECT_SOURCE_DIR}/src/util/semantic_version.cpp ${PROJECT_SOURCE_DIR}/src/util/crc_helpers.cpp + ${PROJECT_SOURCE_DIR}/src/util/semantic_version.cpp + ${PROJECT_SOURCE_DIR}/src/util/timestamp_helpers.cpp ) add_executable(byte_span_encoding_test byte_span_encoding_test.cpp ${byte_span_encoding_source_files}) diff --git a/tests/byte_span_encoding_test.cpp b/tests/byte_span_encoding_test.cpp index 7ecd5ec..e3ba111 100644 --- a/tests/byte_span_encoding_test.cpp +++ b/tests/byte_span_encoding_test.cpp @@ -23,7 +23,7 @@ #include #include -#define BOOST_TEST_MODULE TagTests +#define BOOST_TEST_MODULE ExtractionInsertionTests // this include is needed as it provides the 'main()' function // NOLINTNEXTLINE(misc-include-cleaner) #include @@ -31,6 +31,10 @@ #include #include +#include + +#include + #include #include @@ -38,6 +42,7 @@ #include "util/byte_span_extractors.hpp" #include "util/byte_span_fwd.hpp" #include "util/byte_span_inserters.hpp" +#include "util/byte_span_packed_int_constants.hpp" struct mixin_fixture { static constexpr std::size_t bits_in_byte{ @@ -192,3 +197,56 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(VarlenIntEncoding, ValueType, print_and_check_single(std::numeric_limits::min()); } } + +static constexpr std::array packed_int_values{ + 0ULL, + 1ULL, + 2ULL, + util::packed_int_single_boundary - 2ULL, + util::packed_int_single_boundary - 1ULL, + util::packed_int_single_boundary, + util::packed_int_single_boundary + 1ULL, + util::packed_int_single_boundary + 2ULL, + util::packed_int_double_boundary - 2ULL, + util::packed_int_double_boundary - 1ULL, + util::packed_int_double_boundary, + util::packed_int_double_boundary + 1ULL, + util::packed_int_double_boundary + 2ULL, + util::packed_int_triple_boundary - 2ULL, + util::packed_int_triple_boundary - 1ULL, + util::packed_int_triple_boundary, + util::packed_int_triple_boundary + 1ULL, + util::packed_int_triple_boundary + 2ULL, + std::numeric_limits::max() - 2ULL, + std::numeric_limits::max() - 1ULL, + std::numeric_limits::max()}; + +BOOST_DATA_TEST_CASE(PackedIntEncoding, + boost::unit_test::data::make(packed_int_values)) { + const std::uint64_t value{sample}; // sample is provided by Boost.Test + + const auto calculated_encoded_size{util::calculate_packed_int_size(value)}; + using encoding_buffer_type = + std::array; + + encoding_buffer_type buffer; + util::byte_span encoding_portion{buffer}; + BOOST_CHECK( + util::insert_packed_int_to_byte_span_checked(encoding_portion, value)); + + const auto actual_encoded_size{std::size(buffer) - + std::size(encoding_portion)}; + + BOOST_CHECK_EQUAL(actual_encoded_size, calculated_encoded_size); + + util::const_byte_span decoding_portion{buffer}; + std::uint64_t restored_value{}; + BOOST_CHECK(util::extract_packed_int_from_byte_span_checked(decoding_portion, + restored_value)); + + const auto actual_decoded_size{std::size(buffer) - + std::size(decoding_portion)}; + BOOST_CHECK_EQUAL(actual_decoded_size, calculated_encoded_size); + + BOOST_CHECK_EQUAL(restored_value, value); +} diff --git a/tests/event_test.cpp b/tests/event_test.cpp index 01928b4..93e72bd 100644 --- a/tests/event_test.cpp +++ b/tests/event_test.cpp @@ -13,6 +13,7 @@ // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +#include #include #include @@ -29,14 +30,18 @@ #include "binsrv/ctime_timestamp.hpp" #include "binsrv/replication_mode_type.hpp" +#include "binsrv/gtids/common_types.hpp" #include "binsrv/gtids/gtid_set.hpp" #include "binsrv/events/checksum_algorithm_type.hpp" #include "binsrv/events/code_type.hpp" #include "binsrv/events/common_header.hpp" #include "binsrv/events/common_header_flag_type.hpp" +#include "binsrv/events/common_types.hpp" #include "binsrv/events/event.hpp" #include "binsrv/events/event_view.hpp" +#include "binsrv/events/gtid_log_flag_type.hpp" +#include "binsrv/events/gtid_log_post_header.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" @@ -140,6 +145,122 @@ BOOST_AUTO_TEST_CASE(EventRoundTrip) { BOOST_CHECK_EQUAL(generated_previous_gtids_log_event, parsed_previous_gtids_log_event); + + const auto earlier_ts{std::chrono::high_resolution_clock::now()}; + const auto later_ts{earlier_ts + std::chrono::hours(1U)}; + + // GTID_LOG event + offset = generated_previous_gtids_log_event.get_common_header() + .get_next_event_position_raw(); + const binsrv::events::generic_post_header + gtid_log_post_header{ + binsrv::events::gtid_log_flag_set{ + binsrv::events::gtid_log_flag_type::may_have_sbr}, // flags + binsrv::gtids::uuid{"11111111-aaaa-1111-aaaa-111111111111"}, // uuid + binsrv::gtids::gno_t{6ULL}, // gno + binsrv::events::gtid_log_post_header:: + known_logical_ts_code, // logical_ts_code + binsrv::events::seq_no_t{0ULL}, // last_committed + binsrv::events::seq_no_t{1ULL} // sequence_number + }; + const binsrv::events::generic_body + gtid_log_body{ + later_ts, // immediate_commit_timestamp + earlier_ts, // original_commit_timestamp + 42ULL, // transaction_length + util::semantic_version{"8.4.8"}, // immediate_server_version + util::semantic_version{"8.0.46"}, // original_server_version + 123ULL // commit_group_ticket + }; + const auto generated_gtid_log_event{ + binsrv::events::event::create_event( + offset, binsrv::ctime_timestamp::now(), default_server_id, + binsrv::events::common_header_flag_set{}, gtid_log_post_header, + gtid_log_body, true, event_buffer)}; + + const binsrv::events::event_view generated_gtid_log_event_v{ + context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_gtid_log_event{generated_gtid_log_event_v}; + + BOOST_CHECK_EQUAL(generated_gtid_log_event, parsed_gtid_log_event); + + // ANONYMOUS_GTID_LOG event + offset = generated_previous_gtids_log_event.get_common_header() + .get_next_event_position_raw(); + const binsrv::events::generic_post_header< + binsrv::events::code_type::anonymous_gtid_log> + anonymous_gtid_log_post_header{ + binsrv::events::gtid_log_flag_set{ + binsrv::events::gtid_log_flag_type::may_have_sbr}, // flags + binsrv::gtids::uuid{}, // uuid + binsrv::gtids::gno_t{0ULL}, // gno + binsrv::events::gtid_log_post_header:: + known_logical_ts_code, // logical_ts_code + binsrv::events::seq_no_t{0ULL}, // last_committed + binsrv::events::seq_no_t{1ULL} // sequence_number + }; + const binsrv::events::generic_body< + binsrv::events::code_type::anonymous_gtid_log> + anonymous_gtid_log_body{ + later_ts, // immediate_commit_timestamp + earlier_ts, // original_commit_timestamp + 42ULL, // transaction_length + util::semantic_version{"8.4.8"}, // immediate_server_version + util::semantic_version{"8.0.46"}, // original_server_version + 123ULL // commit_group_ticket + }; + const auto generated_anonymous_gtid_log_event{ + binsrv::events::event::create_event< + binsrv::events::code_type::anonymous_gtid_log>( + offset, binsrv::ctime_timestamp::now(), default_server_id, + binsrv::events::common_header_flag_set{}, + anonymous_gtid_log_post_header, anonymous_gtid_log_body, true, + event_buffer)}; + + const binsrv::events::event_view generated_anonymous_gtid_log_event_v{ + context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_anonymous_gtid_log_event{ + generated_anonymous_gtid_log_event_v}; + + BOOST_CHECK_EQUAL(generated_anonymous_gtid_log_event, + parsed_anonymous_gtid_log_event); + + // GTID_TAGGED_LOG event + offset = generated_previous_gtids_log_event.get_common_header() + .get_next_event_position_raw(); + const binsrv::events::generic_post_header< + binsrv::events::code_type::gtid_tagged_log> + gtid_tagged_log_post_header{}; + const binsrv::events::generic_body + gtid_tagged_log_body{ + binsrv::events::gtid_log_flag_set{ + binsrv::events::gtid_log_flag_type::may_have_sbr}, // flags + binsrv::gtids::uuid{"11111111-aaaa-1111-aaaa-111111111111"}, // uuid + binsrv::gtids::gno_t{6ULL}, // gno + binsrv::gtids::tag{"mytag"}, // tag + binsrv::events::seq_no_t{0ULL}, // last_committed + binsrv::events::seq_no_t{1ULL}, // sequence_number + later_ts, // immediate_commit_timestamp + earlier_ts, // original_commit_timestamp + 42ULL, // transaction_length + util::semantic_version{"8.4.8"}, // immediate_server_version + util::semantic_version{"8.0.46"}, // original_server_version + 123ULL // commit_group_ticket + }; + const auto generated_gtid_tagged_log_event{ + binsrv::events::event::create_event< + binsrv::events::code_type::gtid_tagged_log>( + offset, binsrv::ctime_timestamp::now(), default_server_id, + binsrv::events::common_header_flag_set{}, gtid_tagged_log_post_header, + gtid_tagged_log_body, true, event_buffer)}; + + const binsrv::events::event_view generated_gtid_tagged_log_event_v{ + context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_gtid_tagged_log_event{ + generated_gtid_tagged_log_event_v}; + + BOOST_CHECK_EQUAL(generated_gtid_tagged_log_event, + parsed_gtid_tagged_log_event); } BOOST_AUTO_TEST_CASE(EventMaterialization) { diff --git a/tests/uuid_test.cpp b/tests/uuid_test.cpp index adf2184..7f5a89f 100644 --- a/tests/uuid_test.cpp +++ b/tests/uuid_test.cpp @@ -17,7 +17,7 @@ #include #include -#define BOOST_TEST_MODULE UuisTests +#define BOOST_TEST_MODULE UuidTests // this include is needed as it provides the 'main()' function // NOLINTNEXTLINE(misc-include-cleaner) #include