Commit f6048ba: Revamp airflow samples
1 parent fa49288

12 files changed

Lines changed: 213 additions & 42 deletions

airflow/.env

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+AIRFLOW_UID=50000

airflow/.gitignore

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+docker-compose.yaml
+logs/
+plugins/
+.minio/
+airflow

airflow/Dockerfile

Lines changed: 0 additions & 15 deletions
This file was deleted.

airflow/Makefile

Lines changed: 27 additions & 23 deletions
@@ -1,30 +1,34 @@
-IMAGE_NAME:=luisbelloch/airflow
-VERSION_TAG:=2019.1
-DAG_VOLUME:=${PWD}/dags:/var/airflow/dags
-WEB_PORT:=8080:8080
+.PHONY: all
+all: clean docker-compose.yaml airflow init up
 
-.PHONY: build
-build:
-	docker build -t $(IMAGE_NAME) .
+docker-compose.yaml:
+	curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.2/docker-compose.yaml'
 
-.PHONY: tag
-tag:
-	docker tag ${IMAGE_NAME} ${IMAGE_NAME}:${VERSION_TAG}
+airflow:
+	curl -Lf 'https://airflow.apache.org/docs/apache-airflow/2.2.2/airflow.sh' > airflow
+	chmod +x airflow
 
-.PHONY: push
-push:
-	docker push ${IMAGE_NAME}
-	docker push ${IMAGE_NAME}:${VERSION_TAG}
+.PHONY: init
+init:
+	docker-compose up airflow-init
 
-.PHONY: web
-web:
-	docker run -p ${WEB_PORT} -v ${DAG_VOLUME} -ti $(IMAGE_NAME) airflow webserver
+.PHONY: up
+up:
+	docker-compose up
 
-.PHONY: scheduler
-scheduler:
-	docker run -p ${WEB_PORT} -v ${DAG_VOLUME} -ti $(IMAGE_NAME) airflow scheduler
+.PHONY: down
+down:
+	docker-compose down --remove-orphans
 
-.PHONY: shell
-shell:
-	docker run -p ${WEB_PORT} -v ${DAG_VOLUME} -ti $(IMAGE_NAME) /bin/bash
+.PHONY: minio
+minio:
+	docker-compose -f docker-compose.yaml -f docker-compose.minio.yaml up minio
 
+.PHONY: minio_connection
+minio_connection: airflow
+	./airflow connections import minio_connection.json
+
+.PHONY: clean
+clean:
+	-docker-compose down --volumes --remove-orphans
+	-rm -rf logs/ plugins/ .minio/ docker-compose.yaml airflow dags/__pycache__
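
The minio_connection target imports minio_connection.json, which is not shown in this view. As a rough, hypothetical sketch only (the real file may differ): `airflow connections import` accepts a JSON map of conn_id to connection fields, so an aws_default connection pointing at MinIO, matching the credentials hard-coded in dags/gasolina_s3.py, could be generated like this:

    # Hypothetical sketch: writes a guessed minio_connection.json; all values are assumptions.
    import json

    minio_connection = {
        "aws_default": {
            "conn_type": "aws",
            "login": "bigdataupv",      # access key, taken from gasolina_s3.py
            "password": "bigdataupv",   # secret key, taken from gasolina_s3.py
            # The endpoint key name varies across amazon provider versions
            # ("host" in older releases, "endpoint_url" in newer ones).
            "extra": json.dumps({"host": "http://minio:9000", "region_name": "us-east-1"}),
        }
    }

    with open("minio_connection.json", "w") as handle:
        json.dump(minio_connection, handle, indent=2)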

airflow/dags/gasolina_naive.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+import json
+from datetime import datetime
+
+from airflow import AirflowException
+from airflow.decorators import dag, task
+
+import requests
+
+codigo_postal = "50197"
+endpoint = "https://sedeaplicaciones.minetur.gob.es/ServiciosRESTCarburantes/PreciosCarburantes/EstacionesTerrestres/"
+
+@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['upv'])
+def extraer_precio_gasolina_naive():
+
+    @task
+    def recogida():
+        print("Recogiendo datos...")
+        response = requests.get(endpoint)
+        if response.status_code != 200:
+            raise AirflowException(f"Fallo de conexión {response.status_code}")
+
+        datos = response.json()
+        return datos['ListaEESSPrecio']
+
+    @task
+    def filtrado(datos, codigo_postal):
+        return list(filter(lambda x: x['C.P.'] == codigo_postal, datos))
+
+    @task
+    def almacenamiento(datos):
+        print("Almacenando datos...")
+        print(json.dumps(datos, indent=2))
+
+    todos_los_datos = recogida()
+    datos_del_codigo_postal_x = filtrado(todos_los_datos, codigo_postal)
+    almacenamiento(datos_del_codigo_postal_x)
+
+dag_gasolina = extraer_precio_gasolina_naive()
+

airflow/dags/gasolina_s3.py

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+import json
+import socket
+
+from datetime import datetime
+
+from airflow import AirflowException
+from airflow.decorators import dag, task
+from airflow.operators.bash_operator import BashOperator
+from airflow.contrib.sensors.file_sensor import FileSensor
+
+import boto3
+import botocore.client
+import requests
+
+codigo_postal = "50197"
+bucket_name = "gasolina"
+endpoint = "https://sedeaplicaciones.minetur.gob.es/ServiciosRESTCarburantes/PreciosCarburantes/EstacionesTerrestres/"
+
+def s3_resource():
+    return boto3.resource('s3',
+        endpoint_url='http://minio:9000',
+        aws_access_key_id='bigdataupv',
+        aws_secret_access_key='bigdataupv',
+        config=botocore.client.Config(signature_version='s3v4'), region_name='us-east-1')
+
+def read_json_from_s3(key):
+    obj = s3_resource().Object(bucket_name, key)
+    return json.loads(obj.get()['Body'].read().decode('utf-8'))
+
+def save_to_s3(key, data):
+    obj = s3_resource().Object(bucket_name, key)
+    obj.put(Body=data)
+
+@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['upv'])
+def extraer_precio_gasolina_s3():
+
+    @task
+    def recogida_s3():
+        print("Recogiendo datos...")
+        response = requests.get(endpoint)
+        if response.status_code != 200:
+            raise AirflowException(f"Fallo de conexión {response.status_code}")
+
+        filename = f'recogida-{datetime.now().strftime("%Y%m%d%H%M%S")}.json'
+        save_to_s3(filename, response.text)
+        return { "recogida": filename }
+
+    @task
+    def filtrado_s3(contexto, codigo_postal):
+        print("Filtrando datos...")
+
+        datos = read_json_from_s3(contexto['recogida'])
+        filtrados = list(filter(lambda x: x['C.P.'] == codigo_postal, datos['ListaEESSPrecio']))
+
+        filename = f'filtrado-{datetime.now().strftime("%Y%m%d%H%M%S")}.json'
+        save_to_s3(filename, json.dumps(filtrados))
+
+        return { **contexto, "filtrado": filename }
+
+    @task
+    def almacenamiento_s3(contexto):
+        print("Almacenando datos... Nothing to do!")
+        import socket
+        print("hostname:", socket.gethostname())
+        return 42
+
+    todos_los_datos = recogida_s3()
+    datos_del_codigo_postal_x = filtrado_s3(todos_los_datos, codigo_postal)
+    almacenamiento_s3(datos_del_codigo_postal_x)
+
+dag_gasolina = extraer_precio_gasolina_s3()
+
+# Additionally, use the Amazon provider operators, particularly S3KeySensor:
+# https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html
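
The closing comment points at S3KeySensor. As a rough sketch only (dag_id, task_id and timings are made up here, and the import path moved to airflow.providers.amazon.aws.sensors.s3 in newer provider releases), a sensor that waits for a recogida-*.json object in the gasolina bucket could look like:

    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor

    with DAG(
        dag_id="espera_recogida_gasolina",   # made-up dag_id, for illustration only
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['upv'],
    ) as dag:
        # Poke until at least one recogida-*.json object exists in the bucket;
        # credentials come from the connection imported with `make minio_connection`.
        espera_recogida = S3KeySensor(
            task_id="espera_recogida",
            bucket_name="gasolina",
            bucket_key="recogida-*.json",
            wildcard_match=True,
            aws_conn_id="aws_default",
            poke_interval=30,
            timeout=300,
        )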

airflow/dags/hello_dags.py

Lines changed: 2 additions & 3 deletions
@@ -7,7 +7,7 @@
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import PythonOperator
 
-dag = DAG('hello_dags', schedule_interval=None, start_date=airflow.utils.dates.days_ago(2))
+dag = DAG('hello_dags', schedule_interval=None, start_date=airflow.utils.dates.days_ago(2), tags=['upv'])
 
 def print_hello():
     return 'Hello world!'
@@ -18,8 +18,7 @@ def print_hello():
 paso3 = DummyOperator(task_id='paso3', dag=dag)
 ultima_tarea = DummyOperator(task_id='ultima_tarea', dag=dag)
 
-inicio >> paso1
-inicio >> paso3
+inicio >> [paso1, paso3]
 paso1 >> paso2 >> ultima_tarea
 paso3 >> ultima_tarea
 

airflow/dags/hello_python_operator.py

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ def print_hello():
     description='Simple tutorial DAG',
     schedule_interval='20 * * * *',
     start_date=datetime(2017, 3, 20),
+    tags=['upv'],
     catchup=False)
 
 dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

airflow/dags/hello_simple.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 from airflow.operators.dummy_operator import DummyOperator
 
 default_args = {"start_date": datetime(2019, 2, 5)}
-dag = DAG('hello', default_args=default_args, schedule_interval=None)
+dag = DAG('hello', default_args=default_args, schedule_interval=None, tags=['upv'])
 
 dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
 hello_operator = DummyOperator(task_id='hello_task', dag=dag)

airflow/dags/s3_file_sensor.py

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+import os
+from datetime import datetime
+
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+
+# By default, the AWS operators use the 'aws_default' connection. You can create it by running `make minio_connection`.
+# To use a different one, put its id in a variable and pass it as `aws_conn_id` to all AWS operators.
+AWS_CONN_ID = 'aws_default'
+
+BUCKET_NAME = os.environ.get('BUCKET_NAME', 'patatas')
+
+@task(task_id="s3_bucket_dag_add_keys_to_bucket")
+def upload_keys():
+    s3_hook = S3Hook()
+    for i in range(0, 3):
+        s3_hook.load_string(string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME)
+
+with DAG(
+    dag_id='s3_bucket_dag',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    default_args={"bucket_name": BUCKET_NAME},
+    max_active_runs=1,
+    tags=['upv'],
+) as dag:
+
+    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
+    add_keys_to_bucket = upload_keys()
+    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)
+    create_bucket >> add_keys_to_bucket >> delete_bucket
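
AWS_CONN_ID is declared above but never passed to anything yet. A minimal sketch of the wiring the comment describes, assuming the connection id arrives via an AWS_CONN_ID environment variable (everything else reuses names from this file):

    import os
    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator

    # Assumed: the connection id comes from the environment, defaulting to Airflow's standard one.
    AWS_CONN_ID = os.environ.get("AWS_CONN_ID", "aws_default")

    with DAG(
        dag_id="s3_bucket_dag_custom_conn",   # made-up dag_id, for illustration only
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['upv'],
    ) as dag:
        # Every AWS operator and hook accepts aws_conn_id, so the same id can be
        # threaded through the whole DAG instead of relying on the default.
        create_bucket = S3CreateBucketOperator(
            task_id="s3_bucket_dag_create",
            bucket_name="patatas",
            region_name="us-east-1",
            aws_conn_id=AWS_CONN_ID,
        )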
