Skip to content

Commit

Permalink
fix cases
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Feb 15, 2025
1 parent afd1856 commit 85b6160
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 76 deletions.
1 change: 1 addition & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>();
_mem_tracker = std::make_shared<MemTracker>();
}

void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -186,8 +187,11 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in

DEFER_RELEASE_RESERVED();

/// FIXME: the memory reservation below is compiled out under BE_TEST; make it work in unit tests.
#ifndef BE_TEST
auto reserve_size = memtable->get_flush_reserve_memory_size();
RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
#endif

Defer defer {[&]() {
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
Expand Down
15 changes: 8 additions & 7 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,14 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
if (_sink_buffer) {
fmt::format_to(
debug_string_buffer,
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {} , each queue size: {}",
_sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(),
_working_channels_count.load(), _sink_buffer->debug_each_instance_queue_size());
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {}, total "
"channels: {}, remote channels: {}, each queue size: {}",
_sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
_reach_limit.load(), _working_channels_count.load(), channels.size(),
_rpc_channels_num, _sink_buffer->debug_each_instance_queue_size());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
14 changes: 9 additions & 5 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
Expand Down Expand Up @@ -113,11 +114,11 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));

#ifndef NDEBUG
if (state->fuzzy_disable_runtime_filter_in_be()) {
if ((_parent->operator_id() + random()) % 2 == 0) {
RETURN_IF_ERROR(disable_runtime_filters(state));
}
}
// if (state->fuzzy_disable_runtime_filter_in_be()) {
// if ((_parent->operator_id() + random()) % 2 == 0) {
// RETURN_IF_ERROR(disable_runtime_filters(state));
// }
// }
#endif

return Status::OK();
Expand Down Expand Up @@ -237,6 +238,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

// Log the values examined by the early-return check that follows. A ", " separator is
// placed after the query id so the id does not run into the next label.
LOG(INFO) << "Query: " << print_id(state->query_id())
          << ", hashjoin sink close: " << _runtime_filter_slots << ", "
          << _runtime_filters.size() << ", " << _eos << ", " << _runtime_filters_disabled;
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos ||
_runtime_filters_disabled) {
return Base::close(state, exec_status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
UPDATE_PROFILE("MemoryUsageHashTable");
UPDATE_PROFILE("MemoryUsageSerializeKeyArena");
UPDATE_PROFILE("BuildTime");
UPDATE_PROFILE("SerializeKeyTime");
UPDATE_PROFILE("MergeTime");
UPDATE_PROFILE("SerializeDataTime");
UPDATE_PROFILE("DeserializeAndMergeTime");
UPDATE_PROFILE("HashTableComputeTime");
UPDATE_PROFILE("HashTableEmplaceTime");
Expand Down
13 changes: 3 additions & 10 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,14 @@ void PartitionedHashJoinProbeLocalState::update_build_profile(RuntimeProfile* ch

template <bool spilled>
void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* child_profile) {
UPDATE_COUNTER_FROM_INNER("ProbeTime");
UPDATE_COUNTER_FROM_INNER("JoinFilterTimer");
UPDATE_COUNTER_FROM_INNER("BuildOutputBlock");
UPDATE_COUNTER_FROM_INNER("ProbeRows");
UPDATE_COUNTER_FROM_INNER("ProbeFindNextTime");
UPDATE_COUNTER_FROM_INNER("ProbeExprCallTime");
UPDATE_COUNTER_FROM_INNER("ProbeWhenSearchHashTableTime");
UPDATE_COUNTER_FROM_INNER("ProbeWhenBuildSideOutputTime");
UPDATE_COUNTER_FROM_INNER("ProbeWhenProbeSideOutputTime");
UPDATE_COUNTER_FROM_INNER("ProbeWhenProcessHashTableTime");
UPDATE_COUNTER_FROM_INNER("OtherJoinConjunctTime");
UPDATE_COUNTER_FROM_INNER("NonEqualJoinConjunctEvaluationTime");
UPDATE_COUNTER_FROM_INNER("InitProbeSideTime");
}

Expand Down Expand Up @@ -826,15 +823,11 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
auto& local_state = get_local_state(state);
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = _revocable_mem_size(state);
const auto min_revocable_size = state->spill_min_revocable_mem();

if (state->get_query_ctx()->low_memory_mode()) {
return revocable_size >
std::min<int64_t>(min_revocable_size,
static_cast<int64_t>(
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM));
return revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
} else {
return revocable_size > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
return revocable_size >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
}
}
return false;
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
if (PipelineXSpillSinkLocalState::_closed) {
return Status::OK();
}
DCHECK(_shared_state->inner_runtime_state != nullptr);
VLOG_DEBUG << "Query:" << print_id(state->query_id())
<< ", hash join sink:" << _parent->node_id() << ", task:" << state->task_id()
<< ", close";
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) {
RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(),
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,

sink_info.local_state->_memory_used_counter->set(
sink_info.local_state->_shared_state->mem_usage);

return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
#include "vec/common/sort/heap_sorter.h"
#include "vec/common/sort/topn_sorter.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/spill/spill_stream.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"
Expand Down Expand Up @@ -1865,7 +1866,7 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
}

