Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .pyrit_conf_example
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ operation: op_trash_panda
# Applies only to the pyrit_backend server.
max_concurrent_scenario_runs: 3

# Custom Initializer Registration (REST API)
# -------------------------------------------
# When true, the REST API accepts POST /api/initializers to register custom
# initializer scripts and DELETE /api/initializers/{name} to remove any
# initializer.
#
# ⚠️ WARNING: Enabling this allows arbitrary Python code execution on the
# server via the REST API. Only enable on trusted networks.
# The pyrit_backend default host is localhost, which limits exposure.
# If you bind to 0.0.0.0, ensure you are on a trusted network.
#
# Default: false
allow_custom_initializers: false

# Silent Mode
# -----------
# If true, suppresses print statements during initialization.
Expand Down
2 changes: 2 additions & 0 deletions pyrit/backend/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
InitializerParameterSummary,
ListRegisteredInitializersResponse,
RegisteredInitializer,
RegisterInitializerRequest,
)
from pyrit.backend.models.scenarios import (
ListRegisteredScenariosResponse,
Expand Down Expand Up @@ -110,6 +111,7 @@
"InitializerParameterSummary",
"ListRegisteredInitializersResponse",
"RegisteredInitializer",
"RegisterInitializerRequest",
# Targets
"CreateTargetRequest",
"TargetCapabilitiesInfo",
Expand Down
11 changes: 8 additions & 3 deletions pyrit/backend/models/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
before scenario execution. These models represent initializer metadata.
"""

from typing import Optional

from pydantic import BaseModel, Field

from pyrit.backend.models.common import PaginationInfo
Expand All @@ -20,7 +18,7 @@ class InitializerParameterSummary(BaseModel):

name: str = Field(..., description="Parameter name")
description: str = Field(..., description="Human-readable description of the parameter")
default: Optional[list[str]] = Field(None, description="Default value(s), or None if required")
default: list[str] | None = Field(None, description="Default value(s), or None if required")


class RegisteredInitializer(BaseModel):
Expand All @@ -42,3 +40,10 @@ class ListRegisteredInitializersResponse(BaseModel):

items: list[RegisteredInitializer] = Field(..., description="List of initializer summaries")
pagination: PaginationInfo = Field(..., description="Pagination metadata")


class RegisterInitializerRequest(BaseModel):
"""Request body for registering a custom initializer by uploading script content."""

name: str = Field(..., description="Registry name for the initializer (e.g., 'my_custom')")
script_content: str = Field(..., description="Python source code containing a PyRITInitializer subclass")
108 changes: 101 additions & 7 deletions pyrit/backend/routes/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,56 @@
"""
Initializer API routes.

Provides endpoints for listing available initializers and their metadata.
Provides endpoints for listing, registering, and removing initializers.

Route structure:
/api/initializers — list all initializers
/api/initializers/{name} — get single initializer detail
GET /api/initializers — list all initializers
GET /api/initializers/{name} — get single initializer detail
POST /api/initializers — register initializer from script
DELETE /api/initializers/{name} — unregister an initializer
"""

from typing import Optional

from fastapi import APIRouter, HTTPException, Query, status
from fastapi import APIRouter, HTTPException, Query, Request, status

from pyrit.backend.models.common import ProblemDetail
from pyrit.backend.models.initializers import (
ListRegisteredInitializersResponse,
RegisteredInitializer,
RegisterInitializerRequest,
)
from pyrit.backend.services.initializer_service import get_initializer_service

router = APIRouter(prefix="/initializers", tags=["initializers"])


def _check_custom_initializers_allowed(request: Request) -> None:
"""
Check that allow_custom_initializers is enabled on the server.

Args:
request: The incoming FastAPI request.

Raises:
HTTPException: 403 if custom initializer operations are not enabled.
"""
allowed = getattr(request.app.state, "allow_custom_initializers", False)
if not allowed:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Custom initializer operations are disabled. "
"Set allow_custom_initializers: true in .pyrit_conf to enable."
),
)


@router.get(
"",
response_model=ListRegisteredInitializersResponse,
)
async def list_initializers(
limit: int = Query(50, ge=1, le=200, description="Maximum items per page"),
cursor: Optional[str] = Query(None, description="Pagination cursor (initializer_name to start after)"),
cursor: str | None = Query(None, description="Pagination cursor (initializer_name to start after)"),
) -> ListRegisteredInitializersResponse:
"""
List all available initializers.
Expand Down Expand Up @@ -73,3 +95,75 @@ async def get_initializer(initializer_name: str) -> RegisteredInitializer:
)

return initializer


@router.post(
"",
response_model=RegisteredInitializer,
status_code=status.HTTP_201_CREATED,
responses={
403: {"model": ProblemDetail, "description": "Custom initializer operations disabled"},
409: {"model": ProblemDetail, "description": "Initializer name already registered"},
},
)
async def register_initializer(
request: Request,
body: RegisterInitializerRequest,
) -> RegisteredInitializer:
"""
Register an initializer by uploading Python source code.

The script must contain a concrete PyRITInitializer subclass.
Requires allow_custom_initializers to be enabled in pyrit_conf.

Args:
request: The incoming FastAPI request.
body: Request body with name and script_content.

Returns:
The newly registered initializer summary.
"""
_check_custom_initializers_allowed(request)
service = get_initializer_service()

try:
return await service.register_initializer_async(name=body.name, script_content=body.script_content)
except ValueError as e:
detail = str(e)
if "already registered" in detail:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail) from None
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail) from None


