Skip to content

Commit f741922

Browse files
BiteTheDDDDtCopilot
andcommitted
[fix](be) Fix RuntimeFilter selectivity sampling_frequency lost during VExprContext recreation
### What problem does this PR solve? Issue Number: close #xxx Problem Summary: RuntimeFilter selectivity tracking was completely non-functional because `sampling_frequency` was lost during VExprContext recreation. In `RuntimeFilterConsumer::_get_push_exprs()`, `sampling_frequency=32` was set on a temporary `probe_ctx` VExprContext. However, only VRuntimeFilterWrapper expressions (VExpr) were returned, not the VExprContext. When `_append_rf_into_conjuncts()` later created a new VExprContext via `VExprContext::create_shared(expr)`, the new context had default `_sampling_frequency=-1` (DISABLE_SAMPLING). With `_sampling_frequency=-1`, the condition `(_judge_counter++) >= -1` evaluated to `0 >= -1 → true` on every call, causing `reset_judge_selectivity()` to fire every time. This meant selectivity counters were perpetually reset and never accumulated, making the runtime filter selectivity optimization completely ineffective. The fix stores `sampling_frequency` in VRuntimeFilterWrapper (which survives VExprContext recreation) and propagates it to VExprContext in `VRuntimeFilterWrapper::open()`, which is called on both original and cloned contexts. ### Release note Fixed a bug where RuntimeFilter selectivity tracking was non-functional due to sampling_frequency being lost during VExprContext recreation, causing runtime filters that should be skipped (due to low selectivity) to never be identified. ### Check List (For Author) - Test: Unit Test - Added 2 regression tests to runtime_filter_selectivity_test.cpp - Added 3 new tests in vruntimefilter_wrapper_sampling_test.cpp - All 22 tests pass - Behavior changed: No (selectivity tracking was broken before, this makes it work as designed) - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 3d75942 commit f741922

5 files changed

Lines changed: 243 additions & 12 deletions

File tree

be/src/exec/runtime_filter/runtime_filter_consumer.cpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "exec/runtime_filter/runtime_filter_consumer.h"
1919

20+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
2021
#include "exprs/minmax_predicate.h"
2122
#include "exprs/vbitmap_predicate.h"
2223
#include "exprs/vbloom_predicate.h"
@@ -83,11 +84,11 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
8384
auto real_filter_type = _wrapper->get_real_type();
8485
bool null_aware = _wrapper->contain_null();
8586

86-
// Set sampling frequency based on disable_always_true_logic status
87+
// Determine sampling frequency for the always_true optimization.
88+
// This will be propagated to VExprContext in VRuntimeFilterWrapper::open().
8789
int sampling_frequency = _wrapper->disable_always_true_logic()
8890
? RuntimeFilterSelectivity::DISABLE_SAMPLING
8991
: config::runtime_filter_sampling_frequency;
90-
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);
9192

9293
switch (real_filter_type) {
9394
case RuntimeFilterType::IN_FILTER: {
@@ -105,7 +106,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
105106
in_pred->add_child(probe_ctx->root());
106107
auto wrapper = VRuntimeFilterWrapper::create_shared(
107108
node, in_pred, get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
108-
null_aware, _wrapper->filter_id());
109+
null_aware, _wrapper->filter_id(), sampling_frequency);
109110
container.push_back(wrapper);
110111
break;
111112
}
@@ -125,7 +126,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
125126
}
126127
container.push_back(VRuntimeFilterWrapper::create_shared(
127128
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
128-
_wrapper->filter_id()));
129+
_wrapper->filter_id(), sampling_frequency));
129130
break;
130131
}
131132
case RuntimeFilterType::MAX_FILTER: {
@@ -144,7 +145,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
144145
}
145146
container.push_back(VRuntimeFilterWrapper::create_shared(
146147
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
147-
_wrapper->filter_id()));
148+
_wrapper->filter_id(), sampling_frequency));
148149
break;
149150
}
150151
case RuntimeFilterType::MINMAX_FILTER: {
@@ -160,7 +161,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
160161
max_pred->add_child(max_literal);
161162
container.push_back(VRuntimeFilterWrapper::create_shared(
162163
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
163-
_wrapper->filter_id()));
164+
_wrapper->filter_id(), sampling_frequency));
164165

