diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 20c54ddf7cef..98fa1da683d3 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -265,6 +265,18 @@ M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.", ValueType::Bytes) \ M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.", ValueType::Bytes) \ \ + M(ExportPartitionZooKeeperRequests, "Total number of ZooKeeper requests made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperGet, "Number of 'get' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperGetChildren, "Number of 'getChildren' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperGetChildrenWatch, "Number of 'getChildrenWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperGetWatch, "Number of 'getWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperCreate, "Number of 'create' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperSet, "Number of 'set' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperRemove, "Number of 'remove' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperRemoveRecursive, "Number of 'removeRecursive' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ + M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", 
ValueType::Number) \ + \ M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \ M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \ M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.", ValueType::Number) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 2042e998ec31..3b124b04ccce 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6899,6 +6899,14 @@ Possible values: - `` (empty value) - use session timezone Default value is `UTC`. +)", 0) \ + DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"( +Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list. +On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit. +)", 0) \ + DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"( +Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information. +Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled. 
)", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3e3f5e3f7608..28d0bc114f4c 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, - {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, + {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}, + {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 81f61b5b9f12..774edf23d155 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -109,6 +109,7 @@ struct ExportReplicatedMergeTreePartitionManifest bool parallel_formatting; bool parquet_parallel_encoding; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; + bool lock_inside_the_task; /// todo temporary std::string toJsonString() const { @@ -131,6 +132,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("create_time", create_time); json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); + json.set("lock_inside_the_task", lock_inside_the_task); std::ostringstream oss; 
// STYLE_CHECK_ALLOW_STD_STRING_STREAM
         oss.exceptions(std::ios::failbit);
         Poco::JSON::Stringifier::stringify(json, oss);
@@ -160,18 +162,22 @@ struct ExportReplicatedMergeTreePartitionManifest
         manifest.max_threads = json->getValue("max_threads");
         manifest.parallel_formatting = json->getValue("parallel_formatting");
         manifest.parquet_parallel_encoding = json->getValue("parquet_parallel_encoding");
 
         if (json->has("file_already_exists_policy"))
         {
             const auto file_already_exists_policy = magic_enum::enum_cast(json->getValue("file_already_exists_policy"));
             if (file_already_exists_policy)
             {
                 manifest.file_already_exists_policy = file_already_exists_policy.value();
             }
 
             /// what to do if it's not a valid value?
         }
 
+        /// A manifest written by a replica that predates this field has no such key, and
+        /// getValue throws on a missing key. Treat the absent key as "false", the setting's default.
+        manifest.lock_inside_the_task = json->has("lock_inside_the_task") && json->getValue<bool>("lock_inside_the_task");
+
         return manifest;
     }
 };
diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
index 76674bfc4a92..e62f7de99bed 100644
--- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
+++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
@@ -30,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
     /// This is used to prevent the parts from being deleted before finishing the export operation
     /// It does not mean this replica will export all the parts
     /// There is also a chance this replica does not contain a given part and it is totally ok.
-    std::vector part_references;
+    mutable std::vector part_references;
 
     std::string getCompositeKey() const
     {
diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
index 90244a3d6a35..1b1eb48a6b2d 100644
--- a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
+++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
@@ -95,6 +95,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b
     return schedule_res;
 }
 