size_t revocable_size = task->get_revocable_size();
if (revocable_size > _runtime_state->spill_min_revocable_mem()) {
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
res += revocable_size;
}
}
Expand All @@ -1878,8 +1879,7 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
for (const auto& task_instances : _tasks) {
for (const auto& task : task_instances) {
size_t revocable_size_ = task->get_revocable_size();
if (revocable_size_ > _runtime_state->spill_min_revocable_mem() ||
(revocable_size_ > 0 && _query_ctx->enable_force_spill())) {
if (revocable_size_ >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
revocable_tasks.emplace_back(task.get());
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ Status PipelineTask::execute(bool* eos) {
}
// If the operator is not spillable, or it is spillable but does not have much memory to
// spill, there is no need to add it to the paused list; just let it go.
if (_sink->revocable_mem_size(_state) >= _state->spill_min_revocable_mem()) {
if (_sink->revocable_mem_size(_state) >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
VLOG_DEBUG << debug_msg;
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
Expand Down
30 changes: 15 additions & 15 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/query_statistics.h"
#include "util/string_util.h"
#include "util/uid_util.h"

Expand Down Expand Up @@ -147,6 +146,7 @@ class MemTrackerLimiter final {
Status check_limit(int64_t bytes = 0);
// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();
// Returns true when this tracker's type is Type::QUERY or Type::LOAD.
bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; }
void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
Expand All @@ -172,6 +172,19 @@ class MemTrackerLimiter final {

void release(int64_t bytes) { _mem_counter.sub(bytes); }

// Tries to add `bytes` to this tracker's memory counter.
//
// A zero-byte request succeeds immediately without touching the counter.
// When the tracker is a QUERY/LOAD tracker (see is_overcommit_tracker()) and
// config::enable_query_memory_overcommit is off, the addition goes through
// _mem_counter.try_add(bytes, _limit) and its result is returned unchanged
// (presumably false when the limit would be exceeded -- confirm against MemCounter).
// In every other case the bytes are added unconditionally and true is returned.
bool try_consume(int64_t bytes) {
    if (UNLIKELY(bytes == 0)) {
        return true;
    }

    const bool enforce_limit =
            is_overcommit_tracker() && !config::enable_query_memory_overcommit;
    if (enforce_limit) {
        return _mem_counter.try_add(bytes, _limit);
    }

    _mem_counter.add(bytes);
    return true;
}

void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }

// Transfer 'bytes' of consumption from this tracker to 'dst'.
Expand All @@ -198,22 +211,11 @@ class MemTrackerLimiter final {
return;
}
_mem_counter.add(bytes);
if (_query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
_reserved_counter.add(bytes);
}

bool try_reserve(int64_t bytes) {
if (UNLIKELY(bytes == 0)) {
return true;
}
bool rt = _mem_counter.try_add(bytes, _limit.load());
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
bool rt = try_consume(bytes);
if (rt) {
_reserved_counter.add(bytes);
}
Expand Down Expand Up @@ -336,8 +338,6 @@ class MemTrackerLimiter final {
// Avoid frequent printing.
bool _enable_print_log_usage = false;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;

std::shared_ptr<MemTrackerLimiter> _write_tracker;

struct AddressSanitizer {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
4, wg->get_metrics()->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(
5, wg->get_metrics()->get_remote_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), block);
}
}

Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,19 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
do {
THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
} while (block->empty() && !_is_eof);
if (!block->empty()) {
if (block->empty()) {
return false;
}

if (!_ordering_expr.empty()) {
DCHECK_EQ(_ordering_expr.size(), desc.size());
for (int i = 0; i < desc.size(); ++i) {
THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(), &desc[i].column_number));
}
MergeSortCursorImpl::reset();
return true;
}
return false;

MergeSortCursorImpl::reset();
return true;
}

Block* block_ptr() override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
_mgr(stream_mgr),
_memory_used_counter(memory_used_counter),
_resource_ctx(state->get_query_ctx()->resource_ctx()),
_query_context(state->get_query_ctx()),
_query_context(state->get_query_ctx()->shared_from_this()),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_row_desc(row_desc),
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/spill/spill_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "runtime/query_statistics.h"
#include "runtime/workload_management/resource_context.h"
#include "util/runtime_profile.h"

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/spill/spill_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ Status SpillStream::prepare() {
}

SpillReaderUPtr SpillStream::create_separate_reader() const {
return std::make_unique<SpillReader>(
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
writer_->get_file_path());
return std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
writer_->get_file_path());
}

const TUniqueId& SpillStream::query_id() const {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/spill/spill_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <string>

#include "io/fs/file_writer.h"
#include "runtime/query_statistics.h"
#include "runtime/workload_management/resource_context.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
Expand Down
Loading

0 comments on commit 85b6160

Please sign in to comment.