1.1.1

Google Cloud Platform

  • cloud computing services

    image-20250109002231923

Data pipeline

image-20250108220251131

image-20250108220311426

One Host computer (Windows/ Mac os/ Linux) can have multiple containers. Here is one example container.

image-20250108221228005

Also we could have Postgres on either Computer or Docker. Database will not connected on both environment.

Once we finish a docker container, we can run this on any Google cloud(Kubernetes) environment.

1.2..1

Why should we care about Docker?

  • Reproducibility - 可重复性 * To use in different environments
  • Local experiments
  • Integration test(CI/CD) - 集成测试
  • Running pipeline on the could(AWS Batch, Kubernetes jobs)
  • Spark
  • Serverless(AWS Lambda, Google functions)

Docker

  1. Open Docker windows Dssktop

  2. run docker run hello-world in git bash. It will go to docker hub to find an image called hello-world, download this image and run this image image-20250109004459727

  3. Then we will input docker run -it ubuntu bash.

    run: we want to run this image

    -it: we want to do this in interactive mode, i means interactive, t means terminal

    this means we want to type something and the docker will react that

    1. here may got an error message:  image-20250109011507379

      To solve it, have to follow the steps:*

      1. Open git bash, in ~ path, locate ~/.bashrc file. Cant use ls -a ~ to see whether this folder have .bashrc file image-20250109011936754

      2. If not have .bashrc file, run vim ~/.bashrc in this path. Then will create an empty file.

        image-20250109012121440

      3. Once we get this file it was empty, we have press key i to move into edit mode. Then type in alias docker='winpty docker' After that press esc exit edit mode. type :wq exit and save the file.

      4. run source ~/.bashrc and reopen git bash. DONE!

        image-20250109012607339 image-20250109012531694

  4. ISOLATED It means even I use rm -rf / --no-preserve-root to remove everything image-20250109013239891 It will reversed next time I reopen this docker container

image-20250109013339891

1.2.2

Docker for python

Run python

docker run -it python:3.9

image-20250109013542871

Download library

  • ctrl + d leave python

  • run docker run -it --entrypoint-bash python:3.9

  • run pip install pandas

    image-20250109014145737

    Things is once we leave this shell and use docker run -it --entrypoint-bash python:3.9 back in again, we still can not use pandas module. The reason is same as we use rm -rf /.

Dockerfile

docker build -t

docker build means builds and image from dockerfile

. means we want docker to build an image this directory in the current directory.

It will look for the docker file and execute this docker file and we will create am image with test name

image-20250109015809237

Create a new file called pipeline.py

image-20250109020156514

It actually works!

now debug pipeline.py. Rebuild Dockerfile.

image-20250109020647005

image-20250109020638144

If we give more arguments to docker, those paramaters will be pass.image-20250109020807957

Postgres command line for Docker

在 git bash 中执行代码,将其中的 h:\data-engineering-zoomcamp\01-docker-terraform\2_docker_sql\ny_taxi_postgres_data 替换为自己的存储路径。

image-20250110124344024

bash
1
2
3
4
5
6
7
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v "h:\data-engineering-zoomcamp\01-docker-terraform\2_docker_sql\ny_taxi_postgres_data:/var/lib/postgresql/data" \
-p 5432:5432 \
postgres:13
  • 给环境加双引号

  • windows 系统注意反斜线方向

  • 空文件夹的问题也有遇到,但尝试评论中方案都未能解决,最后重装了 docker, 莫名其妙的好了

    image-20250110124325747

环境变量配置

  • -e POSTGRES_USER="root":设置 PostgreSQL 容器内的默认数据库用户为 root
  • -e POSTGRES_PASSWORD="root":设置 root 用户的密码为 root
  • -e POSTGRES_DB="ny_taxi":设置容器内创建的默认数据库名称为 ny_taxi

数据挂载

-v "h:\data-engineering-zoomcamp\01-docker-terraform\2_docker_sql\ny_taxi_postgres_data:/var/lib/postgresql/data":将主机(host)上的路径 h:\data-engineering-zoomcamp\01-docker-terraform\2_docker_sql\ny_taxi_postgres_data 挂载到容器内的 /var/lib/postgresql/data 目录。使 PostgreSQL 容器的数据储存在主机中,防删。

端口映射

  • -p 5432:5432:将容器内的 PostgreSQL 默认端口(5432)映射到主机的 5432 端口。这样就能从主机访问容器中的 PostgreSQL 数据库。

容器镜像

  • postgres:13:指定使用 postgres 镜像的版本 13 来启动容器。

pgcli

在 Powershell 中执行

pgcli -h localhost -p 5432 -u root -d ny_taxi

