跳到内容

读写数据

写入 Lance 数据集

如果您熟悉 Apache PyArrow,您会发现创建 Lance 数据集非常简单。首先使用 lance.write_dataset 函数写入 pyarrow.Table

import lance
import pyarrow as pa

table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
                              {"name": "Bob", "age": 30}])
ds = lance.write_dataset(table, "./alice_and_bob.lance")

如果数据集太大而无法完全加载到内存中,您可以使用 lance.write_dataset 流式传输数据,它也支持 pyarrow.RecordBatchIterator。在这种情况下,您需要提供数据集的 pyarrow.Schema

def producer() -> Iterator[pa.RecordBatch]:
    """An iterator of RecordBatches."""
    yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}])
    yield pa.RecordBatch.from_pylist([{"name": "Bob", "age": 30}])

schema = pa.schema([
    ("name", pa.string()),
    ("age", pa.int32()),
])

ds = lance.write_dataset(producer(),
                         "./alice_and_bob.lance",
                         schema=schema, mode="overwrite")
print(ds.count_rows())  # Output: 2

lance.write_dataset 支持写入 pyarrow.Tablepandas.DataFramepyarrow.dataset.DatasetIterator[pyarrow.RecordBatch]

添加行

要将数据插入到数据集中,您可以使用 LanceDataset.insert 或使用 mode=appendlance.write_dataset

import lance
import pyarrow as pa

table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
                              {"name": "Bob", "age": 30}])
ds = lance.write_dataset(table, "./insert_example.lance")

new_table = pa.Table.from_pylist([{"name": "Carla", "age": 37}])
ds.insert(new_table)
print(ds.to_table().to_pandas())
#     name  age
# 0  Alice   20
# 1    Bob   30
# 2  Carla   37

new_table2 = pa.Table.from_pylist([{"name": "David", "age": 42}])
ds = lance.write_dataset(new_table2, ds, mode="append")
print(ds.to_table().to_pandas())
#     name  age
# 0  Alice   20
# 1    Bob   30
# 2  Carla   37
# 3  David   42

删除行

Lance 支持使用 SQL 筛选器从数据集中删除行,如筛选器下推中所述。例如,要从上述数据集中删除 Bob 的行,可以使用

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.delete("name = 'Bob'")
dataset2 = lance.dataset("./alice_and_bob.lance")
print(dataset2.to_table().to_pandas())
#     name  age
# 0  Alice   20

注意

Lance 格式是不可变的。每个写入操作都会创建数据集的新版本,因此用户必须重新打开数据集才能看到更改。同样,行是通过在单独的删除索引中标记为已删除来删除的,而不是重写文件。这种方法更快,并且避免了使任何引用文件的索引失效,确保后续查询不会返回已删除的行。

更新行

Lance 支持使用 lance.LanceDataset.update 方法根据 SQL 表达式更新行。例如,如果我们在数据集中注意到 Bob 的名字有时被写成 Blob,我们可以通过以下方式修复:

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"name": "'Bob'"}, where="name = 'Blob'")

更新值是 SQL 表达式,这就是为什么 'Bob' 用单引号括起来的原因。这意味着我们可以根据需要使用引用现有列的复杂表达式。例如,如果两年过去了,我们希望在同一个示例中更新 Alice 和 Bob 的年龄,我们可以这样写:

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"age": "age + 2"})

如果您尝试用新值更新一组单独的行,那么使用下面描述的合并插入操作通常更有效。

import lance

# Change the ages of both Alice and Bob
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                  {"name": "Bob", "age": 20}])

# This works, but is inefficient, see below for a better approach
dataset = lance.dataset("./alice_and_bob.lance")
for idx in range(new_table.num_rows):
  name = new_table[0][idx].as_py()
  new_age = new_table[1][idx].as_py()
  dataset.update({"age": new_age}, where=f"name='{name}'")

合并插入

Lance 支持合并插入操作。这可以用于批量添加新数据,同时(可能)与现有数据匹配。此操作可用于多种不同的用例。

批量更新

lance.LanceDataset.update 方法对于基于筛选器更新行非常有用。但是,如果我们想用新行替换现有行,那么 lance.LanceDataset.merge_insert 操作会更有效。

