From 527992972524f93709811821fd754105904ee941 Mon Sep 17 00:00:00 2001 From: chenBright Date: Sun, 9 Nov 2025 13:45:46 +0800 Subject: [PATCH 1/7] Bugfix: The failure of ibv_post_send is caused by polling send CQE before recv CQE --- src/brpc/rdma/rdma_endpoint.cpp | 85 ++++++++++++++++++++++----------- src/brpc/rdma/rdma_endpoint.h | 4 +- 2 files changed, 61 insertions(+), 28 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 5176756510..5e9e95b7d6 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -102,6 +102,11 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; +enum SendType { + SEND_TYPE_RDMA_DATA = 0, + SEND_TYPE_RDMA_IMM, +}; + struct HelloMessage { void Serialize(void* data) const; void Deserialize(void* data); @@ -189,7 +194,8 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _rq_received(0) , _local_window_capacity(0) , _remote_window_capacity(0) - , _window_size(0) + , _remote_rq_window_size(0) + , _sq_window_size(0) , _new_rq_wrs(0) { if (_sq_size < MIN_QP_SIZE) { @@ -227,7 +233,8 @@ void RdmaEndpoint::Reset() { _sq_unsignaled = 0; _local_window_capacity = 0; _remote_window_capacity = 0; - _window_size.store(0, butil::memory_order_relaxed); + _remote_rq_window_size.store(0, butil::memory_order_relaxed); + _sq_window_size.store(0, butil::memory_order_relaxed); _new_rq_wrs = 0; _sq_sent = 0; _rq_received = 0; @@ -517,7 +524,10 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM; ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, - ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); + ep->_remote_rq_window_size.store( + ep->_local_window_capacity, butil::memory_order_relaxed); + ep->_sq_window_size.store( + ep->_local_window_capacity, butil::memory_order_relaxed); ep->_state = C_BRINGUP_QP; if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { @@ -548,11 +558,11 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { if (s->_rdma_state == Socket::RDMA_ON) { ep->_state = ESTABLISHED; LOG_IF(INFO, FLAGS_rdma_trace_verbose) - << "Handshake ends (use rdma) on " << s->description(); + << "Client handshake ends (use rdma) on " << s->description(); } else { ep->_state = FALLBACK_TCP; LOG_IF(INFO, FLAGS_rdma_trace_verbose) - << "Handshake ends (use tcp) on " << s->description(); + << "Client handshake ends (use tcp) on " << s->description(); } errno = 0; @@ -625,7 +635,10 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM; ep->_remote_window_capacity = std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, - ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed); + ep->_remote_rq_window_size.store( + ep->_local_window_capacity, butil::memory_order_relaxed); + ep->_sq_window_size.store( + ep->_local_window_capacity, butil::memory_order_relaxed); ep->_state = S_ALLOC_QPCQ; if (ep->AllocateResources() < 0) { @@ -701,13 +714,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { s->_rdma_state = Socket::RDMA_ON; ep->_state = ESTABLISHED; LOG_IF(INFO, FLAGS_rdma_trace_verbose) - << "Handshake ends (use rdma) on " << s->description(); + << "Server handshake ends (use rdma) on " << s->description(); } } else { s->_rdma_state = Socket::RDMA_OFF; 
ep->_state = FALLBACK_TCP; LOG_IF(INFO, FLAGS_rdma_trace_verbose) - << "Handshake ends (use tcp) on " << s->description(); + << "Server handshake ends (use tcp) on " << s->description(); } ep->TryReadOnTcp(); @@ -720,7 +733,8 @@ bool RdmaEndpoint::IsWritable() const { return false; } - return _window_size.load(butil::memory_order_relaxed) > 0; + return _remote_rq_window_size.load(butil::memory_order_relaxed) > 0 && + _sq_window_size.load(butil::memory_order_relaxed) > 0; } // RdmaIOBuf inherits from IOBuf to provide a new function. @@ -790,13 +804,16 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { size_t total_len = 0; size_t current = 0; - uint32_t window = 0; + uint32_t remote_rq_window_size = + _remote_rq_window_size.load(butil::memory_order_relaxed); + uint32_t sq_window_size = + _sq_window_size.load(butil::memory_order_relaxed); ibv_send_wr wr; int max_sge = GetRdmaMaxSge(); ibv_sge sglist[max_sge]; while (current < ndata) { - window = _window_size.load(butil::memory_order_relaxed); - if (window == 0) { + if (remote_rq_window_size == 0 || sq_window_size == 0) { + // There is no space left in SQ or remote RQ. if (total_len > 0) { break; } else { @@ -815,7 +832,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { size_t sge_index = 0; while (sge_index < (uint32_t)max_sge && this_len < _remote_recv_block_size) { - if (data->size() == 0) { + if (data->empty()) { // The current IOBuf is empty, find next one ++current; if (current == ndata) { @@ -845,7 +862,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { wr.imm_data = butil::HostToNet32(imm); // Avoid too much recv completion event to reduce the cpu overhead bool solicited = false; - if (window == 1 || current + 1 >= ndata) { + if (remote_rq_window_size == 1 || sq_window_size == 1 || current + 1 >= ndata) { // Only last message in the write queue or last message in the // current window will be flagged as solicited. solicited = true; @@ -878,6 +895,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // Refer to: // http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/ wr.send_flags |= IBV_SEND_SIGNALED; + wr.wr_id = SEND_TYPE_RDMA_DATA; _sq_unsignaled = 0; } @@ -887,7 +905,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) - << ", window=" << window + << ", remote_rq_window_size=" << remote_rq_window_size + << ", sq_window_size=" << sq_window_size << ", sq_current=" << _sq_current; errno = err; return -1; @@ -898,11 +917,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { _sq_current = 0; } - // Update _window_size. Note that _window_size will never be negative. + // Update `_remote_rq_window_size' and `_sq_window_size'. Note that + // `_remote_rq_window_size' and `_sq_window_size' will never be negative. // Because there is at most one thread can enter this function for each - // Socket, and the other thread of HandleCompletion can only add this - // counter. - _window_size.fetch_sub(1, butil::memory_order_relaxed); + // Socket, and the other thread of HandleCompletion can only add these + // counters. 
+ remote_rq_window_size = + _remote_rq_window_size.fetch_sub(1, butil::memory_order_relaxed) - 1; + sq_window_size = _sq_window_size.fetch_sub(1, butil::memory_order_relaxed) - 1; } return total_len; @@ -926,6 +948,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) { wr.imm_data = butil::HostToNet32(imm); wr.send_flags |= IBV_SEND_SOLICITED; wr.send_flags |= IBV_SEND_SIGNALED; + wr.wr_id = SEND_TYPE_RDMA_IMM; ibv_send_wr* bad = NULL; int err = ibv_post_send(_resource->qp, &wr, &bad); @@ -942,8 +965,16 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { bool zerocopy = FLAGS_rdma_recv_zerocopy; switch (wc.opcode) { case IBV_WC_SEND: { // send completion - // Do nothing - break; + if (SEND_TYPE_RDMA_IMM == wc.wr_id) { + // Do nothing for imm. + return 0; + } + // Update window + uint16_t wnd_to_update = _local_window_capacity / 4; + _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed); + // Wake up writing thread right after every signaled sending cqe. + _socket->WakeAsEpollOut(); + return 0; } case IBV_WC_RECV: { // recv completion // Please note that only the first wc.byte_len bytes is valid @@ -953,9 +984,7 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { } CHECK(_state != FALLBACK_TCP); if (zerocopy) { - butil::IOBuf tmp; - _rbuf[_rq_received].cutn(&tmp, wc.byte_len); - _socket->_read_buf.append(tmp); + _rbuf[_rq_received].cutn(&_socket->_read_buf, wc.byte_len); } else { // Copy data when the receive data is really small _socket->_read_buf.append(_rbuf_data[_rq_received], wc.byte_len); @@ -976,9 +1005,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { // Update window uint32_t wnd_thresh = _local_window_capacity / 8; - if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh + if (_remote_rq_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh || acks >= wnd_thresh) { - // Do not wake up writing thread right after _window_size > 0. + // Do not wake up writing thread right after _remote_rq_window_size > 0. // Otherwise the writing thread may switch to background too quickly. _socket->WakeAsEpollOut(); } @@ -1494,7 +1523,9 @@ std::string RdmaEndpoint::GetStateStr() const { void RdmaEndpoint::DebugInfo(std::ostream& os) const { os << "\nrdma_state=ON" << "\nhandshake_state=" << GetStateStr() - << "\nrdma_window_size=" << _window_size.load(butil::memory_order_relaxed) + << "\nrdma_remote_rq_window_size=" + << _remote_rq_window_size.load(butil::memory_order_relaxed) + << "\nrdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed) << "\nrdma_local_window_capacity=" << _local_window_capacity << "\nrdma_remote_window_capacity=" << _remote_window_capacity << "\nrdma_sbuf_head=" << _sq_current diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index de7cd5f6d8..eed5fd3fee 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -257,8 +257,10 @@ friend class brpc::Socket; uint16_t _local_window_capacity; // The capacity of remote window: min(local RQ, remote SQ) uint16_t _remote_window_capacity; + // The number of WRs we can send to remote side. 
+ butil::atomic _remote_rq_window_size; // The number of WRs we can post to the local Send Queue - butil::atomic _window_size; + butil::atomic _sq_window_size; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; From c1cf5de16db29006fe0c4128438d730913955118 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 17 Nov 2025 00:02:07 +0800 Subject: [PATCH 2/7] Split send and recv comp channel --- src/brpc/rdma/rdma_endpoint.cpp | 364 +++++++++++++++++++------------- src/brpc/rdma/rdma_endpoint.h | 55 +++-- src/brpc/socket.cpp | 25 ++- 3 files changed, 263 insertions(+), 181 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 5e9e95b7d6..75941f575e 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -152,24 +152,24 @@ void HelloMessage::Deserialize(void* data) { qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16)); } -RdmaResource::RdmaResource() - : qp(NULL) - , cq(NULL) - , comp_channel(NULL) - , next(NULL) { } - RdmaResource::~RdmaResource() { - if (qp) { + if (NULL != qp) { IbvDestroyQp(qp); - qp = NULL; } - if (cq) { - IbvDestroyCq(cq); - cq = NULL; + if (NULL != polling_cq) { + IbvDestroyCq(polling_cq); + } + if (NULL != send_cq) { + IbvDestroyCq(send_cq); + } + if (NULL != send_comp_channel) { + IbvDestroyCompChannel(send_comp_channel); } - if (comp_channel) { - IbvDestroyCompChannel(comp_channel); - comp_channel = NULL; + if (NULL != recv_cq) { + IbvDestroyCq(recv_cq); + } + if (NULL != recv_comp_channel) { + IbvDestroyCompChannel(recv_comp_channel); } } @@ -177,13 +177,13 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) : _socket(s) , _state(UNINIT) , _resource(NULL) - , _cq_events(0) - , _cq_sid(INVALID_SOCKET_ID) + , _send_cq_events(0) + , _recv_cq_events(0) + , _send_cq_sid(INVALID_SOCKET_ID) + , _recv_cq_sid(INVALID_SOCKET_ID) + , _polling_cq_sid(INVALID_SOCKET_ID) , _sq_size(FLAGS_rdma_sq_size) , _rq_size(FLAGS_rdma_rq_size) - , _sbuf() - , _rbuf() - , _rbuf_data() , _remote_recv_block_size(0) , _accumulated_ack(0) , _unsolicited(0) @@ -221,23 +221,29 @@ RdmaEndpoint::~RdmaEndpoint() { void RdmaEndpoint::Reset() { DeallocateResources(); - _cq_events = 0; - _cq_sid = INVALID_SOCKET_ID; _state = UNINIT; + _resource = NULL; + _send_cq_events = 0; + _recv_cq_events = 0; + _send_cq_sid = INVALID_SOCKET_ID; + _recv_cq_sid = INVALID_SOCKET_ID; + _polling_cq_sid = INVALID_SOCKET_ID; _sbuf.clear(); _rbuf.clear(); _rbuf_data.clear(); + _remote_recv_block_size = 0; _accumulated_ack = 0; _unsolicited = 0; + _unsolicited_bytes = 0; _sq_current = 0; _sq_unsignaled = 0; + _sq_sent = 0; + _rq_received = 0; _local_window_capacity = 0; _remote_window_capacity = 0; _remote_rq_window_size.store(0, butil::memory_order_relaxed); _sq_window_size.store(0, butil::memory_order_relaxed); _new_rq_wrs = 0; - _sq_sent = 0; - _rq_received = 0; } void RdmaConnect::StartConnect(const Socket* socket, @@ -262,6 +268,7 @@ void RdmaConnect::StartConnect(const Socket* socket, if (bthread_start_background(&tid, &attr, RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) { LOG(FATAL) << "Fail to start handshake bthread"; + Run(); } else { s.release(); } @@ -1079,71 +1086,100 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { return 0; } -static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { - RdmaResource* res = new (std::nothrow) RdmaResource; - if (!res) { - return NULL; +static bool AllocateCq(ibv_comp_channel*& comp_channel, ibv_cq*& cq) { + comp_channel = 
IbvCreateCompChannel(GetRdmaContext()); + if (NULL == comp_channel) { + PLOG(WARNING) << "Fail to create comp channel for CQ"; + return false; + } + + if (butil::make_close_on_exec(comp_channel->fd) < 0) { + PLOG(WARNING) << "Fail to set comp channel close-on-exec"; + return false; + } + if (butil::make_non_blocking(comp_channel->fd) < 0) { + PLOG(WARNING) << "Fail to set comp channel nonblocking"; + return false; } + cq = IbvCreateCq(GetRdmaContext(), FLAGS_rdma_prepared_qp_size, + NULL, comp_channel, GetRdmaCompVector()); + if (NULL == cq) { + PLOG(WARNING) << "Fail to create CQ"; + return false; + } + + return true; +} + +static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size, uint32_t rq_size) { + ibv_qp_init_attr attr; + memset(&attr, 0, sizeof(attr)); + attr.send_cq = send_cq; + attr.recv_cq = recv_cq; + attr.cap.max_send_wr = sq_size; + attr.cap.max_recv_wr = rq_size; + attr.cap.max_send_sge = GetRdmaMaxSge(); + attr.cap.max_recv_sge = 1; + attr.qp_type = IBV_QPT_RC; + return IbvCreateQp(GetRdmaPd(), &attr); +} + +static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { + std::unique_ptr resource(new RdmaResource); if (!FLAGS_rdma_use_polling) { - res->comp_channel = IbvCreateCompChannel(GetRdmaContext()); - if (!res->comp_channel) { - PLOG(WARNING) << "Fail to create comp channel for CQ"; - delete res; + if (!AllocateCq(resource->send_comp_channel, resource->send_cq)) { + PLOG(WARNING) << "Fail to create send CQ"; return NULL; } - butil::make_close_on_exec(res->comp_channel->fd); - if (butil::make_non_blocking(res->comp_channel->fd) < 0) { - PLOG(WARNING) << "Fail to set comp channel nonblocking"; - delete res; + if (!AllocateCq(resource->recv_comp_channel, resource->recv_cq)) { + PLOG(WARNING) << "Fail to create recv CQ"; return NULL; } - res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, - NULL, res->comp_channel, GetRdmaCompVector()); - if (!res->cq) { - PLOG(WARNING) << "Fail to create CQ"; - delete res; + resource->qp = AllocateQp(resource->send_cq, resource->recv_cq, sq_size, rq_size); + if (NULL == resource->qp) { + PLOG(WARNING) << "Fail to create QP"; return NULL; } } else { - res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, - NULL, NULL, 0); - if (!res->cq) { - PLOG(WARNING) << "Fail to create CQ"; - delete res; + resource->polling_cq = + IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, NULL, NULL, 0); + if (NULL == resource->polling_cq) { + PLOG(WARNING) << "Fail to create polling CQ"; + return NULL; + } + resource->qp = AllocateQp(resource->polling_cq, + resource->polling_cq, + sq_size, rq_size); + if (NULL == resource->qp) { + PLOG(WARNING) << "Fail to create QP"; return NULL; } } - ibv_qp_init_attr attr; - memset(&attr, 0, sizeof(attr)); - attr.send_cq = res->cq; - attr.recv_cq = res->cq; - // NOTE: Since we hope to reduce send completion events, we set signaled - // send_wr every 1/4 of the total wnd. The wnd will increase when the ack - // is received, which means the receive side has already received the data - // in the corresponding send_wr. However, the ack does not mean the send_wr - // has been removed from SQ if it is set unsignaled. The reason is that - // the unsignaled send_wr is removed from SQ only after the CQE of next - // signaled send_wr is polled. Thus in a rare case, a new send_wr cannot be - // posted to SQ even in the wnd is not empty. 
In order to solve this - // problem, we enlarge the size of SQ to contain redundant 1/4 of the wnd, - // which is the maximum number of unsignaled send_wrs. - attr.cap.max_send_wr = sq_size * 5 / 4; /*NOTE*/ - attr.cap.max_recv_wr = rq_size; - attr.cap.max_send_sge = GetRdmaMaxSge(); - attr.cap.max_recv_sge = 1; - attr.qp_type = IBV_QPT_RC; - res->qp = IbvCreateQp(GetRdmaPd(), &attr); - if (!res->qp) { - PLOG(WARNING) << "Fail to create QP"; - delete res; - return NULL; + return resource.release(); +} + +SocketId RdmaEndpoint::CreateSocket(int fd, ibv_cq* cq, int solicited_only) { + SocketId socket_id = INVALID_SOCKET_ID; + int err = ibv_req_notify_cq(cq, solicited_only); + if (err != 0) { + LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err); + return socket_id; + } + + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + options.fd = fd; + options.on_edge_triggered_events = PollCq; + if (Socket::Create(options, &socket_id) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; } - return res; + return socket_id; } int RdmaEndpoint::AllocateResources() { @@ -1172,26 +1208,22 @@ int RdmaEndpoint::AllocateResources() { } if (!FLAGS_rdma_use_polling) { - SocketOptions options; - options.user = this; - options.keytable_pool = _socket->_keytable_pool; - options.fd = _resource->comp_channel->fd; - options.on_edge_triggered_events = PollCq; - if (Socket::Create(options, &_cq_sid) < 0) { - PLOG(WARNING) << "Fail to create socket for cq"; + _send_cq_sid = CreateSocket(_resource->send_comp_channel->fd, + _resource->send_cq, 0); + if (INVALID_SOCKET_ID == _send_cq_sid) { return -1; } - int err = ibv_req_notify_cq(_resource->cq, 1); - if (err != 0) { - LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err); + _recv_cq_sid = CreateSocket(_resource->recv_comp_channel->fd, + _resource->recv_cq, 1); + if (INVALID_SOCKET_ID == _recv_cq_sid) { return -1; } } else { SocketOptions options; options.user = this; options.keytable_pool = _socket->_keytable_pool; - if (Socket::Create(options, &_cq_sid) < 0) { + if (Socket::Create(options, &_polling_cq_sid) < 0) { PLOG(WARNING) << "Fail to create socket for cq"; return -1; } @@ -1292,6 +1324,40 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { return 0; } +void RdmaEndpoint::DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, + unsigned int cq_events, bthread_tag_t tag) { + if (NULL != cq) { + if (cq_events > 0) { + IbvAckCqEvents(cq, cq_events); + } + int err = IbvDestroyCq(cq); + LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err); + } + if (NULL != comp_channel) { + // Destroy send_comp_channel will destroy this fd, + // so that we should remove it from epoll fd first + GetGlobalEventDispatcher(comp_channel->fd, tag).RemoveConsumer(comp_channel->fd); + int err = IbvDestroyCompChannel(comp_channel); + LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ channel: " << berror(err); + + } +} + +void RdmaEndpoint::SetSocketFailed(SocketId socket_id, bool remove_consumer) { + if (INVALID_SOCKET_ID == socket_id) { + return; + } + SocketUniquePtr s; + if (Socket::Address(socket_id, &s) == 0) { + if (remove_consumer) { + s->_io_event.RemoveConsumer(s->_fd); + } + s->_user = NULL; // Do not release user (this RdmaEndpoint). + s->_fd = -1; // Already remove fd from epoll fd. 
+ s->SetFailed(); + } +} + void RdmaEndpoint::DeallocateResources() { if (!_resource) { return; @@ -1309,73 +1375,48 @@ void RdmaEndpoint::DeallocateResources() { move_to_rdma_resource_list = true; } } - int fd = -1; - if (_resource->comp_channel) { - fd = _resource->comp_channel->fd; - } - int err; if (!move_to_rdma_resource_list) { - if (_resource->qp) { - err = IbvDestroyQp(_resource->qp); - if (err != 0) { - LOG(WARNING) << "Fail to destroy QP: " << berror(err); - } + if (NULL != _resource->qp) { + int err = IbvDestroyQp(_resource->qp); + LOG_IF(WARNING, 0 != err) << "Fail to destroy QP: " << berror(err); _resource->qp = NULL; } - if (_resource->cq) { - IbvAckCqEvents(_resource->cq, _cq_events); - err = IbvDestroyCq(_resource->cq); - if (err != 0) { - PLOG(WARNING) << "Fail to destroy CQ: " << berror(err); - } - _resource->cq = NULL; - } - if (_resource->comp_channel) { - // destroy comp_channel will destroy this fd - // so that we should remove it from epoll fd first - _socket->_io_event.RemoveConsumer(fd); - fd = -1; - err = IbvDestroyCompChannel(_resource->comp_channel); - if (err != 0) { - LOG(WARNING) << "Fail to destroy CQ channel: " << berror(err); - } - _resource->comp_channel = NULL; - } + + DeallocateCq(_resource->send_cq, _resource->send_comp_channel, + _send_cq_events, _socket->_io_event.bthread_tag()); + DeallocateCq(_resource->recv_cq, _resource->recv_comp_channel, + _recv_cq_events, _socket->_io_event.bthread_tag()); + _resource->send_cq = NULL; + _resource->recv_cq = NULL; + _resource->send_comp_channel = NULL; + _resource->recv_comp_channel = NULL; delete _resource; - _resource = NULL; } - SocketUniquePtr s; - if (_cq_sid != INVALID_SOCKET_ID) { - if (Socket::Address(_cq_sid, &s) == 0) { - s->_user = NULL; // do not release user (this RdmaEndpoint) - if (fd >= 0) { - _socket->_io_event.RemoveConsumer(fd); - } - s->_fd = -1; // already remove fd from epoll fd - s->SetFailed(); - } - _cq_sid = INVALID_SOCKET_ID; - } + SetSocketFailed(_send_cq_sid, move_to_rdma_resource_list); + SetSocketFailed(_recv_cq_sid, move_to_rdma_resource_list); + SetSocketFailed(_polling_cq_sid, false); if (move_to_rdma_resource_list) { - if (_resource->cq) { - IbvAckCqEvents(_resource->cq, _cq_events); + if (NULL != _resource->send_cq) { + IbvAckCqEvents(_resource->send_cq, _send_cq_events); + } + if (NULL != _resource->recv_cq) { + IbvAckCqEvents(_resource->recv_cq, _recv_cq_events); } BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex); _resource->next = g_rdma_resource_list; g_rdma_resource_list = _resource; } - - _resource = NULL; } static const int MAX_CQ_EVENTS = 128; -int RdmaEndpoint::GetAndAckEvents() { +int RdmaEndpoint::GetAndAckEvents(ibv_comp_channel* comp_channel, + ibv_cq* cq, unsigned int* cq_events) { int events = 0; void* context = NULL; - while (1) { - if (IbvGetCqEvent(_resource->comp_channel, &_resource->cq, &context) != 0) { + while (true) { + if (IbvGetCqEvent(comp_channel, &cq, &context) != 0) { if (errno != EAGAIN) { return -1; } @@ -1386,10 +1427,10 @@ int RdmaEndpoint::GetAndAckEvents() { if (events == 0) { return 0; } - _cq_events += events; - if (_cq_events >= MAX_CQ_EVENTS) { - IbvAckCqEvents(_resource->cq, _cq_events); - _cq_events = 0; + *cq_events += events; + if (*cq_events >= MAX_CQ_EVENTS) { + IbvAckCqEvents(cq, *cq_events); + *cq_events = 0; } return 0; } @@ -1406,14 +1447,31 @@ void RdmaEndpoint::PollCq(Socket* m) { } CHECK(ep == s->_rdma_ep); + bool send = false; + ibv_comp_channel* comp_channel = NULL; + ibv_cq* cq = NULL; + int solicited_only = 0; + unsigned int* 
cq_events = NULL; + if (!FLAGS_rdma_use_polling) { - if (ep->GetAndAckEvents() < 0) { + send = ep->_resource->send_comp_channel->fd == m->_fd; + comp_channel = send ? + ep->_resource->send_comp_channel : ep->_resource->recv_comp_channel; + cq = send ? ep->_resource->send_cq : ep->_resource->recv_cq; + solicited_only = send ? 0 : 1; + cq_events = send ? &ep->_send_cq_events : &ep->_recv_cq_events; + + if (ep->GetAndAckEvents(comp_channel, cq, cq_events) < 0) { const int saved_errno = errno; - PLOG(ERROR) << "Fail to get cq event: " << s->description(); + PLOG(ERROR) << "Fail to get cq event from " << s->description(); s->SetFailed(saved_errno, "Fail to get cq event from %s: %s", s->description().c_str(), berror(saved_errno)); return; } + } else { + // Polling is considered as non-send, so no need to change `send'. + // Only need to poll polling_cq. + cq = ep->_resource->polling_cq; } int progress = Socket::PROGRESS_INIT; @@ -1421,7 +1479,7 @@ void RdmaEndpoint::PollCq(Socket* m) { InputMessenger::InputMessageClosure last_msg; ibv_wc wc[FLAGS_rdma_cqe_poll_once]; while (true) { - int cnt = ibv_poll_cq(ep->_resource->cq, FLAGS_rdma_cqe_poll_once, wc); + int cnt = ibv_poll_cq(cq, FLAGS_rdma_cqe_poll_once, wc); if (cnt < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to poll cq: " << s->description(); @@ -1439,10 +1497,10 @@ void RdmaEndpoint::PollCq(Socket* m) { // that the event arrives after the poll but before the notify, // we should re-poll the CQ once after the notify to check if // there is an available CQE. - errno = ibv_req_notify_cq(ep->_resource->cq, 1); + errno = ibv_req_notify_cq(cq, solicited_only); if (errno != 0) { const int saved_errno = errno; - PLOG(WARNING) << "Fail to arm CQ comp channel: " << s->description(); + PLOG(WARNING) << "Fail to arm CQ comp channel from " << s->description(); s->SetFailed(saved_errno, "Fail to arm cq channel from %s: %s", s->description().c_str(), berror(saved_errno)); return; @@ -1453,7 +1511,7 @@ void RdmaEndpoint::PollCq(Socket* m) { if (!m->MoreReadEvents(&progress)) { break; } - if (ep->GetAndAckEvents() < 0) { + if (ep->GetAndAckEvents(comp_channel, cq, cq_events) < 0) { s->SetFailed(errno, "Fail to ack CQ event on %s", s->description().c_str()); return; @@ -1487,6 +1545,10 @@ void RdmaEndpoint::PollCq(Socket* m) { bytes += nr; } } + // Send CQE has no messages to process. + if (send) { + continue; + } // Just call PrcessNewMessage once for all of these CQEs. // Otherwise it may call too many bthread_flush to affect performance. 
@@ -1546,6 +1608,8 @@ int RdmaEndpoint::GlobalInitialize() { } else if (FLAGS_rdma_recv_block_type == "huge") { g_rdma_recv_block_size = GetBlockSize(2) - IOBUF_BLOCK_HEADER_LEN; } else { + LOG(ERROR) << "rdma_recv_block_type incorrect " + << "(valid value: default/large/huge)"; errno = EINVAL; return -1; } @@ -1589,9 +1653,9 @@ void RdmaEndpoint::GlobalRelease() { std::vector RdmaEndpoint::_poller_groups; int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag, - std::function callback, - std::function init_fn, - std::function release_fn) { + std::function callback, + std::function init_fn, + std::function release_fn) { if (!FLAGS_rdma_use_polling) { return 0; } @@ -1673,27 +1737,27 @@ void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) { auto& running = group.running; running.store(false, std::memory_order_relaxed); for (int i = 0; i < FLAGS_rdma_poller_num; ++i) { - bthread_join(pollers[i].tid, nullptr); + bthread_join(pollers[i].tid, NULL); } } void RdmaEndpoint::PollerAddCqSid() { - auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; + auto index = butil::fmix32(_polling_cq_sid) % FLAGS_rdma_poller_num; auto& group = _poller_groups[bthread_self_tag()]; auto& pollers = group.pollers; auto& poller = pollers[index]; - if (_cq_sid != INVALID_SOCKET_ID) { - poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD}); + if (INVALID_SOCKET_ID != _polling_cq_sid) { + poller.op_queue.Enqueue(CqSidOp{_polling_cq_sid, CqSidOp::ADD}); } } void RdmaEndpoint::PollerRemoveCqSid() { - auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; + auto index = butil::fmix32(_polling_cq_sid) % FLAGS_rdma_poller_num; auto& group = _poller_groups[bthread_self_tag()]; auto& pollers = group.pollers; auto& poller = pollers[index]; - if (_cq_sid != INVALID_SOCKET_ID) { - poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE}); + if (INVALID_SOCKET_ID != _polling_cq_sid) { + poller.op_queue.Enqueue(CqSidOp{_polling_cq_sid, CqSidOp::REMOVE}); } } diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index eed5fd3fee..a274bbc9de 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -54,26 +54,29 @@ class RdmaConnect : public AppConnect { private: void Run(); - void (*_done)(int, void*); - void* _data; + void (*_done)(int, void*){NULL}; + void* _data{NULL}; }; struct RdmaResource { - ibv_qp* qp; - ibv_cq* cq; - ibv_comp_channel* comp_channel; - RdmaResource* next; - RdmaResource(); + ibv_qp* qp{NULL}; + ibv_cq* polling_cq{NULL}; + ibv_cq* send_cq{NULL}; + ibv_cq* recv_cq{NULL}; + ibv_comp_channel* send_comp_channel{NULL}; + ibv_comp_channel* recv_comp_channel{NULL}; + RdmaResource* next{NULL}; + RdmaResource() = default; ~RdmaResource(); DISALLOW_COPY_AND_ASSIGN(RdmaResource); }; class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser { friend class RdmaConnect; -friend class brpc::Socket; +friend class Socket; public: - RdmaEndpoint(Socket* s); - ~RdmaEndpoint(); + explicit RdmaEndpoint(Socket* s); + ~RdmaEndpoint() override; // Global initialization // Return 0 if success, -1 if failed and errno set @@ -129,6 +132,17 @@ friend class brpc::Socket; // Process handshake at the server static void* ProcessHandshakeAtServer(void* arg); + // Create a socket which wrap the comp channel of CQ. + SocketId CreateSocket(int fd, ibv_cq* cq, int solicited_only); + + // Deallocate CQ resource. 
+ static void DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, + unsigned int cq_events, bthread_tag_t tag); + + // Release a socket which wrap the comp channel of CQ. + static void SetSocketFailed(SocketId socket_id, bool remove_consumer); + + // Allocate resources // Return 0 if success, -1 if failed and errno set int AllocateResources(); @@ -195,7 +209,8 @@ friend class brpc::Socket; int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num); // Get event from comp channel and ack the events - int GetAndAckEvents(); + int GetAndAckEvents(ibv_comp_channel* comp_channel, + ibv_cq* cq, unsigned int* cq_events); // Poll CQ and get the work completion static void PollCq(Socket* m); @@ -221,11 +236,15 @@ friend class brpc::Socket; // rdma resource RdmaResource* _resource; - // the number of events requiring ack - int _cq_events; + // The number of events requiring ack. + unsigned int _send_cq_events; + unsigned int _recv_cq_events; - // the SocketId which wrap the comp channel of CQ - SocketId _cq_sid; + // The SocketId which wrap the comp channel of CQ. + SocketId _send_cq_sid; + SocketId _recv_cq_sid; + // The SocketId which is for polling CQ. + SocketId _polling_cq_sid; // Capacity of local Send Queue and local Recv Queue uint16_t _sq_size; @@ -284,9 +303,9 @@ friend class brpc::Socket; butil::MPSCQueue> op_queue; // Callback used for io_uring/spdk etc std::function callback; - // Init and Destory function - std::function init_fn; - std::function release_fn; + // Init and Destroy function + std::function init_fn; + std::function release_fn; }; // Poller group struct BAIDU_CACHELINE_ALIGNMENT PollerGroup { diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index ec5300987c..9490650b78 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -895,7 +895,7 @@ void Socket::BeforeRecycled() { const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); if (asid != INVALID_SOCKET_ID) { SocketUniquePtr ptr; - if (Socket::Address(asid, &ptr) == 0) { + if (Address(asid, &ptr) == 0) { ptr->ReleaseAdditionalReference(); } } @@ -1319,7 +1319,7 @@ int Socket::Connect(const timespec* abstime, SocketOptions options; options.bthread_tag = _io_event.bthread_tag(); options.user = req; - if (Socket::Create(options, &connect_id) != 0) { + if (Create(options, &connect_id) != 0) { LOG(FATAL) << "Fail to create Socket"; delete req; return -1; @@ -1328,7 +1328,7 @@ int Socket::Connect(const timespec* abstime, // `connect_id'. 
We hold an additional reference here to // ensure `req' to be valid in this scope SocketUniquePtr s; - CHECK_EQ(0, Socket::Address(connect_id, &s)); + CHECK_EQ(0, Address(connect_id, &s)); // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches @@ -1425,7 +1425,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { void Socket::WakeAsEpollOut() { _epollout_butex->fetch_add(1, butil::memory_order_release); - bthread::butex_wake_except(_epollout_butex, 0); + bthread::butex_wake_except(_epollout_butex, INVALID_BTHREAD); } int Socket::OnOutputEvent(void* user_data, uint32_t, @@ -1436,7 +1436,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t, // added into epoll, these sockets miss the signal inside // `SetFailed' and therefore must be signalled here using // `AddressFailedAsWell' to prevent waiting forever - if (Socket::AddressFailedAsWell(id, &s) < 0) { + if (AddressFailedAsWell(id, &s) < 0) { // Ignore recycled sockets return -1; } @@ -1456,7 +1456,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t, void Socket::HandleEpollOutTimeout(void* arg) { SocketId id = (SocketId)arg; SocketUniquePtr s; - if (Socket::Address(id, &s) != 0) { + if (Address(id, &s) != 0) { return; } EpollOutRequest* req = dynamic_cast(s->user()); @@ -1532,12 +1532,11 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) { // Run ssl connect in a new bthread to avoid blocking // the current bthread (thus blocking the EventDispatcher) bthread_t th; - std::unique_ptr thrd_func(brpc::NewCallback( - Socket::CheckConnectedAndKeepWrite, fd, err, data)); + std::unique_ptr thrd_func( + NewCallback(CheckConnectedAndKeepWrite, fd, err, data)); bthread_attr_t attr = BTHREAD_ATTR_NORMAL; bthread_attr_set_name(&attr, "CheckConnectedAndKeepWrite"); - if ((err = bthread_start_background(&th, &attr, - RunClosure, thrd_func.get())) == 0) { + if ((err = bthread_start_background(&th, &attr, RunClosure, thrd_func.get())) == 0) { thrd_func.release(); return 0; } else { @@ -2323,7 +2322,7 @@ std::ostream& operator<<(std::ostream& os, const ObjectPtr& obj) { void Socket::DebugSocket(std::ostream& os, SocketId id) { SocketUniquePtr ptr; - int ret = Socket::AddressFailedAsWell(id, &ptr); + int ret = AddressFailedAsWell(id, &ptr); if (ret < 0) { os << "SocketId=" << id << " is invalid or recycled"; return; @@ -2920,7 +2919,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) { opt.app_connect = _app_connect; opt.use_rdma = (_rdma_ep) ? 
true : false; if (get_client_side_messenger()->Create(opt, &id) != 0 || - Socket::Address(id, short_socket) != 0) { + Address(id, short_socket) != 0) { return -1; } (*short_socket)->ShareStats(this); @@ -2931,7 +2930,7 @@ int Socket::GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*)) { SocketId id = _agent_socket_id.load(butil::memory_order_relaxed); SocketUniquePtr tmp_sock; do { - if (Socket::Address(id, &tmp_sock) == 0) { + if (Address(id, &tmp_sock) == 0) { if (checkfn == NULL || checkfn(tmp_sock.get())) { out->swap(tmp_sock); return 0; From e6bcceab3c1d5d9ad48703b7b8debae1ad01e023 Mon Sep 17 00:00:00 2001 From: chenBright Date: Tue, 18 Nov 2025 20:41:00 +0800 Subject: [PATCH 3/7] Use wr_id to update _sq_window_size --- src/brpc/rdma/rdma_endpoint.cpp | 47 ++++++++++++++------------------- src/brpc/rdma/rdma_endpoint.h | 6 ++--- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 75941f575e..33a32d4041 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -102,11 +102,6 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; -enum SendType { - SEND_TYPE_RDMA_DATA = 0, - SEND_TYPE_RDMA_IMM, -}; - struct HelloMessage { void Serialize(void* data) const; void Deserialize(void* data); @@ -902,7 +897,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { // Refer to: // http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/ wr.send_flags |= IBV_SEND_SIGNALED; - wr.wr_id = SEND_TYPE_RDMA_DATA; + wr.wr_id = _sq_unsignaled; _sq_unsignaled = 0; } @@ -955,7 +950,6 @@ int RdmaEndpoint::SendImm(uint32_t imm) { wr.imm_data = butil::HostToNet32(imm); wr.send_flags |= IBV_SEND_SOLICITED; wr.send_flags |= IBV_SEND_SIGNALED; - wr.wr_id = SEND_TYPE_RDMA_IMM; ibv_send_wr* bad = NULL; int err = ibv_post_send(_resource->qp, &wr, &bad); @@ -972,14 +966,13 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { bool zerocopy = FLAGS_rdma_recv_zerocopy; switch (wc.opcode) { case IBV_WC_SEND: { // send completion - if (SEND_TYPE_RDMA_IMM == wc.wr_id) { + if (0 == wc.wr_id) { // Do nothing for imm. return 0; } - // Update window - uint16_t wnd_to_update = _local_window_capacity / 4; - _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed); - // Wake up writing thread right after every signaled sending cqe. + // Update SQ window. + _sq_window_size.fetch_add(wc.wr_id, butil::memory_order_relaxed); + // Wake up writing thread right after every signaled send WC. _socket->WakeAsEpollOut(); return 0; } @@ -1182,6 +1175,21 @@ SocketId RdmaEndpoint::CreateSocket(int fd, ibv_cq* cq, int solicited_only) { return socket_id; } +void RdmaEndpoint::SetSocketFailed(SocketId socket_id, bool remove_consumer) { + if (INVALID_SOCKET_ID == socket_id) { + return; + } + SocketUniquePtr s; + if (Socket::Address(socket_id, &s) == 0) { + if (remove_consumer) { + s->_io_event.RemoveConsumer(s->_fd); + } + s->_user = NULL; // Do not release user (this RdmaEndpoint). + s->_fd = -1; // Already remove fd from epoll fd. 
+ s->SetFailed(); + } +} + int RdmaEndpoint::AllocateResources() { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // For UT @@ -1343,21 +1351,6 @@ void RdmaEndpoint::DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, } } -void RdmaEndpoint::SetSocketFailed(SocketId socket_id, bool remove_consumer) { - if (INVALID_SOCKET_ID == socket_id) { - return; - } - SocketUniquePtr s; - if (Socket::Address(socket_id, &s) == 0) { - if (remove_consumer) { - s->_io_event.RemoveConsumer(s->_fd); - } - s->_user = NULL; // Do not release user (this RdmaEndpoint). - s->_fd = -1; // Already remove fd from epoll fd. - s->SetFailed(); - } -} - void RdmaEndpoint::DeallocateResources() { if (!_resource) { return; diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index a274bbc9de..58ea8c52a0 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -135,13 +135,13 @@ friend class Socket; // Create a socket which wrap the comp channel of CQ. SocketId CreateSocket(int fd, ibv_cq* cq, int solicited_only); + // Release a socket which wrap the comp channel of CQ. + static void SetSocketFailed(SocketId socket_id, bool remove_consumer); + // Deallocate CQ resource. static void DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, unsigned int cq_events, bthread_tag_t tag); - // Release a socket which wrap the comp channel of CQ. - static void SetSocketFailed(SocketId socket_id, bool remove_consumer); - // Allocate resources // Return 0 if success, -1 if failed and errno set From 0794a476ee88e19f91cb9ce43166b4d2f90077b3 Mon Sep 17 00:00:00 2001 From: chenBright Date: Tue, 2 Dec 2025 20:43:04 +0800 Subject: [PATCH 4/7] Send CQ and recv CQ share comp channel --- src/brpc/rdma/rdma_endpoint.cpp | 387 ++++++++++++++++---------------- src/brpc/rdma/rdma_endpoint.h | 33 ++- 2 files changed, 208 insertions(+), 212 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 33a32d4041..a4a1b034be 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -157,14 +157,11 @@ RdmaResource::~RdmaResource() { if (NULL != send_cq) { IbvDestroyCq(send_cq); } - if (NULL != send_comp_channel) { - IbvDestroyCompChannel(send_comp_channel); - } if (NULL != recv_cq) { IbvDestroyCq(recv_cq); } - if (NULL != recv_comp_channel) { - IbvDestroyCompChannel(recv_comp_channel); + if (NULL != comp_channel) { + IbvDestroyCompChannel(comp_channel); } } @@ -174,9 +171,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _resource(NULL) , _send_cq_events(0) , _recv_cq_events(0) - , _send_cq_sid(INVALID_SOCKET_ID) - , _recv_cq_sid(INVALID_SOCKET_ID) - , _polling_cq_sid(INVALID_SOCKET_ID) + , _cq_sid(INVALID_SOCKET_ID) , _sq_size(FLAGS_rdma_sq_size) , _rq_size(FLAGS_rdma_rq_size) , _remote_recv_block_size(0) @@ -192,6 +187,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _remote_rq_window_size(0) , _sq_window_size(0) , _new_rq_wrs(0) + , _imm_inflight(0) { if (_sq_size < MIN_QP_SIZE) { _sq_size = MIN_QP_SIZE; @@ -220,9 +216,7 @@ void RdmaEndpoint::Reset() { _resource = NULL; _send_cq_events = 0; _recv_cq_events = 0; - _send_cq_sid = INVALID_SOCKET_ID; - _recv_cq_sid = INVALID_SOCKET_ID; - _polling_cq_sid = INVALID_SOCKET_ID; + _cq_sid = INVALID_SOCKET_ID; _sbuf.clear(); _rbuf.clear(); _rbuf_data.clear(); @@ -238,7 +232,8 @@ void RdmaEndpoint::Reset() { _remote_window_capacity = 0; _remote_rq_window_size.store(0, butil::memory_order_relaxed); _sq_window_size.store(0, butil::memory_order_relaxed); - _new_rq_wrs = 0; + _new_rq_wrs.store(0, 
butil::memory_order_relaxed); + _imm_inflight.store(0, butil::memory_order_relaxed); } void RdmaConnect::StartConnect(const Socket* socket, @@ -845,8 +840,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { } ssize_t len = data->cut_into_sglist_and_iobuf( - sglist, &sge_index, to, max_sge, - _remote_recv_block_size - this_len); + sglist, &sge_index, to, max_sge, + _remote_recv_block_size - this_len); if (len < 0) { return -1; } @@ -906,10 +901,9 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { if (err != 0) { // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. - LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) - << ", remote_rq_window_size=" << remote_rq_window_size - << ", sq_window_size=" << sq_window_size - << ", sq_current=" << _sq_current; + std::ostringstream oss; + DebugInfo(oss, ", "); + LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " << oss.str(); errno = err; return -1; } @@ -948,17 +942,20 @@ int RdmaEndpoint::SendImm(uint32_t imm) { memset(&wr, 0, sizeof(wr)); wr.opcode = IBV_WR_SEND_WITH_IMM; wr.imm_data = butil::HostToNet32(imm); - wr.send_flags |= IBV_SEND_SOLICITED; - wr.send_flags |= IBV_SEND_SIGNALED; + wr.send_flags |= IBV_SEND_SOLICITED | IBV_SEND_SIGNALED; + wr.wr_id = 0; ibv_send_wr* bad = NULL; int err = ibv_post_send(_resource->qp, &wr, &bad); if (err != 0) { + std::ostringstream oss; + DebugInfo(oss, ", "); // We use other way to guarantee the Send Queue is not full. // So we just consider this error as an unrecoverable error. - LOG(WARNING) << "Fail to ibv_post_send: " << berror(err); + LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " << oss.str(); return -1; } + _imm_inflight.fetch_add(1, butil::memory_order_relaxed); return 0; } @@ -968,12 +965,26 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { case IBV_WC_SEND: { // send completion if (0 == wc.wr_id) { // Do nothing for imm. + _imm_inflight.fetch_sub(1, butil::memory_order_relaxed); return 0; } // Update SQ window. - _sq_window_size.fetch_add(wc.wr_id, butil::memory_order_relaxed); - // Wake up writing thread right after every signaled send WC. - _socket->WakeAsEpollOut(); + uint16_t wnd_to_update = wc.wr_id; + for (uint16_t i = 0; i < wnd_to_update; ++i) { + _sbuf[_sq_sent++].clear(); + if (_sq_sent == _sq_size - RESERVED_WR_NUM) { + _sq_sent = 0; + } + } + butil::subtle::MemoryBarrier(); + + _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed); + if (_remote_rq_window_size.load(butil::memory_order_relaxed) >= + _local_window_capacity / 8) { + // Do not wake up writing thread right after polling IBV_WC_SEND. + // Otherwise the writing thread may switch to background too quickly. 
+ _socket->WakeAsEpollOut(); + } return 0; } case IBV_WC_RECV: { // recv completion @@ -990,23 +1001,14 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { _socket->_read_buf.append(_rbuf_data[_rq_received], wc.byte_len); } } - if (wc.imm_data > 0) { - // Clear sbuf here because we ignore event wakeup for send completions - uint32_t acks = butil::NetToHost32(wc.imm_data); - uint32_t num = acks; - while (num > 0) { - _sbuf[_sq_sent++].clear(); - if (_sq_sent == _sq_size - RESERVED_WR_NUM) { - _sq_sent = 0; - } - --num; - } - butil::subtle::MemoryBarrier(); - + if (0 != (wc.wc_flags & IBV_WC_WITH_IMM) && wc.imm_data > 0) { // Update window + uint32_t acks = butil::NetToHost32(wc.imm_data); uint32_t wnd_thresh = _local_window_capacity / 8; - if (_remote_rq_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh - || acks >= wnd_thresh) { + uint32_t remote_rq_window_size = + _remote_rq_window_size.fetch_add(acks, butil::memory_order_relaxed); + if (_sq_window_size.load(butil::memory_order_relaxed) > 0 && + (remote_rq_window_size >= wnd_thresh || acks >= wnd_thresh)) { // Do not wake up writing thread right after _remote_rq_window_size > 0. // Otherwise the writing thread may switch to background too quickly. _socket->WakeAsEpollOut(); @@ -1079,32 +1081,6 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { return 0; } -static bool AllocateCq(ibv_comp_channel*& comp_channel, ibv_cq*& cq) { - comp_channel = IbvCreateCompChannel(GetRdmaContext()); - if (NULL == comp_channel) { - PLOG(WARNING) << "Fail to create comp channel for CQ"; - return false; - } - - if (butil::make_close_on_exec(comp_channel->fd) < 0) { - PLOG(WARNING) << "Fail to set comp channel close-on-exec"; - return false; - } - if (butil::make_non_blocking(comp_channel->fd) < 0) { - PLOG(WARNING) << "Fail to set comp channel nonblocking"; - return false; - } - - cq = IbvCreateCq(GetRdmaContext(), FLAGS_rdma_prepared_qp_size, - NULL, comp_channel, GetRdmaCompVector()); - if (NULL == cq) { - PLOG(WARNING) << "Fail to create CQ"; - return false; - } - - return true; -} - static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size, uint32_t rq_size) { ibv_qp_init_attr attr; memset(&attr, 0, sizeof(attr)); @@ -1120,13 +1096,35 @@ static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size, ui static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { std::unique_ptr resource(new RdmaResource); + // NOTE: we enlarge the size of SQ to contain redundant 1/4 of the wnd, + // which is for unexpected Imm. 
+ sq_size = sq_size * 5 / 4; /* NOTE */ if (!FLAGS_rdma_use_polling) { - if (!AllocateCq(resource->send_comp_channel, resource->send_cq)) { + resource->comp_channel = IbvCreateCompChannel(GetRdmaContext()); + if (NULL == resource->comp_channel) { + PLOG(WARNING) << "Fail to create comp channel for CQ"; + return NULL; + } + + if (butil::make_close_on_exec(resource->comp_channel->fd) < 0) { + PLOG(WARNING) << "Fail to set comp channel close-on-exec"; + return NULL; + } + if (butil::make_non_blocking(resource->comp_channel->fd) < 0) { + PLOG(WARNING) << "Fail to set comp channel nonblocking"; + return NULL; + } + + resource->send_cq = IbvCreateCq(GetRdmaContext(), FLAGS_rdma_prepared_qp_size, + NULL, resource->comp_channel, GetRdmaCompVector()); + if (NULL == resource->send_cq) { PLOG(WARNING) << "Fail to create send CQ"; return NULL; } - if (!AllocateCq(resource->recv_comp_channel, resource->recv_cq)) { + resource->recv_cq = IbvCreateCq(GetRdmaContext(), FLAGS_rdma_prepared_qp_size, + NULL, resource->comp_channel, GetRdmaCompVector()); + if (NULL == resource->recv_cq) { PLOG(WARNING) << "Fail to create recv CQ"; return NULL; } @@ -1155,41 +1153,6 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { return resource.release(); } -SocketId RdmaEndpoint::CreateSocket(int fd, ibv_cq* cq, int solicited_only) { - SocketId socket_id = INVALID_SOCKET_ID; - int err = ibv_req_notify_cq(cq, solicited_only); - if (err != 0) { - LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err); - return socket_id; - } - - SocketOptions options; - options.user = this; - options.keytable_pool = _socket->_keytable_pool; - options.fd = fd; - options.on_edge_triggered_events = PollCq; - if (Socket::Create(options, &socket_id) < 0) { - PLOG(WARNING) << "Fail to create socket for cq"; - } - - return socket_id; -} - -void RdmaEndpoint::SetSocketFailed(SocketId socket_id, bool remove_consumer) { - if (INVALID_SOCKET_ID == socket_id) { - return; - } - SocketUniquePtr s; - if (Socket::Address(socket_id, &s) == 0) { - if (remove_consumer) { - s->_io_event.RemoveConsumer(s->_fd); - } - s->_user = NULL; // Do not release user (this RdmaEndpoint). - s->_fd = -1; // Already remove fd from epoll fd. 
- s->SetFailed(); - } -} - int RdmaEndpoint::AllocateResources() { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // For UT @@ -1216,22 +1179,26 @@ int RdmaEndpoint::AllocateResources() { } if (!FLAGS_rdma_use_polling) { - _send_cq_sid = CreateSocket(_resource->send_comp_channel->fd, - _resource->send_cq, 0); - if (INVALID_SOCKET_ID == _send_cq_sid) { + if (0 != ReqNotifyCq(true)) { return -1; } - - _recv_cq_sid = CreateSocket(_resource->recv_comp_channel->fd, - _resource->recv_cq, 1); - if (INVALID_SOCKET_ID == _recv_cq_sid) { + if (0 != ReqNotifyCq(false)) { return -1; } + + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + options.fd = _resource->comp_channel->fd; + options.on_edge_triggered_events = PollCq; + if (Socket::Create(options, &_cq_sid) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; + } } else { SocketOptions options; options.user = this; options.keytable_pool = _socket->_keytable_pool; - if (Socket::Create(options, &_polling_cq_sid) < 0) { + if (Socket::Create(options, &_cq_sid) < 0) { PLOG(WARNING) << "Fail to create socket for cq"; return -1; } @@ -1332,23 +1299,16 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { return 0; } -void RdmaEndpoint::DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, - unsigned int cq_events, bthread_tag_t tag) { - if (NULL != cq) { - if (cq_events > 0) { - IbvAckCqEvents(cq, cq_events); - } - int err = IbvDestroyCq(cq); - LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err); +void RdmaEndpoint::DeallocateCq(ibv_cq* cq, unsigned int cq_events) { + if (NULL == cq) { + return; } - if (NULL != comp_channel) { - // Destroy send_comp_channel will destroy this fd, - // so that we should remove it from epoll fd first - GetGlobalEventDispatcher(comp_channel->fd, tag).RemoveConsumer(comp_channel->fd); - int err = IbvDestroyCompChannel(comp_channel); - LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ channel: " << berror(err); + if (cq_events > 0) { + IbvAckCqEvents(cq, cq_events); } + int err = IbvDestroyCq(cq); + LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err); } void RdmaEndpoint::DeallocateResources() { @@ -1368,6 +1328,8 @@ void RdmaEndpoint::DeallocateResources() { move_to_rdma_resource_list = true; } } + + bool remove_consumer = true; if (!move_to_rdma_resource_list) { if (NULL != _resource->qp) { int err = IbvDestroyQp(_resource->qp); @@ -1375,20 +1337,37 @@ void RdmaEndpoint::DeallocateResources() { _resource->qp = NULL; } - DeallocateCq(_resource->send_cq, _resource->send_comp_channel, - _send_cq_events, _socket->_io_event.bthread_tag()); - DeallocateCq(_resource->recv_cq, _resource->recv_comp_channel, - _recv_cq_events, _socket->_io_event.bthread_tag()); + DeallocateCq(_resource->send_cq, _send_cq_events); + DeallocateCq(_resource->recv_cq, _recv_cq_events); + + if (NULL != _resource->comp_channel) { + // Destroy send_comp_channel will destroy this fd, + // so that we should remove it from epoll fd first + int fd = _resource->comp_channel->fd; + GetGlobalEventDispatcher(fd, _socket->_io_event.bthread_tag()).RemoveConsumer(fd); + remove_consumer = false; + int err = IbvDestroyCompChannel(_resource->comp_channel); + LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ channel: " << berror(err); + + } + _resource->send_cq = NULL; _resource->recv_cq = NULL; - _resource->send_comp_channel = NULL; - _resource->recv_comp_channel = NULL; + _resource->comp_channel = NULL; delete _resource; } - SetSocketFailed(_send_cq_sid, 
move_to_rdma_resource_list); - SetSocketFailed(_recv_cq_sid, move_to_rdma_resource_list); - SetSocketFailed(_polling_cq_sid, false); + if (INVALID_SOCKET_ID != _cq_sid) { + SocketUniquePtr s; + if (Socket::Address(_cq_sid, &s) == 0) { + if (remove_consumer) { + s->_io_event.RemoveConsumer(s->_fd); + } + s->_user = NULL; // Do not release user (this RdmaEndpoint). + s->_fd = -1; // Already remove fd from epoll fd. + s->SetFailed(); + } + } if (move_to_rdma_resource_list) { if (NULL != _resource->send_cq) { @@ -1405,26 +1384,52 @@ void RdmaEndpoint::DeallocateResources() { static const int MAX_CQ_EVENTS = 128; -int RdmaEndpoint::GetAndAckEvents(ibv_comp_channel* comp_channel, - ibv_cq* cq, unsigned int* cq_events) { - int events = 0; void* context = NULL; +int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) { + void* context = NULL; + ibv_cq* cq = NULL; while (true) { - if (IbvGetCqEvent(comp_channel, &cq, &context) != 0) { + if (IbvGetCqEvent(_resource->comp_channel, &cq, &context) != 0) { if (errno != EAGAIN) { + const int saved_errno = errno; + PLOG(ERROR) << "Fail to get cq event from " << s->description(); + s->SetFailed(saved_errno, "Fail to get cq event from %s: %s", + s->description().c_str(), berror(saved_errno)); return -1; } break; } - ++events; + if (cq == _resource->send_cq) { + ++_send_cq_events; + } else { + ++_recv_cq_events; + } } - if (events == 0) { - return 0; + if (_send_cq_events >= MAX_CQ_EVENTS) { + IbvAckCqEvents(_resource->send_cq, _send_cq_events); + _send_cq_events = 0; + } + if (_recv_cq_events >= MAX_CQ_EVENTS) { + IbvAckCqEvents(_resource->recv_cq, _recv_cq_events); + _recv_cq_events = 0; } - *cq_events += events; - if (*cq_events >= MAX_CQ_EVENTS) { - IbvAckCqEvents(cq, *cq_events); - *cq_events = 0; + return 0; +} + + + +int RdmaEndpoint::ReqNotifyCq(bool send_cq) { + errno = ibv_req_notify_cq( + send_cq ? _resource->send_cq : _resource->recv_cq, send_cq ? 0 : 1); + if (0 != errno) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to arm" << (send_cq ? "send" : "recv") + << " CQ comp channel from " << _socket->description(); + _socket->SetFailed(saved_errno, "Fail to arm %s CQ channel from %s: %s", + send_cq ? "send" : "recv", _socket->description().c_str(), + berror(saved_errno)); + return -1; } + return 0; } @@ -1441,24 +1446,10 @@ void RdmaEndpoint::PollCq(Socket* m) { CHECK(ep == s->_rdma_ep); bool send = false; - ibv_comp_channel* comp_channel = NULL; - ibv_cq* cq = NULL; - int solicited_only = 0; - unsigned int* cq_events = NULL; + ibv_cq* cq = ep->_resource->recv_cq; if (!FLAGS_rdma_use_polling) { - send = ep->_resource->send_comp_channel->fd == m->_fd; - comp_channel = send ? - ep->_resource->send_comp_channel : ep->_resource->recv_comp_channel; - cq = send ? ep->_resource->send_cq : ep->_resource->recv_cq; - solicited_only = send ? 0 : 1; - cq_events = send ? &ep->_send_cq_events : &ep->_recv_cq_events; - - if (ep->GetAndAckEvents(comp_channel, cq, cq_events) < 0) { - const int saved_errno = errno; - PLOG(ERROR) << "Fail to get cq event from " << s->description(); - s->SetFailed(saved_errno, "Fail to get cq event from %s: %s", - s->description().c_str(), berror(saved_errno)); + if (ep->GetAndAckEvents(s) < 0) { return; } } else { @@ -1484,18 +1475,24 @@ void RdmaEndpoint::PollCq(Socket* m) { if (FLAGS_rdma_use_polling) { return; } + + if (!send) { + // It's send cq's turn. + send = true; + cq = ep->_resource->send_cq; + continue; + } + // `recv_cq' and `send_cq' have been polled. 
if (!notified) { // Since RDMA only provides one shot event, we have to call the // notify function every time. Because there is a possibility // that the event arrives after the poll but before the notify, // we should re-poll the CQ once after the notify to check if // there is an available CQE. - errno = ibv_req_notify_cq(cq, solicited_only); - if (errno != 0) { - const int saved_errno = errno; - PLOG(WARNING) << "Fail to arm CQ comp channel from " << s->description(); - s->SetFailed(saved_errno, "Fail to arm cq channel from %s: %s", - s->description().c_str(), berror(saved_errno)); + if (0 != ep->ReqNotifyCq(true)) { + return; + } + if (0 != ep->ReqNotifyCq(false)) { return; } notified = true; @@ -1504,11 +1501,14 @@ void RdmaEndpoint::PollCq(Socket* m) { if (!m->MoreReadEvents(&progress)) { break; } - if (ep->GetAndAckEvents(comp_channel, cq, cq_events) < 0) { - s->SetFailed(errno, "Fail to ack CQ event on %s", - s->description().c_str()); + + if (0 != ep->GetAndAckEvents(s)) { return; } + + // Restart polling from `recv_cq'. + send = false; + cq = ep->_resource->recv_cq; notified = false; continue; } @@ -1517,7 +1517,7 @@ void RdmaEndpoint::PollCq(Socket* m) { ssize_t bytes = 0; for (int i = 0; i < cnt; ++i) { if (s->Failed()) { - continue; + return; } if (wc[i].status != IBV_WC_SUCCESS) { @@ -1533,7 +1533,7 @@ void RdmaEndpoint::PollCq(Socket* m) { const int saved_errno = errno; PLOG(WARNING) << "Fail to handle RDMA completion: " << s->description(); s->SetFailed(saved_errno, "Fail to handle rdma completion from %s: %s", - s->description().c_str(), berror(saved_errno)); + s->description().c_str(), berror(saved_errno)); } else if (nr > 0) { bytes += nr; } @@ -1575,22 +1575,23 @@ std::string RdmaEndpoint::GetStateStr() const { } } -void RdmaEndpoint::DebugInfo(std::ostream& os) const { - os << "\nrdma_state=ON" - << "\nhandshake_state=" << GetStateStr() - << "\nrdma_remote_rq_window_size=" - << _remote_rq_window_size.load(butil::memory_order_relaxed) - << "\nrdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed) - << "\nrdma_local_window_capacity=" << _local_window_capacity - << "\nrdma_remote_window_capacity=" << _remote_window_capacity - << "\nrdma_sbuf_head=" << _sq_current - << "\nrdma_sbuf_tail=" << _sq_sent - << "\nrdma_rbuf_head=" << _rq_received - << "\nrdma_unacked_rq_wr=" << _new_rq_wrs - << "\nrdma_received_ack=" << _accumulated_ack - << "\nrdma_unsolicited_sent=" << _unsolicited - << "\nrdma_unsignaled_sq_wr=" << _sq_unsignaled - << "\n"; +void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) const { + os << "rdma_state=ON" + << connector << "handshake_state=" << GetStateStr() + << connector << "rdma_remote_rq_window_size=" << _remote_rq_window_size.load(butil::memory_order_relaxed) + << connector << "rdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed) + << connector << "rdma_local_window_capacity=" << _local_window_capacity + << connector << "rdma_remote_window_capacity=" << _remote_window_capacity + << connector << "rdma_sbuf_head=" << _sq_current + << connector << "rdma_sbuf_tail=" << _sq_sent + << connector << "rdma_rbuf_head=" << _rq_received + << connector << "rdma_unacked_rq_wr=" << _new_rq_wrs + << connector << "rdma_received_ack=" << _accumulated_ack + << connector << "rdma_unsolicited_sent=" << _unsolicited + << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled + << connector << "rdma_new_rq_wrs=" << _new_rq_wrs.load(butil::memory_order_relaxed) + << connector << "rdma_imm_inflight=" << 
_imm_inflight.load(butil::memory_order_relaxed) + << connector << ""; } int RdmaEndpoint::GlobalInitialize() { @@ -1735,22 +1736,22 @@ void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) { } void RdmaEndpoint::PollerAddCqSid() { - auto index = butil::fmix32(_polling_cq_sid) % FLAGS_rdma_poller_num; + auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; auto& group = _poller_groups[bthread_self_tag()]; auto& pollers = group.pollers; auto& poller = pollers[index]; - if (INVALID_SOCKET_ID != _polling_cq_sid) { - poller.op_queue.Enqueue(CqSidOp{_polling_cq_sid, CqSidOp::ADD}); + if (INVALID_SOCKET_ID != _cq_sid) { + poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD}); } } void RdmaEndpoint::PollerRemoveCqSid() { - auto index = butil::fmix32(_polling_cq_sid) % FLAGS_rdma_poller_num; + auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; auto& group = _poller_groups[bthread_self_tag()]; auto& pollers = group.pollers; auto& poller = pollers[index]; - if (INVALID_SOCKET_ID != _polling_cq_sid) { - poller.op_queue.Enqueue(CqSidOp{_polling_cq_sid, CqSidOp::REMOVE}); + if (INVALID_SOCKET_ID != _cq_sid) { + poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE}); } } diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 58ea8c52a0..c2cb2e59e3 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -59,13 +59,14 @@ class RdmaConnect : public AppConnect { }; struct RdmaResource { + RdmaResource* next{NULL}; ibv_qp* qp{NULL}; + // For polling mode. ibv_cq* polling_cq{NULL}; + // For event mode. ibv_cq* send_cq{NULL}; ibv_cq* recv_cq{NULL}; - ibv_comp_channel* send_comp_channel{NULL}; - ibv_comp_channel* recv_comp_channel{NULL}; - RdmaResource* next{NULL}; + ibv_comp_channel* comp_channel{NULL}; RdmaResource() = default; ~RdmaResource(); DISALLOW_COPY_AND_ASSIGN(RdmaResource); @@ -95,7 +96,8 @@ friend class Socket; bool IsWritable() const; // For debug - void DebugInfo(std::ostream& os) const; + void DebugInfo(std::ostream& os, + butil::StringPiece connector = "\n") const; // Callback when there is new epollin event on TCP fd static void OnNewDataFromTcp(Socket* m); @@ -132,16 +134,8 @@ friend class Socket; // Process handshake at the server static void* ProcessHandshakeAtServer(void* arg); - // Create a socket which wrap the comp channel of CQ. - SocketId CreateSocket(int fd, ibv_cq* cq, int solicited_only); - - // Release a socket which wrap the comp channel of CQ. - static void SetSocketFailed(SocketId socket_id, bool remove_consumer); - // Deallocate CQ resource. - static void DeallocateCq(ibv_cq* cq, ibv_comp_channel* comp_channel, - unsigned int cq_events, bthread_tag_t tag); - + static void DeallocateCq(ibv_cq* cq, unsigned int cq_events); // Allocate resources // Return 0 if success, -1 if failed and errno set @@ -209,8 +203,10 @@ friend class Socket; int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num); // Get event from comp channel and ack the events - int GetAndAckEvents(ibv_comp_channel* comp_channel, - ibv_cq* cq, unsigned int* cq_events); + int GetAndAckEvents(SocketUniquePtr& s); + + // Request completion notification on a send/recv CQ. + int ReqNotifyCq(bool send_cq); // Poll CQ and get the work completion static void PollCq(Socket* m); @@ -241,10 +237,7 @@ friend class Socket; unsigned int _recv_cq_events; // The SocketId which wrap the comp channel of CQ. - SocketId _send_cq_sid; - SocketId _recv_cq_sid; - // The SocketId which is for polling CQ. 
- SocketId _polling_cq_sid; + SocketId _cq_sid; // Capacity of local Send Queue and local Recv Queue uint16_t _sq_size; @@ -282,6 +275,8 @@ friend class Socket; butil::atomic _sq_window_size; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; + // The number of inflight send IMM. + butil::atomic _imm_inflight; // butex for inform read events on TCP fd during handshake butil::atomic *_read_butex; From ea0398381864870a8f8f14cfa9afa7c14f3493fd Mon Sep 17 00:00:00 2001 From: chenBright Date: Sun, 14 Dec 2025 23:16:56 +0800 Subject: [PATCH 5/7] Add IMM window --- src/brpc/rdma/rdma_endpoint.cpp | 23 +++++++++++------------ src/brpc/rdma/rdma_endpoint.h | 2 ++ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index a4a1b034be..7351d2962f 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -184,10 +184,10 @@ RdmaEndpoint::RdmaEndpoint(Socket* s) , _rq_received(0) , _local_window_capacity(0) , _remote_window_capacity(0) + , _sq_imm_window_size(0) , _remote_rq_window_size(0) , _sq_window_size(0) , _new_rq_wrs(0) - , _imm_inflight(0) { if (_sq_size < MIN_QP_SIZE) { _sq_size = MIN_QP_SIZE; @@ -233,7 +233,6 @@ void RdmaEndpoint::Reset() { _remote_rq_window_size.store(0, butil::memory_order_relaxed); _sq_window_size.store(0, butil::memory_order_relaxed); _new_rq_wrs.store(0, butil::memory_order_relaxed); - _imm_inflight.store(0, butil::memory_order_relaxed); } void RdmaConnect::StartConnect(const Socket* socket, @@ -520,7 +519,8 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_local_window_capacity = std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM; ep->_remote_window_capacity = - std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, + std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM; + ep->_sq_imm_window_size = RESERVED_WR_NUM; ep->_remote_rq_window_size.store( ep->_local_window_capacity, butil::memory_order_relaxed); ep->_sq_window_size.store( @@ -631,7 +631,8 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { ep->_local_window_capacity = std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM; ep->_remote_window_capacity = - std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM, + std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM; + ep->_sq_imm_window_size = RESERVED_WR_NUM; ep->_remote_rq_window_size.store( ep->_local_window_capacity, butil::memory_order_relaxed); ep->_sq_window_size.store( @@ -927,7 +928,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { } int RdmaEndpoint::SendAck(int num) { - if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) { + if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2 && + _sq_imm_window_size > 0) { return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed)); } return 0; @@ -955,7 +957,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) { LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " << oss.str(); return -1; } - _imm_inflight.fetch_add(1, butil::memory_order_relaxed); + _sq_imm_window_size -= 1; return 0; } @@ -964,8 +966,8 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { switch (wc.opcode) { case IBV_WC_SEND: { // send completion if (0 == wc.wr_id) { - // Do nothing for imm. - _imm_inflight.fetch_sub(1, butil::memory_order_relaxed); + _sq_imm_window_size += 1; + SendAck(0); return 0; } // Update SQ window. 
@@ -1096,9 +1098,6 @@ static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size, ui static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { std::unique_ptr resource(new RdmaResource); - // NOTE: we enlarge the size of SQ to contain redundant 1/4 of the wnd, - // which is for unexpected Imm. - sq_size = sq_size * 5 / 4; /* NOTE */ if (!FLAGS_rdma_use_polling) { resource->comp_channel = IbvCreateCompChannel(GetRdmaContext()); if (NULL == resource->comp_channel) { @@ -1578,6 +1577,7 @@ std::string RdmaEndpoint::GetStateStr() const { void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) const { os << "rdma_state=ON" << connector << "handshake_state=" << GetStateStr() + << connector << "rdma__sq_imm_window_size=" << _sq_imm_window_size << connector << "rdma_remote_rq_window_size=" << _remote_rq_window_size.load(butil::memory_order_relaxed) << connector << "rdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed) << connector << "rdma_local_window_capacity=" << _local_window_capacity @@ -1590,7 +1590,6 @@ void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) con << connector << "rdma_unsolicited_sent=" << _unsolicited << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled << connector << "rdma_new_rq_wrs=" << _new_rq_wrs.load(butil::memory_order_relaxed) - << connector << "rdma_imm_inflight=" << _imm_inflight.load(butil::memory_order_relaxed) << connector << ""; } diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index c2cb2e59e3..3fc82e2967 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -269,6 +269,8 @@ friend class Socket; uint16_t _local_window_capacity; // The capacity of remote window: min(local RQ, remote SQ) uint16_t _remote_window_capacity; + // The number of IMM WRs we can post to the local Send Queue. + uint16_t _sq_imm_window_size; // The number of WRs we can send to remote side. butil::atomic _remote_rq_window_size; // The number of WRs we can post to the local Send Queue From b02d13a0de3d8580c12ff54fb3bd841cfd4d5b01 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 15 Dec 2025 14:42:27 +0800 Subject: [PATCH 6/7] Deallocate polling cq --- src/brpc/rdma/rdma_endpoint.cpp | 59 +++++++++++++++++++-------------- src/brpc/rdma/rdma_endpoint.h | 5 --- test/brpc_rdma_unittest.cpp | 6 +--- 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 7351d2962f..616ef33252 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -230,6 +230,7 @@ void RdmaEndpoint::Reset() { _rq_received = 0; _local_window_capacity = 0; _remote_window_capacity = 0; + _sq_imm_window_size = 0; _remote_rq_window_size.store(0, butil::memory_order_relaxed); _sq_window_size.store(0, butil::memory_order_relaxed); _new_rq_wrs.store(0, butil::memory_order_relaxed); @@ -841,8 +842,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { } ssize_t len = data->cut_into_sglist_and_iobuf( - sglist, &sge_index, to, max_sge, - _remote_recv_block_size - this_len); + sglist, &sge_index, to, max_sge, _remote_recv_block_size - this_len); if (len < 0) { return -1; } @@ -957,6 +957,10 @@ int RdmaEndpoint::SendImm(uint32_t imm) { LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " << oss.str(); return -1; } + + // `_sq_imm_window_size' will never be negative. 
+ // Because IMM can only be sent if + // `_sq_imm_window_size` is greater than 0. _sq_imm_window_size -= 1; return 0; } @@ -967,6 +971,7 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { case IBV_WC_SEND: { // send completion if (0 == wc.wr_id) { _sq_imm_window_size += 1; + // If there are any unacknowledged recvs, send an ack. SendAck(0); return 0; } @@ -1298,14 +1303,11 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { return 0; } -void RdmaEndpoint::DeallocateCq(ibv_cq* cq, unsigned int cq_events) { +static void DeallocateCq(ibv_cq* cq) { if (NULL == cq) { return; } - if (cq_events > 0) { - IbvAckCqEvents(cq, cq_events); - } int err = IbvDestroyCq(cq); LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err); } @@ -1328,6 +1330,13 @@ void RdmaEndpoint::DeallocateResources() { } } + if (NULL != _resource->send_cq) { + IbvAckCqEvents(_resource->send_cq, _send_cq_events); + } + if (NULL != _resource->recv_cq) { + IbvAckCqEvents(_resource->recv_cq, _recv_cq_events); + } + bool remove_consumer = true; if (!move_to_rdma_resource_list) { if (NULL != _resource->qp) { @@ -1336,8 +1345,9 @@ void RdmaEndpoint::DeallocateResources() { _resource->qp = NULL; } - DeallocateCq(_resource->send_cq, _send_cq_events); - DeallocateCq(_resource->recv_cq, _recv_cq_events); + DeallocateCq(_resource->polling_cq); + DeallocateCq(_resource->send_cq); + DeallocateCq(_resource->recv_cq); if (NULL != _resource->comp_channel) { // Destroy send_comp_channel will destroy this fd, @@ -1350,10 +1360,12 @@ void RdmaEndpoint::DeallocateResources() { } + _resource->polling_cq = NULL; _resource->send_cq = NULL; _resource->recv_cq = NULL; _resource->comp_channel = NULL; delete _resource; + _resource = NULL; } if (INVALID_SOCKET_ID != _cq_sid) { @@ -1369,12 +1381,6 @@ void RdmaEndpoint::DeallocateResources() { } if (move_to_rdma_resource_list) { - if (NULL != _resource->send_cq) { - IbvAckCqEvents(_resource->send_cq, _send_cq_events); - } - if (NULL != _resource->recv_cq) { - IbvAckCqEvents(_resource->recv_cq, _recv_cq_events); - } BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex); _resource->next = g_rdma_resource_list; g_rdma_resource_list = _resource; @@ -1399,8 +1405,16 @@ int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) { } if (cq == _resource->send_cq) { ++_send_cq_events; - } else { + } else if (cq == _resource->recv_cq) { ++_recv_cq_events; + } else { + // Unexpected CQ event that does not belong to + // this endpoint's send/recv CQs. + LOG(WARNING) << "Unexpected CQ event from cq=" << cq + << " of " << s->description(); + // Acknowledge this single event immediately + // to avoid leaking unacknowledged events. + IbvAckCqEvents(cq, 1); } } if (_send_cq_events >= MAX_CQ_EVENTS) { @@ -1414,14 +1428,13 @@ int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) { return 0; } - - int RdmaEndpoint::ReqNotifyCq(bool send_cq) { errno = ibv_req_notify_cq( - send_cq ? _resource->send_cq : _resource->recv_cq, send_cq ? 0 : 1); + send_cq ? _resource->send_cq : _resource->recv_cq, + send_cq ? 0 : 1); if (0 != errno) { const int saved_errno = errno; - PLOG(WARNING) << "Fail to arm" << (send_cq ? "send" : "recv") + PLOG(WARNING) << "Fail to arm " << (send_cq ? "send" : "recv") << " CQ comp channel from " << _socket->description(); _socket->SetFailed(saved_errno, "Fail to arm %s CQ channel from %s: %s", send_cq ? 
"send" : "recv", _socket->description().c_str(), @@ -1577,7 +1590,7 @@ std::string RdmaEndpoint::GetStateStr() const { void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) const { os << "rdma_state=ON" << connector << "handshake_state=" << GetStateStr() - << connector << "rdma__sq_imm_window_size=" << _sq_imm_window_size + << connector << "rdma_sq_imm_window_size=" << _sq_imm_window_size << connector << "rdma_remote_rq_window_size=" << _remote_rq_window_size.load(butil::memory_order_relaxed) << connector << "rdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed) << connector << "rdma_local_window_capacity=" << _local_window_capacity @@ -1585,12 +1598,10 @@ void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) con << connector << "rdma_sbuf_head=" << _sq_current << connector << "rdma_sbuf_tail=" << _sq_sent << connector << "rdma_rbuf_head=" << _rq_received - << connector << "rdma_unacked_rq_wr=" << _new_rq_wrs + << connector << "rdma_unacked_rq_wr=" << _new_rq_wrs.load(butil::memory_order_relaxed) << connector << "rdma_received_ack=" << _accumulated_ack << connector << "rdma_unsolicited_sent=" << _unsolicited - << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled - << connector << "rdma_new_rq_wrs=" << _new_rq_wrs.load(butil::memory_order_relaxed) - << connector << ""; + << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled; } int RdmaEndpoint::GlobalInitialize() { diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 3fc82e2967..eb4714ef0d 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -134,9 +134,6 @@ friend class Socket; // Process handshake at the server static void* ProcessHandshakeAtServer(void* arg); - // Deallocate CQ resource. - static void DeallocateCq(ibv_cq* cq, unsigned int cq_events); - // Allocate resources // Return 0 if success, -1 if failed and errno set int AllocateResources(); @@ -277,8 +274,6 @@ friend class Socket; butil::atomic _sq_window_size; // The number of new WRs posted in the local Recv Queue butil::atomic _new_rq_wrs; - // The number of inflight send IMM. 
- butil::atomic _imm_inflight; // butex for inform read events on TCP fd during handshake butil::atomic *_read_butex; diff --git a/test/brpc_rdma_unittest.cpp b/test/brpc_rdma_unittest.cpp index 066d01277e..ccb280f1c8 100644 --- a/test/brpc_rdma_unittest.cpp +++ b/test/brpc_rdma_unittest.cpp @@ -209,8 +209,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) { uint8_t data[RDMA_HELLO_MSG_LEN]; memcpy(data, "PRPC", 4); // send as normal baidu_std protocol - memset(data + 4, 0, 32); - ASSERT_EQ(38, write(sockfd, data, 38)); + ASSERT_EQ(4, write(sockfd, data, 4)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); @@ -660,9 +659,6 @@ TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) { ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); ASSERT_EQ(sizeof(flags), write(sockfd1, &flags, sizeof(flags))); usleep(100000); - ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); - close(sockfd1); - usleep(100000); // wait for server to handle the msg ASSERT_EQ(NULL, GetSocketFromServer(0)); butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0)); From fb1e3c058cb5edd3e624ff5b5496ce9b5aef24f6 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 22 Dec 2025 15:23:44 +0800 Subject: [PATCH 7/7] Update RDMA documents --- docs/cn/rdma.md | 48 ++++++++++++++++++++++++------------------------ docs/en/rdma.md | 43 ++++++++++++++++++++++++------------------- 2 files changed, 48 insertions(+), 43 deletions(-) diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md index 29b0b6fb43..e775459893 100644 --- a/docs/cn/rdma.md +++ b/docs/cn/rdma.md @@ -35,7 +35,7 @@ RDMA与TCP不同,不使用socket接口进行通信。但是在实现上仍然 brpc内部使用RDMA RC模式,每个RdmaEndpoint对应一个QP。RDMA连接建立依赖于前置TCP建连,TCP建连后双方交换必要参数,如GID、QPN等,再发起RDMA连接并实现数据传输。这个过程我们称为握手(参见RdmaEndpoint)。因为握手需要TCP连接,因此RdmaEndpoint所在的Socket类中,原本的TCP fd仍然有效。握手过程采用了brpc中已有的AppConnect逻辑。注意,握手用的TCP连接在后续数据传输阶段并不会收发数据,但仍保持为EST状态。一旦TCP连接中断,其上对应的RDMA连接同样会置错。 -RdmaEndpoint数据传输逻辑的第一个重要特性是零拷贝。要发送的所有数据默认都存放在IOBuf的Block中,因此所发送的Block需要等到对端确认接收完成后才可以释放,这些Block的引用被存放于RdmaEndpoint::_sbuf中。而要实现接收零拷贝,则需要确保接受端所预提交的接收缓冲区必须直接在IOBuf的Block里面,被存放于RdmaEndpoint::_rbuf。注意,接收端预提交的每一段Block,有一个固定的大小(recv_block_size)。发送端发送时,一个请求最多只能有这么大,否则接收端则无法成功接收。 +RdmaEndpoint数据传输逻辑的第一个重要特性是零拷贝。要发送的所有数据默认都存放在IOBuf的Block中,因此所发送的Block需要等到发送CQE触发后才可以释放,这些Block的引用被存放于RdmaEndpoint::_sbuf中。而要实现接收零拷贝,则需要确保接受端所预提交的接收缓冲区必须直接在IOBuf的Block里面,被存放于RdmaEndpoint::_rbuf。注意,接收端预提交的每一段Block,有一个固定的大小(recv_block_size)。发送端发送时,一个请求最多只能有这么大,否则接收端则无法成功接收。 RdmaEndpoint数据传输逻辑的第二个重要特性是滑动窗口流控。这一流控机制是为了避免发送端持续在发送,其速度超过了接收端处理的速度。TCP传输中也有类似的逻辑,但是是由内核协议栈来实现的。RdmaEndpoint内实现了这一流控机制,通过接收端显式回复ACK来确认接收端处理完毕。为了减少ACK本身的开销,让ACK以立即数形式返回,可以被附在数据消息里。 @@ -52,26 +52,26 @@ RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通 # 参数 可配置参数说明: -* rdma_trace_verbose: 日志中打印RDMA建连相关信息,默认false -* rdma_recv_zerocopy: 是否启用接收零拷贝,默认true -* rdma_zerocopy_min_size: 接收零拷贝最小的msg大小,默认512B -* rdma_recv_block_type: 为接收数据预准备的block类型,分为三类default(8KB)/large(64KB)/huge(2MB),默认为default -* rdma_prepared_qp_size: 程序启动预生成的QP的大小,默认128 -* rdma_prepared_qp_cnt: 程序启动预生成的QP的数量,默认1024 -* rdma_max_sge: 允许的最大发送SGList长度,默认为0,即采用硬件所支持的最大长度 -* rdma_sq_size: SQ大小,默认128 -* rdma_rq_size: RQ大小,默认128 -* rdma_cqe_poll_once: 从CQ中一次性poll出的CQE数量,默认32 -* rdma_gid_index: 使用本地GID表中的Index,默认为-1,即选用最大的可用GID Index -* rdma_port: 使用IB设备的port number,默认为1 -* rdma_device: 使用IB设备的名称,默认为空,即使用第一个active的设备 -* rdma_memory_pool_initial_size_mb: 内存池的初始大小,单位MB,默认1024 -* rdma_memory_pool_increase_size_mb: 内存池每次动态增长的大小,单位MB,默认1024 -* rdma_memory_pool_max_regions: 最大的内存池块数,默认16 -* 
rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4 -* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128 -* rdma_use_polling: 是否使用RDMA的轮询模式,默认false -* rdma_poller_num: 轮询模式下的poller数目,默认1 -* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false -* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false -* rdma_disable_bthread: 禁用bthread,默认是false +* rdma_trace_verbose: 日志中打印RDMA建连相关信息,默认false。 +* rdma_recv_zerocopy: 是否启用接收零拷贝,默认true。 +* rdma_zerocopy_min_size: 接收零拷贝最小的msg大小,默认512B。 +* rdma_recv_block_type: 为接收数据预准备的block类型,分为三类default(8KB)/large(64KB)/huge(2MB),默认为default。 +* rdma_prepared_qp_size: 程序启动预生成的QP的大小,默认128。 +* rdma_prepared_qp_cnt: 程序启动预生成的QP的数量,默认1024。 +* rdma_max_sge: 允许的最大发送SGList长度,默认为0,即采用硬件所支持的最大长度。 +* rdma_sq_size: SQ大小,默认128。 +* rdma_rq_size: RQ大小,默认128。 +* rdma_cqe_poll_once: 从CQ中一次性poll出的CQE数量,默认32。 +* rdma_gid_index: 使用本地GID表中的Index,默认为-1,即选用最大的可用GID Index。 +* rdma_port: 使用IB设备的port number,默认为1。 +* rdma_device: 使用IB设备的名称,默认为空,即使用第一个active的设备。 +* rdma_memory_pool_initial_size_mb: 内存池的初始大小,单位MB,默认1024。 +* rdma_memory_pool_increase_size_mb: 内存池每次动态增长的大小,单位MB,默认1024。 +* rdma_memory_pool_max_regions: 最大的内存池块数,默认3。 +* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4。 +* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128。 +* rdma_use_polling: 是否使用RDMA的轮询模式,默认false。 +* rdma_poller_num: 轮询模式下的poller数目,默认1。 +* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false。 +* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false。 +* rdma_disable_bthread: 禁用bthread,默认是false。 diff --git a/docs/en/rdma.md b/docs/en/rdma.md index c0e88ce9b2..99f1ecd781 100644 --- a/docs/en/rdma.md +++ b/docs/en/rdma.md @@ -35,7 +35,7 @@ RDMA does not use socket API like TCP. However, the brpc::Socket class is still brpc uses RDMA RC mode. Every RdmaEndpoint has its own QP. Before establishing RDMA connection, a TCP connection is necessary to exchange some information such as GID and QPN. We call this procedure handshake. Since handshake needs TCP connection, the TCP fd in the corresponding Socket is still valid. The handshake procedure is completed in the AppConnect way in brpc. The TCP connection will keep in EST state but not be used for data transmission after RDMA connection is established. Once the TCP connection is closed, the corresponding RDMA connection will be set error. -The first key feature in RdmaEndpoint data transmission is zero copy. All data which need to transmit is in the Blocks of IOBuf. Thus all the Blocks need to be released after the remote side completes the receiving. The reference of these Blocks are stored in RdmaEndpoint::_sbuf. In order to realize receiving zero copy, the receive side must post receive buffers in Blocks of IOBuf, which are stored in RdmaEndpoint::_rbuf. Note that all the Blocks posted in the receive side has a fixed size (recv_block_size). The transmit side can only send message smaller than that. Otherwise the receive side cannot receive data successfully. +The first key feature in RdmaEndpoint data transmission is zero copy. All data which need to transmit is in the Blocks of IOBuf. Thus all the Blocks need to be released after the sent CQEs are triggered. The reference of these Blocks are stored in RdmaEndpoint::_sbuf. In order to realize receiving zero copy, the receive side must post receive buffers in Blocks of IOBuf, which are stored in RdmaEndpoint::_rbuf. Note that all the Blocks posted in the receive side has a fixed size (recv_block_size). The transmit side can only send message smaller than that. 
Otherwise the receive side cannot receive data successfully.
The second key feature in RdmaEndpoint data transmission is sliding window flow control. The flow control is to prevent a fast transmit side from overwhelming a slow receive side. TCP has a similar mechanism in the kernel TCP stack. RdmaEndpoint implements this mechanism with explicit ACKs from the receive side. To reduce the overhead of ACKs, the ACK number can be piggybacked in an ordinary data message as immediate data.
@@ -50,21 +50,26 @@ RDMA is hardware-related. It has some different concepts such as device, port, G
# Parameters
Configurable parameters:
-* rdma_trace_verbose: to print RDMA connection information in log,default is false
-* rdma_recv_zerocopy: enable zero copy in receive side,default is true
-* rdma_zerocopy_min_size: the min message size for receive zero copy (in Byte),default is 512
-* rdma_recv_block_type: the block type used for receiving, can be default(8KB)/large(64KB)/huge(2MB),default is default
-* rdma_prepared_qp_size: the size of QPs created at the beginning of the application,default is 128
-* rdma_prepared_qp_cnt: the number of QPs created at the beginning of the application,default is 1024
-* rdma_max_sge: the max length of sglist, default is 0, which is the max length allowed by the device
-* rdma_sq_size: the size of SQ,default is 128
-* rdma_rq_size: the size of RQ,default is 128
-* rdma_cqe_poll_once: the number of CQE pooled from CQ once,default is 32
-* rdma_gid_index: the index of local GID table used,default is -1,which is the maximum GID index
-* rdma_port: the port number used,default is 1
-* rdma_device: the IB device name,default is empty,which is the first active device
-* rdma_memory_pool_initial_size_mb: the initial region size of RDMA memory pool (in MB),default is 1024
-* rdma_memory_pool_increase_size_mb: the step increase region size of RDMA memory pool (in MB),default is 1024
-* rdma_memory_pool_max_regions: the max number of regions in RDMA memory pool,default is 16
-* rdma_memory_pool_buckets: the number of buckets for avoiding mutex contention in RDMA memory pool,default is 4
-* rdma_memory_pool_tls_cache_num: the number of thread local cached blocks in RDMA memory pool,default is 128
+* rdma_trace_verbose: to print RDMA connection information in the log, default is false.
+* rdma_recv_zerocopy: enable zero copy in receive side, default is true.
+* rdma_zerocopy_min_size: the min message size for receive zero copy (in Byte), default is 512.
+* rdma_recv_block_type: the block type used for receiving, can be default(8KB)/large(64KB)/huge(2MB), default is default.
+* rdma_prepared_qp_size: the size of QPs created at the beginning of the application, default is 128.
+* rdma_prepared_qp_cnt: the number of QPs created at the beginning of the application, default is 1024.
+* rdma_max_sge: the max length of sglist, default is 0, which is the max length allowed by the device.
+* rdma_sq_size: the size of SQ, default is 128.
+* rdma_rq_size: the size of RQ, default is 128.
+* rdma_cqe_poll_once: the number of CQEs polled from the CQ at a time, default is 32.
+* rdma_gid_index: the index of local GID table used, default is -1, which is the maximum GID index.
+* rdma_port: the port number used, default is 1.
+* rdma_device: the IB device name, default is empty, which is the first active device.
+* rdma_memory_pool_initial_size_mb: the initial region size of RDMA memory pool (in MB), default is 1024.
+* rdma_memory_pool_increase_size_mb: the step increase region size of RDMA memory pool (in MB), default is 1024.
+* rdma_memory_pool_max_regions: the max number of regions in RDMA memory pool, default is 3.
+* rdma_memory_pool_buckets: the number of buckets for avoiding mutex contention in RDMA memory pool, default is 4.
+* rdma_memory_pool_tls_cache_num: the number of thread local cached blocks in RDMA memory pool, default is 128.
+* rdma_use_polling: whether to use RDMA polling mode, default is false.
+* rdma_poller_num: the number of pollers in polling mode, default is 1.
+* rdma_poller_yield: whether pollers in polling mode voluntarily relinquish the CPU, default is false.
+* rdma_edisp_unsched: prevent the event dispatcher from being scheduled, default is false.
+* rdma_disable_bthread: disable bthread, default is false.
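
For reference, a minimal sketch of how these parameters are typically used together. This is not part of the patch: it assumes brpc was built with RDMA support and that ServerOptions exposes a `use_rdma` switch; the port, flag values and omitted service registration are placeholders.

```cpp
// Minimal sketch (assumption: brpc built with RDMA support and
// ServerOptions::use_rdma available). The rdma_* parameters listed above are
// ordinary gflags, so they can be passed on the command line, e.g.:
//   ./rdma_server --rdma_recv_block_type=large --rdma_use_polling=true \
//                 --rdma_poller_num=2
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>

int main(int argc, char* argv[]) {
    gflags::ParseCommandLineFlags(&argc, &argv, true);

    brpc::Server server;
    // ... AddService(...) calls omitted ...

    brpc::ServerOptions options;
    options.use_rdma = true;  // Request RDMA; the endpoint falls back to TCP
                              // if the RDMA handshake does not complete.
    if (server.Start(8002, &options) != 0) {
        LOG(ERROR) << "Fail to start server";
        return -1;
    }
    server.RunUntilAskedToQuit();
    return 0;
}
```

On the client side, RDMA-enabled builds expose the same kind of switch on ChannelOptions; if either end cannot complete the handshake, the connection is expected to end up in the FALLBACK_TCP state shown in rdma_endpoint.cpp and continue over TCP.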