Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/r/gtid_purged.result
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ include/read_file_to_var.inc
*** Executing the Binlog Server utility one more time (fetching nothing)

*** Executing the Binlog Server utility in the 'search_by_gtid_set'
one more time (expecting the same results)
*** one more time (expecting the same results)
include/read_file_to_var.inc

*** Removing the search result file.
Expand Down
36 changes: 36 additions & 0 deletions mtr/binlog_streaming/r/thousand_binlogs.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
*** Resetting replication at the very beginning of the test.

*** Generating a configuration file in JSON format for the Binlog
*** Server utility.

*** Determining binlog file directory from the server.

*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
*** binlog files downloaded via the Binlog Server utility.

*** Creating a simple table.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;

*** Generating 1000 binlogs

*** Executing the Binlog Server utility and fetching all events.

*** Executing the Binlog Server utility in the 'search_by_timestamp'
include/read_file_to_var.inc

*** Executing the Binlog Server utility one more time (fetching nothing)

*** Executing the Binlog Server utility in the 'search_by_timestamp'
*** one more time (expecting the same results)
include/read_file_to_var.inc

*** Removing the search result file.

*** Dropping the table.
DROP TABLE t1;

*** Removing the Binlog Server utility storage directory.

*** Removing the Binlog Server utility log file.

*** Removing the Binlog Server utility configuration file.
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/t/gtid_purged.test
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ eval PURGE BINARY LOGS TO '$second_binlog';

--echo
--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set'
--echo one more time (expecting the same results)
--echo *** one more time (expecting the same results)
--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file

--source include/read_file_to_var.inc
Expand Down
90 changes: 90 additions & 0 deletions mtr/binlog_streaming/t/thousand_binlogs.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
--source ../include/have_binsrv.inc
--source include/big_test.inc

--source ../include/v80_v84_compatibility_defines.inc

# in case of --repeat=N, we need to start from a fresh binary log to make
# this test deterministic
--echo *** Resetting replication at the very beginning of the test.
--disable_query_log
# NOTE(review): $stmt_reset_binary_logs_and_gtids is not defined in this
# file; presumably it is provided by v80_v84_compatibility_defines.inc
# sourced above - confirm
eval $stmt_reset_binary_logs_and_gtids;
--enable_query_log

# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

# creating data directory, configuration file, etc.
# NOTE(review): the $binsrv_* variables below are presumably consumed by
# set_up_binsrv_environment.inc when it generates the utility configuration;
# confirm their exact meaning and units against that include file
--let $binsrv_connect_timeout = 20
--let $binsrv_read_timeout = 60
--let $binsrv_idle_time = 10
--let $binsrv_verify_checksum = TRUE
--let $binsrv_replication_mode = position
--let $binsrv_checkpoint_size = 1
--source ../include/set_up_binsrv_environment.inc

# file that receives the standard output of the 'search_by_timestamp'
# invocations further below
--let $read_from_file = $MYSQL_TMP_DIR/search_result.json

--echo
--echo *** Creating a simple table.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;

# number of iterations of the INSERT + FLUSH BINARY LOGS loop below
--let $number_of_binlogs = 1000
--echo
--echo *** Generating $number_of_binlogs binlogs
--let $index = 0

# query logging is switched off so that the statements executed in the loop
# are not written to the test output; each iteration inserts one row and then
# flushes the binary logs
--disable_query_log
while($index < $number_of_binlogs)
{
INSERT INTO t1 VALUES();
FLUSH BINARY LOGS;
--inc $index
}
# one more row is inserted after the last flush; the assertions later in this
# test expect ($number_of_binlogs + 1) entries in the search result
INSERT INTO t1 VALUES();
--enable_query_log

--echo
--echo *** Executing the Binlog Server utility and fetching all events.
# the standard output of the 'fetch' run is discarded
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null

# current server time converted from the session time zone to UTC and
# formatted as YYYY-MM-DDTHH:MM:SS; used as the 'search_by_timestamp' argument
--let $current_timestamp = `SELECT DATE_FORMAT(CONVERT_TZ(NOW(), @@session.time_zone, '+00:00'),'%Y-%m-%dT%H:%i:%s')`

