From c04e3c9ddcd2e89fe5ac06360c1060ee4aac5797 Mon Sep 17 00:00:00 2001 From: RV Date: Thu, 26 Dec 2024 22:42:31 +0100 Subject: [PATCH 1/7] feat: use SpireAisData.spire_update_statement instead of SpireAisData.created_at --- ..._clean_position_on_updated_vessels_only.py | 29 +++++++++++++++++++ .../repositories/repository_spire_ais_data.py | 4 +-- 2 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py diff --git a/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py new file mode 100644 index 00000000..342b512c --- /dev/null +++ b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py @@ -0,0 +1,29 @@ +"""Clean position on updated vessels only + +Revision ID: dbe1b900df12 +Revises: 5bfe00a08853 +Create Date: 2024-12-26 13:10:19.501996 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'dbe1b900df12' +down_revision = '5bfe00a08853' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_index("spire_ais_data", + "spire_ais_data_spire_update_statement_idx", + ["spire_update_statement"]) + pass + + +def downgrade() -> None: + op.drop_index("spire_ais_data", + "spire_ais_data_spire_update_statement_idx") + pass diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py index 696ed98d..902d1a6d 100644 --- a/backend/bloom/infra/repositories/repository_spire_ais_data.py +++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py @@ -109,8 +109,8 @@ def get_all_data_between_date( ).where( and_( sql_model.Vessel.tracking_activated == True, - sql_model.SpireAisData.created_at > created_updated_after, - sql_model.SpireAisData.created_at <= created_updated_before + sql_model.SpireAisData.spire_update_statement > created_updated_after, + sql_model.SpireAisData.spire_update_statement <= created_updated_before ) ).order_by(sql_model.SpireAisData.created_at.asc()) result = session.execute(stmt) From 026ad44c46387eb1113e0891f4c0830a50fd85ce Mon Sep 17 00:00:00 2001 From: RV Date: Wed, 8 Jan 2025 01:15:08 +0100 Subject: [PATCH 2/7] feat: use timestamp instead of created_at # Conflicts: # backend/bloom/tasks/create_update_excursions_segments.py --- .../bloom/infra/repositories/repository_vessel_position.py | 6 +++--- backend/bloom/tasks/create_update_excursions_segments.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/bloom/infra/repositories/repository_vessel_position.py b/backend/bloom/infra/repositories/repository_vessel_position.py index 65e340ac..656de736 100644 --- a/backend/bloom/infra/repositories/repository_vessel_position.py +++ b/backend/bloom/infra/repositories/repository_vessel_position.py @@ -53,8 +53,8 @@ def get_vessel_positions(self, session: Session, vessel_id:int, else: return [] - def get_positions_with_vessel_created_updated_after(self, session: Session, - created_updated_after: datetime) -> pd.DataFrame: + def get_positions_with_vessel_updated_after(self, session: Session, + updated_after: datetime) -> pd.DataFrame: stmt = select(sql_model.VesselPosition.id, sql_model.VesselPosition.timestamp, sql_model.VesselPosition.accuracy, sql_model.VesselPosition.collection_type, sql_model.VesselPosition.course, sql_model.VesselPosition.heading, @@ -62,7 +62,7 @@ def get_positions_with_vessel_created_updated_after(self, session: Session, sql_model.VesselPosition.latitude, sql_model.VesselPosition.rot, sql_model.VesselPosition.speed, sql_model.VesselPosition.created_at, sql_model.Vessel.id, sql_model.Vessel.mmsi).where( - sql_model.VesselPosition.created_at > created_updated_after + sql_model.VesselPosition.timestamp > updated_after ).join(sql_model.Vessel, sql_model.VesselPosition.vessel_id == sql_model.Vessel.id).order_by( sql_model.VesselPosition.created_at.asc()) diff --git a/backend/bloom/tasks/create_update_excursions_segments.py b/backend/bloom/tasks/create_update_excursions_segments.py index 943384b9..bb0b4f51 100644 --- a/backend/bloom/tasks/create_update_excursions_segments.py +++ b/backend/bloom/tasks/create_update_excursions_segments.py @@ -115,7 +115,7 @@ def run(): session, "create_update_excursions_segments", ) logger.info(f"Lecture des nouvelles positions depuis le {point_in_time}") - batch = vessel_position_repository.get_positions_with_vessel_created_updated_after(session, point_in_time) + batch = vessel_position_repository.get_positions_with_vessel_updated_after(session, point_in_time) position_count=len(batch) logger.info(f"{position_count} nouvelles positions") last_segment = segment_repository.get_last_vessel_id_segments(session) From 8c2a33bd79f815b201b31be301bbe4ca86d679d5 Mon Sep 17 00:00:00 2001 From: RV Date: Thu, 26 Dec 2024 23:59:44 +0100 Subject: [PATCH 3/7] feat: update insert ++ --- .../bloom/infra/repositories/repository_spire_ais_data.py | 2 +- backend/bloom/tasks/clean_positions.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py index 902d1a6d..63c2092b 100644 --- a/backend/bloom/infra/repositories/repository_spire_ais_data.py +++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py @@ -112,7 +112,7 @@ def get_all_data_between_date( sql_model.SpireAisData.spire_update_statement > created_updated_after, sql_model.SpireAisData.spire_update_statement <= created_updated_before ) - ).order_by(sql_model.SpireAisData.created_at.asc()) + ).order_by(sql_model.SpireAisData.spire_update_statement.asc()) result = session.execute(stmt) return pd.DataFrame(result, columns=[ "id", diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py index 4e801f87..14068cb7 100644 --- a/backend/bloom/tasks/clean_positions.py +++ b/backend/bloom/tasks/clean_positions.py @@ -66,13 +66,13 @@ def run(batch_time): ) batch_limit = point_in_time + timedelta(days=batch_time) # Step 1: load SPIRE batch: read from SpireAisData - logger.info(f"Lecture des nouvelles positions de Spire en base") + logger.info(f"Lecture des nouvelles positions de Spire en base between {point_in_time} and {batch_limit}") batch = spire_repository.get_all_data_between_date(session, point_in_time, batch_limit) # Recherche de la date de l'enregistrement traité le plus récent. # Cette date est stockée comme date d'exécution du traitement ce qui permettra de repartir de cette date # à la prochaine execution pour traiter les enregistrements + récents - max_created = max(batch["created_at"]) if len(batch) > 0 else batch_limit + max_created = max(batch["position_timestamp"]) if len(batch) > 0 else batch_limit logger.info(f"Traitement des positions entre le {point_in_time} et le {max_created}") position_count=len(batch) logger.info(f"{position_count} nouvelles positions de Spire") @@ -164,6 +164,7 @@ def run(batch_time): # Step 9: insert to DataBase clean_positions = batch.apply(map_vessel_position_to_domain, axis=1).values.tolist() + print(f"Batch final:\n{clean_positions}") vessel_position_repository.batch_create_vessel_position(session, clean_positions) TaskExecutionRepository.set_point_in_time(session, "clean_positions", max_created) logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions") From 3b9e3996d3f4f92d104104c602b136c9c597f235 Mon Sep 17 00:00:00 2001 From: RV Date: Fri, 27 Dec 2024 01:11:15 +0100 Subject: [PATCH 4/7] feat: use spire_update_statement instead of position_timestamp => vessel_position.timestamp --- backend/bloom/tasks/clean_positions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py index 14068cb7..3db815ef 100644 --- a/backend/bloom/tasks/clean_positions.py +++ b/backend/bloom/tasks/clean_positions.py @@ -27,7 +27,7 @@ def get_distance(current_position: tuple, last_position: tuple): def map_vessel_position_to_domain(row: pd.Series) -> VesselPosition: return VesselPosition( vessel_id=row["vessel_id"], - timestamp=row["position_timestamp"], + timestamp=row["spire_update_statement"], accuracy=row["position_accuracy"], collection_type=row["position_collection_type"], course=row["position_course"], From 62b5687033ca61af2ad5c8ff6f856adb0ddac63b Mon Sep 17 00:00:00 2001 From: RV Date: Fri, 27 Dec 2024 01:15:52 +0100 Subject: [PATCH 5/7] clean --- backend/bloom/tasks/clean_positions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py index 3db815ef..01085efc 100644 --- a/backend/bloom/tasks/clean_positions.py +++ b/backend/bloom/tasks/clean_positions.py @@ -164,7 +164,7 @@ def run(batch_time): # Step 9: insert to DataBase clean_positions = batch.apply(map_vessel_position_to_domain, axis=1).values.tolist() - print(f"Batch final:\n{clean_positions}") + #print(f"Batch final:\n{clean_positions}") vessel_position_repository.batch_create_vessel_position(session, clean_positions) TaskExecutionRepository.set_point_in_time(session, "clean_positions", max_created) logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions") From 32f71c6874c0a4bc27026a6680d7fb095ebb32ab Mon Sep 17 00:00:00 2001 From: RV Date: Fri, 27 Dec 2024 01:19:26 +0100 Subject: [PATCH 6/7] feat: position_timestamp => spire_update_statement for max_created --- backend/bloom/tasks/clean_positions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py index 01085efc..4367a49c 100644 --- a/backend/bloom/tasks/clean_positions.py +++ b/backend/bloom/tasks/clean_positions.py @@ -72,7 +72,7 @@ def run(batch_time): # Recherche de la date de l'enregistrement traité le plus récent. # Cette date est stockée comme date d'exécution du traitement ce qui permettra de repartir de cette date # à la prochaine execution pour traiter les enregistrements + récents - max_created = max(batch["position_timestamp"]) if len(batch) > 0 else batch_limit + max_created = max(batch["spire_update_statement"]) if len(batch) > 0 else batch_limit logger.info(f"Traitement des positions entre le {point_in_time} et le {max_created}") position_count=len(batch) logger.info(f"{position_count} nouvelles positions de Spire") From c9a5f942999b6b4e5c3026aa9a68c2f393f6b097 Mon Sep 17 00:00:00 2001 From: RV Date: Fri, 27 Dec 2024 01:45:50 +0100 Subject: [PATCH 7/7] fix: alembic migration, index creation on spire_ais_data/spire_update_statement --- ...dbe1b900df12_clean_position_on_updated_vessels_only.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py index 342b512c..10452d34 100644 --- a/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py +++ b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py @@ -17,13 +17,13 @@ def upgrade() -> None: - op.create_index("spire_ais_data", - "spire_ais_data_spire_update_statement_idx", + op.create_index("spire_ais_data_spire_update_statement_idx", + "spire_ais_data", ["spire_update_statement"]) pass def downgrade() -> None: - op.drop_index("spire_ais_data", - "spire_ais_data_spire_update_statement_idx") + op.drop_index("spire_ais_data_spire_update_statement_idx", + "spire_ais_data") pass