From da5a8821fd4401c189f4686cbadd2b6cdb631381 Mon Sep 17 00:00:00 2001
From: RV
Date: Mon, 23 Dec 2024 23:45:31 +0100
Subject: [PATCH 1/4] feat: add invoke package

---
 backend/pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 3c5c6ecc..f791615a 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
     "fastapi[standard]>=0.115.0,<1.0.0",
     "uvicorn~=0.32",
     "redis~=5.0",
+    "invoke>=2.2.0",
 ]
 name = "bloom"
 version = "0.1.0"
From 8cff6f93110f45d7827a5a9d81e67d422ef02a88 Mon Sep 17 00:00:00 2001
From: RV
Date: Thu, 26 Dec 2024 02:09:00 +0100
Subject: [PATCH 2/4] feat: start redisign tasks system and batch processing

---
 backend/pyproject.toml |   1 +
 backend/tasks.py       | 250 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 251 insertions(+)
 create mode 100644 backend/tasks.py

diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index f791615a..fef4ad74 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -47,6 +47,7 @@ dependencies = [
     "uvicorn~=0.32",
     "redis~=5.0",
     "invoke>=2.2.0",
+    "isodate>=0.7.2",
 ]
 name = "bloom"
 version = "0.1.0"
diff --git a/backend/tasks.py b/backend/tasks.py
new file mode 100644
index 00000000..925d93ac
--- /dev/null
+++ b/backend/tasks.py
@@ -0,0 +1,250 @@
+from invoke import Collection,task,Context,call
+from datetime import datetime,timezone, timedelta
+from math import floor,ceil
+import time
+from dateutil import parser
+import multiprocessing as mp
+import signal
+import os
+import isodate
+
+from bloom.logger import logger
+
+
+from bloom.container import UseCases
+import pandas as pd
+import numpy as np
+from geopy import distance
+from bloom.domain.vessel_position import VesselPosition
+from shapely.geometry import Point
+
+namespace=Collection()
+
+etl=Collection("etl")
+
+def schedule_next(start_date:datetime,schedule_interval:timedelta,now=None):
+    """Return the first schedule slot located at or after ``now``.
+
+    Slots are the instants ``start_date + k * schedule_interval``; the
+    number of elapsed intervals is rounded up with ``ceil``.
+
+    :param start_date: origin of the schedule grid.
+    :param schedule_interval: spacing between two consecutive slots.
+    :param now: reference instant; defaults to the current UTC time.
+    :return: ``start_date`` shifted by the rounded-up number of intervals.
+    """
+    if now is None:
+        # The declared default used to fail on ``None - start_date``.
+        # NOTE(review): the fallback is timezone-aware, so ``start_date``
+        # must be aware as well for the subtraction below to work.
+        now=datetime.now(timezone.utc)
+    nb_interval=(now-start_date)/schedule_interval
+    return start_date+ceil(nb_interval)*schedule_interval
+
+@task
+def test(ctx):
+    """Log a fixed message; smoke test for the invoke setup."""
+    logger.info("test invoke")
+
+@task()
+def task_extract_ais_data_from_spire_api_table_spire_ais_data(ctx:Context,interval_start:datetime,interval_end:datetime):
+    """Log whether the Spire API extraction is due for the given interval.
+
+    No API call is made here yet: the task only logs which branch it takes.
+    The "call" branch is taken when the current UTC time minus
+    ``ctx['schedule_interval']`` lies within
+    ``[interval_start, interval_end]``.
+
+    :param ctx: invoke context; must expose ``schedule_interval``.
+    :param interval_start: inclusive lower bound of the processed interval.
+    :param interval_end: inclusive upper bound of the processed interval.
+    """
+    logger.info(f"{os.getpid()} extract_ais_data_from_spire_api interval_start={interval_start}-{interval_end} STARTED")
+    # Read the clock once so both bounds are tested against the same instant.
+    reference=datetime.now(timezone.utc) - ctx['schedule_interval']
+    if interval_start <= reference <= interval_end:
+        logger.info("Executing API CALL")
+    else:
+        logger.info("interval ")
+    logger.info(f"{os.getpid()} extract_ais_data_from_spire_api interval_start={interval_start}-{interval_end} FINISHED")
+
+
+
+def to_coords(row: pd.Series) -> pd.Series:
+    """Copy ``row["end_position"]`` x/y into ``longitude``/``latitude``.
+
+    The row is returned unchanged when ``end_position`` is null.
+    """
+    if pd.isna(row["end_position"]) is False:
+        row["longitude"] = row["end_position"].x
+        row["latitude"] = row["end_position"].y
+
+    return row
+
+
+# distance.geodesic takes a pair of (lat, lon) tuples:
+def get_distance(current_position: tuple, last_position: tuple):
+    """Return the geodesic distance in km between two (lat, lon) tuples.
+
+    Returns ``np.nan`` when either coordinate of ``last_position`` is
+    missing.
+    """
+    # pd.isna accepts None as well as NaN. The last-segment coordinates are
+    # initialised to None and stay None when the segment has no end
+    # position, and np.isnan(None) raises TypeError.
+    if pd.isna(last_position[0]) or pd.isna(last_position[1]):
+        return np.nan
+
+    return distance.geodesic(current_position, last_position).km
+
+def map_vessel_position_to_domain(row: pd.Series) -> VesselPosition:
+    """Build a ``VesselPosition`` from one row of the cleaned batch."""
+    return VesselPosition(
+        vessel_id=row["vessel_id"],
+        timestamp=row["position_timestamp"],
+        accuracy=row["position_accuracy"],
+        collection_type=row["position_collection_type"],
+        course=row["position_course"],
+        heading=row["position_heading"],
+        position=Point(row["position_longitude"], row["position_latitude"]),
+        latitude=row["position_latitude"],
+        longitude=row["position_longitude"],
+        maneuver=row["position_maneuver"],
+        navigational_status=row["position_navigational_status"],
+        rot=row["position_rot"],
+        speed=row["speed"],
+    )
+
+@task(help={"batch": "size"})
+def task_transform_and_load_ais_data_from_table_spire_ais_data_to_vessel_positions(ctx,interval_start:datetime,interval_end:datetime,batch:int=7):
+    """Clean the Spire positions of an interval and store them.
+
+    Reads the Spire rows between ``interval_start`` and ``interval_end``,
+    joins them with the last known segment and the current excursion of
+    each vessel, computes a speed from the distance and time since the last
+    position, drops the rows flagged as not worth keeping and writes the
+    rest through the vessel position repository.
+
+    :param ctx: invoke context (not read by this task).
+    :param interval_start: lower bound passed to the Spire repository.
+    :param interval_end: upper bound passed to the Spire repository.
+    :param batch: currently unused; the name is rebound to the DataFrame
+        of positions as soon as the data is loaded.
+    """
+
+    logger.info(f"{os.getpid()} transform_and_load_ais_data_from_table_spire_ais_data_to_vessel_positions interval_start={interval_start}-{interval_end} STARTED")
+
+
+    use_cases = UseCases()
+    db = use_cases.db()
+    spire_repository = use_cases.spire_ais_data_repository()
+    excursion_repository = use_cases.excursion_repository()
+    segment_repository = use_cases.segment_repository()
+    vessel_position_repository = use_cases.vessel_position_repository()
+    with db.session() as session:
+        # Step 1: load SPIRE batch: read from SpireAisData
+        logger.info(f"Lecture des nouvelles positions de Spire en base")
+        batch = spire_repository.get_all_data_between_date(session, interval_start, interval_end)
+        logger.info(f"Traitement des positions entre le {interval_start} et le {interval_end}")
+        logger.info(f"{len(batch)} nouvelles positions de Spire")
+        batch.dropna(
+            subset=[
+                "position_latitude",
+                "position_longitude",
+                "position_timestamp",
+                "position_update_timestamp",
+            ],
+            inplace=True,
+        )
+        # Step 2: load excursion from fct_excursion where date_arrival IS NULL
+        excursions = excursion_repository.get_current_excursions(session)
+
+        # Step 3: load last_segment where last_vessel_segment == 1
+        last_segment = segment_repository.get_last_vessel_id_segments(session)
+        last_segment["longitude"] = None
+        last_segment["latitude"] = None
+        last_segment = last_segment.apply(to_coords, axis=1)
+
+
+        # Step 4: merge batch with last_segment on vessel_id.
+        # If column _merge == "left_only" --> this is a new vessel (keep it)
+        batch = batch.merge(
+            last_segment,
+            how="left",
+            on="vessel_id",
+            suffixes=["", "_segment"],
+            indicator=True,
+        )
+        batch.rename(columns={"_merge": "new_vessel"}, inplace=True)
+        batch["new_vessel"] = batch["new_vessel"] == "left_only"
+
+        # Step 5: merge batch with excursions
+        # If column _merge == "left_only" --> the excursion is closed
+        batch = batch.merge(
+            excursions,
+            how="left",
+            on="vessel_id",
+            suffixes=["", "_excursion"],
+            indicator=True,
+        )
+        batch.rename(columns={"_merge": "excursion_closed"}, inplace=True)
+        batch["excursion_closed"] = batch["excursion_closed"] == "left_only"
+
+
+        # Step 6: compute speed between last and current position
+        # Step 6.1. Compute distance in km between last and current position
+        batch["distance_since_last_position"] = batch.apply(
+            lambda row: get_distance(
+                (row["position_latitude"], row["position_longitude"]),
+                (row["latitude"], row["longitude"]),
+            ),
+            axis=1,
+        )
+
+        # Step 6.2. Compute time in hours between last and current position
+        ## If timestamp_end is NULL --> new_vessel is TRUE (record will be kept)
+        ## --> fillna with anything to avoid exception
+        batch.loc[batch["timestamp_end"].isna(), "timestamp_end"] = batch.loc[
+            batch["timestamp_end"].isna(), "position_timestamp"
+        ].copy()
+        batch["timestamp_end"] = pd.to_datetime(batch["timestamp_end"], utc=True)
+        batch["position_timestamp"] = pd.to_datetime(batch["position_timestamp"], utc=True)
+        batch["time_since_last_position"] = (
+            batch["position_timestamp"] - batch["timestamp_end"]
+        )
+        # total_seconds() covers the whole duration; .dt.seconds only
+        # returns the seconds component and silently drops whole days.
+        batch["hours_since_last_position"] = (
+            batch["time_since_last_position"].dt.total_seconds() / 3600
+        )
+
+        # Step 6.3. Compute speed: speed = distance / time
+        batch["speed"] = (
+            batch["distance_since_last_position"] / batch["hours_since_last_position"]
+        )
+        batch["speed"] *= 0.5399568  # Conversion km/h to knots
+        batch["speed"] = batch["speed"].fillna(batch["position_speed"])
+        batch.replace([np.nan], [None], inplace=True)
+        # Step 7: apply to_keep flag: keep only positions WHERE:
+        # - row["new_vessel"] is True, i.e. there is a new vessel_id
+        # - OR speed is not close to 0, i.e. vessel moved significantly since last position
+        batch["to_keep"] = (batch["new_vessel"] == True) | ~(  # detect new vessels
+            (batch["excursion_closed"] == True)  # if last excursion closed
+            & (batch["speed"] <= 0.01)  # and speed < 0.01: don't keep
+        )
+
+        # Step 8: filter unflagged rows to insert to DB
+        batch = batch.loc[batch["to_keep"] == True].copy()
+
+        # Step 9: insert to DataBase
+        clean_positions = batch.apply(map_vessel_position_to_domain, axis=1).values.tolist()
+        vessel_position_repository.batch_create_vessel_position(session, clean_positions)
+        logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions")
+        session.commit()
+
+    logger.info(f"{os.getpid()} transform_and_load_ais_data_from_table_spire_ais_data_to_vessel_positions interval_start={interval_start}-{interval_end} FINISHED")
+
+
+@task()
+def task_api_etl(ctx,interval_start:datetime,interval_end:datetime):
+    """Run the extract task, then the transform-and-load task, on one interval."""
+    task_extract_ais_data_from_spire_api_table_spire_ais_data(ctx,interval_start,interval_end)
+    task_transform_and_load_ais_data_from_table_spire_ais_data_to_vessel_positions(ctx,interval_start,interval_end)
+
+
+@task()
+def pipeline_main(ctx:Context,
+                  start_date:str="2024-06-01 00:00:00+00:00",
+                  schedule_interval:str="PT15M",
+                  backfill:bool=False):
+    start_date=parser.parse(start_date)
+    schedule_interval=isodate.parse_duration(schedule_interval)
+    ctx["start_date"]=start_date
+    ctx["schedule_interval"]=schedule_interval
+    ctx["backfill"]=backfill
+
+    logger.info(f"start_date:{start_date}")
+    logger.info(f"schedule_interval {schedule_interval}")
+    logger.info(f"backfill {backfill}")
+
+    list_process=[]
+
+    now=start_date
+    while(True):
+        if(backfill):
+            now=now+schedule_interval
+        else:
+            now=datetime.now(timezone.utc)
+        next=schedule_next(start_date=start_date,
+                           schedule_interval=schedule_interval,
+                           now=now)
+        logger.info(f"Waiting for next execution slot {next}")
+        while(now Date: Thu, 26 Dec 2024 22:42:31 +0100
Subject: [PATCH 3/4] feat: use SpireAisData.spire_update_statement instead of SpireAisData.created_at

---
 ..._clean_position_on_updated_vessels_only.py | 29 +++++++++++++++++++
 .../repositories/repository_spire_ais_data.py |  4 +--
 2 files changed, 31 insertions(+), 2 deletions(-)
 create mode 100644 backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py

diff --git a/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py
new file mode 100644
index 00000000..342b512c
--- /dev/null
+++ b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py
@@ -0,0 +1,29 @@
+"""Clean position on updated vessels only
+
+Revision ID: dbe1b900df12
+Revises: 5bfe00a08853
+Create Date: 2024-12-26 13:10:19.501996
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'dbe1b900df12'
+down_revision = '5bfe00a08853'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    """Create the index on spire_ais_data.spire_update_statement (index name first, then table)."""
+    op.create_index("spire_ais_data_spire_update_statement_idx",
+                    "spire_ais_data",
+                    ["spire_update_statement"])
+
+
+def downgrade() -> None:
+    """Drop the index created by upgrade() (index name first, then table)."""
+    op.drop_index("spire_ais_data_spire_update_statement_idx",
+                  "spire_ais_data")
diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py
index 696ed98d..902d1a6d 100644
--- a/backend/bloom/infra/repositories/repository_spire_ais_data.py
+++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py
@@ -109,8 +109,8 @@ def get_all_data_between_date(
         ).where(
             and_(
                 sql_model.Vessel.tracking_activated == True,
-                sql_model.SpireAisData.created_at > created_updated_after,
-                sql_model.SpireAisData.created_at <= created_updated_before
+                sql_model.SpireAisData.spire_update_statement > created_updated_after,
+                sql_model.SpireAisData.spire_update_statement <= created_updated_before
             )
         ).order_by(sql_model.SpireAisData.created_at.asc())
         result = session.execute(stmt)
From df355495726611c255d18d6cf91028c36cf2b3f9 Mon Sep 17 00:00:00 2001
From: RV
Date: Sun, 2 Feb 2025 20:37:48 +0100
Subject: [PATCH 4/4] add: docker build/down/up script for Linux

---
 docker/build.sh | 3 +++
 docker/down.sh  | 3 ++-
 docker/up.sh    | 3 ++-
 3 files changed, 7 insertions(+), 2 deletions(-)
 create mode 100644 docker/build.sh

diff --git a/docker/build.sh b/docker/build.sh
new file mode 100644
index 00000000..71a57494
--- /dev/null
+++ b/docker/build.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+cd "$(dirname "$0")"/..
+docker compose build $1 || { printf '%s' "[$?] Appuyer sur une pour continuer"; read -r _; }
diff --git a/docker/down.sh b/docker/down.sh
index ec0263b5..14ddbadb 100644
--- a/docker/down.sh
+++ b/docker/down.sh
@@ -1,2 +1,3 @@
+#!/bin/sh
 cd "$(dirname "$0")"/..
-docker compose -f docker-compose.yaml down || read -p "Error. Press any key to continue"
\ No newline at end of file
+docker compose down $1 || { printf '%s' "[$?] Appuyer sur une pour continuer"; read -r _; }
diff --git a/docker/up.sh b/docker/up.sh
index 00f3b2fd..d5885364 100644
--- a/docker/up.sh
+++ b/docker/up.sh
@@ -1,2 +1,3 @@
+#!/bin/sh
 cd "$(dirname "$0")"/..
-docker compose -f docker-compose.yaml up || read -p "Error. Press any key to continue"
\ No newline at end of file
+docker compose up $1 || { printf '%s' "[$?] Appuyer sur une pour continuer"; read -r _; }