DuckDB 加速数据分析效率

DuckDB 加速数据分析效率

性能对比:DuckDB 与传统方法

在本节中,我将向你展示如何从头开始编写一个数据管道。这个过程会相对简单:从磁盘读取数据,进行聚合操作,然后写入结果。我的目标是展示从 pandas 切换到 DuckDB 能在性能上带来多大提升。不仅如此,使用 DuckDB 编写的代码也会更加简洁。

这并不是对 ETL/ELT 流程的全面介绍,只是一个高层次的概述。

数据管道目标

我即将展示的数据管道能够实现以下目标:

  • 提取:从磁盘读取多个 Parquet 文件(大约包含 1.2 亿行数据)。
  • 转换:计算每月的汇总统计数据,例如出租车公司的收入、收入利润率(乘车费用与乘客支付金额的差值)以及每次乘车的平均收入。你还会看到我在本文前面讨论过的一些更常见的统计数据。
  • 加载:将每月的统计数据以 CSV 文件的形式本地保存。

运行代码后,两种管道实现方式得到的聚合结果应该完全相同(不考虑一些舍入差异):

DuckDB 和 Pandas 的管道结果

首先,让我们来看一下使用 pandas 实现的数据管道。

代码:Python 和 pandas

无论我怎么尝试,都无法一次性处理所有 6 个 Parquet 文件。几秒钟内就会出现如下系统警告信息:

系统内存错误

看起来 36GB 的内存不足以一次性处理 1.2 亿行数据。结果,Python 脚本被终止:

由于内存不足,Python 脚本被终止

为了解决这个问题,我不得不按顺序处理 Parquet 文件。以下是我用于数据管道的代码:

import os
import pandas as pd


def calculate_monthly_stats_per_file(file_path: str) -> pd.DataFrame:
    # 读取单个 Parquet 文件
    df = pd.read_parquet(file_path)
    # 提取乘车年份和月份
    df["ride_year"] = df["pickup_datetime"].dt.year
    df["ride_month"] = df["pickup_datetime"].dt.month
    # 移除不在指定时间段内的数据点
    df = df[(df["ride_year"] == 2024) & (df["ride_month"] >= 1) & (df["ride_month"] <= 6)]
    # 合并乘车年份和月份
    df["ride_period"] = df["ride_year"].astype(str) + "-" + df["ride_month"].astype(str)
    # 计算总乘车费用
    df["total_ride_cost"] = (
            df["base_passenger_fare"] + df["tolls"] + df["bcf"] +
            df["sales_tax"] + df["congestion_surcharge"] + df["airport_fee"] + df["tips"]
    )
    # 聚合操作
    summary = df.groupby("ride_period").agg(
        num_rides=("pickup_datetime", "count"),
        ride_time_in_days=("trip_time", lambda x: round(x.sum() / 86400, 2)),
        total_miles=("trip_miles", "sum"),
        total_ride_cost=("total_ride_cost", "sum"),
        total_rider_pay=("driver_pay", "sum")
    ).reset_index()
    # 额外的属性
    summary["total_miles_in_mil"] = summary["total_miles"] / 1000000
    summary["company_revenue"] = round(summary["total_ride_cost"] - summary["total_rider_pay"], 2)
    summary["company_margin"] = round((1 - (summary["total_rider_pay"] / summary["total_ride_cost"])) * 100, 2).astype(
        str) + "%"
    summary["avg_company_revenue_per_ride"] = round(summary["company_revenue"] / summary["num_rides"], 2)
    # 删除不再需要的列
    summary.drop(["total_miles"], axis=1, inplace=True)
    return summary


def calculate_monthly_stats(file_dir: str) -> pd.DataFrame:
    # 从多个 Parquet 文件读取数据
    files = [os.path.join(file_dir, f) for f in os.listdir(file_dir) if f.endswith(".parquet")]

    df = pd.DataFrame()
    for file in files:
        print(file)
        file_stats = calculate_monthly_stats_per_file(file_path=file)
        # 检查 df 是否为空
        if df.empty:
            df = file_stats
        else:
            # 按行连接
            df = pd.concat([df, file_stats], axis=0)
    # 对数据集进行排序
    df = df.sort_values(by="ride_period")
    # 更改列的顺序
    cols = ["ride_period", "num_rides", "ride_time_in_days", "total_miles_in_mil", "total_ride_cost",
            "total_rider_pay", "company_revenue", "company_margin", "avg_company_revenue_per_ride"]
    return df[cols]


