diff --git a/mtr/binlog_streaming/r/gtid_purged.result b/mtr/binlog_streaming/r/gtid_purged.result index 44a69e1..91d74d5 100644 --- a/mtr/binlog_streaming/r/gtid_purged.result +++ b/mtr/binlog_streaming/r/gtid_purged.result @@ -35,7 +35,7 @@ include/read_file_to_var.inc *** Executing the Binlog Server utility one more time (fetching nothing) *** Executing the Binlog Server utility in the 'search_by_gtid_set' -one more time (expecting the same results) +*** one more time (expecting the same results) include/read_file_to_var.inc *** Removing the search result file. diff --git a/mtr/binlog_streaming/r/thousand_binlogs.result b/mtr/binlog_streaming/r/thousand_binlogs.result new file mode 100644 index 0000000..03449e2 --- /dev/null +++ b/mtr/binlog_streaming/r/thousand_binlogs.result @@ -0,0 +1,36 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Generating 1000 binlogs + +*** Executing the Binlog Server utility and fetching all events. + +*** Executing the Binlog Server utility in the 'search_by_timestamp' +include/read_file_to_var.inc + +*** Executing the Binlog Server utility one more time (fetching nothing) + +*** Executing the Binlog Server utility in the 'search_by_timestamp' +*** one more time (expecting the same results) +include/read_file_to_var.inc + +*** Removing the search result file. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/t/gtid_purged.test b/mtr/binlog_streaming/t/gtid_purged.test index 33c6376..242950f 100644 --- a/mtr/binlog_streaming/t/gtid_purged.test +++ b/mtr/binlog_streaming/t/gtid_purged.test @@ -76,7 +76,7 @@ eval PURGE BINARY LOGS TO '$second_binlog'; --echo --echo *** Executing the Binlog Server utility in the 'search_by_gtid_set' ---echo one more time (expecting the same results) +--echo *** one more time (expecting the same results) --exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file --source include/read_file_to_var.inc diff --git a/mtr/binlog_streaming/t/thousand_binlogs.test b/mtr/binlog_streaming/t/thousand_binlogs.test new file mode 100644 index 0000000..a40dda7 --- /dev/null +++ b/mtr/binlog_streaming/t/thousand_binlogs.test @@ -0,0 +1,90 @@ +--source ../include/have_binsrv.inc +--source include/big_test.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $read_from_file = $MYSQL_TMP_DIR/search_result.json + +--echo +--echo *** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--let $number_of_binlogs = 1000 +--echo +--echo *** Generating $number_of_binlogs binlogs +--let $index = 0 + +--disable_query_log +while($index < $number_of_binlogs) +{ + INSERT INTO t1 VALUES(); + FLUSH BINARY LOGS; + --inc $index +} +INSERT INTO t1 VALUES(); +--enable_query_log + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--let $current_timestamp = `SELECT DATE_FORMAT(CONVERT_TZ(NOW(), @@session.time_zone, '+00:00'),'%Y-%m-%dT%H:%i:%s')` + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_timestamp' +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file + +--source include/read_file_to_var.inc +# the result will be too big, so storing it in a session variable +--disable_query_log +eval SET @search_result = '$result'; +--enable_query_log +--assert(`SELECT JSON_EXTRACT(@search_result, '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT(@search_result, '$.result')) = $number_of_binlogs + 1`) + +--echo +--echo *** Executing the Binlog Server utility one more time (fetching nothing) +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** one more time (expecting the same results) +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file + +--source include/read_file_to_var.inc +# the result will be too big, so storing it in a session variable +--disable_query_log +eval SET @search_result = '$result'; +--enable_query_log +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = $number_of_binlogs + 1`) + +--echo +--echo *** Removing the search result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index e82c217..ea3bbda 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -47,6 +47,8 @@ #include +#include + #include #include @@ -57,6 +59,7 @@ #include #include #include +#include #include #include "binsrv/s3_error_helpers_private.hpp" @@ -187,6 +190,12 @@ class s3_storage_backend::aws_context : private aws_context_base { void get_object_internal(const qualified_object_path &source, const stream_factory_type &stream_factory, const stream_handler_type &stream_handler) const; + + using list_object_container = Aws::Vector; + static void + process_list_objects_internal(const list_object_container &list_objects, + const std::string &prefix, + storage_object_name_container &storage_objects); }; s3_storage_backend::aws_context::aws_context( @@ -371,49 +380,45 @@ s3_storage_backend::aws_context::list_objects( list_objects_request.SetPrefix(prefix_str); } - const auto list_objects_outcome{client_->ListObjectsV2(list_objects_request)}; - if (!list_objects_outcome.IsSuccess()) { - raise_s3_error_from_outcome("cannot list objects in the specified bucket", - list_objects_outcome.GetError()); - } - const auto &list_objects_result = list_objects_outcome.GetResult(); - // TODO: implement receiving the rest of the list - if (list_objects_result.GetIsTruncated()) { - util::exception_location().raise( - "too many objects in the specified bucket"); - } + std::string continuation_token; + bool has_more{true}; + while (has_more) { + if (!continuation_token.empty()) { + list_objects_request.SetContinuationToken(continuation_token); + } - auto model_key_count = list_objects_result.GetKeyCount(); - if (!std::in_range(model_key_count)) { - util::exception_location().raise( - "invalid key count in the list objects result"); - } - auto key_count{static_cast(model_key_count)}; + const auto list_objects_outcome{ + client_->ListObjectsV2(list_objects_request)}; + if (!list_objects_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot list objects in the specified bucket", + list_objects_outcome.GetError()); + } + const auto &list_objects_result = list_objects_outcome.GetResult(); - const auto &model_objects{list_objects_result.GetContents()}; - if (key_count != std::size(model_objects)) { - util::exception_location().raise( - "key count does not match the number of objects in the list objects " - "result"); - } - result.reserve(key_count); + auto model_key_count = list_objects_result.GetKeyCount(); + if (!std::in_range(model_key_count)) { + util::exception_location().raise( + "invalid key count in the list objects result"); + } + auto key_count{static_cast(model_key_count)}; - for (const auto &model_object : model_objects) { - // if the prefix is set, the list of objects in the response will include - // the prefix itself (as a directory) with zero size - it neeeds to be - // skipped + const auto &model_objects{list_objects_result.GetContents()}; + if (key_count != std::size(model_objects)) { + util::exception_location().raise( + "key count does not match the number of objects in the list objects " + "result"); + } + result.reserve(std::size(result) + key_count); - // moreover, we need to remove the prefix itself from the object paths - std::string_view key{model_object.GetKey()}; - if (!prefix_str.empty()) { - if (!key.starts_with(prefix_str)) { + process_list_objects_internal(model_objects, prefix_str, result); + + has_more = list_objects_result.GetIsTruncated(); + if (has_more) { + continuation_token = list_objects_result.GetNextContinuationToken(); + if (continuation_token.empty()) { util::exception_location().raise( - "encountered an object with unexpected prefix"); + "truncated list objects result is missing continuation token"); } - key.remove_prefix(std::size(prefix_str)); - } - if (!key.empty()) { - result.emplace(key, model_object.GetSize()); } } @@ -453,6 +458,29 @@ void s3_storage_backend::aws_context::get_object_internal( } } +void s3_storage_backend::aws_context::process_list_objects_internal( + const list_object_container &list_objects, const std::string &prefix, + storage_object_name_container &storage_objects) { + for (const auto &list_object : list_objects) { + // if the prefix is set, the list of objects in the response will + // include the prefix itself (as a directory) with zero size - it + // needs to be skipped + + // moreover, we need to remove the prefix itself from the object paths + std::string_view key{list_object.GetKey()}; + if (!prefix.empty()) { + if (!key.starts_with(prefix)) { + util::exception_location().raise( + "encountered an object with unexpected prefix"); + } + key.remove_prefix(std::size(prefix)); + } + if (!key.empty()) { + storage_objects.emplace(key, list_object.GetSize()); + } + } +} + s3_storage_backend::s3_storage_backend(const storage_config &config) : bucket_{}, root_path_{}, current_name_{}, uuid_generator_{}, tmp_file_directory_{}, current_tmp_file_path_{}, tmp_fstream_{}, impl_{} {