import boto3
import json
import pandas as pd
from datetime import datetime, timedelta

# --- Configuration ---
S3_BUCKET = "eligibility-signposting-api-dev-dq-metrics"
LOCAL_HYPER_PATH = "converted.hyper"
LOOKBACK_MONTHS = 3
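# NOTE: boto3 resolves AWS credentials via its default chain (environment
# variables, shared credentials/config files, or an attached IAM role);
# nothing is hard-coded here.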


def get_filtered_s3_data():
    s3 = boto3.client('s3')
    all_data = []

    # Calculate the date threshold and corresponding date range
    # (months are approximated as 30 days for the lookback window)
    start_date = datetime.now() - timedelta(days=LOOKBACK_MONTHS * 30)
    end_date = datetime.now()
    threshold_date = start_date.strftime('%Y%m%d')
    print(f"Filtering for data where processing_date >= {threshold_date}...")

    # List objects in the bucket, narrowed by the expected
    # processing_date=YYYYMMDD/ partition prefixes
    paginator = s3.get_paginator('list_objects_v2')
    current_date = start_date
    while current_date <= end_date:
        date_prefix = current_date.strftime('%Y%m%d')
        s3_prefix = f"processing_date={date_prefix}/"

        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=s3_prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']

                # Parse the partition value from the key
                # (e.g. processing_date=20260303/datamart=cohorts/file.json)
                try:
                    if 'processing_date=' in key and key.endswith('.json'):
                        date_part = key.split('processing_date=')[1].split('/')[0]

                        # Keep the threshold check as a safeguard; YYYYMMDD
                        # strings compare correctly lexicographically
                        if date_part >= threshold_date:
                            # Read the JSON file from S3
                            response = s3.get_object(Bucket=S3_BUCKET, Key=key)
                            body = response['Body'].read().decode('utf-8')

                            # Handle multiple JSON objects per file (JSONL)
                            for json_line in body.strip().splitlines():
                                if json_line:
                                    all_data.append(json.loads(json_line))
                except Exception as e:
                    print(f"Skipping key {key} due to error: {e}")

        current_date += timedelta(days=1)
    return pd.DataFrame(all_data)

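# Each metrics record is assumed to look roughly like this (illustrative
# values only; the real contents come from the DQ metrics JSON in S3):
#
#   {"timestamp": "2026-03-03 12:00:00", "datamart": "cohorts",
#    "attribute_type": "column", "attribute": "nhs_number",
#    "dimension": "completeness", "success_count": 980,
#    "total_rows": 1000, "success_percent": 98.0,
#    "processing_date": "20260303"}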

def create_hyper_from_df(df, hyper_path):
    from tableauhyperapi import (
        Connection, CreateMode, HyperProcess, Inserter,
        TableDefinition, Telemetry, SqlType, TableName, SchemaName,
        Date, Timestamp
    )

    # Define the schema and table names clearly
    SCHEMA_NAME = "Extract"
    TABLE_NAME = "Extract"

    with HyperProcess(Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
        with Connection(endpoint=hyper.endpoint,
                        database=hyper_path,
                        create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
            # Align TableName with the schema we are about to create
            target_table = TableName(SCHEMA_NAME, TABLE_NAME)

            schema_def = TableDefinition(target_table, [
                TableDefinition.Column("timestamp", SqlType.timestamp()),
                TableDefinition.Column("datamart", SqlType.text()),
                TableDefinition.Column("attribute_type", SqlType.text()),
                TableDefinition.Column("attribute", SqlType.text()),
                TableDefinition.Column("dimension", SqlType.text()),
                TableDefinition.Column("success_count", SqlType.big_int()),
                TableDefinition.Column("total_rows", SqlType.big_int()),
                TableDefinition.Column("success_percent", SqlType.double()),
                TableDefinition.Column("processing_date", SqlType.date()),
            ])

            # Create the schema first, then the table within that schema
            connection.catalog.create_schema(SchemaName(SCHEMA_NAME))
            connection.catalog.create_table(schema_def)

            with Inserter(connection, schema_def) as inserter:
                for _, row in df.iterrows():
                    # pd.to_datetime copes with second- and sub-second
                    # precision timestamp strings alike
                    ts = pd.to_datetime(row['timestamp']).to_pydatetime()
                    pd_str = str(row['processing_date'])
                    pd_dt = datetime.strptime(pd_str, "%Y%m%d")

                    inserter.add_row([
                        Timestamp(ts.year, ts.month, ts.day,
                                  ts.hour, ts.minute, ts.second),
                        row.get('datamart'),
                        row.get('attribute_type'),
                        row.get('attribute', ''),
                        row.get('dimension'),
                        int(row.get('success_count', 0)),
                        int(row.get('total_rows', 0)),
                        float(row.get('success_percent', 0.0)),
                        Date(pd_dt.year, pd_dt.month, pd_dt.day)
                    ])
                inserter.execute()
    print(f"Hyper file created at {hyper_path} with schema '{SCHEMA_NAME}'")

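# A shorter alternative sketch, assuming the third-party pantab package is
# installed and its default column-type inference is acceptable:
#
#   import pantab
#   pantab.frame_to_hyper(df, LOCAL_HYPER_PATH, table="Extract")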


# --- Main Execution ---
if __name__ == "__main__":
    df = get_filtered_s3_data()
    if not df.empty:
        create_hyper_from_df(df, LOCAL_HYPER_PATH)
    else:
        print("No data found for the selected date range.")