Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,10 @@ void process_binlog_event(const binsrv::events::event_view &current_event_v,

// checking if the event needs to be written to the binlog
if (!info_only) {
storage.write_event(current_event_v.get_portion(),
context.is_at_transaction_boundary(),
context.get_transaction_gtid(),
current_common_header_v.get_timestamp());
storage.write_event(
current_event_v.get_portion(), context.is_at_transaction_boundary(),
context.get_transaction_gtid(), current_common_header_v.get_timestamp(),
context.get_transaction_sequence_number());
}

// processing the very last event in the sequence - either a non-artificial
Expand Down
2 changes: 1 addition & 1 deletion src/binsrv/binlog_file_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace binsrv {

binlog_file_metadata::binlog_file_metadata()
: impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {}
: impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}, {}} {}

binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} {
auto json_value = boost::json::parse(data);
Expand Down
11 changes: 10 additions & 1 deletion src/binsrv/binlog_file_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,29 @@

#include "binsrv/gtids/gtid_set.hpp"

#include "binsrv/events/common_types.hpp"

#include "util/nv_tuple.hpp"

namespace binsrv {

class [[nodiscard]] binlog_file_metadata {
private:
// The 'last_sequence_number' field persists the rewrite-mode GTID
// recovery state for this binlog file. It is stored here (rather
// than re-derived from the binlog file content on resume) because
// every other piece of resume state already lives in this metadata
// record - this addition simply makes recovery a single
// read-and-load step.
using impl_type = util::nv_tuple<
// clang-format off
util::nv<"version", std::uint32_t>,
util::nv<"size", std::uint64_t>,
util::nv<"previous_gtids", gtids::optional_gtid_set>,
util::nv<"added_gtids", gtids::optional_gtid_set>,
util::nv<"min_timestamp", ctime_timestamp>,
util::nv<"max_timestamp", ctime_timestamp>
util::nv<"max_timestamp", ctime_timestamp>,
util::nv<"last_sequence_number", events::seq_no_t>
// clang-format on
>;

Expand Down
4 changes: 4 additions & 0 deletions src/binsrv/events/reader_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ void reader_context::start_transaction(const event_view &current_event_v) {
util::exception_location().raise<std::logic_error>(
"encountered non-empty gtid in the anonymous gtid log event");
}
transaction_sequence_number_ = current_post_header.get_sequence_number();
const auto current_body{generic_body<code_type::anonymous_gtid_log>{
current_event_v.get_body_raw()}};
const auto expected_transaction_length_raw{
Expand All @@ -962,6 +963,7 @@ void reader_context::start_transaction(const event_view &current_event_v) {
util::exception_location().raise<std::logic_error>(
"encountered an empty gtid in the gtid log event");
}
transaction_sequence_number_ = current_post_header.get_sequence_number();
const auto current_body{
generic_body<code_type::gtid_log>{current_event_v.get_body_raw()}};
const auto expected_transaction_length_raw{
Expand All @@ -981,6 +983,7 @@ void reader_context::start_transaction(const event_view &current_event_v) {
util::exception_location().raise<std::logic_error>(
"encountered an empty gtid in the gtid tagged log event");
}
transaction_sequence_number_ = current_body.get_sequence_number();
const auto expected_transaction_length_raw{
current_body.get_transaction_length_raw()};
if (!std::in_range<std::uint32_t>(expected_transaction_length_raw)) {
Expand Down Expand Up @@ -1011,6 +1014,7 @@ void reader_context::finish_transaction() {
"transaction did not end at the declared length");
}
transaction_gtid_ = gtids::gtid{};
transaction_sequence_number_ = 0ULL;
expected_transaction_length_ = 0U;
current_transaction_length_ = 0U;
}
Expand Down
14 changes: 14 additions & 0 deletions src/binsrv/events/reader_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "binsrv/gtids/gtid.hpp"

#include "binsrv/events/common_header_view_fwd.hpp"
#include "binsrv/events/common_types.hpp"
#include "binsrv/events/event_fwd.hpp"
#include "binsrv/events/event_view_fwd.hpp"
#include "binsrv/events/protocol_traits.hpp"
Expand Down Expand Up @@ -63,6 +64,13 @@ class [[nodiscard]] reader_context {
[[nodiscard]] const gtids::gtid &get_transaction_gtid() const noexcept {
return transaction_gtid_;
}
[[nodiscard]] bool has_transaction_sequence_number() const noexcept {
return transaction_sequence_number_ != 0ULL;
}
[[nodiscard]] seq_no_t get_transaction_sequence_number() const noexcept {
return transaction_sequence_number_;
}

[[nodiscard]] bool is_at_transaction_boundary() const noexcept {
return (state_ == state_type::any_other_expected &&
current_transaction_length_ == expected_transaction_length_) ||
Expand Down Expand Up @@ -112,8 +120,14 @@ class [[nodiscard]] reader_context {
std::uint32_t position_;
post_header_length_container post_header_lengths_{};

// last seen transaction GTID (empty GTID if between transactions)
gtids::gtid transaction_gtid_{};
// sequence number of the last transaction seen (0 if between transactions)
seq_no_t transaction_sequence_number_{0ULL};
// the expected length of the current transaction
// (extracted from one of the GTID_LOG events)
std::uint32_t expected_transaction_length_{0U};
// the length of the transaction seen so far
std::uint32_t current_transaction_length_{0U};

bool expect_info_only_preamble_events_{false};
Expand Down
103 changes: 64 additions & 39 deletions src/binsrv/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <iterator>
#include <sstream>
Expand All @@ -36,6 +37,7 @@
#include "binsrv/storage_config.hpp"
#include "binsrv/storage_metadata.hpp"

#include "binsrv/events/common_types.hpp"
#include "binsrv/events/protocol_traits_fwd.hpp"

#include "binsrv/gtids/gtid.hpp"
Expand Down Expand Up @@ -184,38 +186,10 @@ storage::open_binlog(const composite_binlog_name &binlog_name) {
: storage_backend_open_stream_mode::create};
const auto open_stream_offset{backend_->open_stream(binlog_name.str(), mode)};

if (!binlog_exists) {
// writing the magic binlog footprint only if this is a newly
// created file
backend_->write_data_to_stream(events::magic_binlog_payload);

gtids::optional_gtid_set previous_binlog_gtids{};
gtids::optional_gtid_set added_binlog_gtids{};
if (is_in_gtid_replication_mode()) {
previous_binlog_gtids = get_gtids();
added_binlog_gtids = gtids::gtid_set{};
}

binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset,
std::move(previous_binlog_gtids),
std::move(added_binlog_gtids),
ctime_timestamp_range{});
save_binlog_metadata(get_current_binlog_record());
save_binlog_index();
result = open_binlog_status::created;
if (binlog_exists) {
result = open_existing_binlog_file_internal(open_stream_offset);
} else {
assert(get_current_position() == open_stream_offset);
if (open_stream_offset == 0ULL) {
backend_->write_data_to_stream(events::magic_binlog_payload);
get_current_binlog_record().size = events::magic_binlog_offset;
result = open_binlog_status::opened_empty;
} else if (open_stream_offset == events::magic_binlog_offset) {
result = open_binlog_status::opened_at_magic_payload_offset;
} else {
// position is beyond magic payload offset
assert(open_stream_offset > events::magic_binlog_offset);
result = open_binlog_status::opened_with_data_present;
}
result = open_new_binlog_file_internal(binlog_name);
}
update_last_checkpoint_info();

Expand All @@ -225,19 +199,25 @@ storage::open_binlog(const composite_binlog_name &binlog_name) {
assert(gtids_in_event_buffer_.is_empty());
assert(ready_to_flush_timestamps_.is_empty());
assert(incomplete_transaction_timestamps_.is_empty());
assert(ready_to_flush_last_sequence_number_ == 0ULL);
assert(incomplete_transaction_last_sequence_number_ == 0ULL);

return result;
}

void storage::write_event(util::const_byte_span event_data,
bool at_transaction_boundary,
const gtids::gtid &transaction_gtid,
const ctime_timestamp &event_timestamp) {
const ctime_timestamp &event_timestamp,
events::seq_no_t transaction_sequence_number) {
ensure_streaming_mode();

event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data),
std::cend(event_data));
incomplete_transaction_timestamps_.add_timestamp(event_timestamp);
if (transaction_sequence_number != 0ULL) {
incomplete_transaction_last_sequence_number_ = transaction_sequence_number;
}

if (at_transaction_boundary) {
last_transaction_boundary_position_in_event_buffer_ =
Expand All @@ -247,6 +227,10 @@ void storage::write_event(util::const_byte_span event_data,
}
ready_to_flush_timestamps_.add_range(incomplete_transaction_timestamps_);
incomplete_transaction_timestamps_.clear();

ready_to_flush_last_sequence_number_ =
incomplete_transaction_last_sequence_number_;
incomplete_transaction_last_sequence_number_ = 0ULL;
}

// now we are writing data from the event buffer to the storage backend if
Expand Down Expand Up @@ -303,6 +287,7 @@ void storage::discard_incomplete_transaction_events() {

event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_);
incomplete_transaction_timestamps_.clear();
incomplete_transaction_last_sequence_number_ = 0ULL;
}

void storage::flush_event_buffer() {
Expand Down Expand Up @@ -334,6 +319,41 @@ void storage::update_last_checkpoint_info() {
}
}

[[nodiscard]] open_binlog_status storage::open_new_binlog_file_internal(
const composite_binlog_name &binlog_name) {
// writing the magic binlog footprint only if this is a newly
// created file
backend_->write_data_to_stream(events::magic_binlog_payload);

gtids::optional_gtid_set previous_binlog_gtids{};
gtids::optional_gtid_set added_binlog_gtids{};
if (is_in_gtid_replication_mode()) {
previous_binlog_gtids = get_gtids();
added_binlog_gtids = gtids::gtid_set{};
}

binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset,
std::move(previous_binlog_gtids),
std::move(added_binlog_gtids),
ctime_timestamp_range{});
save_binlog_metadata(get_current_binlog_record());
save_binlog_index();
return open_binlog_status::created;
}
[[nodiscard]] open_binlog_status
storage::open_existing_binlog_file_internal(std::uint64_t open_stream_offset) {
assert(get_current_position() == open_stream_offset);
if (open_stream_offset >= events::magic_binlog_offset) {
return open_stream_offset == events::magic_binlog_offset
? open_binlog_status::opened_at_magic_payload_offset
: open_binlog_status::opened_with_data_present;
}
assert(open_stream_offset == 0ULL);
backend_->write_data_to_stream(events::magic_binlog_payload);
get_current_binlog_record().size = events::magic_binlog_offset;
return open_binlog_status::opened_empty;
}

void storage::flush_event_buffer_internal() {
assert(!event_buffer_.empty());
assert(last_transaction_boundary_position_in_event_buffer_ <=
Expand All @@ -354,6 +374,8 @@ void storage::flush_event_buffer_internal() {
}
}
get_current_binlog_record().timestamps.add_range(ready_to_flush_timestamps_);
get_current_binlog_record().last_sequence_number =
ready_to_flush_last_sequence_number_;

save_binlog_metadata(get_current_binlog_record());

Expand All @@ -369,6 +391,7 @@ void storage::flush_event_buffer_internal() {
gtids_in_event_buffer_.clear();
}
ready_to_flush_timestamps_.clear();
ready_to_flush_last_sequence_number_ = 0ULL;
}

void storage::load_binlog_index() {
Expand Down Expand Up @@ -476,13 +499,14 @@ storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const {
backend_->get_object(generate_binlog_metadata_name(binlog_name))};
binlog_file_metadata metadata{content};

return binlog_record{.name = binlog_name,
.size = metadata.root().get<"size">(),
.previous_gtids =
metadata.root().get<"previous_gtids">(),
.added_gtids = metadata.root().get<"added_gtids">(),
.timestamps = {metadata.root().get<"min_timestamp">(),
metadata.root().get<"max_timestamp">()}};
return binlog_record{
.name = binlog_name,
.size = metadata.root().get<"size">(),
.previous_gtids = metadata.root().get<"previous_gtids">(),
.added_gtids = metadata.root().get<"added_gtids">(),
.timestamps = {metadata.root().get<"min_timestamp">(),
metadata.root().get<"max_timestamp">()},
.last_sequence_number = metadata.root().get<"last_sequence_number">()};
}

void storage::validate_binlog_metadata(const binlog_record &record) const {
Expand Down Expand Up @@ -522,6 +546,7 @@ void storage::save_binlog_metadata(const binlog_record &record) const {
ctime_timestamp{record.timestamps.get_min_timestamp()};
metadata.root().get<"max_timestamp">() =
ctime_timestamp{record.timestamps.get_max_timestamp()};
metadata.root().get<"last_sequence_number">() = record.last_sequence_number;
const auto content{metadata.str()};
backend_->put_object(generate_binlog_metadata_name(record.name),
util::as_const_byte_span(content));
Expand Down
25 changes: 24 additions & 1 deletion src/binsrv/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,28 @@
#include "binsrv/gtids/gtid_fwd.hpp"
#include "binsrv/gtids/gtid_set.hpp"

#include "binsrv/events/common_types.hpp"

#include "util/byte_span_fwd.hpp"

namespace binsrv {

class [[nodiscard]] storage {
private:
struct binlog_record {
// binlog file name
composite_binlog_name name;
// binlog file size in bytes
std::uint64_t size{0ULL};
// accumulated GTIDs present in the binlog files before this one
gtids::optional_gtid_set previous_gtids{};
// GTIDs present in this binlog file
gtids::optional_gtid_set added_gtids{};
// minimum and maximum event timestamps observed in this binlog file
ctime_timestamp_range timestamps{};
// sequence_number of the last transaction seen in this file -
// used for GTID rewrite-mode resume state persistence
events::seq_no_t last_sequence_number{0ULL};
};
using binlog_record_container = std::vector<binlog_record>;

Expand Down Expand Up @@ -119,14 +129,20 @@ class [[nodiscard]] storage {
return result;
}

[[nodiscard]] events::seq_no_t
get_current_transaction_sequence_number() const noexcept {
return is_empty() ? 0ULL : get_current_binlog_record().last_sequence_number;
}

[[nodiscard]] bool is_binlog_open() const noexcept;

[[nodiscard]] open_binlog_status
open_binlog(const composite_binlog_name &binlog_name);
void write_event(util::const_byte_span event_data,
bool at_transaction_boundary,
const gtids::gtid &transaction_gtid,
const ctime_timestamp &event_timestamp);
const ctime_timestamp &event_timestamp,
events::seq_no_t transaction_sequence_number);
void close_binlog();

void discard_incomplete_transaction_events();
Expand Down Expand Up @@ -156,6 +172,8 @@ class [[nodiscard]] storage {
gtids::gtid_set gtids_in_event_buffer_{};
ctime_timestamp_range ready_to_flush_timestamps_{};
ctime_timestamp_range incomplete_transaction_timestamps_{};
events::seq_no_t ready_to_flush_last_sequence_number_{0ULL};
events::seq_no_t incomplete_transaction_last_sequence_number_{0ULL};

void ensure_streaming_mode() const;

Expand Down Expand Up @@ -187,6 +205,11 @@ class [[nodiscard]] storage {
return get_flushed_position() +
last_transaction_boundary_position_in_event_buffer_;
}
[[nodiscard]] open_binlog_status
open_new_binlog_file_internal(const composite_binlog_name &binlog_name);
[[nodiscard]] open_binlog_status
open_existing_binlog_file_internal(std::uint64_t open_stream_offset);

void flush_event_buffer_internal();

void load_binlog_index();
Expand Down
Loading