
Commit f25a69f

Two more samples for Airflow + S3
1 parent f6048ba commit f25a69f

2 files changed: 43 additions & 16 deletions

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+import os
+from datetime import datetime
+
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+
+# By default, it will use 'aws_default' connection. You can create it here by running `make minio_credentials`
+# If you want to change it, use a variable and pass it as `aws_conn_id` to all AWS operators.
+AWS_CONN_ID = 'aws_default'
+
+BUCKET_NAME = os.environ.get('BUCKET_NAME', 'patatas')
+
+@task(task_id="s3_bucket_dag_add_keys_to_bucket")
+def upload_keys():
+    s3_hook = S3Hook()
+    for i in range(0, 3):
+        s3_hook.load_string(string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME)
+
+with DAG(
+    dag_id='s3_bucket_operations',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    default_args={"bucket_name": BUCKET_NAME},
+    max_active_runs=1,
+    tags=['upv'],
+) as dag:
+
+    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
+    add_keys_to_bucket = upload_keys()
+    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)
+    create_bucket >> add_keys_to_bucket >> delete_bucket
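
A side note on the connection handling in this new DAG: AWS_CONN_ID is defined but never passed to the hook or the operators, so everything falls back to 'aws_default', exactly as the comment says. Below is a minimal sketch of how the same pieces could be pointed at an explicit connection; the connection name 'minio_local' and the dag_id are hypothetical placeholders, not part of the commit.

import os
from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator

# Hypothetical connection id; 'aws_default' is what the hook and operators use when nothing is passed.
AWS_CONN_ID = 'minio_local'
BUCKET_NAME = os.environ.get('BUCKET_NAME', 'patatas')

with DAG(
    dag_id='s3_bucket_operations_explicit_conn',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    # Passing aws_conn_id explicitly overrides the 'aws_default' fallback on the operator ...
    create_bucket = S3CreateBucketOperator(
        task_id='s3_bucket_create',
        bucket_name=BUCKET_NAME,
        region_name='us-east-1',
        aws_conn_id=AWS_CONN_ID,
    )

    # ... and the same keyword works on the hook used inside a task.
    @task(task_id="upload_one_key")
    def upload_one_key():
        S3Hook(aws_conn_id=AWS_CONN_ID).load_string(
            string_data="input", key="path/data0", bucket_name=BUCKET_NAME
        )

    create_bucket >> upload_one_key()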

airflow/dags/s3_file_sensor.py

Lines changed: 9 additions & 16 deletions
@@ -3,23 +3,17 @@
 
 from airflow.decorators import task
 from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
-
-# By default, it will use 'aws_default' connection. You can create it here by running `make minio_credentials`
-# If you want to change it, use a variable and pass it as `aws_conn_id` to all AWS operators.
-AWS_CONN_ID = 'aws_default'
+from airflow.models.variable import Variable
+from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor, S3KeySizeSensor
 
 BUCKET_NAME = os.environ.get('BUCKET_NAME', 'patatas')
 
-@task(task_id="s3_bucket_dag_add_keys_to_bucket")
-def upload_keys():
-    s3_hook = S3Hook()
-    for i in range(0, 3):
-        s3_hook.load_string(string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME)
+@task(task_id="do_something")
+def do_something():
+    print("Something!")
 
 with DAG(
-    dag_id='s3_bucket_dag',
+    dag_id='s3_file_sensor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
@@ -28,7 +22,6 @@ def upload_keys():
     tags=['upv'],
 ) as dag:
 
-    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
-    add_keys_to_bucket = upload_keys()
-    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)
-    create_bucket >> add_keys_to_bucket >> delete_bucket
+    op = S3KeySensor(task_id="s3_key_sensor", bucket_key="s3://gasolina/some_file.json", bucket_name=None, dag=dag)
+    end_task = do_something()
+    op >> end_task
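
The updated DAG imports Variable and S3KeySizeSensor but only uses S3KeySensor. A minimal sketch of how those two imports could be put to work, assuming the installed provider version still ships S3KeySizeSensor and that an Airflow Variable named 's3_sensor_key' exists; the variable name and the dag_id below are hypothetical, not part of the commit.

from datetime import datetime

from airflow.models.dag import DAG
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor, S3KeySizeSensor

with DAG(
    dag_id='s3_file_sensor_variable',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    # Pull the key to watch from an Airflow Variable instead of hard-coding it.
    # 's3_sensor_key' is a hypothetical variable holding a full s3:// URI; note that
    # Variable.get at module level is evaluated on every DAG parse.
    bucket_key = Variable.get("s3_sensor_key", default_var="s3://gasolina/some_file.json")

    # Waits until the key exists (full s3:// URI, so bucket_name stays None).
    wait_for_key = S3KeySensor(task_id="s3_key_sensor", bucket_key=bucket_key, bucket_name=None)

    # Waits until the key exists and has a non-zero size.
    wait_for_size = S3KeySizeSensor(task_id="s3_key_size_sensor", bucket_key=bucket_key, bucket_name=None)

    wait_for_key >> wait_for_size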