165166
VExprContextSPtr new_probe_ctx;
166167
RETURN_IF_ERROR(VExpr::create_expr_tree(probe_expr, new_probe_ctx));
@@ -177,7 +178,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
177178
min_pred->add_child(min_literal);
178179
container.push_back(VRuntimeFilterWrapper::create_shared(
179180
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
180-
_wrapper->filter_id()));
181+
_wrapper->filter_id(), sampling_frequency));
181182
break;
182183
}
183184
case RuntimeFilterType::BLOOM_FILTER: {
@@ -194,7 +195,8 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
194195
bloom_pred->add_child(probe_ctx->root());
195196
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bloom_pred,
196197
get_bloom_filter_ignore_thredhold(),
197-
null_aware, _wrapper->filter_id());
198+
null_aware, _wrapper->filter_id(),
199+
sampling_frequency);
198200
container.push_back(wrapper);
199201
break;
200202
}
@@ -214,7 +216,8 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
214216
return Status::InternalError("bitmap predicate do not support null aware");
215217
}
216218
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bitmap_pred, 0, null_aware,
217-
_wrapper->filter_id());
219+
_wrapper->filter_id(),
220+
sampling_frequency);
218221
container.push_back(wrapper);
219222
break;
220223
}

be/src/exprs/vruntimefilter_wrapper.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ class VExprContext;
5757

5858
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl,
5959
double ignore_thredhold, bool null_aware,
60-
int filter_id)
60+
int filter_id, int sampling_frequency)
6161
: VExpr(node),
6262
_impl(std::move(impl)),
6363
_ignore_thredhold(ignore_thredhold),
6464
_null_aware(null_aware),
65-
_filter_id(filter_id) {}
65+
_filter_id(filter_id),
66+
_sampling_frequency(sampling_frequency) {}
6667

6768
Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
6869
VExprContext* context) {
@@ -76,6 +77,7 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
7677
FunctionContext::FunctionStateScope scope) {
7778
DCHECK(_prepare_finished);
7879
RETURN_IF_ERROR(_impl->open(state, context, scope));
80+
context->get_runtime_filter_selectivity().set_sampling_frequency(_sampling_frequency);
7981
_open_finished = true;
8082
return Status::OK();
8183
}

be/src/exprs/vruntimefilter_wrapper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include "common/config.h"
2828
#include "common/status.h"
29+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
2930
#include "exprs/function_context.h"
3031
#include "exprs/vexpr.h"
3132
#include "runtime/runtime_profile.h"
@@ -51,7 +52,8 @@ class VRuntimeFilterWrapper final : public VExpr {
5152

5253
public:
5354
VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double ignore_thredhold,
54-
bool null_aware, int filter_id);
55+
bool null_aware, int filter_id,
56+
int sampling_frequency = RuntimeFilterSelectivity::DISABLE_SAMPLING);
5557
~VRuntimeFilterWrapper() override = default;
5658
Status execute_column(VExprContext* context, const Block* block, Selector* selector,
5759
size_t count, ColumnPtr& result_column) const override;
@@ -126,6 +128,7 @@ class VRuntimeFilterWrapper final : public VExpr {
126128
double _ignore_thredhold;
127129
bool _null_aware;
128130
int _filter_id;
131+
int _sampling_frequency;
129132
};
130133

131134
using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;

be/test/exec/runtime_filter/runtime_filter_selectivity_test.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,50 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
228228
}
229229
}
230230

