This repository was archived by the owner on Jun 11, 2025. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathaccess_token.py
More file actions
126 lines (103 loc) · 4.44 KB
/
access_token.py
File metadata and controls
126 lines (103 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from datetime import datetime
from typing import Any, Dict, Generic, Optional, Type
from fastapi_users.authentication.strategy.db import AP, AccessTokenDatabase
from pydantic import UUID4, ConfigDict
from pydantic.version import VERSION as PYDANTIC_VERSION
from sqlalchemy import Column, types
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import Field, Session, SQLModel, select
from fastapi_users_db_sqlmodel.generics import TIMESTAMPAware, now_utc
# True when the installed pydantic reports a version string beginning with
# "2." (i.e. a 2.x release); used to pick the matching model config style.
PYDANTIC_V2 = PYDANTIC_VERSION[:2] == "2."
class SQLModelBaseAccessToken(SQLModel):
    """
    Base SQLModel definition of an access token record.

    Uses the table name ``accesstoken`` and declares three fields: the token
    string (primary key), its creation timestamp and the id of the referenced
    ``user`` row.
    """

    __tablename__ = "accesstoken"

    # Token value: a string column of length 43 that serves as the primary key.
    token: str = Field(
        sa_column=Column("token", types.String(length=43), primary_key=True)
    )
    # Creation time: defaults to the result of ``now_utc`` when not supplied;
    # stored in a non-nullable, indexed ``TIMESTAMPAware(timezone=True)`` column.
    created_at: datetime = Field(
        default_factory=now_utc,
        sa_column=Column(
            "created_at", TIMESTAMPAware(timezone=True), nullable=False, index=True
        ),
    )
    # Required foreign key pointing at ``user.id``.
    user_id: UUID4 = Field(foreign_key="user.id", nullable=False)

    # Allow the model to be populated from object attributes, using whichever
    # configuration style matches the installed pydantic major version.
    if PYDANTIC_V2:  # pragma: no cover
        model_config = ConfigDict(from_attributes=True)  # type: ignore
    else:  # pragma: no cover

        class Config:
            orm_mode = True
class SQLModelAccessTokenDatabase(Generic[AP], AccessTokenDatabase[AP]):
    """
    Access token store for SQLModel driven by a synchronous session.

    The public methods are coroutines, but every call on the session is made
    directly; nothing is awaited.

    :param session: SQLAlchemy session.
    :param access_token_model: SQLModel access token model.
    """

    def __init__(self, session: Session, access_token_model: Type[AP]):
        self.access_token_model = access_token_model
        self.session = session

    def _persist(self, record: AP) -> AP:
        """Add ``record`` to the session, commit, refresh it and return it."""
        db = self.session
        db.add(record)
        db.commit()
        db.refresh(record)
        return record

    async def get_by_token(
        self, token: str, max_age: Optional[datetime] = None
    ) -> Optional[AP]:
        """
        Look up an access token by its token string.

        :param token: Token value to match.
        :param max_age: When given, only a token whose ``created_at`` is at or
            after this datetime is returned.
        :return: The matching access token, or ``None`` when no row matches.
        """
        model = self.access_token_model
        query = select(model).where(model.token == token)  # type: ignore
        if max_age is not None:
            query = query.where(model.created_at >= max_age)

        row = self.session.execute(query).first()
        # A result row wraps the selected entity; unwrap its first element.
        return None if row is None else row[0]

    async def create(self, create_dict: Dict[str, Any]) -> AP:
        """
        Build an access token from ``create_dict`` and store it.

        :param create_dict: Keyword arguments passed to the model constructor.
        :return: The stored access token, refreshed from the session.
        """
        return self._persist(self.access_token_model(**create_dict))

    async def update(self, access_token: AP, update_dict: Dict[str, Any]) -> AP:
        """
        Set the given attributes on ``access_token`` and store the result.

        :param access_token: Access token instance to modify.
        :param update_dict: Mapping of attribute name to new value.
        :return: The same instance, refreshed from the session.
        """
        for field_name, new_value in update_dict.items():
            setattr(access_token, field_name, new_value)
        return self._persist(access_token)

    async def delete(self, access_token: AP) -> None:
        """
        Delete ``access_token`` through the session and commit.

        :param access_token: Access token instance to remove.
        """
        db = self.session
        db.delete(access_token)
        db.commit()
class SQLModelAccessTokenDatabaseAsync(Generic[AP], AccessTokenDatabase[AP]):
    """
    Access token store for SQLModel driven by an asynchronous session.

    Query execution, commit, refresh and delete are all awaited on the
    session.

    :param session: SQLAlchemy async session.
    :param access_token_model: SQLModel access token model.
    """

    def __init__(self, session: AsyncSession, access_token_model: Type[AP]):
        self.access_token_model = access_token_model
        self.session = session

    async def _persist(self, record: AP) -> AP:
        """Add ``record`` to the session, commit, refresh it and return it."""
        db = self.session
        db.add(record)
        await db.commit()
        await db.refresh(record)
        return record

    async def get_by_token(
        self, token: str, max_age: Optional[datetime] = None
    ) -> Optional[AP]:
        """
        Look up an access token by its token string.

        :param token: Token value to match.
        :param max_age: When given, only a token whose ``created_at`` is at or
            after this datetime is returned.
        :return: The matching access token, or ``None`` when no row matches.
        """
        model = self.access_token_model
        query = select(model).where(model.token == token)  # type: ignore
        if max_age is not None:
            query = query.where(model.created_at >= max_age)

        outcome = await self.session.execute(query)
        row = outcome.first()
        # A result row wraps the selected entity; unwrap its first element.
        return None if row is None else row[0]

    async def create(self, create_dict: Dict[str, Any]) -> AP:
        """
        Build an access token from ``create_dict`` and store it.

        :param create_dict: Keyword arguments passed to the model constructor.
        :return: The stored access token, refreshed from the session.
        """
        return await self._persist(self.access_token_model(**create_dict))

    async def update(self, access_token: AP, update_dict: Dict[str, Any]) -> AP:
        """
        Set the given attributes on ``access_token`` and store the result.

        :param access_token: Access token instance to modify.
        :param update_dict: Mapping of attribute name to new value.
        :return: The same instance, refreshed from the session.
        """
        for field_name, new_value in update_dict.items():
            setattr(access_token, field_name, new_value)
        return await self._persist(access_token)

    async def delete(self, access_token: AP) -> None:
        """
        Delete ``access_token`` through the session and commit.

        :param access_token: Access token instance to remove.
        """
        db = self.session
        await db.delete(access_token)
        await db.commit()