|
5 | 5 |
|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
| 8 | +import dataclasses |
8 | 9 | import json |
9 | 10 | import time |
10 | 11 | from collections.abc import Callable |
|
24 | 25 | http_oauth_metadata, |
25 | 26 | jwt_authenticate, |
26 | 27 | make_sync_client, |
| 28 | + parse_client_id, |
27 | 29 | parse_resource_metadata_url, |
28 | 30 | ) |
29 | 31 |
|
@@ -120,6 +122,8 @@ def authenticate(req: falcon.Request) -> AuthContext: |
120 | 122 | resource_name="Test Service", |
121 | 123 | ) |
122 | 124 |
|
| 125 | +_METADATA_WITH_CLIENT_ID = dataclasses.replace(_METADATA, client_id="my-client-id") |
| 126 | + |
123 | 127 |
|
124 | 128 | # --------------------------------------------------------------------------- |
125 | 129 | # TestOAuthResourceMetadata |
@@ -183,6 +187,7 @@ def test_well_known_omits_default_fields(self) -> None: |
183 | 187 | assert "scopes_supported" not in d |
184 | 188 | assert "bearer_methods_supported" not in d |
185 | 189 | assert "resource_name" not in d |
| 190 | + assert "client_id" not in d |
186 | 191 |
|
187 | 192 | def test_well_known_exempt_from_auth(self) -> None: |
188 | 193 | """Well-known endpoint is accessible even with auth enabled.""" |
@@ -287,6 +292,91 @@ def test_parse_resource_metadata_url_missing(self) -> None: |
287 | 292 | assert parse_resource_metadata_url("Basic realm=test") is None |
288 | 293 | assert parse_resource_metadata_url("") is None |
289 | 294 |
|
| 295 | + def test_client_id_rejects_unsafe_characters(self) -> None: |
| 296 | + """client_id with non-URL-safe characters raises ValueError.""" |
| 297 | + with pytest.raises(ValueError, match="URL-safe"): |
| 298 | + OAuthResourceMetadata( |
| 299 | + resource="https://example.com/vgi", |
| 300 | + authorization_servers=("https://auth.example.com",), |
| 301 | + client_id='bad"id', |
| 302 | + ) |
| 303 | + with pytest.raises(ValueError, match="URL-safe"): |
| 304 | + OAuthResourceMetadata( |
| 305 | + resource="https://example.com/vgi", |
| 306 | + authorization_servers=("https://auth.example.com",), |
| 307 | + client_id="has space", |
| 308 | + ) |
| 309 | + |
| 310 | + def test_client_id_in_well_known_json(self) -> None: |
| 311 | + """client_id appears in well-known JSON when set.""" |
| 312 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 313 | + client = make_sync_client(server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_CLIENT_ID) |
| 314 | + resp = client.get("/.well-known/oauth-protected-resource") |
| 315 | + body = json.loads(resp.content) |
| 316 | + assert body["client_id"] == "my-client-id" |
| 317 | + |
| 318 | + def test_client_id_in_www_authenticate(self) -> None: |
| 319 | + """client_id appears in WWW-Authenticate header when metadata has client_id.""" |
| 320 | + _priv, pub = _make_rsa_key() |
| 321 | + auth_fn = _make_local_auth(pub) |
| 322 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 323 | + client = make_sync_client( |
| 324 | + server, |
| 325 | + signing_key=b"k", |
| 326 | + authenticate=auth_fn, |
| 327 | + oauth_resource_metadata=_METADATA_WITH_CLIENT_ID, |
| 328 | + ) |
| 329 | + resp = client.post( |
| 330 | + "/vgi/echo", |
| 331 | + content=b"garbage", |
| 332 | + headers={"Content-Type": "application/octet-stream"}, |
| 333 | + ) |
| 334 | + assert resp.status_code == 401 |
| 335 | + www_auth = resp.headers.get("www-authenticate", "") |
| 336 | + assert 'client_id="my-client-id"' in www_auth |
| 337 | + |
| 338 | + def test_client_id_absent_from_www_authenticate(self) -> None: |
| 339 | + """client_id absent from WWW-Authenticate when metadata has no client_id.""" |
| 340 | + _priv, pub = _make_rsa_key() |
| 341 | + auth_fn = _make_local_auth(pub) |
| 342 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 343 | + client = make_sync_client( |
| 344 | + server, |
| 345 | + signing_key=b"k", |
| 346 | + authenticate=auth_fn, |
| 347 | + oauth_resource_metadata=_METADATA, |
| 348 | + ) |
| 349 | + resp = client.post( |
| 350 | + "/vgi/echo", |
| 351 | + content=b"garbage", |
| 352 | + headers={"Content-Type": "application/octet-stream"}, |
| 353 | + ) |
| 354 | + assert resp.status_code == 401 |
| 355 | + www_auth = resp.headers.get("www-authenticate", "") |
| 356 | + assert "client_id" not in www_auth |
| 357 | + |
| 358 | + def test_parse_client_id_extracts_value(self) -> None: |
| 359 | + """parse_client_id() extracts value from header.""" |
| 360 | + header = ( |
| 361 | + 'Bearer resource_metadata="https://example.com/.well-known/oauth-protected-resource/vgi"' |
| 362 | + ', client_id="my-app"' |
| 363 | + ) |
| 364 | + assert parse_client_id(header) == "my-app" |
| 365 | + |
| 366 | + def test_parse_client_id_returns_none_when_absent(self) -> None: |
| 367 | + """parse_client_id() returns None when not present.""" |
| 368 | + assert parse_client_id("Bearer") is None |
| 369 | + assert parse_client_id('Bearer resource_metadata="https://example.com"') is None |
| 370 | + assert parse_client_id("") is None |
| 371 | + |
| 372 | + def test_client_discovery_round_trip_with_client_id(self) -> None: |
| 373 | + """Client discovers client_id set on server.""" |
| 374 | + server = RpcServer(_EchoService, _EchoImpl()) |
| 375 | + client = make_sync_client(server, signing_key=b"k", oauth_resource_metadata=_METADATA_WITH_CLIENT_ID) |
| 376 | + meta = http_oauth_metadata(client=client) |
| 377 | + assert meta is not None |
| 378 | + assert meta.client_id == "my-client-id" |
| 379 | + |
290 | 380 | def test_401_discovery_flow(self) -> None: |
291 | 381 | """Full 401-based discovery: get 401, parse header, fetch metadata.""" |
292 | 382 | _priv, pub = _make_rsa_key() |
|
0 commit comments