--echo
--echo *** Executing the Binlog Server utility in the 'search_by_timestamp'
# the JSON response of the utility is redirected to $read_from_file
--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file

# NOTE(review): presumably read_file_to_var.inc loads the content of
# $read_from_file into $result - confirm against the include file
--source include/read_file_to_var.inc
# the result will be too big, so storing it in a session variable
--disable_query_log
eval SET @search_result = '$result';
--enable_query_log
# the response must report success and its 'result' element must contain
# ($number_of_binlogs + 1) entries
--assert(`SELECT JSON_EXTRACT(@search_result, '$.status') = 'success'`)
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT(@search_result, '$.result')) = $number_of_binlogs + 1`)

--echo
--echo *** Executing the Binlog Server utility one more time (fetching nothing)
# the standard output of the 'fetch' run is discarded
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null

--echo
--echo *** Executing the Binlog Server utility in the 'search_by_timestamp'
--echo *** one more time (expecting the same results)
# the JSON response of the utility is redirected to $read_from_file
--exec $BINSRV search_by_timestamp $binsrv_config_file_path $current_timestamp > $read_from_file

--source include/read_file_to_var.inc
# the result will be too big, so storing it in a session variable
--disable_query_log
eval SET @search_result = '$result';
--enable_query_log
# asserting on the session variable (and not on the inlined '$result') so
# that the huge JSON document is not expanded into every assertion query;
# this mirrors the checks performed after the first search
--assert(`SELECT JSON_EXTRACT(@search_result, '$.status') = 'success'`)
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT(@search_result, '$.result')) = $number_of_binlogs + 1`)

--echo
--echo *** Removing the search result file.
# deleting the file that received the 'search_by_timestamp' output
--remove_file $read_from_file

--echo
--echo *** Dropping the table.
DROP TABLE t1;

# cleaning up
# NOTE(review): presumably this include removes the storage directory, the
# log file and the configuration file of the utility - confirm against the
# include file
--source ../include/tear_down_binsrv_environment.inc
102 changes: 65 additions & 37 deletions src/binsrv/s3_storage_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

#include <aws/core/utils/memory/AWSMemory.h>

#include <aws/core/utils/memory/stl/AWSVector.h>

#include <aws/s3-crt/ClientConfiguration.h>
#include <aws/s3-crt/S3CrtClient.h>

Expand All @@ -57,6 +59,7 @@
#include <aws/s3-crt/model/GetObjectResult.h>
#include <aws/s3-crt/model/ListObjectsV2Request.h>
#include <aws/s3-crt/model/ListObjectsV2Result.h>
#include <aws/s3-crt/model/Object.h>
#include <aws/s3-crt/model/PutObjectRequest.h>

#include "binsrv/s3_error_helpers_private.hpp"
Expand Down Expand Up @@ -187,6 +190,12 @@ class s3_storage_backend::aws_context : private aws_context_base {
void get_object_internal(const qualified_object_path &source,
const stream_factory_type &stream_factory,
const stream_handler_type &stream_handler) const;

// Sequence of S3 object descriptors, as passed to
// process_list_objects_internal().
using list_object_container = Aws::Vector<Aws::S3Crt::Model::Object>;
// Converts the S3 object descriptors from 'list_objects' into entries of
// 'storage_objects': 'prefix' is stripped from every key and objects whose
// key becomes empty after stripping are skipped. Each remaining entry is
// emplaced as a (key without prefix, object size) pair.
// Raises std::logic_error if 'prefix' is not empty and a key does not start
// with it.
static void
process_list_objects_internal(const list_object_container &list_objects,
const std::string &prefix,
storage_object_name_container &storage_objects);
};

