From cc8cfae715db5854183ec397975b54e5a02c45b6 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Sat, 8 Nov 2025 11:57:38 +0800 Subject: [PATCH 1/7] support gpu direct rdma 1) recv all data on gpu first 2) the gpu block is alloced from a gpu block pool 3) brpc header, meta and body will be copied from gpu to cpu to process. 4) To decrease the d2h counts, we will prefetch 512B to memory Co-authored-by: sunce4t <847480001@qq.com> --- BUILD.bazel | 13 + bazel/config/BUILD.bazel | 8 +- src/brpc/policy/baidu_rpc_protocol.cpp | 142 +++++++- src/brpc/rdma/rdma_endpoint.cpp | 131 +++++-- src/brpc/rdma/rdma_endpoint.h | 4 +- src/brpc/rdma/rdma_helper.cpp | 18 +- src/brpc/rdma/rdma_helper.h | 3 + src/butil/gpu/gpu_block_pool.cpp | 450 +++++++++++++++++++++++++ src/butil/gpu/gpu_block_pool.h | 201 +++++++++++ src/butil/iobuf.cpp | 109 ++++++ src/butil/iobuf.h | 9 +- 11 files changed, 1056 insertions(+), 32 deletions(-) create mode 100644 src/butil/gpu/gpu_block_pool.cpp create mode 100644 src/butil/gpu/gpu_block_pool.h diff --git a/BUILD.bazel b/BUILD.bazel index 138e416b10..e1a853d2db 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -54,6 +54,9 @@ COPTS = [ }) + select({ "//bazel/config:brpc_with_asan": ["-fsanitize=address"], "//conditions:default": [""], +}) + select({ + ":brpc_with_gdr": ["-DBRPC_WITH_GDR=1"], + "//conditions:default": [""], }) + select({ "//bazel/config:brpc_with_no_pthread_mutex_hook": ["-DNO_PTHREAD_MUTEX_HOOK"], "//conditions:default": [""], @@ -232,6 +235,7 @@ BUTIL_SRCS = [ "src/butil/iobuf.cpp", "src/butil/single_iobuf.cpp", "src/butil/iobuf_profiler.cpp", + "src/butil/gpu/gpu_block_pool.cpp", "src/butil/binary_printer.cpp", "src/butil/recordio.cc", "src/butil/popen.cpp", @@ -337,6 +341,9 @@ cc_library( "-DUNIT_TEST", ], "//conditions:default": [], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), includes = [ "src/", @@ -356,6 +363,9 @@ cc_library( }) + select({ "//bazel/config:brpc_with_boringssl": ["@boringssl//:ssl", "@boringssl//:crypto"], "//conditions:default": ["@openssl//:ssl", "@openssl//:crypto"], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), ) @@ -573,6 +583,9 @@ cc_library( "@org_apache_thrift//:thrift", ], "//conditions:default": [], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), ) diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel index d08ea2ec23..06376cf85c 100644 --- a/bazel/config/BUILD.bazel +++ b/bazel/config/BUILD.bazel @@ -104,6 +104,12 @@ config_setting( visibility = ["//visibility:public"], ) +config_setting( + name = "brpc_with_gdr", + define_values = {"BRPC_WITH_GDR": "true"}, + visibility = ["//visibility:public"], +) + config_setting( name = "brpc_with_boringssl", define_values = {"BRPC_WITH_BORINGSSL": "true"}, @@ -148,4 +154,4 @@ config_setting( name = "with_babylon_counter", define_values = {"with_babylon_counter": "true"}, visibility = ["//visibility:public"], -) \ No newline at end of file +) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 5adf77b2c5..2e37bae3a3 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -23,6 +23,7 @@ #include #include "butil/logging.h" // LOG() #include "butil/iobuf.h" // butil::IOBuf +#include "butil/gpu/gpu_block_pool.h" #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "butil/memory/scope_guard.h" #include "json2pb/json_to_pb.h" @@ -69,6 +70,10 @@ DECLARE_bool(pb_enum_as_number); // 5. Not supported: chunk_info // Pack header into `buf' + +const int header_size = 12; +const int prefetch_d2h_size = 512; + inline void PackRpcHeader(char* rpc_header, uint32_t meta_size, int payload_size) { uint32_t* dummy = (uint32_t*)rpc_header; // suppress strict-alias warning *dummy = *(uint32_t*)"PRPC"; @@ -101,31 +106,81 @@ static void SerializeRpcHeaderAndMeta( ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void*) { + char header_buf[12]; - const size_t n = source->copy_to(header_buf, sizeof(header_buf)); + size_t n = 0; +#if BRPC_WITH_GDR + void* prefetch_d2h_data = NULL; + + uint64_t data_meta = source->get_first_data_meta(); + bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + if (is_gpu_memory) { + prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size); + if (prefetch_d2h_data == NULL) { + LOG(FATAL) << "alloc host data failed!!!"; + } + n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size); + size_t copy_size = n > 12 ? 12 : n; + memcpy(header_buf, prefetch_d2h_data, copy_size); + } else { + n = source->copy_to(header_buf, sizeof(header_buf)); + } +#else + n = source->copy_to(header_buf, sizeof(header_buf)); +#endif // BRPC_WITH_GDR if (n >= 4) { void* dummy = header_buf; if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_TRY_OTHERS); } } else { if (memcmp(header_buf, "PRPC", n) != 0) { +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_TRY_OTHERS); } } if (n < sizeof(header_buf)) { +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } uint32_t body_size; uint32_t meta_size; butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > 128 * 1024 * 1024) { + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + } if (body_size > FLAGS_max_body_size) { // We need this log to report the body_size to give users some clues // which is not printed in InputMessenger. LOG(ERROR) << "body_size=" << body_size << " from " << socket->remote_side() << " is too large"; +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); } else if (source->length() < sizeof(header_buf) + body_size) { +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } if (meta_size > body_size) { @@ -133,12 +188,34 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, << body_size; // Pop the message source->pop_front(sizeof(header_buf) + body_size); +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR return MakeParseError(PARSE_ERROR_TRY_OTHERS); } source->pop_front(sizeof(header_buf)); MostCommonMessage* msg = MostCommonMessage::Get(); +#if BRPC_WITH_GDR + if (is_gpu_memory) { + if (header_size + meta_size <= n) { + auto deleter = [host_allocator, prefetch_d2h_data](void* data) { host_allocator->DeallocateRaw(prefetch_d2h_data); }; + msg->meta.append_user_data_with_meta((char*)prefetch_d2h_data + header_size, meta_size, deleter, n); + source->pop_front(meta_size); + } else { + host_allocator->DeallocateRaw(prefetch_d2h_data); + source->cutn_from_gpu(&msg->meta, meta_size); + } + source->cutn(&msg->payload, body_size - meta_size); + } else { + source->cutn(&msg->meta, meta_size); + source->cutn(&msg->payload, body_size - meta_size); + } +#else source->cutn(&msg->meta, meta_size); source->cutn(&msg->payload, body_size - meta_size); +#endif // BRPC_WITH_GDR return MakeMessage(msg); } @@ -793,7 +870,29 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { butil::IOBuf req_buf; int body_without_attachment_size = req_size - meta.attachment_size(); +#if BRPC_WITH_GDR + int meta_size = msg->meta.size(); + uint64_t data_meta = msg->payload.get_first_data_meta(); + bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); + if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + req_buf.append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(&req_buf, body_without_attachment_size); + } + } + else { + msg->payload.cutn(&req_buf, body_without_attachment_size); + } +#else msg->payload.cutn(&req_buf, body_without_attachment_size); +#endif // BRPC_WITH_GDR if (meta.attachment_size() > 0) { cntl->request_attachment().swap(msg->payload); } @@ -963,8 +1062,14 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { } // Parse response message iff error code from meta is 0 butil::IOBuf res_buf; + int meta_size = msg->meta.size(); const int res_size = msg->payload.length(); butil::IOBuf* res_buf_ptr = &msg->payload; + +#if BRPC_WITH_GDR + uint64_t data_meta = msg->payload.get_first_data_meta(); + bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); +#endif // BRPC_WITH_GDR if (meta.has_attachment_size()) { if (meta.attachment_size() > res_size) { cntl->SetFailed( @@ -973,9 +1078,44 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { break; } int body_without_attachment_size = res_size - meta.attachment_size(); + +#if BRPC_WITH_GDR + if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf.append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(&res_buf, body_without_attachment_size); + } + } + else { + msg->payload.cutn(&res_buf, body_without_attachment_size); + } +#else msg->payload.cutn(&res_buf, body_without_attachment_size); +#endif // BRPC_WITH_GDR res_buf_ptr = &res_buf; cntl->response_attachment().swap(msg->payload); +#if BRPC_WITH_GDR + } else if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + res_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf.append((char*)data + meta_size, res_size); + msg->payload.pop_front(res_size); + } else { + msg->payload.cutn_from_gpu(&res_buf, res_size); + } + res_buf_ptr = &res_buf; +#endif // BRPC_WITH_GDR } ContentType content_type = meta.content_type(); diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 1d502a98f7..fc6ce20928 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -20,6 +20,7 @@ #include #include "butil/fd_utility.h" #include "butil/logging.h" // CHECK, LOG +#include "butil/gpu/gpu_block_pool.h" #include "butil/sys_byteorder.h" // HostToNet,NetToHost #include "bthread/bthread.h" #include "brpc/errno.pb.h" @@ -30,6 +31,7 @@ #include "brpc/rdma/block_pool.h" #include "brpc/rdma/rdma_helper.h" #include "brpc/rdma/rdma_endpoint.h" +#include "brpc/traceprintf.h" DECLARE_int32(task_group_ntags); @@ -48,15 +50,15 @@ extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_at extern int (*IbvDestroyQp)(ibv_qp*); extern bool g_skip_rdma_init; -DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA"); -DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA"); +DEFINE_int32(rdma_sq_size, 64, "SQ size for RDMA"); +DEFINE_int32(rdma_rq_size, 64, "RQ size for RDMA"); DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side"); DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy"); DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: " "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)"); DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once."); DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP."); -DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP."); +DEFINE_int32(rdma_prepared_qp_cnt, 256, "Initial count of prepared QP."); DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely"); BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); @@ -98,6 +100,7 @@ static const uint16_t MIN_QP_SIZE = 16; static const uint16_t MAX_QP_SIZE = 4096; static const uint16_t MIN_BLOCK_SIZE = 1024; static const uint32_t ACK_MSG_RDMA_OK = 0x1; +static const uint64_t FIXED_ACK_WR_ID = 1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; @@ -191,6 +194,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _remote_window_capacity(0) , _window_size(0) , _new_rq_wrs(0) + , _remote_recv_window(0) { if (_sq_size < MIN_QP_SIZE) { _sq_size = MIN_QP_SIZE; @@ -208,6 +212,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) } RdmaEndpoint::~RdmaEndpoint() { + // LOG(INFO) << _window_size << " " << _remote_recv_window << " " << _sq_unsignaled; Reset(); bthread::butex_destroy(_read_butex); } @@ -231,6 +236,7 @@ void RdmaEndpoint::Reset() { _new_rq_wrs = 0; _sq_sent = 0; _rq_received = 0; + _remote_recv_window.store(0, butil::memory_order_relaxed); } void RdmaConnect::StartConnect(const Socket* socket, @@ -514,7 +520,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); - + ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed); ep->_state = C_BRINGUP_QP; if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); @@ -622,7 +628,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); - + ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed); ep->_state = S_ALLOC_QPCQ; if (ep->AllocateResources() < 0) { LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:" @@ -716,7 +722,7 @@ bool RdmaEndpoint::IsWritable() const { return false; } - return _window_size.load(butil::memory_order_relaxed) > 0; + return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0; } // RdmaIOBuf inherits from IOBuf to provide a new function. @@ -787,12 +793,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { size_t total_len = 0; size_t current = 0; uint32_t window = 0; + uint32_t recv_window = 0; ibv_send_wr wr; int max_sge = GetRdmaMaxSge(); ibv_sge sglist[max_sge]; while (current < ndata) { - window = _window_size.load(butil::memory_order_relaxed); - if (window == 0) { + window = _window_size.load(butil::memory_order_acquire); + recv_window = _remote_recv_window.load(butil::memory_order_acquire); + if (window == 0 || recv_window == 0) { if (total_len > 0) { break; } else { @@ -883,6 +891,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) + << ", window_size:" << _window_size + << ", emote_recv_window: " << _remote_recv_window << ", window=" << window << ", sq_current=" << _sq_current; errno = err; @@ -898,7 +908,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // Because there is at most one thread can enter this function for each // Socket, and the other thread of HandleCompletion can only add this // counter. - _window_size.fetch_sub(1, butil::memory_order_relaxed); + _window_size.fetch_sub(1, butil::memory_order_release); + _remote_recv_window.fetch_sub(1, butil::memory_order_release); } return total_len; @@ -922,13 +933,16 @@ int RdmaEndpoint::SendImm(uint32_t imm) { wr.imm_data = butil::HostToNet32(imm); wr.send_flags |= IBV_SEND_SOLICITED; wr.send_flags |= IBV_SEND_SIGNALED; + wr.wr_id = FIXED_ACK_WR_ID; ibv_send_wr* bad = NULL; int err = ibv_post_send(_resource->qp, &wr, &bad); if (err != 0) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. - LOG(WARNING) << "Fail to ibv_post_send: " << berror(err); + LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) + << ", window_size:" << _window_size + << ", emote_recv_window: " << _remote_recv_window; return -1; } return 0; @@ -936,17 +950,40 @@ int RdmaEndpoint::SendImm(uint32_t imm) { ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { bool zerocopy = FLAGS_rdma_recv_zerocopy; + //LOG(INFO) << "Handle Completion: " << wc.opcode; switch (wc.opcode) { case IBV_WC_SEND: { // send completion - // Do nothing + if (wc.wr_id == 0) { + uint16_t wnd_to_update = _local_window_capacity / 4; + uint32_t num = wnd_to_update; + while(num > 0) { + _sbuf[_sq_sent++].clear(); + if (_sq_sent == _sq_size - RESERVED_WR_NUM) { + _sq_sent = 0; + } + --num; + } + butil::subtle::MemoryBarrier(); + uint32_t wnd_thresh = _local_window_capacity / 8; + _window_size.fetch_add(wnd_to_update, butil::memory_order_release); + //if ((_remote_recv_window.load(butil::memory_order_relaxed) >= wnd_thresh)) { + // Do not wake up writing thread right after _window_size > 0. + // Otherwise the writing thread may switch to background too quickly. + _socket->WakeAsEpollOut(); + //} + } break; } case IBV_WC_RECV: { // recv completion // Please note that only the first wc.byte_len bytes is valid if (wc.byte_len > 0) { +#if BRPC_WITH_GDR + zerocopy = true; +#else if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { zerocopy = false; } +#endif // BRPC_WITH_GDR CHECK(_state != FALLBACK_TCP); if (zerocopy) { butil::IOBuf tmp; @@ -958,24 +995,10 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } } if (wc.imm_data > 0) { - // Clear sbuf here because we ignore event wakeup for send completions uint32_t acks = butil::NetToHost32(wc.imm_data); - uint32_t num = acks; - while (num > 0) { - _sbuf[_sq_sent++].clear(); - if (_sq_sent == _sq_size - RESERVED_WR_NUM) { - _sq_sent = 0; - } - --num; - } - butil::subtle::MemoryBarrier(); - - // Update window uint32_t wnd_thresh = _local_window_capacity / 8; - if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh - || acks >= wnd_thresh) { - // Do not wake up writing thread right after _window_size > 0. - // Otherwise the writing thread may switch to background too quickly. + if(_remote_recv_window.fetch_add(acks, butil::memory_order_release) >= wnd_thresh || + acks >= wnd_thresh) { _socket->WakeAsEpollOut(); } } @@ -1017,11 +1040,43 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) { return 0; } +int RdmaEndpoint::DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey) { + ibv_recv_wr wr; + memset(&wr, 0, sizeof(wr)); + ibv_sge sge; + sge.addr = (uint64_t)block; + sge.length = block_size; + sge.lkey = lkey; + wr.num_sge = 1; + wr.sg_list = &sge; + //LOG(INFO) << "POST recv: addr=0x" << std::hex << sge.addr + // << std::dec << " length=0x" << sge.length + // << " lkey=0x" << sge.lkey; + //LOG(INFO) << block << " " << _device_allocator->get_lkey(); + ibv_recv_wr* bad = NULL; + int err = ibv_post_recv(_resource->qp, &wr, &bad); + if (err != 0) { + LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err); + return -1; + } + return 0; +} + int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { // We do the post repeatedly from the _rbuf[_rq_received]. while (num > 0) { + uint32_t lkey = 0; if (zerocopy) { _rbuf[_rq_received].clear(); + +#if BRPC_WITH_GDR + butil::gdr::BlockPoolAllocator* device_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_allocator(); + void* device_ptr = device_allocator->AllocateRaw(g_rdma_recv_block_size); + auto deleter = [device_allocator](void* data) { device_allocator->DeallocateRaw(data); }; + lkey = device_allocator->get_lkey(device_ptr); + _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_rdma_recv_block_size, deleter , lkey); + _rbuf_data[_rq_received] = device_ptr; +#else butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); int size = 0; @@ -1032,11 +1087,20 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { } else { CHECK(static_cast(size) == g_rdma_recv_block_size) << size; } +#endif // if BRPC_WITH_GDR } +#if BRPC_WITH_GDR + if (DoPostRecvGDR(_rbuf_data[_rq_received], g_rdma_recv_block_size, lkey) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } +#else if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { _rbuf[_rq_received].clear(); return -1; } +#endif // if BRPC_WITH_GDR + --num; ++_rq_received; if (_rq_received == _rq_size) { @@ -1504,6 +1568,10 @@ void RdmaEndpoint::DebugInfo(std::ostream& os) const { } int RdmaEndpoint::GlobalInitialize() { +#if BRPC_WITH_GDR + LOG(INFO) << ", gdr_block_size_kb: " << butil::gdr::gdr_block_size_kb; + g_rdma_recv_block_size = butil::gdr::gdr_block_size_kb * 1024 - IOBUF_BLOCK_HEADER_LEN; +#else if (FLAGS_rdma_recv_block_type == "default") { g_rdma_recv_block_size = GetBlockSize(0) - IOBUF_BLOCK_HEADER_LEN; } else if (FLAGS_rdma_recv_block_type == "large") { @@ -1514,6 +1582,15 @@ int RdmaEndpoint::GlobalInitialize() { errno = EINVAL; return -1; } +#endif // BRPC_WITH_GDR + + LOG(INFO) << "rdma_use_polling :" << FLAGS_rdma_use_polling + << ", rdma_poller_num : " << FLAGS_rdma_poller_num + << ", rdma_poller_yield : " << FLAGS_rdma_poller_yield + << ", rdma_sq_size: " << FLAGS_rdma_sq_size + << ", rdma_rq_size: " << FLAGS_rdma_rq_size + << ", rdma_zerocopy_min_size: " << FLAGS_rdma_zerocopy_min_size + << ", g_rdma_recv_block_size: " << g_rdma_recv_block_size; g_rdma_resource_mutex = new butil::Mutex; for (int i = 0; i < FLAGS_rdma_prepared_qp_cnt; ++i) { diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index de7cd5f6d8..6f0c917e9e 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -31,7 +31,6 @@ #include "butil/containers/mpsc_queue.h" #include "brpc/socket.h" - namespace brpc { class Socket; namespace rdma { @@ -173,6 +172,8 @@ friend class brpc::Socket; // -1: failed, errno set int DoPostRecv(void* block, size_t block_size); + + int DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey); // Read at most len bytes from fd in _socket to data // wait for _read_butex if encounter EAGAIN // return -1 if encounter other errno (including EOF) @@ -262,6 +263,7 @@ friend class brpc::Socket; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; + butil::atomic _remote_recv_window; // butex for inform read events on TCP fd during handshake butil::atomic *_read_butex; diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 9bad33750c..1b6ad85ae0 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -25,6 +25,7 @@ #include "butil/containers/flat_map.h" // butil::FlatMap #include "butil/fd_guard.h" #include "butil/fd_utility.h" // butil::make_non_blocking +#include "butil/gpu/gpu_block_pool.h" #include "butil/logging.h" #include "brpc/socket.h" #include "brpc/rdma/block_pool.h" @@ -84,6 +85,8 @@ static uint16_t g_lid; static int g_max_sge = 0; static uint8_t g_port_num = 1; +static int g_gpu_index = 0; + static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); @@ -93,7 +96,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1."); DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one."); - +DEFINE_int32(gpu_index, 0, "The GPU device index to use.In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); // static const size_t SYSFS_SIZE = 4096; static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; @@ -477,6 +480,7 @@ static void GlobalRdmaInitializeOrDieImpl() { ExitWithError(); } + g_gpu_index = FLAGS_gpu_index; // Find the first active port g_port_num = FLAGS_rdma_port; int available_devices; @@ -552,6 +556,13 @@ static void GlobalRdmaInitializeOrDieImpl() { ExitWithError(); } +#if BRPC_WITH_GDR + if (!butil::gdr::InitGPUBlockPool(g_gpu_index, GetRdmaPd())) { + PLOG(ERROR) << "Fail to initialize RDMA GPU memory pool"; + ExitWithError(); + } +#endif // if BRPC_WITH_GDR + if (RdmaEndpoint::GlobalInitialize() < 0) { LOG(ERROR) << "rdma_recv_block_type incorrect " << "(valid value: default/large/huge)"; @@ -679,6 +690,11 @@ uint8_t GetRdmaPortNum() { return g_port_num; } +int GetGPUIndex() { + return g_gpu_index; +} + + bool IsRdmaAvailable() { return g_rdma_available.load(butil::memory_order_acquire); } diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..25a93476e7 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -74,6 +74,9 @@ int GetRdmaCompVector(); // Return current port number used uint8_t GetRdmaPortNum(); +// Get GPU index used +int GetGPUIndex(); + // Get max_sge supported by the device int GetRdmaMaxSge(); diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp new file mode 100644 index 0000000000..dac26f4b5c --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include +#include +#include "butil/fast_rand.h" +#include "gpu_block_pool.h" +namespace butil { +namespace gdr { + +#define CHECK_CUDA(call) \ +do { \ + auto _sts = (call); \ + if (_sts != cudaSuccess) { \ + LOG(FATAL) << " cuda error:" \ + << (cudaGetErrorString(_sts)) << std::string(" at ") \ + << __FILE__ << ": " << __LINE__; \ + } \ +} while (0); + +bool verify_same_context() { + static int original_device = -1; + static bool first_call = true; + + int current_device; + cudaGetDevice(¤t_device); + + if (first_call) { + original_device = current_device; + first_call = false; + return true; + } + + return (current_device == original_device); +} + +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + void *d_data; + + LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " << gpu_id; + + CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size)); + cudaDeviceSynchronize(); + return (void *)d_data; +} + +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + + LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes from gpu " << gpu_id << "on host"; + + void* mem = NULL; + + CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size)); + + cudaDeviceSynchronize(); + + return mem; +} + + +BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr; + +BlockPoolAllocators* BlockPoolAllocators::singleton() { + static std::mutex mutex; + if (instance_ == nullptr) { + std::lock_guard l(mutex); + if(instance_ == nullptr) { + instance_ = new BlockPoolAllocators(); + std::atomic_thread_fence(std::memory_order_release); + } + } + std::atomic_thread_fence(std::memory_order_acquire); + return instance_; +} + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd) { + BlockPoolAllocators::singleton()->init(gpu_id, pd); + return true; +} + +class BlockHeaderList { + public: + BlockHeaderList() { + objects_.reserve(kMaxObjects); + } + virtual ~BlockHeaderList() { + for (size_t i = 0; i < objects_.size(); i++) { + delete objects_[i]; + } + } + + BlockHeader* New() { + { + std::lock_guard lock(mu_); + if (!objects_.empty()) { + BlockHeader* result = objects_.back(); + objects_.pop_back(); + return result; + } + } + return new BlockHeader; + } + void Release(BlockHeader* obj) { + obj->Reset(); + { + std::lock_guard lock(mu_); + if (objects_.size() < kMaxObjects) { + objects_.push_back(obj); + return; + } + } + delete obj; + } + + private: + static const int kMaxObjects = 100000; + + std::mutex mu_; + std::vector objects_; +}; + +static BlockHeaderList* get_bh_list() { + static BlockHeaderList* bh_list = new BlockHeaderList(); + return bh_list; +} + + +BlockPoolAllocator::BlockPoolAllocator(int gpuId, bool onGpu, ibv_pd* brpc_pd, + size_t blockSize, size_t regionSize) : + gpu_id(gpuId) + , on_gpu(onGpu) + , pd(brpc_pd) + , BLOCK_SIZE(std::max(blockSize, sizeof(BlockHeader))) + , REGION_SIZE((regionSize / blockSize) * blockSize) // 对齐到块大小的倍数 + , freeList(nullptr) + , g_region_num(0) + , totalAllocated(0) + , totalDeallocated(0) + , peakUsage(0) { + LOG(INFO) << "Memory Pool initialized: block_size=" << BLOCK_SIZE + << ", region_size=" << REGION_SIZE + << ", gpu_id=" << gpu_id << ", on_gpu=" << on_gpu << ", pd=" << pd; + + extendRegion(); +} + +BlockPoolAllocator::~BlockPoolAllocator() { +#ifdef DEBUG + printStatistics(); +#endif + + for (int i = 0; i < max_regions; i++) { + Region* r = &g_regions[i]; + if (!r->mr) { + return; + } + + LOG(INFO) << "try to free " << r->size << " bytes from gpu " << gpu_id << ", on_gpu " << on_gpu; + ibv_dereg_mr(r->mr); + if (on_gpu) { + CHECK_CUDA(cudaFree(reinterpret_cast(r->start))); + } else { + CHECK_CUDA(cudaFreeHost(reinterpret_cast(r->start))); + } + } +} + +Region* BlockPoolAllocator::GetRegion(const void* buf) { + if (!buf) { + errno = EINVAL; + return NULL; + } + Region* r = NULL; + uintptr_t addr = (uintptr_t)buf; + for (int i = 0; i < max_regions; ++i) { + if (g_regions[i].aligned_start == 0) { + break; + } + if (addr >= g_regions[i].aligned_start && + addr < g_regions[i].aligned_start + g_regions[i].aligned_size) { + r = &g_regions[i]; + break; + } + } + return r; +} + +uint32_t BlockPoolAllocator::get_lkey(const void* buf) { + Region* r = GetRegion(buf); + if (!r) { + LOG(ERROR) << "can not get a region for buf " << buf; + return 0; + } + return r->lkey; +} + +void* BlockPoolAllocator::AllocateRaw(size_t num_bytes) { + if (num_bytes == 0) { + return nullptr; + } + if (num_bytes > BLOCK_SIZE) { + LOG(FATAL) << "try to alloc " << num_bytes << " bytes, its bigger than block_size " << BLOCK_SIZE; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + + std::lock_guard lock(poolMutex); + + if (!freeList) { + extendRegion(); + } + + BlockHeader* block = freeList; + freeList = freeList->next; + + void* addr = block->addr; + get_bh_list()->Release(block); + + totalAllocated++; + peakUsage = std::max(peakUsage, totalAllocated - totalDeallocated); + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + +#ifdef DEBUG + if (duration.count() > 1000) { // 如果分配时间超过1微秒 + LOG(INFO) << "Slow allocation: " << duration.count() << " ns"; + } +#endif + + return addr; +} + +void BlockPoolAllocator::DeallocateRaw(void* ptr) { + if (!ptr) return; + + std::lock_guard lock(poolMutex); + + BlockHeader* block = get_bh_list()->New(); + block->addr = ptr; + block->next = freeList; + freeList = block; + + totalDeallocated++; +} + +// 获取统计信息 +void BlockPoolAllocator::printStatistics() const { + LOG(INFO) << "=== Memory Pool Statistics ==="; + LOG(INFO) << "Total regions: " << g_region_num + << ", Total blocks allocated: " << totalAllocated + << ", Total blocks deallocated: " << totalDeallocated + << ", Current usage: " << (totalAllocated - totalDeallocated) << " blocks" + << ", Peak usage: " << peakUsage << " blocks" + << ", Memory efficiency: " + << (static_cast(totalAllocated - totalDeallocated) / + (g_region_num * (REGION_SIZE / BLOCK_SIZE)) * 100) + << "%"; +} + +void BlockPoolAllocator::extendRegion() { + if (g_region_num == max_regions) { + LOG(FATAL) << "Gdr Memory pool reaches max regions"; + return ; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + void* ptr = nullptr; + void* aligned_ptr = nullptr; + int alignment = 4096; + + if (on_gpu) { + ptr = get_gpu_mem(gpu_id, REGION_SIZE); + } else { + ptr = get_cpu_mem(gpu_id, REGION_SIZE); + } + + aligned_ptr = (void*)(((uintptr_t)ptr + alignment - 1) & ~(alignment - 1)); + + int64_t aligned_bytes = REGION_SIZE; + if (ptr != aligned_ptr) { + uintptr_t region_end = uintptr_t(ptr) + REGION_SIZE; + uintptr_t aligned_end_ptr = (region_end + alignment - 1) & ~(alignment - 1); + aligned_bytes = uintptr_t(aligned_end_ptr) - uintptr_t(aligned_ptr); + LOG(WARNING) << "addr is not aligned with 4096: " << ptr << ", aligned_bytes: " << aligned_bytes + << ", region_size: " << REGION_SIZE; + } + + LOG(INFO) << "reg_mr for ptr: " << aligned_ptr << ", size:" << aligned_bytes; + auto mr = ibv_reg_mr(pd, aligned_ptr, aligned_bytes, + IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE); + //IBV_ACCESS_RELAXED_ORDERING); + + if (!mr) { + LOG(FATAL) << "Failed to register MR: " << strerror(errno) + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } else { + LOG(INFO) << "Success to register MR: " + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } + + LOG(INFO) << "try to init region, g_region_num:" << g_region_num; + size_t blockCount = aligned_bytes / BLOCK_SIZE; + Region* region = &g_regions[g_region_num++]; + region->start = (uintptr_t)ptr; + region->aligned_start = (uintptr_t)aligned_ptr; + region->mr = mr; + region->size = REGION_SIZE; + region->aligned_size = aligned_bytes; + region->lkey = mr->lkey; + region->blockCount = blockCount; + + + LOG(INFO) << "try to insert list, freeList:" << freeList << ", blockCount:" << blockCount; + BlockHeader* lastBlock = nullptr; + for (size_t i = 0; i < blockCount; ++i) { + BlockHeader* block = get_bh_list()->New(); + block->addr = reinterpret_cast(static_cast(aligned_ptr) + i * BLOCK_SIZE); + if (lastBlock != nullptr) { + lastBlock->next = block; + } else { + freeList = block; + } + lastBlock = block; + } + + if (lastBlock) { + lastBlock->next = nullptr; + } + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + LOG(INFO) << "Extended region #" << g_region_num << ": " << blockCount + << " blocks (" << (REGION_SIZE / (1024 * 1024)) << " MB)" << ", on_gpu " << on_gpu + << ", cost " << duration.count() << " ns"; +} + +GPUStreamPool::GPUStreamPool(int gpu_id) : + gpu_id_(gpu_id) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + d2d_streams_.resize(kMaxConcurrent); + d2h_streams_.resize(kMaxConcurrent); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamCreate(&d2d_streams_[i])); + CHECK_CUDA(cudaStreamCreate(&d2h_streams_[i])); + } + CHECK_CUDA(cudaDeviceSynchronize()); +} + +GPUStreamPool::~GPUStreamPool() { + CHECK_CUDA(cudaDeviceSynchronize()); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamDestroy(d2d_streams_[i])); + CHECK_CUDA(cudaStreamDestroy(d2h_streams_[i])); + } +} + +void GPUStreamPool::fast_d2d(std::vector& src_list, + std::vector& length_list, + void* dst) { +#ifdef DEBUG + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } +#endif + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2d_lb_lock_); + d2d_cnt_.fetch_add(1); + stream_idx = d2d_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2d_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToDevice, d2d_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); +} + +void GPUStreamPool::fast_d2h(std::vector& src_list, + std::vector& length_list, + void* dst) { + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2h_lb_lock_); + d2h_cnt_.fetch_add(1); + stream_idx = d2h_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2h_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToHost, d2h_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); +} + +} +} + +#endif // BRPC_WITH_GDR diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h new file mode 100644 index 0000000000..1d6a444a36 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.h @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H +#define BUTIL_GPU_GPU_BLOCK_POOL_H + +#if BRPC_WITH_GDR + +#include +#include +#include +#include +#include +#include +#include +#include +#include "butil/containers/hash_tables.h" +#include "butil/logging.h" +#include +#include "cuda.h" + +// #include "gdrapi.h" +namespace butil { +namespace gdr { + +static int gdr_block_size_kb = [](){ + int ret = 64; + const char* env_var_val = getenv("GDR_BLOCK_SIZE_KB"); + if (env_var_val == nullptr) { + return ret; + } + ret = std::stoi(env_var_val); + + return ret; +}(); + +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size); +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size); + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd); + +struct Region { + Region() { start = 0; aligned_start = 0;} + uintptr_t start; + uintptr_t aligned_start; + + size_t size; + size_t aligned_size; + size_t blockCount; + struct ibv_mr *mr {nullptr}; + uint32_t lkey; +}; + +struct BlockHeader { + BlockHeader() { addr = nullptr; next = nullptr;} + void Reset() { addr = nullptr; next = nullptr; } + void* addr; + BlockHeader* next; +}; + +class BlockPoolAllocator { + private: + int gpu_id; + bool on_gpu; + ibv_pd* pd {nullptr}; + + const size_t BLOCK_SIZE; + const size_t REGION_SIZE; + + BlockHeader* freeList; + static constexpr size_t max_regions = 16; + int g_region_num {0}; + Region g_regions[max_regions]; + std::mutex poolMutex; + + // 统计信息 + size_t totalAllocated; + size_t totalDeallocated; + size_t peakUsage; + + public: + explicit BlockPoolAllocator(int gpu_id, + bool on_gpu, ibv_pd* pd, + size_t blockSize, size_t regionSize); + + ~BlockPoolAllocator(); + + void* AllocateRaw(size_t num_bytes); + + void DeallocateRaw(void* ptr); + + // 获取统计信息 + void printStatistics() const; + + int64_t getCurrentUsage() const { + return totalAllocated - totalDeallocated; + } + + int64_t getTotalMemory() const { + return g_region_num * REGION_SIZE; + } + + int64_t get_block_size() const { + return BLOCK_SIZE; + } + + uint32_t get_lkey(const void* buf); + + private: + Region* GetRegion(const void* buf); + void extendRegion(); +}; + +class GPUStreamPool { +public: + explicit GPUStreamPool(int gpu_id); + + ~GPUStreamPool(); + + GPUStreamPool(const GPUStreamPool&) = delete; + GPUStreamPool& operator=(const GPUStreamPool&) = delete; + + void fast_d2h(std::vector& src_list, std::vector& length_list, void* dst); + + void fast_d2d(std::vector& src_list, std::vector& length_list, void* dst); + + static constexpr int kMaxConcurrent = 32; +private: + int gpu_id_ {-1}; + std::atomic d2h_cnt_ {0}; + std::atomic d2d_cnt_ {0}; + std::mutex d2h_locks_[kMaxConcurrent]; + std::mutex d2d_locks_[kMaxConcurrent]; + std::mutex d2h_lb_lock_; + std::mutex d2d_lb_lock_; + std::vector d2h_streams_; + std::vector d2d_streams_; +}; + +class BlockPoolAllocators { +public: + static BlockPoolAllocators* singleton(); + BlockPoolAllocators() {} + virtual ~BlockPoolAllocators() { + CHECK_EQ(this, instance_); + instance_ = nullptr; + } + + void init(int gpu_id, ibv_pd* pd) { + LOG(INFO) << "set GPU BlockPoolAllocator for " << gpu_id; + size_t region_size = 512LL * 1024 * 1024; + size_t block_size = gdr_block_size_kb * 1024; + gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size, region_size); + + region_size = 32LL * 1024 * 1024; + block_size = 512; + cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size, region_size); + + gpu_stream_pool = new GPUStreamPool(gpu_id); + } + + BlockPoolAllocator* get_gpu_allocator() { + return gpu_mem_alloc; + } + + BlockPoolAllocator* get_cpu_allocator() { + return cpu_mem_alloc; + } + + GPUStreamPool* get_gpu_stream_pool() { + return gpu_stream_pool; + } + +public: + static BlockPoolAllocators* instance_; + +private: + BlockPoolAllocator* gpu_mem_alloc {nullptr}; + BlockPoolAllocator* cpu_mem_alloc {nullptr}; + GPUStreamPool* gpu_stream_pool {nullptr}; +}; + +} +} + +#endif // BRPC_WITH_GDR + +#endif diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 26046e3745..b111f00454 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -40,6 +40,7 @@ #include "butil/fd_guard.h" // butil::fd_guard #include "butil/iobuf.h" #include "butil/iobuf_profiler.h" +#include "butil/gpu/gpu_block_pool.h" namespace butil { namespace iobuf { @@ -722,6 +723,52 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) { return saved_n; } +#if BRPC_WITH_GDR +size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { + if (n == 0) { + return 0; + } + + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + bool alloc_from_host_alloc = (n <= host_allocator->get_block_size()); + void* mem = NULL; + if (alloc_from_host_alloc) { + mem = host_allocator->AllocateRaw(n); + } else { + mem = malloc(n); + } + + if (mem == NULL) { + return -1; + } + struct timespec start, end; + clock_gettime(CLOCK_MONOTONIC, &start); + size_t saved_n = copy_from_gpu(mem, n, 0, false); + if (saved_n > 0) { + if (alloc_from_host_alloc) { + auto deleter = [host_allocator](void* data) { host_allocator->DeallocateRaw(data); }; + out->append_user_data(mem, saved_n, deleter); + } else { + auto deleter = [](void* data) { free(data); }; + out->append_user_data(mem, saved_n, deleter); + } + pop_front(saved_n); + } else { + if (alloc_from_host_alloc) { + host_allocator->DeallocateRaw(mem); + } else { + free(mem); + } + } + clock_gettime(CLOCK_MONOTONIC, &end); + double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3; + + // LOG(INFO) << "GDRCopy: " << saved_n << " bytes, " + // << time_us << " us"; + return saved_n; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::cutn(void* out, size_t n) { const size_t len = length(); if (n > len) { @@ -1155,6 +1202,15 @@ uint64_t IOBuf::get_first_data_meta() { return r.block->u.data_meta; } +void* IOBuf::get_first_data_ptr() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + return r.block->data; +} + + int IOBuf::resize(size_t n, char c) { const size_t saved_len = length(); if (n < saved_len) { @@ -1317,6 +1373,59 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const { return n - m; } +#if BRPC_WITH_GDR +size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { + if (n == 0) { + return 0; + } + const size_t nref = _ref_num(); + // Skip `pos' bytes. `offset' is the starting position in starting BlockRef. + size_t offset = pos; + size_t i = 0; + for (; offset != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + if (offset < (size_t)r.length) { + break; + } + offset -= r.length; + } + + butil::gdr::GPUStreamPool* gpu_stream_pool = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool(); + struct timespec start, end; + clock_gettime(CLOCK_MONOTONIC, &start); + size_t m = n; + std::vector src_list; + std::vector length_list; + for (; m != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + const size_t nc = std::min(m, (size_t)r.length - offset); + void* gpu_src = r.block->data + r.offset + offset; + // cudaMemcpy(d, gpu_src, nc, cudaMemcpyDeviceToDevice); + src_list.push_back(gpu_src); + length_list.push_back(nc); + //cuMemcpyDtoH(d, (CUdeviceptr)(r.block->data + r.offset + offset), nc); + // gdr_copy_from_mapping(allocator->mh(), d, allocator->ToCPUPtr(gpu_src), nc); + offset = 0; + // d = (char*)d + nc; + m -= nc; + } + if (to_gpu) { + gpu_stream_pool->fast_d2d(src_list, length_list, d); + } else { + gpu_stream_pool->fast_d2h(src_list, length_list, d); + } + clock_gettime(CLOCK_MONOTONIC, &end); + double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3; + size_t copied_bytes = n - m; + + // LOG(INFO) << "GDRCopy: " << copied_bytes << " bytes, " + // << time_us << " us" << ", to_gpu " << to_gpu; + //cuCtxSetCurrent(saved_context); + // If nref == 0, here returns 0 correctly + return n - m; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); if (len <= pos) { diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 239e82d950..e554dd0e40 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -141,6 +141,12 @@ friend class SingleIOBuf; size_t cutn(IOBuf* out, size_t n); size_t cutn(void* out, size_t n); size_t cutn(std::string* out, size_t n); + +#if BRPC_WITH_GDR + size_t cutn_from_gpu(IOBuf* out, size_t n); + size_t copy_from_gpu(void* d, size_t n, size_t pos = 0, bool to_gpu = false) const; +#endif // BRPC_WITH_GDR + // Cut off 1 byte from the front side and set to *c // Return true on cut, false otherwise. bool cut1(void* c); @@ -259,6 +265,7 @@ friend class SingleIOBuf; // The meta is specified with append_user_data_with_meta before. // 0 means the meta is invalid. uint64_t get_first_data_meta(); + void* get_first_data_ptr(); // Resizes the buf to a length of n characters. // If n is smaller than the current length, all bytes after n will be @@ -775,4 +782,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) { #include "butil/iobuf_inl.h" -#endif // BUTIL_IOBUF_H \ No newline at end of file +#endif // BUTIL_IOBUF_H From 4811b060b1fcb1243eae69c19d7328e78b4b43f5 Mon Sep 17 00:00:00 2001 From: randomkang <75484924+randomkang@users.noreply.github.com> Date: Sun, 9 Nov 2025 20:17:11 +0800 Subject: [PATCH 2/7] Apply suggestions from code review Fix code style Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/brpc/rdma/rdma_endpoint.cpp | 6 +++--- src/brpc/rdma/rdma_helper.cpp | 2 +- src/butil/gpu/gpu_block_pool.cpp | 2 +- src/butil/gpu/gpu_block_pool.h | 1 - src/butil/iobuf.cpp | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index fc6ce20928..2a47a23a8a 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -892,7 +892,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // So we just consider this error as an unrecoverable error. LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << ", window_size:" << _window_size - << ", emote_recv_window: " << _remote_recv_window + << ", remote_recv_window: " << _remote_recv_window << ", window=" << window << ", sq_current=" << _sq_current; errno = err; @@ -942,7 +942,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) { // So we just consider this error as an unrecoverable error. LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << ", window_size:" << _window_size - << ", emote_recv_window: " << _remote_recv_window; + << ", remote_recv_window: " << _remote_recv_window; return -1; } return 0; @@ -997,7 +997,7 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { if (wc.imm_data > 0) { uint32_t acks = butil::NetToHost32(wc.imm_data); uint32_t wnd_thresh = _local_window_capacity / 8; - if(_remote_recv_window.fetch_add(acks, butil::memory_order_release) >= wnd_thresh || + if (_remote_recv_window.fetch_add(acks, butil::memory_order_release) >= wnd_thresh || acks >= wnd_thresh) { _socket->WakeAsEpollOut(); } diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 1b6ad85ae0..3b45b2621c 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -96,7 +96,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1."); DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one."); -DEFINE_int32(gpu_index, 0, "The GPU device index to use.In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); +DEFINE_int32(gpu_index, 0, "The GPU device index to use. In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); // static const size_t SYSFS_SIZE = 4096; static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp index dac26f4b5c..336d4b6dca 100644 --- a/src/butil/gpu/gpu_block_pool.cpp +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -82,7 +82,7 @@ BlockPoolAllocators* BlockPoolAllocators::singleton() { static std::mutex mutex; if (instance_ == nullptr) { std::lock_guard l(mutex); - if(instance_ == nullptr) { + if (instance_ == nullptr) { instance_ = new BlockPoolAllocators(); std::atomic_thread_fence(std::memory_order_release); } diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h index 1d6a444a36..6106952c76 100644 --- a/src/butil/gpu/gpu_block_pool.h +++ b/src/butil/gpu/gpu_block_pool.h @@ -192,7 +192,6 @@ class BlockPoolAllocators { BlockPoolAllocator* cpu_mem_alloc {nullptr}; GPUStreamPool* gpu_stream_pool {nullptr}; }; - } } diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index b111f00454..79e992034e 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -739,7 +739,7 @@ size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { } if (mem == NULL) { - return -1; + return 0; } struct timespec start, end; clock_gettime(CLOCK_MONOTONIC, &start); From 31a30f7cd6ecd95e7e89295c2d9b88c08352a602 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 13 Nov 2025 23:45:12 +0800 Subject: [PATCH 3/7] clean unused code --- src/butil/iobuf.cpp | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 79e992034e..469e3775b3 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -741,8 +741,6 @@ size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { if (mem == NULL) { return 0; } - struct timespec start, end; - clock_gettime(CLOCK_MONOTONIC, &start); size_t saved_n = copy_from_gpu(mem, n, 0, false); if (saved_n > 0) { if (alloc_from_host_alloc) { @@ -760,11 +758,7 @@ size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { free(mem); } } - clock_gettime(CLOCK_MONOTONIC, &end); - double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3; - // LOG(INFO) << "GDRCopy: " << saved_n << " bytes, " - // << time_us << " us"; return saved_n; } #endif // BRPC_WITH_GDR @@ -1391,8 +1385,6 @@ size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { } butil::gdr::GPUStreamPool* gpu_stream_pool = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool(); - struct timespec start, end; - clock_gettime(CLOCK_MONOTONIC, &start); size_t m = n; std::vector src_list; std::vector length_list; @@ -1400,13 +1392,9 @@ size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { IOBuf::BlockRef const& r = _ref_at(i); const size_t nc = std::min(m, (size_t)r.length - offset); void* gpu_src = r.block->data + r.offset + offset; - // cudaMemcpy(d, gpu_src, nc, cudaMemcpyDeviceToDevice); src_list.push_back(gpu_src); length_list.push_back(nc); - //cuMemcpyDtoH(d, (CUdeviceptr)(r.block->data + r.offset + offset), nc); - // gdr_copy_from_mapping(allocator->mh(), d, allocator->ToCPUPtr(gpu_src), nc); offset = 0; - // d = (char*)d + nc; m -= nc; } if (to_gpu) { @@ -1414,13 +1402,6 @@ size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { } else { gpu_stream_pool->fast_d2h(src_list, length_list, d); } - clock_gettime(CLOCK_MONOTONIC, &end); - double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3; - size_t copied_bytes = n - m; - - // LOG(INFO) << "GDRCopy: " << copied_bytes << " bytes, " - // << time_us << " us" << ", to_gpu " << to_gpu; - //cuCtxSetCurrent(saved_context); // If nref == 0, here returns 0 correctly return n - m; } @@ -2211,4 +2192,4 @@ bool IOBufBytesIterator::forward_one_block(const void** data, size_t* size) { void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) { return butil::iobuf::cp(dest, src, n); -} // namespace butil \ No newline at end of file +} // namespace butil From 88fc739ba11ac02e9cd85697c45e567f214e184a Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 13 Nov 2025 23:47:57 +0800 Subject: [PATCH 4/7] revert the fix of rdma window --- src/brpc/rdma/rdma_endpoint.cpp | 70 ++++++++++++--------------------- src/brpc/rdma/rdma_endpoint.h | 1 - 2 files changed, 26 insertions(+), 45 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 2a47a23a8a..6f451e841f 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -31,7 +31,6 @@ #include "brpc/rdma/block_pool.h" #include "brpc/rdma/rdma_helper.h" #include "brpc/rdma/rdma_endpoint.h" -#include "brpc/traceprintf.h" DECLARE_int32(task_group_ntags); @@ -50,8 +49,8 @@ extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_at extern int (*IbvDestroyQp)(ibv_qp*); extern bool g_skip_rdma_init; -DEFINE_int32(rdma_sq_size, 64, "SQ size for RDMA"); -DEFINE_int32(rdma_rq_size, 64, "RQ size for RDMA"); +DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA"); +DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA"); DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side"); DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy"); DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: " @@ -100,7 +99,6 @@ static const uint16_t MIN_QP_SIZE = 16; static const uint16_t MAX_QP_SIZE = 4096; static const uint16_t MIN_BLOCK_SIZE = 1024; static const uint32_t ACK_MSG_RDMA_OK = 0x1; -static const uint64_t FIXED_ACK_WR_ID = 1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; @@ -194,7 +192,6 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _remote_window_capacity(0) , _window_size(0) , _new_rq_wrs(0) - , _remote_recv_window(0) { if (_sq_size < MIN_QP_SIZE) { _sq_size = MIN_QP_SIZE; @@ -212,7 +209,6 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) } RdmaEndpoint::~RdmaEndpoint() { - // LOG(INFO) << _window_size << " " << _remote_recv_window << " " << _sq_unsignaled; Reset(); bthread::butex_destroy(_read_butex); } @@ -236,7 +232,6 @@ void RdmaEndpoint::Reset() { _new_rq_wrs = 0; _sq_sent = 0; _rq_received = 0; - _remote_recv_window.store(0, butil::memory_order_relaxed); } void RdmaConnect::StartConnect(const Socket* socket, @@ -520,7 +515,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); - ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed); + ep->_state = C_BRINGUP_QP; if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); @@ -628,7 +623,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); - ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed); + ep->_state = S_ALLOC_QPCQ; if (ep->AllocateResources() < 0) { LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:" @@ -722,7 +717,7 @@ bool RdmaEndpoint::IsWritable() const { return false; } - return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0; + return _window_size.load(butil::memory_order_relaxed) > 0; } // RdmaIOBuf inherits from IOBuf to provide a new function. @@ -793,14 +788,12 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { size_t total_len = 0; size_t current = 0; uint32_t window = 0; - uint32_t recv_window = 0; ibv_send_wr wr; int max_sge = GetRdmaMaxSge(); ibv_sge sglist[max_sge]; while (current < ndata) { - window = _window_size.load(butil::memory_order_acquire); - recv_window = _remote_recv_window.load(butil::memory_order_acquire); - if (window == 0 || recv_window == 0) { + window = _window_size.load(butil::memory_order_relaxed); + if (window == 0) { if (total_len > 0) { break; } else { @@ -891,8 +884,6 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) - << ", window_size:" << _window_size - << ", remote_recv_window: " << _remote_recv_window << ", window=" << window << ", sq_current=" << _sq_current; errno = err; @@ -908,8 +899,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // Because there is at most one thread can enter this function for each // Socket, and the other thread of HandleCompletion can only add this // counter. - _window_size.fetch_sub(1, butil::memory_order_release); - _remote_recv_window.fetch_sub(1, butil::memory_order_release); + _window_size.fetch_sub(1, butil::memory_order_relaxed); } return total_len; @@ -933,16 +923,13 @@ int RdmaEndpoint::SendImm(uint32_t imm) { wr.imm_data = butil::HostToNet32(imm); wr.send_flags |= IBV_SEND_SOLICITED; wr.send_flags |= IBV_SEND_SIGNALED; - wr.wr_id = FIXED_ACK_WR_ID; ibv_send_wr* bad = NULL; int err = ibv_post_send(_resource->qp, &wr, &bad); if (err != 0) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. - LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) - << ", window_size:" << _window_size - << ", remote_recv_window: " << _remote_recv_window; + LOG(WARNING) << "Fail to ibv_post_send: " << berror(err); return -1; } return 0; @@ -950,28 +937,9 @@ int RdmaEndpoint::SendImm(uint32_t imm) { ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { bool zerocopy = FLAGS_rdma_recv_zerocopy; - //LOG(INFO) << "Handle Completion: " << wc.opcode; switch (wc.opcode) { case IBV_WC_SEND: { // send completion - if (wc.wr_id == 0) { - uint16_t wnd_to_update = _local_window_capacity / 4; - uint32_t num = wnd_to_update; - while(num > 0) { - _sbuf[_sq_sent++].clear(); - if (_sq_sent == _sq_size - RESERVED_WR_NUM) { - _sq_sent = 0; - } - --num; - } - butil::subtle::MemoryBarrier(); - uint32_t wnd_thresh = _local_window_capacity / 8; - _window_size.fetch_add(wnd_to_update, butil::memory_order_release); - //if ((_remote_recv_window.load(butil::memory_order_relaxed) >= wnd_thresh)) { - // Do not wake up writing thread right after _window_size > 0. - // Otherwise the writing thread may switch to background too quickly. - _socket->WakeAsEpollOut(); - //} - } + // Do nothing break; } case IBV_WC_RECV: { // recv completion @@ -995,10 +963,24 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } } if (wc.imm_data > 0) { + // Clear sbuf here because we ignore event wakeup for send completions uint32_t acks = butil::NetToHost32(wc.imm_data); + uint32_t num = acks; + while (num > 0) { + _sbuf[_sq_sent++].clear(); + if (_sq_sent == _sq_size - RESERVED_WR_NUM) { + _sq_sent = 0; + } + --num; + } + butil::subtle::MemoryBarrier(); + + // Update window uint32_t wnd_thresh = _local_window_capacity / 8; - if (_remote_recv_window.fetch_add(acks, butil::memory_order_release) >= wnd_thresh || - acks >= wnd_thresh) { + if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh + || acks >= wnd_thresh) { + // Do not wake up writing thread right after _window_size > 0. + // Otherwise the writing thread may switch to background too quickly. _socket->WakeAsEpollOut(); } } diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 6f0c917e9e..4705e362ea 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -263,7 +263,6 @@ friend class brpc::Socket; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; - butil::atomic _remote_recv_window; // butex for inform read events on TCP fd during handshake butil::atomic *_read_butex; From cec0bf8130a8c47560dce961bda10e98d3b01720 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Fri, 14 Nov 2025 23:11:46 +0800 Subject: [PATCH 5/7] fix align --- src/butil/gpu/gpu_block_pool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp index 336d4b6dca..b768e408e8 100644 --- a/src/butil/gpu/gpu_block_pool.cpp +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -298,7 +298,7 @@ void BlockPoolAllocator::extendRegion() { int64_t aligned_bytes = REGION_SIZE; if (ptr != aligned_ptr) { uintptr_t region_end = uintptr_t(ptr) + REGION_SIZE; - uintptr_t aligned_end_ptr = (region_end + alignment - 1) & ~(alignment - 1); + uintptr_t aligned_end_ptr = region_end & ~(alignment - 1); aligned_bytes = uintptr_t(aligned_end_ptr) - uintptr_t(aligned_ptr); LOG(WARNING) << "addr is not aligned with 4096: " << ptr << ", aligned_bytes: " << aligned_bytes << ", region_size: " << REGION_SIZE; From b856888fdc01491df4b92d87714c572bbf3e94cb Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Fri, 14 Nov 2025 23:24:17 +0800 Subject: [PATCH 6/7] reorg code --- src/brpc/policy/baidu_rpc_protocol.cpp | 97 +++++++++++--------------- 1 file changed, 42 insertions(+), 55 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 2e37bae3a3..bbc678c921 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -109,9 +109,12 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, char header_buf[12]; size_t n = 0; + uint32_t body_size; + uint32_t meta_size; + ParseError pe = PARSE_OK; + #if BRPC_WITH_GDR void* prefetch_d2h_data = NULL; - uint64_t data_meta = source->get_first_data_meta(); bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); @@ -129,74 +132,58 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, #else n = source->copy_to(header_buf, sizeof(header_buf)); #endif // BRPC_WITH_GDR - if (n >= 4) { - void* dummy = header_buf; - if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { -#if BRPC_WITH_GDR - if (is_gpu_memory) { - host_allocator->DeallocateRaw(prefetch_d2h_data); + + do { + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { + pe = PARSE_ERROR_TRY_OTHERS; + break; } -#endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_TRY_OTHERS); - } - } else { - if (memcmp(header_buf, "PRPC", n) != 0) { -#if BRPC_WITH_GDR - if (is_gpu_memory) { - host_allocator->DeallocateRaw(prefetch_d2h_data); + } else { + if (memcmp(header_buf, "PRPC", n) != 0) { + pe = PARSE_ERROR_TRY_OTHERS; + break; } -#endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_TRY_OTHERS); } - } - if (n < sizeof(header_buf)) { -#if BRPC_WITH_GDR - if (is_gpu_memory) { - host_allocator->DeallocateRaw(prefetch_d2h_data); + if (n < sizeof(header_buf)) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; } -#endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); - } - uint32_t body_size; - uint32_t meta_size; - butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); - if (body_size > 128 * 1024 * 1024) { - LOG(ERROR) << "body_size=" << body_size << " from " - << socket->remote_side() << " is too large"; - } - if (body_size > FLAGS_max_body_size) { - // We need this log to report the body_size to give users some clues - // which is not printed in InputMessenger. - LOG(ERROR) << "body_size=" << body_size << " from " - << socket->remote_side() << " is too large"; -#if BRPC_WITH_GDR - if (is_gpu_memory) { - host_allocator->DeallocateRaw(prefetch_d2h_data); + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + pe = PARSE_ERROR_TOO_BIG_DATA; + break; + } else if (source->length() < sizeof(header_buf) + body_size) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; } -#endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); - } else if (source->length() < sizeof(header_buf) + body_size) { -#if BRPC_WITH_GDR - if (is_gpu_memory) { - host_allocator->DeallocateRaw(prefetch_d2h_data); + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + pe = PARSE_ERROR_TRY_OTHERS; + break; } -#endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); - } - if (meta_size > body_size) { - LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" - << body_size; - // Pop the message - source->pop_front(sizeof(header_buf) + body_size); + } while (0); + + if (pe != PARSE_OK) { #if BRPC_WITH_GDR if (is_gpu_memory) { host_allocator->DeallocateRaw(prefetch_d2h_data); } #endif // BRPC_WITH_GDR - return MakeParseError(PARSE_ERROR_TRY_OTHERS); + return MakeParseError(pe); } + source->pop_front(sizeof(header_buf)); MostCommonMessage* msg = MostCommonMessage::Get(); + #if BRPC_WITH_GDR if (is_gpu_memory) { if (header_size + meta_size <= n) { From 1fb9370dfad21d3a3f9ebaa3695ff0338aa3426e Mon Sep 17 00:00:00 2001 From: sunce4t Date: Wed, 26 Nov 2025 10:17:33 +0800 Subject: [PATCH 7/7] Change GPU memory detection logics in baidu_rpc_protocol --- src/brpc/policy/baidu_rpc_protocol.cpp | 12 ++++++------ src/brpc/rdma/rdma_endpoint.cpp | 3 ++- src/butil/iobuf.cpp | 16 +++++++++++++++- src/butil/iobuf.h | 10 ++++++++++ 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index bbc678c921..7a2c079c24 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -115,8 +115,8 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, #if BRPC_WITH_GDR void* prefetch_d2h_data = NULL; - uint64_t data_meta = source->get_first_data_meta(); - bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); + uint32_t data_meta = source->get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); if (is_gpu_memory) { prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size); @@ -859,8 +859,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { int body_without_attachment_size = req_size - meta.attachment_size(); #if BRPC_WITH_GDR int meta_size = msg->meta.size(); - uint64_t data_meta = msg->payload.get_first_data_meta(); - bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); + uint32_t data_meta = msg->payload.get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); if(is_gpu_memory) { int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { @@ -1054,8 +1054,8 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { butil::IOBuf* res_buf_ptr = &msg->payload; #if BRPC_WITH_GDR - uint64_t data_meta = msg->payload.get_first_data_meta(); - bool is_gpu_memory = (data_meta > 0 && data_meta <= UINT_MAX); + uint32_t data_meta = msg->payload.get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); #endif // BRPC_WITH_GDR if (meta.has_attachment_size()) { if (meta.attachment_size() > res_size) { diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 6f451e841f..73bf974330 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -1056,7 +1056,8 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { void* device_ptr = device_allocator->AllocateRaw(g_rdma_recv_block_size); auto deleter = [device_allocator](void* data) { device_allocator->DeallocateRaw(data); }; lkey = device_allocator->get_lkey(device_ptr); - _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_rdma_recv_block_size, deleter , lkey); + uint64_t data_meta = (static_cast(butil::IOBuf::GPU_MEMORY) << 32) | lkey; + _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_rdma_recv_block_size, deleter , data_meta); _rbuf_data[_rq_received] = device_ptr; #else butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 469e3775b3..ce3c0cc0bb 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -1193,7 +1193,21 @@ uint64_t IOBuf::get_first_data_meta() { if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) { return 0; } - return r.block->u.data_meta; + return (r.block->u.data_meta & 0x00000000FFFFFFFF); +} + +// only when user use append_user_data_with_meta(), lkey is stored in data_meta +// We add this function for GDR, we want to know whether the data is in Host memory or GPU memory +// since lkey is uint32_t type, thus we use the high 32 bit to store +uint32_t IOBuf::get_first_data_meta_high32() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) { + return 0; + } + return (uint32_t)(r.block->u.data_meta >> 32); } void* IOBuf::get_first_data_ptr() { diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index e554dd0e40..14077f0c29 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -70,6 +70,11 @@ friend class SingleIOBuf; static const size_t DEFAULT_BLOCK_SIZE = 8192; static const size_t INITIAL_CAP = 32; // must be power of 2 + enum MemoryMeta { + HOST_MEMORY = 0, + GPU_MEMORY = 1 + }; + struct Block; // can't directly use `struct iovec' here because we also need to access the @@ -265,6 +270,11 @@ friend class SingleIOBuf; // The meta is specified with append_user_data_with_meta before. // 0 means the meta is invalid. uint64_t get_first_data_meta(); + + // Get the high 32 bits of the data meta of the first byte in this IOBuf. + // The meta is specified with append_user_data_with_meta before. + // we use 0 to specify host memory, 1 to specify GPU memory + uint32_t get_first_data_meta_high32(); void* get_first_data_ptr(); // Resizes the buf to a length of n characters.