+std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const
+{
+    return getContext()->getMovesExecutor()->getAvailableSlots();
+}
 
 String BackgroundJobsAssignee::toString(Type type)
 {
diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h
index a488ea23a871..b237565f8de1 100644
--- a/src/Storages/MergeTree/BackgroundJobsAssignee.h
+++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h
@@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext
     bool scheduleMoveTask(ExecutableTaskPtr move_task);
     bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
 
+    std::size_t getAvailableMoveExecutors() const;
+
     /// Just call finish
     ~BackgroundJobsAssignee();
 
diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
new file mode 100644
index 000000000000..90aef3bf00bf
--- /dev/null
+++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
@@ -0,0 +1,79 @@
+#include
+#include
+
+namespace ProfileEvents
+{
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperCreate;
+}
+namespace DB
+{
+
+ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
+    StorageReplicatedMergeTree & storage_,
+    const std::string & key_,
+    const MergeTreePartExportManifest & manifest_)
+    : storage(storage_),
+    key(key_),
+    manifest(manifest_)
+{
+    export_part_task = std::make_shared(storage, manifest);
+}
+
+/// Takes the per-part export lock in ZooKeeper and, only if this replica obtained it, runs the wrapped
+/// export task. Always returns false, whether the part was exported or skipped.
+bool ExportPartFromPartitionExportTask::executeStep()
+{
+    const auto zk = storage.getZooKeeper();
+    const auto & part_name = manifest.data_part->name;
+
+    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);
+
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
+
+    /// The lock is an ephemeral node that stores the name of the replica holding it.
+    const auto lock_path = fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name;
+    const auto code = zk->tryCreate(lock_path, storage.replica_name, zkutil::CreateMode::Ephemeral);
+    if (code != Coordination::Error::ZOK)
+    {
+        /// Report the Keeper error so that a lock held by another replica can be told apart from
+        /// any other reason the node could not be created.
+        /// NOTE(review): ExportPartTask::executeStep removes its manifest from storage.export_manifests when
+        /// the export fails; the wrapped task never runs on this path - confirm no stale entry is left behind.
+        LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {} ({}), skipping", part_name, code);
+        return false;
+    }
+
+    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
+
+    /// The wrapped task's return value is not propagated.
+    export_part_task->executeStep();
+    return false;
+}
+
+void ExportPartFromPartitionExportTask::cancel() noexcept
+{
+    export_part_task->cancel();
+}
+
+void ExportPartFromPartitionExportTask::onCompleted()
+{
+    export_part_task->onCompleted();
+}
+
+StorageID ExportPartFromPartitionExportTask::getStorageID() const
+{
+    return export_part_task->getStorageID();
+}
+
+Priority ExportPartFromPartitionExportTask::getPriority() const
+{
+    return export_part_task->getPriority();
+}
+
+String ExportPartFromPartitionExportTask::getQueryId() const
+{
+    return export_part_task->getQueryId();
+}
+}
diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
new file mode 100644
index 000000000000..e170b22b470d
--- /dev/null
+++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+/// Decorator around ExportPartTask that takes the per-part export lock in ZooKeeper from inside
+/// the task, right before the export runs. If the lock cannot be taken, the wrapped task is not
+/// run. Every other IExecutableTask method is forwarded to the wrapped task.
+class ExportPartFromPartitionExportTask : public IExecutableTask
+{
+public: + explicit ExportPartFromPartitionExportTask( + StorageReplicatedMergeTree & storage_, + const std::string & key_, + const MergeTreePartExportManifest & manifest_); + bool executeStep() override; + void onCompleted() override; + StorageID getStorageID() const override; + Priority getPriority() const override; + String getQueryId() const override; + + void cancel() noexcept override; + +private: + StorageReplicatedMergeTree & storage; + std::string key; + MergeTreePartExportManifest manifest; + std::shared_ptr export_part_task; +}; + +} diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index a43c45d0edaf..f8bc04f7cf16 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -38,15 +39,25 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; } -ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_) +ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) : storage(storage_), - manifest(manifest_), - local_context(context_) + manifest(manifest_) { } +const MergeTreePartExportManifest & ExportPartTask::getManifest() const +{ + return manifest; +} + bool ExportPartTask::executeStep() { + auto local_context = Context::createCopy(storage.getContext()); + local_context->makeQueryContextForExportPart(); + local_context->setCurrentQueryId(manifest.transaction_id); + local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); + local_context->setSettings(manifest.settings); + const auto & metadata_snapshot = manifest.metadata_snapshot; Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -91,7 +102,7 @@ bool ExportPartTask::executeStep() block_with_partition_values, 
(*exports_list_entry)->destination_file_path, manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, - manifest.format_settings, + getFormatSettings(local_context), local_context); } catch (const Exception & e) @@ -126,10 +137,21 @@ bool ExportPartTask::executeStep() } } - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_WARNING(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message()); ProfileEvents::increment(ProfileEvents::PartsExportFailures); + storage.writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + std::lock_guard inner_lock(storage.export_manifests_mutex); storage.export_manifests.erase(manifest); @@ -259,6 +281,7 @@ bool ExportPartTask::executeStep() void ExportPartTask::cancel() noexcept { + LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name); cancel_requested.store(true); pipeline.cancel(); } diff --git a/src/Storages/MergeTree/ExportPartTask.h b/src/Storages/MergeTree/ExportPartTask.h index bcec68b2b737..a3f1635c4902 100644 --- a/src/Storages/MergeTree/ExportPartTask.h +++ b/src/Storages/MergeTree/ExportPartTask.h @@ -12,20 +12,19 @@ class ExportPartTask : public IExecutableTask public: explicit ExportPartTask( MergeTreeData & storage_, - const MergeTreePartExportManifest & manifest_, - ContextPtr context_); + const MergeTreePartExportManifest & manifest_); bool executeStep() override; void onCompleted() override; StorageID getStorageID() const override; Priority getPriority() const override; String getQueryId() const override; + const MergeTreePartExportManifest & getManifest() const; void cancel() noexcept override; private: MergeTreeData & storage; 
MergeTreePartExportManifest manifest;
-    ContextPtr local_context;
     QueryPipeline pipeline;
     std::atomic cancel_requested = false;
 
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
index 79b92663b7bf..a1ea6feb3fd8 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
@@ -5,8 +5,20 @@
 #include "Common/logger_useful.h"
 #include
 #include
+#include
 #include
 
+namespace ProfileEvents
+{
+    extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
+    extern const Event ExportPartitionZooKeeperGetChildrenWatch;
+    extern const Event ExportPartitionZooKeeperGetWatch;
+    extern const Event ExportPartitionZooKeeperRemoveRecursive;
+    extern const Event ExportPartitionZooKeeperMulti;
+}
+
 namespace DB
 {
 namespace
@@ -35,6 +47,8 @@ namespace
         if (has_expired && !is_pending)
         {
             zk->tryRemoveRecursive(fs::path(entry_path));
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive);
             auto it = entries_by_key.find(key);
             if (it != entries_by_key.end())
                 entries_by_key.erase(it);
@@ -44,9 +58,12 @@ namespace
         }
         else if (is_pending)
         {
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
             std::vector parts_in_processing_or_pending;
             if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(entry_path) / "processing", parts_in_processing_or_pending))
             {
+                LOG_INFO(log, "ExportPartition Manifest Updating Task: Failed to get parts in processing or pending, skipping");
                 return false;
             }
 