231+
// Regression test: with default sampling_frequency (-1), update_judge_counter()
232+
// always resets because (_judge_counter++) >= -1 is always true.
233+
// This was the root cause of the selectivity accumulation bug.
234+
TEST_F(RuntimeFilterSelectivityTest, default_sampling_frequency_always_resets) {
235+
RuntimeFilterSelectivity selectivity;
236+
// Don't set sampling_frequency — defaults to DISABLE_SAMPLING (-1)
237+
238+
// Accumulate selectivity data: low filter rate -> should be always_true
239+
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
240+
// With default -1, maybe_always_true_can_ignore returns false (disabled)
241+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
242+
243+
// Now call update_judge_counter — with -1, it immediately resets
244+
selectivity.update_judge_counter();
245+
// Verify: accumulated data has been wiped out by the reset
246+
// Even after setting a valid sampling_frequency, the previously accumulated
247+
// selectivity data is gone
248+
selectivity.set_sampling_frequency(100);
249+
// always_true was reset to false by the premature reset
250+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
251+
}
252+
253+
// Verify that setting sampling_frequency correctly prevents premature reset
254+
TEST_F(RuntimeFilterSelectivityTest, proper_sampling_frequency_preserves_accumulation) {
255+
RuntimeFilterSelectivity selectivity;
256+
selectivity.set_sampling_frequency(32);
257+
258+
// Accumulate selectivity: low filter rate
259+
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
260+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
261+
262+
// Counter increments don't reset before reaching sampling_frequency.
263+
// Post-increment semantics: check uses old value, so need 33 calls total
264+
// to trigger reset (counter must reach 32 before comparison fires).
265+
for (int i = 0; i < 32; i++) {
266+
selectivity.update_judge_counter();
267+
}
268+
// Still always_true because counter value 31 was compared last (31 >= 32 → false)
269+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
270+
271+
// 33rd call: counter=32, 32 >= 32 → true → triggers reset
272+
selectivity.update_judge_counter();
273+
// After reset, needs re-evaluation
274+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
275+
}
276+
231277
} // namespace doris
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <glog/logging.h>
19+
#include <gtest/gtest.h>
20+
21+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
22+
#include "exec/runtime_filter/runtime_filter_test_utils.h"
23+
#include "exprs/vexpr_context.h"
24+
#include "exprs/vruntimefilter_wrapper.h"
25+
26+
namespace doris {
27+
28+
// Minimal VExpr implementation for testing VRuntimeFilterWrapper in isolation.
29+
class StubVExpr : public VExpr {
30+
public:
31+
StubVExpr() : VExpr(make_texpr_node()) {}
32+
33+
const std::string& expr_name() const override {
34+
static const std::string name = "StubVExpr";
35+
return name;
36+
}
37+
38+
Status execute(VExprContext*, Block*, int*) const override { return Status::OK(); }
39+
40+
Status execute_column(VExprContext*, const Block*, Selector*, size_t,
41+
ColumnPtr&) const override {
42+
return Status::OK();
43+
}
44+
45+
private:
46+
static TExprNode make_texpr_node() {
47+
return TExprNodeBuilder(
48+
TExprNodeType::SLOT_REF,
49+
TTypeDescBuilder()
50+
.set_types(TTypeNodeBuilder()
51+
.set_type(TTypeNodeType::SCALAR)
52+
.set_scalar_type(TPrimitiveType::INT)
53+
.build())
54+
.build(),
55+
0)
56+
.build();
57+
}
58+
};
59+
60+
class VRuntimeFilterWrapperSamplingTest : public RuntimeFilterTest {};
61+
62+
// Test that VRuntimeFilterWrapper stores and propagates sampling_frequency
63+
// through open() to VExprContext. This is the core fix for the bug where
64+
// sampling_frequency was lost when _append_rf_into_conjuncts creates a new
65+
// VExprContext via VExprContext::create_shared(expr).
66+
TEST_F(VRuntimeFilterWrapperSamplingTest, open_propagates_sampling_frequency) {
67+
auto stub = std::make_shared<StubVExpr>();
68+
auto node = TExprNodeBuilder(
69+
TExprNodeType::SLOT_REF,
70+
TTypeDescBuilder()
71+
.set_types(TTypeNodeBuilder()
72+
.set_type(TTypeNodeType::SCALAR)
73+
.set_scalar_type(TPrimitiveType::INT)
74+
.build())
75+
.build(),
76+
0)
77+
.build();
78+
79+
const int expected_frequency = 32;
80+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
81+
expected_frequency);
82+
83+
// Simulate the VExprContext recreation that happens in _append_rf_into_conjuncts.
84+
// A fresh VExprContext has default sampling_frequency = DISABLE_SAMPLING (-1).
85+
auto context = std::make_shared<VExprContext>(wrapper);
86+
ASSERT_EQ(context->get_runtime_filter_selectivity().maybe_always_true_can_ignore(), false);
87+
88+
RowDescriptor row_desc;
89+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
90+
ASSERT_TRUE(wrapper->open(_runtime_states[0].get(), context.get(),
91+
FunctionContext::FRAGMENT_LOCAL)
92+
.ok());
93+
94+
// After open(), sampling_frequency should be propagated from VRuntimeFilterWrapper
95+
// to VExprContext. Verify by accumulating low-selectivity data and checking
96+
// that always_true can now be detected.
97+
auto& selectivity = context->get_runtime_filter_selectivity();
98+
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
99+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
100+
}
101+
102+
// Test that default sampling_frequency (DISABLE_SAMPLING) disables the always_true
103+
// optimization, matching the behavior when disable_always_true_logic is set.
104+
TEST_F(VRuntimeFilterWrapperSamplingTest, default_sampling_frequency_disables_optimization) {
105+
auto stub = std::make_shared<StubVExpr>();
106+
auto node = TExprNodeBuilder(
107+
TExprNodeType::SLOT_REF,
108+
TTypeDescBuilder()
109+
.set_types(TTypeNodeBuilder()
110+
.set_type(TTypeNodeType::SCALAR)
111+
.set_scalar_type(TPrimitiveType::INT)
112+
.build())
113+
.build(),
114+
0)
115+
.build();
116+
117+
// No sampling_frequency argument - uses default DISABLE_SAMPLING
118+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1);
119+
120+
auto context = std::make_shared<VExprContext>(wrapper);
121+
RowDescriptor row_desc;
122+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
123+
ASSERT_TRUE(wrapper->open(_runtime_states[0].get(), context.get(),
124+
FunctionContext::FRAGMENT_LOCAL)
125+
.ok());
126+
127+
// Even with low-selectivity data, always_true should NOT be detected
128+
// because sampling is disabled
129+
auto& selectivity = context->get_runtime_filter_selectivity();
130+
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
131+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
132+
}
133+
134+
// Test that sampling_frequency survives VExprContext recreation, which is the
135+
// exact scenario that caused the original bug.
136+
TEST_F(VRuntimeFilterWrapperSamplingTest, sampling_frequency_survives_context_recreation) {
137+
auto stub = std::make_shared<StubVExpr>();
138+
auto node = TExprNodeBuilder(
139+
TExprNodeType::SLOT_REF,
140+
TTypeDescBuilder()
141+
.set_types(TTypeNodeBuilder()
142+
.set_type(TTypeNodeType::SCALAR)
143+
.set_scalar_type(TPrimitiveType::INT)
144+
.build())
145+
.build(),
146+
0)
147+
.build();
148+
149+
const int expected_frequency = 32;
150+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
151+
expected_frequency);
152+
153+
// First context - prepare and open work
154+
auto context1 = std::make_shared<VExprContext>(wrapper);
155+
RowDescriptor row_desc;
156+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context1.get()).ok());
157+
ASSERT_TRUE(wrapper->open(_runtime_states[0].get(), context1.get(),
158+
FunctionContext::FRAGMENT_LOCAL)
159+
.ok());
160+
161+
// Simulate context recreation (what _append_rf_into_conjuncts does):
162+
// Create a brand new VExprContext with the same VRuntimeFilterWrapper.
163+
// The new context starts with default sampling_frequency = -1.
164+
auto context2 = std::make_shared<VExprContext>(wrapper);
165+
EXPECT_FALSE(context2->get_runtime_filter_selectivity().maybe_always_true_can_ignore());
166+
167+
// After open() on the new context, sampling_frequency should be propagated
168+
ASSERT_TRUE(wrapper->open(_runtime_states[0].get(), context2.get(),
169+
FunctionContext::THREAD_LOCAL)
170+
.ok());
171+
172+
auto& selectivity2 = context2->get_runtime_filter_selectivity();
173+
selectivity2.update_judge_selectivity(1, 2000, 50000, 0.1);
174+
EXPECT_TRUE(selectivity2.maybe_always_true_can_ignore());
175+
}
176+
177+
} // namespace doris

0 commit comments

Comments
 (0)