47 commits
343f79e  Revert "[fix](stuck): revert "[feat](http): use async reply to provid… (Feb 12, 2025)
c11fce7  fix buffer wait (Jun 23, 2025)
81662b9  fix futrue not compile (Jun 23, 2025)
a4191dc  fix redefinition (Jul 12, 2025)
2b3f934  fix clang compile (Jul 12, 2025)
429adbe  fix clang compile (Jul 12, 2025)
c74b6d5  fix clang compile (Jul 12, 2025)
be29a7e  reformat (Jul 14, 2025)
31748a2  Revert "[fix](stuck): revert "[feat](http): use async reply to provid… (Feb 12, 2025)
2c46f61  fix buffer wait (Jun 23, 2025)
bdfa223  fix futrue not compile (Jun 23, 2025)
77ec143  fix redefinition (Jul 12, 2025)
2d4ff70  fix clang compile (Jul 12, 2025)
7b3501d  fix clang compile (Jul 12, 2025)
984fb5c  fix clang compile (Jul 12, 2025)
11171ae  reformat (Jul 14, 2025)
b8342c7  Merge branch 'master' into cherry-pick-stream-load-async (Userwhite, Jul 15, 2025)
75374ad  Merge branch 'cherry-pick-stream-load-async' of github.com:Userwhite/… (Jul 15, 2025)
5f657e5  fix be ut (Jul 15, 2025)
05bd3ff  Merge branch 'master' into cherry-pick-stream-load-async (dataroaring, Jul 16, 2025)
93b0928  Merge branch 'master' into cherry-pick-stream-load-async (Userwhite, Jul 16, 2025)
6bdf507  Merge branch 'master' into cherry-pick-stream-load-async (dataroaring, Jul 18, 2025)
5f5109c  Merge branch 'master' into cherry-pick-stream-load-async (dataroaring, Jul 20, 2025)
8f95b4d  Merge branch 'master' into cherry-pick-stream-load-async (dataroaring, Jul 29, 2025)
18844dc  Because stream load fragment cancel depends on the free_handler_ctx, … (Jul 24, 2025)
a6b6114  add stack trace to locate set promise two times (Aug 4, 2025)
f859561  fix clang format (Aug 4, 2025)
415b38b  Merge branch 'master' of https://github.com/apache/doris into cherry-… (Aug 4, 2025)
e1edf7e  fix clang format (Aug 4, 2025)
254e095  fix be core (Sep 22, 2025)
fa6e7e4  Merge branch 'master' into cherry-pick-stream-load-async (Userwhite, Sep 22, 2025)
3cfff10  Merge branch 'master' into cherry-pick-stream-load-async (Userwhite, Oct 20, 2025)
281504b  avoid send reply two times (Userwhite, Oct 22, 2025)
0c3bb9f  fix clang format error (Userwhite, Oct 22, 2025)
e109eac  other http request don't need to finish reply (Userwhite, Nov 5, 2025)
29e2fb9  fix on_free may block 10min (Nov 6, 2025)
1873e3b  use _can_send_reply to ensure send_reply after on_chunked_data (Userwhite, Nov 7, 2025)
fc686f4  format (Userwhite, Nov 7, 2025)
5f362ee  add stacktrace log when be block (Userwhite, Dec 31, 2025)
58ba25f  Merge remote-tracking branch 'origin/master' into cherry-pick-stream-… (Userwhite, Dec 31, 2025)
ff59731  add prepare log (Userwhite, Jan 1, 2026)
3395d6a  fix deadlock when fragment prepare fail (Userwhite, Jan 2, 2026)
f1b10c5  reformat (Userwhite, Jan 2, 2026)
c046f5b  Merge branch 'master' into cherry-pick-stream-load-async (Userwhite, Jan 2, 2026)
6383de4  fix timeout (Userwhite, Jan 2, 2026)
e136c71  remove unless lock (Userwhite, Jan 3, 2026)
e3fd0ba  reformat clang (Userwhite, Jan 3, 2026)
2 changes: 1 addition & 1 deletion be/src/http/action/http_stream.cpp
@@ -133,7 +133,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
RETURN_IF_ERROR(ctx->body_sink->finish());

// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
RETURN_IF_ERROR(ctx->load_status_future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
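The http_stream.cpp change is a pure rename: ctx->future becomes ctx->load_status_future, freeing the unqualified name now that a second future (the HTTP-reply handshake added to HttpRequest later in this diff) enters the picture. Below is a compile-ready sketch of the two promise/future pairs and their distinct roles; the struct is illustrative only, since the real members live on StreamLoadContext and HttpRequest respectively.

#include <future>

// Sketch only: the two pairs are unrelated and complete at different times.
struct TwoFutures {
    // Fulfilled by the stream load executor with the load's final status
    // (int stands in for doris::Status here).
    std::promise<int> load_status_promise;
    std::future<int> load_status_future = load_status_promise.get_future();

    // Fulfilled by HttpChannel::send_reply once the HTTP reply is flushed;
    // on_free waits on it before deleting the request.
    std::promise<bool> http_reply_promise;
    std::future<bool> http_reply_future = http_reply_promise.get_future();
};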
145 changes: 84 additions & 61 deletions be/src/http/action/stream_load.cpp
@@ -29,9 +29,11 @@
#include <sys/time.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <future>
#include <sstream>
#include <stdexcept>
@@ -106,17 +108,83 @@ void StreamLoadAction::handle(HttpRequest* req) {
return;
}

{
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
ctx->_can_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
}

// status already set to fail
if (ctx->status.ok()) {
ctx->status = _handle(ctx);
ctx->status = _handle(ctx, req);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_send_reply(ctx, req);
}
}
}

Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
RETURN_IF_ERROR(ctx->body_sink->finish());
if (!ctx->use_streaming) {
// we need to close file first, then execute_plan_fragment here
ctx->body_sink.reset();
TPipelineFragmentParamsList mocked;
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
ctx, mocked,
[req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); }));
}

return Status::OK();
}

void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
ctx->status = ctx->load_status_future.get();
if (ctx->status.ok()) {
if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id="
<< ctx->id.to_string();
} else if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
ctx->commit_and_publish_txn_cost_nanos =
MonotonicNanos() - commit_and_publish_start_time;
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
}
}
_send_reply(ctx, req);
}

void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
while (!ctx->_finish_send_reply && !ctx->_can_send_reply) {
ctx->_can_send_reply_cv.wait(lock1);
}
if (ctx->_finish_send_reply) {
return;
}
DCHECK(ctx->_can_send_reply);
ctx->_finish_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
@@ -129,7 +197,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
auto str = ctx->to_json();
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);

#ifndef BE_TEST
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
Expand All @@ -139,6 +207,8 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
#endif

HttpChannel::send_reply(req, str);

LOG(INFO) << "finished to execute stream load. label=" << ctx->label
<< ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
<< ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
@@ -160,46 +230,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
}

Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
RETURN_IF_ERROR(ctx->body_sink->finish());
if (!ctx->use_streaming) {
// we need to close file first, then execute_plan_fragment here
ctx->body_sink.reset();
TPipelineFragmentParamsList mocked;
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
}

// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
return Status::OK();
}

if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
}
return Status::OK();
}

int StreamLoadAction::on_header(HttpRequest* req) {
req->mark_send_reply();
Contributor:

After this change, do all requests go through the async path?

Contributor (author):

Only StreamLoad requests. And only the submission of the reply is async; from the client's perspective the request is still synchronous.
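To make that answer concrete: before this patch the HTTP worker thread parked on ctx->future.get() until the load finished; now execute_plan_fragment accepts a completion callback, and _on_finish/_send_reply run when the load completes instead. The client keeps blocking on its open connection, so nothing changes from its side. A runnable sketch of the shape of this, with hypothetical stand-ins (Context and this execute_plan_fragment are not the Doris signatures):

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

struct Context {
    int status = 0; // stand-in for doris::Status
};

// The load runs on its own thread; when the final status is known, the
// continuation fires. In the patch this is where _on_finish commits the
// transaction and _send_reply flushes the HTTP response.
void execute_plan_fragment(std::shared_ptr<Context> ctx,
                           std::function<void(std::shared_ptr<Context>)> on_finish) {
    std::thread([ctx, cb = std::move(on_finish)] {
        ctx->status = 0; // pretend the load succeeded
        cb(ctx);         // reply is sent here, not on the HTTP worker thread
    }).detach();
}

int main() {
    auto ctx = std::make_shared<Context>();
    execute_plan_fragment(ctx, [](std::shared_ptr<Context> c) {
        std::cout << "reply sent, status=" << c->status << "\n";
    });
    // The handler thread is free at this point; the client still blocks on
    // its open connection until the callback replies.
    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // demo only
}

This is also why on_header now calls req->mark_send_reply(): the reply may be produced after the handler returns, so the request object has to outlive the handler call.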


streaming_load_current_processing->increment(1);

std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
@@ -228,26 +261,12 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
{
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
ctx->_can_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
}
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
auto str = ctx->to_json();
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
if (config::enable_stream_load_record ||
config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
#endif
_send_reply(ctx, req);
return -1;
}
return 0;
Expand Down Expand Up @@ -821,8 +840,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!ctx->use_streaming) {
return Status::OK();
}

TPipelineFragmentParamsList mocked;
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
return _exec_env->stream_load_executor()->execute_plan_fragment(
ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
_on_finish(ctx, http_req);
});
}

Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,
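The _send_reply guard in this file has two jobs: never reply before handle() has opened the gate (the body must be fully consumed first, which is what _can_send_reply tracks), and never reply twice even though the load-finish callback, the error path in on_header, and the connection-free timeout all race toward it. A compile-ready sketch of that handshake with simplified member names:

#include <condition_variable>
#include <mutex>

struct ReplyGate {
    std::mutex mu;
    std::condition_variable cv;
    bool can_send_reply = false;    // set once the request body is consumed
    bool finish_send_reply = false; // set once a reply was sent or abandoned

    // handle() calls this after on_chunked_data is done: opens the gate.
    void allow() {
        std::lock_guard<std::mutex> lock(mu);
        can_send_reply = true;
        cv.notify_all();
    }

    // Every would-be replier calls this; exactly one caller gets true.
    bool try_claim_reply() {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait(lock, [this] { return finish_send_reply || can_send_reply; });
        if (finish_send_reply) {
            return false; // someone already replied, or the request was freed
        }
        finish_send_reply = true; // claim the single reply slot
        cv.notify_all();
        return true;
    }
};

In the patch, the timeout path in wait_finish_send_reply plays the role of a claimant that sets finish_send_reply without replying, which is why the claim must re-check finish_send_reply after the wait.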
5 changes: 4 additions & 1 deletion be/src/http/action/stream_load.h
@@ -18,6 +18,7 @@
#pragma once

#include <memory>
#include <mutex>
#include <string>

#include "http/http_handler.h"
@@ -46,11 +47,13 @@ class StreamLoadAction : public HttpHandler {

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);

private:
ExecEnv* _exec_env;
1 change: 1 addition & 0 deletions be/src/http/ev_http_server.cpp
@@ -56,6 +56,7 @@ static void on_chunked(struct evhttp_request* ev_req, void* param) {

static void on_free(struct evhttp_request* ev_req, void* arg) {
HttpRequest* request = (HttpRequest*)arg;
request->wait_finish_send_reply();
delete request;
}

3 changes: 3 additions & 0 deletions be/src/http/http_channel.cpp
@@ -53,6 +53,7 @@ void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {
evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(),
nullptr);
request->finish_send_reply();
}

void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) {
@@ -66,6 +67,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
evbuffer_add(evb, content.c_str(), content.size());
}
evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb);
request->finish_send_reply();
evbuffer_free(evb);
}

@@ -80,6 +82,7 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz
bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
}
evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
request->finish_send_reply();
evbuffer_free(evb);
}

49 changes: 49 additions & 0 deletions be/src/http/http_request.cpp
@@ -22,12 +22,15 @@
#include <event2/http_struct.h>
#include <event2/keyvalq_struct.h>

#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

#include "http/http_handler.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/stack_util.h"

namespace doris {

@@ -141,4 +144,50 @@ const char* HttpRequest::remote_host() const {
return _ev_req->remote_host;
}

void HttpRequest::finish_send_reply() {
if (_send_reply_type == REPLY_SYNC) {
return;
}

std::string infos;
if (_handler_ctx != nullptr) {
infos = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get())->brief();
}
VLOG_NOTICE << "finish send reply, infos=" << infos
<< ", stack=" << get_stack_trace(); // temp locate problem
_http_reply_promise.set_value(true);
}

void HttpRequest::wait_finish_send_reply() {
if (_send_reply_type == REPLY_SYNC) {
return;
}

std::string infos;
StreamLoadContext* ctx = nullptr;
if (_handler_ctx != nullptr) {
ctx = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get());
infos = ctx->brief();
_handler->free_handler_ctx(_handler_ctx);
}

VLOG_NOTICE << "start to wait send reply, infos=" << infos;
auto status = _http_reply_futrue.wait_for(std::chrono::seconds(3));
// if the request times out and its fragment cannot be cancelled in time, new requests may block,
// so free the cancelled request promptly
if (status != std::future_status::ready) {
LOG(WARNING) << "wait for send reply timeout, " << this->debug_string();
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
// do not send_reply after free current request
ctx->_can_send_reply = false;
ctx->_finish_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
} else {
VLOG_NOTICE << "wait send reply finished";
}

// delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info
_handler_ctx = nullptr;
}

} // namespace doris
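wait_finish_send_reply is the consuming side of the promise declared in http_request.h below: it releases the handler context first (stream load fragment cancellation depends on free_handler_ctx, per the commit history), then waits at most three seconds for the reply to be flushed, and on timeout closes the gate so a late replier cannot touch the freed request. The bounded-wait idiom in isolation, as a runnable sketch:

#include <chrono>
#include <future>
#include <iostream>

int main() {
    std::promise<bool> reply_sent;
    std::future<bool> fut = reply_sent.get_future();

    // In the patch, HttpChannel::send_reply fulfills the promise. Here we
    // deliberately never do, to exercise the timeout branch on_free relies on.
    if (fut.wait_for(std::chrono::seconds(3)) != std::future_status::ready) {
        // Timed out: close the reply gate (can_send_reply = false,
        // finish_send_reply = true) before deleting the request, so a late
        // _send_reply gives up instead of touching freed memory.
        std::cout << "reply never flushed; freeing the request anyway\n";
    } else {
        std::cout << "reply flushed; safe to delete the request\n";
    }
}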
14 changes: 14 additions & 0 deletions be/src/http/http_request.h
@@ -19,10 +19,12 @@

#include <glog/logging.h>

#include <future>
#include <map>
#include <memory>
#include <string>

#include "common/config.h"
#include "http/http_method.h"
#include "util/string_util.h"

@@ -32,6 +34,8 @@ namespace doris {

class HttpHandler;

enum SendReplyType { REPLY_SYNC = 0, REPLY_ASYNC = 1 };

class HttpRequest {
public:
HttpRequest(evhttp_request* ev_req);
@@ -81,7 +85,13 @@

const char* remote_host() const;

void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; }

void finish_send_reply();
void wait_finish_send_reply();

private:
SendReplyType _send_reply_type = REPLY_SYNC;
Contributor:

Isn't this initial value redundant, since on_header overwrites it?

Contributor (author):

Other HttpActions use HttpRequest too, and for them the default must remain SYNC.

HttpMethod _method;
std::string _uri;
std::string _raw_path;
@@ -95,6 +105,10 @@ class HttpRequest {

std::shared_ptr<void> _handler_ctx;
std::string _request_body;

// ensure send_reply finished
std::promise<bool> _http_reply_promise;
std::future<bool> _http_reply_futrue = _http_reply_promise.get_future();
};

} // namespace doris
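Tying this header back to the review exchange above: the REPLY_SYNC default is what keeps every other HttpAction on the old behavior, since for them finish_send_reply and wait_finish_send_reply return immediately and on_free deletes the request as before. A trimmed-down sketch of that gating (hypothetical class, not the full HttpRequest):

enum SendReplyType { REPLY_SYNC = 0, REPLY_ASYNC = 1 };

struct RequestSketch {
    SendReplyType send_reply_type = REPLY_SYNC; // ordinary handlers never change this

    void finish_send_reply() {
        if (send_reply_type == REPLY_SYNC) return; // no-op outside stream load
        // ... fulfill the reply promise so on_free can delete safely ...
    }
    void wait_finish_send_reply() {
        if (send_reply_type == REPLY_SYNC) return; // on_free deletes immediately
        // ... bounded wait on the reply future, as in http_request.cpp ...
    }
};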
2 changes: 1 addition & 1 deletion be/src/io/fs/multi_table_pipe.cpp
@@ -341,7 +341,7 @@ void MultiTablePipe::_handle_consumer_finished() {
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows
<< ", ctx: " << _ctx->brief();
_ctx->promise.set_value(_status); // when all done, finish the routine load task
_ctx->load_status_promise.set_value(_status); // when all done, finish the routine load task
}

} // namespace io