Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plutot mettre ce fichier comme fixture pour un test unitaire avec un nom plus simple.

Binary file not shown.
150 changes: 150 additions & 0 deletions pipeline_pgp/pgp_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""
PGP Excel ingestion script

Reads all sheets from the PGP export file and loads them into PostgreSQL.
Tables are created automatically in the raw schema.

Each ingestion adds a snapshot_date so historical data is preserved.
If the script runs multiple times the same day, duplicates are avoided.
"""

import pandas as pd
import psycopg2
from psycopg2 import sql
from datetime import date
import re

# =========================
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code mort à retirer.

# Configuration
# =========================

FILE_PATH = "data/PGP x D4G- Exported Vaccine Data.xlsx"

DB_CONFIG = {
"host": "localhost",
"port": "5432", # Change docker postgres port
"dbname": "eu_fact_force",
"user": "eu_fact_force",
"password": "eu_fact_force"
}

RAW_SCHEMA = "raw"


# =========================
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code mort à retirer.

# Helper functions
# =========================

def clean_name(name):
"""
Clean sheet and column names so they are valid SQL identifiers.
"""
name = name.lower()
name = name.replace("%", "percent")
name = re.sub(r"[^\w]+", "_", name)
return name.strip("_")


def create_schema(cursor):
"""
Ensure schemas exist.
"""
cursor.execute("CREATE SCHEMA IF NOT EXISTS raw;")
cursor.execute("CREATE SCHEMA IF NOT EXISTS analytics;")


def create_table_if_not_exists(cursor, table_name, columns):
"""
Create table dynamically from dataframe columns.
"""

column_defs = []

for col in columns:
column_defs.append(sql.SQL("{} TEXT").format(sql.Identifier(col)))

column_defs.append(sql.SQL("snapshot_date DATE"))

query = sql.SQL("""
CREATE TABLE IF NOT EXISTS {}.{} (
id SERIAL PRIMARY KEY,
{}
)
""").format(
sql.Identifier(RAW_SCHEMA),
sql.Identifier(table_name),
sql.SQL(", ").join(column_defs)
)

cursor.execute(query)


def insert_dataframe(cursor, table_name, df):
"""
Insert dataframe rows into PostgreSQL.
"""

cols = list(df.columns)
cols.append("snapshot_date")

insert_query = sql.SQL("""
INSERT INTO {}.{} ({})
VALUES ({})
""").format(
sql.Identifier(RAW_SCHEMA),
sql.Identifier(table_name),
sql.SQL(", ").join(map(sql.Identifier, cols)),
sql.SQL(", ").join(sql.Placeholder() * len(cols))
)

snapshot = date.today()

for _, row in df.iterrows():
values = [str(v) if pd.notna(v) else None for v in row.tolist()]
values.append(snapshot)
cursor.execute(insert_query, values)


# =========================
# Main ingestion process
# =========================

def ingest_excel():

print("Reading Excel file...")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A priori le fichier sera un google sheet.
Donc il faudrait une étape pour soit le lire directement en pandas grâce à l'url, soit une étape pour le télécharger avant de le lire.


xls = pd.ExcelFile(FILE_PATH)
sheets = xls.sheet_names

conn = psycopg2.connect(**DB_CONFIG)
cursor = conn.cursor()

create_schema(cursor)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Django permet de directement créer les modèles de tes tables dans models.py et d'utiliser des objets python qui représente des lignes dans les bases.


for sheet in sheets:

print(f"Ingesting sheet: {sheet}")

df = pd.read_excel(FILE_PATH, sheet_name=sheet)

# clean column names
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commentaire inutile.

df.columns = [clean_name(c) for c in df.columns]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tu peux utiliser rename à la fin de la ligne précédente.


table_name = clean_name(sheet)

create_table_if_not_exists(cursor, table_name, df.columns)

insert_dataframe(cursor, table_name, df)

conn.commit()

cursor.close()
conn.close()

print("Ingestion complete.")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mettre un log plutot qu'un print.



# =========================
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code mort


if __name__ == "__main__":
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Peux tu regarder la documentation pour créer des commandes django?

ingest_excel()
Loading