Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to this project will be documented here.
- Increased default settings job timeout from 600s to 1200s (#707)
- Disable pytest cacheprovider to avoid creating .pytest_cache in isolated runs (#709)
- Fixed Python tester to correctly report marks when `markus_marks_earned` equals total or zero (#716)
- Fix worker orphan processes and add server-side max test timeout cap (#710)

## [v2.9.0]
- Install stack with GHCup (#626)
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ supervisor_url: # url used by the supervisor process. default is: '127.0.0.1:900

worker_log_dir: # an absolute path to a directory containing the worker's stdout and stderr logs.

max_test_timeout: # maximum number of seconds a single test is allowed to run before being killed.
# When set, any per-test timeout exceeding this value is capped to it, and tests
# with no timeout default to this value. default is: 3600

rlimit_settings: # RLIMIT settings (see details below)
nproc: # for example, this setting sets the hard and soft limits for the number of processes available to 300
- 300
Expand Down
48 changes: 40 additions & 8 deletions server/autotest_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import shutil
import time
import json
import subprocess
import signal
import subprocess
import socket
import getpass
import requests
Expand Down Expand Up @@ -101,6 +101,31 @@ def _kill_user_processes(test_username: str) -> None:
subprocess.run(kill_cmd, shell=True)


def _kill_pgid_children(proc: subprocess.Popen) -> None:
"""
Kill all processes in our process group except the current (worker) process.
Tests may spawn subprocesses (e.g. CSC209) that share our group;
proc.kill() alone would leave those running.
Falls back to proc.kill() when /proc is unavailable (non-Linux).
"""
worker_pid = os.getpid()
worker_pgid = os.getpgid(worker_pid)
try:
for entry in os.scandir("/proc"):
if not entry.name.isdigit():
continue
pid = int(entry.name)
if pid == worker_pid:
continue
try:
if os.getpgid(pid) == worker_pgid:
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError, OSError):
pass
except FileNotFoundError:
proc.kill()


def _create_test_script_command(tester_type: str) -> str:
"""
Return string representing a command line command to
Expand Down Expand Up @@ -218,13 +243,18 @@ def _run_test_specs(
out, err = "", ""
timeout_expired = None
timeout = test_data.get("timeout")
max_timeout = config.get("max_test_timeout")
if max_timeout is not None:
if timeout is None:
timeout = max_timeout
else:
timeout = min(timeout, max_timeout)
try:
env = settings.get("_env", {})
env_vars = {**os.environ, **_get_env_vars(test_username), **env}
env_vars = _update_env_vars(env_vars, test_env_vars)
proc = subprocess.Popen(
args,
start_new_session=True,
cwd=tests_path,
shell=True,
stdout=subprocess.PIPE,
Expand All @@ -238,14 +268,16 @@ def _run_test_specs(
settings_json = json.dumps({**settings, "test_data": test_data})
out, err = proc.communicate(input=settings_json, timeout=timeout)
except subprocess.TimeoutExpired:
if test_username == getpass.getuser():
pgrp = os.getpgid(proc.pid)
os.killpg(pgrp, signal.SIGKILL)
else:
if test_username != getpass.getuser():
_kill_user_processes(test_username)
else:
_kill_pgid_children(proc)
proc.wait()
out, err = proc.communicate()
if err == "Killed\n": # Default message from shell
test_group_name = test_data.get("extra_info", {}).get("name", "").strip()
test_group_name = test_data.get("extra_info", {}).get("name", "").strip()
if err == "Killed\n" or (not err and proc.returncode is not None and proc.returncode != 0):
# err can be "Killed\n" (shell default) or empty (SIGKILL/OOM silent crash).
# Check proc.returncode to reliably detect both cases.
if test_group_name:
err = f"Tests for {test_group_name} did not complete within time limit ({timeout}s)\n"
else:
Expand Down
1 change: 1 addition & 0 deletions server/autotest_server/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ worker_log_dir: !ENV ${WORKER_LOG_DIR}
default_remote_url: https://polymouth.teach.cs.toronto.edu:443/chat
remote_url_whitelist:
- https://polymouth.teach.cs.toronto.edu:443/chat
max_test_timeout: 3600
Comment thread
Naragod marked this conversation as resolved.
workers:
- user: !ENV ${USER}
queues:
Expand Down
4 changes: 4 additions & 0 deletions server/autotest_server/settings_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"rlimit_settings": {
"type": "object"
},
"max_test_timeout": {
"type": "integer",
"minimum": 1
},
"workers": {
"type": "array",
"minItems": 1,
Expand Down
271 changes: 271 additions & 0 deletions server/autotest_server/tests/test_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
import signal
import subprocess
import unittest
from unittest.mock import patch, MagicMock, ANY

import autotest_server

_UNSET = object()


class TestMaxTestTimeout(unittest.TestCase):
"""Tests for max_test_timeout configuration enforcement in _run_test_specs."""

@staticmethod
def _make_settings(timeout=_UNSET):
test_data = {"category": ["unit"], "extra_info": {"name": "test group"}}
if timeout is not _UNSET:
test_data["timeout"] = timeout
return {"testers": [{"tester_type": "py", "test_data": [test_data]}]}

def _run_and_get_proc(self, test_settings, max_test_timeout=_UNSET):
"""Run _run_test_specs with mocked dependencies, return mock proc."""
mock_proc = MagicMock()
mock_proc.communicate.return_value = ("{}", "")

test_config = {}
if max_test_timeout is not _UNSET:
test_config["max_test_timeout"] = max_test_timeout

with patch("autotest_server._create_test_script_command", return_value="echo test"), patch(
"autotest_server._get_env_vars", return_value={}
), patch("autotest_server._update_env_vars", side_effect=lambda b, t: {**b, **t}), patch(
"autotest_server.subprocess.Popen", return_value=mock_proc
), patch(
"autotest_server._get_feedback", return_value=([], [])
), patch.object(
autotest_server, "config", test_config
):
autotest_server._run_test_specs(
cmd="echo {}",
test_settings=test_settings,
categories=["unit"],
tests_path="/tmp/test",
test_username="testuser",
test_id=1,
test_env_vars={},
)
return mock_proc

def test_timeout_capped_when_exceeds_max(self):
mock_proc = self._run_and_get_proc(self._make_settings(timeout=600), max_test_timeout=300)
mock_proc.communicate.assert_called_once_with(input=ANY, timeout=300)

def test_timeout_unchanged_when_below_max(self):
mock_proc = self._run_and_get_proc(self._make_settings(timeout=60), max_test_timeout=300)
mock_proc.communicate.assert_called_once_with(input=ANY, timeout=60)

def test_timeout_defaults_to_max_when_unset(self):
mock_proc = self._run_and_get_proc(self._make_settings(), max_test_timeout=300)
mock_proc.communicate.assert_called_once_with(input=ANY, timeout=300)

def test_timeout_unchanged_when_max_not_configured(self):
mock_proc = self._run_and_get_proc(self._make_settings(timeout=600))
mock_proc.communicate.assert_called_once_with(input=ANY, timeout=600)

def test_timeout_none_passes_through_when_max_not_configured(self):
mock_proc = self._run_and_get_proc(self._make_settings())
mock_proc.communicate.assert_called_once_with(input=ANY, timeout=None)


class TestTimeoutKillHandler(unittest.TestCase):
"""Tests for timeout kill handler after removing start_new_session."""

@staticmethod
def _make_settings():
return {
"testers": [
{
"tester_type": "py",
"test_data": [{"category": ["unit"], "timeout": 30, "extra_info": {"name": "test group"}}],
}
]
}

def _run_with_timeout(self, test_username, current_user):
"""Run _run_test_specs where proc.communicate raises TimeoutExpired.

Returns (results, mock_proc, mock_kill_user).
"""
mock_proc = MagicMock()
mock_proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="test", timeout=30),
("", "Killed\n"),
]

with patch("autotest_server._create_test_script_command", return_value="echo test"), patch(
"autotest_server._get_env_vars", return_value={}
), patch("autotest_server._update_env_vars", side_effect=lambda b, t: {**b, **t}), patch(
"autotest_server.subprocess.Popen", return_value=mock_proc
), patch(
"autotest_server._get_feedback", return_value=([], [])
), patch(
"autotest_server.getpass.getuser", return_value=current_user
), patch(
"autotest_server._kill_user_processes"
) as mock_kill_user, patch.object(
autotest_server, "config", {"max_test_timeout": 30}
):
results = autotest_server._run_test_specs(
cmd="echo {}",
test_settings=self._make_settings(),
categories=["unit"],
tests_path="/tmp/test",
test_username=test_username,
test_id=1,
test_env_vars={},
)
return results, mock_proc, mock_kill_user

def test_kills_user_processes_for_different_user(self):
"""Production path: test_username != current user.
On tomlin, workers run as 'autotest' and tests run as 'autotst0'.
"""
results, mock_proc, mock_kill_user = self._run_with_timeout("autotst0", "autotest")
mock_kill_user.assert_called_once_with("autotst0")
mock_proc.kill.assert_not_called()
self.assertIn("did not complete within time limit", results[0]["stderr"])

def test_kills_pgid_children_for_same_user(self):
"""Dev/local path: kills all processes in our pgid except the worker."""
worker_pid = 100
child_pid = 200
grandchild_pid = 201
unrelated_pid = 300
worker_pgid = 50

mock_proc = MagicMock()
mock_proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="test", timeout=30),
("", "Killed\n"),
]

def make_entry(name):
e = MagicMock()
e.name = name
return e

mock_entries = [
make_entry(str(child_pid)),
make_entry(str(grandchild_pid)),
make_entry(str(unrelated_pid)),
make_entry(str(worker_pid)),
make_entry("self"),
]

def mock_getpgid(pid):
if pid in (worker_pid, child_pid, grandchild_pid):
return worker_pgid
return 999 # different group

with patch("autotest_server._create_test_script_command", return_value="echo test"), patch(
"autotest_server._get_env_vars", return_value={}
), patch("autotest_server._update_env_vars", side_effect=lambda b, t: {**b, **t}), patch(
"autotest_server.subprocess.Popen", return_value=mock_proc
), patch(
"autotest_server._get_feedback", return_value=([], [])
), patch(
"autotest_server.getpass.getuser", return_value="autotest"
), patch(
"autotest_server._kill_user_processes"
) as mock_kill_user, patch(
"autotest_server.os.getpid", return_value=worker_pid
), patch(
"autotest_server.os.getpgid", side_effect=mock_getpgid
), patch(
"autotest_server.os.scandir", return_value=mock_entries
), patch(
"autotest_server.os.kill"
) as mock_kill, patch.object(
autotest_server, "config", {"max_test_timeout": 30}
):
results = autotest_server._run_test_specs(
cmd="echo {}",
test_settings=self._make_settings(),
categories=["unit"],
tests_path="/tmp/test",
test_username="autotest",
test_id=1,
test_env_vars={},
)

# Should kill child and grandchild, not worker or unrelated
mock_kill.assert_any_call(child_pid, signal.SIGKILL)
mock_kill.assert_any_call(grandchild_pid, signal.SIGKILL)
self.assertEqual(mock_kill.call_count, 2)
mock_proc.wait.assert_called_once()
mock_kill_user.assert_not_called()
self.assertIn("did not complete within time limit", results[0]["stderr"])

def test_fallback_to_proc_kill_without_proc_fs(self):
"""Non-Linux fallback: /proc missing, falls back to proc.kill()."""
mock_proc = MagicMock()
mock_proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="test", timeout=30),
("", "Killed\n"),
]

with patch("autotest_server._create_test_script_command", return_value="echo test"), patch(
"autotest_server._get_env_vars", return_value={}
), patch("autotest_server._update_env_vars", side_effect=lambda b, t: {**b, **t}), patch(
"autotest_server.subprocess.Popen", return_value=mock_proc
), patch(
"autotest_server._get_feedback", return_value=([], [])
), patch(
"autotest_server.getpass.getuser", return_value="autotest"
), patch(
"autotest_server._kill_user_processes"
) as mock_kill_user, patch(
"autotest_server.os.scandir", side_effect=FileNotFoundError
), patch.object(
autotest_server, "config", {"max_test_timeout": 30}
):
results = autotest_server._run_test_specs(
cmd="echo {}",
test_settings=self._make_settings(),
categories=["unit"],
tests_path="/tmp/test",
test_username="autotest",
test_id=1,
test_env_vars={},
)

mock_proc.kill.assert_called_once()
mock_proc.wait.assert_called_once()
mock_kill_user.assert_not_called()
self.assertIn("did not complete within time limit", results[0]["stderr"])

def test_detects_silent_crash_via_returncode(self):
"""SIGKILL/OOM kill: err is empty but proc.returncode is non-zero."""
mock_proc = MagicMock()
mock_proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="test", timeout=30),
("", ""),
]
mock_proc.returncode = -9 # SIGKILL

with patch("autotest_server._create_test_script_command", return_value="echo test"), patch(
"autotest_server._get_env_vars", return_value={}
), patch("autotest_server._update_env_vars", side_effect=lambda b, t: {**b, **t}), patch(
"autotest_server.subprocess.Popen", return_value=mock_proc
), patch(
"autotest_server._get_feedback", return_value=([], [])
), patch(
"autotest_server.getpass.getuser", return_value="autotest"
), patch(
"autotest_server._kill_user_processes"
), patch(
"autotest_server.os.scandir", side_effect=FileNotFoundError
), patch.object(
autotest_server, "config", {"max_test_timeout": 30}
):
results = autotest_server._run_test_specs(
cmd="echo {}",
test_settings=self._make_settings(),
categories=["unit"],
tests_path="/tmp/test",
test_username="autotest",
test_id=1,
test_env_vars={},
)
self.assertIn("did not complete within time limit", results[0]["stderr"])
Loading