Skip to content
This repository was archived by the owner on Jun 12, 2021. It is now read-only.

Commit 53bb77d

Browse files
committed
Added an endpoint that supports RFC7662.
1 parent f62064f commit 53bb77d

4 files changed

Lines changed: 251 additions & 1 deletion

File tree

src/oidcendpoint/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
except ImportError:
66
import random as rnd
77

8-
__version__ = '0.8.4'
8+
__version__ = '0.8.5'
99

1010

1111
DEF_SIGN_ALG = {"id_token": "RS256",

src/oidcendpoint/oauth2/__init__.py

Whitespace-only changes.
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Implements RFC7662"""
2+
import logging
3+
4+
from cryptojwt import JWT
5+
from oidcmsg import oauth2
6+
from oidcmsg.time_util import utc_time_sans_frac
7+
8+
from oidcendpoint.client_authn import verify_client
9+
from oidcendpoint.endpoint import Endpoint
10+
11+
LOGGER = logging.getLogger(__name__)
12+
13+
14+
class Introspection(Endpoint):
15+
request_cls = oauth2.TokenIntrospectionRequest
16+
response_cls = oauth2.TokenIntrospectionResponse
17+
request_format = 'urlencoded'
18+
response_format = 'json'
19+
endpoint_name = 'introspection'
20+
21+
def do_response(self, response_args=None, request=None, **kwargs):
22+
"""Construct an Introspection response.
23+
24+
:param response_args:
25+
:param request:
26+
:param kwargs: request arguments
27+
:return: Response information
28+
"""
29+
info = {
30+
'response': response_args.to_json(),
31+
'http_headers': [('Content-type', 'application/json')]
32+
}
33+
34+
return info
35+
36+
def client_authentication(self, request, auth=None, **kwargs):
37+
"""
38+
Deal with client authentication
39+
40+
:param request: The introspection request
41+
:param auth: Client authentication information
42+
:param kwargs: Extra keyword arguments
43+
:return: dictionary containing client id, client authentication method.
44+
"""
45+
46+
try:
47+
auth_info = verify_client(self.endpoint_context, request, auth)
48+
except Exception as err:
49+
msg = "Failed to verify client due to: {}".format(err)
50+
LOGGER.error(msg)
51+
return self.error_cls(error="unauthorized_client",
52+
error_description=msg)
53+
else:
54+
if 'client_id' not in auth_info:
55+
LOGGER.error('No client_id, authentication failed')
56+
return self.error_cls(error="unauthorized_client",
57+
error_description='unknown client')
58+
59+
return auth_info
60+
61+
def process_request(self, request_info=None, **kwargs):
62+
"""
63+
64+
:param request_info: The authorization request as a dictionary
65+
:param kwargs:
66+
:return:
67+
"""
68+
_introspect_request = self.request_cls(**request_info)
69+
70+
_jwt = JWT(key_jar=self.endpoint_context.keyjar)
71+
72+
try:
73+
_jwt_info = _jwt.unpack(_introspect_request['token'])
74+
except Exception:
75+
return {'response': {'active': False}}
76+
77+
# expired ?
78+
if 'exp' in _jwt_info:
79+
now = utc_time_sans_frac()
80+
if _jwt_info['exp'] < now:
81+
return {'response': {'active': False}}
82+
83+
if 'release' in self.kwargs:
84+
if 'username' in self.kwargs['release']:
85+
try:
86+
_jwt_info['username'] = self.endpoint_context.userinfo.search(sub=_jwt_info['sub'])
87+
except KeyError:
88+
return {'response': {'active': False}}
89+
90+
_resp = self.response_cls(**_jwt_info)
91+
_resp.weed()
92+
_resp['active'] = True
93+
94+
return {'response_args': _resp}

tests/test_31_introspection.py

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import json
2+
import os
3+
4+
import pytest
5+
from cryptojwt import JWT
6+
from oidcmsg.oauth2 import TokenIntrospectionRequest
7+
from oidcmsg.oidc import AuthorizationRequest
8+
9+
from oidcendpoint.client_authn import verify_client
10+
from oidcendpoint.endpoint_context import EndpointContext
11+
from oidcendpoint.oauth2.introspection import Introspection
12+
from oidcendpoint.oidc.authorization import Authorization
13+
from oidcendpoint.session import setup_session
14+
from oidcendpoint.user_authn.authn_context import INTERNETPROTOCOLPASSWORD
15+
from oidcendpoint.user_info import UserInfo
16+
17+
KEYDEFS = [
18+
{"type": "RSA", "key": '', "use": ["sig"]},
19+
{"type": "EC", "crv": "P-256", "use": ["sig"]}
20+
]
21+
22+
RESPONSE_TYPES_SUPPORTED = [
23+
["code"], ["token"], ["id_token"], ["code", "token"], ["code", "id_token"],
24+
["id_token", "token"], ["code", "token", "id_token"], ['none']]
25+
26+
CAPABILITIES = {
27+
"response_types_supported": [" ".join(x) for x in RESPONSE_TYPES_SUPPORTED],
28+
"token_endpoint_auth_methods_supported": [
29+
"client_secret_post", "client_secret_basic",
30+
"client_secret_jwt", "private_key_jwt"],
31+
"response_modes_supported": ['query', 'fragment', 'form_post'],
32+
"subject_types_supported": ["public", "pairwise"],
33+
"grant_types_supported": [
34+
"authorization_code", "implicit",
35+
"urn:ietf:params:oauth:grant-type:jwt-bearer", "refresh_token"],
36+
"claim_types_supported": ["normal", "aggregated", "distributed"],
37+
"claims_parameter_supported": True,
38+
"request_parameter_supported": True,
39+
"request_uri_parameter_supported": True,
40+
}
41+
42+
AUTH_REQ = AuthorizationRequest(client_id='client_1',
43+
redirect_uri='https://example.com/cb',
44+
scope=['openid'],
45+
state='STATE',
46+
response_type='code id_token')
47+
48+
BASEDIR = os.path.abspath(os.path.dirname(__file__))
49+
50+
51+
def full_path(local_file):
52+
return os.path.join(BASEDIR, local_file)
53+
54+
55+
class TestEndpoint(object):
56+
@pytest.fixture(autouse=True)
57+
def create_endpoint(self):
58+
conf = {
59+
"issuer": "https://example.com/",
60+
"password": "mycket hemligt",
61+
"token_expires_in": 600,
62+
"grant_expires_in": 300,
63+
"refresh_token_expires_in": 86400,
64+
"verify_ssl": False,
65+
"capabilities": CAPABILITIES,
66+
"jwks": {
67+
'uri_path': 'jwks.json',
68+
'key_defs': KEYDEFS,
69+
},
70+
'endpoint': {
71+
'authorization': {
72+
'path': '{}/authorization',
73+
'class': Authorization,
74+
'kwargs': {}
75+
},
76+
'introspection': {
77+
'path': '{}/intro',
78+
'class': Introspection,
79+
'kwargs': {
80+
"release": ['username']
81+
}
82+
}
83+
},
84+
"authentication": {
85+
'anon': {
86+
'acr': INTERNETPROTOCOLPASSWORD,
87+
'class': 'oidcendpoint.user_authn.user.NoAuthn',
88+
'kwargs': {'user': 'diana'}
89+
}
90+
},
91+
'userinfo': {
92+
'path': '{}/userinfo',
93+
'class': UserInfo,
94+
'kwargs': {'db_file': 'users.json'}
95+
},
96+
'client_authn': verify_client,
97+
'template_dir': 'template'
98+
}
99+
endpoint_context = EndpointContext(conf)
100+
endpoint_context.cdb['client_1'] = {
101+
"client_secret": 'hemligt',
102+
"redirect_uris": [("https://example.com/cb", None)],
103+
"client_salt": "salted",
104+
'token_endpoint_auth_method': 'client_secret_post',
105+
'response_types': ['code', 'token', 'code id_token', 'id_token']
106+
}
107+
endpoint_context.keyjar.import_jwks_as_json(
108+
endpoint_context.keyjar.export_jwks_as_json(private=True),
109+
endpoint_context.issuer)
110+
self.introspection_endpoint = endpoint_context.endpoint['introspection']
111+
112+
def _create_jwt(self, uid, lifetime=0, with_jti=False):
113+
_jwt = JWT(self.introspection_endpoint.endpoint_context.keyjar,
114+
iss=self.introspection_endpoint.endpoint_context.issuer,
115+
lifetime=lifetime)
116+
117+
if with_jti:
118+
_jwt.with_jti = with_jti
119+
120+
_info = self.introspection_endpoint.endpoint_context.userinfo.db[uid]
121+
_payload = {'sub': _info['sub']}
122+
return _jwt.pack(_payload, aud='client_1')
123+
124+
def test_parse(self):
125+
_ = setup_session(self.introspection_endpoint.endpoint_context,
126+
AUTH_REQ, uid='diana')
127+
_token = self._create_jwt('diana')
128+
_req = self.introspection_endpoint.parse_request({"token": _token})
129+
130+
assert isinstance(_req, TokenIntrospectionRequest)
131+
assert set(_req.keys()) == {"token"}
132+
133+
def test_process_request(self):
134+
_ = setup_session(self.introspection_endpoint.endpoint_context,
135+
AUTH_REQ, uid='diana')
136+
_token = self._create_jwt('diana', lifetime=6000)
137+
_req = self.introspection_endpoint.parse_request({"token": _token})
138+
_resp = self.introspection_endpoint.process_request(_req)
139+
140+
assert _resp
141+
assert set(_resp.keys()) == {'response_args'}
142+
143+
def test_do_response(self):
144+
_ = setup_session(self.introspection_endpoint.endpoint_context,
145+
AUTH_REQ, uid='diana')
146+
_token = self._create_jwt('diana', lifetime=6000, with_jti=True)
147+
_req = self.introspection_endpoint.parse_request({"token": _token})
148+
_resp = self.introspection_endpoint.process_request(_req)
149+
msg_info = self.introspection_endpoint.do_response(request=_req, **_resp)
150+
assert isinstance(msg_info, dict)
151+
assert set(msg_info.keys()) == {'response', 'http_headers'}
152+
assert msg_info['http_headers'] == [('Content-type', 'application/json')]
153+
_payload = json.loads(msg_info['response'])
154+
assert set(_payload.keys()) == {'sub', 'username', 'exp', 'iat', 'aud',
155+
'active', 'iss', 'jti'}
156+
assert _payload['active'] == True

0 commit comments

Comments
 (0)