[feat](spill) spill and reserve #47462
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
TPC-H: Total hot run time: 32548 ms
TPC-DS: Total hot run time: 193144 ms
ClickBench: Total hot run time: 31.37 s
Force-pushed from 9dce485 to ccc257c
run buildall
Force-pushed from ccc257c to 6613edc
run buildall
TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 31551 ms
TPC-DS: Total hot run time: 190786 ms
ClickBench: Total hot run time: 31.46 s
Force-pushed from 6613edc to 85b6160
run buildall
TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 31894 ms
TPC-DS: Total hot run time: 190712 ms
ClickBench: Total hot run time: 31.48 s
Force-pushed from 85b6160 to 2b1cf11
run buildall
TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 31885 ms
TPC-DS: Total hot run time: 190515 ms
Force-pushed from 2b1cf11 to 1187a61
run buildall
TPC-DS: Total hot run time: 184587 ms
ClickBench: Total hot run time: 31.4 s
run performance
TPC-H: Total hot run time: 31557 ms
TPC-DS: Total hot run time: 184789 ms
ClickBench: Total hot run time: 30.5 s
@@ -209,6 +250,91 @@ void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> m
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}
int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) { |
TODO: merge WorkloadGroup::free_overcommited_memory and WorkloadGroup::gc_memory into a single method.
auto& queries_list = it->second;
const auto& wg = it->first;
LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); |
This log line is not very useful; it does not even print the wg id.
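A minimal improvement along those lines might look like the following sketch; the id() accessor on the workload group is an assumption here, not taken from the diff:

// Include the workload group id so the log line is attributable (sketch).
LOG_EVERY_T(INFO, 120) << "Workload group: " << wg->id()
                       << ", paused queries count: " << queries_list.size();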
continue;
}
if (is_low_watermark) {
query_ctx->set_low_memory_mode(); |
Why are all queries put into low memory mode only when the wg has a slot policy configured and is_low_watermark is true?
<< " enabled hard limit, but the slot count < 1, could not take affect"; | ||
} else { | ||
// If the query enable hard limit, then not use weighted info any more, just use the settings limit. | ||
query_weighted_mem_limit = (int64_t)((wg_high_water_mark_except_load * |
If the wg's slot_memory_policy() == TWgSlotMemoryPolicy::FIXED, can the sum of query_weighted_mem_limit over all queries in this wg exceed wg_high_water_mark_except_load?
// If there are some queries in the paused list, then the limit should take effect.
expected_query_weighted_mem_limit =
        total_used_slot_count > 0
                ? (int64_t)((wg_high_water_mark_except_load + total_used_slot_count) *
Why add wg_high_water_mark_except_load and total_used_slot_count here? The two variables have different units: one is bytes, the other is a slot count.
}
}
// calculate per query weighted memory limit
debug_msg = "Query Memory Summary: \n"; |
This should be debug_msg +=; debug_msg was initialized with the wg info above, and this assignment overwrites it.
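The one-line fix being pointed at would be:

// Append instead of assign, so the wg summary built above is preserved.
debug_msg += "Query Memory Summary: \n";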
// memory failed and we did not handle it.
if (!query_ctx->is_pure_load_task()) {
query_ctx->set_mem_limit(query_weighted_mem_limit);
query_ctx->set_adjusted_mem_limit(expected_query_weighted_mem_limit); |
QueryContext does not explain the difference between _adjusted_mem_limit and mem_limit. Suggest adding a comment; otherwise one has to understand this entire function to know what they mean.
Then consider adding a new method to QueryContext, for example:
void effect_adjusted_mem_limit() {
    set_mem_limit(adjusted_mem_limit());
}
This makes the meaning of the two limits easier to understand.
return true;
} else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) {
// if we cannot find any memory to release, then let the query continue to run as far as possible,
// or be cancelled by gc if memory is really not enough.
Actually, after the query is allowed to continue with reserve memory disabled, it will most likely throw an exception and abort when the Allocator checks the query memory tracker limit; it is only cancelled by gc when process memory exceeds the limit. This is just a comment nit, fine to leave as-is.
}
} else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
// Only deal with non-overcommit workload groups.
if (wg->enable_memory_overcommit()) { |
Queries in a wg that allows overcommit should not be put into the paused list because of WORKLOAD_GROUP_MEMORY_EXCEEDED in the first place.
After the paused query is resumed here via set_memory_sufficient, can it be put into the paused list again due to WORKLOAD_GROUP_MEMORY_EXCEEDED?
Another question: can a single query enter the paused list multiple times? If so, it may pause and resume frequently.
// should set the query's limit only.
// Check the query's reserve with expected limit.
if (query_ctx->adjusted_mem_limit() <
    query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
After a query is paused due to WORKLOAD_GROUP_MEMORY_EXCEEDED in a wg that does not allow overcommit, with adjusted_mem_limit < query memory used + reserve_size, why modify the query mem limit and then let the query's pipeline tasks continue executing?
After resuming, the query will only enter the paused list again or go into set_low_memory_mode; shouldn't handle_single_query_ be called to spill or cancel instead?
I see now: after the query mem limit is modified, the query is expected to re-enter the paused state with QUERY_MEMORY_EXCEEDED, and handle_single_query_ will then be called to spill.
// Return the expected free bytes if memtable could flush
int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg,
                                                             int64_t need_free_mem) {
The parameter need_free_mem is unused.
continue;
}
if (!has_changed_hard_limit) {
update_queries_limit_(wg, true); |
Is this update_queries_limit_ really necessary? refresh_wg_weighted_memory_limit is called every 20 ms and already updates it.
Even if an update is needed here, it should be moved above this code:
if (query_ctx->adjusted_mem_limit() <
    query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
                query_ctx->get_mem_tracker()->consumption())
        << ", wg: " << wg->debug_string();
}
if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
When multiple queries are paused due to WORKLOAD_GROUP_MEMORY_EXCEEDED and slot_memory_policy is NONE, the logic seems unreasonable.
When slot_memory_policy is not NONE:
On the first handle_paused_queries, a query with memory used + reserve > adjusted_mem_limit has its mem_limit updated to the adjusted_mem_limit and continues executing, while the other queries keep waiting.
On the second handle_paused_queries, the queries that exceeded memory are expected to enter the paused state due to QUERY_MEMORY_EXCEEDED and then spill or be cancelled; once memory is released, the other queries continue executing.
When slot_memory_policy is NONE:
On the first handle_paused_queries, a query with memory used + reserve > adjusted_mem_limit likewise has its mem_limit updated and continues executing. The other queries, however, directly call handle_single_query_ to spill or cancel.
The unreasonable part: when slot_memory_policy is NONE, the queries that exceeded memory keep running (although they are expected to pause again with QUERY_MEMORY_EXCEEDED), while the queries that did not exceed memory spill directly.
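A rough sketch of the asymmetry described above, with hypothetical helper names standing in for the actual PR code:

// One handle_paused_queries pass (illustrative names, not the PR's API).
for (auto& q : paused_queries) {
    bool over_limit = q.memory_used() + q.reserve_size() > q.adjusted_mem_limit();
    if (wg->slot_memory_policy() != TWgSlotMemoryPolicy::NONE) {
        if (over_limit) {
            q.set_mem_limit(q.adjusted_mem_limit()); // resume; expected to pause again
            q.resume();                              // with QUERY_MEMORY_EXCEEDED
        }
        // Queries within their limit keep waiting in the paused list.
    } else {
        if (over_limit) {
            q.set_mem_limit(q.adjusted_mem_limit()); // same: resume and run
            q.resume();
        } else {
            handle_single_query_(q); // but compliant queries spill/cancel at once
        }
    }
}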
run buildall
TPC-H: Total hot run time: 31797 ms
TPC-DS: Total hot run time: 184125 ms
ClickBench: Total hot run time: 30.72 s
TeamCity be ut coverage result:
}
}
if (has_query_exceed_process_memlimit) { |
This should be if (!has_query_exceed_process_memlimit), i.e., the condition negated.
            0.05 &&
    doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
            0.05) {
    doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
TODO: adjusting the cache capacity here is not ideal; we will look for a better implementation later.
// need to check config::disable_memory_gc here, if not, when config::disable_memory_gc == true,
// cache is not adjusted, query_it->cache_ratio_ will always be 1, and this if branch will never
// execute, this query will never be resumed, and will deadlock here
if ((!config::disable_memory_gc && query_it->cache_ratio_ < 0.05) ||
In practice, releasing cache takes time; fully releasing tens of GB of page cache takes close to 1 s.
The intended semantics of cache_ratio_ < 0.05 here is "all caches have already been released; if memory cannot be freed elsewhere, spill", but in reality the caches may not have finished releasing.
TODO: add a method to CacheManager that returns the usage ratio of all caches; if it is below some threshold, treat it as "all caches are released, no need to keep waiting for the caches to free memory".
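A minimal sketch of what that CacheManager addition could look like; the types and member names below are assumptions for illustration, not the existing Doris API:

#include <cstdint>
#include <vector>

// Hypothetical per-cache accounting record.
struct CacheStats {
    int64_t usage = 0;     // bytes currently held by the cache
    int64_t capacity = 0;  // configured capacity in bytes
};

class CacheManager {
public:
    // Aggregate usage across all registered caches as a fraction of total
    // capacity. A caller could treat a value below some threshold (e.g. 0.05)
    // as "all caches are effectively released; stop waiting on cache eviction".
    double all_cache_usage_ratio() const {
        int64_t usage = 0;
        int64_t capacity = 0;
        for (const auto& stats : _caches) {
            usage += stats.usage;
            capacity += stats.capacity;
        }
        return capacity > 0 ? static_cast<double>(usage) / capacity : 0.0;
    }

private:
    std::vector<CacheStats> _caches;
};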
// 1. Sort all memory limiters in all overcommit wgs, and cancel the top-usage task that holds the most memory.
// 2. Maybe not valid because its memory does not exceed the limit.
int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem,
TODO: I will implement this later, but as said above, where the revoking of overcommitted wgs is triggered still needs discussion.
}
if (hard_limit) {
    freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem,
                                                      doris::QUERY_MIN_MEMORY, profile.get());
Why check hard_limit here and then set a lower_bound for the cancel?
    return 0;
}

int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
So after a wg that supports overcommit has overcommitted its memory, that memory can only be reclaimed by queries from other wgs once process memory exceeds the soft limit, right?
This call chain looks reasonable, but could it lead to frequent calls, e.g., 100 paused queries all revoking from other wgs?
For this kind of requirement, my first instinct would be to revoke overcommitted wgs from the GC thread according to some policy.
        continue;
    }
}
} else {
If some query A has already revoked memory from other overcommitted wgs and set has_revoked_from_other_group to true, letting the other queries simply continue here is unreasonable, because query A's reserve size may be very small, so only a tiny amount of memory was revoked from the other overcommitted wgs.
Letting the other queries continue carries the semantics of "all releasable memory has been released, so just run everything regardless", but there may still be overcommitted wgs that have not released all of their overcommitted portion.
VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
           << " is resumed after revoke memory from other group.";
query_it = queries_list.erase(query_it);
// Do not care if the revoked_size > reserve size, and try to run again.
Is it really fine to continue executing the query here regardless of whether revoked_size is greater than the reserve size?
Because the adjusted_mem_limit is not made effective, the query will not enter the paused state due to QUERY_MEMORY_EXCEEDED after it resumes.
The query will most likely pause again because process memory exceeds the limit, and will then keep revoking overcommitted wgs until the revoke returns 0; only once there is no overcommitted wg left will the current query spill. It looks reasonable, but I suspect it will cause problems in production, e.g., queries appearing to stutter along.
}
if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
            0.05 &&
    query_it->cache_ratio_ > 0.05) {
Same issue as the analysis of the meaning of cache_ratio_ < 0.05 above.
TODO: add a method to CacheManager that returns the usage ratio of all caches; if it is below some threshold, treat it as "all caches are released, no need to keep waiting for the caches to free memory".
run cloudut
TeamCity cloud ut coverage result:
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
LGTM
LGTM
Also fixes WorkloadGroupTest::testCreateNormal. After the spill and reserve code (apache#47462) was merged, the properties of `WorkloadGroup` changed.
What problem does this PR solve?
Problem Summary:
A brand-new spill triggering strategy:
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)