-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
177 lines (144 loc) · 4.88 KB
/
main.py
File metadata and controls
177 lines (144 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import asyncio
import os
from contextlib import asynccontextmanager

import yaml
from fastapi import FastAPI, Depends, HTTPException, status, Security
from fastapi.openapi.utils import get_openapi
from fastapi.security.api_key import APIKeyHeader
from starlette.middleware.cors import CORSMiddleware

# from controller import db_controller
from config.log_config import create_log
from injector import logger
from job.job import create_job
# Module-wide logger. NOTE: this rebinds the `logger` name imported from
# `injector` in the import section of this file.
logger = create_log()

# Database connection object and session handle. Both stay None in the visible
# code: the assignments in initialize_database() are still commented out.
database_engine = None
db_session = None

# Shared secret expected in the Authorization header.
# Read from the API_TOKEN environment variable so the real token does not have
# to live in source control; an unset or empty variable falls back to the
# historical development value so existing setups keep working.
API_TOKEN = os.environ.get("API_TOKEN") or "1234"

# Header scheme used to read the Authorization header. auto_error=False means
# the scheme itself does not reject requests; get_api_key() does the check.
auth_header = APIKeyHeader(name="Authorization", auto_error=False)
async def create_task() -> None:
    """Log a marker message; placeholder for a background task."""
    logger.info("-- create_task.. --")
async def initialize_database() -> None:
    """Startup hook for the database layer.

    At present this only prints a confirmation message. The engine/session
    wiring below is disabled, so the module-level ``database_engine`` and
    ``db_session`` are not modified here.
    """
    global database_engine, db_session

    # Example wiring (disabled):
    # database_url = "sqlite:///./database.db"  # SQLite example
    # database_engine = create_engine(database_url)
    # SQLModel.metadata.create_all(database_engine)  # create tables (if needed)
    # Create a DB session (example)
    # db_session = Session(database_engine)
    print("Database initialized.")
async def initialize_scheduler() -> None:
    """Startup hook for the scheduler.

    At present this only prints a confirmation message; the task/job creation
    below is disabled.
    """
    print("Scheduler initialized.")

    # Logic to run at application startup goes below (disabled):
    # task0 = asyncio.create_task(create_task())
    # await task0
    # --
    # Create task as background
    # create_job()
    # --
async def clean_up_database() -> None:
    """Shutdown hook for the database layer; currently only prints a message."""
    print("Database connection closed.")
async def clean_up_scheduler() -> None:
    """Shutdown hook for the scheduler; currently only prints a message."""
    print("Scheduler stopped.")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the startup hooks, hand control to the application, then clean up.

    Everything before ``yield`` runs when the application starts; everything
    after it runs when the application shuts down.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Startup phase: database first, then scheduler.
    for startup_hook in (initialize_database, initialize_scheduler):
        await startup_hook()
    print("All startup tasks completed.")

    yield

    # Shutdown phase: database first, then scheduler.
    for shutdown_hook in (clean_up_database, clean_up_scheduler):
        await shutdown_hook()
    print("All shutdown tasks completed.")
# Application instance. `lifespan` supplies the startup/shutdown hooks, and the
# `auth_header` dependency is declared for every route of the app.
app = FastAPI(
    title="FastAPI Basic Docker with k8s Service",
    description="FastAPI Basic Docker with k8s Service",
    version="0.0.1",
    # terms_of_service="http://example.com/terms/",
    dependencies=[Depends(auth_header)],
    lifespan=lifespan,
)
# Register CORS handling: every origin, method and header is accepted and
# credentials are allowed.
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers should not be ["*"] when
# allow_credentials=True - confirm the intended policy and list explicit
# origins if credentials are really needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Disabled snippet kept as a bare string literal (an expression statement with
# no runtime effect). It would schedule event_register() on a running event
# loop. NOTE(review): neither event_register nor kafka_event is defined in the
# visible part of this file.
"""
loop = asyncio.get_event_loop()
if loop.is_running():
# Multiple topics from config.yml or docker arguments
# for topic in global_settings.get_Kafka_topic():
# asyncio.create_task(kafka_event(topic))
print('Tasks..')
asyncio.create_task(event_register())
"""
# Disabled snippet kept as a bare string literal (no runtime effect). It loads
# ./api/openapi.yml and replaces app.openapi with a builder that generates the
# schema once via get_openapi() and strips the 422 responses from every
# operation.
"""
# Load an external OpenAPI YAML file
with open("./api/openapi.yml", "r") as f:
external_openapi_spec = yaml.safe_load(f)
def custom_openapi():
if not app.openapi_schema:
app.openapi_schema = get_openapi(
title=app.title,
version=app.version,
openapi_version=app.openapi_version,
description=app.description,
terms_of_service=app.terms_of_service,
contact=app.contact,
license_info=app.license_info,
routes=app.routes,
tags=app.openapi_tags,
servers=app.servers,
)
for _, method_item in app.openapi_schema.get('paths').items():
for _, param in method_item.items():
responses = param.get('responses')
# remove 422 response, also can remove other status code
if '422' in responses:
del responses['422']
return app.openapi_schema
# return external_openapi_spec
app.openapi = custom_openapi
"""
def get_api_key(api_key_header: str = Security(auth_header)):
    """Validate the Authorization header against ``API_TOKEN``.

    Args:
        api_key_header: Value resolved by the ``auth_header`` scheme. Because
            that scheme is created with ``auto_error=False``, this may be
            ``None`` when the request carries no Authorization header.

    Returns:
        The supplied header value when it matches ``API_TOKEN``.

    Raises:
        HTTPException: 403 when the header is missing or does not match.
    """
    # Local import keeps this block self-contained.
    import secrets

    if isinstance(api_key_header, str):
        # Compare in constant time so response timing does not leak how much
        # of the token was guessed correctly. Comparing bytes avoids the
        # TypeError compare_digest raises for non-ASCII str input.
        supplied = api_key_header.encode("utf-8")
        expected = API_TOKEN.encode("utf-8")
        if secrets.compare_digest(supplied, expected):
            return api_key_header

    raise HTTPException(
        status_code=403, detail="Could not validate credentials"
    )
@app.get(
    "/",
    tags=["API"],
    summary="Return Default Json",
    description="Default GET API",
    status_code=200,
    responses={
        200: {"description": "OK"},
        500: {"description": "Unexpected error"},
    },
)
async def root(api_key: str = Depends(get_api_key)):
    """Return the default JSON message.

    Args:
        api_key: Token validated by ``get_api_key``; declared only so the
            request is rejected when validation fails.

    Returns:
        A dict with a single ``"message"`` entry.
    """
    logger.info("logger..")
    payload = {"message": "python-fastapi-openapi.yml k8s"}
    return payload
# Disabled route kept as a bare string literal (no runtime effect): a GET /test
# handler that logs and echoes its `id` argument.
'''
@app.get("/test", tags=['API'],
status_code=200,
description="Default GET Param API",
summary="Return GET Param Json")
async def root_with_arg(id):
logger.info('root_with_arg - {}'.format(id))
return {"message": "Hello World [{}]".format(id)}
'''
@app.get(
    "/test/{id}",
    tags=["API"],
    summary="Return GET with Body Json",
    description="Default GET with Body API",
    status_code=200,
    responses={
        200: {"description": "OK"},
        500: {"description": "Unexpected error"},
    },
)
async def test_api(id: int):
    """Echo the ``id`` path parameter inside a greeting message.

    NOTE(review): unlike ``root``, this route does not depend on
    ``get_api_key`` - confirm whether it is meant to be reachable without a
    valid token.

    Args:
        id: Integer taken from the ``/test/{id}`` path.

    Returns:
        A dict with a single ``"message"`` entry containing ``id``.
    """
    logger.info(f"test_api - {id}")
    return {"message": f"Hello World [{id}]"}
# router
# app.include_router(db_controller.app, tags=["DB API"], )