From a0529be7ea578b0ddf0ba38f371d1a1aebea55de Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Tue, 4 Nov 2025 15:23:51 +0800 Subject: [PATCH 1/2] add pending signaled wrs checking --- src/brpc/rdma/rdma_endpoint.cpp | 38 +++++++++++++++++++++++++-------- src/brpc/rdma/rdma_endpoint.h | 4 ++++ 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 1d502a98f7..99864d6245 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -71,6 +71,8 @@ static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent // This is the number of reserved WRs in SQ/RQ for pure ACK. static const size_t RESERVED_WR_NUM = 3; +static const uint64_t FIXED_ACK_WR_ID = 1; + // magic string RDMA (4B) // message length (2B) // hello version (2B) @@ -191,6 +193,8 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _remote_window_capacity(0) , _window_size(0) , _new_rq_wrs(0) + , _pending_signaled_wrs(0) + , _pending_acks(0) { if (_sq_size < MIN_QP_SIZE) { _sq_size = MIN_QP_SIZE; @@ -869,12 +873,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { } // Avoid too much send completion event to reduce the CPU overhead + bool signaled = false; ++_sq_unsignaled; if (_sq_unsignaled >= _local_window_capacity / 4) { // Refer to: // http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/ wr.send_flags |= IBV_SEND_SIGNALED; _sq_unsignaled = 0; + signaled = true; } ibv_send_wr* bad = NULL; @@ -889,6 +895,10 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { return -1; } + if (signaled) { + _pending_signaled_wrs.fetch_add(1, butil::memory_order_relaxed); + } + ++_sq_current; if (_sq_current == _sq_size - RESERVED_WR_NUM) { _sq_current = 0; @@ -918,6 +928,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) { ibv_send_wr wr; memset(&wr, 0, sizeof(wr)); + wr.wr_id = FIXED_ACK_WR_ID; wr.opcode = IBV_WR_SEND_WITH_IMM; wr.imm_data = butil::HostToNet32(imm); wr.send_flags |= IBV_SEND_SOLICITED; @@ -936,9 +947,13 @@ int RdmaEndpoint::SendImm(uint32_t imm) { ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { bool zerocopy = FLAGS_rdma_recv_zerocopy; + uint16_t pending_signaled_wrs = _pending_signaled_wrs.load(butil::memory_order_relaxed); switch (wc.opcode) { case IBV_WC_SEND: { // send completion - // Do nothing + if (wc.wr_id == 0) { + pending_signaled_wrs = + _pending_signaled_wrs.fetch_sub(1, butil::memory_order_relaxed) - 1; + } break; } case IBV_WC_RECV: { // recv completion @@ -970,14 +985,8 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } butil::subtle::MemoryBarrier(); - // Update window - uint32_t wnd_thresh = _local_window_capacity / 8; - if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh - || acks >= wnd_thresh) { - // Do not wake up writing thread right after _window_size > 0. - // Otherwise the writing thread may switch to background too quickly. - _socket->WakeAsEpollOut(); - } + // Update acks + _pending_acks += acks; } // We must re-post recv WR if (PostRecv(1, zerocopy) < 0) { @@ -995,6 +1004,17 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { << wc.opcode; return -1; } + if (pending_signaled_wrs <= 0) { + const uint32_t wnd_thresh = _local_window_capacity / 8; + auto acks = _window_size.fetch_add(_pending_acks, butil::memory_order_relaxed); + // Do not wake up writing thread right after _window_size > 0. + // Otherwise the writing thread may switch to background too + // quickly. + if (acks >= wnd_thresh || _pending_acks >= wnd_thresh) { + _socket->WakeAsEpollOut(); + } + _pending_acks = 0; + } return 0; } diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index de7cd5f6d8..df5189b64f 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -261,6 +261,10 @@ friend class brpc::Socket; butil::atomic _window_size; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; + // The number of pending signaled send WRs + butil::atomic _pending_signaled_wrs; + // The number of pending acks + uint16_t _pending_acks; // butex for inform read events on TCP fd during handshake butil::atomic *_read_butex; From ab7debe0a00485616d4ac1d764e611fbe56cad7d Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Fri, 7 Nov 2025 16:48:19 +0800 Subject: [PATCH 2/2] ResetNonPods clear checksum value --- src/brpc/controller.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d4dbab951b..2d5c68efc4 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -232,6 +232,7 @@ void Controller::ResetNonPods() { } delete _remote_stream_settings; _thrift_method_name.clear(); + _checksum_value.clear(); _after_rpc_resp_fn = nullptr; CHECK(_unfinished_call == NULL);