import lance

dataset = lance.dataset("./alice_and_bob.lance")
print(dataset.to_table().to_pandas())
#     name  age
# 0  Alice   20
# 1    Bob   30

# Change the ages of both Alice and Bob
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 2},
                                  {"name": "Bob", "age": 3}])
# This will use `name` as the key for matching rows.  Merge insert
# uses a JOIN internally and so you typically want this column to
# be a unique key or id of some kind.
rst = dataset.merge_insert("name") \
       .when_matched_update_all() \
       .execute(new_table)
print(dataset.to_table().to_pandas())
#     name  age
# 0  Alice    2
# 1    Bob    3

请注意,与更新操作类似,修改的行将被删除并重新插入到表中,将其位置更改到末尾。此外,这些行的相对顺序可能会发生变化,因为我们在内部使用哈希连接操作。

如果不存在则插入

有时我们只想在以前从未插入过数据的情况下插入数据。例如,当我们有一批数据但我们不知道之前添加了哪些行并且不想创建重复行时,就会发生这种情况。我们可以使用合并插入操作来实现此目的:

# Bob is already in the table, but Carla is new
new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},
                                  {"name": "Carla", "age": 37}])

dataset = lance.dataset("./alice_and_bob.lance")

# This will insert Carla but leave Bob unchanged
_ = dataset.merge_insert("name") \
       .when_not_matched_insert_all() \
       .execute(new_table)
# Verify that Carla was added but Bob remains unchanged
print(dataset.to_table().to_pandas())
#     name  age
# 0  Alice   20
# 1    Bob   30
# 2  Carla   37

更新或插入 (Upsert)

有时我们希望结合上述两种行为。如果行已经存在,我们希望更新它。如果行不存在,我们希望添加它。此操作有时称为“upsert”。我们也可以使用合并插入操作来完成此操作:

import lance
import pyarrow as pa

# Change Carla's age and insert David
new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},
                                  {"name": "David", "age": 42}])

dataset = lance.dataset("./alice_and_bob.lance")

# This will update Carla and insert David
_ = dataset.merge_insert("name") \
       .when_matched_update_all() \
       .when_not_matched_insert_all() \
       .execute(new_table)
# Verify the results
print(dataset.to_table().to_pandas())
#     name  age
# 0  Alice   20
# 1    Bob   30
# 2  Carla   27
# 3  David   42

替换部分数据

一种不太常见但仍然有用的行为是替换现有行的某个区域(由筛选器定义)并使用新数据。这类似于在单个事务中执行删除和插入。例如:

import lance
import pyarrow as pa

new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},
                                  {"name": "Francene", "age": 44}])

dataset = lance.dataset("./alice_and_bob.lance")
print(dataset.to_table().to_pandas())
#       name  age
# 0    Alice   20
# 1      Bob   30
# 2  Charlie   45
# 3    Donna   50

# This will remove anyone above 40 and insert our new data
_ = dataset.merge_insert("name") \
       .when_not_matched_insert_all() \
       .when_not_matched_by_source_delete("age >= 40") \
       .execute(new_table)
# Verify the results - people over 40 replaced with new data
print(dataset.to_table().to_pandas())
#        name  age
# 0     Alice   20
# 1       Bob   30
# 2     Edgar   46
# 3  Francene   44

读取 Lance 数据集

要打开 Lance 数据集,请使用 lance.dataset 函数

import lance
ds = lance.dataset("s3://bucket/path/imagenet.lance")
# Or local path
ds = lance.dataset("./imagenet.lance")

注意

Lance 目前支持本地文件系统、AWS s3 和 Google Cloud Storage(gs) 作为存储后端。更多信息请阅读对象存储配置

读取 Lance 数据集最直接的方法是使用 lance.LanceDataset.to_table 方法将整个数据集加载到内存中。

table = ds.to_table()

由于 Lance 是一种高性能的列式格式,因此它可以通过利用列(投影)下推和筛选器(谓词)下推来实现高效读取数据集子集的功能。

table = ds.to_table(
    columns=["image", "label"],
    filter="label = 2 AND text IS NOT NULL",
    limit=1000,
    offset=3000)

