From 343f79ef4c350e1dbfa27aa2c3c9a16c180a42ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Wed, 12 Feb 2025 03:54:53 +0000 Subject: [PATCH 01/35] Revert "[fix](stuck): revert "[feat](http): use async reply to provide better http performance * fix some bug for partition * fix for thrift * fix the thrift exit bug * Revert "[feat](http): use async reply to provide better http performance * ensure free order * [Fix](stream-load) Fix stream load stuck under high concurrency (#36772) When the concurrency of streamload exceeds the number of threads in the remote scanner, streamload may get stuck. The reason is that the libevent thread blocks and waits for streamload to complete, and when there is no intersection between the tasks handled by the scanner thread and the libevent thread, it gets stuck. The solution is to convert the synchronous waiting tasks of libevent into asynchronous execution by using callbacks in the streamload executor thread. See merge request: !740" Revert commit d9e74efa762c8161a5ca3df4290bbd0ab896f1ef See merge request: !745" Revert commit 396cb2ec7e0b1a21bc0d7424c627f0d9321884bc --- be/src/http/action/stream_load.cpp | 100 ++++++++++-------- be/src/http/action/stream_load.h | 4 +- be/src/http/ev_http_server.cpp | 1 + be/src/http/http_channel.cpp | 3 + be/src/http/http_request.h | 12 +++ .../stream_load/stream_load_executor.cpp | 14 ++- .../stream_load/stream_load_executor.h | 5 + 7 files changed, 94 insertions(+), 45 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 1ace2b697d212a..ddf5f7cc3d5bb0 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include #include #include @@ -108,15 +110,65 @@ void StreamLoadAction::handle(HttpRequest* req) { // status already set to fail if (ctx->status.ok()) { - ctx->status = _handle(ctx); + ctx->status = _handle(ctx, req); 
if (!ctx->status.ok() && !ctx->status.is()) { LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << ctx->status; + _send_reply(ctx, req); } } +} + +Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpRequest* req) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::Error("receive body don't equal with body bytes"); + } + + // if we use non-streaming, MessageBodyFileSink.finish will close the file + RETURN_IF_ERROR(ctx->body_sink->finish()); + if (!ctx->use_streaming) { + // we need to close file first, then execute_plan_fragment here + ctx->body_sink.reset(); + TPipelineFragmentParamsList mocked; + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked + [req, this](std::shared_ptr ctx) { _on_finish(ctx, req); })); + } + + return Status::OK(); +} + +void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRequest* req) { + ctx->status = ctx->future.get(); + if (ctx->status.ok()) { + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" + << ctx->id.to_string(); + } else if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get()); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get()); + ctx->commit_and_publish_txn_cost_nanos = + MonotonicNanos() - commit_and_publish_start_time; + g_stream_load_commit_and_publish_latency_ms + << ctx->commit_and_publish_txn_cost_nanos / 1000000; + } + } + _send_reply(ctx, req); +} + +void 
StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; @@ -155,45 +207,6 @@ void StreamLoadAction::handle(HttpRequest* req) { streaming_load_duration_ms->increment(ctx->load_cost_millis); } -Status StreamLoadAction::_handle(std::shared_ptr ctx) { - if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { - LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes - << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; - return Status::Error("receive body don't equal with body bytes"); - } - - // if we use non-streaming, MessageBodyFileSink.finish will close the file - RETURN_IF_ERROR(ctx->body_sink->finish()); - if (!ctx->use_streaming) { - // we need to close file first, then execute_plan_fragment here - ctx->body_sink.reset(); - TPipelineFragmentParamsList mocked; - RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked)); - } - - // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); - - if (ctx->group_commit) { - LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); - return Status::OK(); - } - - if (ctx->two_phase_commit) { - int64_t pre_commit_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); - ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; - } else { - // If put file success we need commit this load - int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); - ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time; - g_stream_load_commit_and_publish_latency_ms - << ctx->commit_and_publish_txn_cost_nanos / 1000000; - } - return Status::OK(); -} - int StreamLoadAction::on_header(HttpRequest* req) { streaming_load_current_processing->increment(1); @@ -771,8 +784,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (!ctx->use_streaming) { return Status::OK(); } - TPipelineFragmentParamsList mocked; - return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked); + + return _exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked, [http_req, this](std::shared_ptr ctx) { + _on_finish(ctx, http_req); + }); } Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) { diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index d1de89c9397018..f91334e7305b15 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -46,11 +46,13 @@ class StreamLoadAction : public HttpHandler { private: Status _on_header(HttpRequest* http_req, std::shared_ptr ctx); - Status _handle(std::shared_ptr ctx); + Status _handle(std::shared_ptr ctx, HttpRequest* req); Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); + void _on_finish(std::shared_ptr ctx, HttpRequest* req); + void _send_reply(std::shared_ptr ctx, HttpRequest* req); private: ExecEnv* _exec_env; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 1bbd2c0e178fd2..962ccea7cd968c 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -56,6 +56,7 @@ static void on_chunked(struct evhttp_request* ev_req, void* param) { static void on_free(struct evhttp_request* ev_req, void* arg) { HttpRequest* request = 
(HttpRequest*)arg; + request->wait_finish_send_reply(); delete request; } diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 918f54d81a0a0b..fd4d5ae3409a25 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -53,6 +53,7 @@ void HttpChannel::send_error(HttpRequest* request, HttpStatus status) { void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), nullptr); + request->finish_send_reply(); } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { @@ -66,6 +67,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std: evbuffer_add(evb, content.c_str(), content.size()); } evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } @@ -80,6 +82,7 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); } evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index a9286410aff0a2..0a0155fe4df0b2 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -79,6 +79,14 @@ class HttpRequest { const char* remote_host() const; + void finish_send_reply() { + promise.set_value(true); + } + + void wait_finish_send_reply() { + futrue.get(); + } + private: HttpMethod _method; std::string _uri; @@ -93,6 +101,10 @@ class HttpRequest { std::shared_ptr _handler_ctx; std::string _request_body; + + // ensure send_reply finished + std::promise promise; + std::future futrue = promise.get_future(); }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp index cdea553d896492..253b2c20ca7c30 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -27,8 +27,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -66,14 +68,20 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn"); Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, - const TPipelineFragmentParamsList& parent) { + const TPipelineFragmentParams& parent) { + return execute_plan_fragment(ctx, parent, [](std::shared_ptr ctx) {}); +} + +Status StreamLoadExecutor::execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParams& parent, + const std::function ctx)>& cb) { // submit this params #ifndef BE_TEST ctx->start_write_data_nanos = MonotonicNanos(); LOG(INFO) << "begin to execute stream load. 
label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; - auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) { + auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) { if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id(); @@ -141,6 +149,8 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr(this->commit_txn(ctx.get())); } } + + cb(ctx); }; if (ctx->put_result.__isset.params) { diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 3472ae5a200096..5e739b41e354ce 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -19,6 +19,7 @@ #include +#include #include #include "common/factory_creator.h" @@ -54,6 +55,10 @@ class StreamLoadExecutor { Status execute_plan_fragment(std::shared_ptr ctx, const TPipelineFragmentParamsList& parent); + Status execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParams& parent, + const std::function ctx)>& cb); + protected: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false From c11fce758d7838ac20002f2be4656918f49268b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Mon, 23 Jun 2025 10:05:02 +0000 Subject: [PATCH 02/35] fix buffer wait * fix missing ; * fix stream load block See merge request: !855 --- be/src/http/action/stream_load.cpp | 2 ++ be/src/http/http_request.h | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index ddf5f7cc3d5bb0..6b88758c0a20e3 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -208,6 +208,8 @@ void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpR } int 
StreamLoadAction::on_header(HttpRequest* req) { + req->mark_send_reply(); + streaming_load_current_processing->increment(1); std::shared_ptr ctx = std::make_shared(_exec_env); diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 0a0155fe4df0b2..03c8231bc90e5e 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -32,6 +32,11 @@ namespace doris { class HttpHandler; +enum SendReplyType { + SYNC = 0, + ASYNC = 1 +}; + class HttpRequest { public: HttpRequest(evhttp_request* ev_req); @@ -79,15 +84,27 @@ class HttpRequest { const char* remote_host() const; + void mark_send_reply(SendReplyType type = ASYNC) { + _send_reply_type = type; + } + void finish_send_reply() { promise.set_value(true); } void wait_finish_send_reply() { - futrue.get(); + if (_send_reply_type == SYNC) { + return; + } + + auto status = _futrue.wait_for(std::chrono::seconds(600)); + if (status != std::future_status::ready) { + LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); + } } private: + SendReplyType _send_reply_type = SYNC; HttpMethod _method; std::string _uri; std::string _raw_path; From 81662b94a2c79b98c733ce603cfdd0198975859b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Mon, 23 Jun 2025 10:18:44 +0000 Subject: [PATCH 03/35] fix futrue not compile --- be/src/http/http_request.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 03c8231bc90e5e..20177db0c5c891 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -121,7 +121,7 @@ class HttpRequest { // ensure send_reply finished std::promise promise; - std::future futrue = promise.get_future(); + std::future _futrue = promise.get_future(); }; } // namespace doris From a4191dc340ef31c4ebf0e00b379a0ec8da053031 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:40:13 +0800 Subject: [PATCH 04/35] fix redefinition --- 
be/src/http/http_request.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 20177db0c5c891..728776fc5e2451 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -33,8 +33,8 @@ namespace doris { class HttpHandler; enum SendReplyType { - SYNC = 0, - ASYNC = 1 + REPLY_SYNC = 0, + REPLY_ASYNC = 1 }; class HttpRequest { @@ -84,7 +84,7 @@ class HttpRequest { const char* remote_host() const; - void mark_send_reply(SendReplyType type = ASYNC) { + void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } @@ -93,7 +93,7 @@ class HttpRequest { } void wait_finish_send_reply() { - if (_send_reply_type == SYNC) { + if (_send_reply_type == REPLY_SYNC) { return; } @@ -104,7 +104,7 @@ class HttpRequest { } private: - SendReplyType _send_reply_type = SYNC; + SendReplyType _send_reply_type = REPLY_SYNC; HttpMethod _method; std::string _uri; std::string _raw_path; From 2b3f934b20e15f1c6929a78a768c1cb91ffc5331 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:47:01 +0800 Subject: [PATCH 05/35] fix clang compile --- be/src/http/action/stream_load.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 6b88758c0a20e3..4e8521906acacf 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -133,7 +133,7 @@ Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpReq ctx->body_sink.reset(); TPipelineFragmentParamsList mocked; RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( - ctx, mocked + ctx, mocked, [req, this](std::shared_ptr ctx) { _on_finish(ctx, req); })); } @@ -787,6 +787,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::OK(); } + TPipelineFragmentParamsList mocked; return _exec_env->stream_load_executor()->execute_plan_fragment( ctx, mocked, 
[http_req, this](std::shared_ptr ctx) { _on_finish(ctx, http_req); From 429adbe793ca6a34da6407632f011dcaad2ca064 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:49:36 +0800 Subject: [PATCH 06/35] fix clang compile --- be/src/runtime/stream_load/stream_load_executor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 5e739b41e354ce..0455e31e088fe5 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -56,7 +56,7 @@ class StreamLoadExecutor { const TPipelineFragmentParamsList& parent); Status execute_plan_fragment( - std::shared_ptr ctx, const TPipelineFragmentParams& parent, + std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, const std::function ctx)>& cb); protected: From c74b6d5757bb8b2e88799e63fe66420dd39c27b3 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 13:13:00 +0800 Subject: [PATCH 07/35] fix clang compile --- be/src/runtime/stream_load/stream_load_executor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 253b2c20ca7c30..724c06ec538542 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -68,12 +68,12 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn"); Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, - const TPipelineFragmentParams& parent) { + const TPipelineFragmentParamsList& parent) { return execute_plan_fragment(ctx, parent, [](std::shared_ptr ctx) {}); } Status StreamLoadExecutor::execute_plan_fragment( - std::shared_ptr ctx, const TPipelineFragmentParams& parent, + 
std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, const std::function ctx)>& cb) { // submit this params #ifndef BE_TEST From be29a7ef80256c2cac2be63ad0a1286dcdfc384e Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 14 Jul 2025 14:06:46 +0000 Subject: [PATCH 08/35] reformat --- be/src/http/action/stream_load.cpp | 2 +- be/src/http/http_request.h | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 4e8521906acacf..56ea2abffe9e9e 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -29,10 +29,10 @@ #include #include +#include #include #include #include -#include #include #include #include diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 728776fc5e2451..5fbc4f244ac792 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -32,10 +32,7 @@ namespace doris { class HttpHandler; -enum SendReplyType { - REPLY_SYNC = 0, - REPLY_ASYNC = 1 -}; +enum SendReplyType { REPLY_SYNC = 0, REPLY_ASYNC = 1 }; class HttpRequest { public: @@ -84,13 +81,9 @@ class HttpRequest { const char* remote_host() const; - void mark_send_reply(SendReplyType type = REPLY_ASYNC) { - _send_reply_type = type; - } + void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } - void finish_send_reply() { - promise.set_value(true); - } + void finish_send_reply() { promise.set_value(true); } void wait_finish_send_reply() { if (_send_reply_type == REPLY_SYNC) { From 31748a219ec039b26abb90b88422dbd0f0f1bef8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Wed, 12 Feb 2025 03:54:53 +0000 Subject: [PATCH 09/35] Revert "[fix](stuck): revert "[feat](http): use async reply to provide better http performance * fix some bug for partition * fix for thrift * fix the thrift exit bug * Revert "[feat](http): use async reply to provide better http performance * 
ensure free order * [Fix](stream-load) Fix stream load stuck under high concurrency (#36772) When the concurrency of streamload exceeds the number of threads in the remote scanner, streamload may get stuck. The reason is that the libevent thread blocks and waits for streamload to complete, and when there is no intersection between the tasks handled by the scanner thread and the libevent thread, it gets stuck. The solution is to convert the synchronous waiting tasks of libevent into asynchronous execution by using callbacks in the streamload executor thread. See merge request: !740" Revert commit d9e74efa762c8161a5ca3df4290bbd0ab896f1ef See merge request: !745" Revert commit 396cb2ec7e0b1a21bc0d7424c627f0d9321884bc --- be/src/http/action/stream_load.cpp | 100 ++++++++++-------- be/src/http/action/stream_load.h | 4 +- be/src/http/ev_http_server.cpp | 1 + be/src/http/http_channel.cpp | 3 + be/src/http/http_request.h | 12 +++ .../stream_load/stream_load_executor.cpp | 14 ++- .../stream_load/stream_load_executor.h | 5 + 7 files changed, 94 insertions(+), 45 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 72ade89da9a055..1aa3c642c8ad51 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include #include #include @@ -108,15 +110,65 @@ void StreamLoadAction::handle(HttpRequest* req) { // status already set to fail if (ctx->status.ok()) { - ctx->status = _handle(ctx); + ctx->status = _handle(ctx, req); if (!ctx->status.ok() && !ctx->status.is()) { LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << ctx->status; + _send_reply(ctx, req); } } +} + +Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpRequest* req) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", 
receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::Error("receive body don't equal with body bytes"); + } + + // if we use non-streaming, MessageBodyFileSink.finish will close the file + RETURN_IF_ERROR(ctx->body_sink->finish()); + if (!ctx->use_streaming) { + // we need to close file first, then execute_plan_fragment here + ctx->body_sink.reset(); + TPipelineFragmentParamsList mocked; + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked + [req, this](std::shared_ptr ctx) { _on_finish(ctx, req); })); + } + + return Status::OK(); +} + +void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRequest* req) { + ctx->status = ctx->future.get(); + if (ctx->status.ok()) { + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" + << ctx->id.to_string(); + } else if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get()); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get()); + ctx->commit_and_publish_txn_cost_nanos = + MonotonicNanos() - commit_and_publish_start_time; + g_stream_load_commit_and_publish_latency_ms + << ctx->commit_and_publish_txn_cost_nanos / 1000000; + } + } + _send_reply(ctx, req); +} + +void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; @@ -155,45 +207,6 @@ void StreamLoadAction::handle(HttpRequest* req) { 
streaming_load_duration_ms->increment(ctx->load_cost_millis); } -Status StreamLoadAction::_handle(std::shared_ptr ctx) { - if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { - LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes - << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; - return Status::Error("receive body don't equal with body bytes"); - } - - // if we use non-streaming, MessageBodyFileSink.finish will close the file - RETURN_IF_ERROR(ctx->body_sink->finish()); - if (!ctx->use_streaming) { - // we need to close file first, then execute_plan_fragment here - ctx->body_sink.reset(); - TPipelineFragmentParamsList mocked; - RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked)); - } - - // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); - - if (ctx->group_commit) { - LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); - return Status::OK(); - } - - if (ctx->two_phase_commit) { - int64_t pre_commit_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); - ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; - } else { - // If put file success we need commit this load - int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); - ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; - g_stream_load_commit_and_publish_latency_ms - << ctx->commit_and_publish_txn_cost_nanos / 1000000; - } - return Status::OK(); -} - int StreamLoadAction::on_header(HttpRequest* req) { streaming_load_current_processing->increment(1); @@ -790,8 +803,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (!ctx->use_streaming) { return Status::OK(); } - TPipelineFragmentParamsList mocked; - return 
_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked); + + return _exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked, [http_req, this](std::shared_ptr ctx) { + _on_finish(ctx, http_req); + }); } Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) { diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index d1de89c9397018..f91334e7305b15 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -46,11 +46,13 @@ class StreamLoadAction : public HttpHandler { private: Status _on_header(HttpRequest* http_req, std::shared_ptr ctx); - Status _handle(std::shared_ptr ctx); + Status _handle(std::shared_ptr ctx, HttpRequest* req); Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); + void _on_finish(std::shared_ptr ctx, HttpRequest* req); + void _send_reply(std::shared_ptr ctx, HttpRequest* req); private: ExecEnv* _exec_env; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 457cf0e0322ba5..0231337a238102 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -56,6 +56,7 @@ static void on_chunked(struct evhttp_request* ev_req, void* param) { static void on_free(struct evhttp_request* ev_req, void* arg) { HttpRequest* request = (HttpRequest*)arg; + request->wait_finish_send_reply(); delete request; } diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 918f54d81a0a0b..fd4d5ae3409a25 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -53,6 +53,7 @@ void HttpChannel::send_error(HttpRequest* request, HttpStatus status) { void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { 
evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), nullptr); + request->finish_send_reply(); } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { @@ -66,6 +67,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std: evbuffer_add(evb, content.c_str(), content.size()); } evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } @@ -80,6 +82,7 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); } evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index a9286410aff0a2..0a0155fe4df0b2 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -79,6 +79,14 @@ class HttpRequest { const char* remote_host() const; + void finish_send_reply() { + promise.set_value(true); + } + + void wait_finish_send_reply() { + futrue.get(); + } + private: HttpMethod _method; std::string _uri; @@ -93,6 +101,10 @@ class HttpRequest { std::shared_ptr _handler_ctx; std::string _request_body; + + // ensure send_reply finished + std::promise promise; + std::future futrue = promise.get_future(); }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index cdea553d896492..253b2c20ca7c30 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -27,8 +27,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -66,14 +68,20 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom bvar::LatencyRecorder 
g_stream_load_commit_txn_latency("stream_load", "commit_txn"); Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, - const TPipelineFragmentParamsList& parent) { + const TPipelineFragmentParams& parent) { + return execute_plan_fragment(ctx, parent, [](std::shared_ptr ctx) {}); +} + +Status StreamLoadExecutor::execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParams& parent, + const std::function ctx)>& cb) { // submit this params #ifndef BE_TEST ctx->start_write_data_nanos = MonotonicNanos(); LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; - auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) { + auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) { if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id(); @@ -141,6 +149,8 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr(this->commit_txn(ctx.get())); } } + + cb(ctx); }; if (ctx->put_result.__isset.params) { diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 3472ae5a200096..5e739b41e354ce 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -19,6 +19,7 @@ #include +#include #include #include "common/factory_creator.h" @@ -54,6 +55,10 @@ class StreamLoadExecutor { Status execute_plan_fragment(std::shared_ptr ctx, const TPipelineFragmentParamsList& parent); + Status execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParams& parent, + const std::function ctx)>& cb); + protected: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false From 2c46f61534540be3042440a68756f481f87c8e38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Mon, 23 Jun 2025 10:05:02 +0000 
Subject: [PATCH 10/35] fix buffer wait * fix missing ; * fix stream load block See merge request: !855 --- be/src/http/action/stream_load.cpp | 2 ++ be/src/http/http_request.h | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 1aa3c642c8ad51..a125dec63a60b7 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -208,6 +208,8 @@ void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpR } int StreamLoadAction::on_header(HttpRequest* req) { + req->mark_send_reply(); + streaming_load_current_processing->increment(1); std::shared_ptr ctx = std::make_shared(_exec_env); diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 0a0155fe4df0b2..03c8231bc90e5e 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -32,6 +32,11 @@ namespace doris { class HttpHandler; +enum SendReplyType { + SYNC = 0, + ASYNC = 1 +}; + class HttpRequest { public: HttpRequest(evhttp_request* ev_req); @@ -79,15 +84,27 @@ class HttpRequest { const char* remote_host() const; + void mark_send_reply(SendReplyType type = ASYNC) { + _send_reply_type = type; + } + void finish_send_reply() { promise.set_value(true); } void wait_finish_send_reply() { - futrue.get(); + if (_send_reply_type == SYNC) { + return; + } + + auto status = _futrue.wait_for(std::chrono::seconds(600)); + if (status != std::future_status::ready) { + LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); + } } private: + SendReplyType _send_reply_type = SYNC; HttpMethod _method; std::string _uri; std::string _raw_path; From bdfa223a311fc2215880d5eb7b28da909eff96d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BF=8A=E8=BE=89?= Date: Mon, 23 Jun 2025 10:18:44 +0000 Subject: [PATCH 11/35] fix futrue not compile --- be/src/http/http_request.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/be/src/http/http_request.h b/be/src/http/http_request.h index 03c8231bc90e5e..20177db0c5c891 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -121,7 +121,7 @@ class HttpRequest { // ensure send_reply finished std::promise promise; - std::future futrue = promise.get_future(); + std::future _futrue = promise.get_future(); }; } // namespace doris From 77ec143a932027ec92b386dd5a96aa06d9f9232e Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:40:13 +0800 Subject: [PATCH 12/35] fix redefinition --- be/src/http/http_request.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 20177db0c5c891..728776fc5e2451 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -33,8 +33,8 @@ namespace doris { class HttpHandler; enum SendReplyType { - SYNC = 0, - ASYNC = 1 + REPLY_SYNC = 0, + REPLY_ASYNC = 1 }; class HttpRequest { @@ -84,7 +84,7 @@ class HttpRequest { const char* remote_host() const; - void mark_send_reply(SendReplyType type = ASYNC) { + void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } @@ -93,7 +93,7 @@ class HttpRequest { } void wait_finish_send_reply() { - if (_send_reply_type == SYNC) { + if (_send_reply_type == REPLY_SYNC) { return; } @@ -104,7 +104,7 @@ class HttpRequest { } private: - SendReplyType _send_reply_type = SYNC; + SendReplyType _send_reply_type = REPLY_SYNC; HttpMethod _method; std::string _uri; std::string _raw_path; From 2d4ff70c012f82b371ab6b05a0cc0737068770a2 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:47:01 +0800 Subject: [PATCH 13/35] fix clang compile --- be/src/http/action/stream_load.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index a125dec63a60b7..513b8116f81b83 100644 --- a/be/src/http/action/stream_load.cpp +++ 
b/be/src/http/action/stream_load.cpp @@ -133,7 +133,7 @@ Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpReq ctx->body_sink.reset(); TPipelineFragmentParamsList mocked; RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( - ctx, mocked + ctx, mocked, [req, this](std::shared_ptr ctx) { _on_finish(ctx, req); })); } @@ -806,6 +806,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::OK(); } + TPipelineFragmentParamsList mocked; return _exec_env->stream_load_executor()->execute_plan_fragment( ctx, mocked, [http_req, this](std::shared_ptr ctx) { _on_finish(ctx, http_req); From 7b3501dd00b74453e57fe43319c9323cac4f5a6b Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 12:49:36 +0800 Subject: [PATCH 14/35] fix clang compile --- be/src/runtime/stream_load/stream_load_executor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 5e739b41e354ce..0455e31e088fe5 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -56,7 +56,7 @@ class StreamLoadExecutor { const TPipelineFragmentParamsList& parent); Status execute_plan_fragment( - std::shared_ptr ctx, const TPipelineFragmentParams& parent, + std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, const std::function ctx)>& cb); protected: From 984fb5c9cc3f4bea57294f99e0ce1a8859627e65 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Sat, 12 Jul 2025 13:13:00 +0800 Subject: [PATCH 15/35] fix clang compile --- be/src/runtime/stream_load/stream_load_executor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 253b2c20ca7c30..724c06ec538542 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ 
b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -68,12 +68,12 @@ bvar::LatencyRecorder g_stream_load_precommit_txn_latency("stream_load", "precom bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_txn"); Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, - const TPipelineFragmentParams& parent) { + const TPipelineFragmentParamsList& parent) { return execute_plan_fragment(ctx, parent, [](std::shared_ptr ctx) {}); } Status StreamLoadExecutor::execute_plan_fragment( - std::shared_ptr ctx, const TPipelineFragmentParams& parent, + std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, const std::function ctx)>& cb) { // submit this params #ifndef BE_TEST From 11171ae5904168f3018c6fa59c44c28395a67104 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 14 Jul 2025 14:06:46 +0000 Subject: [PATCH 16/35] reformat --- be/src/http/action/stream_load.cpp | 2 +- be/src/http/http_request.h | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 513b8116f81b83..604cbe19cac481 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -29,10 +29,10 @@ #include #include +#include #include #include #include -#include #include #include #include diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 728776fc5e2451..5fbc4f244ac792 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -32,10 +32,7 @@ namespace doris { class HttpHandler; -enum SendReplyType { - REPLY_SYNC = 0, - REPLY_ASYNC = 1 -}; +enum SendReplyType { REPLY_SYNC = 0, REPLY_ASYNC = 1 }; class HttpRequest { public: @@ -84,13 +81,9 @@ class HttpRequest { const char* remote_host() const; - void mark_send_reply(SendReplyType type = REPLY_ASYNC) { - _send_reply_type = type; - } + void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } - void 
finish_send_reply() { - promise.set_value(true); - } + void finish_send_reply() { promise.set_value(true); } void wait_finish_send_reply() { if (_send_reply_type == REPLY_SYNC) { From 5f657e58149e02bcd469cd9365b49482af774ed7 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Tue, 15 Jul 2025 11:01:13 +0000 Subject: [PATCH 17/35] fix be ut --- be/src/http/http_request.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 5fbc4f244ac792..603437b0b2039a 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -19,6 +19,7 @@ #include +#include #include #include #include From 18844dc054e9456faf7c7f3d4ff6c0fe746503a6 Mon Sep 17 00:00:00 2001 From: "caojunhui.sticey" Date: Thu, 24 Jul 2025 07:12:22 +0000 Subject: [PATCH 18/35] Because stream load fragment cancel depends on the free_handler_ctx, so we need to invoke it before wait reply done --- be/src/http/http_request.cpp | 22 ++++++++++++++++++++++ be/src/http/http_request.h | 13 ++----------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 14bde591b4ca1b..4ba5eda05e6635 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -22,12 +22,14 @@ #include #include +#include #include #include #include #include #include "http/http_handler.h" +#include "runtime/stream_load/stream_load_context.h" namespace doris { @@ -133,4 +135,24 @@ const char* HttpRequest::remote_host() const { return _ev_req->remote_host; } +void HttpRequest::wait_finish_send_reply() { + if (_send_reply_type == SYNC) { + return; + } + + std::string infos; + if (_handler_ctx != nullptr) { + infos = reinterpret_cast(_handler_ctx.get())->brief(); + _handler->free_handler_ctx(_handler_ctx); + _handler_ctx = nullptr; + } + VLOG_NOTICE << "start to wait send reply, infos=" << infos; + auto status = _futrue.wait_for(std::chrono::seconds(600)); + if (status != std::future_status::ready) 
{ + LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); + } else { + VLOG_NOTICE << "wait send reply finished"; + } +} + } // namespace doris diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 603437b0b2039a..49de9a318e1e6d 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "http/http_method.h" #include "util/string_util.h" @@ -86,17 +87,7 @@ class HttpRequest { void finish_send_reply() { promise.set_value(true); } - void wait_finish_send_reply() { - if (_send_reply_type == REPLY_SYNC) { - return; - } - - auto status = _futrue.wait_for(std::chrono::seconds(600)); - if (status != std::future_status::ready) { - LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); - } - } - + void wait_finish_send_reply(); private: SendReplyType _send_reply_type = REPLY_SYNC; HttpMethod _method; From a6b61146f59f6ba9f4675c47b500eab2e9dcc33b Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 4 Aug 2025 12:32:48 +0000 Subject: [PATCH 19/35] add stack trace to locate set promise two times --- be/src/http/http_request.cpp | 17 ++++++++++++++--- be/src/http/http_request.h | 4 ++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 4ba5eda05e6635..b58c5a295f5f5f 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -30,6 +30,7 @@ #include "http/http_handler.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/stack_util.h" namespace doris { @@ -135,8 +136,17 @@ const char* HttpRequest::remote_host() const { return _ev_req->remote_host; } +void HttpRequest::finish_send_reply() { + std::string infos; + if (_handler_ctx != nullptr) { + infos = reinterpret_cast(_handler_ctx.get())->brief(); + } + LOG(INFO) << "finish send reply, infos=" << infos << ", stack=" << get_stack_trace(); // temp locate problem + 
promise.set_value(true); +} + void HttpRequest::wait_finish_send_reply() { - if (_send_reply_type == SYNC) { + if (_send_reply_type == REPLY_SYNC) { return; } @@ -146,12 +156,13 @@ void HttpRequest::wait_finish_send_reply() { _handler->free_handler_ctx(_handler_ctx); _handler_ctx = nullptr; } - VLOG_NOTICE << "start to wait send reply, infos=" << infos; + + LOG(INFO) << "start to wait send reply, infos=" << infos; auto status = _futrue.wait_for(std::chrono::seconds(600)); if (status != std::future_status::ready) { LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); } else { - VLOG_NOTICE << "wait send reply finished"; + LOG(INFO) << "wait send reply finished"; } } diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 49de9a318e1e6d..1e704f59f77c8a 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -25,6 +25,7 @@ #include #include +#include "common/config.h" #include "http/http_method.h" #include "util/string_util.h" @@ -85,8 +86,7 @@ class HttpRequest { void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } - void finish_send_reply() { promise.set_value(true); } - + void finish_send_reply(); void wait_finish_send_reply(); private: SendReplyType _send_reply_type = REPLY_SYNC; From f85956174993da6d34144dde9a319148289caadb Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 4 Aug 2025 14:13:53 +0000 Subject: [PATCH 20/35] fix clang format --- be/src/http/http_request.cpp | 3 ++- be/src/http/http_request.h | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index b58c5a295f5f5f..12afafdef1dc61 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -141,7 +141,8 @@ void HttpRequest::finish_send_reply() { if (_handler_ctx != nullptr) { infos = reinterpret_cast(_handler_ctx.get())->brief(); } - LOG(INFO) << "finish send reply, infos=" << infos << ", stack=" << get_stack_trace(); 
// temp locate problem + LOG(INFO) << "finish send reply, infos=" << infos + << ", stack=" << get_stack_trace(); // temp locate problem promise.set_value(true); } diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 1e704f59f77c8a..c365b6bd346834 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -23,7 +23,6 @@ #include #include #include -#include #include "common/config.h" #include "http/http_method.h" From e1edf7e25c8f442be4fd085276bb6990f0e44821 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 4 Aug 2025 15:12:52 +0000 Subject: [PATCH 21/35] fix clang format --- be/src/http/http_request.cpp | 4 ++-- be/src/http/http_request.h | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 12afafdef1dc61..1ab2f77c6866aa 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -141,9 +141,9 @@ void HttpRequest::finish_send_reply() { if (_handler_ctx != nullptr) { infos = reinterpret_cast(_handler_ctx.get())->brief(); } - LOG(INFO) << "finish send reply, infos=" << infos + LOG(INFO) << "finish send reply, infos=" << infos << ", stack=" << get_stack_trace(); // temp locate problem - promise.set_value(true); + promise.set_value(true); } void HttpRequest::wait_finish_send_reply() { diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index c365b6bd346834..da76a45dd15f59 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -87,6 +87,7 @@ class HttpRequest { void finish_send_reply(); void wait_finish_send_reply(); + private: SendReplyType _send_reply_type = REPLY_SYNC; HttpMethod _method; From 254e095a4ce88452df25b96b684d31a9ae158af5 Mon Sep 17 00:00:00 2001 From: caojunhui Date: Mon, 22 Sep 2025 06:00:16 +0000 Subject: [PATCH 22/35] fix be core --- be/src/http/action/http_stream.cpp | 2 +- be/src/http/action/stream_load.cpp | 2 +- be/src/http/http_request.cpp | 4 ++-- 
be/src/http/http_request.h | 4 ++-- be/src/io/fs/multi_table_pipe.cpp | 2 +- be/src/olap/wal/wal_table.cpp | 2 +- .../routine_load/routine_load_task_executor.cpp | 8 ++++---- be/src/runtime/stream_load/stream_load_context.h | 4 ++-- .../runtime/stream_load/stream_load_executor.cpp | 15 ++++++++++++--- be/src/service/internal_service.cpp | 2 +- 10 files changed, 27 insertions(+), 18 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 495ddf38bd933c..e0ee23b5d0884c 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -131,7 +131,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptrbody_sink->finish()); // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); + RETURN_IF_ERROR(ctx->load_status_future.get()); if (ctx->group_commit) { LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index b1c5334e31a723..09030daf46b198 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -141,7 +141,7 @@ Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpReq } void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRequest* req) { - ctx->status = ctx->future.get(); + ctx->status = ctx->load_status_future.get(); if (ctx->status.ok()) { if (ctx->group_commit) { LOG(INFO) << "skip commit because this is group commit, pipe_id=" diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 1ab2f77c6866aa..2cac57640620e7 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -143,7 +143,7 @@ void HttpRequest::finish_send_reply() { } LOG(INFO) << "finish send reply, infos=" << infos << ", stack=" << get_stack_trace(); // temp locate problem - promise.set_value(true); + _http_reply_promise.set_value(true); } void HttpRequest::wait_finish_send_reply() { @@ -159,7 
+159,7 @@ void HttpRequest::wait_finish_send_reply() { } LOG(INFO) << "start to wait send reply, infos=" << infos; - auto status = _futrue.wait_for(std::chrono::seconds(600)); + auto status = _http_reply_futrue.wait_for(std::chrono::seconds(600)); if (status != std::future_status::ready) { LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); } else { diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index da76a45dd15f59..2917f9dac19d65 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -105,8 +105,8 @@ class HttpRequest { std::string _request_body; // ensure send_reply finished - std::promise promise; - std::future _futrue = promise.get_future(); + std::promise _http_reply_promise; + std::future _http_reply_futrue = _http_reply_promise.get_future(); }; } // namespace doris diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 0f52bfde950e06..4f4d38d8a37d94 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -342,7 +342,7 @@ void MultiTablePipe::_handle_consumer_finished() { << " number_filtered_rows=" << _ctx->number_filtered_rows << " number_unselected_rows=" << _ctx->number_unselected_rows << ", ctx: " << _ctx->brief(); - _ctx->promise.set_value(_status); // when all done, finish the routine load task + _ctx->load_status_promise.set_value(_status); // when all done, finish the routine load task } } // namespace io diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 83d2b280dc721f..3b2939195409a5 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -255,7 +255,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, auto st = _http_stream_action->process_put(nullptr, ctx); if (st.ok()) { // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); + RETURN_IF_ERROR(ctx->load_status_future.get()); if (ctx->status.ok()) { // deprecated and should 
be removed in 3.1, use token instead. ctx->auth.auth_code = wal_id; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index e51a91913a6e8e..6f65522aa4c6dd 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -365,7 +365,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, if (UNLIKELY(!_status_.ok() && !_status_.is())) { \ err_handler(ctx, _status_, err_msg); \ cb(ctx); \ - _status_ = ctx->future.get(); \ + _status_ = ctx->load_status_future.get(); \ if (!_status_.ok()) { \ LOG(ERROR) << "failed to get future, " << ctx->brief(); \ } \ @@ -452,7 +452,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, } // wait for all consumers finished - HANDLE_ERROR(ctx->future.get(), "consume failed"); + HANDLE_ERROR(ctx->load_status_future.get(), "consume failed"); ctx->load_cost_millis = UnixMillis() - ctx->start_millis; @@ -533,12 +533,12 @@ Status RoutineLoadTaskExecutor::_execute_plan_for_test(std::shared_ptrread_at(0, result, &read_bytes); if (!st.ok()) { LOG(WARNING) << "read failed"; - ctx->promise.set_value(st); + ctx->load_status_promise.set_value(st); break; } if (read_bytes == 0) { - ctx->promise.set_value(Status::OK()); + ctx->load_status_promise.set_value(Status::OK()); break; } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index ff9f0b8be60ad4..7d4c00e4f5ca2c 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -204,8 +204,8 @@ class StreamLoadContext { std::vector commit_infos; - std::promise promise; - std::future future = promise.get_future(); + std::promise load_status_promise; + std::future load_status_future = load_status_promise.get_future(); Status status; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp index 26d99e9b305de9..cba9fb63aa6bc1 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -81,7 +82,15 @@ Status StreamLoadExecutor::execute_plan_fragment( LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; - auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) { + std::shared_ptr lock = std::make_shared(); + std::shared_ptr is_done = std::make_shared(false); + + auto exec_fragment = [ctx, cb, this, lock, is_done](RuntimeState* state, Status* status) { + std::lock_guard lock1(*lock); + if (*is_done) { + return; + } + *is_done = true; if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id(); @@ -129,7 +138,7 @@ Status StreamLoadExecutor::execute_plan_fragment( } } ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; - ctx->promise.set_value(*status); + ctx->load_status_promise.set_value(*status); if (!status->ok() && ctx->body_sink != nullptr) { // In some cases, the load execution is exited early. 
@@ -166,7 +175,7 @@ Status StreamLoadExecutor::execute_plan_fragment( return st; } #else - ctx->promise.set_value(k_stream_load_plan_status); + ctx->load_status_promise.set_value(k_stream_load_plan_status); #endif return Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c8b1f471fc31d5..42d55c59bf4130 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1280,7 +1280,7 @@ void PInternalService::report_stream_load_status(google::protobuf::RpcController if (!stream_load_ctx) { st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - stream_load_ctx->promise.set_value(st); + stream_load_ctx->load_status_promise.set_value(st); st.to_protobuf(response->mutable_status()); } From 281504b6fc72a79f0838d65cd02939e7af1d479d Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Wed, 22 Oct 2025 12:29:42 +0800 Subject: [PATCH 23/35] avoid send reply two times --- be/src/http/action/stream_load.cpp | 23 +++++-------------- be/src/http/action/stream_load.h | 1 + .../runtime/stream_load/stream_load_context.h | 3 +++ 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 916f75f5076402..74434efc71e2c3 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -164,6 +164,11 @@ void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRe } void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { + std::lock_guard lock1(ctx->_send_reply_lock); + if (ctx->_finish_send_reply) { + return ; + } + ctx->_finish_send_reply = true; ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is()) { @@ -241,23 +246,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { } if (!st.ok()) { ctx->status = std::move(st); - if (ctx->need_rollback) { - 
_exec_env->stream_load_executor()->rollback_txn(ctx.get()); - ctx->need_rollback = false; - } - if (ctx->body_sink != nullptr) { - ctx->body_sink->cancel(ctx->status.to_string()); - } - auto str = ctx->to_json(); - // add new line at end - str = str + '\n'; - HttpChannel::send_reply(req, str); -#ifndef BE_TEST - if (config::enable_stream_load_record) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); - } -#endif + _send_reply(ctx, req); return -1; } return 0; diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index 8861db86376d34..0fa9509a85b9a2 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "http/http_handler.h" diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index eae588bd9cf166..a10f172771aca8 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -256,6 +256,9 @@ class StreamLoadContext { std::string qualified_user; std::string cloud_cluster; + std::mutex _send_reply_lock; + bool _finish_send_reply = false; + public: ExecEnv* exec_env() { return _exec_env; } From 0c3bb9fa6d7c054410f4bec3f0a8f22a96c82551 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Wed, 22 Oct 2025 12:34:22 +0800 Subject: [PATCH 24/35] fix clang format error --- be/src/http/action/stream_load.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 74434efc71e2c3..b226e72637b205 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -166,7 +166,7 @@ void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRe void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { std::lock_guard lock1(ctx->_send_reply_lock); if 
(ctx->_finish_send_reply) { - return ; + return; } ctx->_finish_send_reply = true; ctx->load_cost_millis = UnixMillis() - ctx->start_millis; From e109eacb78ba383c72e13cac8c0d4955e1655666 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Wed, 5 Nov 2025 19:23:33 +0800 Subject: [PATCH 25/35] other http request don't need to finish reply --- be/src/http/http_request.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 71489df9ff5b6c..31b184a467f70c 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -145,6 +145,10 @@ const char* HttpRequest::remote_host() const { } void HttpRequest::finish_send_reply() { + if (_send_reply_type == REPLY_SYNC) { + return; + } + std::string infos; if (_handler_ctx != nullptr) { infos = reinterpret_cast(_handler_ctx.get())->brief(); From 29e2fb90dae65deb9e7697d9fc905cd5060db7ea Mon Sep 17 00:00:00 2001 From: caojunhui Date: Thu, 6 Nov 2025 16:54:34 +0800 Subject: [PATCH 26/35] fix on_free may block 10min --- be/src/http/http_request.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 31b184a467f70c..caf12aba0d74bc 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -153,7 +153,7 @@ void HttpRequest::finish_send_reply() { if (_handler_ctx != nullptr) { infos = reinterpret_cast(_handler_ctx.get())->brief(); } - LOG(INFO) << "finish send reply, infos=" << infos + VLOG_NOTICE << "finish send reply, infos=" << infos << ", stack=" << get_stack_trace(); // temp locate problem _http_reply_promise.set_value(true); } @@ -164,19 +164,28 @@ void HttpRequest::wait_finish_send_reply() { } std::string infos; + StreamLoadContext* ctx = nullptr; if (_handler_ctx != nullptr) { - infos = reinterpret_cast(_handler_ctx.get())->brief(); + ctx = reinterpret_cast(_handler_ctx.get()); + infos = ctx->brief(); 
_handler->free_handler_ctx(_handler_ctx); - _handler_ctx = nullptr; } - LOG(INFO) << "start to wait send reply, infos=" << infos; - auto status = _http_reply_futrue.wait_for(std::chrono::seconds(600)); + VLOG_NOTICE << "start to wait send reply, infos=" << infos; + auto status = _http_reply_futrue.wait_for(std::chrono::seconds(3)); + // if request is timeout and can't cancel fragment in time, it will cause some new request block + // so we will free cancelled request in time. if (status != std::future_status::ready) { LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); + std::lock_guard lock1(ctx->_send_reply_lock); + // do not send_reply after free current request + ctx->_finish_send_reply = true; } else { - LOG(INFO) << "wait send reply finished"; + VLOG_NOTICE << "wait send reply finished"; } + + // delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info + _handler_ctx = nullptr; } } // namespace doris From 1873e3b978cd9bfd0666bf3b75767e1fc8ca542a Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Fri, 7 Nov 2025 13:50:31 +0800 Subject: [PATCH 27/35] use _can_send_reply to ensure send_reply after on_chunked_data --- be/src/http/action/stream_load.cpp | 18 +++++++++++++++++- be/src/http/http_request.cpp | 4 +++- .../runtime/stream_load/stream_load_context.h | 7 +++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index b226e72637b205..f3fe149fa72828 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -108,6 +108,12 @@ void StreamLoadAction::handle(HttpRequest* req) { return; } + { + std::unique_lock lock1(ctx->_send_reply_lock); + ctx->_can_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); + } + // status already set to fail if (ctx->status.ok()) { ctx->status = _handle(ctx, req); @@ -164,11 +170,16 @@ void StreamLoadAction::_on_finish(std::shared_ptr ctx, 
HttpRe } void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { - std::lock_guard lock1(ctx->_send_reply_lock); + std::unique_lock lock1(ctx->_send_reply_lock); + while(!ctx->_finish_send_reply && !ctx->_can_send_reply) { + ctx->_can_send_reply_cv.wait(lock1); + } if (ctx->_finish_send_reply) { return; } + DCHECK(ctx->_can_send_reply); ctx->_finish_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is()) { @@ -246,6 +257,11 @@ int StreamLoadAction::on_header(HttpRequest* req) { } if (!st.ok()) { ctx->status = std::move(st); + { + std::unique_lock lock1(ctx->_send_reply_lock); + ctx->_can_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); + } _send_reply(ctx, req); return -1; } diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index caf12aba0d74bc..01070bec4b3247 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -177,9 +177,11 @@ void HttpRequest::wait_finish_send_reply() { // so we will free cancelled request in time. 
if (status != std::future_status::ready) { LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); - std::lock_guard lock1(ctx->_send_reply_lock); + std::unique_lock lock1(ctx->_send_reply_lock); // do not send_reply after free current request + ctx->_can_send_reply = false; ctx->_finish_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); } else { VLOG_NOTICE << "wait send reply finished"; } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index a10f172771aca8..9d09498c12fdda 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -257,6 +258,12 @@ class StreamLoadContext { std::string cloud_cluster; std::mutex _send_reply_lock; + // maybe > 1 callback send reply, so we need to notify all, + // but `_finish_send_reply` will protect that only one callback send reply + std::condition_variable _can_send_reply_cv; + // avoid sending reply before on_chunk_data finish, or client will receive `broken pipe` + bool _can_send_reply = false; + // avoid sending reply two times bool _finish_send_reply = false; public: From fc686f4f32dd52add68943f3244120cead243f16 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Fri, 7 Nov 2025 13:52:15 +0800 Subject: [PATCH 28/35] format --- be/src/http/action/stream_load.cpp | 2 +- be/src/http/http_request.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index f3fe149fa72828..76d3c70ebbf5e0 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -171,7 +171,7 @@ void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRe void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { std::unique_lock lock1(ctx->_send_reply_lock); - while(!ctx->_finish_send_reply 
&& !ctx->_can_send_reply) { + while (!ctx->_finish_send_reply && !ctx->_can_send_reply) { ctx->_can_send_reply_cv.wait(lock1); } if (ctx->_finish_send_reply) { diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 01070bec4b3247..2798b124e0a200 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -154,7 +154,7 @@ void HttpRequest::finish_send_reply() { infos = reinterpret_cast(_handler_ctx.get())->brief(); } VLOG_NOTICE << "finish send reply, infos=" << infos - << ", stack=" << get_stack_trace(); // temp locate problem + << ", stack=" << get_stack_trace(); // temp locate problem _http_reply_promise.set_value(true); } From 5f362ee207ba6a5ff208b17f5ebbaf3434cbabbb Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Thu, 1 Jan 2026 00:34:28 +0800 Subject: [PATCH 29/35] add stacktrace log when be block --- bin/stop_be.sh | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/bin/stop_be.sh b/bin/stop_be.sh index 974df77a1d844e..696f5c90ed6fb0 100755 --- a/bin/stop_be.sh +++ b/bin/stop_be.sh @@ -73,12 +73,24 @@ if [[ -f "${pidfile}" ]]; then exit 1 fi + count=0 + timeout=240 # kill PID process and check it if kill "-${signum}" "${pid}" >/dev/null 2>&1; then while true; do if kill -0 "${pid}" >/dev/null 2>&1; then echo "Waiting for be process with PID ${pid} to terminate" sleep 2 + if [[ ${count} -ge ${timeout} ]]; then + gdb --version + echo "超时!打印 PID ${pid} 堆栈后退出" + # gdb 打印所有线程栈(直接输出到控制台) + gdb -q -batch -ex "thread apply all bt" -p "${pid}" 2>/dev/null + echo "完成打印堆栈" + sleep 10 + exit 1 + fi + count=$((count + 2)) else echo "stop ${pidcomm} and remove PID file" if [[ -f "${pidfile}" ]]; then rm "${pidfile}"; fi From ff5973180eae643461a858424b15e2c70f7e5663 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Thu, 1 Jan 2026 11:04:21 +0800 Subject: [PATCH 30/35] add prepare log --- be/src/runtime/fragment_mgr.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fed6c3931fed9b..0e65eb6883023d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -873,12 +873,14 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed"); }); if (!prepare_st.ok()) { + LOG(INFO) << "prepare failed " << prepare_st.to_json(); query_ctx->cancel(prepare_st, params.fragment_id); return prepare_st; } } g_fragmentmgr_prepare_latency << (duration_ns / 1000); + LOG(INFO) << "prepare success"; DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed", { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); }); { From 3395d6a78cc6298e9cf8488df4a3b6c0a94b26b1 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Fri, 2 Jan 2026 15:47:47 +0800 Subject: [PATCH 31/35] fix deadlock when fragment prepare fail --- be/src/http/action/stream_load.cpp | 4 +++- be/src/runtime/fragment_mgr.cpp | 8 +++++--- be/src/runtime/fragment_mgr.h | 4 +++- be/src/runtime/stream_load/stream_load_executor.cpp | 11 ++++++++--- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 950abcd7241ba2..ef5adca12bfc23 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -197,7 +197,7 @@ void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpR auto str = ctx->to_json(); // add new line at end str = str + '\n'; - HttpChannel::send_reply(req, str); + #ifndef BE_TEST if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { @@ -207,6 +207,8 @@ void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpR } #endif + HttpChannel::send_reply(req, str); + LOG(INFO) << "finished to execute stream load. 
label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms=" diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0e65eb6883023d..64617d23b62de2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -846,7 +846,8 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, QuerySource query_source, const FinishCallback& cb, - const TPipelineFragmentParamsList& parent) { + const TPipelineFragmentParamsList& parent, + std::shared_ptr is_prepare_success) { VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TPipelineFragmentParams debug string is too long and glog @@ -873,14 +874,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed"); }); if (!prepare_st.ok()) { - LOG(INFO) << "prepare failed " << prepare_st.to_json(); query_ctx->cancel(prepare_st, params.fragment_id); return prepare_st; } } g_fragmentmgr_prepare_latency << (duration_ns / 1000); - LOG(INFO) << "prepare success"; DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed", { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); }); { @@ -907,6 +906,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_pipeline_context(params.fragment_id, context); RETURN_IF_ERROR(context->submit()); + if (is_prepare_success != nullptr) { + *is_prepare_success = true; + } return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 826bca81901e69..07ba5e3a5e6cc5 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -126,8 +126,10 @@ class FragmentMgr : 
public RestMonitorIface { void remove_pipeline_context(std::pair key); void remove_query_context(const TUniqueId& key); + // `is_prepare_success` is used by invoker to ensure callback can be handled correctly (e.g. stream_load_executor) Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type, - const FinishCallback& cb, const TPipelineFragmentParamsList& parent); + const FinishCallback& cb, const TPipelineFragmentParamsList& parent, + std::shared_ptr is_prepare_success = nullptr); Status start_query_execution(const PExecPlanFragmentStartRequest* request); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index c7158b294686fe..71473ea5640021 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -83,10 +83,12 @@ Status StreamLoadExecutor::execute_plan_fragment( LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; + // todo: maybe can be deleted std::shared_ptr lock = std::make_shared(); std::shared_ptr is_done = std::make_shared(false); + std::shared_ptr is_prepare_success = std::make_shared(false); - auto exec_fragment = [ctx, cb, this, lock, is_done](RuntimeState* state, Status* status) { + auto exec_fragment = [ctx, cb, this, lock, is_done, is_prepare_success](RuntimeState* state, Status* status) { std::lock_guard lock1(*lock); if (*is_done) { return; @@ -161,10 +163,13 @@ Status StreamLoadExecutor::execute_plan_fragment( } } - cb(ctx); + if (*is_prepare_success) { + // if prepare failed, on_header will send reply + cb(ctx); + } }; st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.pipeline_params, QuerySource::STREAM_LOAD, exec_fragment, parent); + ctx->put_result.pipeline_params, QuerySource::STREAM_LOAD, exec_fragment, parent, is_prepare_success); if (!st.ok()) { // no need to check
unref's return value From f1b10c53a8867e27004fb6d8ef4687aa726a274b Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Fri, 2 Jan 2026 15:52:33 +0800 Subject: [PATCH 32/35] reformat --- be/src/runtime/stream_load/stream_load_executor.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 71473ea5640021..e8867d8b4e3840 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -86,9 +86,10 @@ Status StreamLoadExecutor::execute_plan_fragment( // todo: maybe can be deleted std::shared_ptr lock = std::make_shared(); std::shared_ptr is_done = std::make_shared(false); - std::shared_ptr is_prepare_success = std::make_shared(false); + std::shared_ptr is_prepare_success = std::make_shared(false); - auto exec_fragment = [ctx, cb, this, lock, is_done, is_prepare_success](RuntimeState* state, Status* status) { + auto exec_fragment = [ctx, cb, this, lock, is_done, is_prepare_success](RuntimeState* state, + Status* status) { std::lock_guard lock1(*lock); if (*is_done) { return; @@ -168,8 +169,9 @@ Status StreamLoadExecutor::execute_plan_fragment( cb(ctx); } }; - st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.pipeline_params, QuerySource::STREAM_LOAD, exec_fragment, parent, is_prepare_success); + st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params, + QuerySource::STREAM_LOAD, exec_fragment, + parent, is_prepare_success); if (!st.ok()) { // no need to check unref's return value From 6383de4b08396780021f95fa851a20f0428b15aa Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Fri, 2 Jan 2026 21:55:58 +0800 Subject: [PATCH 33/35] fix timeout --- bin/stop_be.sh | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/bin/stop_be.sh b/bin/stop_be.sh index 696f5c90ed6fb0..974df77a1d844e 100755 --- 
a/bin/stop_be.sh +++ b/bin/stop_be.sh @@ -73,24 +73,12 @@ if [[ -f "${pidfile}" ]]; then exit 1 fi - count=0 - timeout=240 # kill PID process and check it if kill "-${signum}" "${pid}" >/dev/null 2>&1; then while true; do if kill -0 "${pid}" >/dev/null 2>&1; then echo "Waiting for be process with PID ${pid} to terminate" sleep 2 - if [[ ${count} -ge ${timeout} ]]; then - gdb --version - echo "超时!打印 PID ${pid} 堆栈后退出" - # gdb 打印所有线程栈(直接输出到控制台) - gdb -q -batch -ex "thread apply all bt" -p "${pid}" 2>/dev/null - echo "完成打印堆栈" - sleep 10 - exit 1 - fi - count=$((count + 2)) else echo "stop ${pidcomm} and remove PID file" if [[ -f "${pidfile}" ]]; then rm "${pidfile}"; fi From e136c71abe558974a142e9327fe972353bb1a70d Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Sat, 3 Jan 2026 10:12:04 +0800 Subject: [PATCH 34/35] remove useless lock --- be/src/runtime/stream_load/stream_load_executor.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index e8867d8b4e3840..52f46b5bf3f799 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -83,18 +83,10 @@ Status StreamLoadExecutor::execute_plan_fragment( LOG(INFO) << "begin to execute stream load. 
label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id; Status st; - // todo: maybe can be deleted - std::shared_ptr lock = std::make_shared(); - std::shared_ptr is_done = std::make_shared(false); std::shared_ptr is_prepare_success = std::make_shared(false); - auto exec_fragment = [ctx, cb, this, lock, is_done, is_prepare_success](RuntimeState* state, + auto exec_fragment = [ctx, cb, this, is_prepare_success](RuntimeState* state, Status* status) { - std::lock_guard lock1(*lock); - if (*is_done) { - return; - } - *is_done = true; if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id(); From e3fd0ba4fd56f8e4109b3598db618f34d74a32a1 Mon Sep 17 00:00:00 2001 From: UserWhite <925945594@qq.com> Date: Sat, 3 Jan 2026 10:14:19 +0800 Subject: [PATCH 35/35] reformat clang --- be/src/runtime/stream_load/stream_load_executor.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 52f46b5bf3f799..31a8598f55a1b9 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -85,8 +85,7 @@ Status StreamLoadExecutor::execute_plan_fragment( Status st; std::shared_ptr is_prepare_success = std::make_shared(false); - auto exec_fragment = [ctx, cb, this, is_prepare_success](RuntimeState* state, - Status* status) { + auto exec_fragment = [ctx, cb, this, is_prepare_success](RuntimeState* state, Status* status) { if (ctx->group_commit) { ctx->label = state->import_label(); ctx->txn_id = state->wal_id();