image-20250110124238334

使用 \dt 查询目前有的表

image-20250110124452186

Running Jupyter Notebook

如何打开

  1. 使用 git bash,输入 jupyter notebook 然后会通过默认浏览器打开(蛮方便诶)

    image-20250110124922140

    1. 通过 anaconda 打开

image-20250110124947569

使用 jupyter notebook

image-20250110125241831

Taxi data

下载数据

wget

  1. 首先我下载了 wget.exe, 地址: https://eternallybored.org/misc/wget/

  2. wget.exe 置于 C:\Program Files\Git\mingw64\bin\ 目录下. 这里的下载目录根据自己安装 Git 时选择的目录来选择

下载数据

wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz

image-20250110135024819

解压

gzip -d yellow_tripdata_2021-01.csv.gz

image-20250110135153849

查看数据

节选前 100 条数据集查看

head -n 100 yellow_tripdata_2021-01.csv > yellow_head.csv

image-20250110135733848

计算有多少行 wc -l yellow_tripdata_2021-01.csv

image-20250110141518670

Reading csv with pandas

pd.read()

导入并查看前 100 行,导入使用 df = pd.read('yellow_tripdata_2021-01.csv') 要确保读取的 csv 和.ipynb 文件在一个路径下,否则要用绝对路径调用 image-20250110142113388

pd.io.sql.get_schema(df, name,con))

描述表格在 SQL 中的格式, 我们可以看到 tpep_pickup_datetime 在表中的额格式是 timestamp,所以用到了下面的 function 来转换 pandas 识别的格式 ![image-20250110143006298](E:\new_hexo\blog\source_posts\DE Zoomcamp w1.assets\image-20250110143006298.png) image-20250110144300725 **pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)**:

  • df:。df 表示希望在数据库中创建的表的数据。
  • name='yellow_taxi_data':为生成的 SQL 表指定表名为 yellow_taxi_data
  • con=engine:数据库连接对象(engine),表明与 PostgreSQL 数据库的连接。SQLAlchemy engine 被用来执行与数据库的交互。

**get_schema**:

  • 根据传入的 DataFrame 的列名、数据类型等信息生成一个对应的 SQL CREATE TABLE 语句。 它不会执行 SQL,只是生成语句 ,可以将其输出并 手动执行或保存 到文件中。

添加了 con = engine 参数

pd.to_datetime()

使用 pd.to_datetime(df.tpep_pick_datetime) 来将格式转换为 datetime64 image-20250110142557817

修改数据格式:

df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime) df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)image-20250110143201840

create_engine()

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

create_engine 会创建一个 SQLAlchemy Engine 对象来表示与数据库的连接。通过这个 engine,可以执行 SQL 查询、读取数据、写入数据等操作

postgresql://

这是数据库的连接协议,告诉 SQLAlchemy 使用 PostgreSQL 数据库

root:root

这是数据库的用户名和密码,格式为 username:password。之前创建的用户名和密码都是 root

localhost

localhost 表示数据库在本地运行, 也就是 pgcli 执行 pgcli -h localhost -p 5432 -u root -d ny_taxi 时候使用的 -h 参数

5432

PostgreSQL 数据库的端口号

ny_taxi

要连接的数据库的名称

pd.read_csv(…, iterator = True, chunksize = 100000)

介绍下 df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

**iterator=True**:

  • 告诉 pandas 返回一个迭代器,按块读取文件而不是一次性加载整个 CSV 文件

**chunksize=100000**:

  • 这个参数指定每次读取的行数。这里是每次读取 100,000 行数据。逐块读取 CSV 文件,直到文件读取完毕

next()

df = next(df_iter)

