From 5388cf16aff99d45136ebdfadf5273568a6ad53f Mon Sep 17 00:00:00 2001
From: hll1213181368
Date: Thu, 18 Dec 2025 15:43:34 +0800
Subject: [PATCH 1/2] perf(cluster): use multiple threads to optimize
 sendSnapshotByRawKV

---
 kvrocks.conf                |  6 ++-
 src/cluster/batch_sender.cc | 16 ++-----
 src/cluster/batch_sender.h  | 12 ++----
 src/cluster/slot_migrate.cc | 84 ++++++++++++++++++++++++++++++-------
 src/cluster/slot_migrate.h  |  9 ++++
 src/config/config.cc        | 12 ++++++
 src/config/config.h         |  1 +
 7 files changed, 102 insertions(+), 38 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 48884a20bac..b1473c4c7f2 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -723,13 +723,17 @@ migrate-batch-size-kb 16
 # Default: 16M
 migrate-batch-rate-limit-mb 16
-
 # If it is set to yes, kvrocks will skip the deallocation of block cache
 # while closing the database to speed up the shutdown
 #
 # Default: no
 # skip-block-cache-deallocation-on-close no
 
+# The number of threads used to send the snapshot in parallel during slot migration
+#
+# Default: the number of CPU cores of the Kvrocks node
+# migrate-slots-send-snapshots-parallelism
+
 ################################ ROCKSDB #####################################
 
 # Specify the capacity of column family block cache. A larger block cache
diff --git a/src/cluster/batch_sender.cc b/src/cluster/batch_sender.cc
index e92221ee6f3..43376f4ddcf 100644
--- a/src/cluster/batch_sender.cc
+++ b/src/cluster/batch_sender.cc
@@ -71,12 +71,12 @@ Status BatchSender::Send() {
   }
 
   // rate limit
-  if (bytes_per_sec_ > 0) {
-    auto single_burst = rate_limiter_->GetSingleBurstBytes();
+  if (global_rate_limiter_) {
+    auto single_burst = global_rate_limiter_->GetSingleBurstBytes();
     auto left = static_cast<int64_t>(write_batch_.GetDataSize());
     while (left > 0) {
       auto request_size = std::min(left, single_burst);
-      rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr);
+      global_rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr);
       left -= request_size;
     }
   }
@@ -109,16 +109,6 @@ Status BatchSender::sendApplyBatchCmd(int fd, const rocksdb::WriteBatch &write_b
   return Status::OK();
 }
 
-void BatchSender::SetBytesPerSecond(size_t bytes_per_sec) {
-  if (bytes_per_sec_ == bytes_per_sec) {
-    return;
-  }
-  bytes_per_sec_ = bytes_per_sec;
-  if (bytes_per_sec > 0) {
-    rate_limiter_->SetBytesPerSecond(static_cast<int64_t>(bytes_per_sec));
-  }
-}
-
 double BatchSender::GetRate(uint64_t since) const {
   auto t = util::GetTimeStampMS();
   if (t <= since) {
diff --git a/src/cluster/batch_sender.h b/src/cluster/batch_sender.h
index 07c4e9fdbbe..d118bcb4ac0 100644
--- a/src/cluster/batch_sender.h
+++ b/src/cluster/batch_sender.h
@@ -28,12 +28,8 @@ class BatchSender {
  public:
   BatchSender() = default;
-  BatchSender(int fd, size_t max_bytes, size_t bytes_per_sec)
-      : dst_fd_(fd),
-        max_bytes_(max_bytes),
-        bytes_per_sec_(bytes_per_sec),
-        rate_limiter_(std::unique_ptr<rocksdb::RateLimiter>(
-            rocksdb::NewGenericRateLimiter(static_cast<int64_t>(bytes_per_sec_)))) {}
+  BatchSender(int fd, size_t max_bytes, std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter)
+      : dst_fd_(fd), max_bytes_(max_bytes), global_rate_limiter_(std::move(global_rate_limiter)) {}
 
   ~BatchSender() = default;
 
@@ -50,7 +46,6 @@ class BatchSender {
   uint64_t GetSentBytes() const { return sent_bytes_; }
   uint32_t GetSentBatchesNum() const { return sent_batches_num_; }
   uint32_t GetEntriesNum() const { return entries_num_; }
-  void SetBytesPerSecond(size_t bytes_per_sec);
   double GetRate(uint64_t since) const;
 
  private:
@@ -66,6 +61,5 @@ class BatchSender {
   int dst_fd_;
   size_t max_bytes_;
-  size_t bytes_per_sec_ = 0;  // 0 means no limit
-  std::unique_ptr<rocksdb::RateLimiter> rate_limiter_;
+  std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter_;
 };
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 684f7f6c402..06199db5fe8 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -20,13 +20,16 @@
 
 #include "slot_migrate.h"
 
+#include <future>
 #include <memory>
 #include <utility>
 
+#include "arpa/inet.h"
 #include "db_util.h"
 #include "event_util.h"
 #include "fmt/format.h"
 #include "io_util.h"
+#include "netinet/tcp.h"
 #include "storage/batch_extractor.h"
 #include "storage/iterator.h"
 #include "storage/redis_metadata.h"
@@ -52,7 +55,8 @@ SlotMigrator::SlotMigrator(Server *srv)
       max_pipeline_size_(srv->GetConfig()->pipeline_size),
       seq_gap_limit_(srv->GetConfig()->sequence_gap),
       migrate_batch_bytes_per_sec_(srv->GetConfig()->migrate_batch_rate_limit_mb * MiB),
-      migrate_batch_size_bytes_(srv->GetConfig()->migrate_batch_size_kb * KiB) {
+      migrate_batch_size_bytes_(srv->GetConfig()->migrate_batch_size_kb * KiB),
+      migrate_slots_send_snapshots_parallelism_(srv->GetConfig()->migrate_slots_send_snapshots_parallelism) {
   // Let metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer,
   // because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
   // [Situation]:
@@ -69,6 +73,7 @@
   // [Note]:
   // This problem may exist in all functions of Database called in slot migration process.
   metadata_cf_handle_ = nullptr;
+  global_rate_limiter_.reset(rocksdb::NewGenericRateLimiter(static_cast<int64_t>(migrate_batch_bytes_per_sec_)));
   if (srv->IsSlave()) {
     SetStopMigrationFlag(true);
   }
@@ -1251,7 +1256,6 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
 Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
   // user may dynamically change some configs, apply it when send data
   batch->SetMaxBytes(migrate_batch_size_bytes_);
-  batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
   return batch->Send();
 }
 
@@ -1260,8 +1264,47 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto slot_range = slot_range_.load();
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", slot_range.String());
 
-  auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
+  int total_slots = slot_range.end - slot_range.start + 1;
+  int slots_per_thread = total_slots / migrate_slots_send_snapshots_parallelism_;
+  int remain_slots = total_slots % migrate_slots_send_snapshots_parallelism_;
+
+  std::vector<std::future<Status>> results;
+  int cur_start = slot_range.start;
+  for (int i = 0; i < migrate_slots_send_snapshots_parallelism_; i++) {
+    int count = slots_per_thread + (i < remain_slots ? 1 : 0);
+    int cur_end = cur_start + count - 1;
+
+    results.emplace_back(std::async(std::launch::async, [=]() -> Status {
+      int fd = createConnectToDstNode();
+      if (fd < 0) {
+        return {Status::NotOK, fmt::format("failed to connect to the destination node in thread[{}]", i)};
+      }
+      auto s = migrateSlotRange(cur_start, cur_end, fd);
+      close(fd);
+      return s;
+    }));
+
+    cur_start = cur_end + 1;
+  }
+
+  // Wait until all threads finish
+  for (auto &result : results) {
+    auto s = result.get();
+    if (!s.IsOK()) {
+      return {Status::NotOK, fmt::format("[migrate] Parallel snapshot migration failed: {}", s.Msg())};
+    }
+  }
+
+  auto elapsed = util::GetTimeStampMS() - start_ts;
+  info("[migrate] Parallel snapshot migration succeeded, slot(s) {}, elapsed: {} ms", slot_range.String(), elapsed);
+
+  return Status::OK();
+}
+
+Status SlotMigrator::migrateSlotRange(int start_slot, int end_slot, int fd) {
+  SlotRange sub{start_slot, end_slot};
+  auto prefix = ComposeSlotKeyPrefix(namespace_, start_slot);
+  auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
@@ -1272,12 +1315,11 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
   engine::DBIterator iter(no_txn_ctx, read_options);
 
-  BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
+  BatchSender batch_sender(fd, migrate_batch_size_bytes_, global_rate_limiter_);
 
   for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
-    // Iteration is out of range
     auto key_slot_id = ExtractSlotId(iter.Key());
-    if (!slot_range.Contains(key_slot_id)) {
+    if (!sub.Contains(key_slot_id)) {
       break;
     }
 
@@ -1325,20 +1367,32 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   GET_OR_RET(sendMigrationBatch(&batch_sender));
 
-  auto elapsed = util::GetTimeStampMS() - start_ts;
-  info(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
-      "batches: {}, entries: {}",
-      slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
-      batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
-
   return Status::OK();
 }
 
+int SlotMigrator::createConnectToDstNode() {
+  // Connect to the destination node
+  auto fd = util::SockConnect(dst_ip_, dst_port_);
+  if (!fd.IsOK()) {
+    error("failed to connect to the destination node: {}", fd.Msg());
+    return -1;
+  }
+
+  std::string pass = srv_->GetConfig()->requirepass;
+  if (!pass.empty()) {
+    auto s = authOnDstNode(*fd, pass);
+    if (!s.IsOK()) {
+      error("failed to authenticate on the destination node: {}", s.Msg());
+      return -1;
+    }
+  }
+  return *fd;
+}
+
 Status SlotMigrator::syncWALByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
   info("[migrate] Syncing WAL of slot(s) {} by raw key value", slot_range_.load().String());
 
-  BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
+  BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, global_rate_limiter_);
 
   int epoch = 1;
   uint64_t wal_incremental_seq = 0;
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 71107a5af70..772e767045d 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <rocksdb/db.h>
+#include <rocksdb/rate_limiter.h>
 #include <rocksdb/status.h>
 #include <rocksdb/transaction_log.h>
 #include <rocksdb/write_batch.h>
@@ -99,6 +100,9 @@ class SlotMigrator : public redis::Database {
   void SetSequenceGapLimit(int value) {
     if (value > 0) seq_gap_limit_ = value;
   }
+  void SetMigrateSlotsSendSnapshotsParallelism(int value) {
+    if (value > 0) migrate_slots_send_snapshots_parallelism_ = value;
+  }
   void SetMigrateBatchRateLimit(size_t bytes_per_sec) { migrate_batch_bytes_per_sec_ = bytes_per_sec; }
   void SetMigrateBatchSize(size_t size) { migrate_batch_size_bytes_ = size; }
   void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
@@ -148,6 +152,8 @@ class SlotMigrator : public redis::Database {
   Status sendMigrationBatch(BatchSender *batch);
   Status sendSnapshotByRawKV();
+  Status migrateSlotRange(int start_slot, int end_slot, int fd);
+  int createConnectToDstNode();
   Status syncWALByRawKV();
   bool catchUpIncrementalWAL();
   Status migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender *batch_sender);
@@ -173,6 +179,9 @@ class SlotMigrator : public redis::Database {
   uint64_t seq_gap_limit_ = kDefaultSequenceGapLimit;
   std::atomic<size_t> migrate_batch_bytes_per_sec_ = 1 * GiB;
   std::atomic<size_t> migrate_batch_size_bytes_;
+  int migrate_slots_send_snapshots_parallelism_ = 0;
+
+  std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter_;
 
   SlotMigrationStage current_stage_ = SlotMigrationStage::kNone;
   ParserState parser_state_ = ParserState::ArrayLen;
diff --git a/src/config/config.cc b/src/config/config.cc
index 52690ca8e3c..81d38102b31 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -228,6 +228,8 @@ Config::Config() {
      new EnumField<MigrationType>(&migrate_type, migration_types, MigrationType::kRawKeyValue)},
     {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)},
     {"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 1, INT_MAX)},
+    {"migrate-slots-send-snapshots-parallelism", false,
+     new IntField(&migrate_slots_send_snapshots_parallelism, 0, 0, INT_MAX)},
     {"unixsocket", true, new StringField(&unixsocket, "")},
     {"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)},
     {"log-retention-days", true, new IntField(&log_retention_days, -1, -1, INT_MAX)},
@@ -610,6 +612,16 @@ void Config::initFieldCallback() {
          srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB);
          return Status::OK();
        }},
+      {"migrate-slots-send-snapshots-parallelism",
+       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
+         if (migrate_slots_send_snapshots_parallelism == 0) {
+           unsigned int max_parallelism = std::thread::hardware_concurrency();
+           migrate_slots_send_snapshots_parallelism = static_cast<int>(max_parallelism);
+         }
+         if (!srv) return Status::OK();
+         srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(migrate_slots_send_snapshots_parallelism);
+         return Status::OK();
+       }},
       {"log-level",
        [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
          if (!srv) return Status::OK();
diff --git a/src/config/config.h b/src/config/config.h
index 036e506e8eb..0eeb97199f3 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -172,6 +172,7 @@ struct Config {
   MigrationType migrate_type;
   int migrate_batch_size_kb;
   int migrate_batch_rate_limit_mb;
+  int migrate_slots_send_snapshots_parallelism;
   bool redis_cursor_compatible = false;
   bool resp3_enabled = false;

From 24b11523366b1bb829d9c21d2359dff72e6cf77c Mon Sep 17 00:00:00 2001
From: Lele Huang
Date: Fri, 19 Dec 2025 16:10:57 +0800
Subject: [PATCH 2/2] optimize parallelism calculation for slot migration

---
 src/cluster/slot_migrate.cc | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 06199db5fe8..51b2027583b 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -1265,12 +1265,13 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   info("[migrate] Migrating snapshot of slot(s) {} by raw key value", slot_range.String());
 
   int total_slots = slot_range.end - slot_range.start + 1;
-  int slots_per_thread = total_slots / migrate_slots_send_snapshots_parallelism_;
-  int remain_slots = total_slots % migrate_slots_send_snapshots_parallelism_;
+  int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+  int slots_per_thread = total_slots / parallelism;
+  int remain_slots = total_slots % parallelism;
 
   std::vector<std::future<Status>> results;
   int cur_start = slot_range.start;
-  for (int i = 0; i < migrate_slots_send_snapshots_parallelism_; i++) {
+  for (int i = 0; i < parallelism; i++) {
     int count = slots_per_thread + (i < remain_slots ? 1 : 0);
     int cur_end = cur_start + count - 1;
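
The slot partitioning used by sendSnapshotByRawKV above works as follows: the slot range is split into parallelism contiguous sub-ranges, each thread gets total_slots / parallelism slots, and the first total_slots % parallelism threads take one extra slot, so all ranges differ in size by at most one. The standalone sketch below illustrates that arithmetic; PartitionSlots and the example values are illustrative only and are not part of the patches.

#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

// Split the inclusive slot range [start, end] into contiguous sub-ranges,
// mirroring the arithmetic in sendSnapshotByRawKV after PATCH 2/2.
std::vector<std::pair<int, int>> PartitionSlots(int start, int end, int parallelism) {
  int total_slots = end - start + 1;
  // PATCH 2/2: never use more threads than there are slots.
  parallelism = std::min(parallelism, total_slots);
  int slots_per_thread = total_slots / parallelism;
  int remain_slots = total_slots % parallelism;

  std::vector<std::pair<int, int>> ranges;
  int cur_start = start;
  for (int i = 0; i < parallelism; i++) {
    // The first remain_slots threads take one extra slot.
    int count = slots_per_thread + (i < remain_slots ? 1 : 0);
    int cur_end = cur_start + count - 1;
    ranges.emplace_back(cur_start, cur_end);  // each range is migrated by one sender thread
    cur_start = cur_end + 1;
  }
  return ranges;
}

int main() {
  // 10 slots across 4 threads -> [0,2] [3,5] [6,7] [8,9]
  for (const auto &[s, e] : PartitionSlots(0, 9, 4)) {
    std::printf("[%d, %d]\n", s, e);
  }
  return 0;
}

Because every sender thread requests tokens from the shared global_rate_limiter_, the migrate-batch-rate-limit-mb setting still caps the aggregate migration rate rather than the per-thread rate.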