Skip to content

Commit 160c240

Browse files
committed
Squashed commit of the following:
commit 14f92b8 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:07:20 2026 +0000 Introduce batchInterval and batchDebounce. commit 12d839c Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:06:45 2026 +0000 Introduce batch notify and assign actions. commit 7393a00 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:06:12 2026 +0000 Use RWLock instead of single Mutext in MemoryAwaitedActionDb. commit 8cfeade Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:05:37 2026 +0000 Fix batch matching by allowing same worker accept multiple jobs. commit 7cd29c9 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Fri Jan 23 12:15:40 2026 +0000 Introduce batch worker matching.
1 parent b0a7c23 commit 160c240

7 files changed

Lines changed: 550 additions & 52 deletions

File tree

nativelink-config/src/schedulers.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ const fn default_worker_match_logging_interval_s() -> i64 {
8282
10
8383
}
8484

85+
/// Default batch interval in milliseconds (100ms).
86+
/// This is the maximum time between batch matching cycles.
87+
const fn default_batch_interval_ms() -> u64 {
88+
100
89+
}
90+
91+
/// Default debounce window in milliseconds (20ms).
92+
/// After a trigger, wait this long to collect more changes before running.
93+
const fn default_batch_debounce_ms() -> u64 {
94+
20
95+
}
96+
8597
#[derive(Deserialize, Serialize, Debug, Default)]
8698
#[serde(deny_unknown_fields)]
8799
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
@@ -166,6 +178,38 @@ pub struct SimpleSpec {
166178
deserialize_with = "convert_duration_with_shellexpand_and_negative"
167179
)]
168180
pub worker_match_logging_interval_s: i64,
181+
182+
/// Enable batch worker matching optimization.
183+
/// When enabled, the scheduler will collect queued actions and match them
184+
/// to workers in a single batch operation, reducing lock contention.
185+
/// This can significantly improve throughput when there are many queued
186+
/// actions and workers.
187+
/// Default: false
188+
#[serde(default)]
189+
pub enable_batch_worker_matching: bool,
190+
191+
/// Maximum interval between batch matching cycles (milliseconds).
192+
/// Even without triggers, matching runs at least this often.
193+
/// Only used when `enable_batch_worker_matching` is true.
194+
/// Default: 100ms
195+
#[serde(
196+
default = "default_batch_interval_ms",
197+
deserialize_with = "convert_numeric_with_shellexpand"
198+
)]
199+
pub batch_interval_ms: u64,
200+
201+
/// Debounce window after first trigger (milliseconds).
202+
/// When a task or worker change notification is received, wait this long
203+
/// to collect additional changes before running batch match.
204+
/// This improves batching efficiency under bursty load.
205+
/// 0 = immediate (no debounce).
206+
/// Only used when `enable_batch_worker_matching` is true.
207+
/// Default: 20ms
208+
#[serde(
209+
default = "default_batch_debounce_ms",
210+
deserialize_with = "convert_numeric_with_shellexpand"
211+
)]
212+
pub batch_debounce_ms: u64,
169213
}
170214

171215
#[derive(Deserialize, Serialize, Debug)]

nativelink-metric/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,18 @@ impl<T: MetricsComponent> MetricsComponent for async_lock::Mutex<T> {
458458
}
459459
}
460460

461+
impl<T: MetricsComponent> MetricsComponent for async_lock::RwLock<T> {
462+
fn publish(
463+
&self,
464+
kind: MetricKind,
465+
field_metadata: MetricFieldData,
466+
) -> Result<MetricPublishKnownKindData, Error> {
467+
// It is safe to block in the publishing thread.
468+
let lock = self.read_blocking();
469+
lock.publish(kind, field_metadata)
470+
}
471+
}
472+
461473
impl<T: MetricsComponent> MetricsComponent for parking_lot::Mutex<T> {
462474
fn publish(
463475
&self,

0 commit comments

Comments
 (0)