diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index c47a22164ecd3d..9372f00b79585c 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -133,7 +133,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptrbody_sink->finish()); // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); + RETURN_IF_ERROR(ctx->load_status_future.get()); if (ctx->group_commit) { LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index ee660d9a1bf617..ef5adca12bfc23 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -29,9 +29,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -106,17 +108,83 @@ void StreamLoadAction::handle(HttpRequest* req) { return; } + { + std::unique_lock lock1(ctx->_send_reply_lock); + ctx->_can_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); + } + // status already set to fail if (ctx->status.ok()) { - ctx->status = _handle(ctx); + ctx->status = _handle(ctx, req); if (!ctx->status.ok() && !ctx->status.is()) { LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << ctx->status; + _send_reply(ctx, req); } } +} + +Status StreamLoadAction::_handle(std::shared_ptr ctx, HttpRequest* req) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::Error("receive body don't equal with body bytes"); + } + + // if we use non-streaming, MessageBodyFileSink.finish will close the file + RETURN_IF_ERROR(ctx->body_sink->finish()); + if (!ctx->use_streaming) { + // we need to close file first, then execute_plan_fragment here + ctx->body_sink.reset(); + TPipelineFragmentParamsList mocked; + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked, + [req, this](std::shared_ptr ctx) { _on_finish(ctx, req); })); + } + + return Status::OK(); +} + +void StreamLoadAction::_on_finish(std::shared_ptr ctx, HttpRequest* req) { + ctx->status = ctx->load_status_future.get(); + if (ctx->status.ok()) { + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" + << ctx->id.to_string(); + } else if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get()); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get()); + ctx->commit_and_publish_txn_cost_nanos = + MonotonicNanos() - commit_and_publish_start_time; + g_stream_load_commit_and_publish_latency_ms + << ctx->commit_and_publish_txn_cost_nanos / 1000000; + } + } + _send_reply(ctx, req); +} + +void StreamLoadAction::_send_reply(std::shared_ptr ctx, HttpRequest* req) { + std::unique_lock lock1(ctx->_send_reply_lock); + while (!ctx->_finish_send_reply && !ctx->_can_send_reply) { + ctx->_can_send_reply_cv.wait(lock1); + } + if (ctx->_finish_send_reply) { + return; + } + DCHECK(ctx->_can_send_reply); + ctx->_finish_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && !ctx->status.is()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; @@ -129,7 +197,7 @@ void StreamLoadAction::handle(HttpRequest* req) { auto str = ctx->to_json(); // add new line at end str = str + '\n'; - HttpChannel::send_reply(req, str); + #ifndef BE_TEST if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { @@ -139,6 +207,8 @@ void StreamLoadAction::handle(HttpRequest* req) { } #endif + HttpChannel::send_reply(req, str); + LOG(INFO) << "finished to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms=" @@ -160,46 +230,9 @@ void StreamLoadAction::handle(HttpRequest* req) { } } -Status StreamLoadAction::_handle(std::shared_ptr ctx) { - if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { - LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes - << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; - return Status::Error("receive body don't equal with body bytes"); - } - - // if we use non-streaming, MessageBodyFileSink.finish will close the file - RETURN_IF_ERROR(ctx->body_sink->finish()); - if (!ctx->use_streaming) { - // we need to close file first, then execute_plan_fragment here - ctx->body_sink.reset(); - TPipelineFragmentParamsList mocked; - RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked)); - } - - // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); - - if (ctx->group_commit) { - LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); - return Status::OK(); - } - - if (ctx->two_phase_commit) { - int64_t pre_commit_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); - ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; - } else { - // If put file success we need commit this load - int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); - ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; - g_stream_load_commit_and_publish_latency_ms - << ctx->commit_and_publish_txn_cost_nanos / 1000000; - } - return Status::OK(); -} - int StreamLoadAction::on_header(HttpRequest* req) { + req->mark_send_reply(); + streaming_load_current_processing->increment(1); std::shared_ptr ctx = std::make_shared(_exec_env); @@ -228,26 +261,12 @@ int StreamLoadAction::on_header(HttpRequest* req) { } if (!st.ok()) { ctx->status = std::move(st); - if (ctx->need_rollback) { - _exec_env->stream_load_executor()->rollback_txn(ctx.get()); - ctx->need_rollback = false; + { + std::unique_lock lock1(ctx->_send_reply_lock); + ctx->_can_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); } - if (ctx->body_sink != nullptr) { - ctx->body_sink->cancel(ctx->status.to_string()); - } - auto str = ctx->to_json(); - // add new line at end - str = str + '\n'; - HttpChannel::send_reply(req, str); -#ifndef BE_TEST - if (config::enable_stream_load_record || - config::enable_stream_load_record_to_audit_log_table) { - if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); - } - } -#endif + _send_reply(ctx, req); return -1; } return 0; @@ -821,8 +840,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (!ctx->use_streaming) { return Status::OK(); } + TPipelineFragmentParamsList mocked; - return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked); + return _exec_env->stream_load_executor()->execute_plan_fragment( + ctx, mocked, [http_req, this](std::shared_ptr ctx) { + _on_finish(ctx, http_req); + }); } Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path, diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index bf359317f94ba1..0fa9509a85b9a2 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "http/http_handler.h" @@ -46,11 +47,13 @@ class StreamLoadAction : public HttpHandler { private: Status _on_header(HttpRequest* http_req, std::shared_ptr ctx); - Status _handle(std::shared_ptr ctx); + Status _handle(std::shared_ptr ctx, HttpRequest* req); Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); + void _on_finish(std::shared_ptr ctx, HttpRequest* req); + void _send_reply(std::shared_ptr ctx, HttpRequest* req); private: ExecEnv* _exec_env; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 457cf0e0322ba5..0231337a238102 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -56,6 +56,7 @@ static void on_chunked(struct evhttp_request* ev_req, void* param) { static void on_free(struct evhttp_request* ev_req, void* arg) { HttpRequest* request = (HttpRequest*)arg; + request->wait_finish_send_reply(); delete request; } diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 918f54d81a0a0b..fd4d5ae3409a25 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -53,6 +53,7 @@ void HttpChannel::send_error(HttpRequest* request, HttpStatus status) { void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), nullptr); + request->finish_send_reply(); } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { @@ -66,6 +67,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std: evbuffer_add(evb, content.c_str(), content.size()); } evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } @@ -80,6 +82,7 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); } evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb); + request->finish_send_reply(); evbuffer_free(evb); } diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index f743297801695b..2798b124e0a200 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -22,12 +22,15 @@ #include #include +#include #include #include #include #include #include "http/http_handler.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/stack_util.h" namespace doris { @@ -141,4 +144,50 @@ const char* HttpRequest::remote_host() const { return _ev_req->remote_host; } +void HttpRequest::finish_send_reply() { + if (_send_reply_type == REPLY_SYNC) { + return; + } + + std::string infos; + if (_handler_ctx != nullptr) { + infos = reinterpret_cast(_handler_ctx.get())->brief(); + } + VLOG_NOTICE << "finish send reply, infos=" << infos + << ", stack=" << get_stack_trace(); // temp locate problem + _http_reply_promise.set_value(true); +} + +void HttpRequest::wait_finish_send_reply() { + if (_send_reply_type == REPLY_SYNC) { + return; + } + + std::string infos; + StreamLoadContext* ctx = nullptr; + if (_handler_ctx != nullptr) { + ctx = reinterpret_cast(_handler_ctx.get()); + infos = ctx->brief(); + _handler->free_handler_ctx(_handler_ctx); + } + + VLOG_NOTICE << "start to wait send reply, infos=" << infos; + auto status = _http_reply_futrue.wait_for(std::chrono::seconds(3)); + // if request is timeout and can't cancel fragment in time, it will cause some new request block + // so we will free cancelled request in time. + if (status != std::future_status::ready) { + LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); + std::unique_lock lock1(ctx->_send_reply_lock); + // do not send_reply after free current request + ctx->_can_send_reply = false; + ctx->_finish_send_reply = true; + ctx->_can_send_reply_cv.notify_all(); + } else { + VLOG_NOTICE << "wait send reply finished"; + } + + // delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info + _handler_ctx = nullptr; +} + } // namespace doris diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 41d8cf98baaadd..1031d9023b46bd 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -19,10 +19,12 @@ #include +#include #include #include #include +#include "common/config.h" #include "http/http_method.h" #include "util/string_util.h" @@ -32,6 +34,8 @@ namespace doris { class HttpHandler; +enum SendReplyType { REPLY_SYNC = 0, REPLY_ASYNC = 1 }; + class HttpRequest { public: HttpRequest(evhttp_request* ev_req); @@ -81,7 +85,13 @@ class HttpRequest { const char* remote_host() const; + void mark_send_reply(SendReplyType type = REPLY_ASYNC) { _send_reply_type = type; } + + void finish_send_reply(); + void wait_finish_send_reply(); + private: + SendReplyType _send_reply_type = REPLY_SYNC; HttpMethod _method; std::string _uri; std::string _raw_path; @@ -95,6 +105,10 @@ class HttpRequest { std::shared_ptr _handler_ctx; std::string _request_body; + + // ensure send_reply finished + std::promise _http_reply_promise; + std::future _http_reply_futrue = _http_reply_promise.get_future(); }; } // namespace doris diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 37afd2a695e3a8..3f21fa10ffdf5e 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -341,7 +341,7 @@ void MultiTablePipe::_handle_consumer_finished() { << " number_filtered_rows=" << _ctx->number_filtered_rows << " number_unselected_rows=" << _ctx->number_unselected_rows << ", ctx: " << _ctx->brief(); - _ctx->promise.set_value(_status); // when all done, finish the routine load task + _ctx->load_status_promise.set_value(_status); // when all done, finish the routine load task } } // namespace io diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index e990ec452e6cc9..3af133fd108214 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -259,7 +259,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, { st = Status::InternalError("WalTable::_handle_stream_load.fail"); }); if (st.ok()) { // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); + RETURN_IF_ERROR(ctx->load_status_future.get()); if (ctx->status.ok()) { // deprecated and should be removed in 3.1, use token instead. ctx->auth.auth_code = wal_id; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fed6c3931fed9b..64617d23b62de2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -846,7 +846,8 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, QuerySource query_source, const FinishCallback& cb, - const TPipelineFragmentParamsList& parent) { + const TPipelineFragmentParamsList& parent, + std::shared_ptr is_prepare_success) { VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TPipelineFragmentParams debug string is too long and glog @@ -905,6 +906,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_pipeline_context(params.fragment_id, context); RETURN_IF_ERROR(context->submit()); + if (is_prepare_success != nullptr) { + *is_prepare_success = true; + } return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 826bca81901e69..07ba5e3a5e6cc5 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -126,8 +126,10 @@ class FragmentMgr : public RestMonitorIface { void remove_pipeline_context(std::pair key); void remove_query_context(const TUniqueId& key); + // `is_prepare_success` is used by invoker to ensure callback can be handle correctly (eg. stream_load_executor) Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type, - const FinishCallback& cb, const TPipelineFragmentParamsList& parent); + const FinishCallback& cb, const TPipelineFragmentParamsList& parent, + std::shared_ptr is_prepare_success = nullptr); Status start_query_execution(const PExecPlanFragmentStartRequest* request); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index df8a9cd748e129..82db5a9e3f961c 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -381,7 +381,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, if (UNLIKELY(!_status_.ok() && !_status_.is())) { \ err_handler(ctx, _status_, err_msg); \ cb(ctx); \ - _status_ = ctx->future.get(); \ + _status_ = ctx->load_status_future.get(); \ if (!_status_.ok()) { \ LOG(ERROR) << "failed to get future, " << ctx->brief(); \ } \ @@ -468,7 +468,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, } // wait for all consumers finished - HANDLE_ERROR(ctx->future.get(), "consume failed"); + HANDLE_ERROR(ctx->load_status_future.get(), "consume failed"); ctx->load_cost_millis = UnixMillis() - ctx->start_millis; @@ -549,12 +549,12 @@ Status RoutineLoadTaskExecutor::_execute_plan_for_test(std::shared_ptrread_at(0, result, &read_bytes); if (!st.ok()) { LOG(WARNING) << "read failed"; - ctx->promise.set_value(st); + ctx->load_status_promise.set_value(st); break; } if (read_bytes == 0) { - ctx->promise.set_value(Status::OK()); + ctx->load_status_promise.set_value(Status::OK()); break; } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index d66602a8d2387f..9d09498c12fdda 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -204,8 +205,8 @@ class StreamLoadContext { std::vector commit_infos; - std::promise promise; - std::future future = promise.get_future(); + std::promise load_status_promise; + std::future load_status_future = load_status_promise.get_future(); Status status; @@ -256,6 +257,15 @@ class StreamLoadContext { std::string qualified_user; std::string cloud_cluster; + std::mutex _send_reply_lock; + // maybe > 1 callback send reply, so we need to notify all, + // but `_finish_send_reply` will protect that only one callback send reply + std::condition_variable _can_send_reply_cv; + // avoid sending reply before on_chunk_data finish, or client will receive `broken pipe` + bool _can_send_reply = false; + // avoid sending reply two times + bool _finish_send_reply = false; + public: ExecEnv* exec_env() { return _exec_env; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 5ac18954e1d091..31a8598f55a1b9 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -27,8 +27,11 @@ #include #include +#include #include #include +#include +#include #include #include #include @@ -67,6 +70,12 @@ bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", "commit_tx Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, const TPipelineFragmentParamsList& parent) { + return execute_plan_fragment(ctx, parent, [](std::shared_ptr ctx) {}); +} + +Status StreamLoadExecutor::execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, + const std::function ctx)>& cb) { // submit this params #ifndef BE_TEST ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false); @@ -74,7 +83,9 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx, const TPipelineFragmentParamsList& parent); + Status execute_plan_fragment( + std::shared_ptr ctx, const TPipelineFragmentParamsList& parent, + const std::function ctx)>& cb); + protected: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index cd75a45717d5ab..f22e6fc8350bf8 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1280,7 +1280,7 @@ void PInternalService::report_stream_load_status(google::protobuf::RpcController if (!stream_load_ctx) { st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - stream_load_ctx->promise.set_value(st); + stream_load_ctx->load_status_promise.set_value(st); st.to_protobuf(response->mutable_status()); }