DuckDB 加速数据分析效率

性能对比:DuckDB 与传统方法
在本节中,我将向你展示如何从头开始编写一个数据管道。这个过程会相对简单:从磁盘读取数据,进行聚合操作,然后写入结果。我的目标是展示从 pandas 切换到 DuckDB 能在性能上带来多大提升。不仅如此,使用 DuckDB 编写的代码也会更加简洁。
这并不是对 ETL/ELT 流程的全面介绍,只是一个高层次的概述。
数据管道目标
我即将展示的数据管道能够实现以下目标:
- 提取:从磁盘读取多个 Parquet 文件(大约包含 1.2 亿行数据)。
- 转换:计算每月的汇总统计数据,例如出租车公司的收入、收入利润率(乘车费用与乘客支付金额的差值)以及每次乘车的平均收入。你还会看到我在本文前面讨论过的一些更常见的统计数据。
- 加载:将每月的统计数据以 CSV 文件的形式本地保存。
运行代码后,两种管道实现方式得到的聚合结果应该完全相同(不考虑一些舍入差异):
DuckDB 和 Pandas 的管道结果
首先,让我们来看一下使用 pandas 实现的数据管道。
代码:Python 和 pandas
无论我怎么尝试,都无法一次性处理所有 6 个 Parquet 文件。几秒钟内就会出现如下系统警告信息:
系统内存错误
看起来 36GB 的内存不足以一次性处理 1.2 亿行数据。结果,Python 脚本被终止:
由于内存不足,Python 脚本被终止
为了解决这个问题,我不得不按顺序处理 Parquet 文件。以下是我用于数据管道的代码:
import os
import pandas as pd
def calculate_monthly_stats_per_file(file_path: str) -> pd.DataFrame:
# 读取单个 Parquet 文件
df = pd.read_parquet(file_path)
# 提取乘车年份和月份
df["ride_year"] = df["pickup_datetime"].dt.year
df["ride_month"] = df["pickup_datetime"].dt.month
# 移除不在指定时间段内的数据点
df = df[(df["ride_year"] == 2024) & (df["ride_month"] >= 1) & (df["ride_month"] <= 6)]
# 合并乘车年份和月份
df["ride_period"] = df["ride_year"].astype(str) + "-" + df["ride_month"].astype(str)
# 计算总乘车费用
df["total_ride_cost"] = (
df["base_passenger_fare"] + df["tolls"] + df["bcf"] +
df["sales_tax"] + df["congestion_surcharge"] + df["airport_fee"] + df["tips"]
)
# 聚合操作
summary = df.groupby("ride_period").agg(
num_rides=("pickup_datetime", "count"),
ride_time_in_days=("trip_time", lambda x: round(x.sum() / 86400, 2)),
total_miles=("trip_miles", "sum"),
total_ride_cost=("total_ride_cost", "sum"),
total_rider_pay=("driver_pay", "sum")
).reset_index()
# 额外的属性
summary["total_miles_in_mil"] = summary["total_miles"] / 1000000
summary["company_revenue"] = round(summary["total_ride_cost"] - summary["total_rider_pay"], 2)
summary["company_margin"] = round((1 - (summary["total_rider_pay"] / summary["total_ride_cost"])) * 100, 2).astype(
str) + "%"
summary["avg_company_revenue_per_ride"] = round(summary["company_revenue"] / summary["num_rides"], 2)
# 删除不再需要的列
summary.drop(["total_miles"], axis=1, inplace=True)
return summary
def calculate_monthly_stats(file_dir: str) -> pd.DataFrame:
# 从多个 Parquet 文件读取数据
files = [os.path.join(file_dir, f) for f in os.listdir(file_dir) if f.endswith(".parquet")]
df = pd.DataFrame()
for file in files:
print(file)
file_stats = calculate_monthly_stats_per_file(file_path=file)
# 检查 df 是否为空
if df.empty:
df = file_stats
else:
# 按行连接
df = pd.concat([df, file_stats], axis=0)
# 对数据集进行排序
df = df.sort_values(by="ride_period")
# 更改列的顺序
cols = ["ride_period", "num_rides", "ride_time_in_days", "total_miles_in_mil", "total_ride_cost",
"total_rider_pay", "company_revenue", "company_margin", "avg_company_revenue_per_ride"]
return df[cols]
if __name__ == "__main__":
data_dir = "nyc-taxi-data"
output_dir = "pipeline_results"
output_file_name = "results_pandas.csv"
# 运行数据管道
monthly_stats = calculate_monthly_stats(file_dir=data_dir)
# 保存为 CSV 文件
monthly_stats.to_csv(f"{output_dir}/{output_file_name}", index=False)
希望使用 DuckDB 实现的数据管道能够顺利完成,不会出现任何内存问题。
代码:DuckDB
你对 DuckDB 代码的大部分内容应该已经很熟悉了。唯一的新内容是最外层的 SELECT
语句,因为它计算了一些额外的统计数据。其他部分我都保持不变:
import duckdb
import pandas as pd
def calculate_monthly_stats(file_dir: str) -> pd.DataFrame:
query = f"""
SELECT
ride_period,
num_rides,
ride_time_in_days,
total_miles / 1000000 AS total_miles_in_mil,
total_ride_cost,
total_rider_pay,
ROUND(total_ride_cost - total_rider_pay, 2) AS company_revenue,
ROUND((1 - total_rider_pay / total_ride_cost) * 100, 2) || '%' AS company_margin,
ROUND((total_ride_cost - total_rider_pay) / num_rides, 2) AS avg_company_revenue_per_ride
FROM (
SELECT
ride_year || '-' || ride_month AS ride_period,
COUNT(*) AS num_rides,
ROUND(SUM(trip_time) / 86400, 2) AS ride_time_in_days,
ROUND(SUM(trip_miles), 2) AS total_miles,
ROUND(SUM(base_passenger_fare + tolls + bcf + sales_tax + congestion_surcharge + airport_fee + tips), 2) AS total_ride_cost,
ROUND(SUM(driver_pay), 2) AS total_rider_pay
FROM (
SELECT
DATE_PART('year', pickup_datetime) AS ride_year,
DATE_PART('month', pickup_datetime) AS ride_month,
trip_time,
trip_miles,
base_passenger_fare,
tolls,
bcf,
sales_tax,
congestion_surcharge,
airport_fee,
tips,
driver_pay
FROM PARQUET_SCAN("{file_dir}/*.parquet")
WHERE
ride_year = 2024
AND ride_month >= 1
AND ride_month <= 6
)
GROUP BY ride_period
ORDER BY ride_period
)
"""
conn = duckdb.connect()
df = conn.sql(query).df()
conn.close()
return df
if __name__ == "__main__":
data_dir = "nyc-taxi-data"
output_dir = "pipeline_results"
output_file_name = "results_duckdb.csv"
# 运行数据管道
monthly_stats = calculate_monthly_stats(file_dir=data_dir)
# 保存为 CSV 文件
monthly_stats.to_csv(f"{output_dir}/{output_file_name}", index=False)
接下来,我将向你展示运行时间的差异。
性能对比结果
在分别运行两个管道 5 次并取平均值后,得到了如下运行时间:
DuckDB 与 pandas 的运行时间对比图
平均而言,在加载和处理大约 1.2 亿行(约 3GB)、分布在 6 个 Parquet 文件中的数据时,pandas 的速度比 DuckDB 慢 24 倍。
这种对比并不完全公平,因为我无法使用 pandas 一次性处理所有 Parquet 文件。不过,这些结果仍然具有参考价值,因为在日常工作中你也可能会遇到同样的困难。
如果一个库无法满足你的需求,那就试试另一个。大多数情况下,能够最快完成任务的库很可能就是 DuckDB。