-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathauth.py
More file actions
144 lines (115 loc) · 4.53 KB
/
auth.py
File metadata and controls
144 lines (115 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
from fastapi import Depends, HTTPException, Request, status
from fastapi.openapi.models import OAuthFlowPassword
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer
from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt
from passlib.context import CryptContext
from .. import models, schemas
from ..config.config import settings
# Signing algorithm passed to jwt.encode / jwt.decode in this module.
ALGORITHM = "HS256"
# Access-token lifetime, in minutes. Not referenced by the code visible in
# this module -- presumably consumed by callers (e.g. the login route).
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class OAuth2PasswordBearerWithCookie(OAuth2):
    """
    OAuth2 password-flow security scheme that takes the bearer token from the
    ``Authorization`` request cookie (an HttpOnly cookie, per the original
    author) instead of the ``Authorization`` header. Used to refresh the token
    during the SSO login process.
    """

    def __init__(
        self,
        tokenUrl: str,
        scheme_name: str | None = None,
        scopes: dict[str, str] | None = None,
        description: str | None = None,
        auto_error: bool = True,
    ):
        """
        Build the password flow description and hand it to the ``OAuth2`` base.

        A missing or empty ``scopes`` mapping is replaced by an empty dict.
        """
        password_flow = OAuthFlowPassword(tokenUrl=tokenUrl, scopes=scopes or {})
        super().__init__(
            flows=OAuthFlowsModel(password=password_flow),
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    async def __call__(self, request: Request) -> str | None:
        """
        Return the token part of the ``Authorization`` cookie.

        When the cookie is absent or its scheme is not ``bearer``
        (case-insensitive), raise a 401 ``HTTPException`` if ``auto_error`` is
        set, otherwise return ``None``.
        """
        cookie_value = request.cookies.get("Authorization")
        scheme, token = get_authorization_scheme_param(cookie_value)

        # Happy path first: a present cookie carrying a bearer credential.
        if cookie_value and scheme.lower() == "bearer":
            return token

        if not self.auto_error:
            return None

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
# Security scheme used by get_current_user (FastAPI's stock header-based
# OAuth2 password bearer).
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/login/access-token",
)

# Same token URL, but the token is read from the "Authorization" cookie;
# used by get_current_user_from_cookie.
oauth2_scheme_with_cookies = OAuth2PasswordBearerWithCookie(
    tokenUrl=f"{settings.API_V1_STR}/login/access-token",
)

# passlib context used to hash and verify passwords; bcrypt is the only
# configured scheme.
password_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def get_hashed_password(password: str) -> str:
    """Return the hash of ``password`` produced by ``password_context``."""
    hashed = password_context.hash(password)
    return hashed
def verify_password(password: str, hashed_pass: str) -> bool:
    """Return whether ``password`` matches ``hashed_pass`` according to ``password_context``."""
    is_match = password_context.verify(password, hashed_pass)
    return is_match
async def authenticate_user(email: str, password: str) -> models.User | None:
    """
    Look up a user by e-mail and check the supplied password.

    Returns the user found by ``models.User.find_one({"email": email})`` when
    it has a stored password hash and ``password`` verifies against it.
    Returns ``None`` when no user is found, when the user has no
    ``hashed_password``, or when verification fails.
    """
    candidate = await models.User.find_one({"email": email})

    # A user without a stored hash can never authenticate with a password.
    if candidate and candidate.hashed_password is not None:
        if verify_password(password, candidate.hashed_password):
            return candidate

    return None
def create_access_token(
    subject: str | Any, expires_delta: timedelta | None = None
) -> str:
    """
    Create a signed JWT access token for ``subject``.

    The token carries two claims: ``sub`` (``str(subject)``) and ``exp``
    (current UTC time plus the lifetime). It is signed with
    ``settings.SECRET_KEY`` using ``ALGORITHM``.

    Args:
        subject: Value identifying the token owner; stored as a string.
        expires_delta: Token lifetime. When ``None`` a 15-minute lifetime is
            used. Any explicit value is honoured, including ``timedelta(0)``.

    Returns:
        The encoded JWT.
    """
    # Compare against None rather than relying on truthiness: a zero
    # timedelta is falsy and would otherwise be silently replaced by the
    # 15-minute fallback.
    if expires_delta is None:
        # NOTE(review): this fallback differs from ACCESS_TOKEN_EXPIRE_MINUTES
        # (30) defined in this module -- confirm which value is intended.
        expires_delta = timedelta(minutes=15)

    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {"exp": expire, "sub": str(subject)}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    FastAPI dependency: resolve the user for the token supplied by
    ``oauth2_scheme``. Delegates to ``_get_current_user``.
    """
    resolved_user = await _get_current_user(token)
    return resolved_user
async def get_current_user_from_cookie(
    token: str = Depends(oauth2_scheme_with_cookies),
):
    """
    FastAPI dependency: resolve the user for the token supplied by
    ``oauth2_scheme_with_cookies``. Delegates to ``_get_current_user``.
    """
    resolved_user = await _get_current_user(token)
    return resolved_user
async def _get_current_user(token):
    """
    Decode ``token`` and return the matching user.

    The token is decoded with ``settings.SECRET_KEY`` and ``ALGORITHM``; its
    ``sub`` claim is passed through ``schemas.TokenPayload`` and the resulting
    ``uuid`` is used to look the user up via ``models.User.find_one``.

    Args:
        token: Encoded JWT string.

    Returns:
        The user document whose ``uuid`` matches the token subject.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub`` claim, carries a subject that
            fails payload validation, or refers to no existing user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
        # The raw claim comes straight from the decoded payload; conversion to
        # the final type is left to TokenPayload.
        userid = payload.get("sub")
        if userid is None:
            raise credentials_exception
        token_data = schemas.TokenPayload(uuid=userid)
    except (JWTError, ValueError) as err:
        # ValueError covers a malformed subject rejected by TokenPayload
        # (presumably a pydantic model, whose ValidationError subclasses
        # ValueError -- TODO confirm), so it yields a 401 instead of an
        # unhandled server error.
        raise credentials_exception from err

    user = await models.User.find_one({"uuid": token_data.uuid})
    if user is None:
        raise credentials_exception
    return user
def get_current_active_user(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """
    FastAPI dependency: return the user provided by ``get_current_user``
    when ``is_active`` is truthy; otherwise raise a 400 ``HTTPException``.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
def get_current_active_superuser(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """
    FastAPI dependency: return the user provided by ``get_current_user``
    when ``is_superuser`` is truthy; otherwise raise a 400 ``HTTPException``.
    """
    if current_user.is_superuser:
        return current_user
    raise HTTPException(
        status_code=400, detail="The user doesn't have enough privileges"
    )