Apache DataFusion 集成¶
Lance 数据集可以使用 Apache Datafusion 进行查询,这是一个用 Rust 编写的可扩展查询引擎,使用 Apache Arrow 作为其内存格式。这意味着您可以编写复杂的 SQL 查询来分析 Lance 中的数据。
该集成允许用户将列选择和基本过滤器下推到 Lance,从而减少执行查询时扫描的数据量。此外,该集成还支持从 Lance 数据集流式传输数据,这允许用户执行大于内存的聚合。
Rust¶
Lance 包含一个 DataFusion 表提供程序 `lance::datafusion::LanceTableProvider`。用户可以将 Lance 数据集注册为 DataFusion 中的表并使用它运行 SQL
简单 SQL¶
use datafusion::prelude::SessionContext;
use lance::datafusion::LanceTableProvider;
let ctx = SessionContext::new();
ctx.register_table("dataset",
Arc::new(LanceTableProvider::new(
Arc::new(dataset.clone()),
/* with_row_id */ false,
/* with_row_addr */ false,
)))?;
let df = ctx.sql("SELECT * FROM dataset LIMIT 10").await?;
let result = df.collect().await?;
连接 2 个表¶
use datafusion::prelude::SessionContext;
use lance::datafusion::LanceTableProvider;
let ctx = SessionContext::new();
ctx.register_table("orders",
Arc::new(LanceTableProvider::new(
Arc::new(orders_dataset.clone()),
/* with_row_id */ false,
/* with_row_addr */ false,
)))?;
ctx.register_table("customers",
Arc::new(LanceTableProvider::new(
Arc::new(customers_dataset.clone()),
/* with_row_id */ false,
/* with_row_addr */ false,
)))?;
let df = ctx.sql("
SELECT o.order_id, o.amount, c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
LIMIT 10
").await?;
let result = df.collect().await?;
注册 UDF¶
Lance 提供了一些内置 UDF,用户可以手动注册并在查询中使用。以下示例演示如何注册和使用 `contains_tokens`。
use datafusion::prelude::SessionContext;
use lance::datafusion::LanceTableProvider;
use lance_datafusion::udf::register_functions;
let ctx = SessionContext::new();
// Register built-in UDFs
register_functions(&ctx);
ctx.register_table("dataset",
Arc::new(LanceTableProvider::new(
Arc::new(dataset.clone()),
/* with_row_id */ false,
/* with_row_addr */ false,
)))?;
let df = ctx.sql("SELECT * FROM dataset WHERE contains_tokens(text, 'cat')").await?;
let result = df.collect().await?;
JSON 函数¶
Lance 通过一组内置 UDF 提供全面的 JSON 支持,这些 UDF 在您使用 `register_functions()` 时会自动注册。这些函数使您能够高效地查询和过滤 JSON 数据。
有关 JSON 函数的完整指南,包括: - `json_extract` - 使用 JSONPath 提取值 - `json_get`、`json_get_string`、`json_get_int`、`json_get_float`、`json_get_bool` - 类型安全的值提取 - `json_exists` - 检查路径是否存在 - `json_array_contains`、`json_array_length` - 数组操作
有关详细文档和示例,请参阅 JSON 支持指南。
示例:在 SQL 中查询 JSON
// After registering functions as shown above
let df = ctx.sql("
SELECT * FROM dataset
WHERE json_get_string(metadata, 'category') = 'electronics'
AND json_array_contains(metadata, '$.tags', 'featured')
").await?;
Python¶
在 Python 中,此集成通过 Datafusion FFI 完成。`FFILanceTableProvider` FFI 表提供程序包含在 `pylance` 中。例如,如果我想查询 `my_lance_dataset`
from datafusion import SessionContext # pip install datafusion
from lance import FFILanceTableProvider
ctx = SessionContext()
table1 = FFILanceTableProvider(
my_lance_dataset, with_row_id=True, with_row_addr=True
)
ctx.register_table_provider("table1", table1)
ctx.table("table1")
ctx.sql("SELECT * FROM table1 LIMIT 10")