DuckDB访问数据湖


DuckDB 不仅可以方便访问 Pandas DataFrame, CSV、Parquet、PyArrow,甚至可以方便地访问数据湖。
安装需要的包
pip install duckdb deltalake
DeltaTable 表示特定版本的增量表的状态。这包括哪些文件是当前表的一部分、表的结构以及其他元数据,例如创建时间。
加载本地文件系统的数据湖(当前版本)
path = "/tmp/iris_delta"from deltalake import DeltaTabledt = DeltaTable(path)
本次使用的测试数据由 PySpark +Delta Lake 生成, 更多相关信息可以访问 Delta Lake 快速入门-PySpark 版
使用 DuckDB 访问
import duckdbcon=duckdb.connect()def dsql(sql):    return con.execute(sql).df()ds = dt.to_pyarrow_dataset()sql = "select * from ds limit 3"dsql(sql)

当前版本只有 50 条信息
sql = "select count(*) from ds"dsql(sql)

时间旅行(Time Travel)
除了可以访问最新版本的数据,还可以通过提供要加载的版本号来加载相应的版本:
dt = DeltaTable(path, version=1)ds = dt.to_pyarrow_dataset()sql = "select count(*) from ds"dsql(sql)

也可以在加载表格后,通过使用版本号或日期时间字符串更改版本:
dt.load_version(1)#等价于👇的from datetime import datetimetimestamp =(datetime            .fromtimestamp(1650358332868 / 1e3)            .astimezone()            .isoformat())dt.load_with_datetime(timestamp)dt.files()

这里的时间戳可以通过下面的命令获得,
#版本历史信息dt.history()
返回
[{'timestamp': 1650358316226,  'operation': 'CREATE TABLE',  'operationParameters': {'isManaged': 'false',   'description': None,   'partitionBy': '[]',   'properties': '{}'},  'isolationLevel': 'Serializable',  'isBlindAppend': True,  'operationMetrics': {},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': '0b21cbea-5672-4367-84be-5d6c2fd07aef'}, {'timestamp': 1650358332868,  'operation': 'WRITE',  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},  'readVersion': 0,  'isolationLevel': 'Serializable',  'isBlindAppend': True,  'operationMetrics': {'numFiles': '1',   'numOutputRows': '150',   'numOutputBytes': '2792'},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': 'aea8a270-4f3d-4ad3-8a63-2d9aebedefa7'}, {'timestamp': 1650358460951,  'operation': 'CREATE OR REPLACE TABLE',  'operationParameters': {'isManaged': 'false',   'description': None,   'partitionBy': '["Species"]',   'properties': '{}'},  'readVersion': 1,  'isolationLevel': 'Serializable',  'isBlindAppend': False,  'operationMetrics': {},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': '679a731d-9456-4ed0-aa63-3ce8a4ece8bc'}, {'timestamp': 1650358516037,  'operation': 'WRITE',  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},  'readVersion': 2,  'isolationLevel': 'Serializable',  'isBlindAppend': True,  'operationMetrics': {'numFiles': '3',   'numOutputRows': '150',   'numOutputBytes': '5612'},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': '443c8613-6dd7-4442-9111-adfd8dca7a67'}, {'timestamp': 1650358641212,  'operation': 'DELETE',  'operationParameters': {'predicate': '[]'},  'readVersion': 3,  'isolationLevel': 'Serializable',  'isBlindAppend': False,  'operationMetrics': {'numRemovedFiles': '3',   'executionTimeMs': '194',   'scanTimeMs': '192',   'rewriteTimeMs': '0'},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': 'c6f4373f-3f7b-41ca-8fa1-11501dbd2715'}, {'timestamp': 1650358662597,  'operation': 'WRITE',  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},  'readVersion': 4,  'isolationLevel': 'Serializable',  'isBlindAppend': True,  'operationMetrics': {'numFiles': '3',   'numOutputRows': '50',   'numOutputBytes': '5031'},  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',  'txnId': 'cee79e6b-63bc-48a1-9818-ccb37e17d811'}]
多种文件系统支持
除了本地文件系统,可以通过 storage_options 来配置存储后端,如 AWS S3,
storage_options = {"AWS_ACCESS_KEY_ID": "AWS_ACCESS_KEY_ID",     "AWS_SECRET_ACCESS_KEY":"AWS_SECRET_ACCESS_KEY"}dt = DeltaTable("../rust/tests/data/delta-0.2.0",  storage_options=storage_options)
或者,如果您有一个数据目录,您可以通过引用数据库和表名来加载它。目前仅支持 AWS Glue。
对于 AWS Glue 目录,使用 AWS 环境变量进行身份验证。
from deltalake import DeltaTablefrom deltalake import DataCatalogdatabase_name = "simple_database"table_name = "simple_table"data_catalog = DataCatalog.AWSdt = DeltaTable.from_data_catalog(data_catalog=data_catalog,    database_name=database_name,    table_name=table_name)dt.to_pyarrow_table().to_pydict(){'id': [5, 7, 9, 5, 6, 7, 8, 9]}
除了本地文件系统,还支持以下后端:
AWS S3,由前缀 检测 s3://。可以使用与 CLI 相同的方式使用环境变量指定 AWS 凭证。
Azure Data Lake Storage Gen 2,由前缀 检测 adls2://。请注意,  必须按照说明设置 Azure 存储帐户[1]。
Google Cloud Storage,由前缀 检测 gs://。
更多的访问方式
除了使用 SQL 的方式访问,可以可以这样的方式
import duckdbfrom deltalake import DeltaTabledt = DeltaTable(path)ds = dt.to_pyarrow_dataset()ex_data = duckdb.arrow(ds)(ex_data .filter("Species = 'virginica' and Sepal_Length > 7") .project("Sepal_Length") .to_df())#返回 Sepal_Length0 7.21 7.12 7.2
从上面的代码可以看出,DuckDB 其实是借助于强大的 DeltaTable 和 PyArrow 来实现对数据湖的访问。
参考资料
[1]
设置 Azure 存储帐户: https://github.com/delta-io/delta-rs/blob/main/docs/ADLSGen2-HOWTO.md
到顶部