From 3158c06550af8cb3016ed96d80745d9f3261d467 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Thu, 27 Nov 2025 17:32:54 -0300
Subject: [PATCH 01/14] keep track of export partition zk requests

---
 src/Common/ProfileEvents.cpp                    | 12 +++++
 .../ExportPartitionManifestUpdatingTask.cpp     | 24 ++++++++++
 .../ExportPartitionTaskScheduler.cpp            | 44 +++++++++++++++++++
 .../MergeTree/ExportPartitionUtils.cpp          | 15 +++++++
 src/Storages/StorageReplicatedMergeTree.cpp     | 31 +++++++++++++
 5 files changed, 126 insertions(+)

diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 41790c8adf90..e4314b76ffab 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -261,6 +261,18 @@
     M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.", ValueType::Bytes) \
     M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.", ValueType::Bytes) \
     \
+    M(ExportPartitionZooKeeperRequests, "Total number of ZooKeeper requests made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperGet, "Number of 'get' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperGetChildren, "Number of 'getChildren' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperGetChildrenWatch, "Number of 'getChildrenWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperGetWatch, "Number of 'getWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperCreate, "Number of 'create' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperSet, "Number of 'set' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperRemove, "Number of 'remove' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperRemoveRecursive, "Number of 'removeRecursive' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
+    \
     M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \
     M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \
     M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.", ValueType::Number) \
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
index 79b92663b7bf..359edbd12daf 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
@@ -5,8 +5,19 @@
 #include "Common/logger_useful.h"
 #include
 #include
+#include
 #include

+namespace ProfileEvents
+{
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
+    extern const Event ExportPartitionZooKeeperGetChildrenWatch;
+    extern const Event ExportPartitionZooKeeperGetWatch;
+    extern const Event ExportPartitionZooKeeperRemoveRecursive;
+}
+
 namespace DB
 {
 namespace
@@ -35,6 +46,8 @@ namespace
         if (has_expired && !is_pending)
         {
             zk->tryRemoveRecursive(fs::path(entry_path));
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive);
             auto it = entries_by_key.find(key);
             if (it != entries_by_key.end())
                 entries_by_key.erase(it);
@@ -44,9 +57,12 @@ namespace
         }
         else if (is_pending)
         {
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
             std::vector parts_in_processing_or_pending;
             if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(entry_path) / "processing", parts_in_processing_or_pending))
             {
+                LOG_INFO(log, "ExportPartition Manifest Updating Task: Failed to get parts in processing or pending, skipping");
                 return false;
             }

@@ -98,6 +114,8 @@ void ExportPartitionManifestUpdatingTask::poll()
     Coordination::Stat stat;
     const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildrenWatch);
     const std::unordered_set zk_children(children.begin(), children.end());

     const auto now = time(nullptr);
@@ -111,6 +129,8 @@
     {
         const std::string entry_path = fs::path(exports_path) / key;

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
         std::string metadata_json;
         if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json))
         {
@@ -144,6 +164,8 @@
             }
         });

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch);
         std::string status;
         if (!zk->tryGetWatch(fs::path(entry_path) / "status", status, nullptr, status_watch_callback))
         {
@@ -273,6 +295,8 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges()
         if (it == storage.export_merge_tree_partition_task_entries_by_key.end())
             continue;

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
         /// get new status from zk
         std::string new_status_string;
         if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string))
diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index ab3a8ce361c7..1666222f24b4 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -4,9 +4,22 @@
 #include
 #include
 #include
+#include
 #include "Storages/MergeTree/ExportPartitionUtils.h"
 #include "Storages/MergeTree/MergeTreePartExportManifest.h"

+namespace ProfileEvents
+{
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
+    extern const Event ExportPartitionZooKeeperCreate;
+    extern const Event ExportPartitionZooKeeperSet;
+    extern const Event ExportPartitionZooKeeperRemove;
+    extern const Event ExportPartitionZooKeeperMulti;
+    extern const Event ExportPartitionZooKeeperExists;
+}
+
 namespace DB
 {

@@ -68,6 +81,8 @@ void ExportPartitionTaskScheduler::run()
             continue;
         }

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
         std::string status_in_zk_string;
         if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", status_in_zk_string))
         {
@@ -75,6 +90,9 @@ void ExportPartitionTaskScheduler::run()
             continue;
         }

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
+
         const auto status_in_zk = magic_enum::enum_cast(status_in_zk_string);

         if (!status_in_zk)
@@ -90,6 +108,8 @@ void ExportPartitionTaskScheduler::run()
             continue;
         }

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
         std::vector parts_in_processing_or_pending;
         if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "processing", parts_in_processing_or_pending))
         {
@@ -98,12 +118,15 @@ void ExportPartitionTaskScheduler::run()
             continue;
         }

+
         if (parts_in_processing_or_pending.empty())
         {
             LOG_INFO(storage.log, "ExportPartition scheduler task: No parts in processing or pending, skipping");
             continue;
         }

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
         std::vector locked_parts;
         if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "locks", locked_parts))
         {
@@ -129,6 +152,8 @@ void ExportPartitionTaskScheduler::run()
                 continue;
             }

+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
             if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
             {
                 LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
@@ -152,6 +177,8 @@ void ExportPartitionTaskScheduler::run()
             {
                 tryLogCurrentException(__PRETTY_FUNCTION__);
                 zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name);
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
                 /// we should not increment retry_count because the node might just be full
             }
         }
@@ -238,6 +265,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
     Coordination::Stat locked_by_stat;
     std::string locked_by;

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
     if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
     {
         LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not increment error counts", part_name);
@@ -254,6 +283,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(

     const auto processing_part_path = processing_parts_path / part_name;

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
     std::string processing_part_string;
     if (!zk->tryGet(processing_part_path, processing_part_string))
@@ -288,10 +319,14 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
     const auto count_path = exceptions_per_replica_path / "count";
     const auto last_exception_path = exceptions_per_replica_path / "last_exception";

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists);
     if (zk->exists(exceptions_per_replica_path))
     {
         std::string num_exceptions_string;
         zk->tryGet(count_path, num_exceptions_string);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);

         num_exceptions = std::stoull(num_exceptions_string.c_str());

         ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1));
@@ -309,6 +344,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
     num_exceptions++;
     ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1));

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
     Coordination::Responses responses;
     if (Coordination::Error::ZOK != zk->tryMulti(ops, responses))
     {
@@ -330,6 +367,8 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
     Coordination::Stat locked_by_stat;
     std::string locked_by;

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
     if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
     {
         LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not commit or set it as completed", part_name);
@@ -355,9 +394,12 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
     requests.emplace_back(zkutil::makeCreateRequest(processed_part_path, processed_part_entry.toJsonString(), zkutil::CreateMode::Persistent));
     requests.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
     Coordination::Responses responses;
     if (Coordination::Error::ZOK != zk->tryMulti(requests, responses))
     {
+        /// TODO: decide how a failed multi request should be handled here
         LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping");

         return false;
@@ -370,6 +412,8 @@ bool ExportPartitionTaskScheduler::areAllPartsProcessed(
     const std::filesystem::path & export_path,
     const zkutil::ZooKeeperPtr & zk)
 {
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
     Strings parts_in_processing_or_pending;
     if (Coordination::Error::ZOK != zk->tryGetChildren(export_path / "processing", parts_in_processing_or_pending))
     {
diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp
index 466eb79e8367..d9c0430678d2 100644
--- a/src/Storages/MergeTree/ExportPartitionUtils.cpp
+++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp
@@ -1,10 +1,19 @@
 #include
 #include
+#include
 #include
 #include "Storages/ExportReplicatedMergeTreePartitionManifest.h"
 #include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h"
 #include

+namespace ProfileEvents
+{
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
+    extern const Event ExportPartitionZooKeeperSet;
+}
+
 namespace DB
 {

@@ -23,6 +32,8 @@ namespace ExportPartitionUtils

     const auto processed_parts_path = fs::path(export_path) / "processed";

+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
     std::vector processed_parts;
     if (Coordination::Error::ZOK != zk->tryGetChildren(processed_parts_path, processed_parts))
     {
@@ -39,6 +50,8 @@ namespace ExportPartitionUtils
     }

     auto responses = zk->tryGet(get_paths);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, get_paths.size());

     responses.waitForResponses();

@@ -81,6 +94,8 @@ namespace ExportPartitionUtils
         destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, context);

         LOG_INFO(log, "ExportPartition: Committed export, mark as completed");
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet);
         if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1))
         {
             LOG_INFO(log, "ExportPartition: Marked export as completed");
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index ec6c43ff775e..aff99b8cb8bd 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -153,6 +153,15 @@ namespace ProfileEvents
     extern const Event NotCreatedLogEntryForMutation;
     extern const Event ReplicaPartialShutdown;
     extern const Event ReplicatedCoveredPartsInZooKeeperOnStart;
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
+    extern const Event ExportPartitionZooKeeperCreate;
+    extern const Event ExportPartitionZooKeeperSet;
+    extern const Event ExportPartitionZooKeeperRemove;
+    extern const Event ExportPartitionZooKeeperRemoveRecursive;
+    extern const Event ExportPartitionZooKeeperMulti;
+    extern const Event ExportPartitionZooKeeperExists;
 }

 namespace CurrentMetrics
@@ -4456,6 +4465,8 @@ std::vector StorageReplicatedMergeTree::getPartit
     const auto zk = getZooKeeper();
     const auto exports_path = fs::path(zookeeper_path) / "exports";
     std::vector children;
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
     if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children))
     {
         LOG_INFO(log, "Failed to get children from exports path, returning empty export info list");
@@ -4468,6 +4479,8 @@ std::vector StorageReplicatedMergeTree::getPartit
         const auto export_partition_path = fs::path(exports_path) / child;

         std::string metadata_json;
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
         if (!zk->tryGet(export_partition_path / "metadata.json", metadata_json))
         {
             LOG_INFO(log, "Skipping {}: missing metadata.json", child);
@@ -4475,6 +4488,8 @@ std::vector StorageReplicatedMergeTree::getPartit
         }

         std::string status;
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
         if (!zk->tryGet(export_partition_path / "status", status))
         {
             LOG_INFO(log, "Skipping {}: missing status", child);
@@ -4482,6 +4497,8 @@ std::vector StorageReplicatedMergeTree::getPartit
         }

         std::vector processing_parts;
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
         if (Coordination::Error::ZOK != zk->tryGetChildren(export_partition_path / "processing", processing_parts))
         {
             LOG_INFO(log, "Skipping {}: missing processing parts", child);
@@ -4498,6 +4515,8 @@ std::vector StorageReplicatedMergeTree::getPartit
         const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica";

         Strings exception_replicas;
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
         if (Coordination::Error::ZOK != zk->tryGetChildren(exceptions_per_replica_path, exception_replicas))
         {
             LOG_INFO(log, "Skipping {}: missing exceptions_per_replica", export_partition_path);
@@ -4507,6 +4526,8 @@ std::vector StorageReplicatedMergeTree::getPartit
         for (const auto & replica : exception_replicas)
         {
             std::string exception_count_string;
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
             if (!zk->tryGet(exceptions_per_replica_path / replica / "count", exception_count_string))
             {
                 LOG_INFO(log, "Skipping {}: missing count", replica);
                 continue;
             }
@@ -8148,15 +8169,21 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
     const auto partition_exports_path = fs::path(exports_path) / export_key;

     /// check if entry already exists
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists);
     if (zookeeper->exists(partition_exports_path))
     {
         LOG_INFO(log, "Export with key {} is already exported or it is being exported. Checking if it has expired so that we can overwrite it", export_key);

         bool has_expired = false;

+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists);
         if (zookeeper->exists(fs::path(partition_exports_path) / "metadata.json"))
         {
             std::string metadata_json;
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
             if (zookeeper->tryGet(fs::path(partition_exports_path) / "metadata.json", metadata_json))
             {
                 const auto manifest = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
@@ -8183,6 +8210,8 @@
         /// Not putting in ops (same transaction) because we can't construct a "tryRemoveRecursive" request.
         /// It is possible that the zk being used does not support RemoveRecursive requests.
         /// It is ok for this to be non transactional. Worst case scenario an on-going export is going to be killed and a new task won't be scheduled.
+ ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); zookeeper->tryRemoveRecursive(partition_exports_path); } @@ -8269,6 +8298,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & "PENDING", zkutil::CreateMode::Persistent)); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); Coordination::Responses responses; Coordination::Error code = zookeeper->tryMulti(ops, responses); From ba581eb7377d95dba22d7841343d939cb956bb2e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 27 Nov 2025 17:58:50 -0300 Subject: [PATCH 02/14] vibe coded getexportpartitioninfo --- src/Storages/StorageReplicatedMergeTree.cpp | 326 ++++++++++++++++---- 1 file changed, 268 insertions(+), 58 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index aff99b8cb8bd..4a099f81aa8f 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4461,106 +4461,319 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask() std::vector StorageReplicatedMergeTree::getPartitionExportsInfo() const { std::vector infos; - const auto zk = getZooKeeper(); const auto exports_path = fs::path(zookeeper_path) / "exports"; - std::vector children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + + std::vector children; if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children)) { LOG_INFO(log, "Failed to get children from exports path, returning empty export info list"); return infos; } + if (children.empty()) + return infos; + + /// Batch all metadata.json, status gets, and getChildren operations in a single multi request + Coordination::Requests requests; + requests.reserve(children.size() * 4); // metadata, status, processing, exceptions_per_replica + + // Track response indices for each child + struct ChildResponseIndices + { + size_t metadata_idx; + size_t status_idx; + size_t processing_idx; + size_t exceptions_per_replica_idx; + }; + std::vector response_indices; + response_indices.reserve(children.size()); + for (const auto & child : children) { - ReplicatedPartitionExportInfo info; + const auto export_partition_path = fs::path(exports_path) / child; + + ChildResponseIndices indices; + indices.metadata_idx = requests.size(); + requests.push_back(zkutil::makeGetRequest(export_partition_path / "metadata.json")); + + indices.status_idx = requests.size(); + requests.push_back(zkutil::makeGetRequest(export_partition_path / "status")); + + indices.processing_idx = requests.size(); + requests.push_back(zkutil::makeListRequest(export_partition_path / "processing")); + + indices.exceptions_per_replica_idx = requests.size(); + requests.push_back(zkutil::makeListRequest(export_partition_path / "exceptions_per_replica")); + + response_indices.push_back(indices); + } + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + Coordination::Error code = zk->tryMulti(requests, responses); + + if (code != Coordination::Error::ZOK) + { + LOG_INFO(log, "Failed to execute multi request for export partition info, error: {}", 
code); + return infos; + } + + // Helper to extract GetResponse data + auto getGetResponseData = [&responses](size_t idx) -> std::pair + { + if (idx >= responses.size()) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; + + const auto * get_response = dynamic_cast(responses[idx].get()); + if (!get_response) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; + + return {get_response->error, get_response->data}; + }; + + // Helper to extract ListResponse data + auto getListResponseData = [&responses](size_t idx) -> std::pair + { + if (idx >= responses.size()) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; + + const auto * list_response = dynamic_cast(responses[idx].get()); + if (!list_response) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; + + return {list_response->error, list_response->names}; + }; + + // Create response wrappers matching the MultiTryGetResponse/MultiTryGetChildrenResponse interface + struct ResponseWrapper + { + Coordination::Error error; + std::string data; + Strings names; + + ResponseWrapper(Coordination::Error err, const std::string & d, const Strings & n) + : error(err), data(d), names(n) {} + }; + std::vector metadata_responses_wrapper; + std::vector status_responses_wrapper; + std::vector processing_responses_wrapper; + std::vector exceptions_per_replica_responses_wrapper; + + metadata_responses_wrapper.reserve(children.size()); + status_responses_wrapper.reserve(children.size()); + processing_responses_wrapper.reserve(children.size()); + exceptions_per_replica_responses_wrapper.reserve(children.size()); + + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + const auto & indices = response_indices[child_idx]; + + // Extract metadata response + auto [metadata_error, metadata_data] = getGetResponseData(indices.metadata_idx); + metadata_responses_wrapper.emplace_back(metadata_error, metadata_data, Strings{}); + + // Extract status response + auto [status_error, status_data] = getGetResponseData(indices.status_idx); + status_responses_wrapper.emplace_back(status_error, status_data, Strings{}); + + // Extract processing response + auto [processing_error, processing_names] = getListResponseData(indices.processing_idx); + processing_responses_wrapper.emplace_back(processing_error, "", processing_names); + + // Extract exceptions_per_replica response + auto [exceptions_error, exceptions_names] = getListResponseData(indices.exceptions_per_replica_idx); + exceptions_per_replica_responses_wrapper.emplace_back(exceptions_error, "", exceptions_names); + } + + // Use wrapper vectors directly - they match the interface expected by the code below + auto & metadata_responses = metadata_responses_wrapper; + auto & status_responses = status_responses_wrapper; + auto & processing_responses = processing_responses_wrapper; + auto & exceptions_per_replica_responses = exceptions_per_replica_responses_wrapper; + + /// Collect all exception replica paths for batching + struct ExceptionReplicaPath + { + size_t child_idx; + std::string replica; + std::string count_path; + std::string exception_path; + std::string part_path; + }; + + std::vector exception_replica_paths; + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + const auto & child = children[child_idx]; const auto export_partition_path = fs::path(exports_path) / child; - std::string metadata_json; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - 
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (!zk->tryGet(export_partition_path / "metadata.json", metadata_json)) + /// Check if we got valid responses + if (metadata_responses[child_idx].error != Coordination::Error::ZOK) { LOG_INFO(log, "Skipping {}: missing metadata.json", child); continue; } - - std::string status; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (!zk->tryGet(export_partition_path / "status", status)) + if (status_responses[child_idx].error != Coordination::Error::ZOK) { LOG_INFO(log, "Skipping {}: missing status", child); continue; } - - std::vector processing_parts; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); - if (Coordination::Error::ZOK != zk->tryGetChildren(export_partition_path / "processing", processing_parts)) + if (processing_responses[child_idx].error != Coordination::Error::ZOK) { LOG_INFO(log, "Skipping {}: missing processing parts", child); continue; } - - const auto parts_to_do = processing_parts.size(); - - std::string exception_replica; - std::string last_exception; - std::string exception_part; - std::size_t exception_count = 0; - - const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica"; - - Strings exception_replicas; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); - if (Coordination::Error::ZOK != zk->tryGetChildren(exceptions_per_replica_path, exception_replicas)) + if (exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) { LOG_INFO(log, "Skipping {}: missing exceptions_per_replica", export_partition_path); continue; } - + const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica"; + const auto & exception_replicas = exceptions_per_replica_responses[child_idx].names; for (const auto & replica : exception_replicas) { - std::string exception_count_string; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (!zk->tryGet(exceptions_per_replica_path / replica / "count", exception_count_string)) - { - LOG_INFO(log, "Skipping {}: missing count", replica); - continue; - } + const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception"; + exception_replica_paths.push_back({ + child_idx, + replica, + (exceptions_per_replica_path / replica / "count").string(), + (last_exception_path / "exception").string(), + (last_exception_path / "part").string() + }); + } + } + /// Batch get all exception data in a single multi request + std::map>> exception_data_by_child; - exception_count += std::stoull(exception_count_string.c_str()); + if (!exception_replica_paths.empty()) + { + Coordination::Requests exception_requests; + exception_requests.reserve(exception_replica_paths.size() * 3); // count, exception, part for each + + // Track response indices for each exception replica path + struct ExceptionResponseIndices + { + size_t count_idx; + size_t exception_idx; + size_t part_idx; + }; + std::vector exception_response_indices; + exception_response_indices.reserve(exception_replica_paths.size()); + + for (const auto & erp : exception_replica_paths) + { + ExceptionResponseIndices indices; + 
indices.count_idx = exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.count_path)); + + indices.exception_idx = exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.exception_path)); + + indices.part_idx = exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.part_path)); + + exception_response_indices.push_back(indices); + } + + // Execute single multi request for all exception data + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses exception_responses; + Coordination::Error exception_code = zk->tryMulti(exception_requests, exception_responses); - if (last_exception.empty()) + if (exception_code != Coordination::Error::ZOK) + { + LOG_INFO(log, "Failed to execute multi request for exception data, error: {}", exception_code); + } + else + { + // Parse exception responses + for (size_t exception_path_idx = 0; exception_path_idx < exception_replica_paths.size(); ++exception_path_idx) { - const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception"; - std::string last_exception_string; - if (!zk->tryGet(last_exception_path / "exception", last_exception_string)) + const auto & erp = exception_replica_paths[exception_path_idx]; + const auto & indices = exception_response_indices[exception_path_idx]; + + std::string count_str; + std::string exception_str; + std::string part_str; + + // Extract count response + if (indices.count_idx < exception_responses.size()) { - LOG_INFO(log, "Skipping {}: missing last_exception/exception", last_exception_path); - continue; + const auto * count_response = dynamic_cast(exception_responses[indices.count_idx].get()); + if (count_response && count_response->error == Coordination::Error::ZOK) + count_str = count_response->data; } - std::string exception_part_zk; - if (!zk->tryGet(last_exception_path / "part", exception_part_zk)) + // Extract exception response + if (indices.exception_idx < exception_responses.size()) { - LOG_INFO(log, "Skipping {}: missing exception part", last_exception_path); - continue; + const auto * exception_response = dynamic_cast(exception_responses[indices.exception_idx].get()); + if (exception_response && exception_response->error == Coordination::Error::ZOK) + exception_str = exception_response->data; + } + + // Extract part response + if (indices.part_idx < exception_responses.size()) + { + const auto * part_response = dynamic_cast(exception_responses[indices.part_idx].get()); + if (part_response && part_response->error == Coordination::Error::ZOK) + part_str = part_response->data; } - exception_replica = replica; - last_exception = last_exception_string; - exception_part = exception_part_zk; + exception_data_by_child[erp.child_idx].emplace_back(erp.replica, count_str, exception_str, part_str); } } + } - const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); + /// Build the result + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + /// Skip if we already determined this child is invalid + if (metadata_responses[child_idx].error != Coordination::Error::ZOK + || status_responses[child_idx].error != Coordination::Error::ZOK + || processing_responses[child_idx].error != Coordination::Error::ZOK + || exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) + { + continue; + } + 
ReplicatedPartitionExportInfo info; + const auto metadata_json = metadata_responses[child_idx].data; + const auto status = status_responses[child_idx].data; + const auto processing_parts = processing_responses[child_idx].names; + const auto parts_to_do = processing_parts.size(); + std::string exception_replica; + std::string last_exception; + std::string exception_part; + std::size_t exception_count = 0; + /// Process exception data + auto exception_data_it = exception_data_by_child.find(child_idx); + if (exception_data_it != exception_data_by_child.end()) + { + for (const auto & [replica, count_str, exception_str, part_str] : exception_data_it->second) + { + if (!count_str.empty()) + { + exception_count += std::stoull(count_str); + } + if (last_exception.empty() && !exception_str.empty() && !part_str.empty()) + { + exception_replica = replica; + last_exception = exception_str; + exception_part = part_str; + } + } + } + + const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); info.destination_database = metadata.destination_database; info.destination_table = metadata.destination_table; info.partition_id = metadata.partition_id; @@ -4575,14 +4788,11 @@ std::vector StorageReplicatedMergeTree::getPartit info.last_exception = last_exception; info.exception_part = exception_part; info.exception_count = exception_count; - infos.emplace_back(std::move(info)); } - return infos; } - StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts( zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, From efa0b03b2a30d9056542494d232315303d45190e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 27 Nov 2025 18:06:44 -0300 Subject: [PATCH 03/14] move vibe coded getpartitionexports inside the updating task --- .../ExportPartitionManifestUpdatingTask.cpp | 338 ++++++++++++++++++ .../ExportPartitionManifestUpdatingTask.h | 3 + src/Storages/StorageReplicatedMergeTree.cpp | 332 +---------------- 3 files changed, 342 insertions(+), 331 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 359edbd12daf..14b376c700db 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -16,6 +16,7 @@ namespace ProfileEvents extern const Event ExportPartitionZooKeeperGetChildrenWatch; extern const Event ExportPartitionZooKeeperGetWatch; extern const Event ExportPartitionZooKeeperRemoveRecursive; + extern const Event ExportPartitionZooKeeperMulti; } namespace DB @@ -95,6 +96,343 @@ ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(Storage { } +std::vector ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const +{ + std::vector infos; + const auto zk = storage.getZooKeeper(); + const auto exports_path = fs::path(storage.zookeeper_path) / "exports"; + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + + std::vector children; + if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children)) + { + LOG_INFO(storage.log, "Failed to get children from exports path, returning empty export info list"); + return infos; + } + + if (children.empty()) + return infos; + + /// Batch all metadata.json, status gets, and getChildren operations in a single multi request + Coordination::Requests 
requests; + requests.reserve(children.size() * 4); // metadata, status, processing, exceptions_per_replica + + // Track response indices for each child + struct ChildResponseIndices + { + size_t metadata_idx; + size_t status_idx; + size_t processing_idx; + size_t exceptions_per_replica_idx; + }; + std::vector response_indices; + response_indices.reserve(children.size()); + + for (const auto & child : children) + { + const auto export_partition_path = fs::path(exports_path) / child; + + ChildResponseIndices indices; + indices.metadata_idx = requests.size(); + requests.push_back(zkutil::makeGetRequest(export_partition_path / "metadata.json")); + + indices.status_idx = requests.size(); + requests.push_back(zkutil::makeGetRequest(export_partition_path / "status")); + + indices.processing_idx = requests.size(); + requests.push_back(zkutil::makeListRequest(export_partition_path / "processing")); + + indices.exceptions_per_replica_idx = requests.size(); + requests.push_back(zkutil::makeListRequest(export_partition_path / "exceptions_per_replica")); + + response_indices.push_back(indices); + } + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + Coordination::Error code = zk->tryMulti(requests, responses); + + if (code != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Failed to execute multi request for export partition info, error: {}", code); + return infos; + } + + // Helper to extract GetResponse data + auto getGetResponseData = [&responses](size_t idx) -> std::pair + { + if (idx >= responses.size()) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; + + const auto * get_response = dynamic_cast(responses[idx].get()); + if (!get_response) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; + + return {get_response->error, get_response->data}; + }; + + // Helper to extract ListResponse data + auto getListResponseData = [&responses](size_t idx) -> std::pair + { + if (idx >= responses.size()) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; + + const auto * list_response = dynamic_cast(responses[idx].get()); + if (!list_response) + return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; + + return {list_response->error, list_response->names}; + }; + + // Create response wrappers matching the MultiTryGetResponse/MultiTryGetChildrenResponse interface + struct ResponseWrapper + { + Coordination::Error error; + std::string data; + Strings names; + + ResponseWrapper(Coordination::Error err, const std::string & d, const Strings & n) + : error(err), data(d), names(n) {} + }; + + std::vector metadata_responses_wrapper; + std::vector status_responses_wrapper; + std::vector processing_responses_wrapper; + std::vector exceptions_per_replica_responses_wrapper; + + metadata_responses_wrapper.reserve(children.size()); + status_responses_wrapper.reserve(children.size()); + processing_responses_wrapper.reserve(children.size()); + exceptions_per_replica_responses_wrapper.reserve(children.size()); + + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + const auto & indices = response_indices[child_idx]; + + // Extract metadata response + auto [metadata_error, metadata_data] = getGetResponseData(indices.metadata_idx); + metadata_responses_wrapper.emplace_back(metadata_error, metadata_data, Strings{}); + + // Extract status response + auto [status_error, status_data] = getGetResponseData(indices.status_idx); 
+ status_responses_wrapper.emplace_back(status_error, status_data, Strings{}); + + // Extract processing response + auto [processing_error, processing_names] = getListResponseData(indices.processing_idx); + processing_responses_wrapper.emplace_back(processing_error, "", processing_names); + + // Extract exceptions_per_replica response + auto [exceptions_error, exceptions_names] = getListResponseData(indices.exceptions_per_replica_idx); + exceptions_per_replica_responses_wrapper.emplace_back(exceptions_error, "", exceptions_names); + } + + // Use wrapper vectors directly - they match the interface expected by the code below + auto & metadata_responses = metadata_responses_wrapper; + auto & status_responses = status_responses_wrapper; + auto & processing_responses = processing_responses_wrapper; + auto & exceptions_per_replica_responses = exceptions_per_replica_responses_wrapper; + + /// Collect all exception replica paths for batching + struct ExceptionReplicaPath + { + size_t child_idx; + std::string replica; + std::string count_path; + std::string exception_path; + std::string part_path; + }; + + std::vector exception_replica_paths; + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + const auto & child = children[child_idx]; + const auto export_partition_path = fs::path(exports_path) / child; + /// Check if we got valid responses + if (metadata_responses[child_idx].error != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Skipping {}: missing metadata.json", child); + continue; + } + if (status_responses[child_idx].error != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Skipping {}: missing status", child); + continue; + } + if (processing_responses[child_idx].error != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Skipping {}: missing processing parts", child); + continue; + } + if (exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Skipping {}: missing exceptions_per_replica", export_partition_path); + continue; + } + const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica"; + const auto & exception_replicas = exceptions_per_replica_responses[child_idx].names; + for (const auto & replica : exception_replicas) + { + const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception"; + exception_replica_paths.push_back({ + child_idx, + replica, + (exceptions_per_replica_path / replica / "count").string(), + (last_exception_path / "exception").string(), + (last_exception_path / "part").string() + }); + } + } + /// Batch get all exception data in a single multi request + std::map>> exception_data_by_child; + + if (!exception_replica_paths.empty()) + { + Coordination::Requests exception_requests; + exception_requests.reserve(exception_replica_paths.size() * 3); // count, exception, part for each + + // Track response indices for each exception replica path + struct ExceptionResponseIndices + { + size_t count_idx; + size_t exception_idx; + size_t part_idx; + }; + std::vector exception_response_indices; + exception_response_indices.reserve(exception_replica_paths.size()); + + for (const auto & erp : exception_replica_paths) + { + ExceptionResponseIndices indices; + indices.count_idx = exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.count_path)); + + indices.exception_idx = exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.exception_path)); + + indices.part_idx = 
exception_requests.size(); + exception_requests.push_back(zkutil::makeGetRequest(erp.part_path)); + + exception_response_indices.push_back(indices); + } + + // Execute single multi request for all exception data + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses exception_responses; + Coordination::Error exception_code = zk->tryMulti(exception_requests, exception_responses); + + if (exception_code != Coordination::Error::ZOK) + { + LOG_INFO(storage.log, "Failed to execute multi request for exception data, error: {}", exception_code); + } + else + { + // Parse exception responses + for (size_t exception_path_idx = 0; exception_path_idx < exception_replica_paths.size(); ++exception_path_idx) + { + const auto & erp = exception_replica_paths[exception_path_idx]; + const auto & indices = exception_response_indices[exception_path_idx]; + + std::string count_str; + std::string exception_str; + std::string part_str; + + // Extract count response + if (indices.count_idx < exception_responses.size()) + { + const auto * count_response = dynamic_cast(exception_responses[indices.count_idx].get()); + if (count_response && count_response->error == Coordination::Error::ZOK) + count_str = count_response->data; + } + + // Extract exception response + if (indices.exception_idx < exception_responses.size()) + { + const auto * exception_response = dynamic_cast(exception_responses[indices.exception_idx].get()); + if (exception_response && exception_response->error == Coordination::Error::ZOK) + exception_str = exception_response->data; + } + + // Extract part response + if (indices.part_idx < exception_responses.size()) + { + const auto * part_response = dynamic_cast(exception_responses[indices.part_idx].get()); + if (part_response && part_response->error == Coordination::Error::ZOK) + part_str = part_response->data; + } + + exception_data_by_child[erp.child_idx].emplace_back(erp.replica, count_str, exception_str, part_str); + } + } + } + + /// Build the result + for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) + { + /// Skip if we already determined this child is invalid + if (metadata_responses[child_idx].error != Coordination::Error::ZOK + || status_responses[child_idx].error != Coordination::Error::ZOK + || processing_responses[child_idx].error != Coordination::Error::ZOK + || exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) + { + continue; + } + + ReplicatedPartitionExportInfo info; + const auto metadata_json = metadata_responses[child_idx].data; + const auto status = status_responses[child_idx].data; + const auto processing_parts = processing_responses[child_idx].names; + const auto parts_to_do = processing_parts.size(); + std::string exception_replica; + std::string last_exception; + std::string exception_part; + std::size_t exception_count = 0; + /// Process exception data + auto exception_data_it = exception_data_by_child.find(child_idx); + if (exception_data_it != exception_data_by_child.end()) + { + for (const auto & [replica, count_str, exception_str, part_str] : exception_data_it->second) + { + if (!count_str.empty()) + { + exception_count += std::stoull(count_str); + } + if (last_exception.empty() && !exception_str.empty() && !part_str.empty()) + { + exception_replica = replica; + last_exception = exception_str; + exception_part = part_str; + } + } + } + + const auto metadata = 
ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); + + info.destination_database = metadata.destination_database; + info.destination_table = metadata.destination_table; + info.partition_id = metadata.partition_id; + info.transaction_id = metadata.transaction_id; + info.create_time = metadata.create_time; + info.source_replica = metadata.source_replica; + info.parts_count = metadata.number_of_parts; + info.parts_to_do = parts_to_do; + info.parts = metadata.parts; + info.status = status; + info.exception_replica = exception_replica; + info.last_exception = last_exception; + info.exception_part = exception_part; + info.exception_count = exception_count; + infos.emplace_back(std::move(info)); + } + + return infos; +} + void ExportPartitionManifestUpdatingTask::poll() { std::lock_guard lock(storage.export_merge_tree_partition_mutex); diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index ea52f679d654..ee48d52bc98f 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -21,6 +22,8 @@ class ExportPartitionManifestUpdatingTask void addStatusChange(const std::string & key); + std::vector getPartitionExportsInfo() const; + private: StorageReplicatedMergeTree & storage; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 4a099f81aa8f..6f596e0a84f9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4460,337 +4460,7 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask() std::vector StorageReplicatedMergeTree::getPartitionExportsInfo() const { - std::vector infos; - const auto zk = getZooKeeper(); - const auto exports_path = fs::path(zookeeper_path) / "exports"; - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); - - std::vector children; - if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children)) - { - LOG_INFO(log, "Failed to get children from exports path, returning empty export info list"); - return infos; - } - - if (children.empty()) - return infos; - - /// Batch all metadata.json, status gets, and getChildren operations in a single multi request - Coordination::Requests requests; - requests.reserve(children.size() * 4); // metadata, status, processing, exceptions_per_replica - - // Track response indices for each child - struct ChildResponseIndices - { - size_t metadata_idx; - size_t status_idx; - size_t processing_idx; - size_t exceptions_per_replica_idx; - }; - std::vector response_indices; - response_indices.reserve(children.size()); - - for (const auto & child : children) - { - const auto export_partition_path = fs::path(exports_path) / child; - - ChildResponseIndices indices; - indices.metadata_idx = requests.size(); - requests.push_back(zkutil::makeGetRequest(export_partition_path / "metadata.json")); - - indices.status_idx = requests.size(); - requests.push_back(zkutil::makeGetRequest(export_partition_path / "status")); - - indices.processing_idx = requests.size(); - requests.push_back(zkutil::makeListRequest(export_partition_path / "processing")); - - indices.exceptions_per_replica_idx = requests.size(); - 
requests.push_back(zkutil::makeListRequest(export_partition_path / "exceptions_per_replica")); - - response_indices.push_back(indices); - } - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); - - Coordination::Responses responses; - Coordination::Error code = zk->tryMulti(requests, responses); - - if (code != Coordination::Error::ZOK) - { - LOG_INFO(log, "Failed to execute multi request for export partition info, error: {}", code); - return infos; - } - - // Helper to extract GetResponse data - auto getGetResponseData = [&responses](size_t idx) -> std::pair - { - if (idx >= responses.size()) - return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; - - const auto * get_response = dynamic_cast(responses[idx].get()); - if (!get_response) - return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""}; - - return {get_response->error, get_response->data}; - }; - - // Helper to extract ListResponse data - auto getListResponseData = [&responses](size_t idx) -> std::pair - { - if (idx >= responses.size()) - return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; - - const auto * list_response = dynamic_cast(responses[idx].get()); - if (!list_response) - return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}}; - - return {list_response->error, list_response->names}; - }; - - // Create response wrappers matching the MultiTryGetResponse/MultiTryGetChildrenResponse interface - struct ResponseWrapper - { - Coordination::Error error; - std::string data; - Strings names; - - ResponseWrapper(Coordination::Error err, const std::string & d, const Strings & n) - : error(err), data(d), names(n) {} - }; - - std::vector metadata_responses_wrapper; - std::vector status_responses_wrapper; - std::vector processing_responses_wrapper; - std::vector exceptions_per_replica_responses_wrapper; - - metadata_responses_wrapper.reserve(children.size()); - status_responses_wrapper.reserve(children.size()); - processing_responses_wrapper.reserve(children.size()); - exceptions_per_replica_responses_wrapper.reserve(children.size()); - - for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) - { - const auto & indices = response_indices[child_idx]; - - // Extract metadata response - auto [metadata_error, metadata_data] = getGetResponseData(indices.metadata_idx); - metadata_responses_wrapper.emplace_back(metadata_error, metadata_data, Strings{}); - - // Extract status response - auto [status_error, status_data] = getGetResponseData(indices.status_idx); - status_responses_wrapper.emplace_back(status_error, status_data, Strings{}); - - // Extract processing response - auto [processing_error, processing_names] = getListResponseData(indices.processing_idx); - processing_responses_wrapper.emplace_back(processing_error, "", processing_names); - - // Extract exceptions_per_replica response - auto [exceptions_error, exceptions_names] = getListResponseData(indices.exceptions_per_replica_idx); - exceptions_per_replica_responses_wrapper.emplace_back(exceptions_error, "", exceptions_names); - } - - // Use wrapper vectors directly - they match the interface expected by the code below - auto & metadata_responses = metadata_responses_wrapper; - auto & status_responses = status_responses_wrapper; - auto & processing_responses = processing_responses_wrapper; - auto & exceptions_per_replica_responses = exceptions_per_replica_responses_wrapper; - - /// Collect all exception replica paths for batching - struct 
ExceptionReplicaPath - { - size_t child_idx; - std::string replica; - std::string count_path; - std::string exception_path; - std::string part_path; - }; - - std::vector exception_replica_paths; - for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) - { - const auto & child = children[child_idx]; - const auto export_partition_path = fs::path(exports_path) / child; - /// Check if we got valid responses - if (metadata_responses[child_idx].error != Coordination::Error::ZOK) - { - LOG_INFO(log, "Skipping {}: missing metadata.json", child); - continue; - } - if (status_responses[child_idx].error != Coordination::Error::ZOK) - { - LOG_INFO(log, "Skipping {}: missing status", child); - continue; - } - if (processing_responses[child_idx].error != Coordination::Error::ZOK) - { - LOG_INFO(log, "Skipping {}: missing processing parts", child); - continue; - } - if (exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) - { - LOG_INFO(log, "Skipping {}: missing exceptions_per_replica", export_partition_path); - continue; - } - const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica"; - const auto & exception_replicas = exceptions_per_replica_responses[child_idx].names; - for (const auto & replica : exception_replicas) - { - const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception"; - exception_replica_paths.push_back({ - child_idx, - replica, - (exceptions_per_replica_path / replica / "count").string(), - (last_exception_path / "exception").string(), - (last_exception_path / "part").string() - }); - } - } - /// Batch get all exception data in a single multi request - std::map>> exception_data_by_child; - - if (!exception_replica_paths.empty()) - { - Coordination::Requests exception_requests; - exception_requests.reserve(exception_replica_paths.size() * 3); // count, exception, part for each - - // Track response indices for each exception replica path - struct ExceptionResponseIndices - { - size_t count_idx; - size_t exception_idx; - size_t part_idx; - }; - std::vector exception_response_indices; - exception_response_indices.reserve(exception_replica_paths.size()); - - for (const auto & erp : exception_replica_paths) - { - ExceptionResponseIndices indices; - indices.count_idx = exception_requests.size(); - exception_requests.push_back(zkutil::makeGetRequest(erp.count_path)); - - indices.exception_idx = exception_requests.size(); - exception_requests.push_back(zkutil::makeGetRequest(erp.exception_path)); - - indices.part_idx = exception_requests.size(); - exception_requests.push_back(zkutil::makeGetRequest(erp.part_path)); - - exception_response_indices.push_back(indices); - } - - // Execute single multi request for all exception data - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); - - Coordination::Responses exception_responses; - Coordination::Error exception_code = zk->tryMulti(exception_requests, exception_responses); - - if (exception_code != Coordination::Error::ZOK) - { - LOG_INFO(log, "Failed to execute multi request for exception data, error: {}", exception_code); - } - else - { - // Parse exception responses - for (size_t exception_path_idx = 0; exception_path_idx < exception_replica_paths.size(); ++exception_path_idx) - { - const auto & erp = exception_replica_paths[exception_path_idx]; - const auto & indices = exception_response_indices[exception_path_idx]; - - std::string count_str; - 
std::string exception_str; - std::string part_str; - - // Extract count response - if (indices.count_idx < exception_responses.size()) - { - const auto * count_response = dynamic_cast(exception_responses[indices.count_idx].get()); - if (count_response && count_response->error == Coordination::Error::ZOK) - count_str = count_response->data; - } - - // Extract exception response - if (indices.exception_idx < exception_responses.size()) - { - const auto * exception_response = dynamic_cast(exception_responses[indices.exception_idx].get()); - if (exception_response && exception_response->error == Coordination::Error::ZOK) - exception_str = exception_response->data; - } - - // Extract part response - if (indices.part_idx < exception_responses.size()) - { - const auto * part_response = dynamic_cast(exception_responses[indices.part_idx].get()); - if (part_response && part_response->error == Coordination::Error::ZOK) - part_str = part_response->data; - } - - exception_data_by_child[erp.child_idx].emplace_back(erp.replica, count_str, exception_str, part_str); - } - } - } - - /// Build the result - for (size_t child_idx = 0; child_idx < children.size(); ++child_idx) - { - /// Skip if we already determined this child is invalid - if (metadata_responses[child_idx].error != Coordination::Error::ZOK - || status_responses[child_idx].error != Coordination::Error::ZOK - || processing_responses[child_idx].error != Coordination::Error::ZOK - || exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK) - { - continue; - } - - ReplicatedPartitionExportInfo info; - const auto metadata_json = metadata_responses[child_idx].data; - const auto status = status_responses[child_idx].data; - const auto processing_parts = processing_responses[child_idx].names; - const auto parts_to_do = processing_parts.size(); - std::string exception_replica; - std::string last_exception; - std::string exception_part; - std::size_t exception_count = 0; - /// Process exception data - auto exception_data_it = exception_data_by_child.find(child_idx); - if (exception_data_it != exception_data_by_child.end()) - { - for (const auto & [replica, count_str, exception_str, part_str] : exception_data_it->second) - { - if (!count_str.empty()) - { - exception_count += std::stoull(count_str); - } - if (last_exception.empty() && !exception_str.empty() && !part_str.empty()) - { - exception_replica = replica; - last_exception = exception_str; - exception_part = part_str; - } - } - } - - const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - info.destination_database = metadata.destination_database; - info.destination_table = metadata.destination_table; - info.partition_id = metadata.partition_id; - info.transaction_id = metadata.transaction_id; - info.create_time = metadata.create_time; - info.source_replica = metadata.source_replica; - info.parts_count = metadata.number_of_parts; - info.parts_to_do = parts_to_do; - info.parts = metadata.parts; - info.status = status; - info.exception_replica = exception_replica; - info.last_exception = last_exception; - info.exception_part = exception_part; - info.exception_count = exception_count; - infos.emplace_back(std::move(info)); - } - return infos; + return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo(); } StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts( From acf5fd1fbab8202841afaa4e4a76f53d7cf2eb10 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 28 Nov 2025 
10:47:08 -0300 Subject: [PATCH 04/14] rmv unexistent increment --- src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 1666222f24b4..d5ae0b2c786b 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -90,9 +90,6 @@ void ExportPartitionTaskScheduler::run() continue; } - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - const auto status_in_zk = magic_enum::enum_cast(status_in_zk_string); if (!status_in_zk) From fadf2f708234b8da90c6e1c77716ceb167a16747 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 28 Nov 2025 14:01:30 -0300 Subject: [PATCH 05/14] lock part inside task --- .../ExportPartFromPartitionExportTask.cpp | 67 +++++++++++++++++++ .../ExportPartFromPartitionExportTask.h | 35 ++++++++++ src/Storages/MergeTree/ExportPartTask.cpp | 5 ++ src/Storages/MergeTree/ExportPartTask.h | 1 + .../ExportPartitionTaskScheduler.cpp | 59 ++++++++-------- src/Storages/MergeTree/MergeTreeData.h | 1 + src/Storages/StorageReplicatedMergeTree.h | 1 + 7 files changed, 138 insertions(+), 31 deletions(-) create mode 100644 src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp create mode 100644 src/Storages/MergeTree/ExportPartFromPartitionExportTask.h diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp new file mode 100644 index 000000000000..8327753f7eaf --- /dev/null +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp @@ -0,0 +1,67 @@ +#include +#include + +namespace ProfileEvents +{ + extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGetChildren; + extern const Event ExportPartitionZooKeeperCreate; +} +namespace DB +{ + +ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( + StorageReplicatedMergeTree & storage_, + const std::string & key_, + const MergeTreePartExportManifest & manifest_, + ContextPtr context_) + : storage(storage_), + key(key_), + manifest(manifest_), + local_context(context_) +{ + export_part_task = std::make_shared(storage, manifest, local_context); +} + +bool ExportPartFromPartitionExportTask::executeStep() +{ + const auto zk = storage.getZooKeeper(); + const auto part_name = manifest.data_part->name; + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); + if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + { + export_part_task->executeStep(); + return false; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", part_name); + return false; +} + +void ExportPartFromPartitionExportTask::cancel() noexcept +{ + export_part_task->cancel(); +} + +void ExportPartFromPartitionExportTask::onCompleted() +{ + export_part_task->onCompleted(); +} + +StorageID ExportPartFromPartitionExportTask::getStorageID() const +{ + return export_part_task->getStorageID(); +} + +Priority ExportPartFromPartitionExportTask::getPriority() const +{ + return export_part_task->getPriority(); +} + +String 
ExportPartFromPartitionExportTask::getQueryId() const +{ + return export_part_task->getQueryId(); +} +} diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h new file mode 100644 index 000000000000..ed48d97f7181 --- /dev/null +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class ExportPartFromPartitionExportTask : public IExecutableTask +{ +public: + explicit ExportPartFromPartitionExportTask( + StorageReplicatedMergeTree & storage_, + const std::string & key_, + const MergeTreePartExportManifest & manifest_, + ContextPtr context_); + bool executeStep() override; + void onCompleted() override; + StorageID getStorageID() const override; + Priority getPriority() const override; + String getQueryId() const override; + + void cancel() noexcept override; + +private: + StorageReplicatedMergeTree & storage; + std::string key; + MergeTreePartExportManifest manifest; + ContextPtr local_context; + std::shared_ptr export_part_task; +}; + +} diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index a43c45d0edaf..d0b1f546faf2 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -45,6 +45,11 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo { } +const MergeTreePartExportManifest & ExportPartTask::getManifest() const +{ + return manifest; +} + bool ExportPartTask::executeStep() { const auto & metadata_snapshot = manifest.metadata_snapshot; diff --git a/src/Storages/MergeTree/ExportPartTask.h b/src/Storages/MergeTree/ExportPartTask.h index bcec68b2b737..8b93b376df4b 100644 --- a/src/Storages/MergeTree/ExportPartTask.h +++ b/src/Storages/MergeTree/ExportPartTask.h @@ -19,6 +19,7 @@ class ExportPartTask : public IExecutableTask StorageID getStorageID() const override; Priority getPriority() const override; String getQueryId() const override; + const MergeTreePartExportManifest & getManifest() const; void cancel() noexcept override; diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index d5ae0b2c786b..7ef3a5cb5802 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -7,6 +7,9 @@ #include #include "Storages/MergeTree/ExportPartitionUtils.h" #include "Storages/MergeTree/MergeTreePartExportManifest.h" +#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h" +#include "Formats/FormatFactory.h" +#include namespace ProfileEvents { @@ -24,6 +27,11 @@ namespace ProfileEvents namespace DB { +namespace Setting +{ + extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; +} + namespace ErrorCodes { extern const int QUERY_WAS_CANCELLED; @@ -149,40 +157,29 @@ void ExportPartitionTaskScheduler::run() continue; } - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); - continue; - } + std::lock_guard 
part_export_lock(storage.export_manifests_mutex); - try - { - storage.exportPartToTable( - part->name, - destination_storage_id, - manifest.transaction_id, - getContextCopyWithTaskSettings(storage.getContext(), manifest), - [this, key, zk_part_name, manifest, destination_storage] - (MergeTreePartExportManifest::CompletionCallbackResult result) - { - handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); - }); - } - catch (const Exception &) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); - /// we should not increment retry_count because the node might just be full - } + auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest); + + const auto format_settings = getFormatSettings(context); + + MergeTreePartExportManifest part_export_manifest( + destination_storage->getStorageID(), + part, + manifest.transaction_id, + context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, + format_settings, + storage.getInMemoryMetadataPtr(), + [this, key, zk_part_name, manifest, destination_storage] + (MergeTreePartExportManifest::CompletionCallbackResult result) + { + handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); + }); + + storage.background_moves_assignee.scheduleMoveTask( + std::make_shared(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest))); } } - - /// maybe we failed to schedule or failed to export, need to retry eventually - storage.export_merge_tree_partition_select_task->scheduleAfter(1000 * 5); } void ExportPartitionTaskScheduler::handlePartExportCompletion( diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 521bc7e50279..84c8a45dc900 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1359,6 +1359,7 @@ class MergeTreeData : public IStorage, public WithMutableContext friend class IPartMetadataManager; friend class IMergedBlockOutputStream; // for access to log friend class ExportPartTask; + friend class ExportPartFromPartitionExportTask; bool require_part_metadata; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 712ba0ba4183..679b70f73ba8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -406,6 +406,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; + friend class ExportPartFromPartitionExportTask; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; From d51d7033111936b9e0253b6bf5dffacef20b9839 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 3 Dec 2025 15:40:13 -0300 Subject: [PATCH 06/14] tmp --- .../MergeTree/BackgroundJobsAssignee.cpp | 4 +++ .../MergeTree/BackgroundJobsAssignee.h | 2 ++ .../ExportPartFromPartitionExportTask.cpp | 5 +++- .../ExportPartitionTaskScheduler.cpp | 25 +++++++++++++++++++ .../MergeTree/MergeTreeBackgroundExecutor.cpp | 6 +++++ .../MergeTree/MergeTreeBackgroundExecutor.h | 2 ++ 6 files changed, 43 insertions(+), 1 deletion(-) 
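Note on the scheduling cap introduced below: before queueing part exports, the
scheduler asks BackgroundJobsAssignee::getAvailableMoveExecutors() (added in this
patch) how many slots are free in the moves executor, and stops scheduling once that
many exports have been queued in the current run. A minimal sketch of the slot
accounting, for illustration only; the helper names, the container type and the
underflow guard here are assumptions, not part of the diff:

    #include <cstddef>
    #include <functional>
    #include <vector>

    // Free slots = configured pool size minus tasks currently being executed.
    size_t availableSlots(size_t max_tasks, size_t active_tasks)
    {
        return max_tasks > active_tasks ? max_tasks - active_tasks : 0;
    }

    // Schedule pending exports until the pool is full; leftovers wait for the next run.
    size_t scheduleUpToCapacity(std::vector<std::function<bool()>> & pending, size_t slots)
    {
        size_t scheduled = 0;
        for (auto & try_schedule : pending)
        {
            if (scheduled >= slots)
                break;
            if (try_schedule())
                ++scheduled;
        }
        return scheduled;
    }

In the patch itself the free-slot count comes from
MergeTreeBackgroundExecutor::getAvailableSlots(), i.e. the configured maximum task
count minus the executor's currently-active task metric.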
diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp index 90244a3d6a35..1b1eb48a6b2d 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp @@ -95,6 +95,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b return schedule_res; } +std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const +{ + return getContext()->getMovesExecutor()->getAvailableSlots(); +} String BackgroundJobsAssignee::toString(Type type) { diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h index a488ea23a871..b237565f8de1 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.h +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h @@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext bool scheduleMoveTask(ExecutableTaskPtr move_task); bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger); + std::size_t getAvailableMoveExecutors() const; + /// Just call finish ~BackgroundJobsAssignee(); diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp index 8327753f7eaf..bf874c1010cc 100644 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp @@ -27,11 +27,14 @@ bool ExportPartFromPartitionExportTask::executeStep() { const auto zk = storage.getZooKeeper(); const auto part_name = manifest.data_part->name; - + + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", part_name); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) { + LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", part_name); export_part_task->executeStep(); return false; } diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 7ef3a5cb5802..4343e83b2ba2 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -60,6 +60,17 @@ ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMerg void ExportPartitionTaskScheduler::run() { + const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors(); + if (available_move_executors == 0) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping"); + return; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Available move executors: {}", available_move_executors); + + std::size_t scheduled_exports_count = 0; + std::lock_guard lock(storage.export_merge_tree_partition_mutex); auto zk = storage.getZooKeeper(); @@ -67,6 +78,12 @@ void ExportPartitionTaskScheduler::run() // Iterate sorted by create_time for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time) { + if (scheduled_exports_count >= available_move_executors) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping"); + break; + } + const auto & manifest = 
entry.manifest; const auto key = entry.getCompositeKey(); const auto database = storage.getContext()->resolveDatabase(manifest.destination_database); @@ -144,6 +161,12 @@ void ExportPartitionTaskScheduler::run() for (const auto & zk_part_name : parts_in_processing_or_pending) { + if (scheduled_exports_count >= available_move_executors) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping"); + break; + } + if (locked_parts_set.contains(zk_part_name)) { LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked, skipping", zk_part_name); @@ -157,6 +180,8 @@ void ExportPartitionTaskScheduler::run() continue; } + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name); + std::lock_guard part_export_lock(storage.export_manifests_mutex); auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest); diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index d66111ea406a..08a8444ed9ad 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -136,6 +136,12 @@ size_t MergeTreeBackgroundExecutor::getMaxTasksCount() const return max_tasks_count.load(std::memory_order_relaxed); } +template +size_t MergeTreeBackgroundExecutor::getAvailableSlots() const +{ + return getMaxTasksCount() - CurrentMetrics::values[metric].load(std::memory_order_relaxed); +} + template bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task) { diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index 4b67d50a49bb..4079c16497dd 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -279,6 +279,8 @@ class MergeTreeBackgroundExecutor final : boost::noncopyable /// can lead only to some postponing, not logical error. size_t getMaxTasksCount() const; + size_t getAvailableSlots() const; + bool trySchedule(ExecutableTaskPtr task); void removeTasksCorrespondingToStorage(StorageID id); void wait(); From ef5807b75d1623af52434fdc99e3401f993a0db2 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 9 Dec 2025 08:48:52 -0300 Subject: [PATCH 07/14] okish --- .../MergeTree/ExportPartitionTaskScheduler.cpp | 14 ++++++++++++-- .../MergeTree/MergeTreePartExportManifest.h | 4 ++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 4343e83b2ba2..d9f864e89652 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -61,6 +61,8 @@ ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMerg void ExportPartitionTaskScheduler::run() { const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors(); + + /// this is subject to TOCTOU - but for now we choose to live with it. 
if (available_move_executors == 0) { LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping"); @@ -71,6 +73,9 @@ void ExportPartitionTaskScheduler::run() std::size_t scheduled_exports_count = 0; + const uint32_t seed = uint32_t(std::hash{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count); + std::mt19937 rng(seed); + std::lock_guard lock(storage.export_merge_tree_partition_mutex); auto zk = storage.getZooKeeper(); @@ -147,6 +152,9 @@ void ExportPartitionTaskScheduler::run() continue; } + /// shuffle the parts to reduce the risk of lock collisions + std::shuffle(parts_in_processing_or_pending.begin(), parts_in_processing_or_pending.end(), rng); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); std::vector locked_parts; @@ -200,9 +208,11 @@ void ExportPartitionTaskScheduler::run() { handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); }); + + scheduled_exports_count++; - storage.background_moves_assignee.scheduleMoveTask( - std::make_shared(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest))); + part_export_manifest.task = std::make_shared(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest)); + storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task); } } } diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index 533eeb6decdd..0214dd4d18cb 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -11,7 +11,7 @@ namespace DB class Exception; -class ExportPartTask; +class IExecutableTask; struct MergeTreePartExportManifest { @@ -73,7 +73,7 @@ struct MergeTreePartExportManifest time_t create_time; mutable bool in_progress = false; - mutable std::shared_ptr task = nullptr; + mutable std::shared_ptr task = nullptr; bool operator<(const MergeTreePartExportManifest & rhs) const { From 64accc3add2db6fe053745bb14783481d4b4ca5d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 10 Dec 2025 10:11:59 -0300 Subject: [PATCH 08/14] add setting to control locking behavior (inside/outside task), deny export request in case pool is full --- src/Core/Settings.cpp | 4 + .../ExportPartFromPartitionExportTask.cpp | 14 +- .../ExportPartFromPartitionExportTask.h | 4 +- src/Storages/MergeTree/ExportPartTask.cpp | 28 ++- src/Storages/MergeTree/ExportPartTask.h | 4 +- .../ExportPartitionManifestUpdatingTask.cpp | 7 + .../ExportPartitionTaskScheduler.cpp | 194 ++++++++++++------ src/Storages/MergeTree/MergeTreeData.cpp | 70 +++---- .../MergeTree/MergeTreePartExportManifest.h | 8 +- .../test.py | 6 + 10 files changed, 204 insertions(+), 135 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 30728305319c..ee5d8c8aa687 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6899,6 +6899,10 @@ Possible values: - `` (empty value) - use session timezone Default value is `UTC`. +)", 0) \ + DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"( +Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list. 
+On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit. )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp index bf874c1010cc..90aef3bf00bf 100644 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp @@ -13,14 +13,12 @@ namespace DB ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( StorageReplicatedMergeTree & storage_, const std::string & key_, - const MergeTreePartExportManifest & manifest_, - ContextPtr context_) + const MergeTreePartExportManifest & manifest_) : storage(storage_), key(key_), - manifest(manifest_), - local_context(context_) + manifest(manifest_) { - export_part_task = std::make_shared(storage, manifest, local_context); + export_part_task = std::make_shared(storage, manifest); } bool ExportPartFromPartitionExportTask::executeStep() @@ -28,18 +26,18 @@ bool ExportPartFromPartitionExportTask::executeStep() const auto zk = storage.getZooKeeper(); const auto part_name = manifest.data_part->name; - LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", part_name); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) { - LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", part_name); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); export_part_task->executeStep(); return false; } - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", part_name); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name); return false; } diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h index ed48d97f7181..5caaf20b2038 100644 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h @@ -14,8 +14,7 @@ class ExportPartFromPartitionExportTask : public IExecutableTask explicit ExportPartFromPartitionExportTask( StorageReplicatedMergeTree & storage_, const std::string & key_, - const MergeTreePartExportManifest & manifest_, - ContextPtr context_); + const MergeTreePartExportManifest & manifest_); bool executeStep() override; void onCompleted() override; StorageID getStorageID() const override; @@ -28,7 +27,6 @@ class ExportPartFromPartitionExportTask : public IExecutableTask StorageReplicatedMergeTree & storage; std::string key; MergeTreePartExportManifest manifest; - ContextPtr local_context; std::shared_ptr export_part_task; }; diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index d0b1f546faf2..55d71071450f 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -38,10 +39,9 @@ 
namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; } -ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_) +ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) : storage(storage_), - manifest(manifest_), - local_context(context_) + manifest(manifest_) { } @@ -52,6 +52,12 @@ const MergeTreePartExportManifest & ExportPartTask::getManifest() const bool ExportPartTask::executeStep() { + auto local_context = Context::createCopy(storage.getContext()); + local_context->makeQueryContextForExportPart(); + local_context->setCurrentQueryId(manifest.transaction_id); + local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); + local_context->setSettings(manifest.settings); + const auto & metadata_snapshot = manifest.metadata_snapshot; Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -96,7 +102,7 @@ bool ExportPartTask::executeStep() block_with_partition_values, (*exports_list_entry)->destination_file_path, manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, - manifest.format_settings, + getFormatSettings(local_context), local_context); } catch (const Exception & e) @@ -131,10 +137,21 @@ bool ExportPartTask::executeStep() } } - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_INFO(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message()); ProfileEvents::increment(ProfileEvents::PartsExportFailures); + storage.writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + std::lock_guard inner_lock(storage.export_manifests_mutex); storage.export_manifests.erase(manifest); @@ -264,6 +281,7 @@ bool ExportPartTask::executeStep() void ExportPartTask::cancel() noexcept { + LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name); cancel_requested.store(true); pipeline.cancel(); } diff --git a/src/Storages/MergeTree/ExportPartTask.h b/src/Storages/MergeTree/ExportPartTask.h index 8b93b376df4b..a3f1635c4902 100644 --- a/src/Storages/MergeTree/ExportPartTask.h +++ b/src/Storages/MergeTree/ExportPartTask.h @@ -12,8 +12,7 @@ class ExportPartTask : public IExecutableTask public: explicit ExportPartTask( MergeTreeData & storage_, - const MergeTreePartExportManifest & manifest_, - ContextPtr context_); + const MergeTreePartExportManifest & manifest_); bool executeStep() override; void onCompleted() override; StorageID getStorageID() const override; @@ -26,7 +25,6 @@ class ExportPartTask : public IExecutableTask private: MergeTreeData & storage; MergeTreePartExportManifest manifest; - ContextPtr local_context; QueryPipeline pipeline; std::atomic cancel_requested = false; diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 14b376c700db..000fd1131d21 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -100,6 +100,12 @@ std::vector ExportPartitionManifestUpdatingTask:: { std::vector infos; const auto zk = storage.getZooKeeper(); + + if 
(!zk->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ)) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MULTI_READ feature flag is not enabled"); + } + const auto exports_path = fs::path(storage.zookeeper_path) / "exports"; ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); @@ -657,6 +663,7 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() { try { + LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key); storage.killExportPart(it->manifest.transaction_id); } catch (...) diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index d9f864e89652..a35b741792af 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -30,6 +30,7 @@ namespace DB namespace Setting { extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; + extern const SettingsBool export_merge_tree_partition_lock_inside_the_task; } namespace ErrorCodes @@ -190,29 +191,83 @@ void ExportPartitionTaskScheduler::run() LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name); - std::lock_guard part_export_lock(storage.export_manifests_mutex); - auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest); - const auto format_settings = getFormatSettings(context); - - MergeTreePartExportManifest part_export_manifest( - destination_storage->getStorageID(), - part, - manifest.transaction_id, - context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - format_settings, - storage.getInMemoryMetadataPtr(), - [this, key, zk_part_name, manifest, destination_storage] - (MergeTreePartExportManifest::CompletionCallbackResult result) + /// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly. + /// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being + /// recreated with a new schema before the export task is scheduled. + if (context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task"); + std::lock_guard part_export_lock(storage.export_manifests_mutex); + + MergeTreePartExportManifest part_export_manifest( + destination_storage->getStorageID(), + part, + manifest.transaction_id, + context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, + context->getSettingsCopy(), + storage.getInMemoryMetadataPtr(), + [this, key, zk_part_name, manifest, destination_storage] + (MergeTreePartExportManifest::CompletionCallbackResult result) + { + handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); + }); + + part_export_manifest.task = std::make_shared(storage, key, part_export_manifest); + + /// todo arthur this might conflict with the standalone export part. what to do in this case? 
+ if (!storage.export_manifests.emplace(part_export_manifest).second) { - handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); - }); - - scheduled_exports_count++; + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is already being exported, skipping", zk_part_name); + continue; + } - part_export_manifest.task = std::make_shared(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest)); - storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task); + if (!storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task)) + { + storage.export_manifests.erase(part_export_manifest); + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to schedule export part task, skipping"); + return; + } + } + else + { + try + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); + + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); + if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); + continue; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); + + storage.exportPartToTable( + part->name, + destination_storage_id, + manifest.transaction_id, + getContextCopyWithTaskSettings(storage.getContext(), manifest), + [this, key, zk_part_name, manifest, destination_storage] + (MergeTreePartExportManifest::CompletionCallbackResult result) + { + handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); + }); + } + catch (const Exception &) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); + /// we should not increment retry_count because the node might just be full + } + } + + scheduled_exports_count++; } } } @@ -279,6 +334,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( size_t max_retries ) { + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name); + if (!exception) { throw Exception(ErrorCodes::LOGICAL_ERROR, "ExportPartition scheduler task: No exception provided for error handling. 
Sounds like a bug"); @@ -327,61 +384,68 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( processing_part_entry.retry_count++; - if (processing_part_entry.retry_count) + ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); + + LOG_INFO(storage.log, "ExportPartition scheduler task: Updating processing part entry for part {}, retry count: {}, max retries: {}", part_name, processing_part_entry.retry_count, max_retries); + + if (processing_part_entry.retry_count >= max_retries) { - ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); + /// just set status in processing_part_path and finished_by + processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; + processing_part_entry.finished_by = storage.replica_name; - if (processing_part_entry.retry_count >= max_retries) - { - /// just set status in processing_part_path and finished_by - processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; - processing_part_entry.finished_by = storage.replica_name; + ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); + LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); + } + else + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name); + } - ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); - LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); - } + std::size_t num_exceptions = 0; - std::size_t num_exceptions = 0; - - const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / storage.replica_name; - const auto count_path = exceptions_per_replica_path / "count"; - const auto last_exception_path = exceptions_per_replica_path / "last_exception"; + const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / storage.replica_name; + const auto count_path = exceptions_per_replica_path / "count"; + const auto last_exception_path = exceptions_per_replica_path / "last_exception"; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); + if (zk->exists(exceptions_per_replica_path)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path exists, no need to create it"); + std::string num_exceptions_string; + zk->tryGet(count_path, num_exceptions_string); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); - if (zk->exists(exceptions_per_replica_path)) - { - std::string num_exceptions_string; - zk->tryGet(count_path, num_exceptions_string); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - 
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - num_exceptions = std::stoull(num_exceptions_string.c_str()); - - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1)); - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1)); - } - else - { - ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(count_path, "0", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent)); - } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + num_exceptions = std::stoull(num_exceptions_string.c_str()); + + ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1)); + ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1)); + } + else + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path does not exist, will create it"); + ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(count_path, "0", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent)); + } - num_exceptions++; - ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1)); + num_exceptions++; + ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1)); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); - Coordination::Responses responses; - if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: All failure mechanism failed, will not try to update it"); - return; - } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + Coordination::Responses responses; + if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: All failure mechanism failed, will not try to update it"); + return; } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Successfully updated exception counters for part {}", part_name); } bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 6f1c59239b93..325babff6ab4 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6255,26 +6255,32 @@ void MergeTreeData::exportPartToTable( part_name, getStorageID().getFullTableName()); { - const auto format_settings = getFormatSettings(query_context); 
MergeTreePartExportManifest manifest( dest_storage->getStorageID(), part, transaction_id, query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - format_settings, + query_context->getSettingsCopy(), source_metadata_ptr, completion_callback); std::lock_guard lock(export_manifests_mutex); - if (!export_manifests.emplace(std::move(manifest)).second) + manifest.task = std::make_shared(*this, manifest); + + if (!export_manifests.emplace(manifest).second) { throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", + part_name, dest_storage->getStorageID().getFullTableName()); + } + + if (!background_moves_assignee.scheduleMoveTask(manifest.task)) + { + export_manifests.erase(manifest); + throw Exception(ErrorCodes::ABORTED, "Failed to schedule export part task for data part '{}' to table '{}'. Background executor is busy", part_name, dest_storage->getStorageID().getFullTableName()); } } - - background_moves_assignee.trigger(); } void MergeTreeData::killExportPart(const String & transaction_id) @@ -9107,51 +9113,21 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) { - if (!parts_mover.moves_blocker.isCancelled()) - { - auto moving_tagger = selectPartsForMove(); - if (!moving_tagger->parts_to_move.empty()) - { - assignee.scheduleMoveTask(std::make_shared( - [this, moving_tagger] () mutable - { - ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); - WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; - }, moves_assignee_trigger, getStorageID())); - return true; - } - } - - std::lock_guard lock(export_manifests_mutex); - - for (auto & manifest : export_manifests) - { - if (manifest.in_progress) - { - continue; - } - - auto context_copy = Context::createCopy(getContext()); - context_copy->makeQueryContextForExportPart(); - context_copy->setCurrentQueryId(manifest.transaction_id); - context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); - - auto task = std::make_shared(*this, manifest, context_copy); + if (parts_mover.moves_blocker.isCancelled()) + return false; - manifest.in_progress = assignee.scheduleMoveTask(task); + auto moving_tagger = selectPartsForMove(); + if (moving_tagger->parts_to_move.empty()) + return false; - if (!manifest.in_progress) + assignee.scheduleMoveTask(std::make_shared( + [this, moving_tagger] () mutable { - continue; - } - - manifest.task = task; - - return true; - } - - return false; + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); + WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + }, moves_assignee_trigger, getStorageID())); + return true; } bool MergeTreeData::areBackgroundMovesNeeded() const diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index 0214dd4d18cb..5deced97ab82 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -5,6 +5,7 @@ #include #include #include +#include 
namespace DB { @@ -46,14 +47,14 @@ struct MergeTreePartExportManifest const DataPartPtr & data_part_, const String & transaction_id_, FileAlreadyExistsPolicy file_already_exists_policy_, - const FormatSettings & format_settings_, + const Settings & settings_, const StorageMetadataPtr & metadata_snapshot_, std::function completion_callback_ = {}) : destination_storage_id(destination_storage_id_), data_part(data_part_), transaction_id(transaction_id_), file_already_exists_policy(file_already_exists_policy_), - format_settings(format_settings_), + settings(settings_), metadata_snapshot(metadata_snapshot_), completion_callback(completion_callback_), create_time(time(nullptr)) {} @@ -63,7 +64,7 @@ struct MergeTreePartExportManifest /// Used for killing the export. String transaction_id; FileAlreadyExistsPolicy file_already_exists_policy; - FormatSettings format_settings; + Settings settings; /// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations /// Otherwise the export could fail if the schema changes between validation and execution @@ -72,7 +73,6 @@ struct MergeTreePartExportManifest std::function completion_callback; time_t create_time; - mutable bool in_progress = false; mutable std::shared_ptr task = nullptr; bool operator<(const MergeTreePartExportManifest & rhs) const diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index a4cb0807d6ee..39f9c2ad757f 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -77,6 +77,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) cluster.add_instance( "replica2", @@ -85,6 +86,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) # node that does not participate in the export, but will have visibility over the s3 table cluster.add_instance( @@ -100,6 +102,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) logging.info("Starting cluster...") cluster.start() @@ -253,6 +256,9 @@ def test_kill_export(cluster): assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'KILLED\n', "Partition 2020 was not killed as expected" assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected" + # check the data did not land on s3 + assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '0\n', "Partition 2020 was written to S3, it was not killed as expected" + def test_drop_source_table_during_export(cluster): node = cluster.instances["replica1"] From 3817e50632d9677b97daf795ada86796e48159b1 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 11 Dec 2025 08:39:42 -0300 Subject: [PATCH 09/14] clear part references from partition task when status changes, properly preserve lock inside task setting --- src/Storages/ExportReplicatedMergeTreePartitionManifest.h | 7 +++++++ src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h | 2 +- 
.../MergeTree/ExportPartitionManifestUpdatingTask.cpp | 6 ++++++ src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp | 3 +-- src/Storages/StorageReplicatedMergeTree.cpp | 3 +++ 5 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 81f61b5b9f12..89c76813665e 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -109,6 +109,7 @@ struct ExportReplicatedMergeTreePartitionManifest bool parallel_formatting; bool parquet_parallel_encoding; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; + bool lock_inside_the_task; /// todo temporary std::string toJsonString() const { @@ -131,6 +132,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("create_time", create_time); json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); + json.set("lock_inside_the_task", lock_inside_the_task); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -172,6 +174,11 @@ struct ExportReplicatedMergeTreePartitionManifest /// what to do if it's not a valid value? } + if (json->has("lock_inside_the_task")) + { + manifest.lock_inside_the_task = json->getValue("lock_inside_the_task"); + } + return manifest; } }; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h index 76674bfc4a92..e62f7de99bed 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h @@ -30,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry /// This is used to prevent the parts from being deleted before finishing the export operation /// It does not mean this replica will export all the parts /// There is also a chance this replica does not contain a given part and it is totally ok. 
- std::vector part_references; + mutable std::vector part_references; std::string getCompositeKey() const { diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 000fd1131d21..b491b0e937b4 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -673,6 +673,12 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() } it->status = *new_status; + + if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + /// we no longer need to keep the data parts alive + it->part_references.clear(); + } } } diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index a35b741792af..52827d1b6e02 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -30,7 +30,6 @@ namespace DB namespace Setting { extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; - extern const SettingsBool export_merge_tree_partition_lock_inside_the_task; } namespace ErrorCodes @@ -196,7 +195,7 @@ void ExportPartitionTaskScheduler::run() /// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly. /// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being /// recreated with a new schema before the export task is scheduled. - if (context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]) + if (manifest.lock_inside_the_task) { LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task"); std::lock_guard part_export_lock(storage.export_manifests_mutex); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6f596e0a84f9..3e3427eca49b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -209,6 +209,7 @@ namespace Setting extern const SettingsBool output_format_parquet_parallel_encoding; extern const SettingsMaxThreads max_threads; extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; + extern const SettingsBool export_merge_tree_partition_lock_inside_the_task; } namespace MergeTreeSetting @@ -8131,6 +8132,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; + manifest.lock_inside_the_task = query_context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]; + manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; From 85e14f9af751efbe8433ac43f6b52c2d9ea202b2 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 11 Dec 2025 10:39:26 -0300 Subject: [PATCH 10/14] settings history --- src/Core/SettingsChangesHistory.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) 
From 85e14f9af751efbe8433ac43f6b52c2d9ea202b2 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Thu, 11 Dec 2025 10:39:26 -0300
Subject: [PATCH 10/14] settings history

---
 src/Core/SettingsChangesHistory.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 3e3f5e3f7608..ffbdb9b84319 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -55,7 +55,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
         {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
         {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
-        {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
+        {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
+        {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {
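
Before the next patch: PATCH 11 replaces the has()-guarded policy parsing with a direct magic_enum::enum_cast and keeps a todo about invalid values. For reference, a minimal magic_enum round-trip with an explicit fallback is sketched below; the enum and its values (skip/overwrite/error) are illustrative assumptions, not the real MergeTreePartExportManifest definition.

// Illustrative only: enum <-> name round-trip with a safe fallback.
#include <magic_enum.hpp>
#include <iostream>
#include <string>

enum class FileAlreadyExistsPolicy { skip, overwrite, error };

static FileAlreadyExistsPolicy parsePolicy(const std::string & text)
{
    const auto parsed = magic_enum::enum_cast<FileAlreadyExistsPolicy>(text);
    /// Unknown values keep the default instead of throwing.
    return parsed.value_or(FileAlreadyExistsPolicy::skip);
}

int main()
{
    std::cout << magic_enum::enum_name(parsePolicy("overwrite")) << '\n'; // overwrite
    std::cout << magic_enum::enum_name(parsePolicy("bogus")) << '\n';     // skip
}
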
From 0d183bf1131a1b5fe33ff86aeb1d85530f11e784 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Thu, 11 Dec 2025 14:19:55 -0300
Subject: [PATCH 11/14] improvements

---
 ...xportReplicatedMergeTreePartitionManifest.h | 18 +++++-------------
 .../ExportPartFromPartitionExportTask.h | 3 +++
 .../MergeTree/ExportPartitionTaskScheduler.cpp | 2 +-
 .../MergeTree/MergeTreePartExportManifest.h | 1 +
 .../test.py | 3 +++
 5 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
index 89c76813665e..774edf23d155 100644
--- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
+++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -162,22 +162,14 @@ struct ExportReplicatedMergeTreePartitionManifest
         manifest.max_threads = json->getValue("max_threads");
         manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
         manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
-
-        if (json->has("file_already_exists_policy"))
+        const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<std::string>("file_already_exists_policy"));
+        /// todo what to do if it's not a valid value?
+        if (file_already_exists_policy)
         {
-            const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<std::string>("file_already_exists_policy"));
-            if (file_already_exists_policy)
-            {
-                manifest.file_already_exists_policy = file_already_exists_policy.value();
-            }
-
-            /// what to do if it's not a valid value?
+            manifest.file_already_exists_policy = file_already_exists_policy.value();
         }

-        if (json->has("lock_inside_the_task"))
-        {
-            manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");
-        }
+        manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

         return manifest;
     }
diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
index 5caaf20b2038..e170b22b470d 100644
--- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
+++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
@@ -8,6 +8,9 @@
 namespace DB
 {

+/*
+    Decorator around the ExportPartTask to lock the part inside the task
+*/
 class ExportPartFromPartitionExportTask : public IExecutableTask
 {
 public:
diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index 52827d1b6e02..020ef0ffe53a 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -131,7 +131,7 @@ void ExportPartitionTaskScheduler::run()
             if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
             {
                 entry.status = status_in_zk.value();
-                LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status from zk is {}", entry.status);
+                LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(entry.status).data());
                 continue;
             }
diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h
index 5deced97ab82..7805f4d7a49e 100644
--- a/src/Storages/MergeTree/MergeTreePartExportManifest.h
+++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h
@@ -73,6 +73,7 @@ struct MergeTreePartExportManifest
     std::function completion_callback;

     time_t create_time;
+    /// Required to cancel export tasks
     mutable std::shared_ptr task = nullptr;

     bool operator<(const MergeTreePartExportManifest & rhs) const
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index 39f9c2ad757f..b63809e9d11a 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -245,6 +245,9 @@ def test_kill_export(cluster):
     # ZooKeeper operations (KILL) proceed quickly since only S3 is blocked
     node.query(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'")

+    # sleep for a while to let the kill be processed
+    time.sleep(2)
+
     # wait for 2021 to finish
     wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
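
PATCH 11 above describes ExportPartFromPartitionExportTask as a decorator around ExportPartTask that takes the lock inside the task. A simplified, self-contained sketch of that shape follows; the Task interface, the names, and the try_claim_part callback are illustrative stand-ins, not ClickHouse's IExecutableTask API.

// Illustrative sketch only; simplified stand-in for the real executable-task interface.
#include <functional>
#include <memory>
#include <mutex>

struct Task
{
    virtual ~Task() = default;
    virtual bool executeStep() = 0; /// returns true while there is more work
};

/// Wraps another task and claims the part only when the step actually runs,
/// instead of at scheduling time (the "lock inside the task" strategy).
class LockingTaskDecorator : public Task
{
public:
    LockingTaskDecorator(std::shared_ptr<Task> inner_, std::mutex & export_mutex_, std::function<bool()> try_claim_part_)
        : inner(std::move(inner_)), export_mutex(export_mutex_), try_claim_part(std::move(try_claim_part_)) {}

    bool executeStep() override
    {
        std::lock_guard lock(export_mutex);
        /// Another replica may have claimed the part between scheduling and execution.
        if (!try_claim_part())
            return false; /// nothing to do, exit early
        return inner->executeStep();
    }

private:
    std::shared_ptr<Task> inner;
    std::mutex & export_mutex;
    std::function<bool()> try_claim_part;
};

The point of the decoration is that the claim check moves from scheduling time to execution time, so a part already locked by another replica makes the task exit early instead of keeping the scheduler busy.
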
From 651d6e46c7b7d6a5df562d4f17832a301a251d9e Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Fri, 12 Dec 2025 09:47:35 -0300
Subject: [PATCH 12/14] implement local query to system replicated partition exports

---
 src/Core/Settings.cpp | 4 +++
 src/Core/SettingsChangesHistory.cpp | 3 ++-
 .../ExportPartitionManifestUpdatingTask.cpp | 32 ++++++++++++++++---
 .../ExportPartitionManifestUpdatingTask.h | 2 ++
 src/Storages/StorageReplicatedMergeTree.cpp | 9 ++++--
 src/Storages/StorageReplicatedMergeTree.h | 2 +-
 src/Storages/System/StorageSystemExports.cpp | 1 -
 ...torageSystemReplicatedPartitionExports.cpp | 8 ++++-
 .../test.py | 15 ++++---
 9 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index f165f3c4eaf1..3b124b04ccce 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6903,6 +6903,10 @@ Default value is `UTC`.
     DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
 Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends up in the pending list.
 On the other hand, there is a chance that, once the task executes, the part has already been locked by another replica and the task will simply exit early.
+)", 0) \
+    DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"(
+Controls whether system.replicated_partition_exports prefers to query ZooKeeper for the most up-to-date information or uses the locally cached information.
+Querying ZooKeeper is expensive, and it is only available if the ZooKeeper feature flag MULTI_READ is enabled.
 )", 0) \
     \
     /* ####################################################### */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index ffbdb9b84319..22c227dbf58d 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -56,7 +56,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
         {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
         {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
-        {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}
+        {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
+        {"export_merge_tree_partition_system_table_use_local_information", false, false, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
index b491b0e937b4..bac00e16d284 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
@@ -101,11 +101,6 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const
     std::vector<ReplicatedPartitionExportInfo> infos;
     const auto zk = storage.getZooKeeper();

-    if (!zk->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ))
-    {
-        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MULTI_READ feature flag is not enabled");
-    }
-
     const auto exports_path = fs::path(storage.zookeeper_path) / "exports";

     ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
@@ -439,6 +434,33 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const
     return infos;
 }

+std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfoLocal() const
+{
+    std::lock_guard lock(storage.export_merge_tree_partition_mutex);
+
+    std::vector<ReplicatedPartitionExportInfo> infos;
+
+    for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key)
+    {
+        ReplicatedPartitionExportInfo info;
+
+        info.destination_database = entry.manifest.destination_database;
+        info.destination_table = entry.manifest.destination_table;
+        info.partition_id = entry.manifest.partition_id;
+        info.transaction_id = entry.manifest.transaction_id;
+        info.create_time = entry.manifest.create_time;
+        info.source_replica = entry.manifest.source_replica;
+        info.parts_count = entry.manifest.number_of_parts;
+        info.parts_to_do = entry.manifest.parts.size();
+        info.parts = entry.manifest.parts;
+        info.status = magic_enum::enum_name(entry.status);
+
+        infos.emplace_back(std::move(info));
+    }
+
+    return infos;
+}
+
 void ExportPartitionManifestUpdatingTask::poll()
 {
     std::lock_guard lock(storage.export_merge_tree_partition_mutex);
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
index ee48d52bc98f..99078e486cb3 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
@@ -24,6 +24,8 @@ class ExportPartitionManifestUpdatingTask

     std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfo() const;

+    std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfoLocal() const;
+
 private:
     StorageReplicatedMergeTree & storage;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 3e3427eca49b..c7e926228644 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -4459,9 +4459,14 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask()
     }
 }

-std::vector<ReplicatedPartitionExportInfo> StorageReplicatedMergeTree::getPartitionExportsInfo() const
+std::vector<ReplicatedPartitionExportInfo> StorageReplicatedMergeTree::getPartitionExportsInfo(bool prefer_remote_information) const
 {
-    return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo();
+    if (prefer_remote_information && getZooKeeper()->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ))
+    {
+        return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo();
+    }
+
+    return export_merge_tree_partition_manifest_updater->getPartitionExportsInfoLocal();
 }

 StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts(
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 679b70f73ba8..df00da87ec4f 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -377,7 +377,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
     using ShutdownDeadline = std::chrono::time_point;
     void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);

-    std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfo() const;
+    std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfo(bool prefer_remote_information) const;

 private:
     std::atomic_bool are_restoring_replica {false};
diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp
index bd56a40c3a68..b5abb77e4eb1 100644
--- a/src/Storages/System/StorageSystemExports.cpp
+++ b/src/Storages/System/StorageSystemExports.cpp
@@ -8,7 +8,6 @@
 #include
 #include

-
 namespace DB
 {
diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp
index 018f0c8ffac7..343fd72afb6e 100644
--- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp
+++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp
@@ -10,11 +10,17 @@
 #include
 #include "Columns/ColumnString.h"
 #include "Storages/VirtualColumnUtils.h"
+#include

 namespace DB
 {

+namespace Setting
+{
+    extern const SettingsBool export_merge_tree_partition_system_table_prefer_remote_information;
+}
+
 ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescription()
 {
     return ColumnsDescription
@@ -110,7 +116,7 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu
     {
         const IStorage * storage = replicated_merge_tree_tables[database][table].get();
         if (const auto * replicated_merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage))
-            partition_exports_info = replicated_merge_tree->getPartitionExportsInfo();
+            partition_exports_info = replicated_merge_tree->getPartitionExportsInfo(context->getSettingsRef()[Setting::export_merge_tree_partition_system_table_prefer_remote_information]);
     }

     for (const ReplicatedPartitionExportInfo & info : partition_exports_info)
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index b63809e9d11a..779bc773baed 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -197,13 +197,16 @@ def test_restart_nodes_during_export(cluster):
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2021") != f'0\n', "Export of partition 2021 did not resume after crash"


-def test_kill_export(cluster):
+@pytest.mark.parametrize(
+    "system_table_prefer_remote_information", ['0', '1']
+)
+def test_kill_export(cluster, system_table_prefer_remote_information):
     node = cluster.instances["replica1"]
     node2 = cluster.instances["replica2"]
     watcher_node = cluster.instances["watcher_node"]

-    mt_table = "kill_export_mt_table"
-    s3_table = "kill_export_s3_table"
+    mt_table = f"kill_export_mt_table_{system_table_prefer_remote_information}"
+    s3_table = f"kill_export_s3_table_{system_table_prefer_remote_information}"

     create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
     create_tables_and_insert_data(node2, mt_table, s3_table, "replica2")
@@ -256,8 +259,8 @@ def test_kill_export(cluster, system_table_prefer_remote_information):
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2021_*', format=LineAsString)") != f'0\n', "Partition 2021 was not written to S3, but it should have been"

     # check system.replicated_partition_exports for the export, status should be KILLED
-    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'KILLED\n', "Partition 2020 was not killed as expected"
-    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected"
+    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'KILLED\n', "Partition 2020 was not killed as expected"
+    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected"

     # check the data did not land on s3
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '0\n', "Partition 2020 was written to S3, it was not killed as expected"
@@ -396,6 +399,7 @@ def test_failure_is_logged_in_system_table(cluster):
             WHERE
                 source_table = '{mt_table}' AND
                 destination_table = '{s3_table}' AND
                 partition_id = '2020'
+                SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
             """
         )
@@ -407,6 +411,7 @@ def test_failure_is_logged_in_system_table(cluster):
             WHERE
                 source_table = '{mt_table}' AND
                 destination_table = '{s3_table}' AND
                 partition_id = '2020'
+                SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
             """
         )
         assert int(exception_count.strip()) > 0, "Expected non-zero exception_count in system.replicated_partition_exports"
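
Recap of PATCH 12: system.replicated_partition_exports can now be served either from ZooKeeper (freshest data, but extra round-trips and only when the MULTI_READ feature flag is available) or from the replica's in-memory task entries, selected by export_merge_tree_partition_system_table_prefer_remote_information. A condensed sketch of that dispatch follows, with hypothetical types; it is not the actual StorageReplicatedMergeTree code.

// Simplified sketch of the remote-vs-local dispatch (hypothetical types).
#include <mutex>
#include <string>
#include <vector>

struct ExportInfo { std::string partition_id; std::string status; };

class ExportInfoProvider
{
public:
    std::vector<ExportInfo> get(bool prefer_remote_information) const
    {
        /// Remote reads are fresher but cost ZooKeeper round-trips and require MULTI_READ.
        if (prefer_remote_information && multi_read_enabled)
            return readFromZooKeeper();

        /// Local fallback: snapshot the in-memory entries under the mutex.
        std::lock_guard lock(mutex);
        return local_entries;
    }

private:
    std::vector<ExportInfo> readFromZooKeeper() const { return {}; } /// stub for illustration

    bool multi_read_enabled = false;
    mutable std::mutex mutex;
    std::vector<ExportInfo> local_entries;
};

In the integration test above, the same choice is toggled per query with a SETTINGS clause on the system-table SELECT.
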
From 7a1e7418ac31b5b3a05d02bb111d1c1dcd28183e Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Tue, 16 Dec 2025 09:10:41 -0300
Subject: [PATCH 13/14] address comments

---
 src/Storages/MergeTree/ExportPartTask.cpp | 2 +-
 .../MergeTree/ExportPartitionManifestUpdatingTask.cpp | 7 ++++---
 src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp | 6 ++++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp
index 55d71071450f..f8bc04f7cf16 100644
--- a/src/Storages/MergeTree/ExportPartTask.cpp
+++ b/src/Storages/MergeTree/ExportPartTask.cpp
@@ -137,7 +137,7 @@ bool ExportPartTask::executeStep()
             }
         }

-        LOG_INFO(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message());
+        LOG_WARNING(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message());

         ProfileEvents::increment(ProfileEvents::PartsExportFailures);
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
index bac00e16d284..a1ea6feb3fd8 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
@@ -401,7 +401,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const
             {
                 if (!count_str.empty())
                 {
-                    exception_count += std::stoull(count_str);
+                    exception_count += parse(count_str);
                 }
                 if (last_exception.empty() && !exception_str.empty() && !part_str.empty())
                 {
@@ -478,10 +478,11 @@ void ExportPartitionManifestUpdatingTask::poll()
         LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries");
     }

-    Coordination::Stat stat;
-    const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
     ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
     ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildrenWatch);
+
+    Coordination::Stat stat;
+    const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
     const std::unordered_set zk_children(children.begin(), children.end());

     const auto now = time(nullptr);
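
The hunk above swaps std::stoull for the project's parse helper when accumulating exception counters read from ZooKeeper. As background, the standalone sketch below shows a strict, locale-independent unsigned parse built on std::from_chars that rejects empty or partially numeric input, which is the kind of failure std::stoull hides by stopping at the first non-digit; it is an illustration of the idea, not the ClickHouse helper itself.

// Standalone illustration: strict unsigned parsing with std::from_chars.
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <iostream>

static std::optional<uint64_t> parseUInt64Strict(std::string_view text)
{
    uint64_t value = 0;
    const char * begin = text.data();
    const char * end = begin + text.size();
    auto [ptr, ec] = std::from_chars(begin, end, value);
    /// Reject empty input, trailing garbage, and out-of-range values.
    if (text.empty() || ec != std::errc() || ptr != end)
        return std::nullopt;
    return value;
}

int main()
{
    std::cout << parseUInt64Strict("42").value_or(0) << '\n';     // 42
    std::cout << parseUInt64Strict("42abc").has_value() << '\n';  // 0 (rejected)
}
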
diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index 020ef0ffe53a..af916eb570b7 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -74,7 +74,7 @@ void ExportPartitionTaskScheduler::run()
     std::size_t scheduled_exports_count = 0;

     const uint32_t seed = uint32_t(std::hash{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count);
-    std::mt19937 rng(seed);
+    pcg64_fast rng(seed);

     std::lock_guard lock(storage.export_merge_tree_partition_mutex);
@@ -261,6 +261,8 @@ void ExportPartitionTaskScheduler::run()
             catch (const Exception &)
             {
                 tryLogCurrentException(__PRETTY_FUNCTION__);
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
                 zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name);
                 /// we should not increment retry_count because the node might just be full
             }
@@ -417,7 +419,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
         zk->tryGet(count_path, num_exceptions_string);
         ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
         ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
-        num_exceptions = std::stoull(num_exceptions_string.c_str());
+        num_exceptions = parse(num_exceptions_string);

         ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1));
         ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1));

From 1962e1917b7b9e0938790f2667da2f15c00c5190 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Tue, 16 Dec 2025 09:51:50 -0300
Subject: [PATCH 14/14] glitch

---
 src/Core/SettingsChangesHistory.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 22c227dbf58d..28d0bc114f4c 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -57,7 +57,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
         {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
         {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
-        {"export_merge_tree_partition_system_table_use_local_information", false, false, "New setting."},
+        {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {