Merged
51 commits
e52d463
update fraud_detection_durable to feb 12 agent-framework
Feb 17, 2026
91369f9
enhance fraud detection durable
Feb 19, 2026
ee3f9f2
update readme
Feb 19, 2026
391b2d3
Updating Durable Agent Implementation (#404)
james-tn Feb 19, 2026
85c574f
chore: reduce anomaly probability to 1% for controlled demo pace
Feb 19, 2026
ea5bc41
Merge branch 'int-agentic' into james-dev
james-tn Feb 19, 2026
2c19358
Reduce anomaly probability to 1% for controlled demo pace (#406)
james-tn Feb 19, 2026
6c8527c
add mcp_agent_demo
Feb 19, 2026
3132693
Merge branch 'james-dev' of https://github.com/microsoft/OpenAIWorksh…
Feb 19, 2026
a5394be
Add hybrid MCP server (strict-schema + natural-language tools) and ty…
Feb 19, 2026
3eac2c3
Replace workflow_local_remote.py with simplified proxy agent (Script 3)
Feb 20, 2026
72ad453
Clean up MCP demos: remove comparison commentary, drop typed-contracts
Feb 20, 2026
7487b66
Add LangGraph + MAF GroupChat cross-framework demo (Scripts 8-9)
Feb 20, 2026
e93bc7c
MCP agent demo: rewrite README as MCP-vs-A2A thesis, delete proxy age…
Feb 20, 2026
c4af733
README: Mermaid diagrams, professional tone, remove LinkedIn teasing …
Feb 20, 2026
8dfcd4c
README: reframe around multi-framework interop problem, two design pa…
Feb 20, 2026
b72c14b
README: fix Mermaid diagrams - use br tags instead of \n for line breaks
Feb 20, 2026
5515120
README: fix A2A comparison table - elicitation supported, structured …
Feb 20, 2026
23837d8
Upgrade agent-framework to 1.0.0rc1 and fastmcp to 3.0.2
Feb 23, 2026
82bfbee
Fix CI: regenerate requirements.txt with agent-framework-core==1.0.0rc1
Feb 23, 2026
0cd61f9
chore: merge james-dev into int-agentic (auto)
james-tn Feb 23, 2026
640c3d3
Fix: use GH_PAT for auto-merge to trigger downstream workflows
Feb 23, 2026
c081130
Add workflow_dispatch trigger to promote-to-main
Feb 23, 2026
07af66c
Merge branch 'int-a
Feb 23, 2026
ae60f8a
Merge remote-tracking branch 'origin/main' into int-agentic
Feb 23, 2026
87a77db
Fix: Cosmos DB auth + remove phantom reflection_workflow_agent
Feb 23, 2026
cbadc7d
chore: merge james-dev into int-agentic (auto)
james-tn Feb 23, 2026
d9ef29c
Fix promote-to-main + auto-destroy dev environments
Feb 23, 2026
c7c92e6
Fix promote-to-main + auto-destroy dev environments
Feb 23, 2026
50efa21
Trigger rebuild: deploy Cosmos DB auth fix and agent list cleanup
Feb 23, 2026
cc79a16
Fix: iterate ResponseStream directly, not .updates
Feb 23, 2026
bd24124
Fix: make integration test failures block the pipeline
Feb 23, 2026
e6e5e7e
Fix: use SHA-tagged images for container deployment, not :latest
Feb 23, 2026
d8d49ef
Skip MCP localhost check in CI mode
Feb 23, 2026
6b0abc4
Fix: auto-merge creates PR if none exists
Feb 23, 2026
22cd10b
Merge remote-tracking branch 'origin/james-dev' into int-agentic
Feb 23, 2026
13fd93f
Merge remote-tracking branch 'origin/main' into int-agentic
Feb 23, 2026
add8833
chore: merge james-dev into int-agentic (auto)
james-tn Feb 24, 2026
e203077
Merge remote-tracking branch 'origin/main' into int-agentic
Feb 24, 2026
ee02809
chore: merge james-dev into int-agentic (auto)
james-tn Feb 24, 2026
8dc4983
Migrate workshop to agent-framework==1.2.1 (incl. native HandoffBuild…
Copilot Apr 28, 2026
931ca05
docs: Remove misleading RequestInfoExecutor references (#418)
ericchansen Apr 28, 2026
870b1f1
Replace custom DictCheckpointStorage with built-in 1.2.1 storages
Copilot Apr 28, 2026
7073916
tests: use asyncio.run() instead of new/get_event_loop()
Copilot Apr 28, 2026
a3a5035
Fix backend 500s by renaming ChatOptions(model_id=…) to ChatOptions(m…
Copilot Apr 28, 2026
ce1afd2
chore: resolve merge conflict - keep model= fix from int-agentic over…
Copilot Apr 28, 2026
95e7ac5
Merge int-agentic; rename ChatOptions(model_id=...) to ChatOptions(mo…
Copilot Apr 28, 2026
c40a5a9
Fix wrong-key regression test and managed-identity gating in magentic…
Copilot Apr 28, 2026
e8ee036
Fix integration test 500: pass Azure deployment (not OPENAI_MODEL_NAM…
Copilot Apr 28, 2026
4349eba
Merge remote-tracking branch 'origin/int-agentic' into copilot/review…
Apr 28, 2026
754dbb6
Merge remote-tracking branch 'origin/main' into copilot/review-checkp…
Apr 28, 2026
43 changes: 42 additions & 1 deletion agentic_ai/agents/agent_framework/STATE_MANAGEMENT.md
@@ -253,7 +253,43 @@ Executors can include arbitrary JSON-friendly payloads in their `snapshot_state`

## 6. External checkpoint storage implementations

### 6.1 Redis-backed CheckpointStorage
Starting with `agent-framework` 1.2.1, the framework ships ready-made
`CheckpointStorage` implementations, so most workshops should use those
directly rather than rolling their own. The custom samples in §6.1 and §6.2
are kept for reference and advanced customization.

### 6.0 Built-in storages (recommended)

```python
# In-process (default) – great for tests and single-process demos.
from agent_framework import InMemoryCheckpointStorage
storage = InMemoryCheckpointStorage()

# JSON-on-disk – atomic writes, path-traversal protection, type-restricted
# pickle deserialization for safety.
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage("/var/lib/workshop/checkpoints")

# Durable, partitioned by workflow_name. Auto-creates the database and
# container on first use; supports DefaultAzureCredential or an account key.
from agent_framework_azure_cosmos import CosmosCheckpointStorage
from azure.identity.aio import DefaultAzureCredential
storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
)
```

In this workshop, the multi-agent workflows select a storage backend via
`agentic_ai/agents/agent_framework/multi_agent/checkpoint_storage.py`, based on
the `WORKFLOW_CHECKPOINT_BACKEND` environment variable: `memory` (the default),
`file`, or `cosmos`. Unrecognized values fall back to `memory` with a warning.
For `file`, the base directory can be overridden with
`WORKFLOW_CHECKPOINT_DIR` (default `.checkpoints`), and each session gets its
own subdirectory; for `cosmos`, the standard `AZURE_COSMOS_*` environment
variables apply.
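
As a rough illustration of that selection logic, the sketch below mirrors how the factory appears to normalize the backend name and derive a per-session directory, using only the standard library. The function names `resolve_backend` and `resolve_file_dir` are hypothetical stand-ins for the module's private helpers, and the environment is passed in as a plain mapping so the example does not depend on `agent-framework` being installed.

```python
import os
from typing import Mapping

ALLOWED_BACKENDS = {"memory", "file", "cosmos"}


def resolve_backend(environ: Mapping[str, str]) -> str:
    """Normalize WORKFLOW_CHECKPOINT_BACKEND; unknown values become 'memory'."""
    raw = (environ.get("WORKFLOW_CHECKPOINT_BACKEND") or "memory").strip().lower()
    return raw if raw in ALLOWED_BACKENDS else "memory"


def resolve_file_dir(environ: Mapping[str, str], session_id: str) -> str:
    """Build the per-session directory used by the 'file' backend."""
    base_dir = environ.get("WORKFLOW_CHECKPOINT_DIR") or ".checkpoints"
    # Replace anything that is not alphanumeric, "-", or "_" so that a session
    # ID such as "../etc" cannot escape the base directory.
    safe_session = "".join(
        ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in session_id
    )
    return os.path.join(base_dir, safe_session)


# Hypothetical configuration, as it might appear in a deployment's environment.
env = {"WORKFLOW_CHECKPOINT_BACKEND": " File ", "WORKFLOW_CHECKPOINT_DIR": "/tmp/cp"}
backend = resolve_backend(env)
session_dir = resolve_file_dir(env, "../etc")
```

Passing the environment as an argument is only a convenience for the example; the actual module reads `os.environ` directly.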

### 6.1 Custom Redis-backed CheckpointStorage

```python
import json
@@ -306,6 +342,11 @@ class RedisCheckpointStorage(CheckpointStorage):

### 6.2 Azure Cosmos DB CheckpointStorage

> Prefer the built-in `agent_framework_azure_cosmos.CosmosCheckpointStorage`
> shown in §6.0 for production deployments. The custom implementation
> below is kept for reference if you need to integrate with an existing
> Cosmos schema.

```python
from azure.cosmos.aio import CosmosClient
from agent_framework._workflow._checkpoint import CheckpointStorage, WorkflowCheckpoint
208 changes: 208 additions & 0 deletions agentic_ai/agents/agent_framework/multi_agent/checkpoint_storage.py
@@ -0,0 +1,208 @@
"""Checkpoint storage factory for multi-agent workflows.

This module exposes a single ``create_checkpoint_storage`` helper that returns
a ``CheckpointStorage`` instance using the storages shipped with
``agent-framework`` 1.2.1:

* ``InMemoryCheckpointStorage`` — default; in-process, lost on restart.
* ``FileCheckpointStorage`` — JSON-on-disk with atomic writes and
path-traversal protection.
* ``CosmosCheckpointStorage`` — durable, partitioned by ``workflow_name``,
shipped in ``agent_framework_azure_cosmos``.

Selection is driven by the ``WORKFLOW_CHECKPOINT_BACKEND`` environment
variable (``memory`` | ``file`` | ``cosmos``) so deployments can opt into
durable checkpointing without touching agent code.

Storage instances are cached per (backend, session) tuple so successive
agent invocations within the same process share the same in-memory state
(matching the behaviour of the previous hand-rolled ``DictCheckpointStorage``).

Helpers:

* ``prune_checkpoints`` — bound the number of saved checkpoints per workflow
using only the public ``CheckpointStorage`` protocol, replacing the
``_RETENTION`` cap that lived inside the old custom storage classes.
"""

from __future__ import annotations

import logging
import os
from threading import Lock
from typing import Any, Dict, Optional, Tuple

from agent_framework import CheckpointStorage, FileCheckpointStorage, InMemoryCheckpointStorage

logger = logging.getLogger(__name__)


_BACKEND_ENV = "WORKFLOW_CHECKPOINT_BACKEND"
_FILE_DIR_ENV = "WORKFLOW_CHECKPOINT_DIR"
_DEFAULT_FILE_DIR = ".checkpoints"

_storage_cache: Dict[Tuple[str, str], CheckpointStorage] = {}
_cache_lock = Lock()


def _resolve_backend() -> str:
    backend = (os.getenv(_BACKEND_ENV) or "memory").strip().lower()
    if backend not in {"memory", "file", "cosmos"}:
        logger.warning(
            "Unknown %s=%r; falling back to 'memory'. Allowed values: memory, file, cosmos.",
            _BACKEND_ENV,
            backend,
        )
        backend = "memory"
    return backend


def _build_file_storage(session_id: str) -> CheckpointStorage:
    base_dir = os.getenv(_FILE_DIR_ENV) or _DEFAULT_FILE_DIR
    # Scope per session so concurrent sessions cannot accidentally read each
    # other's checkpoint files. Session IDs are sanitized before joining:
    # every character other than an alphanumeric, "-", or "_" (including path
    # separators and dots) is replaced with "_".
    safe_session = "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in session_id)
    storage_path = os.path.join(base_dir, safe_session)
    return FileCheckpointStorage(storage_path)


def _build_cosmos_storage() -> CheckpointStorage:
    # Imported lazily so the cosmos extra is only required when actually used.
    try:
        from agent_framework_azure_cosmos import CosmosCheckpointStorage
    except ImportError as exc:  # pragma: no cover - defensive
        raise RuntimeError(
            "WORKFLOW_CHECKPOINT_BACKEND=cosmos requires the "
            "'agent-framework-azure-cosmos' package to be installed."
        ) from exc

    # Use key-based auth when AZURE_COSMOS_KEY is configured. The
    # CosmosCheckpointStorage already reads endpoint / database / container /
    # key from AZURE_COSMOS_* environment variables, so we only need to
    # supply a credential for the keyless (managed-identity) case.
    if os.getenv("AZURE_COSMOS_KEY"):
        return CosmosCheckpointStorage()

    try:
        from azure.identity.aio import DefaultAzureCredential
    except ImportError as exc:  # pragma: no cover - defensive
        raise RuntimeError(
            "WORKFLOW_CHECKPOINT_BACKEND=cosmos without AZURE_COSMOS_KEY "
            "requires 'azure-identity' to be installed for managed-identity auth."
        ) from exc

    return CosmosCheckpointStorage(credential=DefaultAzureCredential())


def create_checkpoint_storage(session_id: str) -> CheckpointStorage:
    """Return a per-session ``CheckpointStorage`` from configuration.

    Args:
        session_id: Used to scope file-backed storage to a per-session
            directory and to key the in-process cache so successive calls
            within the same process share state.

    Returns:
        A storage instance compatible with the 1.2.x ``CheckpointStorage``
        protocol.
    """
    backend = _resolve_backend()
    cache_key = (backend, session_id)

    with _cache_lock:
        existing = _storage_cache.get(cache_key)
        if existing is not None:
            return existing

        if backend == "file":
            storage: CheckpointStorage = _build_file_storage(session_id)
        elif backend == "cosmos":
            storage = _build_cosmos_storage()
        else:
            storage = InMemoryCheckpointStorage()

        _storage_cache[cache_key] = storage
        logger.info("Created %s checkpoint storage for session=%s", backend, session_id)
        return storage


def reset_storage_cache() -> None:
    """Clear the in-process storage cache. Intended for tests."""
    with _cache_lock:
        _storage_cache.clear()


async def prune_checkpoints(
    storage: CheckpointStorage,
    workflow_name: str,
    *,
    retain: int,
) -> None:
    """Bound the number of checkpoints retained for ``workflow_name``.

    Only the most recent ``retain`` checkpoints (by ``timestamp``) are kept;
    older ones are deleted via the public ``CheckpointStorage.delete`` method.
    A non-positive ``retain`` disables pruning. Failures are logged and
    swallowed so checkpoint hygiene cannot break a chat turn.
    """
    if retain <= 0:
        return
    try:
        checkpoints = await storage.list_checkpoints(workflow_name=workflow_name)
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("Unable to list checkpoints for pruning (%s): %s", workflow_name, exc)
        return

    if len(checkpoints) <= retain:
        return

    # Most-recent-first ordering using the timestamp field of WorkflowCheckpoint.
    checkpoints.sort(key=lambda cp: getattr(cp, "timestamp", "") or "", reverse=True)
    for stale in checkpoints[retain:]:
        try:
            await storage.delete(stale.checkpoint_id)
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug("Failed to prune checkpoint %s: %s", stale.checkpoint_id, exc)


async def purge_checkpoints(storage: CheckpointStorage, workflow_name: Optional[str]) -> None:
    """Delete every checkpoint for ``workflow_name`` using the public protocol.

    No-ops when ``workflow_name`` is missing (the protocol cannot enumerate
    across workflows).
    """
    if not workflow_name:
        return
    try:
        ids = await storage.list_checkpoint_ids(workflow_name=workflow_name)
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("Unable to list checkpoint ids for purge (%s): %s", workflow_name, exc)
        return
    for checkpoint_id in ids:
        try:
            await storage.delete(checkpoint_id)
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug("Failed to delete checkpoint %s during purge: %s", checkpoint_id, exc)


__all__ = [
    "create_checkpoint_storage",
    "prune_checkpoints",
    "purge_checkpoints",
    "reset_storage_cache",
]


def _coerce_checkpoint_storage(candidate: Any) -> Optional[CheckpointStorage]:
    """Validate that ``candidate`` looks like a ``CheckpointStorage`` instance.

    Used by callers that accept storage overrides from configuration so that
    test doubles can be substituted without inheriting from the protocol class.
    """
    if candidate is None:
        return None
    for method_name in ("save", "load", "delete", "get_latest", "list_checkpoints", "list_checkpoint_ids"):
        if not callable(getattr(candidate, method_name, None)):
            return None
    return candidate  # type: ignore[return-value]
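
The retention behaviour described in `prune_checkpoints` can be exercised without a real backend. The sketch below is a standalone approximation, not the module's code: `FakeCheckpoint` and `FakeStorage` are hypothetical test doubles, `prune` restates only the core ordering and deletion logic (no logging or error handling), and the workflow names `handoff` and `magentic` are made up for illustration. It also assumes timestamps are ISO-8601 strings, which sort chronologically as plain text.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeCheckpoint:
    checkpoint_id: str
    workflow_name: str
    timestamp: str  # assumed ISO-8601, so string order matches time order


class FakeStorage:
    """Hypothetical double exposing only the two methods pruning relies on."""

    def __init__(self) -> None:
        self.items: Dict[str, FakeCheckpoint] = {}

    def add(self, checkpoint: FakeCheckpoint) -> None:
        self.items[checkpoint.checkpoint_id] = checkpoint

    async def list_checkpoints(self, *, workflow_name: str) -> List[FakeCheckpoint]:
        return [cp for cp in self.items.values() if cp.workflow_name == workflow_name]

    async def delete(self, checkpoint_id: str) -> bool:
        return self.items.pop(checkpoint_id, None) is not None


async def prune(storage: FakeStorage, workflow_name: str, *, retain: int) -> None:
    """Keep the newest `retain` checkpoints for one workflow, delete the rest."""
    if retain <= 0:
        return  # a non-positive cap disables pruning
    checkpoints = await storage.list_checkpoints(workflow_name=workflow_name)
    checkpoints.sort(key=lambda cp: cp.timestamp, reverse=True)  # newest first
    for stale in checkpoints[retain:]:
        await storage.delete(stale.checkpoint_id)


async def demo() -> List[str]:
    storage = FakeStorage()
    for day in range(1, 6):
        storage.add(FakeCheckpoint(f"cp-{day}", "handoff", f"2026-04-0{day}T00:00:00+00:00"))
    # A checkpoint for a different workflow must survive pruning of "handoff".
    storage.add(FakeCheckpoint("other-1", "magentic", "2026-01-01T00:00:00+00:00"))
    await prune(storage, "handoff", retain=2)
    return sorted(storage.items)


remaining_ids = asyncio.run(demo())
```

Because pruning is scoped by `workflow_name`, the checkpoint belonging to the second workflow is left untouched even though it is the oldest entry in the store.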