一行代码将 Pandas read_sql 加速 10 倍


ConnectorX 旨在通过为开发人员提供高效、轻量级和易于使用的工具来加快从数据库加载数据的过程。在本文中,我们将通过回答以下问题为您简要介绍 ConnectorX:
什么是 ConnectorX?
如何使用 ConnectorX?
为什么 ConnectorX 是更好的选择?
接下来会发生什么?
什么是 ConnectorX?
ConnectorX 是一个开源库,可加速将数据从数据库加载到 pandas.DataFrame 等数据结构,以便进一步处理和分析。

ConnectorX 高级工作流程示例
ConnectorX 由两个主要概念组成:Source (例如 PostgreSQL )和 Destination (例如 pandas.DataFrame )。ConnectorX 会将用户给出的 SQL 查询转发给 Source ,然后高效地将查询结果从 Source 传输到 Destination 。上图显示了当前支持的源和目标的示例。
为了提供高效的性能和强大的功能,ConnectorX 中繁重的工作负载是用 Rust 编写的。它还具有与 Python 中的其他数据科学库轻松互操作的简单 API 的 Python 绑定。
接下来,让我们看看如何通过一行代码以不同的方式使用 ConnectorX。
如何使用连接器 X?
第一步:安装 ConnectorX:
pip install connectorx
第 2 步:使用 read_sql 从数据库加载数据。源是使用连接字符串定义的,目标是默认的 pandas.DataFrame 并且可以通过设置_return_type_[1]来改变:
import connectorx as cx# source: PostgreSQL, destination: pandas.DataFramedf = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch", "SELECT * FROM lineitem")
查询结果将在 pandas.DataFrame 中返回,如下所示:
为了进一步加快速度,ConnectorX 支持通过对查询进行分区并并行执行分区查询来提高 CPU 和带宽资源的利用率。用户可以指定分区列或手动定义分区查询:
# 指定分区列和分区数df = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch", "SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=4)# 手动partition the query# 下面的分区与上面的分区等效df = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch", [    "SELECT * FROM lineitem WHERE l_orderkey > 0 AND l_orderkey <= 15000000 ",    "SELECT * FROM lineitem WHERE l_orderkey > 15000000 AND l_orderkey <= 30000000",    "SELECT * FROM lineitem WHERE l_orderkey > 30000000 AND l_orderkey <= 45000000",    "SELECT * FROM lineitem WHERE l_orderkey > 45000000 AND l_orderkey <= 60000000"])
ConnectorX 将并行运行所有分区查询,并将所有查询结果连接到一个 pandas.DataFrame 中。至于分区,目前,ConnectorX 支持对 SPJA 查询的整数列进行自动分区。下面是一个更复杂的查询示例:
query = f”””SELECT l_orderkey,SUM(l_extendedprice * ( 1 — l_discount )) AS 收入,o_orderdate,o_shippriorityFROM customer,orders,lineitemWHERE c_mktsegment = 'BUILDING'AND c_custkey = o_custkeyAND l_orderkey = o_orderkeyAND o_orderdate < DATE ' 1995–03–15'AND l_shipdate > DATE '1995–03–15'GROUP BY l_orderkey,o_orderdate,o_shippriority“””df = read_sql(“postgresql://postgres:postgres@localhost:5432/tpch”, query, partition_on=”l_orderkey”, partition_num=4)
请查看我们的 Github 存储库[2]以获取更多用法和示例!
为什么 ConnectorX 是更好的选择?
其他常用的 Python 库如 Pandas 已经提供了类似的 read_sql 函数,那么为什么要使用 ConnectorX?为了回答这个问题,让我们看一个简单的基准测试,我们将 ConnectorX 与其他三个现有的解决方案(Pandas[3]、Modin[4] 和 Dask[5])进行了比较,这些解决方案也能够将数据从数据库加载到数据帧。
我们使用 TPC-H 中的 LINEITEM 表并将比例因子设置为 10。该表包含 60M 记录 x 16 列,如果转储到 CSV 文件中,大小为 8 GB,在 PostgreSQL 中为 11 GB。我们测量每个解决方案在不同的并行度(1 到 10 个核心)和带宽条件下从数据库加载整个表并写入 pandas.DataFrame 所需的时间。我们还测量了此过程中每种方法的内存使用峰值。我们在此处展示的测试结果是在 AWS r5.4xlarge 实例上进行的,我们在该实例上运行 ConnectorX 并从运行在 AWS RDS 上的 db.m6g.4xlarge 实例上的 PostgreSQL 加载数据。(有关其他数据库的更多基准测试结果,请在此处查看) 结果表明,ConnectorX 是所有场景中速度最快且内存效率最高的解决方案!
更快的数据加载
使用 4 个 CPU 核对不同方法的速度测试结果如下图所示。我们可以看到,ConnectorX 是所有解决方案中最快的,与 PostgreSQL 上的 Modin、Pandas 和 Dask 相比,数据加载速度分别提高了 3 倍、10 倍和 20 倍 !

read_sql 的时间比较(4 核)
我们还将用于 read_sql 的核心(分区)数量从 1 变为 10,并在不同的网络条件和机器下对其进行了测试。(由于内存不足 (OOM), Dask 无法完成使用 1 个核心。Pandas 不支持并行性,因此我们只使用 1 个核心。)我们发现 ConnectorX 在所有设置中始终优于其他方法。
2. 更小的内存占用
接下来,我们来看看每个方法的内存使用情况。这里我们绘制了 4 核设置的比较结果。我们可以看到,ConnectorX 使用的内存比其他方法少 3 倍。 在不同的并行度下,结果保持不变。

read_sql 的内存对比(4 核)
ConnectorX 是如何做到这一点的?
三个主要原因使 ConnectorX 能够实现这一性能:
用本地语言编写: 与其他库不同,ConnectorX 是用 Rust 编写的,这避免了在 Python 中实现数据密集型应用程序的额外成本。
只复制一次: 现有解决方案在从数据库下载数据时或多或少会进行多次数据复制,但 ConnectorX 的实现遵循“零复制”原则。即使在并行性下,我们也设法将数据直接从源复制到目标一次。
CPU 缓存高效: 我们应用了多项优化来使 ConnectorX CPU 缓存友好。除了“零拷贝”实现,ConnectorX 中的数据处理以流方式进行,以减少缓存未命中。另一个例子是,当我们在 Python 中构造字符串时,我们将一批字符串写入一个预先分配的缓冲区,而不是为每个字符串分配单独的位置。
接下来会发生什么?
到目前为止,ConnectorX 支持广泛使用的 源 ,包括 PostgreSQL、MySQL、SQLite 以及采用相同 wire protocol 的其他数据库(例如通过 PostgreSQL 的 Redshift、通过 MySQL 的 Clickhouse)。我们现在正致力于添加更多流行的数据库和数据仓库[6]。我们还计划在未来支持以不同文件格式(例如 CSV、JSON、Parquet)从数据存储(如 Amazon S3 )传输数据。[7]
至于 Destinations ,ConnectorX 支持 Python 和 Rust 中所有流行的数据帧,包括 Pandas[8]、Apache Arrow[9]、Modin[10]、Dask[11] 和 Polars[12]。可在此处[13]找到已完成和正在进行的来源和目的地的完整列表。如果您有其他建议[14],请告诉我们!
除了支持更多来源和目的地之外,该团队还致力于开发新功能,以帮助最大限度地减少数据加载的时间成本,例如维护客户端数据缓存[15]。我们还对优化查询分区[16]感兴趣,例如支持使用从数据库收集的元数据进行自动分区。您可以在 Github[17] 上关注我们,了解我们最新的实施状态和我们的下一个计划!
参考资料
[1]
return_type: https://github.com/sfu-db/connector-x#parameters[2]
Github 存储库: https://github.com/sfu-db/connector-x#detailed-usage-and-examples[3]
Pandas: https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html[4]
Modin: https://modin.readthedocs.io/en/latest/supported_apis/io_supported.html[5]
Dask: https://docs.dask.org/en/latest/dataframe-sql.html#loading-from-sql-with-read-sql-table[6]
流行的数据库和数据仓库: https://github.com/sfu-db/connector-x/discussions/61[7]
Amazon S3 )传输数据。: https://aws.amazon.com/s3/[8]
Pandas: https://pandas.pydata.org/[9]
Apache Arrow: https://arrow.apache.org/[10]
Modin: https://modin.readthedocs.io/en/latest/#[11]
Dask: https://dask.org/[12]
Polars: https://github.com/pola-rs/polars[13]
此处: https://github.com/sfu-db/connector-x#supported-sources--destinations[14]
如果您有其他建议: https://github.com/sfu-db/connector-x/discussions[15]
客户端数据缓存: https://github.com/sfu-db/connector-x/discussions/64[16]
优化查询分区: https://github.com/sfu-db/connector-x/discussions/65[17]
Github: https://github.com/sfu-db/connector-x
原文: 
https://towardsdatascience.com/connectorx-the-fastest-way-to-load-data-from-databases-a65d4d4062d5
到顶部