@@ -79,6 +96,242 @@ ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(Storage
 {
 }
 
+/// Builds the list of partition exports from ZooKeeper.
+///
+/// Issues one getChildren on <zookeeper_path>/exports and then at most two multi requests:
+///  - metadata.json, status, processing and exceptions_per_replica of every export;
+///  - count, last_exception/exception and last_exception/part of every replica listed under exceptions_per_replica.
+///
+/// Returns an empty list if the exports node cannot be listed or the first multi request fails.
+/// An export is logged and left out if one of its four nodes could not be read or its metadata.json cannot be parsed.
+/// If only the second multi request fails, the exports are returned without exception details.
+std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const
+{
+    std::vector<ReplicatedPartitionExportInfo> infos;
+    const auto zk = storage.getZooKeeper();
+
+    const auto exports_path = fs::path(storage.zookeeper_path) / "exports";
+
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
+
+    Strings children;
+    if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children))
+    {
+        LOG_INFO(storage.log, "Failed to get children from exports path, returning empty export info list");
+        return infos;
+    }
+
+    if (children.empty())
+        return infos;
+
+    /// Every export contributes a fixed group of requests, so a response is located from the export's
+    /// index alone and no per-export index table is needed.
+    static constexpr size_t requests_per_export = 4;  /// metadata.json, status, processing, exceptions_per_replica
+    static constexpr size_t requests_per_replica = 3; /// count, last_exception/exception, last_exception/part
+
+    /// Return the response at `idx` if it is present, has the expected type and succeeded; nullptr otherwise.
+    /// The returned pointer refers into `all` and is valid for as long as that vector keeps the response.
+    const auto successful_get = [](const Coordination::Responses & all, size_t idx) -> const Coordination::GetResponse *
+    {
+        if (idx >= all.size())
+            return nullptr;
+        const auto * response = dynamic_cast<const Coordination::GetResponse *>(all[idx].get());
+        return (response && response->error == Coordination::Error::ZOK) ? response : nullptr;
+    };
+    const auto successful_list = [](const Coordination::Responses & all, size_t idx) -> const Coordination::ListResponse *
+    {
+        if (idx >= all.size())
+            return nullptr;
+        const auto * response = dynamic_cast<const Coordination::ListResponse *>(all[idx].get());
+        return (response && response->error == Coordination::Error::ZOK) ? response : nullptr;
+    };
+
+    Coordination::Requests requests;
+    requests.reserve(children.size() * requests_per_export);
+    for (const auto & child : children)
+    {
+        const auto export_partition_path = fs::path(exports_path) / child;
+        requests.push_back(zkutil::makeGetRequest(export_partition_path / "metadata.json"));
+        requests.push_back(zkutil::makeGetRequest(export_partition_path / "status"));
+        requests.push_back(zkutil::makeListRequest(export_partition_path / "processing"));
+        requests.push_back(zkutil::makeListRequest(export_partition_path / "exceptions_per_replica"));
+    }
+
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
+
+    Coordination::Responses responses;
+    const Coordination::Error code = zk->tryMulti(requests, responses);
+    if (code != Coordination::Error::ZOK)
+    {
+        LOG_INFO(storage.log, "Failed to execute multi request for export partition info, error: {}", code);
+        return infos;
+    }
+
+    /// An export whose four nodes were all read, plus where its exception requests start in the second batch.
+    struct ReadableExport
+    {
+        const Coordination::GetResponse * metadata;
+        const Coordination::GetResponse * status;
+        const Coordination::ListResponse * processing;
+        const Coordination::ListResponse * exception_replicas;
+        size_t first_exception_request;
+    };
+    std::vector<ReadableExport> readable_exports;
+    readable_exports.reserve(children.size());
+
+    Coordination::Requests exception_requests;
+    for (size_t child_idx = 0; child_idx < children.size(); ++child_idx)
+    {
+        const auto & child = children[child_idx];
+        const size_t first_request = child_idx * requests_per_export;
+
+        const auto * metadata = successful_get(responses, first_request);
+        if (!metadata)
+        {
+            LOG_INFO(storage.log, "Skipping {}: missing metadata.json", child);
+            continue;
+        }
+        const auto * status = successful_get(responses, first_request + 1);
+        if (!status)
+        {
+            LOG_INFO(storage.log, "Skipping {}: missing status", child);
+            continue;
+        }
+        const auto * processing = successful_list(responses, first_request + 2);
+        if (!processing)
+        {
+            LOG_INFO(storage.log, "Skipping {}: missing processing parts", child);
+            continue;
+        }
+        const auto * exception_replicas = successful_list(responses, first_request + 3);
+        if (!exception_replicas)
+        {
+            LOG_INFO(storage.log, "Skipping {}: missing exceptions_per_replica", child);
+            continue;
+        }
+
+        const auto exceptions_per_replica_path = fs::path(exports_path) / child / "exceptions_per_replica";
+        const size_t first_exception_request = exception_requests.size();
+        for (const auto & replica : exception_replicas->names)
+        {
+            const auto replica_path = exceptions_per_replica_path / replica;
+            exception_requests.push_back(zkutil::makeGetRequest(replica_path / "count"));
+            exception_requests.push_back(zkutil::makeGetRequest(replica_path / "last_exception" / "exception"));
+            exception_requests.push_back(zkutil::makeGetRequest(replica_path / "last_exception" / "part"));
+        }
+
+        readable_exports.push_back({metadata, status, processing, exception_replicas, first_exception_request});
+    }
+
+    /// Stays empty when there is nothing to fetch or the request fails; every lookup into it then yields
+    /// nullptr and the exports are reported without exception details.
+    Coordination::Responses exception_responses;
+    if (!exception_requests.empty())
+    {
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
+
+        const Coordination::Error exception_code = zk->tryMulti(exception_requests, exception_responses);
+        if (exception_code != Coordination::Error::ZOK)
+        {
+            LOG_INFO(storage.log, "Failed to execute multi request for exception data, error: {}", exception_code);
+            exception_responses.clear();
+        }
+    }
+
+    for (const auto & readable : readable_exports)
+    {
+        ReplicatedPartitionExportInfo info;
+        try
+        {
+            const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(readable.metadata->data);
+
+            info.destination_database = metadata.destination_database;
+            info.destination_table = metadata.destination_table;
+            info.partition_id = metadata.partition_id;
+            info.transaction_id = metadata.transaction_id;
+            info.create_time = metadata.create_time;
+            info.source_replica = metadata.source_replica;
+            info.parts_count = metadata.number_of_parts;
+            info.parts = metadata.parts;
+        }
+        catch (...)
+        {
+            /// One unreadable manifest must not hide all the other exports from the caller.
+            tryLogCurrentException(storage.log, "Skipping export entry whose metadata.json cannot be parsed");
+            continue;
+        }
+
+        std::string exception_replica;
+        std::string last_exception;
+        std::string exception_part;
+        std::size_t exception_count = 0;
+
+        const auto & replicas = readable.exception_replicas->names;
+        for (size_t replica_idx = 0; replica_idx < replicas.size(); ++replica_idx)
+        {
+            const size_t first_request = readable.first_exception_request + replica_idx * requests_per_replica;
+            const auto * count = successful_get(exception_responses, first_request);
+            const auto * exception = successful_get(exception_responses, first_request + 1);
+            const auto * part = successful_get(exception_responses, first_request + 2);
+
+            /// A missing or malformed counter contributes nothing instead of failing the whole listing.
+            UInt64 replica_exception_count = 0;
+            if (count && tryParse(replica_exception_count, count->data))
+                exception_count += replica_exception_count;
+
+            /// Report the first replica for which both the exception text and the part name are available.
+            if (last_exception.empty() && exception && !exception->data.empty() && part && !part->data.empty())
+            {
+                exception_replica = replicas[replica_idx];
+                last_exception = exception->data;
+                exception_part = part->data;
+            }
+        }
+
+        info.parts_to_do = readable.processing->names.size();
+        info.status = readable.status->data;
+        info.exception_replica = exception_replica;
+        info.last_exception = last_exception;
+        info.exception_part = exception_part;
+        info.exception_count = exception_count;
+        infos.emplace_back(std::move(info));
+    }
+
+    return infos;
+}
+
+/// Builds the list of partition exports from the task entries this replica holds in memory; ZooKeeper is not queried.
+/// Exception details are not filled in, and parts_to_do is the size of the manifest's part list.
+std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfoLocal() const
+{
+    std::lock_guard lock(storage.export_merge_tree_partition_mutex);
+
+    std::vector<ReplicatedPartitionExportInfo> infos;
+
+    for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key)
+    {
+        ReplicatedPartitionExportInfo info;
+
+        info.destination_database = entry.manifest.destination_database;
+        info.destination_table = entry.manifest.destination_table;
+        info.partition_id = entry.manifest.partition_id;
+        info.transaction_id = entry.manifest.transaction_id;
+        info.create_time = entry.manifest.create_time;
+        info.source_replica = entry.manifest.source_replica;
+        info.parts_count = entry.manifest.number_of_parts;
+        info.parts_to_do = entry.manifest.parts.size();
+        info.parts = entry.manifest.parts;
+        info.status = magic_enum::enum_name(entry.status);
+
+        infos.emplace_back(std::move(info));
+    }
+
+    return infos;
+}
+
 void ExportPartitionManifestUpdatingTask::poll()
 {
     std::lock_guard lock(storage.export_merge_tree_partition_mutex);
@@ -96,6 +349,9 @@ void ExportPartitionManifestUpdatingTask::poll()
         LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries");
     }
 
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildrenWatch);
+
     Coordination::Stat stat;
     const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
     const std::unordered_set zk_children(children.begin(), children.end());
