From 94102f2ccb3336d1456db6f47c7b7e666e48e4d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20CHAZALLET?= Date: Mon, 30 Mar 2026 00:26:26 +0200 Subject: [PATCH 1/2] fix: support non-editable installs (site-packages import precedence) When the package-under-test is installed in site-packages as a non-editable wheel rather than via `pip install -e .`, mutmut's trampoline-injected code in the mutants/ directory was never loaded. Python imported the original (non-mutated) modules from site-packages because they were already cached in sys.modules. The stats phase found zero test coverage and all mutants were marked "not checked". Add _patch_imported_packages() to setup_source_paths(): after adding mutants/ to sys.path, patch __path__ on every already-imported package so submodule lookups prefer the mutants tree. Flush leaf modules whose mutated .py exists on disk so they get re-imported with trampoline code. Add _copy_parent_init_files() to copy_src_dir(): when paths_to_mutate targets individual files, parent __init__.py files were missing from the mutants tree, preventing Python from recognising it as a package. Tested on a Pydantic-heavy project (8,996 mutants): stats phase now finds 477 covered functions and achieves a 70% kill rate, where previously it found zero coverage. --- src/mutmut/__main__.py | 114 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/src/mutmut/__main__.py b/src/mutmut/__main__.py index efa15464..2a9e163e 100644 --- a/src/mutmut/__main__.py +++ b/src/mutmut/__main__.py @@ -1,5 +1,6 @@ from __future__ import annotations +import importlib import os import platform import sys @@ -213,6 +214,41 @@ def copy_src_dir() -> None: # copy mtime, so we later know that when source_mtime == target_mtime, the file is not (yet) mutated. shutil.copy2(source_path, target_path) + # Ensure every parent package directory also has its __init__.py + # copied into the mutants tree so that the package structure is + # complete for import resolution. + _copy_parent_init_files() + + +def _copy_parent_init_files() -> None: + """Copy ``__init__.py`` for every package directory in the mutants tree. + + When ``paths_to_mutate`` targets individual files (e.g. + ``["pkg/sub/module.py"]``), mutmut creates ``mutants/pkg/sub/module.py`` + but does *not* copy the ``__init__.py`` files in ``pkg/`` or ``pkg/sub/``. + Without them Python's import system cannot recognise ``pkg`` as a package + and falls back to the copy installed in site-packages, skipping the + trampoline-injected mutant code entirely. + """ + mutants_root = Path("mutants") + for init_file in mutants_root.rglob("__init__.py"): + # Already present — nothing to do. + pass # pragma: no cover (defensive) + + # Walk every directory that exists in mutants/ and check whether an + # __init__.py is missing but present in the original source tree. + for dirpath in sorted(mutants_root.rglob("*")): + if not dirpath.is_dir(): + continue + init_in_mutants = dirpath / "__init__.py" + if init_in_mutants.exists(): + continue + # Compute the corresponding original path. + rel = dirpath.relative_to(mutants_root) + original_init = rel / "__init__.py" + if original_init.exists(): + shutil.copy2(original_init, init_in_mutants) + @dataclass class FileMutationResult: @@ -266,10 +302,13 @@ def create_file_mutants(path: Path) -> FileMutationResult: def setup_source_paths() -> None: # ensure that the mutated source code can be imported by the tests source_code_paths = [Path("."), Path("src"), Path("source")] + mutated_roots: list[str] = [] for path in source_code_paths: mutated_path = Path("mutants") / path if mutated_path.exists(): - sys.path.insert(0, str(mutated_path.absolute())) + abs_path = str(mutated_path.absolute()) + sys.path.insert(0, abs_path) + mutated_roots.append(abs_path) # ensure that the original code CANNOT be imported by the tests for path in source_code_paths: @@ -277,6 +316,79 @@ def setup_source_paths() -> None: while i < len(sys.path) and Path(sys.path[i]).resolve() == path.resolve(): del sys.path[i] + # When the package-under-test is also installed in site-packages + # (non-editable install), Python may have already imported the + # original (non-mutated) source. Patch __path__ on imported + # packages so submodule lookups prefer the mutants tree, and flush + # leaf modules so they get re-imported with trampoline code. + if mutated_roots: + _patch_imported_packages(mutated_roots) + + +def _patch_imported_packages(mutated_roots: list[str]) -> None: + """Make already-imported packages resolve submodules from mutants first. + + For each top-level package that exists in both ``sys.modules`` (from + site-packages) and the mutants tree, prepend the mutants path to the + package's ``__path__``. Then flush leaf modules (non-packages) whose + mutated ``.py`` file exists, so they get re-imported from the mutants + directory with trampoline code. + """ + # Discover top-level package names present in the mutants tree. + mutated_packages: dict[str, str] = {} # pkg_name -> mutants_root + for root in mutated_roots: + root_path = Path(root) + if not root_path.is_dir(): + continue + for child in root_path.iterdir(): + if child.is_dir() and (child / "__init__.py").exists(): + mutated_packages[child.name] = root + + if not mutated_packages: + return + + # Patch __path__ on top-level and nested packages already in sys.modules. + for mod_name, mod in list(sys.modules.items()): + top = mod_name.split(".")[0] + if top not in mutated_packages or not hasattr(mod, "__path__"): + continue + root = mutated_packages[top] + parts = mod_name.split(".") + mutant_dir = str(Path(root, *parts).absolute()) + if Path(mutant_dir).is_dir() and mutant_dir not in mod.__path__: + mod.__path__.insert(0, mutant_dir) + + # Flush leaf modules so they get re-imported from the mutants path. + to_remove = [ + name + for name, mod in sys.modules.items() + if ( + name.split(".")[0] in mutated_packages + and mod is not None + and not hasattr(mod, "__path__") # leaf module, not a package + and _has_mutant_file(name, mutated_packages) + ) + ] + for name in to_remove: + del sys.modules[name] + + if to_remove: + importlib.invalidate_caches() + + +def _has_mutant_file(mod_name: str, mutated_packages: dict[str, str]) -> bool: + """Check whether a mutated .py file exists for the given module.""" + top = mod_name.split(".")[0] + root = mutated_packages.get(top, "") + if not root: + return False + parts = mod_name.split(".") + if len(parts) > 1: + candidate = Path(root, *parts[:-1], parts[-1] + ".py") + else: + candidate = Path(root, parts[0] + ".py") + return candidate.exists() + def store_lines_covered_by_tests() -> None: assert mutmut.config is not None From 355c6b4b424e89e5f6eed40fc8d6cec7a428ee9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20CHAZALLET?= Date: Tue, 7 Apr 2026 18:33:40 +0200 Subject: [PATCH 2/2] Fix mutmut trampoline and multiline pragma --- src/mutmut/__init__.py | 4 +- src/mutmut/__main__.py | 56 ++++++++- src/mutmut/file_mutation.py | 183 ++++++++++++++++++++++++++--- src/mutmut/trampoline_templates.py | 2 + tests/test_mutation regression.py | 10 +- tests/test_mutation.py | 180 ++++++++++++++++++++++++++++ 6 files changed, 413 insertions(+), 22 deletions(-) diff --git a/src/mutmut/__init__.py b/src/mutmut/__init__.py index f77effc2..03f59b15 100644 --- a/src/mutmut/__init__.py +++ b/src/mutmut/__init__.py @@ -15,17 +15,19 @@ config: Config | None = None _stats: set[str] = set() +_pre_test_stats: set[str] = set() tests_by_mangled_function_name: defaultdict[str, set[str]] = defaultdict(set) _covered_lines: dict[str, set[int]] | None = None def _reset_globals() -> None: - global duration_by_test, stats_time, config, _stats, tests_by_mangled_function_name + global duration_by_test, stats_time, config, _stats, _pre_test_stats, tests_by_mangled_function_name global _covered_lines duration_by_test.clear() stats_time = None config = None _stats = set() + _pre_test_stats = set() tests_by_mangled_function_name = defaultdict(set) _covered_lines = None diff --git a/src/mutmut/__main__.py b/src/mutmut/__main__.py index 2a9e163e..e3ecea0a 100644 --- a/src/mutmut/__main__.py +++ b/src/mutmut/__main__.py @@ -390,6 +390,31 @@ def _has_mutant_file(mod_name: str, mutated_packages: dict[str, str]) -> bool: return candidate.exists() +def _flush_test_modules() -> None: + """Remove cached test modules from sys.modules so they get re-imported. + + Called in the forked child process when testing a mutant that targets + import-time code (e.g. __init_subclass__). Without this, the child + inherits cached modules from the parent (stats phase) and + __init_subclass__ never fires again with the mutant active. + + Only non-package modules under ``tests`` (or ``conftest``) are flushed. + The mutated package itself stays cached — its trampolines dynamically + check ``MUTANT_UNDER_TEST`` on each call. + """ + to_remove = [ + name + for name in sys.modules + if name == "conftest" + or name.startswith("conftest.") + or name.startswith("tests.") + or name.startswith("tests_") + ] + for name in to_remove: + del sys.modules[name] + importlib.invalidate_caches() + + def store_lines_covered_by_tests() -> None: assert mutmut.config is not None if mutmut.config.mutate_only_covered_lines: @@ -740,12 +765,23 @@ class StatsCollector: def pytest_runtest_logstart(self, nodeid: str, location: Any) -> None: mutmut.duration_by_test[nodeid] = 0 + # noinspection PyMethodMayBeStatic + def pytest_collection_finish(self, session: Any) -> None: + unused(session) + # Snapshot trampoline hits from import-time code (e.g. __init_subclass__). + # These fired during collection before any test ran. + mutmut._pre_test_stats = mutmut._stats.copy() + # noinspection PyMethodMayBeStatic def pytest_runtest_teardown(self, item: Any, nextitem: Any) -> None: unused(nextitem) for function in mutmut._stats: mutmut.tests_by_mangled_function_name[function].add(strip_prefix(item._nodeid, prefix="mutants/")) mutmut._stats.clear() + # Import-time code (e.g. __init_subclass__) runs once during collection. + # Attribute those hits to every test since any test might detect the mutation. + for function in mutmut._pre_test_stats: + mutmut.tests_by_mangled_function_name[function].add(strip_prefix(item._nodeid, prefix="mutants/")) # noinspection PyMethodMayBeStatic def pytest_runtest_makereport(self, item: Any, call: Any) -> None: @@ -804,10 +840,21 @@ def run_stats(self, *, tests: Iterable[str]) -> int: print("Running hammett stats...") + first_test_seen = False + def post_test_callback(_name: str, **_: Any) -> None: + nonlocal first_test_seen + if not first_test_seen: + # Snapshot import-time trampoline hits before the first test clears them. + mutmut._pre_test_stats = mutmut._stats.copy() + first_test_seen = True for function in mutmut._stats: mutmut.tests_by_mangled_function_name[function].add(_name) mutmut._stats.clear() + # Import-time code (e.g. __init_subclass__) runs once at import. + # Attribute those hits to every test since any test might detect the mutation. + for function in mutmut._pre_test_stats: + mutmut.tests_by_mangled_function_name[function].add(_name) return int( hammett.main( @@ -1195,7 +1242,7 @@ def load_stats() -> bool: mutmut.tests_by_mangled_function_name[k] |= set(v) mutmut.duration_by_test = data.pop("duration_by_test") mutmut.stats_time = data.pop("stats_time") - assert not data, data + mutmut._pre_test_stats = set(data.pop("pre_test_stats", [])) did_load = True except (FileNotFoundError, JSONDecodeError): pass @@ -1209,6 +1256,7 @@ def save_stats() -> None: tests_by_mangled_function_name={k: list(v) for k, v in mutmut.tests_by_mangled_function_name.items()}, duration_by_test=mutmut.duration_by_test, stats_time=mutmut.stats_time, + pre_test_stats=sorted(mutmut._pre_test_stats), ), f, indent=4, @@ -1342,7 +1390,10 @@ def stop_all_children(mutants: list[tuple[SourceFileMutationData, str, int | Non # used to copy the global mutmut.config to subprocesses -set_start_method("fork") +try: + set_start_method("fork") +except RuntimeError: + pass # already set (e.g. re-imported from trampoline during stats collection) START_TIMES_BY_PID_LOCK = Lock() @@ -1493,6 +1544,7 @@ def read_one_child_exit_status() -> None: os.environ["MUTANT_UNDER_TEST"] = mutant_name setproctitle(f"mutmut: {mutant_name}") + # Run fast tests first sorted_tests = sorted(tests, key=lambda test_name: mutmut.duration_by_test[test_name]) if not sorted_tests: diff --git a/src/mutmut/file_mutation.py b/src/mutmut/file_mutation.py index e3217282..8113980f 100644 --- a/src/mutmut/file_mutation.py +++ b/src/mutmut/file_mutation.py @@ -21,6 +21,11 @@ NEVER_MUTATE_FUNCTION_NAMES = {"__getattribute__", "__setattr__", "__new__"} NEVER_MUTATE_FUNCTION_CALLS = {"len", "isinstance"} +# Methods that Python treats as implicit classmethods (no @classmethod needed). +# Their first parameter is the class (cls), not an instance (self), and attribute +# lookups must go through the class hierarchy rather than object.__getattribute__. +IMPLICIT_CLASSMETHOD_NAMES = {"__init_subclass__", "__class_getitem__"} + @dataclass class Mutation: @@ -48,6 +53,28 @@ def create_mutations(code: str, covered_lines: set[int] | None = None) -> tuple[ visitor = MutationVisitor(mutation_operators, ignored_lines, covered_lines) module = metadata_wrapper.visit(visitor) + if ignored_lines: + # Post-filter: for multiline nodes, the start.line check in + # _should_mutate_node may miss pragmas on inner lines. Compare + # the original and mutated source to find the actual changed line + # and drop the mutation if that line carries a pragma. + orig_lines = module.code.split("\n") + filtered: list[Mutation] = [] + for m in visitor.mutations: + try: + replaced = module.deep_replace(m.original_node, m.mutated_node) + new_lines = replaced.code.split("\n") + changed_line = next( + (i + 1 for i, (o, n) in enumerate(zip(orig_lines, new_lines)) if o != n), + None, + ) + if changed_line is not None and changed_line in ignored_lines: + continue # pragma on the mutated line — skip + except Exception: # noqa: BLE001 + pass # keep the mutation if we can't determine the line + filtered.append(m) + visitor.mutations = filtered + return module, visitor.mutations @@ -243,6 +270,20 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation return mutated_module.code, mutation_names +def _any_param_has_default(function: cst.FunctionDef) -> bool: + """Return True if any parameter in the function has a default value.""" + for p in function.params.posonly_params: + if _has_default(p): + return True + for p in function.params.params: + if _has_default(p): + return True + for p in function.params.kwonly_params: + if _has_default(p): + return True + return False + + def function_trampoline_arrangement( function: cst.FunctionDef, mutants: Iterable[Mutation], class_name: str | None ) -> tuple[Sequence[MODULE_STATEMENT], Sequence[str]]: @@ -260,7 +301,14 @@ def function_trampoline_arrangement( nodes.append(create_trampoline_wrapper(function, mangled_name, class_name)) # copy of original function - nodes.append(function.with_changes(name=cst.Name(mangled_name + "_orig"))) + orig_name = mangled_name + "_orig" + nodes.append(function.with_changes(name=cst.Name(orig_name))) + + # When sentinel defaults are used, set __wrapped__ so that inspect.signature() + # follows it and reports the original (human-readable) signature. + if _any_param_has_default(function): + wrapped_stmt = cst.parse_statement(f"{name}.__wrapped__ = {orig_name}\n") + nodes.append(wrapped_stmt) # mutated versions of the function for i, mutant in enumerate(mutants): @@ -280,47 +328,139 @@ def function_trampoline_arrangement( return nodes, mutant_names +def _has_default(param: cst.Param) -> bool: + """Return True if the parameter has a default value.""" + return param.default is not None and not isinstance(param.default, cst.MaybeSentinel) + + +def _replace_default_with_sentinel(param: cst.Param) -> cst.Param: + """Replace the parameter's default value with _MUTMUT_UNSET sentinel.""" + return param.with_changes(default=cst.Name("_MUTMUT_UNSET")) + + +def _sentinel_if_stmt(param_name: str, target: str) -> cst.If: + """Create: if is not _MUTMUT_UNSET: [''] = """ + return cst.If( + test=cst.Comparison( + left=cst.Name(param_name), + comparisons=[cst.ComparisonTarget(cst.IsNot(), cst.Name("_MUTMUT_UNSET"))], + ), + body=cst.IndentedBlock([ + cst.SimpleStatementLine([ + cst.Assign( + [cst.AssignTarget(cst.Subscript( + value=cst.Name(target), + slice=[cst.SubscriptElement(cst.Index(cst.SimpleString(f"'{param_name}'")))], + ))], + cst.Name(param_name), + ), + ]), + ]), + leading_lines=[], + ) + + def create_trampoline_wrapper(function: cst.FunctionDef, mangled_name: str, class_name: str | None) -> cst.FunctionDef: + is_implicit_classmethod = ( + class_name is not None and function.name.value in IMPLICIT_CLASSMETHOD_NAMES + ) + + # Track which positional params have defaults and need sentinel treatment. + # We'll move defaulted positional params to kwargs conditionally. args: list[cst.Element | cst.StarredElement] = [] + # Params with defaults that need conditional forwarding via kwargs + sentinel_params: list[str] = [] + for pos_only_param in function.params.posonly_params: - args.append(cst.Element(pos_only_param.name)) + if _has_default(pos_only_param): + sentinel_params.append(pos_only_param.name.value) + else: + args.append(cst.Element(pos_only_param.name)) for param in function.params.params: - args.append(cst.Element(param.name)) + if _has_default(param): + sentinel_params.append(param.name.value) + else: + args.append(cst.Element(param.name)) if isinstance(function.params.star_arg, cst.Param): args.append(cst.StarredElement(function.params.star_arg.name)) + # Get the actual first parameter name (usually 'self' or 'cls') + first_param_name = "self" if class_name is not None: - # remove self arg (handled by the trampoline function) + if function.params.posonly_params: + first_param_name = function.params.posonly_params[0].name.value + elif function.params.params: + first_param_name = function.params.params[0].name.value + # remove first arg (self/cls — handled by the trampoline function) args = args[1:] args_assignemnt = cst.Assign([cst.AssignTarget(cst.Name(value="args"))], cst.List(args)) kwargs: list[cst.DictElement | cst.StarredDictElement] = [] + # Keyword-only params without defaults are always forwarded + kwonly_sentinel_params: list[str] = [] for param in function.params.kwonly_params: - kwargs.append(cst.DictElement(cst.SimpleString(f"'{param.name.value}'"), param.name)) + if _has_default(param): + kwonly_sentinel_params.append(param.name.value) + else: + kwargs.append(cst.DictElement(cst.SimpleString(f"'{param.name.value}'"), param.name)) if isinstance(function.params.star_kwarg, cst.Param): kwargs.append(cst.StarredDictElement(function.params.star_kwarg.name)) kwargs_assignment = cst.Assign([cst.AssignTarget(cst.Name(value="kwargs"))], cst.Dict(kwargs)) - def _get_local_name(func_name: str) -> cst.BaseExpression: + # Build conditional statements for sentinel params + sentinel_stmts: list[cst.If] = [] + for pname in sentinel_params: + sentinel_stmts.append(_sentinel_if_stmt(pname, "kwargs")) + for pname in kwonly_sentinel_params: + sentinel_stmts.append(_sentinel_if_stmt(pname, "kwargs")) + + # Replace defaults with sentinel in the function signature + new_posonly_params = [ + _replace_default_with_sentinel(p) if _has_default(p) else p + for p in function.params.posonly_params + ] + new_params = [ + _replace_default_with_sentinel(p) if _has_default(p) else p + for p in function.params.params + ] + new_kwonly_params = [ + _replace_default_with_sentinel(p) if _has_default(p) else p + for p in function.params.kwonly_params + ] + + def _get_local_name(func_name: str, *, bind: bool = False) -> cst.BaseExpression: # for top level, simply return the name if class_name is None: return cst.Name(func_name) - # for class methods, use object.__getattribute__(self, name) + if is_implicit_classmethod: + # For implicit classmethods (__init_subclass__, __class_getitem__), the first + # arg is a class, not an instance. object.__getattribute__(cls, ...) would search + # the metaclass MRO instead of the class hierarchy. Access via ClassName.attr instead. + attr = cst.Attribute(cst.Name(class_name), cst.Name(func_name)) + if bind: + # Bind the first parameter so the trampoline can call orig(*args) without + # prepending cls — matching the bound-method convention of regular methods. + return cst.Call( + func=cst.Attribute(value=attr, attr=cst.Name("__get__")), + args=[cst.Arg(cst.Name(first_param_name))], + ) + return attr + # for regular methods, use object.__getattribute__(self, name) return cst.Call( func=cst.Attribute(cst.Name("object"), cst.Name("__getattribute__")), - args=[cst.Arg(cst.Name("self")), cst.Arg(cst.SimpleString(f"'{func_name}'"))], + args=[cst.Arg(cst.Name(first_param_name)), cst.Arg(cst.SimpleString(f"'{func_name}'"))], ) result: cst.BaseExpression = cst.Call( func=cst.Name("_mutmut_trampoline"), args=[ - cst.Arg(_get_local_name(f"{mangled_name}_orig")), + cst.Arg(_get_local_name(f"{mangled_name}_orig", bind=True)), cst.Arg(_get_local_name(f"{mangled_name}_mutants")), cst.Arg(cst.Name("args")), cst.Arg(cst.Name("kwargs")), - cst.Arg(cst.Name("None" if class_name is None else "self")), + cst.Arg(cst.Name("None" if class_name is None else first_param_name)), ], ) # for non-async functions, simply return the value or generator @@ -342,15 +482,24 @@ def _get_local_name(func_name: str) -> cst.BaseExpression: type_ignore_whitespace = cst.TrailingWhitespace(comment=cst.Comment("# type: ignore")) + body_stmts: list[cst.BaseStatement] = [ + cst.SimpleStatementLine([args_assignemnt], trailing_whitespace=type_ignore_whitespace), + cst.SimpleStatementLine([kwargs_assignment], trailing_whitespace=type_ignore_whitespace), + ] + body_stmts.extend(sentinel_stmts) + body_stmts.append(result_statement) + + # Replace defaults with sentinel in the function signature + new_function_params = function.params.with_changes( + posonly_params=new_posonly_params, + params=new_params, + kwonly_params=new_kwonly_params, + ) + function.whitespace_after_type_parameters return function.with_changes( - body=cst.IndentedBlock( - [ - cst.SimpleStatementLine([args_assignemnt], trailing_whitespace=type_ignore_whitespace), - cst.SimpleStatementLine([kwargs_assignment], trailing_whitespace=type_ignore_whitespace), - result_statement, - ], - ), + params=new_function_params, + body=cst.IndentedBlock(body_stmts), ) diff --git a/src/mutmut/trampoline_templates.py b/src/mutmut/trampoline_templates.py index 0b051938..db6b67a8 100644 --- a/src/mutmut/trampoline_templates.py +++ b/src/mutmut/trampoline_templates.py @@ -34,6 +34,8 @@ def mangle_function_name(*, name: str, class_name: str | None) -> str: MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore +_MUTMUT_UNSET = object() # type: ignore + def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore \"""Forward call to original or mutated function, depending on the environment\""" diff --git a/tests/test_mutation regression.py b/tests/test_mutation regression.py index 6d567025..2465ce73 100644 --- a/tests/test_mutation regression.py +++ b/tests/test_mutation regression.py @@ -42,9 +42,13 @@ def test_create_trampoline_wrapper_with_positionals_only_args(): source = "def foo(p1, p2=None, /, p_or_kw=None, *, kw): pass" assert _get_trampoline_wrapper(source, "x_foo__mutmut") == snapshot("""\ -def foo(p1, p2=None, /, p_or_kw=None, *, kw): - args = [p1, p2, p_or_kw]# type: ignore +def foo(p1, p2=_MUTMUT_UNSET, /, p_or_kw=_MUTMUT_UNSET, *, kw): + args = [p1]# type: ignore kwargs = {'kw': kw}# type: ignore + if p2 is not _MUTMUT_UNSET: + kwargs['p2'] = p2 + if p_or_kw is not _MUTMUT_UNSET: + kwargs['p_or_kw'] = p_or_kw return _mutmut_trampoline(x_foo__mutmut_orig, x_foo__mutmut_mutants, args, kwargs, None)\ """) @@ -96,6 +100,8 @@ def add(self, value): MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore +_MUTMUT_UNSET = object() # type: ignore + def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore """Forward call to original or mutated function, depending on the environment""" diff --git a/tests/test_mutation.py b/tests/test_mutation.py index 7c9dd4d9..507c68cd 100644 --- a/tests/test_mutation.py +++ b/tests/test_mutation.py @@ -718,6 +718,186 @@ def x(self): assert not mutants +def test_init_subclass_trampoline(): + """__init_subclass__ is an implicit classmethod — the trampoline must use 'cls' not 'self' + and look up orig/mutants via the class name rather than object.__getattribute__.""" + source = """ +class Base: + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + cls._registered = True + """.strip() + + mutated_code = mutated_module(source) + + # The trampoline wrapper must reference 'cls', not 'self' + assert "self" not in mutated_code.split("def __init_subclass__")[1].split("\n def ")[0], ( + "Trampoline for __init_subclass__ must use 'cls', not 'self'" + ) + + # Mutant copies should be generated (the method is not skipped) + assert "xǁBaseǁ__init_subclass____mutmut_orig" in mutated_code + assert "xǁBaseǁ__init_subclass____mutmut_1" in mutated_code + + # Lookup must use ClassName.attr, not object.__getattribute__ + assert "Base.xǁBaseǁ__init_subclass____mutmut_orig" in mutated_code + assert "Base.xǁBaseǁ__init_subclass____mutmut_mutants" in mutated_code + + # orig must be bound via __get__ so the trampoline can call it without prepending cls + assert "Base.xǁBaseǁ__init_subclass____mutmut_orig.__get__(cls)" in mutated_code + + +def test_init_subclass_runtime(): + """Verify the generated trampoline actually works at runtime.""" + source = """ +class Base: + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + cls._registered = True + """.strip() + + mutated_code = mutated_module(source) + + import os + old_env = os.environ.get("MUTANT_UNDER_TEST") + # Use a non-matching mutant name so the trampoline calls the original function + os.environ["MUTANT_UNDER_TEST"] = "none" + try: + ns: dict = {"__name__": "test_runtime"} + exec(mutated_code, ns) # noqa: S102 + Base = ns["Base"] + + # Creating a subclass must trigger __init_subclass__ through the trampoline + class Child(Base): + pass + + assert getattr(Child, "_registered", False), "Trampoline did not propagate __init_subclass__" + finally: + if old_env is None: + os.environ.pop("MUTANT_UNDER_TEST", None) + else: + os.environ["MUTANT_UNDER_TEST"] = old_env + + +def test_class_getitem_trampoline(): + """__class_getitem__ is also an implicit classmethod.""" + source = """ +class MyGeneric: + def __class_getitem__(cls, item): + return 1 + """.strip() + + mutated_code = mutated_module(source) + + # Must use cls, not self + assert "MyGeneric.xǁMyGenericǁ__class_getitem____mutmut_orig" in mutated_code + assert "self" not in mutated_code.split("def __class_getitem__")[1].split("\n def ")[0] + + +def test_default_parameter_mutation_is_exercisable(): + """Verify that mutated default parameter values are actually exercisable. + + When a mutant changes a default (e.g., x=1 -> x=2), the wrapper must use a + sentinel instead of hardcoding the original default. Otherwise, the wrapper + always passes the original default explicitly, and the mutant never gets to + use its own default value. + """ + source = """ +class Foo: + def bar(self, x=1): + return x +""".strip() + mutated_code = mutated_module(source) + # The wrapper should use sentinel, not hardcode x=1 + assert "_MUTMUT_UNSET" in mutated_code + + # Verify runtime behavior: calling bar() without args should still return 1 + # when no mutant is active (the original function resolves its own default) + import os + old_env = os.environ.get("MUTANT_UNDER_TEST") + os.environ["MUTANT_UNDER_TEST"] = "none" + try: + ns: dict = {"__name__": "test"} + exec(mutated_code, ns) # noqa: S102 + foo = ns["Foo"]() + assert foo.bar() == 1 + finally: + if old_env is None: + os.environ.pop("MUTANT_UNDER_TEST", None) + else: + os.environ["MUTANT_UNDER_TEST"] = old_env + + +def test_default_parameter_sentinel_free_function(): + """Verify sentinel behavior for free functions (not class methods).""" + source = """ +def greet(name="world"): + return "hello " + name +""".strip() + mutated_code = mutated_module(source) + assert "_MUTMUT_UNSET" in mutated_code + + import os + old_env = os.environ.get("MUTANT_UNDER_TEST") + os.environ["MUTANT_UNDER_TEST"] = "none" + try: + ns: dict = {"__name__": "test"} + exec(mutated_code, ns) # noqa: S102 + assert ns["greet"]() == "hello world" + assert ns["greet"]("there") == "hello there" + finally: + if old_env is None: + os.environ.pop("MUTANT_UNDER_TEST", None) + else: + os.environ["MUTANT_UNDER_TEST"] = old_env + + +def test_default_parameter_sentinel_kwonly(): + """Verify sentinel behavior for keyword-only parameters with defaults.""" + source = """ +def process(a, *, timeout=30): + return timeout +""".strip() + mutated_code = mutated_module(source) + assert "_MUTMUT_UNSET" in mutated_code + + import os + old_env = os.environ.get("MUTANT_UNDER_TEST") + os.environ["MUTANT_UNDER_TEST"] = "none" + try: + ns: dict = {"__name__": "test"} + exec(mutated_code, ns) # noqa: S102 + assert ns["process"](1) == 30 + assert ns["process"](1, timeout=60) == 60 + finally: + if old_env is None: + os.environ.pop("MUTANT_UNDER_TEST", None) + else: + os.environ["MUTANT_UNDER_TEST"] = old_env + + +def test_no_sentinel_for_required_params(): + """Required parameters (no default) should NOT get sentinel treatment.""" + source = """ +def add(a, b): + return a + b +""".strip() + mutated_code = mutated_module(source) + + import os + old_env = os.environ.get("MUTANT_UNDER_TEST") + os.environ["MUTANT_UNDER_TEST"] = "none" + try: + ns: dict = {"__name__": "test"} + exec(mutated_code, ns) # noqa: S102 + assert ns["add"](3, 4) == 7 + finally: + if old_env is None: + os.environ.pop("MUTANT_UNDER_TEST", None) + else: + os.environ["MUTANT_UNDER_TEST"] = old_env + + @pytest.mark.skip(reason="Feature not yet implemented") def test_decorated_inner_functions_mutation(): source = """