diff --git a/README.md b/README.md index cf2ff39..cdd47f7 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,7 @@ may print "uri": "s3://binsrv-bucket/storage/binlog.000001", "min_timestamp": "2026-02-09T17:22:01", "max_timestamp": "2026-02-09T17:22:08", - "initial_gtids": "", + "previous_gtids": "", "added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456" }, { @@ -206,7 +206,7 @@ may print "uri": "s3://binsrv-bucket/storage/binlog.000002", "min_timestamp": "2026-02-09T17:22:08", "max_timestamp": "2026-02-09T17:22:09", - "initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456", + "previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456", "added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912" } ] @@ -243,7 +243,7 @@ may print "uri": "s3://binsrv-bucket/storage/binlog.000001", "min_timestamp": "2026-02-09T17:22:01", "max_timestamp": "2026-02-09T17:22:08", - "initial_gtids": "", + "previous_gtids": "", "added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456" } ] @@ -264,7 +264,7 @@ may print "uri": "s3://binsrv-bucket/storage/binlog.000001", "min_timestamp": "2026-02-09T17:22:01", "max_timestamp": "2026-02-09T17:22:08", - "initial_gtids": "", + "previous_gtids": "", "added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456", }, { @@ -273,7 +273,7 @@ may print "uri": "s3://binsrv-bucket/storage/binlog.000002", "min_timestamp": "2026-02-09T17:22:08", "max_timestamp": "2026-02-09T17:22:09", - "initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456", + "previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456", "added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912" } ] diff --git a/mtr/binlog_streaming/r/gtid_purged.result b/mtr/binlog_streaming/r/gtid_purged.result new file mode 100644 index 0000000..44a69e1 --- /dev/null +++ b/mtr/binlog_streaming/r/gtid_purged.result @@ -0,0 +1,50 @@ +*** Resetting replication at the very beginning of the test. + +*** Determining the first binary log name. 
+ +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Filling the table with some data. +INSERT INTO t1 VALUES(); + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** Determining the second binary log name. + +*** Filling the table with more data. +INSERT INTO t1 VALUES(); + +*** Purging the first binary log. +PURGE BINARY LOGS TO ''; + +*** Executing the Binlog Server utility and fetching all events. + +*** Executing the Binlog Server utility in the 'search_by_gtid_set' +include/read_file_to_var.inc + +*** Executing the Binlog Server utility one more time (fetching nothing) + +*** Executing the Binlog Server utility in the 'search_by_gtid_set' +one more time (expecting the same results) +include/read_file_to_var.inc + +*** Removing the search result file. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. 
diff --git a/mtr/binlog_streaming/t/gtid_purged-master.opt b/mtr/binlog_streaming/t/gtid_purged-master.opt new file mode 100644 index 0000000..c7529f0 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_purged-master.opt @@ -0,0 +1,2 @@ +--gtid-mode=on +--enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/gtid_purged.test b/mtr/binlog_streaming/t/gtid_purged.test new file mode 100644 index 0000000..33c6376 --- /dev/null +++ b/mtr/binlog_streaming/t/gtid_purged.test @@ -0,0 +1,98 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +--echo +--echo *** Determining the first binary log name. +--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = gtid +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $read_from_file = $MYSQL_TMP_DIR/search_result.json + +--echo +--echo *** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** Filling the table with some data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +--echo +--echo *** Determining the second binary log name. +--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +--echo +--echo *** Filling the table with more data. 
+INSERT INTO t1 VALUES(); + +--echo +--echo *** Purging the first binary log. +--replace_result $second_binlog +eval PURGE BINARY LOGS TO '$second_binlog'; + +--let $captured_gtid_purged = `SELECT @@global.gtid_purged` +--let $captured_second_insert_gtid = `SELECT GTID_SUBTRACT(@@global.gtid_executed, @@global.gtid_purged)` + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set' +--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file + +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`) + +--echo +--echo *** Executing the Binlog Server utility one more time (fetching nothing) +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set' +--echo one more time (expecting the same results) +--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file + +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`) + 
+--echo +--echo *** Removing the search result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index a686823..c47bffc 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -779,7 +779,7 @@ bool open_connection_and_switch_to_replication( if (operation_mode == binsrv::operation_mode_type::fetch) { throw; } - logger.log(binsrv::log_severity::info, + logger.log(binsrv::log_severity::error, "unable to establish connection to mysql server"); return false; } @@ -796,7 +796,20 @@ bool open_connection_and_switch_to_replication( try { if (storage.is_in_gtid_replication_mode()) { - const auto >ids{storage.get_gtids()}; + if (storage.is_empty()) { + static constexpr std::string_view select_gtid_purged_query{ + "SELECT @@GLOBAL.gtid_purged"}; + storage.set_purged_gtids(binsrv::gtids::gtid_set{ + connection.execute_select_query_string_result( + select_gtid_purged_query)}); + logger.log( + binsrv::log_severity::info, + "extracted purged GTIDs from the mysql server for an empty " + "storage: " + + boost::lexical_cast(storage.get_purged_gtids())); + } + + const auto gtids{storage.get_gtids()}; const auto encoded_size{gtids.calculate_encoded_size()}; binsrv::gtids::gtid_set_storage encoded_gtids_buffer(encoded_size); @@ -820,7 +833,7 @@ bool open_connection_and_switch_to_replication( if (operation_mode == binsrv::operation_mode_type::fetch) { throw; } - logger.log(binsrv::log_severity::info, "unable to switch to replication"); + logger.log(binsrv::log_severity::error, "unable to switch to replication"); return false; } diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 958b8a3..5beb67d 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -115,6 +115,8 @@ storage::storage(const storage_config &config, load_and_validate_binlog_metadata_set(storage_objects, storage_metadata_objects); + 
assert(!binlog_records_.front().added_gtids.has_value() || + purged_gtids_ == binlog_records_.front().added_gtids); } storage::~storage() { @@ -127,6 +129,18 @@ storage::~storage() { } } +void storage::set_purged_gtids(const gtids::gtid_set &purged_gtids) { + if (!is_in_gtid_replication_mode()) { + util::exception_location().raise( + "cannot set purged GTIDs in position-based replication mode"); + } + if (!is_empty()) { + util::exception_location().raise( + "cannot set purged GTIDs in a non-empty storage"); + } + purged_gtids_ = purged_gtids; +} + [[nodiscard]] std::string storage::get_backend_description() const { return backend_->get_description(); } @@ -540,6 +554,13 @@ void storage::load_and_validate_binlog_metadata_set( util::exception_location().raise( "found metadata for a non-existing binlog"); } + + // if we are in GTID replication mode, then we can consider GTIDs from the + // first binlog metadata as purged GTIDs for the whole storage + const auto &optional_added_gtids{binlog_records_.front().added_gtids}; + if (optional_added_gtids.has_value()) { + purged_gtids_ = *optional_added_gtids; + } } } // namespace binsrv diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 62be85f..e2ed0f1 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -70,6 +70,11 @@ class [[nodiscard]] storage { // file to complete the rule of 5 ~storage(); + [[nodiscard]] const gtids::gtid_set &get_purged_gtids() const noexcept { + return purged_gtids_; + } + void set_purged_gtids(const gtids::gtid_set &purged_gtids); + [[nodiscard]] std::string get_backend_description() const; [[nodiscard]] replication_mode_type get_replication_mode() const noexcept { @@ -94,16 +99,19 @@ class [[nodiscard]] storage { } [[nodiscard]] gtids::gtid_set get_gtids() const { - gtids::gtid_set result{}; + if (!is_in_gtid_replication_mode()) { + return {}; + } + if (is_empty()) { - return result; + return get_purged_gtids(); } + gtids::gtid_set result{}; const auto 
&optional_previous_gtids{ get_current_binlog_record().previous_gtids}; - if (!optional_previous_gtids.has_value()) { - return result; + if (optional_previous_gtids.has_value()) { + result = *optional_previous_gtids; } - result = *optional_previous_gtids; const auto &optional_added_gtids{get_current_binlog_record().added_gtids}; if (optional_added_gtids.has_value()) { result.add(*optional_added_gtids); @@ -133,6 +141,7 @@ class [[nodiscard]] storage { replication_mode_type replication_mode_; composite_binlog_name binlog_name_sentinel_{}; + gtids::gtid_set purged_gtids_{}; binlog_record_container binlog_records_{}; std::uint64_t checkpoint_size_bytes_{0ULL}; diff --git a/src/easymysql/connection.cpp b/src/easymysql/connection.cpp index 574ed47..323c1bf 100644 --- a/src/easymysql/connection.cpp +++ b/src/easymysql/connection.cpp @@ -429,10 +429,64 @@ void connection::execute_generic_query_noresult(std::string_view query) { auto *casted_impl = mysql_deimpl::get(mysql_impl_); if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) { - raise_core_error_from_connection("cannot execute query", *this); + raise_core_error_from_connection("cannot execute noresult query", *this); } } +[[nodiscard]] std::string +connection::execute_select_query_string_result(std::string_view query) { + assert(!is_empty()); + if (is_in_replication_mode()) { + util::exception_location().raise( + "cannot execute query in replication mode"); + } + + auto *casted_impl = mysql_deimpl::get(mysql_impl_); + if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) { + raise_core_error_from_connection("cannot execute string result query", + *this); + } + + const auto mysql_res_deleter = [](MYSQL_RES *result_raw) { + if (result_raw != nullptr) { + mysql_free_result(result_raw); + } + }; + using mysql_res_ptr = std::unique_ptr; + + const mysql_res_ptr result{mysql_store_result(casted_impl), + mysql_res_deleter}; + if (!result) { + 
raise_core_error_from_connection("cannot store query result", *this); + } + if (mysql_num_rows(result.get()) != 1U) { + raise_core_error_from_connection("query did not return exactly one row", + *this); + } + + static constexpr std::size_t expected_num_fields{1U}; + if (mysql_num_fields(result.get()) != expected_num_fields) { + raise_core_error_from_connection("query did not return exactly one column", + *this); + } + + MYSQL_ROW row_raw{mysql_fetch_row(result.get())}; + assert(row_raw != nullptr); + + const std::span row{ + row_raw, expected_num_fields}; + if (row.front() == nullptr) { + raise_core_error_from_connection("query returned NULL value", *this); + } + + const auto *const lengths_raw{mysql_fetch_lengths(result.get())}; + assert(lengths_raw != nullptr); + const std::span lengths{ + lengths_raw, expected_num_fields}; + + return std::string{row.front(), lengths.front()}; +} + bool connection::ping() { assert(!is_empty()); if (is_in_replication_mode()) { diff --git a/src/easymysql/connection.hpp b/src/easymysql/connection.hpp index 3bff42e..1d55dc1 100644 --- a/src/easymysql/connection.hpp +++ b/src/easymysql/connection.hpp @@ -58,6 +58,8 @@ class [[nodiscard]] connection { [[nodiscard]] std::string_view get_character_set_name() const noexcept; void execute_generic_query_noresult(std::string_view query); + [[nodiscard]] std::string + execute_select_query_string_result(std::string_view query); [[nodiscard]] bool ping(); [[nodiscard]] bool is_in_replication_mode() const noexcept {