-
Notifications
You must be signed in to change notification settings - Fork 388
Expand file tree
/
Copy pathsession_token_test.py
More file actions
106 lines (74 loc) · 4.35 KB
/
session_token_test.py
File metadata and controls
106 lines (74 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from shopify import session_token
from test.test_helper import TestCase
from datetime import datetime, timedelta
import jwt
def timestamp(date):
    """Return whatever ``date.timestamp()`` yields for the given object.

    In this module it is called with ``datetime`` instances to produce the
    numeric values stored in the JWT time claims (``exp``, ``nbf``, ``iat``).
    """
    seconds_since_epoch = date.timestamp()
    return seconds_since_epoch
class TestSessionTokenGetDecodedSessionToken(TestCase):
    """Tests for ``session_token.decode_from_header``.

    Every test encodes a JWT from ``self.payload`` (rebuilt before each test),
    wraps it in a ``Bearer`` authorization header value and checks either the
    decoded payload or the exception that decoding raises.
    """

    @classmethod
    def setUpClass(cls):
        """Define the API credentials that all tests read but never modify."""
        cls.secret = "API Secret"
        cls.api_key = "API key"

    def setUp(self):
        """Create a fresh, valid payload on the test instance.

        This is an instance method so that a test mutating ``self.payload``
        cannot leak state into another test through a shared class attribute.
        """
        # NOTE(review): the base-class setUp is not invoked here; confirm that
        # these tests need none of the fixtures it may provide.
        current_time = datetime.now()
        self.payload = {
            "iss": "https://test-shop.myshopify.com/admin",
            "dest": "https://test-shop.myshopify.com",
            "aud": self.api_key,
            "sub": "1",
            "exp": timestamp(current_time + timedelta(seconds=60)),
            "nbf": timestamp(current_time),
            "iat": timestamp(current_time),
            "jti": "4321",
            "sid": "abc123",
        }

    @staticmethod
    def _bearer_header(token):
        """Wrap an already-encoded token in a ``Bearer`` header value."""
        return "Bearer {session_token}".format(session_token=token)

    def build_auth_header(self):
        """Return a ``Bearer`` header carrying ``self.payload`` signed with ``self.secret``."""
        mock_session_token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        return self._bearer_header(mock_session_token)

    def test_raises_if_token_authentication_header_is_not_bearer(self):
        """A header without a Bearer token raises ``TokenAuthenticationError``."""
        authorization_header = "Bad auth header"

        with self.assertRaises(session_token.TokenAuthenticationError) as cm:
            session_token.decode_from_header(authorization_header, api_key=self.api_key, secret=self.secret)

        self.assertEqual("The HTTP_AUTHORIZATION_HEADER provided does not contain a Bearer token", str(cm.exception))

    def test_raises_jwt_error_if_session_token_is_expired(self):
        """An ``exp`` claim 11 seconds in the past raises ``SessionTokenError``."""
        self.payload["exp"] = timestamp(datetime.now() + timedelta(seconds=-11))

        with self.assertRaises(session_token.SessionTokenError) as cm:
            session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)

        self.assertEqual("Signature has expired", str(cm.exception))

    def test_raises_jwt_error_if_invalid_alg(self):
        """A token encoded with the ``none`` algorithm raises ``SessionTokenError``."""
        bad_session_token = jwt.encode(self.payload, None, algorithm="none")
        invalid_header = self._bearer_header(bad_session_token)

        with self.assertRaises(session_token.SessionTokenError) as cm:
            session_token.decode_from_header(invalid_header, api_key=self.api_key, secret=self.secret)

        self.assertEqual("The specified alg value is not allowed", str(cm.exception))

    def test_raises_jwt_error_if_invalid_signature(self):
        """A token signed with a different secret raises ``SessionTokenError``."""
        bad_session_token = jwt.encode(self.payload, "bad_secret", algorithm="HS256")
        invalid_header = self._bearer_header(bad_session_token)

        with self.assertRaises(session_token.SessionTokenError) as cm:
            session_token.decode_from_header(invalid_header, api_key=self.api_key, secret=self.secret)

        self.assertEqual("Signature verification failed", str(cm.exception))

    def test_raises_if_aud_doesnt_match_api_key(self):
        """An ``aud`` claim that differs from the API key raises ``SessionTokenError``."""
        self.payload["aud"] = "bad audience"

        with self.assertRaises(session_token.SessionTokenError) as cm:
            session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)

        self.assertEqual("Audience doesn't match", str(cm.exception))

    def test_raises_if_issuer_hostname_is_invalid(self):
        """An ``iss`` claim of ``bad_shop_hostname`` raises ``InvalidIssuerError``."""
        self.payload["iss"] = "bad_shop_hostname"

        with self.assertRaises(session_token.InvalidIssuerError) as cm:
            session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)

        self.assertEqual("Invalid issuer", str(cm.exception))

    def test_raises_if_iss_and_dest_dont_match(self):
        """A ``dest`` claim for a different shop than ``iss`` raises ``MismatchedHostsError``."""
        self.payload["dest"] = "bad_shop.myshopify.com"

        with self.assertRaises(session_token.MismatchedHostsError) as cm:
            session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)

        self.assertEqual("The issuer and destination do not match", str(cm.exception))

    def test_returns_decoded_payload(self):
        """A valid token decodes back to exactly the payload that was encoded."""
        decoded_payload = session_token.decode_from_header(
            self.build_auth_header(), api_key=self.api_key, secret=self.secret
        )

        self.assertEqual(self.payload, decoded_payload)

    def test_allow_10_seconds_clock_skew_in_nbf(self):
        """An ``nbf`` claim 10 seconds in the future is still accepted.

        The test passes as long as decoding does not raise.
        """
        self.payload["nbf"] = timestamp(datetime.now() + timedelta(seconds=10))

        session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)