Skip to content

Commit 7f8487f

Browse files
committed
Attempt at fixing flakiness
```
mac count=2

failures:
    test_kernel_request_priority_over_idle_tasks

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.70s

stderr ───
thread 'test_kernel_request_priority_over_idle_tasks' (109970) panicked at crates/ark/tests/data_explorer_priority.rs:72:14:
Expected Idle status, got CommMsg(JupyterMessage { zmq_identities: [], header: JupyterHeader { msg_id: "243cf539-e28a-487d-9fc7-43e72f3d55af", session: "71034bea-15a2-4bdf-906f-48acf4ced1e3", username: "kernel", date: "2026-03-16T14:20:32.871678+00:00", msg_type: "comm_msg", version: "5.3" }, parent_header: Some(JupyterHeader { msg_id: "8d15e26e-89bb-4791-b560-ec8907f0eda0", session: "test-session", username: "test-user", date: "2026-03-16T14:20:32.860651+00:00", msg_type: "comm_msg", version: "5.3" }), content: CommWireMsg { comm_id: "3ee9adb6-b60e-4b5d-b3e5-ce401e9d6bfa", data: Object {"method": String("GetSchemaReply"), "result": Object {"columns": Array [Object {"column_name": String("a"), "column_label": Null, "column_index": Number(0), "type_name": String("int"), "type_display": String("integer"), "description": Null, "children": Null, "precision": Null, "scale": Null, "timezone": Null, "type_size": Null}]}} } })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
1 parent e95feb6 commit 7f8487f

3 files changed

Lines changed: 38 additions & 104 deletions

File tree

crates/ark/src/r_task.rs

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ use std::sync::LazyLock;
1212
use std::sync::Mutex;
1313
use std::time::Duration;
1414

15-
use amalthea::comm::comm_channel::CommMsg;
16-
use amalthea::wire::header::JupyterHeader;
1715
use crossbeam::channel::bounded;
1816
use crossbeam::channel::unbounded;
1917
use crossbeam::channel::Receiver;
@@ -24,7 +22,6 @@ use uuid::Uuid;
2422
use crate::console::Console;
2523
use crate::console::ConsoleOutputCapture;
2624
use crate::fixtures::r_test_init;
27-
use crate::request::KernelRequest;
2825

2926
/// Task channels for interrupt-time tasks
3027
static INTERRUPT_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);
@@ -457,22 +454,6 @@ pub(crate) fn spawn(task: RTask) {
457454
static TEST_PENDING_TASK_TX: Mutex<Option<futures::channel::oneshot::Sender<()>>> =
458455
Mutex::new(None);
459456

460-
/// Clone of the kernel-request channel sender, stored at startup when
461-
/// `IS_TESTING` is true. Allows R-callable test helpers (e.g.
462-
/// `ps_test_send_kernel_comm_request`) to enqueue kernel requests directly
463-
/// from the R thread, bypassing the Shell thread. This is used to create
464-
/// deterministic contention between kernel requests and idle tasks in the
465-
/// event loop without any race window.
466-
#[cfg(debug_assertions)]
467-
static TEST_KERNEL_REQUEST_TX: Mutex<Option<Sender<KernelRequest>>> = Mutex::new(None);
468-
469-
/// Store a clone of `kernel_request_tx` so test helpers can enqueue kernel
470-
/// requests directly from the R thread.
471-
#[cfg(debug_assertions)]
472-
pub(crate) fn set_test_kernel_request_tx(tx: Sender<KernelRequest>) {
473-
*TEST_KERNEL_REQUEST_TX.lock().unwrap() = Some(tx);
474-
}
475-
476457
#[cfg(debug_assertions)]
477458
#[harp::register]
478459
unsafe extern "C-unwind" fn ps_test_spawn_pending_task() -> anyhow::Result<SEXP> {
@@ -538,51 +519,3 @@ unsafe extern "C-unwind" fn ps_test_spawn_sleeping_idle_tasks(
538519

539520
Ok(libr::R_NilValue)
540521
}
541-
542-
/// Send a comm RPC request directly onto the kernel-request channel from
543-
/// the R thread. This bypasses the Shell thread entirely, so the request
544-
/// is already pending when R enters the event loop.
545-
///
546-
/// Used by integration tests to create deterministic contention between
547-
/// kernel requests and idle tasks, without any race window.
548-
#[cfg(debug_assertions)]
549-
#[harp::register]
550-
unsafe extern "C-unwind" fn ps_test_send_kernel_comm_request(
551-
comm_id: SEXP,
552-
data: SEXP,
553-
) -> anyhow::Result<SEXP> {
554-
stdext::assert_testing();
555-
556-
let comm_id: String = harp::RObject::view(comm_id).try_into()?;
557-
let data: String = harp::RObject::view(data).try_into()?;
558-
let json: serde_json::Value = serde_json::from_str(&data)?;
559-
560-
let msg = CommMsg::Rpc {
561-
id: String::from("test-rpc"),
562-
parent_header: JupyterHeader::create(
563-
String::from("comm_msg"),
564-
String::from("test-session"),
565-
String::from("test-user"),
566-
),
567-
data: json,
568-
};
569-
570-
// We don't wait on `done_rx`. When the event loop processes the
571-
// request it will send on `done_tx`, which harmlessly fails since the
572-
// receiver is already dropped.
573-
let (done_tx, _done_rx) = bounded(0);
574-
575-
let tx = TEST_KERNEL_REQUEST_TX.lock().unwrap();
576-
let tx = tx
577-
.as_ref()
578-
.ok_or_else(|| anyhow::anyhow!("kernel_request_tx not set; is the kernel running?"))?;
579-
580-
tx.try_send(KernelRequest::CommMsg {
581-
comm_id,
582-
msg,
583-
done_tx,
584-
})
585-
.map_err(|err| anyhow::anyhow!("Failed to send kernel request: {err}"))?;
586-
587-
Ok(libr::R_NilValue)
588-
}

crates/ark/src/start.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ pub fn start_kernel(
5959
let (r_request_tx, r_request_rx) = bounded::<RRequest>(1);
6060
let (kernel_request_tx, kernel_request_rx) = bounded::<KernelRequest>(1);
6161

62-
#[cfg(debug_assertions)]
63-
if stdext::IS_TESTING {
64-
crate::r_task::set_test_kernel_request_tx(kernel_request_tx.clone());
65-
}
66-
6762
// Async communication channel with the R thread (Console)
6863
let (console_notification_tx, console_notification_rx) =
6964
tokio::sync::mpsc::unbounded_channel::<ConsoleNotification>();

crates/ark/tests/data_explorer_priority.rs

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,20 @@ use ark_test::DummyArkFrontend;
2121
/// Verify that Shell requests (`get_schema`) are prioritized over idle tasks.
2222
///
2323
/// 1. Open a data explorer so we have a comm to send RPCs on.
24-
/// 2. In a single execute request, spawn 5 idle tasks that each sleep for
25-
/// 200ms **and** enqueue a `get_schema` kernel request directly on the
26-
/// kernel-request channel. Because both happen inside the same execute
27-
/// window, the kernel request is already pending when R returns to the
28-
/// event loop — no race.
29-
/// 3. Assert that the reply arrives in well under a single sleep duration.
30-
/// With the priority fix the kernel request is serviced before any
31-
/// sleeping idle task runs, so the reply is near-instant. Without the
32-
/// fix `select` picks randomly, so at least one 200ms sleeper runs
33-
/// first.
24+
/// 2. Spawn 5 idle tasks that each sleep for 200ms.
25+
/// 3. After the execute request completes, send `get_schema` through Shell.
26+
/// Shell dispatches a `KernelRequest` to the R thread and blocks until
27+
/// it's processed, so the ordering on IOPub is deterministic:
28+
/// Busy -> CommMsg -> Idle.
29+
/// 4. With the priority fix, R processes the kernel request after finishing
30+
/// at most one idle task (~200ms). Without the fix, `select` picks
31+
/// randomly among ready channels, so multiple idle tasks could run
32+
/// first (~600ms+).
3433
#[test]
3534
fn test_kernel_request_priority_over_idle_tasks() {
3635
let frontend = DummyArkFrontend::lock();
3736

38-
// A small data frame is enough the contention comes from the
37+
// A small data frame is enough -- the contention comes from the
3938
// sleeping idle tasks, not from profile computation.
4039
frontend.send_execute_request(
4140
"test_priority_df <- data.frame(a = 1:5)",
@@ -48,35 +47,41 @@ fn test_kernel_request_priority_over_idle_tasks() {
4847

4948
let comm_id = frontend.open_data_explorer("test_priority_df");
5049

51-
// Build the JSON for the `get_schema` RPC.
50+
// Build the get_schema RPC data. The `id` field marks it as an RPC
51+
// so Shell creates a `CommMsg::Rpc`.
5252
let schema_request = DataExplorerBackendRequest::GetSchema(GetSchemaParams {
5353
column_indices: vec![0],
5454
});
55-
let schema_json = serde_json::to_string(&schema_request).unwrap();
55+
let mut data = serde_json::to_value(&schema_request).unwrap();
56+
data["id"] = serde_json::Value::String(String::from("test-rpc"));
5657

57-
// In a single execute request:
58-
// - Spawn 5 idle tasks that each block the R thread for 200ms
59-
// - Enqueue a `get_schema` kernel request directly on the
60-
// kernel-request channel via `ps_test_send_kernel_comm_request`
61-
//
62-
// Both are pending when R returns to the event loop.
63-
let code = format!(
64-
r#"
65-
invisible(.Call("ps_test_spawn_sleeping_idle_tasks", 5L, 200L))
66-
invisible(.Call("ps_test_send_kernel_comm_request", "{comm_id}", '{schema_json}'))
67-
"#,
58+
// Spawn 5 idle tasks that each block the R thread for 200ms.
59+
// After this execute request completes, R enters the event loop with
60+
// all 5 tasks ready on the idle-task channel.
61+
frontend.send_execute_request(
62+
r#"invisible(.Call("ps_test_spawn_sleeping_idle_tasks", 5L, 200L))"#,
63+
ExecuteRequestOptions::default(),
6864
);
69-
frontend.send_execute_request(&code, ExecuteRequestOptions::default());
7065
frontend.recv_iopub_busy();
7166
frontend.recv_iopub_execute_input();
7267
frontend.recv_iopub_idle();
7368
frontend.recv_shell_execute_reply();
7469

75-
// The kernel request was enqueued directly (bypassing Shell), so the
76-
// RPC reply arrives on IOPub without the Shell's Busy/Idle wrapper.
70+
// Send get_schema through Shell. Shell dispatches a `KernelRequest`
71+
// to the R thread and blocks on `done_rx`, so the IOPub ordering is
72+
// deterministic: Busy -> CommMsg -> Idle (no race).
73+
//
74+
// R is likely already executing an idle task (sleeping for 200ms).
75+
// When it finishes, the priority check at the top of the event loop
76+
// picks up the kernel request before `select` can hand R another
77+
// idle task.
7778
let start = Instant::now();
79+
frontend.send_shell_comm_msg(comm_id.clone(), data);
80+
81+
frontend.recv_iopub_busy();
7882
let msg = frontend.recv_iopub_comm_msg();
7983
let schema_latency = start.elapsed();
84+
frontend.recv_iopub_idle();
8085

8186
assert_eq!(msg.comm_id, comm_id);
8287
let reply: DataExplorerBackendReply = serde_json::from_value(msg.data).unwrap();
@@ -88,12 +93,13 @@ fn test_kernel_request_priority_over_idle_tasks() {
8893
other => panic!("Expected GetSchemaReply, got: {other:?}"),
8994
}
9095

91-
// With the priority fix the kernel request is serviced before any
92-
// sleeping idle task runs, so the reply is near-instant. Without the
93-
// fix, `select` picks randomly between the kernel-request channel and
94-
// the idle-task channel, so at least one 200ms sleeper runs first.
96+
// With the priority fix the kernel request is serviced after at most
97+
// one sleeping idle task (~200ms). Without the fix, `select` picks
98+
// randomly among ready channels, so multiple 200ms sleepers could
99+
// run first (~600ms+). The 350ms threshold is 200ms for one idle
100+
// task plus 150ms headroom for slow CI machines.
95101
assert!(
96-
schema_latency < Duration::from_millis(150),
102+
schema_latency < Duration::from_millis(350),
97103
"get_schema took {schema_latency:?}, which suggests kernel requests \
98104
are being starved by idle tasks"
99105
);

0 commit comments

Comments (0)