@@ -111,6 +367,8 @@ void ExportPartitionManifestUpdatingTask::poll()
     {
         const std::string entry_path = fs::path(exports_path) / key;
 
+
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); std::string metadata_json; if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json)) { @@ -144,6 +531,8 @@ void ExportPartitionManifestUpdatingTask::poll() } }); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); std::string status; if (!zk->tryGetWatch(fs::path(entry_path) / "status", status, nullptr, status_watch_callback)) { @@ -273,6 +662,8 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() if (it == storage.export_merge_tree_partition_task_entries_by_key.end()) continue; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); /// get new status from zk std::string new_status_string; if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string)) @@ -295,6 +686,7 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() { try { + LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key); storage.killExportPart(it->manifest.transaction_id); } catch (...) 
@@ -304,6 +696,12 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() } it->status = *new_status; + + if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + /// we no longer need to keep the data parts alive + it->part_references.clear(); + } } } diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index ea52f679d654..99078e486cb3 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -21,6 +22,10 @@ class ExportPartitionManifestUpdatingTask void addStatusChange(const std::string & key); + std::vector getPartitionExportsInfo() const; + + std::vector getPartitionExportsInfoLocal() const; + private: StorageReplicatedMergeTree & storage; diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index ab3a8ce361c7..af916eb570b7 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -4,13 +4,34 @@ #include #include #include +#include #include "Storages/MergeTree/ExportPartitionUtils.h" #include "Storages/MergeTree/MergeTreePartExportManifest.h" +#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h" +#include "Formats/FormatFactory.h" +#include + +namespace ProfileEvents +{ + extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGet; + extern const Event ExportPartitionZooKeeperGetChildren; + extern const Event ExportPartitionZooKeeperCreate; + extern const Event ExportPartitionZooKeeperSet; + extern const Event ExportPartitionZooKeeperRemove; + extern const Event ExportPartitionZooKeeperMulti; + extern const Event ExportPartitionZooKeeperExists; +} namespace DB { +namespace Setting +{ + 
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; +} + namespace ErrorCodes { extern const int QUERY_WAS_CANCELLED; @@ -39,6 +60,22 @@ ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMerg void ExportPartitionTaskScheduler::run() { + const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors(); + + /// this is subject to TOCTOU - but for now we choose to live with it. + if (available_move_executors == 0) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping"); + return; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Available move executors: {}", available_move_executors); + + std::size_t scheduled_exports_count = 0; + + const uint32_t seed = uint32_t(std::hash{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count); + pcg64_fast rng(seed); + std::lock_guard lock(storage.export_merge_tree_partition_mutex); auto zk = storage.getZooKeeper(); @@ -46,6 +83,12 @@ void ExportPartitionTaskScheduler::run() // Iterate sorted by create_time for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time) { + if (scheduled_exports_count >= available_move_executors) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping"); + break; + } + const auto & manifest = entry.manifest; const auto key = entry.getCompositeKey(); const auto database = storage.getContext()->resolveDatabase(manifest.destination_database); @@ -68,6 +111,8 @@ void ExportPartitionTaskScheduler::run() continue; } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); std::string status_in_zk_string; if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", status_in_zk_string)) { @@ -86,10 +131,12 @@ void 
ExportPartitionTaskScheduler::run() if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) { entry.status = status_in_zk.value(); - LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status from zk is {}", entry.status); + LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(entry.status).data()); continue; } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); std::vector parts_in_processing_or_pending; if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "processing", parts_in_processing_or_pending)) @@ -98,12 +145,18 @@ void ExportPartitionTaskScheduler::run() continue; } + if (parts_in_processing_or_pending.empty()) { LOG_INFO(storage.log, "ExportPartition scheduler task: No parts in processing or pending, skipping"); continue; } + /// shuffle the parts to reduce the risk of lock collisions + std::shuffle(parts_in_processing_or_pending.begin(), parts_in_processing_or_pending.end(), rng); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); std::vector locked_parts; if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "locks", locked_parts)) @@ -116,6 +169,12 @@ void ExportPartitionTaskScheduler::run() for (const auto & zk_part_name : parts_in_processing_or_pending) { + if (scheduled_exports_count >= available_move_executors) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping"); + break; + } + if (locked_parts_set.contains(zk_part_name)) { LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked, skipping", zk_part_name); @@ -129,36 +188,89 
@@ void ExportPartitionTaskScheduler::run() continue; } - if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); - continue; - } + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name); - try + auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest); + + /// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly. + /// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being + /// recreated with a new schema before the export task is scheduled. + if (manifest.lock_inside_the_task) { - storage.exportPartToTable( - part->name, - destination_storage_id, + LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task"); + std::lock_guard part_export_lock(storage.export_manifests_mutex); + + MergeTreePartExportManifest part_export_manifest( + destination_storage->getStorageID(), + part, manifest.transaction_id, - getContextCopyWithTaskSettings(storage.getContext(), manifest), + context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, + context->getSettingsCopy(), + storage.getInMemoryMetadataPtr(), [this, key, zk_part_name, manifest, destination_storage] (MergeTreePartExportManifest::CompletionCallbackResult result) { handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); }); + + part_export_manifest.task = std::make_shared(storage, key, part_export_manifest); + + /// todo arthur this might conflict with the standalone export part. what to do in this case? 
+ if (!storage.export_manifests.emplace(part_export_manifest).second) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is already being exported, skipping", zk_part_name); + continue; + } + + if (!storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task)) + { + storage.export_manifests.erase(part_export_manifest); + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to schedule export part task, skipping"); + return; + } } - catch (const Exception &) + else { - tryLogCurrentException(__PRETTY_FUNCTION__); - zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); - /// we should not increment retry_count because the node might just be full + try + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); + + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); + if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); + continue; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); + + storage.exportPartToTable( + part->name, + destination_storage_id, + manifest.transaction_id, + getContextCopyWithTaskSettings(storage.getContext(), manifest), + [this, key, zk_part_name, manifest, destination_storage] + (MergeTreePartExportManifest::CompletionCallbackResult result) + { + handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); + }); + } + catch (const Exception &) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + 
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); + zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); + /// we should not increment retry_count because the node might just be full + } } + + scheduled_exports_count++; } } - - /// maybe we failed to schedule or failed to export, need to retry eventually - storage.export_merge_tree_partition_select_task->scheduleAfter(1000 * 5); } void ExportPartitionTaskScheduler::handlePartExportCompletion( @@ -223,6 +335,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( size_t max_retries ) { + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name); + if (!exception) { throw Exception(ErrorCodes::LOGICAL_ERROR, "ExportPartition scheduler task: No exception provided for error handling. Sounds like a bug"); @@ -238,6 +352,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( Coordination::Stat locked_by_stat; std::string locked_by; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat)) { LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not increment error counts", part_name); @@ -254,6 +370,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( const auto processing_part_path = processing_parts_path / part_name; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); std::string processing_part_string; if (!zk->tryGet(processing_part_path, processing_part_string)) @@ -267,55 +385,68 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( processing_part_entry.retry_count++; - if 
(processing_part_entry.retry_count) - { - ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); + ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); - if (processing_part_entry.retry_count >= max_retries) - { - /// just set status in processing_part_path and finished_by - processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; - processing_part_entry.finished_by = storage.replica_name; + LOG_INFO(storage.log, "ExportPartition scheduler task: Updating processing part entry for part {}, retry count: {}, max retries: {}", part_name, processing_part_entry.retry_count, max_retries); - ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); - LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); - } + if (processing_part_entry.retry_count >= max_retries) + { + /// just set status in processing_part_path and finished_by + processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; + processing_part_entry.finished_by = storage.replica_name; - std::size_t num_exceptions = 0; - - const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / storage.replica_name; - const auto count_path = exceptions_per_replica_path / "count"; - const auto last_exception_path = exceptions_per_replica_path / "last_exception"; + ops.emplace_back(zkutil::makeSetRequest(export_path / "status", 
String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); + LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); + } + else + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name); + } - if (zk->exists(exceptions_per_replica_path)) - { - std::string num_exceptions_string; - zk->tryGet(count_path, num_exceptions_string); - num_exceptions = std::stoull(num_exceptions_string.c_str()); + std::size_t num_exceptions = 0; - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1)); - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1)); - } - else - { - ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(count_path, "0", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent)); - } + const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / storage.replica_name; + const auto count_path = exceptions_per_replica_path / "count"; + const auto last_exception_path = exceptions_per_replica_path / "last_exception"; - num_exceptions++; - ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1)); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); + if (zk->exists(exceptions_per_replica_path)) + { + 
LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path exists, no need to create it"); + std::string num_exceptions_string; + zk->tryGet(count_path, num_exceptions_string); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + num_exceptions = parse(num_exceptions_string); + + ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1)); + ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1)); + } + else + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path does not exist, will create it"); + ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(count_path, "0", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent)); + } - Coordination::Responses responses; - if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: All failure mechanism failed, will not try to update it"); - return; - } + num_exceptions++; + ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1)); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + Coordination::Responses responses; + if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: All failure mechanism failed, will 
not try to update it"); + return; } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Successfully updated exception counters for part {}", part_name); } bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( @@ -330,6 +461,8 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( Coordination::Stat locked_by_stat; std::string locked_by; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat)) { LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not commit or set it as completed", part_name); @@ -355,9 +488,12 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( requests.emplace_back(zkutil::makeCreateRequest(processed_part_path, processed_part_entry.toJsonString(), zkutil::CreateMode::Persistent)); requests.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); Coordination::Responses responses; if (Coordination::Error::ZOK != zk->tryMulti(requests, responses)) { + /// todo arthur remember what to do here LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping"); return false; @@ -370,6 +506,8 @@ bool ExportPartitionTaskScheduler::areAllPartsProcessed( const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk) { + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); Strings parts_in_processing_or_pending; if (Coordination::Error::ZOK != zk->tryGetChildren(export_path / "processing", parts_in_processing_or_pending)) { diff --git 
a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 466eb79e8367..d9c0430678d2 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -1,10 +1,19 @@ #include #include +#include #include #include "Storages/ExportReplicatedMergeTreePartitionManifest.h" #include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h" #include +namespace ProfileEvents +{ + extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGet; + extern const Event ExportPartitionZooKeeperGetChildren; + extern const Event ExportPartitionZooKeeperSet; +} + namespace DB { @@ -23,6 +32,8 @@ namespace ExportPartitionUtils const auto processed_parts_path = fs::path(export_path) / "processed"; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); std::vector processed_parts; if (Coordination::Error::ZOK != zk->tryGetChildren(processed_parts_path, processed_parts)) { @@ -39,6 +50,8 @@ namespace ExportPartitionUtils } auto responses = zk->tryGet(get_paths); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, get_paths.size()); responses.waitForResponses(); @@ -81,6 +94,8 @@ namespace ExportPartitionUtils destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, context); LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet); if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1)) { LOG_INFO(log, 
"ExportPartition: Marked export as completed"); diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index dab0c54729fb..463bf366075d 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -146,6 +146,18 @@ size_t MergeTreeBackgroundExecutor::getMaxTasksCount() const return max_tasks_count.load(std::memory_order_relaxed); } +/** Returns how many more tasks can be accepted: the max tasks count minus the current value of the pool metric, clamped at zero. The two values are read with separate relaxed atomic loads, so the result is only a hint for callers. */ +template +size_t MergeTreeBackgroundExecutor::getAvailableSlots() const +{ + const size_t max_tasks = getMaxTasksCount(); + const auto busy_tasks = CurrentMetrics::values[metric].load(std::memory_order_relaxed); + if (busy_tasks <= 0) + return max_tasks; + const size_t busy = size_t(busy_tasks); + return busy < max_tasks ? max_tasks - busy : 0; /* Clamp instead of letting the unsigned subtraction wrap around to a huge slot count. */ +} + template bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task) { diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index d5d55a61d731..0e57c161acdc 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -329,6 +329,8 @@ class MergeTreeBackgroundExecutor final : boost::noncopyable /// can lead only to some postponing, not logical error. 
size_t getMaxTasksCount() const; + size_t getAvailableSlots() const; + bool trySchedule(ExecutableTaskPtr task); void removeTasksCorrespondingToStorage(StorageID id); void wait(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 260508764f6d..71ee2896dd06 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6255,26 +6255,32 @@ void MergeTreeData::exportPartToTable( part_name, getStorageID().getFullTableName()); { - const auto format_settings = getFormatSettings(query_context); MergeTreePartExportManifest manifest( dest_storage->getStorageID(), part, transaction_id, query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - format_settings, + query_context->getSettingsCopy(), source_metadata_ptr, completion_callback); std::lock_guard lock(export_manifests_mutex); - if (!export_manifests.emplace(std::move(manifest)).second) + manifest.task = std::make_shared(*this, manifest); + + if (!export_manifests.emplace(manifest).second) { throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", + part_name, dest_storage->getStorageID().getFullTableName()); + } + + if (!background_moves_assignee.scheduleMoveTask(manifest.task)) + { + export_manifests.erase(manifest); + throw Exception(ErrorCodes::ABORTED, "Failed to schedule export part task for data part '{}' to table '{}'. 
Background executor is busy", part_name, dest_storage->getStorageID().getFullTableName()); } } - - background_moves_assignee.trigger(); } void MergeTreeData::killExportPart(const String & transaction_id) @@ -9109,51 +9115,21 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) { - if (!parts_mover.moves_blocker.isCancelled()) - { - auto moving_tagger = selectPartsForMove(); - if (!moving_tagger->parts_to_move.empty()) - { - assignee.scheduleMoveTask(std::make_shared( - [this, moving_tagger] () mutable - { - ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); - WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; - }, moves_assignee_trigger, getStorageID())); - return true; - } - } - - std::lock_guard lock(export_manifests_mutex); - - for (auto & manifest : export_manifests) - { - if (manifest.in_progress) - { - continue; - } - - auto context_copy = Context::createCopy(getContext()); - context_copy->makeQueryContextForExportPart(); - context_copy->setCurrentQueryId(manifest.transaction_id); - context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); - - auto task = std::make_shared(*this, manifest, context_copy); + /* Schedules one background job that moves the parts picked by selectPartsForMove(). Export manifests are no longer scheduled from this function. When moves are blocked nothing is scheduled. */ if (parts_mover.moves_blocker.isCancelled()) + return false; - manifest.in_progress = assignee.scheduleMoveTask(task); + auto moving_tagger = selectPartsForMove(); + if (moving_tagger->parts_to_move.empty()) /* No parts were selected for a move. */ + return false; - if (!manifest.in_progress) + assignee.scheduleMoveTask(std::make_shared( + [this, moving_tagger] () mutable { - continue; - } - - manifest.task = task; - - return true; - } - - return false; + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); + 
WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + }, moves_assignee_trigger, getStorageID())); /* NOTE(review): the bool returned by scheduleMoveTask is not checked, so true is returned even when the task was not accepted. Confirm that this is intended. */ + return true; } bool MergeTreeData::areBackgroundMovesNeeded() const diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 521bc7e50279..84c8a45dc900 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1359,6 +1359,7 @@ class MergeTreeData : public IStorage, public WithMutableContext friend class IPartMetadataManager; friend class IMergedBlockOutputStream; // for access to log friend class ExportPartTask; + friend class ExportPartFromPartitionExportTask; bool require_part_metadata; diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index 533eeb6decdd..7805f4d7a49e 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -5,13 +5,14 @@ #include #include #include +#include namespace DB { class Exception; -class ExportPartTask; +class IExecutableTask; struct MergeTreePartExportManifest { @@ -46,14 +47,14 @@ struct MergeTreePartExportManifest const DataPartPtr & data_part_, const String & transaction_id_, FileAlreadyExistsPolicy file_already_exists_policy_, - const FormatSettings & format_settings_, + const Settings & settings_, const StorageMetadataPtr & metadata_snapshot_, std::function completion_callback_ = {}) : destination_storage_id(destination_storage_id_), data_part(data_part_), transaction_id(transaction_id_), file_already_exists_policy(file_already_exists_policy_), - format_settings(format_settings_), + settings(settings_), metadata_snapshot(metadata_snapshot_), completion_callback(completion_callback_), create_time(time(nullptr)) {} @@ -63,7 +64,7 @@ struct 
MergeTreePartExportManifest /// Used for killing the export. String transaction_id; FileAlreadyExistsPolicy file_already_exists_policy; - FormatSettings format_settings; + Settings settings; /// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations /// Otherwise the export could fail if the schema changes between validation and execution @@ -72,8 +73,8 @@ struct MergeTreePartExportManifest std::function completion_callback; time_t create_time; - mutable bool in_progress = false; - mutable std::shared_ptr task = nullptr; + /// Required to cancel export tasks + mutable std::shared_ptr task = nullptr; bool operator<(const MergeTreePartExportManifest & rhs) const { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ec6c43ff775e..c7e926228644 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -153,6 +153,15 @@ namespace ProfileEvents extern const Event NotCreatedLogEntryForMutation; extern const Event ReplicaPartialShutdown; extern const Event ReplicatedCoveredPartsInZooKeeperOnStart; + extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGet; + extern const Event ExportPartitionZooKeeperGetChildren; + extern const Event ExportPartitionZooKeeperCreate; + extern const Event ExportPartitionZooKeeperSet; + extern const Event ExportPartitionZooKeeperRemove; + extern const Event ExportPartitionZooKeeperRemoveRecursive; + extern const Event ExportPartitionZooKeeperMulti; + extern const Event ExportPartitionZooKeeperExists; } namespace CurrentMetrics @@ -200,6 +209,7 @@ namespace Setting extern const SettingsBool output_format_parquet_parallel_encoding; extern const SettingsMaxThreads max_threads; extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; + extern const SettingsBool 
export_merge_tree_partition_lock_inside_the_task; } namespace MergeTreeSetting @@ -4449,119 +4459,16 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask() } } -std::vector StorageReplicatedMergeTree::getPartitionExportsInfo() const +std::vector StorageReplicatedMergeTree::getPartitionExportsInfo(bool prefer_remote_information) const { - std::vector infos; - - const auto zk = getZooKeeper(); - const auto exports_path = fs::path(zookeeper_path) / "exports"; - std::vector children; - if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children)) + /* Delegates to the manifest updater. Its getPartitionExportsInfo() is used only when the caller prefers remote information and the MULTI_READ feature flag is enabled. Because of the short-circuit, getZooKeeper() is called only when prefer_remote_information is true. */ if (prefer_remote_information && getZooKeeper()->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ)) { - LOG_INFO(log, "Failed to get children from exports path, returning empty export info list"); - return infos; + return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo(); } - for (const auto & child : children) - { - ReplicatedPartitionExportInfo info; - - const auto export_partition_path = fs::path(exports_path) / child; - std::string metadata_json; - if (!zk->tryGet(export_partition_path / "metadata.json", metadata_json)) - { - LOG_INFO(log, "Skipping {}: missing metadata.json", child); - continue; - } - - std::string status; - if (!zk->tryGet(export_partition_path / "status", status)) - { - LOG_INFO(log, "Skipping {}: missing status", child); - continue; - } - - std::vector processing_parts; - if (Coordination::Error::ZOK != zk->tryGetChildren(export_partition_path / "processing", processing_parts)) - { - LOG_INFO(log, "Skipping {}: missing processing parts", child); - continue; - } - - const auto parts_to_do = processing_parts.size(); - - std::string exception_replica; - std::string last_exception; - std::string exception_part; - std::size_t exception_count = 0; - - const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica"; - - Strings exception_replicas; - if (Coordination::Error::ZOK != 
zk->tryGetChildren(exceptions_per_replica_path, exception_replicas)) - { - LOG_INFO(log, "Skipping {}: missing exceptions_per_replica", export_partition_path); - continue; - } - - for (const auto & replica : exception_replicas) - { - std::string exception_count_string; - if (!zk->tryGet(exceptions_per_replica_path / replica / "count", exception_count_string)) - { - LOG_INFO(log, "Skipping {}: missing count", replica); - continue; - } - - exception_count += std::stoull(exception_count_string.c_str()); - - if (last_exception.empty()) - { - const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception"; - std::string last_exception_string; - if (!zk->tryGet(last_exception_path / "exception", last_exception_string)) - { - LOG_INFO(log, "Skipping {}: missing last_exception/exception", last_exception_path); - continue; - } - - std::string exception_part_zk; - if (!zk->tryGet(last_exception_path / "part", exception_part_zk)) - { - LOG_INFO(log, "Skipping {}: missing exception part", last_exception_path); - continue; - } - - exception_replica = replica; - last_exception = last_exception_string; - exception_part = exception_part_zk; - } - } - - const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - - info.destination_database = metadata.destination_database; - info.destination_table = metadata.destination_table; - info.partition_id = metadata.partition_id; - info.transaction_id = metadata.transaction_id; - info.create_time = metadata.create_time; - info.source_replica = metadata.source_replica; - info.parts_count = metadata.number_of_parts; - info.parts_to_do = parts_to_do; - info.parts = metadata.parts; - info.status = status; - info.exception_replica = exception_replica; - info.last_exception = last_exception; - info.exception_part = exception_part; - info.exception_count = exception_count; - - infos.emplace_back(std::move(info)); - } - - return infos; + return 
export_merge_tree_partition_manifest_updater->getPartitionExportsInfoLocal(); /* Fallback: the updater's local variant is used whenever the remote branch above is not taken. */ } - StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts( zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, @@ -8148,15 +8055,21 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & const auto partition_exports_path = fs::path(exports_path) / export_key; /// check if entry already exists + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); if (zookeeper->exists(partition_exports_path)) { LOG_INFO(log, "Export with key {} is already exported or it is being exported. Checking if it has expired so that we can overwrite it", export_key); bool has_expired = false; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); if (zookeeper->exists(fs::path(partition_exports_path) / "metadata.json")) { std::string metadata_json; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); if (zookeeper->tryGet(fs::path(partition_exports_path) / "metadata.json", metadata_json)) { const auto manifest = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); @@ -8183,6 +8096,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & /// Not putting in ops (same transaction) because we can't construct a "tryRemoveRecursive" request. /// It is possible that the zk being used does not support RemoveRecursive requests. /// It is ok for this to be non transactional. Worst case scenario an on-going export is going to be killed and a new task won't be scheduled.
+ ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); zookeeper->tryRemoveRecursive(partition_exports_path); } @@ -8222,6 +8137,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; + manifest.lock_inside_the_task = query_context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]; + manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; @@ -8269,6 +8186,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & "PENDING", zkutil::CreateMode::Persistent)); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); Coordination::Responses responses; Coordination::Error code = zookeeper->tryMulti(ops, responses); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 712ba0ba4183..df00da87ec4f 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -377,7 +377,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData using ShutdownDeadline = std::chrono::time_point; void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline); - std::vector getPartitionExportsInfo() const; + std::vector getPartitionExportsInfo(bool prefer_remote_information) const; private: std::atomic_bool are_restoring_replica {false}; @@ -406,6 +406,7 @@ class StorageReplicatedMergeTree final : public 
MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; + friend class ExportPartFromPartitionExportTask; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index bd56a40c3a68..b5abb77e4eb1 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -8,7 +8,6 @@ #include #include - namespace DB { diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 018f0c8ffac7..343fd72afb6e 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -10,11 +10,17 @@ #include #include "Columns/ColumnString.h" #include "Storages/VirtualColumnUtils.h" +#include namespace DB { +namespace Setting +{ + extern const SettingsBool export_merge_tree_partition_system_table_prefer_remote_information; +} + ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescription() { return ColumnsDescription @@ -110,7 +116,7 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu { const IStorage * storage = replicated_merge_tree_tables[database][table].get(); if (const auto * replicated_merge_tree = dynamic_cast(storage)) - partition_exports_info = replicated_merge_tree->getPartitionExportsInfo(); + partition_exports_info = replicated_merge_tree->getPartitionExportsInfo(context->getSettingsRef()[Setting::export_merge_tree_partition_system_table_prefer_remote_information]); } for (const ReplicatedPartitionExportInfo & info : partition_exports_info) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py 
b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index a4cb0807d6ee..779bc773baed 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -77,6 +77,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) cluster.add_instance( "replica2", @@ -85,6 +86,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) # node that does not participate in the export, but will have visibility over the s3 table cluster.add_instance( @@ -100,6 +102,7 @@ def cluster(): with_minio=True, stay_alive=True, with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], ) logging.info("Starting cluster...") cluster.start() @@ -194,13 +197,16 @@ def test_restart_nodes_during_export(cluster): assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2021") != f'0\n', "Export of partition 2021 did not resume after crash" -def test_kill_export(cluster): +@pytest.mark.parametrize( + "system_table_prefer_remote_information", ['0', '1'] +) +def test_kill_export(cluster, system_table_prefer_remote_information): node = cluster.instances["replica1"] node2 = cluster.instances["replica2"] watcher_node = cluster.instances["watcher_node"] - mt_table = "kill_export_mt_table" - s3_table = "kill_export_s3_table" + mt_table = f"kill_export_mt_table_{system_table_prefer_remote_information}" + s3_table = f"kill_export_s3_table_{system_table_prefer_remote_information}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") create_tables_and_insert_data(node2, mt_table, s3_table, "replica2") @@ -242,6 +248,9 @@ def test_kill_export(cluster): # ZooKeeper operations (KILL) proceed quickly since only S3 is blocked node.query(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = 
'{mt_table}' and destination_table = '{s3_table}'") + # sleep for a while to let the kill to be processed + time.sleep(2) + # wait for 2021 to finish wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED") @@ -250,8 +259,11 @@ def test_kill_export(cluster): assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2021_*', format=LineAsString)") != f'0\n', "Partition 2021 was not written to S3, but it should have been" # check system.replicated_partition_exports for the export, status should be KILLED - assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'KILLED\n', "Partition 2020 was not killed as expected" - assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected" + assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'KILLED\n', "Partition 2020 was not killed as expected" + assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected" + + # check the data did not land on s3 + assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '0\n', "Partition 2020 was written to S3, it was not killed as expected" def test_drop_source_table_during_export(cluster): @@ -387,6 
+399,7 @@ def test_failure_is_logged_in_system_table(cluster): WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}' AND partition_id = '2020' + SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1 """ ) @@ -398,6 +411,7 @@ def test_failure_is_logged_in_system_table(cluster): WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}' AND partition_id = '2020' + SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1 """ ) assert int(exception_count.strip()) > 0, "Expected non-zero exception_count in system.replicated_partition_exports"