-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path9.2_postgres_dag.py
More file actions
69 lines (53 loc) · 2.14 KB
/
9.2_postgres_dag.py
File metadata and controls
69 lines (53 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
### Uso del PostgresOperator
Este operador se utiliza para interactuar con una base de datos
postgresql mediante lenguaje SQL...
Además de lo visto en la sección 1 de este DAG, ahora vamos a
implementar parámetros dinámicos dentro de este Operador de la
siguiente manera
"""
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
dag_owner = 'Tinmar'
default_args = {'owner': dag_owner,
'depends_on_past': False,
'retries': 0
}
def _print_sql(ti):
sql = ti.xcom_pull(task_ids='postgres_insert', key='return_value')
print(sql)
with DAG(
dag_id='9.2_postgres_dag',
default_args=default_args,
description='',
start_date=datetime(2023,2,23),
schedule_interval=None,
catchup=False,
tags=['Curso4', 'The Operators Guide']
)as dag:
dag.doc_md = __doc__
start = DummyOperator(task_id='start')
postgres_create = PostgresOperator(
task_id='postgres_create',
sql="sql/create.sql"
)
postgres_insert = PostgresOperator(
task_id='postgres_insert',
sql=["sql/insert_2.sql", "SELECT * FROM my_table"],
params={
"filename":"python.py"
}
)
# postgres_select = PostgresOperator(
# task_id='postgres_select',
# sql="SELECT * FROM my_table"
# )
print_sql = PythonOperator(
task_id='print_sql',
python_callable=_print_sql
)
end = DummyOperator(task_id='end')
start >> postgres_create >> postgres_insert >> print_sql >> end