``next() 是 Python 的内置函数,用于从迭代器中获取下一个元素。每次调用时,它会返回.csv文件中的下一个数据块.df = next(df_iter)会将每次从迭代器取出的 100000 行数据赋值给df, 每次调用都会返回下一个数据块直到.csv`读取完毕

df.head() 查看前 5 行数据,默认是 5,可以自行设置

df.to_sql()

df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

这里的作用就是添加表头

**df.head(n=0)**:

  • df.head() 返回 DataFrame df 的前 0 行数据. 如果想要不同数量的行,可以通过 df.head(n) 来指定,例如 df.head(10)

**.to_sql()**:

  • to_sql() 将 DataFrame 的数据保存到 SQL 数据库中。它会根据 DataFrame 的结构生成相应的 INSERT 语句,将数据插入到数据库中

  • name='yellow_taxi_data':指定要将数据保存到数据库中的表名,这里是 yellow_taxi_data

  • con=engine:指定数据库连接对象,这里使用的是 SQLAlchemy 创建的 engine。它表示与 PostgreSQL 数据库的连接

  • if_exists='replace':这个参数指定当表已经存在时的处理方式

    'replace' 会删除现有的表并重新创建一个新的表

    'append' 追加数据而不是替换表

    'fail' 不做任何操作使用

    image-20250110153156902

    image-20250110153119798

%time

%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

查看将 yellow_taxi_data 添加到 engine 的时间,这里的 yellow_taxi_data 是我们之前切割过得 100000

image-20250110153247129

遇到的问题

No module named ‘psycopg2’

image-20250110143345096

解决方法:pip install psycopg2 image-20250110143526159

StopIteration:

只是说明迭代器 df = next(df_iter) 已经没有数据块可以继续读取了, 并非问题

image-20250110153900462

1.2.3

创建网络将两个 container 放在一个网络下

docker network create pg-network

image-20250112111933713

将 postgres: 13 这个 container 放入网络

image-20250111235244485

将 dpage/pgadmin4 这个 container 放入网络

image-20250111235227245

pgadmin 使用 query

image-20250111235329737

1.2.4

.ipynb 文件转换成 .py 文件

jupyter nbconvert --to=script upload-data.ipynb

  • **jupyter nbcconvert **

    Jupyter 的工具,用于将 notebook 转换为其他格式

  • --to=script

    指定转换目标为 .py, 这会将 Notebook 中的代码单元格提取并保存为 Python 脚本,同时将 Markdown 单元格和注释保留为脚本中的注释

  • upload-data.ipynb
    指定要转换的 Jupyter Notebook 文件名,这里是 upload-data.ipynb

使用 parser.add_argument()

image-20250112122852378

图中这些参数, 在后续的使用中都是要在执行这个 .py 的时候输入的参数

先设置临时环境变量,这是我们下载代码的 url

URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

这里执行这个 .py 的指令是:

1
2
3
4
5
6
7
8
python ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}

执行效果:

image-20250112123226841

这里有个坑,就是视频中用的是下载完直接 .csv 格式,而目前我们有的是 .gz 格式,所以在代码中加了

1
2
3
4
if url.endswith('.csv.gz'):
csv_name = 'output.csv.gz'
else:
csv_name = 'output.csv'

这样如果传入的 urlhttps://example.com/data.csv.gz,那么 csv_name 会被设置为 output.csv.gz。如果传入的 urlhttps://example.com/data.csv,那么 csv_name 会被设置为 output.csv

后续 pandas 会根据文件扩展名来判断如何读取文件

解释下 Dockerfile

1
2
3
4
5
6
7
8
9
FROM python:3.9.1

RUN apt-get install wget
RUN pip install pandas sqlalchemy psycopg2

WORKDIR /app
COPY ingest_data.py ingest_data.py

ENTRYPOINT [ "python", "ingest_data.py" ]

RUN pip install pandas sqlalchemy psycopg2

基于我们在 python 文件中需要使用 sqlalchemypsycopg2 这两个库, 所以需要在这里下载 dependency

构建 Dockerfile 并执行

在 Dockerfile 所在的目录下执行

docker build -t taxi_ingest:v001 .

image-20250113223148218

构建完成后执行

1
2
3
4
5
6
7
8
9
10
11
12

docker run -it \
--network=pg-network \
taxi_ingest:v001 \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}

image-20250113231929742

此处有坑不要根据视频来填写 --host=localhost \ 会产生报错 psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "localhost" (127.0.0.1) and accepting TCP/IP connections on port 5432? image-20250113231439723

HTTP server + ipconfig

git bash 中输入 python -m httpm.server

image-20250113233803162

image-20250113233848603

使用 ipconfig 查询地址

image-20250113234327973

之后可以将本地的这个 .csvURL 更新为环境变量中的 URL 值, 即

URL="http://192.168.64.1:8000/yellow_tripdata_2021-01.csv.gz"

使用之前同样的指令,这样下载数据会变快

1.2.5

Docker Compose yaml file

目的 :无需执行之前使用的多个命令行,例如创建网络,添加网络 etc.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
services:
pgdatabase: # 需要的服务
image: postgres:13 # 对于pgdatabase我们需要的image
environment: # 设置的环境变量
- POSTGRES_USER=root
- POSTGRES_PASSWORD=root
- POSTGRES_DB=ny_taxi
volumes: # 这里是路径,./ny_taxi_postgres_data 是本机路径,/var/lib/postgresql/dat 是容器里的路径 rw 是读写
- "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
ports: # host和映射到终端的port
- "5432:5432"
pgadmin: # 需要的服务
image: dpage/pgadmin4
environment:
- PGADMIN_DEFAULT_EMAIL=admin@admin.com
- PGADMIN_DEFAULT_PASSWORD=root
ports:
- "8080:80"

之后执行 docker-compose up

image-20250114144546427

退出的时候 ctrl + C

image-20250114145035735

输入 docker-compose up -d 启动

image-20250114145134211

关闭使用 docker-compose down

image-20250114145301591

1.2.6

下载 zone 数据

!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

由于 .csv 的地址发生改变,所以需要用这个链接进行下载

SQL 查询

查看全部数据

1
2
3
4
select
*
from
zones;

查看 100 条数据

1
2
3
4
5
select
*
from
yellow_taxi_trips
limit 100;

用实际名称代替区域标号

使用 where 中的关键词进行关联

1
2
3
4
5
6
7
8
9
10
select 
*
from
yellow_taxi_trips t,
zones zpu,
zones zdo
where
t."PULocationID" = zpu."LocationID" and
t."DOLocationID" = zdo."LocationID"
limit 100;

还可以用

1
2
3
4
5
6
7
8
select 
*
from
yellow_taxi_trips t join zones zpu
on t."PULocationID" = zpu."LocationID"
join zones zdo
on t."DOLocationID" = zdo."LocationID"
limit 100;

只查看部分列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
select
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
concat(zpu."Borough", ' / ', zpu."Zone") as "pickup_loc",
concat(zdo."Borough", ' / ', zdo."Zone") as "dropoff_loc"
from
yellow_taxi_trips t,
zones zpu,
zones zdo
where
t."PULocationID" = zpu."LocationID" and
t."DOLocationID" = zdo."LocationID"
limit 100;

1
2
3
4
5
6
7
8
9
10
11
12
select 
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
concat(zpu."Borough", ' / ', zpu."Zone") as "pickup_loc",
concat(zdo."Borough", ' / ', zdo."Zone") as "dropoff_loc"
from
yellow_taxi_trips t join zones zpu
on t."PULocationID" = zpu."LocationID"
join zones zdo
on t."DOLocationID" = zdo."LocationID"
limit 100;

检查是否有 NULL 值

1
2
3
4
5
6
7
8
9
10
select
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
"PULocationID",
"DOLocationID"
from
yellow_taxi_trips t
where
"PULocationID" is NULL

检查 yellow_taxi_trips 表中是否有 DOLocationID 不在 zones 表中

1
2
3
4
5
6
7
8
9
10
11
select
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
"PULocationID",
"DOLocationID"
from
yellow_taxi_trips t
where
"DOLocationID" not in (select "LocationID" from zones)
limit 100;

删除某个数据

1
delete from zones where "LocationID" = 142;

LEFT JOIN

这里由于之前删除了 LocationID=142 的数据,所以我想让 ID=142 的数据显示 unknown

1
2
3
4
5
6
7
8
9
10
11
12
select 
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
concat(zpu."Borough", ' / ', zpu."Zone") as "pickup_loc",
concat(zdo."Borough", ' / ', zdo."Zone") as "dropoff_loc"
from
yellow_taxi_trips t left join zones zpu
on t."PULocationID" = zpu."LocationID"
join zones zdo
on t."DOLocationID" = zdo."LocationID"
limit 100;

image-20250114185430672

  • 同理,RIGHT JOIN 就是 zones 表中有记录,但是 yellow_taxi_trips 这个表里没有记录,那么我们可以让这个记录显示 unknown
  • OUTER JOIN 像是 LEFT JOINRIGHT JOIN 的组合,左边没有或者右边没有记录都会显示 Unknown

GROUP BY

进行计数,这里是按照日期计数

1
2
3
4
5
6
7
select
CAST(tpep_dropoff_datetime as DATE) as "day",
count(1)
from
yellow_taxi_trips t
group by
CAST(tpep_dropoff_datetime as DATE);

ORDER BY

1
2
3
4
5
6
7
8
9
10
select
CAST(tpep_dropoff_datetime as DATE) as "day",
count(1),
max(total_amount),
max(passenger_count)
from
yellow_taxi_trips t
group by
CAST(tpep_dropoff_datetime as DATE)
order by "day" asc;

DESC from highest to lowest 

ASC from lowest to highest

1
2
3
4
5
6
7
8
9
10
11
12
13
select
CAST(tpep_dropoff_datetime as DATE) as "day",
"DOLocationID",
count(1) as "count",
max(total_amount),
max(passenger_count)
from
yellow_taxi_trips t
group by
1, 2
order by
"day" asc,
"DOLocationID" asc;