Commit d13734a

Merge pull request #23 from godatadriven/kg/update_ch12
Switch chapters 12 and 14 around

2 parents 2710fe5 + dffc9df

47 files changed (+262 / −258 lines)

chapter12/.env

Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+AIRFLOW_UID=1000
+AIRFLOW_GID=0
```
File renamed without changes.
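The `.env` above hard-codes `AIRFLOW_UID=1000`; docker-compose reads this file to set the user the Airflow containers run as, so that files written to mounted volumes (such as the logs directory) stay owned by your host user. If your host UID differs from 1000, a common pattern (recommended in the official "Running Airflow in Docker" guide) is to regenerate the file from the host user:

```shell
# Regenerate .env with the host user's UID; GID 0 matches the
# group the official Airflow image expects.
printf 'AIRFLOW_UID=%s\nAIRFLOW_GID=0\n' "$(id -u)" > .env
cat .env
```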

chapter12/README.md

Lines changed: 65 additions & 24 deletions

````diff
@@ -1,38 +1,79 @@
-# Chapter 12
+# Chapter 14
 
-Code accompanying Chapter 12 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).
+Code accompanying Chapter 14 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).
 
-## Contents
+Much of this use case was based on the ideas in https://toddwschneider.com/posts/taxi-vs-citi-bike-nyc, where
+the fastest method of transportation (Citi Bikes or Yellow Taxis) between two NYC neighborhoods at given times
+and days is determined. We will productionize this use case into an Airflow workflow. Two (mocked) real-time
+services are created for this purpose:
 
-This folder contains DAGs from Chapter 12. Topics covered are monitoring, logging, scaling horizontal, etc. An
-accompanying Docker Compose setup was built for demonstration purposes. This includes:
+1. A service providing Citi Bike rides
+1. A service providing NYC Yellow Taxi rides
 
-- Airflow (webserver, scheduler, and Celery workers)
-- PostgreSQL database for Airflow metastore
-- Redis for Celery queue
-- Flower, a Celery monitoring tool
-- Prometheus, for scraping and storing metrics
-- Grafana, for visualizing metrics
-- And a Redis & StatsD exporter to expose metrics
+An Airflow DAG will periodically scrape these services, wrangle the data, and send the results into a Postgres
+database serving a tiny website, displaying which transportation method is fastest between two NYC
+neighborhoods.
 
-Given the number of services, this can become a bit resource-heavy on your machine.
+The real data is only available in batches of months/years. Therefore we provide two APIs to mimic "live"
+systems. The system can be brought online with the `docker-compose.yml` provided with this repository:
 
-Unfortunately, not everything can be scripted/pre-initialized, especially in Grafana. Therefore, you must add
-Prometheus as a datasource and create a dashboard yourself.
+```bash
+docker-compose up -d
+```
 
-## Usage
+If all processing goes well, the final result is visible on http://localhost:8083.
 
-To get started with the code examples, start Airflow with Docker Compose with the following command:
+Ports on which services are available:
 
-```bash
-docker-compose up -d
+- http://localhost:5432: Airflow Postgres DB (`airflow`/`airflow`)
+- http://localhost:5433: NYC Taxi Postgres DB (`taxi`/`ridetlc`)
+- http://localhost:5434: Citi Bike Postgres DB (`citi`/`cycling`)
+- http://localhost:5435: NYC Transportation results Postgres DB (`nyc`/`tr4N5p0RT4TI0N`)
+- http://localhost:8080: Airflow webserver (`airflow`/`airflow`)
+- http://localhost:8081: NYC Taxi static file server
+- http://localhost:8082: Citi Bike API (`citibike`/`cycling`)
+- http://localhost:8083: NYC Transportation API
+- http://localhost:9000: MinIO (`AKIAIOSFODNN7EXAMPLE`/`wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`)
+
+## More information
+
+### Taxi dataset
+
+The taxi dataset refers to the "NYC Taxi and Limousine Commission (TLC) Trip Record Data", available here:
+https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page.
+
+A snippet of the data:
+
+```csv
+VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
+1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
+1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
+2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,
+2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,0,0.3,7.55,
+2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
+2,2018-11-28 16:25:49,2018-11-28 16:28:26,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,5.76,0.3,13.31,
+2,2018-11-28 16:29:37,2018-11-28 16:33:43,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
+1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.30,1,N,163,229,1,6.5,0.5,0.5,1.25,0,0.3,9.05,
+1,2019-01-01 00:32:01,2019-01-01 00:45:39,1,3.70,1,N,229,7,1,13.5,0.5,0.5,3.7,0,0.3,18.5,
 ```
 
-The webserver initializes a few things, so wait for a few seconds, and you should be able to access the
-Airflow webserver at http://localhost:8080.
+Data dictionary: https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
 
-To stop running the examples, run the following command:
+### Citi Bike dataset
 
-```bash
-docker-compose down -v
+https://www.citibikenyc.com/system-data
+
+A snippet of the data:
+
+```csv
+"tripduration","starttime","stoptime","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender"
+201,"2019-01-01 03:09:09.7110","2019-01-01 03:12:30.8790",3183,"Exchange Place",40.7162469,-74.0334588,3214,"Essex Light Rail",40.7127742,-74.0364857,29612,"Subscriber",1993,1
+505,"2019-01-01 05:18:00.1060","2019-01-01 05:26:25.9050",3183,"Exchange Place",40.7162469,-74.0334588,3638,"Washington St",40.7242941,-74.0354826,29213,"Subscriber",1972,2
+756,"2019-01-01 10:36:33.3400","2019-01-01 10:49:10.2600",3183,"Exchange Place",40.7162469,-74.0334588,3192,"Liberty Light Rail",40.7112423,-74.0557013,26164,"Subscriber",1985,1
+1575,"2019-01-01 12:43:38.6430","2019-01-01 13:09:54.5280",3183,"Exchange Place",40.7162469,-74.0334588,3638,"Washington St",40.7242941,-74.0354826,29672,"Customer",1969,0
+1566,"2019-01-01 12:43:39.6010","2019-01-01 13:09:46.5100",3183,"Exchange Place",40.7162469,-74.0334588,3638,"Washington St",40.7242941,-74.0354826,29522,"Customer",1969,0
+737,"2019-01-01 12:56:53.2040","2019-01-01 13:09:11.0400",3183,"Exchange Place",40.7162469,-74.0334588,3205,"JC Medical Center",40.71653978099194,-74.0496379137039,29447,"Subscriber",1993,1
+917,"2019-01-01 13:03:44.7760","2019-01-01 13:19:02.7690",3183,"Exchange Place",40.7162469,-74.0334588,3277,"Communipaw & Berry Lane",40.71435836870427,-74.06661093235016,29299,"Subscriber",1986,1
+3248,"2019-01-01 13:12:03.1280","2019-01-01 14:06:12.0400",3183,"Exchange Place",40.7162469,-74.0334588,3196,"Riverview Park",40.7443187,-74.0439909,29495,"Subscriber",1992,1
+3168,"2019-01-01 13:13:12.0450","2019-01-01 14:06:00.4110",3183,"Exchange Place",40.7162469,-74.0334588,3196,"Riverview Park",40.7443187,-74.0439909,26312,"Customer",1969,0
 ```
````
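As a quick schema check on the Citi Bike snippet, the `tripduration` column (whole seconds) can be cross-checked against the `starttime`/`stoptime` timestamps, which carry sub-second precision. A small stdlib-only Python sketch over the first data row above:

```python
import csv
import io
from datetime import datetime

# First data row of the Citi Bike snippet, abridged to the three relevant columns.
SNIPPET = (
    '"tripduration","starttime","stoptime"\n'
    '201,"2019-01-01 03:09:09.7110","2019-01-01 03:12:30.8790"\n'
)
FMT = "%Y-%m-%d %H:%M:%S.%f"

for row in csv.DictReader(io.StringIO(SNIPPET)):
    elapsed = (datetime.strptime(row["stoptime"], FMT)
               - datetime.strptime(row["starttime"], FMT))
    # tripduration is truncated to whole seconds; the timestamps are finer-grained.
    assert int(elapsed.total_seconds()) == int(row["tripduration"])
print("tripduration matches the timestamps")
```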
File renamed without changes.
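The taxi rows contain both a trip distance and the pickup/dropoff timestamps, so a rough per-trip average speed (the quantity the use case ultimately compares across transportation modes) can be derived directly. An illustrative Python sketch, not the chapter's own code, over two rows copied from the taxi snippet:

```python
import csv
import io
from datetime import datetime

# Two rows from the taxi snippet, abridged to the columns used here.
ROWS = (
    "tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance\n"
    "2019-01-01 00:46:40,2019-01-01 00:53:20,1.50\n"
    "2019-01-01 00:59:47,2019-01-01 01:18:59,2.60\n"
)
FMT = "%Y-%m-%d %H:%M:%S"

speeds = []
for row in csv.DictReader(io.StringIO(ROWS)):
    seconds = (datetime.strptime(row["tpep_dropoff_datetime"], FMT)
               - datetime.strptime(row["tpep_pickup_datetime"], FMT)).total_seconds()
    # Average speed in miles per hour (trip_distance is in miles), rounded for display.
    speeds.append(round(float(row["trip_distance"]) * 3600 / seconds, 1))
print(speeds)
```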

chapter12/docker-compose.yml

Lines changed: 106 additions & 63 deletions

```diff
@@ -2,16 +2,19 @@ version: '3.7'
 
 # ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
 x-environment: &airflow_environment
-  - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/1
-  - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:5432/airflow
-  - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
+  - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False
+  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
+  - AIRFLOW__CORE__FERNET_KEY=hCRoPUYBO27QiEg1MRu5hSjLG7yNd8y8XKlm-8kRlkQ=
   - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
   - AIRFLOW__CORE__LOAD_EXAMPLES=False
   - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
-  - AIRFLOW__METRICS__STATSD_HOST=statsd_exporter
-  - AIRFLOW__METRICS__STATSD_ON=True
-  - AIRFLOW__METRICS__STATSD_PORT=9125
+  - AIRFLOW__CORE__STORE_DAG_CODE=True
+  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
   - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
+  - AIRFLOW_CONN_S3=s3://@?host=http://minio:9000&aws_access_key_id=AKIAIOSFODNN7EXAMPLE&aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+  - AIRFLOW_CONN_CITIBIKE=http://citibike:cycling@citibike_api:5000
+  - AIRFLOW_CONN_TAXI=http://taxi_fileserver
+  - AIRFLOW_CONN_RESULT_DB=postgresql://nyc:tr4N5p0RT4TI0N@result_db:5432/nyctransportation
 
 x-airflow-image: &airflow_image apache/airflow:2.0.0-python3.8
 # ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
@@ -26,99 +29,139 @@ services:
     ports:
       - "5432:5432"
 
-  init:
-    image: *airflow_image
+  initdb_adduser:
+    build:
+      context: ./services/airflow
+      args:
+        AIRFLOW_BASE_IMAGE: *airflow_image
     depends_on:
       - postgres
     environment: *airflow_environment
     entrypoint: /bin/bash
     command: -c 'airflow db init && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email [email protected]'
 
   webserver:
-    image: *airflow_image
+    build:
+      context: ./services/airflow
+      args:
+        AIRFLOW_BASE_IMAGE: *airflow_image
     restart: always
     depends_on:
       - postgres
-    ports:
-      - "8080:8080"
     volumes:
       - logs:/opt/airflow/logs
+    ports:
+      - "8080:8080"
     environment: *airflow_environment
     command: webserver
 
   scheduler:
-    image: *airflow_image
+    build:
+      context: ./services/airflow
+      args:
+        AIRFLOW_BASE_IMAGE: *airflow_image
     restart: always
     depends_on:
      - postgres
     volumes:
-      - logs:/opt/airflow/logs
       - ./dags:/opt/airflow/dags
+      - ./src:/opt/airflow/nyctransport/src
+      - ./setup.py:/opt/airflow/nyctransport/setup.py
+      - logs:/opt/airflow/logs
     environment: *airflow_environment
-    command: scheduler
+    entrypoint: ["/bin/sh"]
+    command: ["-c", "pip install --user -e /opt/airflow/nyctransport && airflow scheduler"]
 
-  # docker-compose -f docker-compose-celeryexecutor.yml up --scale worker=3 -d
-  worker:
-    image: *airflow_image
-    restart: always
+  taxi_fileserver:
+    build:
+      context: ./services/taxi_fileserver
+    environment:
+      - POSTGRES_HOST=taxi_db
+      - POSTGRES_PORT=5433
+      - POSTGRES_USERNAME=taxi
+      - POSTGRES_PASSWORD=ridetlc
+      - POSTGRES_DATABASE=tlctriprecords
+      - DATA_YEAR=2019
     depends_on:
-      - scheduler
-    volumes:
-      - logs:/opt/airflow/logs
-      - ./dags:/opt/airflow/dags
-    environment: *airflow_environment
-    command: celery worker
+      - taxi_db
+    ports:
+      - "8081:80"
 
-  flower:
-    image: *airflow_image
-    restart: always
+  taxi_db:
+    build:
+      context: ./services/taxi_db
+    ports:
+      - "5433:5432"
+
+  citibike_api:
+    build:
+      context: ./services/citibike_api
+    environment:
+      - POSTGRES_HOST=citibike_db
+      - POSTGRES_PORT=5434
+      - POSTGRES_USERNAME=citi
+      - POSTGRES_PASSWORD=cycling
+      - POSTGRES_DATABASE=citibike
+      - DATA_YEAR=2019
     depends_on:
-      - worker
+      - citibike_db
     ports:
-      - "5555:5555"
-    environment: *airflow_environment
-    command: celery flower
+      - "8082:5000"
 
-  statsd_exporter:
-    image: prom/statsd-exporter:v0.18.0
-    restart: always
-    volumes:
-      - ./files/statsd_mapping.yml:/tmp/statsd_mapping.yml
+  citibike_db:
+    build:
+      context: ./services/citibike_db
     ports:
-      - "9102:9102"
-      - "9125:9125/udp"
-    command: --statsd.mapping-config=/tmp/statsd_mapping.yml
+      - "5434:5432"
 
-  prometheus:
-    image: prom/prometheus:v2.22.0
-    restart: always
+  minio:
+    image: minio/minio:RELEASE.2020-06-22T03-12-50Z
     volumes:
-      - ./files/prometheus.yml:/etc/prometheus/prometheus.yml
+      - s3:/data
     ports:
-      - "9090:9090"
-    command:
-      - --web.enable-admin-api
-      - --web.enable-lifecycle
-      # Flags below are defaults, but must be added explicitly, otherwise would be overridden by flags above
-      - --config.file=/etc/prometheus/prometheus.yml
-      - --storage.tsdb.path=/prometheus
-      - --web.console.libraries=/usr/share/prometheus/console_libraries
-      - --web.console.templates=/usr/share/prometheus/consoles
+      - "9000:9000"
+    command: server /data
+    environment:
+      MINIO_ACCESS_KEY: AKIAIOSFODNN7EXAMPLE
+      MINIO_SECRET_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
 
-  grafana:
-    image: grafana/grafana:7.2.1
-    restart: always
-    ports:
-      - "3000:3000"
+  minio_init:
+    image: minio/mc:RELEASE.2020-06-20T00-18-43Z
+    depends_on:
+      - minio
+    entrypoint: >
+      /bin/sh -c "
+      while ! nc -z minio 9000; do echo 'Waiting 1 sec for MinIO to be healthy...' && sleep 1; done;
+      echo 'MinIO is available.';
+      while ! /usr/bin/mc config host add minio http://minio:9000 AKIAIOSFODNN7EXAMPLE wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY; do echo 'MinIO not up and running yet...' && sleep 1; done;
+      echo 'Added mc host config.';
+      /usr/bin/mc mb minio/datalake;
+      exit 0;
+      "
 
-  redis:
-    image: redis:5-alpine
+  result_db:
+    image: postgres:13-alpine
+    environment:
+      - POSTGRES_USER=nyc
+      - POSTGRES_PASSWORD=tr4N5p0RT4TI0N
+      - POSTGRES_DB=nyctransportation
+    volumes:
+      - ./services/result_db/create_tables.sql:/docker-entrypoint-initdb.d/create_tables.sql
+    ports:
+      - "5435:5432"
 
-  redis_exporter:
-    image: oliver006/redis_exporter:v1.5.2-alpine
+  nyc_transportation_api:
+    build:
+      context: ./services/nyc_transportation_api
+    environment:
+      - POSTGRES_HOST=result_db
+      - POSTGRES_PORT=5432
+      - POSTGRES_USERNAME=nyc
+      - POSTGRES_PASSWORD=tr4N5p0RT4TI0N
+      - POSTGRES_DATABASE=nyctransportation
     ports:
-      - "9121:9121"
-    command: --redis.addr=redis://redis:6379
+      - "8083:5000"
 
 volumes:
   logs:
+  s3:
```
File renamed without changes.
File renamed without changes.
File renamed without changes.
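The `AIRFLOW_CONN_*` entries added in the compose file define Airflow connections as URIs in the environment: the scheme becomes the connection type, the userinfo the credentials, and the path the schema. A minimal stdlib-only sketch of that decomposition, using the `AIRFLOW_CONN_RESULT_DB` value from the diff above (`urllib.parse` stands in for Airflow's own parsing):

```python
from urllib.parse import urlsplit

# Connection URI as set for the result database in docker-compose.yml.
uri = "postgresql://nyc:tr4N5p0RT4TI0N@result_db:5432/nyctransportation"
parts = urlsplit(uri)

conn = {
    "conn_type": parts.scheme,          # connection type, e.g. postgresql
    "login": parts.username,            # DB user
    "password": parts.password,         # DB password
    "host": parts.hostname,             # compose service name
    "port": parts.port,                 # integer port
    "schema": parts.path.lstrip("/"),   # database name
}
print(conn)
```

Inside a DAG, tasks would then refer to this connection only by its ID (`result_db`), keeping credentials out of the DAG code.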
