Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions livekit-agents/livekit/agents/llm/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def __init__(
tool_result_resolver=tool_result_resolver,
)
self.url = url
self.headers = headers
self._headers = headers or {}
self._timeout = timeout
self._sse_read_timeout = sse_read_timeout
self._allowed_tools = set(allowed_tools) if allowed_tools else None
Expand All @@ -278,6 +278,37 @@ def __init__(
# Fall back to URL-based detection for backward compatibility
self._use_streamable_http = self._should_use_streamable_http(url)

self._http_client: httpx.AsyncClient | None = None

@property
def headers(self) -> dict[str, Any]:
return self._headers

@headers.setter
def headers(self, headers: dict[str, Any]) -> None:
self._headers = headers
if self._http_client is not None:
self._http_client.headers = headers

def _create_http_client(
self,
headers: dict[str, Any] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
# ported from mcp.shared._httpx_utils.create_mcp_http_client
kwargs: dict[str, Any] = {
"follow_redirects": True,
"timeout": timeout
if timeout is not None
else httpx.Timeout(self._timeout, read=self._sse_read_timeout),
"headers": headers if headers is not None else self._headers,
}
if auth is not None:
kwargs["auth"] = auth
self._http_client = httpx.AsyncClient(**kwargs)
return self._http_client

def _should_use_streamable_http(self, url: str) -> bool:
"""
Determine transport type based on URL path (for backward compatibility).
Expand Down Expand Up @@ -306,10 +337,7 @@ def client_streams(

@asynccontextmanager
async def _streamable_http_with_client(): # type: ignore[no-untyped-def]
async with httpx.AsyncClient(
headers=self.headers or {},
timeout=httpx.Timeout(self._timeout, read=self._sse_read_timeout),
) as http_client:
async with self._create_http_client() as http_client:
async with streamable_http_client(
url=self.url, http_client=http_client
) as streams:
Expand All @@ -319,9 +347,10 @@ async def _streamable_http_with_client(): # type: ignore[no-untyped-def]
else:
return sse_client( # type: ignore[no-any-return]
url=self.url,
headers=self.headers,
headers=self._headers,
timeout=self._timeout,
sse_read_timeout=self._sse_read_timeout,
httpx_client_factory=self._create_http_client,
)

async def list_tools(self) -> list[MCPTool]:
Expand Down
Loading