if __name__ == "__main__":
    data_dir = "nyc-taxi-data"
    output_dir = "pipeline_results"
    output_file_name = "results_pandas.csv"
    # 运行数据管道
    monthly_stats = calculate_monthly_stats(file_dir=data_dir)
    # 保存为 CSV 文件
    monthly_stats.to_csv(f"{output_dir}/{output_file_name}", index=False)

希望使用 DuckDB 实现的数据管道能够顺利完成,不会出现任何内存问题。

代码:DuckDB

你对 DuckDB 代码的大部分内容应该已经很熟悉了。唯一的新内容是最外层的 SELECT 语句,因为它计算了一些额外的统计数据。其他部分我都保持不变:

import duckdb
import pandas as pd


def calculate_monthly_stats(file_dir: str) -> pd.DataFrame:
    query = f"""
        SELECT
            ride_period,
            num_rides,
            ride_time_in_days,
            total_miles / 1000000 AS total_miles_in_mil,
            total_ride_cost,
            total_rider_pay,
            ROUND(total_ride_cost - total_rider_pay, 2) AS company_revenue,
            ROUND((1 - total_rider_pay / total_ride_cost) * 100, 2) || '%' AS company_margin,
            ROUND((total_ride_cost - total_rider_pay) / num_rides, 2) AS avg_company_revenue_per_ride
        FROM (
            SELECT
                ride_year || '-' || ride_month AS ride_period,
                COUNT(*) AS num_rides,
                ROUND(SUM(trip_time) / 86400, 2) AS ride_time_in_days,
                ROUND(SUM(trip_miles), 2) AS total_miles,
                ROUND(SUM(base_passenger_fare + tolls + bcf + sales_tax + congestion_surcharge + airport_fee + tips), 2) AS total_ride_cost,
                ROUND(SUM(driver_pay), 2) AS total_rider_pay
            FROM (
                SELECT
                    DATE_PART('year', pickup_datetime) AS ride_year,
                    DATE_PART('month', pickup_datetime) AS ride_month,
                    trip_time,
                    trip_miles,
                    base_passenger_fare,
                    tolls,
                    bcf,
                    sales_tax,
                    congestion_surcharge,
                    airport_fee,
                    tips,
                    driver_pay
                FROM PARQUET_SCAN("{file_dir}/*.parquet")
                WHERE 
                    ride_year = 2024
                AND ride_month >= 1 
                AND ride_month <= 6
            )
            GROUP BY ride_period
            ORDER BY ride_period
        )
    """
    conn = duckdb.connect()
    df = conn.sql(query).df()
    conn.close()
    return df


if __name__ == "__main__":
    data_dir = "nyc-taxi-data"
    output_dir = "pipeline_results"
    output_file_name = "results_duckdb.csv"
    # 运行数据管道
    monthly_stats = calculate_monthly_stats(file_dir=data_dir)
    # 保存为 CSV 文件
    monthly_stats.to_csv(f"{output_dir}/{output_file_name}", index=False)

接下来,我将向你展示运行时间的差异。

性能对比结果

在分别运行两个管道 5 次并取平均值后,得到了如下运行时间:

DuckDB 与 pandas 的运行时间对比图

平均而言,在加载和处理大约 1.2 亿行(约 3GB)、分布在 6 个 Parquet 文件中的数据时,pandas 的速度比 DuckDB 慢 24 倍。

这种对比并不完全公平,因为我无法使用 pandas 一次性处理所有 Parquet 文件。不过,这些结果仍然具有参考价值,因为在日常工作中你也可能会遇到同样的困难。

如果一个库无法满足你的需求,那就试试另一个。大多数情况下,能够最快完成任务的库很可能就是 DuckDB。

Publish on 2025-01-20,Update on 2025-02-10