Add PGP ingestion pipeline (Excel -> PostgreSQL) #82
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| """ | ||
| PGP Excel ingestion script | ||
|
|
||
| Reads all sheets from the PGP export file and loads them into PostgreSQL. | ||
| Tables are created automatically in the raw schema. | ||
|
|
||
| Each ingestion adds a snapshot_date so historical data is preserved. | ||
| If the script runs multiple times the same day, duplicates are avoided. | ||
| """ | ||
|
|
||
| import pandas as pd | ||
| import psycopg2 | ||
| from psycopg2 import sql | ||
| from datetime import date | ||
| import re | ||
|
|
||
| # ========================= | ||
|
Collaborator
Dead code, to be removed. |
||
| # Configuration | ||
| # ========================= | ||
|
|
||
| FILE_PATH = "data/PGP x D4G- Exported Vaccine Data.xlsx" | ||
|
|
||
| DB_CONFIG = { | ||
| "host": "localhost", | ||
| "port": "5432", # Change docker postgres port | ||
| "dbname": "eu_fact_force", | ||
| "user": "eu_fact_force", | ||
| "password": "eu_fact_force" | ||
| } | ||
|
|
||
| RAW_SCHEMA = "raw" | ||
|
|
||
|
|
||
| # ========================= | ||
|
Collaborator
Dead code, to be removed. |
||
| # Helper functions | ||
| # ========================= | ||
|
|
||
| def clean_name(name): | ||
| """ | ||
| Clean sheet and column names so they are valid SQL identifiers. | ||
| """ | ||
| name = name.lower() | ||
| name = name.replace("%", "percent") | ||
| name = re.sub(r"[^\w]+", "_", name) | ||
| return name.strip("_") | ||
|
|
||
|
|
||
| def create_schema(cursor): | ||
| """ | ||
| Ensure schemas exist. | ||
| """ | ||
| cursor.execute("CREATE SCHEMA IF NOT EXISTS raw;") | ||
| cursor.execute("CREATE SCHEMA IF NOT EXISTS analytics;") | ||
|
|
||
|
|
||
| def create_table_if_not_exists(cursor, table_name, columns): | ||
| """ | ||
| Create table dynamically from dataframe columns. | ||
| """ | ||
|
|
||
| column_defs = [] | ||
|
|
||
| for col in columns: | ||
| column_defs.append(sql.SQL("{} TEXT").format(sql.Identifier(col))) | ||
|
|
||
| column_defs.append(sql.SQL("snapshot_date DATE")) | ||
|
|
||
| query = sql.SQL(""" | ||
| CREATE TABLE IF NOT EXISTS {}.{} ( | ||
| id SERIAL PRIMARY KEY, | ||
| {} | ||
| ) | ||
| """).format( | ||
| sql.Identifier(RAW_SCHEMA), | ||
| sql.Identifier(table_name), | ||
| sql.SQL(", ").join(column_defs) | ||
| ) | ||
|
|
||
| cursor.execute(query) | ||
|
|
||
|
|
||
| def insert_dataframe(cursor, table_name, df): | ||
| """ | ||
| Insert dataframe rows into PostgreSQL. | ||
| """ | ||
|
|
||
| cols = list(df.columns) | ||
| cols.append("snapshot_date") | ||
|
|
||
| insert_query = sql.SQL(""" | ||
| INSERT INTO {}.{} ({}) | ||
| VALUES ({}) | ||
| """).format( | ||
| sql.Identifier(RAW_SCHEMA), | ||
| sql.Identifier(table_name), | ||
| sql.SQL(", ").join(map(sql.Identifier, cols)), | ||
| sql.SQL(", ").join(sql.Placeholder() * len(cols)) | ||
| ) | ||
|
|
||
| snapshot = date.today() | ||
|
|
||
| for _, row in df.iterrows(): | ||
| values = [str(v) if pd.notna(v) else None for v in row.tolist()] | ||
| values.append(snapshot) | ||
| cursor.execute(insert_query, values) | ||
|
|
||
|
|
||
| # ========================= | ||
| # Main ingestion process | ||
| # ========================= | ||
|
|
||
| def ingest_excel(): | ||
|
|
||
| print("Reading Excel file...") | ||
|
Collaborator
The file will most likely be a Google Sheet. |
||
|
|
||
| xls = pd.ExcelFile(FILE_PATH) | ||
| sheets = xls.sheet_names | ||
|
|
||
| conn = psycopg2.connect(**DB_CONFIG) | ||
| cursor = conn.cursor() | ||
|
|
||
| create_schema(cursor) | ||
|
Collaborator
Django lets you define the models for your tables directly in models.py and work with Python objects that represent rows in the database. |
||
|
|
||
| for sheet in sheets: | ||
|
|
||
| print(f"Ingesting sheet: {sheet}") | ||
|
|
||
| df = pd.read_excel(FILE_PATH, sheet_name=sheet) | ||
|
|
||
| # clean column names | ||
|
Collaborator
Unnecessary comment. |
||
| df.columns = [clean_name(c) for c in df.columns] | ||
|
Collaborator
You can use rename at the end of the previous line. |
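Whichever form the renaming takes, the result depends on what `clean_name` returns for each header. The sketch below copies `clean_name` from the diff unchanged and adds a hypothetical helper, `find_collisions`, that reports headers which clean to the same identifier. The sample headers are invented for illustration and are not taken from the PGP export.

```python
import re


def clean_name(name):
    """Same logic as clean_name in the diff above."""
    name = name.lower()
    name = name.replace("%", "percent")
    name = re.sub(r"[^\w]+", "_", name)
    return name.strip("_")


def find_collisions(names):
    """Hypothetical helper: map each cleaned name to the originals that produce it,
    keeping only cleaned names produced by more than one original."""
    seen = {}
    for original in names:
        seen.setdefault(clean_name(original), []).append(original)
    return {cleaned: originals for cleaned, originals in seen.items() if len(originals) > 1}


# Invented headers, for illustration only.
headers = ["Coverage (%)", "Doses - Administered", "Doses Administered"]
cleaned = [clean_name(h) for h in headers]
collisions = find_collisions(headers)
print(cleaned)
print(collisions)
```

Two headers that clean to the same name would make the generated `CREATE TABLE` statement fail, because PostgreSQL rejects a column specified more than once. Checking for collisions before creating the table is one way to surface that early. Note also that `clean_name` calls `.lower()`, so a non-string header (for example a number) would raise `AttributeError`.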
||
|
|
||
| table_name = clean_name(sheet) | ||
|
|
||
| create_table_if_not_exists(cursor, table_name, df.columns) | ||
|
|
||
| insert_dataframe(cursor, table_name, df) | ||
|
|
||
| conn.commit() | ||
|
|
||
| cursor.close() | ||
| conn.close() | ||
|
|
||
| print("Ingestion complete.") | ||
|
Collaborator
Use logging rather than print. |
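A minimal sketch of what that could look like with the standard `logging` module. The logger name `pgp_ingestion` and the function `ingest_sheets` are hypothetical stand-ins for the script's real flow. Only the three messages come from the diff.

```python
import logging

# Hypothetical logger name; a module would typically use logging.getLogger(__name__).
logger = logging.getLogger("pgp_ingestion")


def ingest_sheets(sheet_names):
    """Stand-in for ingest_excel(): emits the same messages as the print calls in the diff."""
    logger.info("Reading Excel file...")
    for sheet in sheet_names:
        # Lazy %-style formatting: the message is only built if INFO is enabled.
        logger.info("Ingesting sheet: %s", sheet)
    logger.info("Ingestion complete.")


if __name__ == "__main__":
    # Configure output once, at the entry point, not inside library code.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    ingest_sheets(["sheet_a", "sheet_b"])
```

Keeping `basicConfig` at the entry point means the same functions can be called from a test or another program that configures logging differently.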
||
|
|
||
|
|
||
| # ========================= | ||
|
Collaborator
Dead code. |
||
|
|
||
| if __name__ == "__main__": | ||
|
Collaborator
Could you look at the documentation on creating Django commands? |
||
| ingest_excel() | ||
Better to use this file as a fixture for a unit test, with a simpler name.
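The module docstring says that running the script several times on the same day avoids duplicates, but `insert_dataframe` as shown in the diff only appends rows. One possible way to get that behavior is to delete the rows for the current `snapshot_date` before inserting. The sketch below illustrates the idea with the standard-library `sqlite3` module as a stand-in for PostgreSQL, so that it runs without a database server. The table name, columns, and rows are invented.

```python
import sqlite3
from datetime import date

TABLE = "vaccine_data"  # hypothetical table name


def load_snapshot(conn, rows, snapshot):
    """Replace any rows already stored for `snapshot`, then insert `rows`."""
    day = snapshot.isoformat()
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(f"DELETE FROM {TABLE} WHERE snapshot_date = ?", (day,))
        conn.executemany(
            f"INSERT INTO {TABLE} (country, doses, snapshot_date) VALUES (?, ?, ?)",
            [(country, doses, day) for country, doses in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    f"CREATE TABLE {TABLE} ("
    "id INTEGER PRIMARY KEY, country TEXT, doses TEXT, snapshot_date TEXT)"
)

rows = [("FR", "100"), ("DE", "200")]  # invented data
load_snapshot(conn, rows, date(2024, 1, 15))
load_snapshot(conn, rows, date(2024, 1, 15))  # same-day rerun: rows are replaced
load_snapshot(conn, rows, date(2024, 1, 16))  # a later day adds a new snapshot

count_by_day = dict(
    conn.execute(f"SELECT snapshot_date, COUNT(*) FROM {TABLE} GROUP BY snapshot_date")
)
print(count_by_day)
```

With psycopg2 the same pattern would use `%s` placeholders and `sql.Identifier` for the schema and table names, as the diff already does for its `INSERT`. Running the delete and the inserts in one transaction keeps the previous snapshot intact if the load fails partway through.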