-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path1.5_user_processing.py
More file actions
96 lines (82 loc) · 3.86 KB
/
1.5_user_processing.py
File metadata and controls
96 lines (82 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from airflow.models import DAG
from datetime import datetime
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
import json
from pandas import json_normalize
# Defaults applied to every task in the DAG defined below.
default_args = {
    "owner": "Tinmar",
}
def _process_user(ti):
    """Flatten the user record produced by the ``extract_data`` task into a CSV row.

    Pulls the API response from XCom, keeps only the first result, normalizes
    the nested JSON into a single-row DataFrame, and appends it (no header,
    no index) to a fixed CSV file.

    Args:
        ti: Airflow task instance injected by PythonOperator; used for
            ``xcom_pull``.

    Raises:
        ValueError: if the upstream task returned nothing or an empty
            ``results`` list (fail fast with a clear message instead of an
            opaque ``KeyError``/``IndexError``).
    """
    payload = ti.xcom_pull(task_ids='extract_data')
    if not payload or not payload.get('results'):
        raise ValueError('extract_data returned no usable payload')
    user = payload['results'][0]
    processed_user = json_normalize(
        {
            'firstname': user['name']['first'],
            'lastname': user['name']['last'],
            'country': user['location']['country'],
            'username': user['login']['username'],
            'password': user['login']['password'],
            'email': user['email'],
        }
    )
    # NOTE(review): hard-coded, machine-specific path — consider an Airflow
    # Variable or a mounted data directory instead. TODO confirm with owner.
    # index=False (not index=None) is the documented way to omit the index.
    processed_user.to_csv(
        '/c/Users/tinma/OneDrive/Escritorio/processed_user.csv',
        index=False,
        header=False,
    )
# Pipeline wiring: Postgres table setup and seed row, then API availability
# check, extraction, and CSV post-processing, bracketed by no-op markers.
with DAG(
    '1.5_user_processing',
    start_date=datetime(2023, 2, 16),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,
    tags=['Curso 3', 'Introduction_to_Apache_Airflow']
) as dag:

    # No-op boundary markers for the start and end of the pipeline.
    begin = DummyOperator(task_id='start')
    finish = DummyOperator(task_id='end')

    # Create the target table on first run (idempotent thanks to IF NOT EXISTS).
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres',
        sql="""
            CREATE TABLE IF NOT EXISTS users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL
            );
        """
    )

    # Seed a single hard-coded row; runs on every schedule tick.
    insert_data = PostgresOperator(
        task_id='insert_data',
        postgres_conn_id='postgres',
        sql= """
            INSERT INTO users VALUES ('Tinmar', 'Andrade','México','tinmar','GuruSat.3','tinmar96@gmail.com')
        """
    )

    # Read the table back (result is only visible in the task logs).
    select_data = PostgresOperator(
        task_id='select_data',
        postgres_conn_id='postgres',
        sql="""
            SELECT * FROM users
        """
    )

    # Block until the user API endpoint answers.
    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    # GET one random user and parse the JSON body into the XCom value.
    extract_data = SimpleHttpOperator(
        task_id='extract_data',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    # Flatten the extracted user and persist it as CSV.
    process_user = PythonOperator(
        task_id='process_user',
        python_callable=_process_user
    )

    # Linear dependency chain, split by pipeline phase.
    begin >> create_table >> insert_data >> select_data
    select_data >> is_api_available >> extract_data
    extract_data >> process_user >> finish