From c611a36b6c908753131cc6eba416c1f3772922d6 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Mon, 4 May 2026 04:58:08 -0700 Subject: [PATCH 1/5] test: add unit tests for GCG core algorithm components Add 26 new unit tests covering: - get_filtered_cands: filtering, clamping, padding behavior - target_loss / control_loss: shape, finiteness, loss ordering - sample_control: shape, vocab bounds, single-position changes, non-ASCII filtering - _build_params: ConfigDict construction from kwargs - _apply_target_augmentation: length preservation, modification, seed reproducibility - _create_attack: transfer flag routing (Progressive vs Individual) - Embedding helpers: error handling for unknown model types - PromptManager init: validation of goals/targets - EvaluateAttack init: worker count validation Total GCG test count: 24 -> 50 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../auxiliary_attacks/gcg/test_gcg_core.py | 528 ++++++++++++++++++ 1 file changed, 528 insertions(+) create mode 100644 tests/unit/auxiliary_attacks/gcg/test_gcg_core.py diff --git a/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py b/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py new file mode 100644 index 0000000000..d21074e1c2 --- /dev/null +++ b/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py @@ -0,0 +1,528 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +attack_manager_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.attack.base.attack_manager", + reason="GCG optional dependencies (torch, mlflow, etc.) 
not installed", +) +torch = pytest.importorskip("torch", reason="torch not installed") + +MultiPromptAttack = attack_manager_mod.MultiPromptAttack +get_embedding_layer = attack_manager_mod.get_embedding_layer +get_embedding_matrix = attack_manager_mod.get_embedding_matrix +get_embeddings = attack_manager_mod.get_embeddings + +gcg_attack_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.attack.gcg.gcg_attack", + reason="GCG optional dependencies not installed", +) +GCGPromptManager = gcg_attack_mod.GCGPromptManager +token_gradients = gcg_attack_mod.token_gradients + + +class TestGetFilteredCands: + """Tests for MultiPromptAttack.get_filtered_cands.""" + + def _make_attack_with_worker(self, *, vocab_size: int = 100) -> tuple: + """Create a minimal MultiPromptAttack with a mocked worker for get_filtered_cands.""" + attack = object.__new__(MultiPromptAttack) + mock_worker = MagicMock() + mock_worker.tokenizer.vocab_size = vocab_size + # Mock decode to return a simple string representation + mock_worker.tokenizer.decode.side_effect = lambda ids, **kwargs: "tok_" + "_".join(str(t) for t in ids.tolist()) + # Mock tokenizer call to return input_ids matching the length of input + mock_worker.tokenizer.side_effect = lambda text, **kwargs: MagicMock( + input_ids=list(range(len(text.split("_")) - 1)) + ) + # "!" 
token maps to id 0 + mock_worker.tokenizer.__call__ = mock_worker.tokenizer.side_effect + first_call = MagicMock() + first_call.input_ids = [0] + mock_worker.tokenizer.return_value = first_call + attack.workers = [mock_worker] + return attack, mock_worker + + def test_returns_list_of_strings(self) -> None: + """get_filtered_cands should return a list of decoded strings.""" + attack, worker = self._make_attack_with_worker() + # Simple decode: each row -> "tok_X_Y" + worker.tokenizer.decode.side_effect = lambda ids, **kwargs: f"ctrl_{ids[0]}" + worker.tokenizer.side_effect = lambda text, **kwargs: MagicMock(input_ids=[0]) + + cands = torch.tensor([[5], [6], [7]]) + result = attack.get_filtered_cands(0, cands, filter_cand=False) + assert isinstance(result, list) + assert len(result) == 3 + assert all(isinstance(s, str) for s in result) + + def test_filter_cand_false_returns_all(self) -> None: + """With filter_cand=False, all candidates should be returned.""" + attack, worker = self._make_attack_with_worker() + worker.tokenizer.decode.side_effect = lambda ids, **kwargs: f"ctrl_{ids[0]}" + # Reset side_effect so return_value is used for tokenizer("!") call + worker.tokenizer.side_effect = None + worker.tokenizer.return_value = MagicMock(input_ids=[0]) + + cands = torch.tensor([[5], [6], [7]]) + result = attack.get_filtered_cands(0, cands, filter_cand=False) + assert len(result) == 3 + + def test_clamps_out_of_vocab_tokens(self) -> None: + """Tokens above vocab_size should be replaced.""" + attack, worker = self._make_attack_with_worker(vocab_size=10) + worker.tokenizer.decode.side_effect = lambda ids, **kwargs: f"ctrl_{ids[0]}" + worker.tokenizer.side_effect = lambda text, **kwargs: MagicMock(input_ids=[0]) + + cands = torch.tensor([[5], [15], [7]]) # 15 > vocab_size=10 + attack.get_filtered_cands(0, cands, filter_cand=False) + # After clamping, the out-of-range token should have been replaced + assert cands[1][0].item() != 15 + + def 
test_filter_cand_true_pads_to_batch_size(self) -> None: + """With filter_cand=True, result should be padded to match input batch size.""" + attack, worker = self._make_attack_with_worker() + # Make all candidates decode to the same as curr_control so they get filtered out + worker.tokenizer.decode.side_effect = lambda ids, **kwargs: "same_control" + worker.tokenizer.side_effect = lambda text, **kwargs: MagicMock(input_ids=[0]) + + # But make the last one different + decode_results = ["same_control", "same_control", "different"] + call_count = [0] + + def decode_fn(ids, **kwargs): + idx = min(call_count[0], len(decode_results) - 1) + call_count[0] += 1 + return decode_results[idx] + + worker.tokenizer.decode.side_effect = decode_fn + worker.tokenizer.side_effect = lambda text, **kwargs: MagicMock(input_ids=[0]) + + cands = torch.tensor([[1], [2], [3]]) + result = attack.get_filtered_cands(0, cands, filter_cand=True, curr_control="same_control") + # Should always return exactly len(cands) results + assert len(result) == 3 + + +class TestTargetAndControlLoss: + """Tests for AttackPrompt.target_loss and control_loss.""" + + def test_target_loss_returns_correct_shape(self) -> None: + """target_loss should return tensor of shape (batch, target_len).""" + AttackPrompt = attack_manager_mod.AttackPrompt + prompt = object.__new__(AttackPrompt) + prompt._target_slice = slice(5, 8) # 3 target tokens + + batch_size = 4 + seq_len = 10 + vocab_size = 50 + logits = torch.randn(batch_size, seq_len, vocab_size) + ids = torch.randint(0, vocab_size, (batch_size, seq_len)) + + loss = prompt.target_loss(logits, ids) + assert loss.shape == (batch_size, 3) + + def test_target_loss_is_finite(self) -> None: + """target_loss should always return finite values.""" + AttackPrompt = attack_manager_mod.AttackPrompt + prompt = object.__new__(AttackPrompt) + prompt._target_slice = slice(3, 6) + + logits = torch.randn(2, 8, 30) + ids = torch.randint(0, 30, (2, 8)) + + loss = 
prompt.target_loss(logits, ids) + assert torch.isfinite(loss).all() + + def test_control_loss_returns_correct_shape(self) -> None: + """control_loss should return tensor of shape (batch, control_len).""" + AttackPrompt = attack_manager_mod.AttackPrompt + prompt = object.__new__(AttackPrompt) + prompt._control_slice = slice(2, 5) # 3 control tokens + + batch_size = 4 + seq_len = 10 + vocab_size = 50 + logits = torch.randn(batch_size, seq_len, vocab_size) + ids = torch.randint(0, vocab_size, (batch_size, seq_len)) + + loss = prompt.control_loss(logits, ids) + assert loss.shape == (batch_size, 3) + + def test_control_loss_is_finite(self) -> None: + """control_loss should always return finite values.""" + AttackPrompt = attack_manager_mod.AttackPrompt + prompt = object.__new__(AttackPrompt) + prompt._control_slice = slice(2, 5) + + logits = torch.randn(2, 8, 30) + ids = torch.randint(0, 30, (2, 8)) + + loss = prompt.control_loss(logits, ids) + assert torch.isfinite(loss).all() + + def test_target_loss_higher_for_wrong_predictions(self) -> None: + """Loss should be higher when logits don't predict the correct target tokens.""" + AttackPrompt = attack_manager_mod.AttackPrompt + prompt = object.__new__(AttackPrompt) + prompt._target_slice = slice(3, 5) + + vocab_size = 10 + ids = torch.zeros(1, 6, dtype=torch.long) + ids[0, 3] = 2 + ids[0, 4] = 3 + + # Logits that perfectly predict the target + good_logits = torch.full((1, 6, vocab_size), -10.0) + good_logits[0, 2, 2] = 10.0 # predicts token 2 at position 3 + good_logits[0, 3, 3] = 10.0 # predicts token 3 at position 4 + + # Logits that predict wrong tokens + bad_logits = torch.full((1, 6, vocab_size), -10.0) + bad_logits[0, 2, 7] = 10.0 # predicts wrong token + bad_logits[0, 3, 8] = 10.0 # predicts wrong token + + good_loss = prompt.target_loss(good_logits, ids).mean() + bad_loss = prompt.target_loss(bad_logits, ids).mean() + assert bad_loss > good_loss + + +class TestSampleControl: + """Tests for 
GCGPromptManager.sample_control.""" + + def _make_prompt_manager(self, *, n_control_tokens: int = 5, vocab_size: int = 50) -> GCGPromptManager: + """Create a minimal GCGPromptManager with stubbed internals for sample_control testing.""" + pm = object.__new__(GCGPromptManager) + pm._nonascii_toks = torch.tensor([]) + # Simulate control_toks property + pm._prompts = [MagicMock()] + pm._prompts[0].control_toks = torch.randint(0, vocab_size, (n_control_tokens,)) + return pm + + def test_returns_correct_shape(self) -> None: + """sample_control should return (batch_size, n_control_tokens) tensor.""" + n_control = 5 + vocab_size = 50 + batch_size = 16 + pm = self._make_prompt_manager(n_control_tokens=n_control, vocab_size=vocab_size) + + grad = torch.randn(n_control, vocab_size) + result = pm.sample_control(grad, batch_size, topk=10) + assert result.shape == (batch_size, n_control) + + def test_output_tokens_within_vocab(self) -> None: + """All sampled tokens should be within vocabulary range.""" + n_control = 5 + vocab_size = 50 + batch_size = 32 + pm = self._make_prompt_manager(n_control_tokens=n_control, vocab_size=vocab_size) + + grad = torch.randn(n_control, vocab_size) + result = pm.sample_control(grad, batch_size, topk=10) + assert (result >= 0).all() + assert (result < vocab_size).all() + + def test_each_candidate_differs_in_one_position(self) -> None: + """Each candidate should differ from the original in exactly one position.""" + n_control = 10 + vocab_size = 50 + batch_size = 8 + pm = self._make_prompt_manager(n_control_tokens=n_control, vocab_size=vocab_size) + + grad = torch.randn(n_control, vocab_size) + original_toks = pm._prompts[0].control_toks.clone() + result = pm.sample_control(grad, batch_size, topk=10) + + for i in range(batch_size): + diffs = (result[i] != original_toks.to(result.device)).sum().item() + # Each candidate changes exactly 1 position + assert diffs == 1, f"Candidate {i} differs in {diffs} positions, expected 1" + + def 
test_non_ascii_filtering(self) -> None: + """When allow_non_ascii=False, the newly sampled token should not be non-ASCII. + + Note: sample_control only changes ONE position per candidate, so unchanged positions + may still contain non-ASCII tokens from the original control. We verify that the + *changed* position doesn't use a non-ASCII token. + """ + n_control = 5 + vocab_size = 20 + batch_size = 64 + pm = self._make_prompt_manager(n_control_tokens=n_control, vocab_size=vocab_size) + # Use only ASCII tokens in original control + pm._prompts[0].control_toks = torch.tensor([0, 1, 2, 3, 4]) + # Mark tokens 15-19 as non-ASCII + pm._nonascii_toks = torch.tensor([15, 16, 17, 18, 19]) + + # Create gradient that strongly favors non-ASCII tokens + grad = torch.zeros(n_control, vocab_size) + grad[:, 15:20] = -100.0 # Negative gradient = top candidates after negation + + result = pm.sample_control(grad, batch_size, topk=5, allow_non_ascii=False) + original = pm._prompts[0].control_toks + non_ascii_set = {15, 16, 17, 18, 19} + + for i in range(batch_size): + # Find the position that changed + diffs = (result[i] != original.to(result.device)) + changed_positions = diffs.nonzero(as_tuple=True)[0] + for pos in changed_positions: + new_tok = result[i, pos].item() + assert new_tok not in non_ascii_set, ( + f"Candidate {i} position {pos}: sampled non-ASCII token {new_tok}" + ) + + +class TestBuildParams: + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._build_params.""" + + def test_builds_config_dict_from_kwargs(self) -> None: + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + params = Generator._build_params( + n_steps=100, + batch_size=256, + model_name="test_model", + ) + assert params.n_steps == 100 + assert params.batch_size == 256 + assert params.model_name == "test_model" + + def 
test_all_kwargs_become_attributes(self) -> None: + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + kwargs = {"a": 1, "b": "hello", "c": [1, 2, 3], "d": True} + params = Generator._build_params(**kwargs) + for key, value in kwargs.items(): + assert getattr(params, key) == value + + +class TestApplyTargetAugmentation: + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._apply_target_augmentation.""" + + def test_returns_same_length_lists(self) -> None: + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + train = ["Sure, here is a bomb", "Sure, here is a virus"] + test = ["Sure, here is a weapon"] + + result_train, result_test = Generator._apply_target_augmentation( + train_targets=train, + test_targets=test, + ) + assert len(result_train) == len(train) + assert len(result_test) == len(test) + + def test_augmentation_modifies_targets(self) -> None: + """At least some targets should be modified by augmentation.""" + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + np.random.seed(42) + targets = ["Sure, here is how to do it"] * 100 + + result, _ = Generator._apply_target_augmentation( + train_targets=targets, + test_targets=[], + ) + # With 100 targets and 50% chance of each transform, we should see some changes + num_changed = sum(1 for orig, aug in zip(targets, result) if orig != aug) + assert num_changed > 0, "Expected at least some targets to be augmented" + + def test_augmentation_is_seeded_reproducible(self) -> None: + """Same seed should produce same augmentation.""" + 
train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + targets = ["Sure, here is how to do it"] * 20 + + np.random.seed(123) + result1, _ = Generator._apply_target_augmentation(train_targets=targets, test_targets=[]) + + np.random.seed(123) + result2, _ = Generator._apply_target_augmentation(train_targets=targets, test_targets=[]) + + assert result1 == result2 + + +class TestCreateAttack: + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._create_attack.""" + + def test_transfer_true_creates_progressive(self) -> None: + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + ProgressiveMultiPromptAttack = attack_manager_mod.ProgressiveMultiPromptAttack + + params = Generator._build_params( + transfer=True, + progressive_models=True, + progressive_goals=True, + control_init="! ! 
!", + result_prefix="test", + gbda_deterministic=True, + learning_rate=0.01, + batch_size=512, + n_steps=100, + ) + + mock_worker = MagicMock() + mock_worker.model.name_or_path = "test-model" + mock_worker.tokenizer.name_or_path = "test-tokenizer" + mock_worker.conv_template.name = "test-template" + + managers = { + "AP": MagicMock(), + "PM": MagicMock(), + "MPA": MagicMock(return_value=MagicMock()), + } + + attack = Generator._create_attack( + params=params, + managers=managers, + train_goals=["goal1"], + train_targets=["target1"], + test_goals=[], + test_targets=[], + workers=[mock_worker], + test_workers=[], + ) + assert isinstance(attack, ProgressiveMultiPromptAttack) + + def test_transfer_false_creates_individual(self) -> None: + train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", + ) + Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + IndividualPromptAttack = attack_manager_mod.IndividualPromptAttack + + params = Generator._build_params( + transfer=False, + control_init="! ! 
!", + result_prefix="test", + gbda_deterministic=True, + learning_rate=0.01, + batch_size=512, + n_steps=100, + ) + + mock_worker = MagicMock() + mock_worker.model.name_or_path = "test-model" + mock_worker.tokenizer.name_or_path = "test-tokenizer" + mock_worker.conv_template.name = "test-template" + + managers = { + "AP": MagicMock(), + "PM": MagicMock(), + "MPA": MagicMock(return_value=MagicMock()), + } + + attack = Generator._create_attack( + params=params, + managers=managers, + train_goals=["goal1"], + train_targets=["target1"], + test_goals=[], + test_targets=[], + workers=[mock_worker], + test_workers=[], + ) + assert isinstance(attack, IndividualPromptAttack) + + +class TestEmbeddingHelpers: + """Tests for get_embedding_layer, get_embedding_matrix, get_embeddings.""" + + def test_get_embedding_layer_raises_for_unknown_model(self) -> None: + """Should raise ValueError for unsupported model types.""" + mock_model = MagicMock() + # Ensure it doesn't match any isinstance checks + mock_model.__class__ = type("UnknownModel", (), {}) + with pytest.raises(ValueError, match="Unknown model type"): + get_embedding_layer(mock_model) + + def test_get_embedding_matrix_raises_for_unknown_model(self) -> None: + mock_model = MagicMock() + mock_model.__class__ = type("UnknownModel", (), {}) + with pytest.raises(ValueError, match="Unknown model type"): + get_embedding_matrix(mock_model) + + def test_get_embeddings_raises_for_unknown_model(self) -> None: + mock_model = MagicMock() + mock_model.__class__ = type("UnknownModel", (), {}) + with pytest.raises(ValueError, match="Unknown model type"): + get_embeddings(mock_model, torch.tensor([1, 2, 3])) + + +class TestPromptManagerInit: + """Tests for PromptManager initialization validation.""" + + def test_raises_on_mismatched_goals_targets(self) -> None: + PromptManager = attack_manager_mod.PromptManager + with pytest.raises(ValueError, match="Length of goals and targets must match"): + PromptManager( + goals=["goal1", "goal2"], + 
targets=["target1"], + tokenizer=MagicMock(), + conv_template=MagicMock(), + managers={"AP": MagicMock()}, + ) + + def test_raises_on_empty_goals(self) -> None: + PromptManager = attack_manager_mod.PromptManager + with pytest.raises(ValueError, match="Must provide at least one goal"): + PromptManager( + goals=[], + targets=[], + tokenizer=MagicMock(), + conv_template=MagicMock(), + managers={"AP": MagicMock()}, + ) + + +class TestEvaluateAttackInit: + """Tests for EvaluateAttack initialization validation.""" + + def test_raises_with_multiple_workers(self) -> None: + EvaluateAttack = attack_manager_mod.EvaluateAttack + mock_worker1 = MagicMock() + mock_worker1.model.name_or_path = "m1" + mock_worker1.tokenizer.name_or_path = "t1" + mock_worker1.conv_template.name = "c1" + mock_worker2 = MagicMock() + mock_worker2.model.name_or_path = "m2" + mock_worker2.tokenizer.name_or_path = "t2" + mock_worker2.conv_template.name = "c2" + + with pytest.raises(ValueError, match="exactly 1 worker"): + EvaluateAttack( + goals=["goal"], + targets=["target"], + workers=[mock_worker1, mock_worker2], + managers={"AP": MagicMock(), "PM": MagicMock(), "MPA": MagicMock()}, + ) From 9a2e7fc03fb53caf2996620a9071985b7cffeb44 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Mon, 4 May 2026 05:05:19 -0700 Subject: [PATCH 2/5] test: add data/config and lifecycle tests for GCG Data & config tests (test_data_and_config.py, 12 tests): - YAML loading: valid files, list values, missing file error - Real config validation: all 11 shipped configs parse, have required keys, individual vs transfer configs have correct settings - get_goals_and_targets: seed reproducibility, different seeds differ, separate test data files, n_train_data limiting - run_trainer validation: unsupported model names, missing HF token Lifecycle tests (test_lifecycle.py, 7 tests): - GPU memory: nvidia-smi parsing (single/multi GPU), MLflow logging, failure handling - generate_suffix lifecycle: MLflow started before training, 
workers stopped after training, BUG CHARACTERIZATION: workers NOT stopped on failure (leak) Total GCG test count: 24 -> 69 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../gcg/test_data_and_config.py | 245 ++++++++++++++++++ .../auxiliary_attacks/gcg/test_lifecycle.py | 201 ++++++++++++++ 2 files changed, 446 insertions(+) create mode 100644 tests/unit/auxiliary_attacks/gcg/test_data_and_config.py create mode 100644 tests/unit/auxiliary_attacks/gcg/test_lifecycle.py diff --git a/tests/unit/auxiliary_attacks/gcg/test_data_and_config.py b/tests/unit/auxiliary_attacks/gcg/test_data_and_config.py new file mode 100644 index 0000000000..2777bb1eea --- /dev/null +++ b/tests/unit/auxiliary_attacks/gcg/test_data_and_config.py @@ -0,0 +1,245 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os +import tempfile +from unittest.mock import MagicMock, patch + +import pytest + +attack_manager_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.attack.base.attack_manager", + reason="GCG optional dependencies (torch, mlflow, etc.) 
not installed", +) +get_goals_and_targets = attack_manager_mod.get_goals_and_targets + +run_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.run", + reason="GCG run module not available", +) +_load_yaml_to_dict = run_mod._load_yaml_to_dict +run_trainer = run_mod.run_trainer + +CONFIGS_DIR = os.path.join( + os.path.dirname(__file__), + "..", + "..", + "..", + "..", + "pyrit", + "auxiliary_attacks", + "gcg", + "experiments", + "configs", +) + + +class TestLoadYamlToDict: + """Tests for YAML config loading.""" + + def test_loads_valid_yaml(self) -> None: + """Should parse a valid YAML file into a dict.""" + content = "n_steps: 100\nbatch_size: 256\ntransfer: False\n" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(content) + path = f.name + + try: + result = _load_yaml_to_dict(path) + assert result == {"n_steps": 100, "batch_size": 256, "transfer": False} + finally: + os.unlink(path) + + def test_loads_list_values(self) -> None: + """Should handle YAML list values correctly.""" + content = 'model_paths: ["model/a", "model/b"]\ndevices: ["cuda:0", "cuda:1"]\n' + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(content) + path = f.name + + try: + result = _load_yaml_to_dict(path) + assert result["model_paths"] == ["model/a", "model/b"] + assert result["devices"] == ["cuda:0", "cuda:1"] + finally: + os.unlink(path) + + def test_raises_on_missing_file(self) -> None: + """Should raise FileNotFoundError for nonexistent config.""" + with pytest.raises(FileNotFoundError): + _load_yaml_to_dict("/nonexistent/config.yaml") + + +class TestRealConfigFiles: + """Tests that the shipped YAML config files parse correctly and have expected keys.""" + + @pytest.fixture() + def config_files(self) -> list[str]: + """Return list of all YAML config files shipped with GCG.""" + configs_dir = os.path.normpath(CONFIGS_DIR) + if not os.path.isdir(configs_dir): + pytest.skip(f"Config directory 
not found: {configs_dir}") + return [os.path.join(configs_dir, f) for f in os.listdir(configs_dir) if f.endswith(".yaml")] + + def test_all_configs_parse_without_error(self, config_files: list[str]) -> None: + """Every shipped YAML config should parse into a non-empty dict.""" + assert len(config_files) > 0, "No config files found" + for path in config_files: + result = _load_yaml_to_dict(path) + assert isinstance(result, dict), f"{path} did not parse to dict" + assert len(result) > 0, f"{path} parsed to empty dict" + + def test_all_configs_have_required_keys(self, config_files: list[str]) -> None: + """Every config should have the minimum required keys for GCG.""" + required_keys = { + "tokenizer_paths", + "model_paths", + "conversation_templates", + "devices", + } + for path in config_files: + config = _load_yaml_to_dict(path) + missing = required_keys - set(config.keys()) + assert not missing, f"{os.path.basename(path)} missing keys: {missing}" + + def test_individual_vs_transfer_configs_differ(self, config_files: list[str]) -> None: + """Individual configs should have transfer=False, transfer configs transfer=True.""" + for path in config_files: + config = _load_yaml_to_dict(path) + basename = os.path.basename(path) + if basename.startswith("individual_"): + assert config.get("transfer") is False, f"{basename} should have transfer=False" + elif basename.startswith("transfer_"): + assert config.get("transfer") is True or config.get("progressive_goals") is True, ( + f"{basename} should use transfer or progressive_goals" + ) + + +class TestGetGoalsAndTargetsAdditional: + """Additional tests for get_goals_and_targets beyond the existing file.""" + + def test_shuffle_is_reproducible_with_same_seed(self) -> None: + """Same random_seed should produce the same goal/target ordering.""" + csv_content = "goal,target\n" + "\n".join(f"goal{i},target{i}" for i in range(20)) + "\n" + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + 
f.write(csv_content) + csv_path = f.name + + try: + params1 = MagicMock() + params1.train_data = csv_path + params1.n_train_data = 10 + params1.n_test_data = 0 + params1.test_data = "" + params1.random_seed = 42 + + params2 = MagicMock() + params2.train_data = csv_path + params2.n_train_data = 10 + params2.n_test_data = 0 + params2.test_data = "" + params2.random_seed = 42 + + goals1, targets1, _, _ = get_goals_and_targets(params1) + goals2, targets2, _, _ = get_goals_and_targets(params2) + + assert goals1 == goals2 + assert targets1 == targets2 + finally: + os.unlink(csv_path) + + def test_different_seeds_produce_different_ordering(self) -> None: + """Different seeds should (almost certainly) produce different orderings.""" + csv_content = "goal,target\n" + "\n".join(f"goal{i},target{i}" for i in range(50)) + "\n" + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + f.write(csv_content) + csv_path = f.name + + try: + params1 = MagicMock() + params1.train_data = csv_path + params1.n_train_data = 50 + params1.n_test_data = 0 + params1.test_data = "" + params1.random_seed = 42 + + params2 = MagicMock() + params2.train_data = csv_path + params2.n_train_data = 50 + params2.n_test_data = 0 + params2.test_data = "" + params2.random_seed = 99 + + goals1, _, _, _ = get_goals_and_targets(params1) + goals2, _, _, _ = get_goals_and_targets(params2) + + assert goals1 != goals2, "Different seeds should produce different orderings" + finally: + os.unlink(csv_path) + + def test_separate_test_data_file(self) -> None: + """Should load test data from a separate CSV file when provided.""" + train_csv = "goal,target\ntrain_goal1,train_target1\ntrain_goal2,train_target2\n" + test_csv = "goal,target\ntest_goal1,test_target1\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + f.write(train_csv) + train_path = f.name + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + f.write(test_csv) + 
test_path = f.name + + try: + params = MagicMock() + params.train_data = train_path + params.n_train_data = 2 + params.n_test_data = 1 + params.test_data = test_path + params.random_seed = 42 + + train_goals, train_targets, test_goals, test_targets = get_goals_and_targets(params) + assert len(train_goals) == 2 + assert len(test_goals) == 1 + assert test_goals[0] == "test_goal1" + assert test_targets[0] == "test_target1" + finally: + os.unlink(train_path) + os.unlink(test_path) + + def test_n_train_data_limits_output(self) -> None: + """n_train_data should cap the number of returned training examples.""" + csv_content = "goal,target\n" + "\n".join(f"goal{i},target{i}" for i in range(100)) + "\n" + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + f.write(csv_content) + csv_path = f.name + + try: + params = MagicMock() + params.train_data = csv_path + params.n_train_data = 5 + params.n_test_data = 0 + params.test_data = "" + params.random_seed = 42 + + goals, targets, _, _ = get_goals_and_targets(params) + assert len(goals) == 5 + assert len(targets) == 5 + finally: + os.unlink(csv_path) + + +class TestRunTrainerValidation: + """Tests for run_trainer input validation (no actual model loading).""" + + def test_raises_on_unsupported_model_name(self) -> None: + """Should raise ValueError for unsupported model names.""" + with pytest.raises(ValueError, match="Model name not supported"): + run_trainer(model_name="nonexistent_model") + + @patch.dict("os.environ", {"HUGGINGFACE_TOKEN": ""}, clear=False) + @patch("pyrit.auxiliary_attacks.gcg.experiments.run._load_environment_files") + def test_raises_without_hf_token(self, mock_load_env: MagicMock) -> None: + """Should raise ValueError when HUGGINGFACE_TOKEN is not set.""" + with patch.dict("os.environ", {"HUGGINGFACE_TOKEN": ""}, clear=False): + with pytest.raises(ValueError, match="HUGGINGFACE_TOKEN"): + run_trainer(model_name="phi_3_mini") diff --git 
a/tests/unit/auxiliary_attacks/gcg/test_lifecycle.py b/tests/unit/auxiliary_attacks/gcg/test_lifecycle.py new file mode 100644 index 0000000000..7792c98224 --- /dev/null +++ b/tests/unit/auxiliary_attacks/gcg/test_lifecycle.py @@ -0,0 +1,201 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +log_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.log", + reason="GCG optional dependencies (mlflow, etc.) not installed", +) +log_gpu_memory = log_mod.log_gpu_memory +get_gpu_memory = log_mod.get_gpu_memory + +train_mod = pytest.importorskip( + "pyrit.auxiliary_attacks.gcg.experiments.train", + reason="GCG train module not available", +) +Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + + +class TestGpuMemoryLogging: + """Tests for GPU memory query and logging.""" + + @patch("pyrit.auxiliary_attacks.gcg.experiments.log.sp") + def test_get_gpu_memory_parses_nvidia_smi(self, mock_sp: MagicMock) -> None: + """Should parse nvidia-smi output into a dict of GPU -> free memory.""" + mock_sp.check_output.return_value = b"memory.free [MiB]\n8000 MiB\n16000 MiB\n" + result = get_gpu_memory() + assert result == {"gpu1_free_memory": 8000, "gpu2_free_memory": 16000} + + @patch("pyrit.auxiliary_attacks.gcg.experiments.log.sp") + def test_get_gpu_memory_single_gpu(self, mock_sp: MagicMock) -> None: + """Should handle single GPU output.""" + mock_sp.check_output.return_value = b"memory.free [MiB]\n24000 MiB\n" + result = get_gpu_memory() + assert result == {"gpu1_free_memory": 24000} + + @patch("pyrit.auxiliary_attacks.gcg.experiments.log.mlflow") + @patch("pyrit.auxiliary_attacks.gcg.experiments.log.sp") + def test_log_gpu_memory_logs_to_mlflow(self, mock_sp: MagicMock, mock_mlflow: MagicMock) -> None: + """Should log each GPU's free memory as an MLflow metric.""" + mock_sp.check_output.return_value = b"memory.free [MiB]\n8000 
MiB\n16000 MiB\n" + log_gpu_memory(step=5) + + assert mock_mlflow.log_metric.call_count == 2 + calls = mock_mlflow.log_metric.call_args_list + assert calls[0].args == ("gpu1_free_memory", 8000) + assert calls[0].kwargs["step"] == 5 + assert calls[1].args == ("gpu2_free_memory", 16000) + + @patch("pyrit.auxiliary_attacks.gcg.experiments.log.sp") + def test_get_gpu_memory_handles_nvidia_smi_failure(self, mock_sp: MagicMock) -> None: + """Should propagate exception when nvidia-smi is not available.""" + mock_sp.check_output.side_effect = subprocess.CalledProcessError(1, "nvidia-smi") + with pytest.raises(subprocess.CalledProcessError): + get_gpu_memory() + + +class TestGenerateSuffixLifecycle: + """Tests for generate_suffix MLflow and worker lifecycle management.""" + + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_workers") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_goals_and_targets") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_gpu_memory") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_params") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_train_goals") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.mlflow") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.attack_lib") + def test_mlflow_run_started_before_training( + self, + mock_attack_lib: MagicMock, + mock_mlflow: MagicMock, + mock_log_train_goals: MagicMock, + mock_log_params: MagicMock, + mock_log_gpu_memory: MagicMock, + mock_get_goals: MagicMock, + mock_get_workers: MagicMock, + ) -> None: + """MLflow run should be started before any training begins.""" + mock_get_goals.return_value = (["goal1"], ["target1"], [], []) + mock_worker = MagicMock() + mock_worker.model.name_or_path = "test-model" + mock_worker.tokenizer.name_or_path = "test-tokenizer" + mock_worker.conv_template.name = "test-template" + mock_get_workers.return_value = ([mock_worker], []) + + mock_attack_instance = MagicMock() + mock_attack_lib.GCGAttackPrompt 
= MagicMock + mock_attack_lib.GCGPromptManager = MagicMock + mock_attack_lib.GCGMultiPromptAttack = MagicMock + + # Patch _create_attack to avoid IndividualPromptAttack's logfile writing + with patch.object(Generator, "_create_attack", return_value=mock_attack_instance): + generator = Generator.__new__(Generator) + generator.generate_suffix( + tokenizer_paths=["test/path"], + model_paths=["test/path"], + conversation_templates=["llama-2"], + train_data="", + n_steps=1, + ) + + mock_mlflow.start_run.assert_called_once() + + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_workers") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_goals_and_targets") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_gpu_memory") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_params") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_train_goals") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.mlflow") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.attack_lib") + def test_workers_stopped_after_training( + self, + mock_attack_lib: MagicMock, + mock_mlflow: MagicMock, + mock_log_train_goals: MagicMock, + mock_log_params: MagicMock, + mock_log_gpu_memory: MagicMock, + mock_get_goals: MagicMock, + mock_get_workers: MagicMock, + ) -> None: + """All workers should be stopped after training completes.""" + mock_get_goals.return_value = (["goal1"], ["target1"], [], []) + mock_worker1 = MagicMock() + mock_worker1.model.name_or_path = "test-model-1" + mock_worker1.tokenizer.name_or_path = "test-tokenizer-1" + mock_worker1.conv_template.name = "test-template-1" + mock_worker2 = MagicMock() + mock_worker2.model.name_or_path = "test-model-2" + mock_worker2.tokenizer.name_or_path = "test-tokenizer-2" + mock_worker2.conv_template.name = "test-template-2" + mock_get_workers.return_value = ([mock_worker1], [mock_worker2]) + + mock_attack_instance = MagicMock() + mock_attack_lib.GCGAttackPrompt = MagicMock + 
mock_attack_lib.GCGPromptManager = MagicMock + mock_attack_lib.GCGMultiPromptAttack = MagicMock + + with patch.object(Generator, "_create_attack", return_value=mock_attack_instance): + generator = Generator.__new__(Generator) + generator.generate_suffix( + tokenizer_paths=["test/path"], + model_paths=["test/path"], + conversation_templates=["llama-2"], + train_data="", + n_steps=1, + ) + + mock_worker1.stop.assert_called_once() + mock_worker2.stop.assert_called_once() + + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_workers") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.get_goals_and_targets") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_gpu_memory") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_params") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.log_train_goals") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.mlflow") + @patch("pyrit.auxiliary_attacks.gcg.experiments.train.attack_lib") + def test_workers_not_stopped_on_training_failure( + self, + mock_attack_lib: MagicMock, + mock_mlflow: MagicMock, + mock_log_train_goals: MagicMock, + mock_log_params: MagicMock, + mock_log_gpu_memory: MagicMock, + mock_get_goals: MagicMock, + mock_get_workers: MagicMock, + ) -> None: + """BUG CHARACTERIZATION: Workers are NOT stopped when attack.run() raises. + + This documents the current (buggy) behavior — workers leak on failure. + A future fix should ensure workers are cleaned up even on exceptions. 
+ """ + mock_get_goals.return_value = (["goal1"], ["target1"], [], []) + mock_worker = MagicMock() + mock_worker.model.name_or_path = "test-model" + mock_worker.tokenizer.name_or_path = "test-tokenizer" + mock_worker.conv_template.name = "test-template" + mock_get_workers.return_value = ([mock_worker], []) + + mock_attack_instance = MagicMock() + mock_attack_instance.run.side_effect = RuntimeError("Simulated failure") + mock_attack_lib.GCGAttackPrompt = MagicMock + mock_attack_lib.GCGPromptManager = MagicMock + mock_attack_lib.GCGMultiPromptAttack = MagicMock + + with patch.object(Generator, "_create_attack", return_value=mock_attack_instance): + generator = Generator.__new__(Generator) + with pytest.raises(RuntimeError, match="Simulated failure"): + generator.generate_suffix( + tokenizer_paths=["test/path"], + model_paths=["test/path"], + conversation_templates=["llama-2"], + train_data="", + n_steps=1, + ) + + # Workers are NOT stopped on failure — this is a bug we'll fix later + mock_worker.stop.assert_not_called() From 612c66f8289a9ecc7a9c1876b11decf80074ba79 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Mon, 4 May 2026 05:20:43 -0700 Subject: [PATCH 3/5] test: add GCG integration tests with real GPT-2 model Add 10 integration tests that exercise the GCG attack pipeline with a real GPT-2 model on CPU, validating end-to-end correctness: - token_gradients: gradient shape matches (n_control, vocab_size), values are finite and non-zero - GCGAttackPrompt: initializes with valid non-overlapping slices, grad() returns correct shape, test_loss() returns finite positive float - GCGPromptManager.sample_control: sampled candidates are decodable, correct batch size - Embedding helpers: layer/matrix/embeddings work with GPT2LMHeadModel, get_nonascii_toks returns non-empty tensor Uses llama-2 conversation template (has explicit handling in _update_ids). Marked @run_only_if_all_tests (requires RUN_ALL_TESTS=true + torch/transformers). Runs in ~18s on CPU. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../auxiliary_attacks/test_gcg_integration.py | 226 ++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 tests/integration/auxiliary_attacks/test_gcg_integration.py diff --git a/tests/integration/auxiliary_attacks/test_gcg_integration.py b/tests/integration/auxiliary_attacks/test_gcg_integration.py new file mode 100644 index 0000000000..d611e13fc9 --- /dev/null +++ b/tests/integration/auxiliary_attacks/test_gcg_integration.py @@ -0,0 +1,226 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Integration tests for GCG attack using a real GPT-2 model on CPU. + +These tests validate that the GCG attack pipeline works end-to-end with a real +(tiny) model. They use GPT-2 (~124M params) which can run on CPU, paired with +the llama-2 conversation template (which has explicit handling in _update_ids). + +Requires: torch, transformers, fastchat, mlflow (GCG optional deps). +Skipped unless RUN_ALL_TESTS=true. 
+""" + +import pytest + +torch = pytest.importorskip("torch", reason="torch not installed") +transformers = pytest.importorskip("transformers", reason="transformers not installed") +pytest.importorskip("fastchat", reason="fastchat not installed") + +from unittest.mock import MagicMock, patch + +from fastchat.model import get_conversation_template +from transformers import AutoTokenizer, GPT2LMHeadModel + +from pyrit.auxiliary_attacks.gcg.attack.base.attack_manager import ( + MultiPromptAttack, + get_embedding_layer, + get_embedding_matrix, + get_embeddings, + get_nonascii_toks, +) +from pyrit.auxiliary_attacks.gcg.attack.gcg.gcg_attack import ( + GCGAttackPrompt, + GCGMultiPromptAttack, + GCGPromptManager, + token_gradients, +) + + +@pytest.fixture(scope="module") +def gpt2_model() -> GPT2LMHeadModel: + """Load GPT-2 model once for all tests in this module.""" + model = GPT2LMHeadModel.from_pretrained("gpt2").eval() + return model + + +@pytest.fixture(scope="module") +def gpt2_tokenizer() -> transformers.PreTrainedTokenizer: + """Load GPT-2 tokenizer once for all tests in this module.""" + tokenizer = AutoTokenizer.from_pretrained("gpt2") + tokenizer.pad_token = tokenizer.eos_token + tokenizer.padding_side = "left" + return tokenizer + + +@pytest.fixture() +def conv_template(): + """Create a fresh llama-2 conversation template for each test.""" + conv = get_conversation_template("llama-2") + conv.sep2 = conv.sep2.strip() + return conv + + +@pytest.mark.run_only_if_all_tests +class TestTokenGradientsIntegration: + """Integration tests for token_gradients with real GPT-2.""" + + def test_gradient_shape_matches_control_and_vocab( + self, gpt2_model: GPT2LMHeadModel, gpt2_tokenizer: transformers.PreTrainedTokenizer + ) -> None: + """Gradient should have shape (n_control_tokens, vocab_size).""" + input_ids = gpt2_tokenizer("Hello world ! ! ! 
target text", return_tensors="pt")["input_ids"][0] + control_slice = slice(2, 5) + target_slice = slice(5, 7) + loss_slice = slice(4, 6) + + grad = token_gradients(gpt2_model, input_ids, control_slice, target_slice, loss_slice) + + n_control = control_slice.stop - control_slice.start + assert grad.shape == (n_control, gpt2_tokenizer.vocab_size) + + def test_gradient_is_finite_and_nonzero( + self, gpt2_model: GPT2LMHeadModel, gpt2_tokenizer: transformers.PreTrainedTokenizer + ) -> None: + """Gradient values should be finite and at least some should be non-zero.""" + input_ids = gpt2_tokenizer("Tell me how ! ! ! Sure here is", return_tensors="pt")["input_ids"][0] + control_slice = slice(3, 6) + target_slice = slice(6, 9) + loss_slice = slice(5, 8) + + grad = token_gradients(gpt2_model, input_ids, control_slice, target_slice, loss_slice) + + assert torch.isfinite(grad).all(), "Gradient contains non-finite values" + assert (grad != 0).any(), "Gradient is all zeros" + + +@pytest.mark.run_only_if_all_tests +class TestGCGAttackPromptIntegration: + """Integration tests for GCGAttackPrompt with real GPT-2.""" + + def test_prompt_initializes_with_valid_slices( + self, + gpt2_model: GPT2LMHeadModel, + gpt2_tokenizer: transformers.PreTrainedTokenizer, + conv_template: object, + ) -> None: + """AttackPrompt should initialize with non-empty, non-overlapping slices.""" + prompt = GCGAttackPrompt( + goal="Tell me how", + target="Sure here is", + tokenizer=gpt2_tokenizer, + conv_template=conv_template, + control_init="! ! ! ! 
!", + ) + + assert prompt._control_slice.start < prompt._control_slice.stop + assert prompt._target_slice.start < prompt._target_slice.stop + assert prompt._control_slice.stop <= prompt._target_slice.start + assert prompt.input_ids.shape[0] > 0 + + def test_grad_returns_valid_gradient( + self, + gpt2_model: GPT2LMHeadModel, + gpt2_tokenizer: transformers.PreTrainedTokenizer, + conv_template: object, + ) -> None: + """GCGAttackPrompt.grad should return a finite, non-zero gradient tensor.""" + prompt = GCGAttackPrompt( + goal="Tell me how", + target="Sure here is", + tokenizer=gpt2_tokenizer, + conv_template=conv_template, + control_init="! ! ! ! !", + ) + + grad = prompt.grad(gpt2_model) + + n_control = prompt._control_slice.stop - prompt._control_slice.start + assert grad.shape[0] == n_control + assert grad.shape[1] == gpt2_tokenizer.vocab_size + assert torch.isfinite(grad).all() + + def test_target_loss_is_finite_scalar( + self, + gpt2_model: GPT2LMHeadModel, + gpt2_tokenizer: transformers.PreTrainedTokenizer, + conv_template: object, + ) -> None: + """Target loss from real model logits should be a finite positive number.""" + prompt = GCGAttackPrompt( + goal="Tell me how", + target="Sure here is", + tokenizer=gpt2_tokenizer, + conv_template=conv_template, + control_init="! ! ! ! !", + ) + + loss = prompt.test_loss(gpt2_model) + assert isinstance(loss, float) + assert loss > 0 + assert loss < 1e6 + + +@pytest.mark.run_only_if_all_tests +class TestGCGSampleControlIntegration: + """Integration tests for GCGPromptManager.sample_control with real tokenizer.""" + + def test_sample_control_produces_valid_candidates( + self, + gpt2_model: GPT2LMHeadModel, + gpt2_tokenizer: transformers.PreTrainedTokenizer, + conv_template: object, + ) -> None: + """Sampled control tokens should be decodable by the tokenizer.""" + prompt = GCGAttackPrompt( + goal="Tell me how", + target="Sure here is", + tokenizer=gpt2_tokenizer, + conv_template=conv_template, + control_init="! ! ! ! 
!", + ) + + grad = prompt.grad(gpt2_model) + + pm = object.__new__(GCGPromptManager) + pm._prompts = [prompt] + pm._nonascii_toks = get_nonascii_toks(gpt2_tokenizer, device="cpu") + + candidates = pm.sample_control(grad, batch_size=8, topk=32, allow_non_ascii=False) + + assert candidates.shape[0] == 8 + # All candidates should be decodable without error + for i in range(candidates.shape[0]): + decoded = gpt2_tokenizer.decode(candidates[i]) + assert isinstance(decoded, str) + assert len(decoded) > 0 + + +@pytest.mark.run_only_if_all_tests +class TestEmbeddingHelpersIntegration: + """Integration tests for embedding helper functions with real GPT-2.""" + + def test_get_embedding_layer_returns_embedding(self, gpt2_model: GPT2LMHeadModel) -> None: + layer = get_embedding_layer(gpt2_model) + assert isinstance(layer, torch.nn.Embedding) + + def test_get_embedding_matrix_shape( + self, gpt2_model: GPT2LMHeadModel, gpt2_tokenizer: transformers.PreTrainedTokenizer + ) -> None: + matrix = get_embedding_matrix(gpt2_model) + assert matrix.shape[0] == gpt2_tokenizer.vocab_size + + def test_get_embeddings_returns_correct_shape( + self, gpt2_model: GPT2LMHeadModel, gpt2_tokenizer: transformers.PreTrainedTokenizer + ) -> None: + input_ids = gpt2_tokenizer("Hello world", return_tensors="pt")["input_ids"] + embeddings = get_embeddings(gpt2_model, input_ids) + assert embeddings.shape[0] == 1 + assert embeddings.shape[1] == input_ids.shape[1] + + def test_get_nonascii_toks_returns_nonempty_tensor( + self, gpt2_tokenizer: transformers.PreTrainedTokenizer + ) -> None: + toks = get_nonascii_toks(gpt2_tokenizer, device="cpu") + assert isinstance(toks, torch.Tensor) + assert len(toks) > 0 From df31760a12ba8d1205774a7f35251bc6c16d92a2 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Mon, 4 May 2026 05:36:57 -0700 Subject: [PATCH 4/5] TEST: remove run_only_if_all_tests marker from GCG integration tests These tests only need optional Python packages (torch, transformers, fastchat), not external 
services or credentials. The importorskip at the top already handles skipping when deps are not installed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/integration/auxiliary_attacks/test_gcg_integration.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/integration/auxiliary_attacks/test_gcg_integration.py b/tests/integration/auxiliary_attacks/test_gcg_integration.py index d611e13fc9..00764b8307 100644 --- a/tests/integration/auxiliary_attacks/test_gcg_integration.py +++ b/tests/integration/auxiliary_attacks/test_gcg_integration.py @@ -61,7 +61,6 @@ def conv_template(): return conv -@pytest.mark.run_only_if_all_tests class TestTokenGradientsIntegration: """Integration tests for token_gradients with real GPT-2.""" @@ -94,7 +93,6 @@ def test_gradient_is_finite_and_nonzero( assert (grad != 0).any(), "Gradient is all zeros" -@pytest.mark.run_only_if_all_tests class TestGCGAttackPromptIntegration: """Integration tests for GCGAttackPrompt with real GPT-2.""" @@ -161,7 +159,6 @@ def test_target_loss_is_finite_scalar( assert loss < 1e6 -@pytest.mark.run_only_if_all_tests class TestGCGSampleControlIntegration: """Integration tests for GCGPromptManager.sample_control with real tokenizer.""" @@ -196,7 +193,6 @@ def test_sample_control_produces_valid_candidates( assert len(decoded) > 0 -@pytest.mark.run_only_if_all_tests class TestEmbeddingHelpersIntegration: """Integration tests for embedding helper functions with real GPT-2.""" From fb5cb70143ccffabfba533bb9f9c9015c52e89a0 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Mon, 4 May 2026 06:34:27 -0700 Subject: [PATCH 5/5] MAINT: fix pre-commit lint issues in GCG tests - Move class references to module level to fix N806 (variable naming) - Add noqa: E402 for imports after importorskip guards - Fix ruff format issues - Remove outdated RUN_ALL_TESTS reference in docstring Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- 
.../auxiliary_attacks/test_gcg_integration.py | 20 ++---- .../auxiliary_attacks/gcg/test_gcg_core.py | 65 +++++++++---------- 2 files changed, 36 insertions(+), 49 deletions(-) diff --git a/tests/integration/auxiliary_attacks/test_gcg_integration.py b/tests/integration/auxiliary_attacks/test_gcg_integration.py index 00764b8307..d2e32d5974 100644 --- a/tests/integration/auxiliary_attacks/test_gcg_integration.py +++ b/tests/integration/auxiliary_attacks/test_gcg_integration.py @@ -8,7 +8,7 @@ the llama-2 conversation template (which has explicit handling in _update_ids). Requires: torch, transformers, fastchat, mlflow (GCG optional deps). -Skipped unless RUN_ALL_TESTS=true. +Skipped via importorskip when deps are not installed. """ import pytest @@ -17,21 +17,18 @@ transformers = pytest.importorskip("transformers", reason="transformers not installed") pytest.importorskip("fastchat", reason="fastchat not installed") -from unittest.mock import MagicMock, patch -from fastchat.model import get_conversation_template -from transformers import AutoTokenizer, GPT2LMHeadModel +from fastchat.model import get_conversation_template # noqa: E402 +from transformers import AutoTokenizer, GPT2LMHeadModel # noqa: E402 -from pyrit.auxiliary_attacks.gcg.attack.base.attack_manager import ( - MultiPromptAttack, +from pyrit.auxiliary_attacks.gcg.attack.base.attack_manager import ( # noqa: E402 get_embedding_layer, get_embedding_matrix, get_embeddings, get_nonascii_toks, ) -from pyrit.auxiliary_attacks.gcg.attack.gcg.gcg_attack import ( +from pyrit.auxiliary_attacks.gcg.attack.gcg.gcg_attack import ( # noqa: E402 GCGAttackPrompt, - GCGMultiPromptAttack, GCGPromptManager, token_gradients, ) @@ -40,8 +37,7 @@ @pytest.fixture(scope="module") def gpt2_model() -> GPT2LMHeadModel: """Load GPT-2 model once for all tests in this module.""" - model = GPT2LMHeadModel.from_pretrained("gpt2").eval() - return model + return GPT2LMHeadModel.from_pretrained("gpt2").eval() @pytest.fixture(scope="module") 
@@ -214,9 +210,7 @@ def test_get_embeddings_returns_correct_shape( assert embeddings.shape[0] == 1 assert embeddings.shape[1] == input_ids.shape[1] - def test_get_nonascii_toks_returns_nonempty_tensor( - self, gpt2_tokenizer: transformers.PreTrainedTokenizer - ) -> None: + def test_get_nonascii_toks_returns_nonempty_tensor(self, gpt2_tokenizer: transformers.PreTrainedTokenizer) -> None: toks = get_nonascii_toks(gpt2_tokenizer, device="cpu") assert isinstance(toks, torch.Tensor) assert len(toks) > 0 diff --git a/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py b/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py index d21074e1c2..e1568e1a0f 100644 --- a/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py +++ b/tests/unit/auxiliary_attacks/gcg/test_gcg_core.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import numpy as np import pytest @@ -13,6 +13,11 @@ torch = pytest.importorskip("torch", reason="torch not installed") MultiPromptAttack = attack_manager_mod.MultiPromptAttack +AttackPrompt = attack_manager_mod.AttackPrompt +PromptManager = attack_manager_mod.PromptManager +EvaluateAttack = attack_manager_mod.EvaluateAttack +IndividualPromptAttack = attack_manager_mod.IndividualPromptAttack +ProgressiveMultiPromptAttack = attack_manager_mod.ProgressiveMultiPromptAttack get_embedding_layer = attack_manager_mod.get_embedding_layer get_embedding_matrix = attack_manager_mod.get_embedding_matrix get_embeddings = attack_manager_mod.get_embeddings @@ -113,7 +118,6 @@ class TestTargetAndControlLoss: def test_target_loss_returns_correct_shape(self) -> None: """target_loss should return tensor of shape (batch, target_len).""" - AttackPrompt = attack_manager_mod.AttackPrompt prompt = object.__new__(AttackPrompt) prompt._target_slice = slice(5, 8) # 3 target tokens @@ -128,7 +132,6 @@ def test_target_loss_returns_correct_shape(self) -> None: def 
test_target_loss_is_finite(self) -> None: """target_loss should always return finite values.""" - AttackPrompt = attack_manager_mod.AttackPrompt prompt = object.__new__(AttackPrompt) prompt._target_slice = slice(3, 6) @@ -140,7 +143,6 @@ def test_target_loss_is_finite(self) -> None: def test_control_loss_returns_correct_shape(self) -> None: """control_loss should return tensor of shape (batch, control_len).""" - AttackPrompt = attack_manager_mod.AttackPrompt prompt = object.__new__(AttackPrompt) prompt._control_slice = slice(2, 5) # 3 control tokens @@ -155,7+157,6 @@ def test_control_loss_returns_correct_shape(self) -> None: def test_control_loss_is_finite(self) -> None: """control_loss should always return finite values.""" - AttackPrompt = attack_manager_mod.AttackPrompt prompt = object.__new__(AttackPrompt) prompt._control_slice = slice(2, 5) @@ -167,7 +168,6 @@ def test_control_loss_is_finite(self) -> None: def test_target_loss_higher_for_wrong_predictions(self) -> None: """Loss should be higher when logits don't predict the correct target tokens.""" - AttackPrompt = attack_manager_mod.AttackPrompt prompt = object.__new__(AttackPrompt) prompt._target_slice = slice(3, 5) @@ -268,26 +268,24 @@ def test_non_ascii_filtering(self) -> None: for i in range(batch_size): # Find the position that changed - diffs = (result[i] != original.to(result.device)) + diffs = result[i] != original.to(result.device) changed_positions = diffs.nonzero(as_tuple=True)[0] for pos in changed_positions: new_tok = result[i, pos].item() - assert new_tok not in non_ascii_set, ( - f"Candidate {i} position {pos}: sampled non-ASCII token {new_tok}" - ) + assert new_tok not in non_ascii_set, f"Candidate {i} position {pos}: sampled non-ASCII token {new_tok}" class TestBuildParams: - """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._build_params.""" + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._build_params.""" def test_builds_config_dict_from_kwargs(self)
-> None: train_mod = pytest.importorskip( "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator - params = Generator._build_params( + params = generator_cls._build_params( n_steps=100, batch_size=256, model_name="test_model", @@ -301,28 +299,28 @@ def test_all_kwargs_become_attributes(self) -> None: "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator kwargs = {"a": 1, "b": "hello", "c": [1, 2, 3], "d": True} - params = Generator._build_params(**kwargs) + params = generator_cls._build_params(**kwargs) for key, value in kwargs.items(): assert getattr(params, key) == value class TestApplyTargetAugmentation: - """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._apply_target_augmentation.""" + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._apply_target_augmentation.""" def test_returns_same_length_lists(self) -> None: train_mod = pytest.importorskip( "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator train = ["Sure, here is a bomb", "Sure, here is a virus"] test = ["Sure, here is a weapon"] - result_train, result_test = Generator._apply_target_augmentation( + result_train, result_test = generator_cls._apply_target_augmentation( train_targets=train, test_targets=test, ) @@ -335,17 +333,17 @@ def test_augmentation_modifies_targets(self) -> None: "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = 
train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator np.random.seed(42) targets = ["Sure, here is how to do it"] * 100 - result, _ = Generator._apply_target_augmentation( + result, _ = generator_cls._apply_target_augmentation( train_targets=targets, test_targets=[], ) # With 100 targets and 50% chance of each transform, we should see some changes - num_changed = sum(1 for orig, aug in zip(targets, result) if orig != aug) + num_changed = sum(1 for orig, aug in zip(targets, result, strict=False) if orig != aug) assert num_changed > 0, "Expected at least some targets to be augmented" def test_augmentation_is_seeded_reproducible(self) -> None: @@ -354,31 +352,30 @@ def test_augmentation_is_seeded_reproducible(self) -> None: "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator targets = ["Sure, here is how to do it"] * 20 np.random.seed(123) - result1, _ = Generator._apply_target_augmentation(train_targets=targets, test_targets=[]) + result1, _ = generator_cls._apply_target_augmentation(train_targets=targets, test_targets=[]) np.random.seed(123) - result2, _ = Generator._apply_target_augmentation(train_targets=targets, test_targets=[]) + result2, _ = generator_cls._apply_target_augmentation(train_targets=targets, test_targets=[]) assert result1 == result2 class TestCreateAttack: - """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._create_attack.""" + """Tests for GreedyCoordinateGradientAdversarialSuffixGenerator._create_attack.""" def test_transfer_true_creates_progressive(self) -> None: train_mod = pytest.importorskip( "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = 
train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator - ProgressiveMultiPromptAttack = attack_manager_mod.ProgressiveMultiPromptAttack + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator - params = Generator._build_params( + params = generator_cls._build_params( transfer=True, progressive_models=True, progressive_goals=True, @@ -401,7 +398,7 @@ def test_transfer_true_creates_progressive(self) -> None: "MPA": MagicMock(return_value=MagicMock()), } - attack = Generator._create_attack( + attack = generator_cls._create_attack( params=params, managers=managers, train_goals=["goal1"], @@ -418,10 +415,9 @@ def test_transfer_false_creates_individual(self) -> None: "pyrit.auxiliary_attacks.gcg.experiments.train", reason="GCG train module not available", ) - Generator = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator - IndividualPromptAttack = attack_manager_mod.IndividualPromptAttack + generator_cls = train_mod.GreedyCoordinateGradientAdversarialSuffixGenerator - params = Generator._build_params( + params = generator_cls._build_params( transfer=False, control_init="! ! 
!", result_prefix="test", @@ -442,7 +438,7 @@ def test_transfer_false_creates_individual(self) -> None: "MPA": MagicMock(return_value=MagicMock()), } - attack = Generator._create_attack( + attack = generator_cls._create_attack( params=params, managers=managers, train_goals=["goal1"], @@ -483,7 +479,6 @@ class TestPromptManagerInit: """Tests for PromptManager initialization validation.""" def test_raises_on_mismatched_goals_targets(self) -> None: - PromptManager = attack_manager_mod.PromptManager with pytest.raises(ValueError, match="Length of goals and targets must match"): PromptManager( goals=["goal1", "goal2"], @@ -494,7 +489,6 @@ def test_raises_on_mismatched_goals_targets(self) -> None: ) def test_raises_on_empty_goals(self) -> None: - PromptManager = attack_manager_mod.PromptManager with pytest.raises(ValueError, match="Must provide at least one goal"): PromptManager( goals=[], @@ -509,7 +503,6 @@ class TestEvaluateAttackInit: """Tests for EvaluateAttack initialization validation.""" def test_raises_with_multiple_workers(self) -> None: - EvaluateAttack = attack_manager_mod.EvaluateAttack mock_worker1 = MagicMock() mock_worker1.model.name_or_path = "m1" mock_worker1.tokenizer.name_or_path = "t1"