@router.delete(
"/{initializer_name}",
status_code=status.HTTP_204_NO_CONTENT,
responses={
403: {"model": ProblemDetail, "description": "Custom initializer operations disabled"},
404: {"model": ProblemDetail, "description": "Initializer not found"},
},
)
async def unregister_initializer(
request: Request,
initializer_name: str,
) -> None:
"""
Remove an initializer from the registry.

Any initializer (built-in or custom) can be removed. Requires
allow_custom_initializers to be enabled in pyrit_conf.

Args:
request: The incoming FastAPI request.
initializer_name: Registry name of the initializer to remove.
"""
_check_custom_initializers_allowed(request)
service = get_initializer_service()

try:
await service.unregister_initializer_async(initializer_name=initializer_name)
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Initializer '{initializer_name}' not found",
) from None
51 changes: 48 additions & 3 deletions pyrit/backend/services/initializer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# Licensed under the MIT license.

"""
Initializer service for listing available initializers.
Initializer service for listing, registering, and removing initializers.

Provides read-only access to the InitializerRegistry, exposing initializer
Provides access to the InitializerRegistry, exposing initializer
metadata through the REST API.
"""

import logging
from functools import lru_cache

from pyrit.backend.models.common import PaginationInfo
Expand All @@ -18,6 +19,8 @@
)
from pyrit.registry import InitializerMetadata, InitializerRegistry

logger = logging.getLogger(__name__)


def _metadata_to_registered_initializer(metadata: InitializerMetadata) -> RegisteredInitializer:
"""
Expand Down Expand Up @@ -47,7 +50,7 @@ def _metadata_to_registered_initializer(metadata: InitializerMetadata) -> Regist

class InitializerService:
"""
Service for listing available initializers.
Service for listing, registering, and removing initializers.

Uses InitializerRegistry as the source of truth for initializer metadata.
"""
Expand Down Expand Up @@ -99,6 +102,48 @@ async def get_initializer_async(self, *, initializer_name: str) -> RegisteredIni
return _metadata_to_registered_initializer(metadata)
return None

async def register_initializer_async(
self,
*,
name: str,
script_content: str,
) -> RegisteredInitializer:
"""
Register an initializer from uploaded Python source code.

Args:
name: Registry name for the new initializer.
script_content: Python source code containing a PyRITInitializer subclass.

Returns:
The newly registered initializer summary.

Raises:
ValueError: If the script is invalid or contains no initializer class.
"""
self._registry.register_from_content(name=name, script_content=script_content)

initializer = await self.get_initializer_async(initializer_name=name)
if not initializer:
raise ValueError(f"Initializer '{name}' was registered but metadata could not be retrieved.")
return initializer

async def unregister_initializer_async(self, *, initializer_name: str) -> None:
"""
Remove an initializer from the registry.

Works for both built-in and custom initializers. If the
initializer was uploaded, its script file is also cleaned up.

Args:
initializer_name: The registry name to remove.

Raises:
KeyError: If the initializer is not registered.
"""
self._registry.unregister_and_cleanup(initializer_name)
logger.info(f"Unregistered initializer: {initializer_name}")

@staticmethod
def _paginate(
*,
Expand Down
2 changes: 2 additions & 0 deletions pyrit/cli/frontend_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def __init__(
self._operator = config.operator
self._operation = config.operation
self._max_concurrent_scenario_runs = config.max_concurrent_scenario_runs
self._allow_custom_initializers = config.allow_custom_initializers

# Lazy-loaded registries
self._scenario_registry: Optional[ScenarioRegistry] = None
Expand Down Expand Up @@ -223,6 +224,7 @@ def with_overrides(
derived._operator = self._operator
derived._operation = self._operation
derived._max_concurrent_scenario_runs = self._max_concurrent_scenario_runs
derived._allow_custom_initializers = self._allow_custom_initializers
derived._scenario_config = self._scenario_config

# Apply overrides or inherit
Expand Down
9 changes: 9 additions & 0 deletions pyrit/cli/pyrit_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,17 @@ async def initialize_and_run_async(*, parsed_args: Namespace) -> int:
default_labels["operation"] = context._operation
app.state.default_labels = default_labels
app.state.max_concurrent_scenario_runs = context._max_concurrent_scenario_runs
app.state.allow_custom_initializers = context._allow_custom_initializers

display_host = parsed_args.host
if context._allow_custom_initializers:
print("⚠️ WARNING: Custom initializer registration is ENABLED (allow_custom_initializers: true).")
print(" This allows arbitrary Python code execution via the REST API.")
if parsed_args.host == "0.0.0.0":
print(" 🚨 Server is bound to 0.0.0.0 — accessible from the NETWORK. Use only on trusted networks!")
else:
print(f" Server is bound to {display_host}.")

print(f"🚀 Starting PyRIT backend on http://{display_host}:{parsed_args.port}")
print(f" API Docs: http://{display_host}:{parsed_args.port}/docs")
if parsed_args.host == "0.0.0.0":
Expand Down
17 changes: 17 additions & 0 deletions pyrit/registry/class_registries/base_class_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,23 @@ def register(
self._class_entries[name] = entry
self._metadata_cache = None

def unregister(self, name: str) -> None:
"""
Remove a registered class from the registry.

Args:
name: The registry name of the class to remove.

Raises:
KeyError: If the name is not registered.
"""
self._ensure_discovered()
if name not in self._class_entries:
available = ", ".join(self.get_names())
raise KeyError(f"'{name}' not found in registry. Available: {available}")
del self._class_entries[name]
self._metadata_cache = None

def create_instance(self, name: str, **kwargs: object) -> T:
"""
Create an instance of a registered class.
Expand Down
Loading
Loading