Lance 了解读取像 image 这样的重列的成本。因此,它采用优化的查询计划来高效执行操作。

迭代读取

如果数据集太大而无法放入内存,您可以使用 lance.LanceDataset.to_batches 方法分批读取它

for batch in ds.to_batches(columns=["image"], filter="label = 10"):
    # do something with batch
    compute_on_batch(batch)

不出所料,lance.LanceDataset.to_batches 接受与 lance.LanceDataset.to_table 函数相同的参数。

筛选器下推

Lance 采用标准 SQL 表达式作为数据集过滤的谓词。通过将 SQL 谓词直接下推到存储系统,扫描期间的总 I/O 负载显著降低。

目前,Lance 支持不断增长的表达式列表。

  • >, >=, <, <=, =
  • ANDORNOT
  • IS NULLIS NOT NULL
  • IS TRUEIS NOT TRUEIS FALSEIS NOT FALSE
  • IN
  • LIKENOT LIKE
  • regexp_match(column, pattern)
  • CAST

例如,以下筛选器字符串是可接受的

((label IN [10, 20]) AND (note['email'] IS NOT NULL))
    OR NOT note['created']

可以使用下标访问嵌套字段。结构体字段可以使用字段名下标,而列表字段可以使用索引下标。

如果您的列名包含特殊字符或是一个 SQL 关键字,您可以使用反引号 (`) 进行转义。对于嵌套字段,路径的每个段都必须用反引号括起来。

`CUBE` = 10 AND `column name with space` IS NOT NULL
  AND `nested with space`.`inner with space` < 2

警告

不支持包含句点 (.) 的字段名。

日期、时间戳和十进制的文字可以通过在类型名称后写入字符串值来编写。例如:

date_col = date '2021-01-01'
and timestamp_col = timestamp '2021-01-01 00:00:00'
and decimal_col = decimal(8,3) '1.000'

对于时间戳列,精度可以在类型参数中指定为数字。默认值为微秒精度 (6)。

SQL 时间单位
timestamp(0)
timestamp(3) 毫秒
timestamp(6) 微秒
timestamp(9) 纳秒

Lance 在内部以 Arrow 格式存储数据。从 SQL 类型到 Arrow 的映射是

SQL 类型 Arrow 类型
boolean Boolean
tinyint / tinyint unsigned Int8 / UInt8
smallint / smallint unsigned Int16 / UInt16
intinteger / int unsignedinteger unsigned Int32 / UInt32
bigint / bigint unsigned Int64 / UInt64
float Float32
double Float64
decimal(precision, scale) Decimal128
date Date32
timestamp Timestamp (1)
string Utf8
binary Binary

(1) 请参阅上表中的精度映射。

随机读取

Lance 作为列式格式的一个显著特点是它允许您快速读取随机样本。

# Access the 2nd, 101th and 501th rows
data = ds.take([1, 100, 500], columns=["image", "label"])

快速随机访问单个行的能力在促进各种工作流中起着关键作用,例如机器学习训练中的随机采样和洗牌。此外,它还使用户能够构建辅助索引,从而实现查询的快速执行以提高性能。

表维护

随着时间的推移,某些操作会导致 Lance 数据集布局不佳。例如,许多小的追加操作将导致大量的小片段。或者删除许多行将导致查询变慢,因为需要过滤掉已删除的行。

为了解决这个问题,Lance 提供了优化数据集布局的方法。

压缩数据文件

数据文件可以重写,从而减少文件数量。当将 target_rows_per_fragment 传递给 lance.dataset.DatasetOptimizer.compact_files 时,Lance 将跳过任何已达到该行计数的片段,并重写其他片段。片段将根据其片段 ID 合并,因此将保留数据的固有顺序。

注意

压缩会创建表的新版本。它不会删除旧版本的表及其引用的文件。

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

在压缩期间,Lance 还可以删除已删除的行。重写的片段将不会有删除文件。这可以提高扫描性能,因为在扫描期间不需要跳过软删除的行。

当文件被重写时,原始行地址将失效。这意味着如果受影响的文件以前是任何 ANN 索引的一部分,它们将不再是。因此,建议在重新构建索引之前重写文件。