From b41f43547e33194ac60c969842dfd1496e551c7c Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Fri, 24 Apr 2026 01:35:25 +0200 Subject: [PATCH] PS-11033 fix: crash when S3 bucket accumulates large number of objects; recovery requires manual intervention https://perconadev.atlassian.net/browse/PS-11033 Fixed problem with initializing a non-empty S3 storage that has more than 500 binlog files / binlog metadata files (1000 total). Implemented pagination for the 'ListObjectsV2()' AWS SDK C++ API call in the 's3_storage_backend::aws_context::list_objects()' method. S3 API has a hard limit that sets the max number of items in the 'ListObjectsV2' response to be no greater than 1000. In case when the storage has more than 1000 objects, we now perform several 'ListObjectsV2' calls with the continuation token. Added 'binlog_streaming.thousand_binlogs' MTR test case that executes 'FLUSH BINARY LOG' 1000 times and executes PBS two times, on an empty and populated storage. Co-authored-by: Copilot --- mtr/binlog_streaming/r/gtid_purged.result | 2 +- .../r/thousand_binlogs.result | 36 +++++++ mtr/binlog_streaming/t/gtid_purged.test | 2 +- mtr/binlog_streaming/t/thousand_binlogs.test | 90 ++++++++++++++++ src/binsrv/s3_storage_backend.cpp | 102 +++++++++++------- 5 files changed, 193 insertions(+), 39 deletions(-) create mode 100644 mtr/binlog_streaming/r/thousand_binlogs.result create mode 100644 mtr/binlog_streaming/t/thousand_binlogs.test diff --git a/mtr/binlog_streaming/r/gtid_purged.result b/mtr/binlog_streaming/r/gtid_purged.result index 44a69e1..91d74d5 100644 --- a/mtr/binlog_streaming/r/gtid_purged.result +++ b/mtr/binlog_streaming/r/gtid_purged.result @@ -35,7 +35,7 @@ include/read_file_to_var.inc *** Executing the Binlog Server utility one more time (fetching nothing) *** Executing the Binlog Server utility in the 'search_by_gtid_set' -one more time (expecting the same results) +*** one more time (expecting the same results) include/read_file_to_var.inc *** Removing the search result file. diff --git a/mtr/binlog_streaming/r/thousand_binlogs.result b/mtr/binlog_streaming/r/thousand_binlogs.result new file mode 100644 index 0000000..03449e2 --- /dev/null +++ b/mtr/binlog_streaming/r/thousand_binlogs.result @@ -0,0 +1,36 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Generating 1000 binlogs + +*** Executing the Binlog Server utility and fetching all events. + +*** Executing the Binlog Server utility in the 'search_by_timestamp' +include/read_file_to_var.inc + +*** Executing the Binlog Server utility one more time (fetching nothing) + +*** Executing the Binlog Server utility in the 'search_by_timestamp' +*** one more time (expecting the same results) +include/read_file_to_var.inc + +*** Removing the search result file. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/t/gtid_purged.test b/mtr/binlog_streaming/t/gtid_purged.test index 33c6376..242950f 100644 --- a/mtr/binlog_streaming/t/gtid_purged.test +++ b/mtr/binlog_streaming/t/gtid_purged.test @@ -76,7 +76,7 @@ eval PURGE BINARY LOGS TO '$second_binlog'; --echo --echo *** Executing the Binlog Server utility in the 'search_by_gtid_set' ---echo one more time (expecting the same results) +--echo *** one more time (expecting the same results) --exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file --source include/read_file_to_var.inc diff --git a/mtr/binlog_streaming/t/thousand_binlogs.test b/mtr/binlog_streaming/t/thousand_binlogs.test new file mode 100644 index 0000000..a40dda7 --- /dev/null +++ b/mtr/binlog_streaming/t/thousand_binlogs.test @@ -0,0 +1,90 @@ +--source ../include/have_binsrv.inc +--source include/big_test.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $read_from_file = $MYSQL_TMP_DIR/search_result.json + +--echo +--echo *** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--let $number_of_binlogs = 1000 +--echo +--echo *** Generating $number_of_binlogs binlogs +--let $index = 0 + +--disable_query_log +while($index < $number_of_binlogs) +{ + INSERT INTO t1 VALUES(); + FLUSH BINARY LOGS; + --inc $index +} +INSERT INTO t1 VALUES(); +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--let $current_timestamp = `SELECT DATE_FORMAT(CONVERT_TZ(NOW(), @@session.time_zone, '+00:00'),'%Y-%m-%dT%H:%i:%s')` + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_timestamp' +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file + +--source include/read_file_to_var.inc +# the result will be too big, so storing it in a session variable +--disable_query_log +eval SET @search_result = '$result'; +--enable_query_log +--assert(`SELECT JSON_EXTRACT(@search_result, '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT(@search_result, '$.result')) = $number_of_binlogs + 1`) + +--echo +--echo *** Executing the Binlog Server utility one more time (fetching nothing) +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** one more time (expecting the same results) +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file + +--source include/read_file_to_var.inc +# the result will be too big, so storing it in a session variable +--disable_query_log +eval SET @search_result = '$result'; +--enable_query_log +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = $number_of_binlogs + 1`) + +--echo +--echo *** Removing the search result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index e82c217..ea3bbda 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -47,6 +47,8 @@ #include +#include + #include #include @@ -57,6 +59,7 @@ #include #include #include +#include #include #include "binsrv/s3_error_helpers_private.hpp" @@ -187,6 +190,12 @@ class s3_storage_backend::aws_context : private aws_context_base { void get_object_internal(const qualified_object_path &source, const stream_factory_type &stream_factory, const stream_handler_type &stream_handler) const; + + using list_object_container = Aws::Vector; + static void + process_list_objects_internal(const list_object_container &list_objects, + const std::string &prefix, + storage_object_name_container &storage_objects); }; s3_storage_backend::aws_context::aws_context( @@ -371,49 +380,45 @@ s3_storage_backend::aws_context::list_objects( list_objects_request.SetPrefix(prefix_str); } - const auto list_objects_outcome{client_->ListObjectsV2(list_objects_request)}; - if (!list_objects_outcome.IsSuccess()) { - raise_s3_error_from_outcome("cannot list objects in the specified bucket", - list_objects_outcome.GetError()); - } - const auto &list_objects_result = list_objects_outcome.GetResult(); - // TODO: implement receiving the rest of the list - if (list_objects_result.GetIsTruncated()) { - util::exception_location().raise( - "too many objects in the specified bucket"); - } + std::string continuation_token; + bool has_more{true}; + while (has_more) { + if (!continuation_token.empty()) { + list_objects_request.SetContinuationToken(continuation_token); + } - auto model_key_count = list_objects_result.GetKeyCount(); - if (!std::in_range(model_key_count)) { - util::exception_location().raise( - "invalid key count in the list objects result"); - } - auto key_count{static_cast(model_key_count)}; + const auto list_objects_outcome{ + client_->ListObjectsV2(list_objects_request)}; + if (!list_objects_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot list objects in the specified bucket", + list_objects_outcome.GetError()); + } + const auto &list_objects_result = list_objects_outcome.GetResult(); - const auto &model_objects{list_objects_result.GetContents()}; - if (key_count != std::size(model_objects)) { - util::exception_location().raise( - "key count does not match the number of objects in the list objects " - "result"); - } - result.reserve(key_count); + auto model_key_count = list_objects_result.GetKeyCount(); + if (!std::in_range(model_key_count)) { + util::exception_location().raise( + "invalid key count in the list objects result"); + } + auto key_count{static_cast(model_key_count)}; - for (const auto &model_object : model_objects) { - // if the prefix is set, the list of objects in the response will include - // the prefix itself (as a directory) with zero size - it neeeds to be - // skipped + const auto &model_objects{list_objects_result.GetContents()}; + if (key_count != std::size(model_objects)) { + util::exception_location().raise( + "key count does not match the number of objects in the list objects " + "result"); + } + result.reserve(std::size(result) + key_count); - // moreover, we need to remove the prefix itself from the object paths - std::string_view key{model_object.GetKey()}; - if (!prefix_str.empty()) { - if (!key.starts_with(prefix_str)) { + process_list_objects_internal(model_objects, prefix_str, result); + + has_more = list_objects_result.GetIsTruncated(); + if (has_more) { + continuation_token = list_objects_result.GetNextContinuationToken(); + if (continuation_token.empty()) { util::exception_location().raise( - "encountered an object with unexpected prefix"); + "truncated list objects result is missing continuation token"); } - key.remove_prefix(std::size(prefix_str)); - } - if (!key.empty()) { - result.emplace(key, model_object.GetSize()); } } @@ -453,6 +458,29 @@ void s3_storage_backend::aws_context::get_object_internal( } } +void s3_storage_backend::aws_context::process_list_objects_internal( + const list_object_container &list_objects, const std::string &prefix, + storage_object_name_container &storage_objects) { + for (const auto &list_object : list_objects) { + // if the prefix is set, the list of objects in the response will + // include the prefix itself (as a directory) with zero size - it + // needs to be skipped + + // moreover, we need to remove the prefix itself from the object paths + std::string_view key{list_object.GetKey()}; + if (!prefix.empty()) { + if (!key.starts_with(prefix)) { + util::exception_location().raise( + "encountered an object with unexpected prefix"); + } + key.remove_prefix(std::size(prefix)); + } + if (!key.empty()) { + storage_objects.emplace(key, list_object.GetSize()); + } + } +} + s3_storage_backend::s3_storage_backend(const storage_config &config) : bucket_{}, root_path_{}, current_name_{}, uuid_generator_{}, tmp_file_directory_{}, current_tmp_file_path_{}, tmp_fstream_{}, impl_{} {