数据演进¶
Lance 支持传统模式演进:添加、删除和修改数据集中的列。大多数这些操作都可以在不重写数据文件的情况下执行,因此效率很高。此外,Lance 还支持数据演进,允许您在不重写数据文件的情况下回填现有行的新列数据,使其非常适合机器学习特征工程等用例。
通常,模式更改会与大多数其他并发写入操作冲突。例如,如果您在其他人向数据集追加数据时更改数据集的模式,那么您的模式更改或追加将失败,具体取决于操作的顺序。因此,建议在没有其他写入操作时执行模式更改。
添加新列¶
仅限 Schema¶
我们在生产中看到的一个常见用例是向数据集添加新列而不填充它。这对于以后运行大型分布式作业以延迟填充列很有用。为此,您可以使用 lance.LanceDataset.add_columns
方法添加 pyarrow.Field
或 pyarrow.Schema
中的列。
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "null_columns")
# With pyarrow Field
dataset.add_columns(pa.field("embedding", pa.list_(pa.float32(), 128)))
assert dataset.schema == pa.schema([
("id", pa.int64()),
("embedding", pa.list_(pa.float32(), 128)),
])
# With pyarrow Schema
dataset.add_columns(pa.schema([
("label", pa.string()),
("score", pa.float32()),
]))
assert dataset.schema == pa.schema([
("id", pa.int64()),
("embedding", pa.list_(pa.float32(), 128)),
("label", pa.string()),
("score", pa.float32()),
])
此操作非常快,因为它只更新数据集的元数据。
带数据回填¶
可以使用 lance.LanceDataset.add_columns
方法在单个操作中添加和填充新列。有两种方法可以指定如何填充新列:首先,为每个新列提供一个 SQL 表达式;其次,提供一个函数来生成新列数据。
SQL 表达式可以是独立表达式,也可以引用现有列。SQL 字面值可用于为所有现有行设置单个值。
table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.add_columns({
"hash": "sha256(name)",
"status": "'active'",
})
print(dataset.to_table().to_pandas())
# name hash status
# 0 Alice b';\xc5\x10b\x97<E\x8dZo-\x8dd\xa0#$cT\xad~\x0... active
# 1 Bob b'\xcd\x9f\xb1\xe1H\xcc\xd8D.Z\xa7I\x04\xccs\x... active
# 2 Carla b'\xad\x8d\x83\xff\xd8+Z\x8e\xd4)\xe8Y+\\\xb3\... active
您还可以提供一个 Python 函数来生成新的列数据。例如,这可以用于计算新的嵌入列。此函数应接受一个 PyArrow RecordBatch 并返回一个 PyArrow RecordBatch 或一个 Pandas DataFrame。该函数将为数据集中每个批次调用一次。
如果函数计算成本高昂且可能失败,建议在 UDF 中设置一个检查点文件。此检查点文件在每次调用后保存 UDF 的状态,因此如果 UDF 失败,可以从上一个检查点重新启动。请注意,此文件可能会变得非常大,因为它需要存储多达整个数据文件的未保存结果。
import lance
import pyarrow as pa
import numpy as np
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
@lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite")
def add_random_vector(batch):
embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
return pd.DataFrame({"embedding": embeddings})
dataset.add_columns(add_random_vector)
使用合并¶
如果您已经预先计算了一个或多个新列,可以使用 lance.LanceDataset.merge
方法将它们添加到现有数据集中。这允许填充附加列而无需重写整个数据集。
要使用 merge
方法,请提供一个包含您要添加的列的新数据集,以及一个用于将新数据连接到现有数据集的列名。
例如,假设我们有一个嵌入和 ID 的数据集
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings", mode="overwrite")
现在,如果我们想添加一个我们生成的标签列,我们可以通过合并一个新表来做到这一点
new_data = pa.table({
"id": pa.array([1, 2, 3]),
"label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
print(dataset.to_table().to_pandas())
# id embedding label
# 0 1 [1, 2, 3] horse
# 1 2 [4, 5, 6] rabbit
# 2 3 [7, 8, 9] cat
删除列¶
最后,您可以使用 lance.LanceDataset.drop_columns
方法从数据集中删除列。这只是一个元数据操作,不会删除磁盘上的数据。这使得它非常快。
table = pa.table({"id": pa.array([1, 2, 3]),
"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names", mode="overwrite")
dataset.drop_columns(["name"])
print(dataset.schema)
# id: int64
要实际从磁盘中删除数据,必须重写文件以删除列,然后必须删除旧文件。这可以通过使用 lance.dataset.DatasetOptimizer.compact_files()
后跟 lance.LanceDataset.cleanup_old_versions()
来完成。
重命名列¶
可以使用 lance.LanceDataset.alter_columns
方法重命名列。
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
dataset.alter_columns({"path": "id", "name": "new_id"})
print(dataset.to_table().to_pandas())
# new_id
# 0 1
# 1 2
# 2 3
这也适用于嵌套列。要寻址嵌套列,请使用点(.
)分隔嵌套级别。例如
data = [
{"meta": {"id": 1, "name": "Alice"}},
{"meta": {"id": 2, "name": "Bob"}},
]
schema = pa.schema([
("meta", pa.struct([
("id", pa.int32()),
("name", pa.string()),
]))
])
dataset = lance.write_dataset(data, "nested_rename")
dataset.alter_columns({"path": "meta.id", "name": "new_id"})
print(dataset.to_table().to_pandas())
# meta
# 0 {'new_id': 1, 'name': 'Alice'}
# 1 {'new_id': 2, 'name': 'Bob'}
转换列数据类型¶
除了更改列名之外,您还可以使用 lance.LanceDataset.alter_columns
方法更改列的数据类型。这需要将该列重写到新的数据文件中,但不需要重写其他列。
注意
如果列有索引,则在更改列类型时会删除索引。
此方法可用于更改列的向量类型。例如,我们可以将 float32 嵌入列更改为 float16 列,以节省磁盘空间,但会降低精度
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.FixedShapeTensorArray.from_numpy_ndarray(
np.random.rand(3, 128).astype("float32"))
})
dataset = lance.write_dataset(table, "embeddings")
dataset.alter_columns({"path": "embedding",
"data_type": pa.list_(pa.float16(), 128)})
print(dataset.schema)
# id: int64
# embedding: fixed_size_list<item: halffloat>[128]
# child 0, item: halffloat