s3_storage_backend::aws_context::aws_context(
Expand Down Expand Up @@ -371,49 +380,45 @@ s3_storage_backend::aws_context::list_objects(
list_objects_request.SetPrefix(prefix_str);
}

const auto list_objects_outcome{client_->ListObjectsV2(list_objects_request)};
if (!list_objects_outcome.IsSuccess()) {
raise_s3_error_from_outcome("cannot list objects in the specified bucket",
list_objects_outcome.GetError());
}
const auto &list_objects_result = list_objects_outcome.GetResult();
// TODO: implement receiving the rest of the list
if (list_objects_result.GetIsTruncated()) {
util::exception_location().raise<std::logic_error>(
"too many objects in the specified bucket");
}
std::string continuation_token;
bool has_more{true};
while (has_more) {
if (!continuation_token.empty()) {
list_objects_request.SetContinuationToken(continuation_token);
}

auto model_key_count = list_objects_result.GetKeyCount();
if (!std::in_range<std::size_t>(model_key_count)) {
util::exception_location().raise<std::logic_error>(
"invalid key count in the list objects result");
}
auto key_count{static_cast<std::size_t>(model_key_count)};
const auto list_objects_outcome{
client_->ListObjectsV2(list_objects_request)};
if (!list_objects_outcome.IsSuccess()) {
raise_s3_error_from_outcome("cannot list objects in the specified bucket",
list_objects_outcome.GetError());
}
const auto &list_objects_result = list_objects_outcome.GetResult();

const auto &model_objects{list_objects_result.GetContents()};
if (key_count != std::size(model_objects)) {
util::exception_location().raise<std::logic_error>(
"key count does not match the number of objects in the list objects "
"result");
}
result.reserve(key_count);
auto model_key_count = list_objects_result.GetKeyCount();
if (!std::in_range<std::size_t>(model_key_count)) {
util::exception_location().raise<std::logic_error>(
"invalid key count in the list objects result");
}
auto key_count{static_cast<std::size_t>(model_key_count)};

for (const auto &model_object : model_objects) {
// if the prefix is set, the list of objects in the response will include
// the prefix itself (as a directory) with zero size - it needs to be
// skipped
const auto &model_objects{list_objects_result.GetContents()};
if (key_count != std::size(model_objects)) {
util::exception_location().raise<std::logic_error>(
"key count does not match the number of objects in the list objects "
"result");
}
result.reserve(std::size(result) + key_count);

// moreover, we need to remove the prefix itself from the object paths
std::string_view key{model_object.GetKey()};
if (!prefix_str.empty()) {
if (!key.starts_with(prefix_str)) {
process_list_objects_internal(model_objects, prefix_str, result);

has_more = list_objects_result.GetIsTruncated();
if (has_more) {
continuation_token = list_objects_result.GetNextContinuationToken();
if (continuation_token.empty()) {
util::exception_location().raise<std::logic_error>(
"encountered an object with unexpected prefix");
"truncated list objects result is missing continuation token");
}
key.remove_prefix(std::size(prefix_str));
}
if (!key.empty()) {
result.emplace(key, model_object.GetSize());
}
}

Expand Down Expand Up @@ -453,6 +458,29 @@ void s3_storage_backend::aws_context::get_object_internal(
}
}

// Transfers the S3 object descriptors from 'list_objects' into
// 'storage_objects'. Every key has 'prefix' stripped from its beginning;
// a key that does not start with a non-empty 'prefix' results in
// std::logic_error. Objects whose key is empty after stripping are not
// added. The remaining ones are emplaced as (stripped key, object size).
void s3_storage_backend::aws_context::process_list_objects_internal(
    const list_object_container &list_objects, const std::string &prefix,
    storage_object_name_container &storage_objects) {
  for (const auto &entry : list_objects) {
    std::string_view relative_name{entry.GetKey()};

    // an empty prefix is a prefix of any key, so a dedicated check for
    // 'prefix' being empty is not required here
    if (!relative_name.starts_with(prefix)) {
      util::exception_location().raise<std::logic_error>(
          "encountered an object with unexpected prefix");
    }
    // object paths are stored relative to the prefix (removing zero
    // characters for an empty prefix leaves the key untouched)
    relative_name.remove_prefix(std::size(prefix));

    // when the prefix is set, the listing also includes the prefix itself
    // (as a zero-size directory object) - after stripping its key becomes
    // empty and such an entry must not be reported
    if (relative_name.empty()) {
      continue;
    }
    storage_objects.emplace(relative_name, entry.GetSize());
  }
}

s3_storage_backend::s3_storage_backend(const storage_config &config)
: bucket_{}, root_path_{}, current_name_{}, uuid_generator_{},
tmp_file_directory_{}, current_tmp_file_path_{}, tmp_fstream_{}, impl_{} {
Expand Down
Loading