Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ jobs:
working-directory: ${{github.workspace}}/src-build-${{matrix.config.label}}
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest --parallel
run: ctest --parallel --output-on-failure

- name: Info Build artefacts
run: |
Expand Down
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ set(source_files
src/binsrv/events/common_header_flag_type_fwd.hpp
src/binsrv/events/common_header_flag_type.hpp

src/binsrv/events/common_types.hpp

src/binsrv/events/empty_body_fwd.hpp
src/binsrv/events/empty_body.hpp
src/binsrv/events/empty_body.cpp
Expand Down Expand Up @@ -389,6 +391,10 @@ set(source_files
src/util/semantic_version.hpp
src/util/semantic_version.cpp

src/util/timestamp_types.hpp
src/util/timestamp_helpers.hpp
src/util/timestamp_helpers.cpp

# mysql wrapper library files
src/easymysql/core_error_helpers_private.hpp
src/easymysql/core_error_helpers_private.cpp
Expand Down
27 changes: 27 additions & 0 deletions src/binsrv/events/common_types.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2023-2024 Percona and/or its affiliates.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

#ifndef BINSRV_EVENTS_COMMON_TYPES_HPP
#define BINSRV_EVENTS_COMMON_TYPES_HPP

#include <cstdint>

namespace binsrv::events {

using seq_no_t = std::uint64_t;

} // namespace binsrv::events

#endif // BINSRV_EVENTS_COMMON_TYPES_HPP
8 changes: 4 additions & 4 deletions src/binsrv/events/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ class [[nodiscard]] event {

template <code_type Code>
static event
create_event(std::uint32_t offset, const ctime_timestamp &timestamp,
std::uint32_t server_id, common_header_flag_set flags,
const generic_post_header<Code> &post_header,
const generic_body<Code> &body, bool include_checksum) {
create_event_simple(std::uint32_t offset, const ctime_timestamp &timestamp,
std::uint32_t server_id, common_header_flag_set flags,
const generic_post_header<Code> &post_header,
const generic_body<Code> &body, bool include_checksum) {
return event{Code, offset, timestamp, server_id, flags,
post_header, body, include_checksum, nullptr};
}
Expand Down
174 changes: 137 additions & 37 deletions src/binsrv/events/gtid_log_body.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,48 @@

#include <cstdint>
#include <ctime>
#include <optional>
#include <ostream>
#include <stdexcept>
#include <string>

#include <boost/align/align_up.hpp>

#include <boost/date_time/posix_time/conversion.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/posix_time/time_formatters_limited.hpp>

#include "util/byte_span.hpp"
#include "util/byte_span_extractors.hpp"
#include "util/byte_span_inserters.hpp"
#include "util/common_optional_types.hpp"
#include "util/exception_location_helpers.hpp"
#include "util/semantic_version.hpp"
#include "util/timestamp_helpers.hpp"

namespace binsrv::events {

gtid_log_body::gtid_log_body(
const util::high_resolution_time_point &immediate_commit_timestamp,
const util::optional_high_resolution_time_point &original_commit_timestamp,
std::uint64_t transaction_length,
const util::optional_semantic_version &immediate_server_version,
const util::optional_semantic_version &original_server_version,
const util::optional_uint64_t &commit_group_ticket) noexcept
: immediate_commit_timestamp_(
util::high_resolution_time_point_to_microseconds(
immediate_commit_timestamp)),
original_commit_timestamp_{
original_commit_timestamp
? util::high_resolution_time_point_to_microseconds(
*original_commit_timestamp)
: unset_commit_timestamp},
transaction_length_{transaction_length},
immediate_server_version_{immediate_server_version
? immediate_server_version->get_encoded()
: unset_server_version},
original_server_version_{original_server_version
? original_server_version->get_encoded()
: unset_server_version},
commit_group_ticket_{commit_group_ticket ? *commit_group_ticket
: unset_commit_group_ticket} {}

gtid_log_body::gtid_log_body(util::const_byte_span portion) {
// TODO: rework with direct member initialization

Expand All @@ -42,8 +67,8 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) {
sizeof immediate_commit_timestamp_ +
sizeof original_commit_timestamp_ +
sizeof transaction_length_ +
sizeof original_server_version_ +
sizeof immediate_server_version_ +
sizeof original_server_version_ +
sizeof commit_group_ticket_,
alignof(decltype(*this))),
"inefficient data member reordering in gtid_log event body");
Expand All @@ -59,12 +84,12 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) {
"gtid_log event body is too short to extract immediate commit "
"timestamp");
}
const std::uint64_t commit_timestamp_mask{
static constexpr std::uint64_t commit_timestamp_mask{
1ULL << (commit_timestamp_field_length * 8ULL - 1ULL)};
if ((immediate_commit_timestamp_ & commit_timestamp_mask) != 0ULL) {
// clearing the most significant bit in the
// commit_timestamp_field_length-byte sequence
original_commit_timestamp_ &= ~commit_timestamp_mask;
immediate_commit_timestamp_ &= ~commit_timestamp_mask;
if (!util::extract_fixed_int_from_byte_span_checked(
remainder, original_commit_timestamp_,
commit_timestamp_field_length)) {
Expand All @@ -81,15 +106,16 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) {
}

if (std::size(remainder) >= server_version_field_length) {
util::extract_fixed_int_from_byte_span(remainder, original_server_version_);
const std::uint32_t server_version_mask{
1ULL << (server_version_field_length * 8ULL - 1ULL)};
if ((original_server_version_ & server_version_mask) != 0UL) {
original_server_version_ &= ~server_version_mask;
util::extract_fixed_int_from_byte_span(remainder,
immediate_server_version_);
static constexpr std::uint32_t server_version_mask{
1U << (server_version_field_length * 8U - 1U)};
if ((immediate_server_version_ & server_version_mask) != 0U) {
immediate_server_version_ &= ~server_version_mask;
if (!util::extract_fixed_int_from_byte_span_checked(
remainder, immediate_server_version_)) {
remainder, original_server_version_)) {
util::exception_location().raise<std::invalid_argument>(
"gtid_log event body is too short to extract immediate server "
"gtid_log event body is too short to extract original server "
"version");
}
}
Expand All @@ -104,54 +130,128 @@ gtid_log_body::gtid_log_body(util::const_byte_span portion) {
}
[[nodiscard]] std::string
gtid_log_body::get_readable_immediate_commit_timestamp() const {
// threre is still no way to get string representationof the
// std::chrono::high_resolution_clock::time_point using standard stdlib means,
// so using boost::posix_time::ptime here
boost::posix_time::ptime timestamp{
boost::posix_time::from_time_t(std::time_t{})};
timestamp +=
boost::posix_time::microseconds{get_immediate_commit_timestamp_raw()};
return boost::posix_time::to_iso_extended_string(timestamp);
return util::microseconds_to_iso_extended_string(
get_immediate_commit_timestamp_raw());
}

[[nodiscard]] std::string
gtid_log_body::get_readable_original_commit_timestamp() const {
if (!has_original_commit_timestamp()) {
return {};
}
return util::microseconds_to_iso_extended_string(
get_original_commit_timestamp_raw());
}

[[nodiscard]] util::optional_semantic_version
gtid_log_body::get_immediate_server_version() const noexcept {
if (!has_immediate_server_version()) {
return std::nullopt;
}
return util::semantic_version{get_immediate_server_version_raw()};
}

[[nodiscard]] util::semantic_version
[[nodiscard]] std::string
gtid_log_body::get_readable_immediate_server_version() const {
const auto immediate_server_version{get_immediate_server_version()};
if (!immediate_server_version) {
return {};
}
return immediate_server_version->get_string();
}

[[nodiscard]] util::optional_semantic_version
gtid_log_body::get_original_server_version() const noexcept {
if (!has_original_server_version()) {
return std::nullopt;
}
return util::semantic_version{get_original_server_version_raw()};
}

[[nodiscard]] std::string
gtid_log_body::get_readable_original_server_version() const {
return get_original_server_version().get_string();
const auto original_server_version{get_original_server_version()};
if (!original_server_version) {
return {};
}
return original_server_version->get_string();
}

[[nodiscard]] util::semantic_version
gtid_log_body::get_immediate_server_version() const noexcept {
return util::semantic_version{get_immediate_server_version_raw()};
[[nodiscard]] std::size_t
gtid_log_body::calculate_encoded_size() const noexcept {
std::size_t result{0U};
result += commit_timestamp_field_length;
result +=
(has_original_commit_timestamp() ? commit_timestamp_field_length : 0U);
result += util::calculate_packed_int_size(transaction_length_);
if (has_immediate_server_version()) {
result += server_version_field_length;
result +=
(has_original_server_version() ? server_version_field_length : 0U);
result +=
(has_commit_group_ticket() ? commit_group_ticket_field_length : 0U);
}

return result;
}

[[nodiscard]] std::string
gtid_log_body::get_readable_immediate_server_version() const {
return get_immediate_server_version().get_string();
void gtid_log_body::encode_to(util::byte_span &destination) const {
if (std::size(destination) < calculate_encoded_size()) {
util::exception_location().raise<std::invalid_argument>(
"cannot encode rotate event body");
}

static constexpr std::uint64_t commit_timestamp_mask{
1ULL << (commit_timestamp_field_length * 8ULL - 1ULL)};

const std::uint64_t patched_immediate_commit_timestamp{
immediate_commit_timestamp_ |
(has_original_commit_timestamp() ? commit_timestamp_mask : 0ULL)};
util::insert_fixed_int_to_byte_span(destination,
patched_immediate_commit_timestamp,
commit_timestamp_field_length);
if (has_original_commit_timestamp()) {
util::insert_fixed_int_to_byte_span(destination, original_commit_timestamp_,
commit_timestamp_field_length);
}
[[maybe_unused]] const auto insertion_result{
util::insert_packed_int_to_byte_span_checked(destination,
transaction_length_)};

if (has_immediate_server_version()) {
static constexpr std::uint32_t server_version_mask{
1U << (server_version_field_length * 8U - 1U)};
const std::uint32_t patched_immediate_server_version{
immediate_server_version_ |
(has_original_server_version() ? server_version_mask : 0U)};
util::insert_fixed_int_to_byte_span(destination,
patched_immediate_server_version);
if (has_original_server_version()) {
util::insert_fixed_int_to_byte_span(destination,
original_server_version_);
}
if (has_commit_group_ticket()) {
util::insert_fixed_int_to_byte_span(destination, commit_group_ticket_);
}
}
}

std::ostream &operator<<(std::ostream &output, const gtid_log_body &obj) {
output << "immediate_commit_timestamp: "
<< obj.get_readable_immediate_commit_timestamp();
if (obj.has_original_commit_timestamp()) {
output << ", original_commit_timestamp: "
<< obj.get_original_commit_timestamp_raw();
<< obj.get_readable_original_commit_timestamp();
}
if (obj.has_transaction_length()) {
output << ", transaction_length: " << obj.get_transaction_length_raw();
output << ", transaction_length: " << obj.get_transaction_length_raw();
if (obj.has_immediate_server_version()) {
output << ", immediate_server_version: "
<< obj.get_readable_immediate_server_version();
}
if (obj.has_original_server_version()) {
output << ", original_server_version: "
<< obj.get_readable_original_server_version();
}
if (obj.has_immediate_server_version()) {
output << ", immediate_server_version: "
<< obj.get_readable_immediate_server_version();
}
if (obj.has_commit_group_ticket()) {
output << ", commit_group_ticket: " << obj.get_commit_group_ticket_raw();
}
Expand Down
Loading
Loading