
Distributed Write

Warning

Lance provides out-of-the-box Ray and Spark integrations.

This page is for users who want to run distributed operations in a custom way, i.e. with slurm or Kubernetes, without relying on a Lance integration.

Overview

The Lance format is designed to support parallel writes across many distributed workers. A distributed write is performed in two phases (sketched end to end after this list):

  1. Parallel write: generate new lance.LanceFragments in parallel on each worker.
  2. Commit: collect all lance.FragmentMetadata and commit them to a single dataset in one lance.LanceOperation.
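
The sketch below shows how the two phases fit together end to end, using Python's concurrent.futures.ProcessPoolExecutor as a stand-in for a real scheduler such as slurm or Kubernetes. The write_worker helper, the ./overview_example URI, and the one-column schema are hypothetical and only for illustration; the Lance calls mirror the examples in the sections that follow.

import json
from concurrent.futures import ProcessPoolExecutor

import lance
import pyarrow as pa
from lance import FragmentMetadata
from lance.fragment import write_fragments

# Hypothetical values for this sketch; any URI and schema work the same way.
data_uri = "./overview_example"
schema = pa.schema([("a", pa.int32())])

def write_worker(batch: dict) -> list:
    # Phase 1: each worker writes its own fragments in parallel and
    # ships the metadata back to the coordinator as JSON strings.
    table = pa.table(batch, schema=schema)
    fragments = write_fragments(table, data_uri, schema=schema)
    return [json.dumps(f.to_json()) for f in fragments]

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        results = pool.map(write_worker, [{"a": [1, 2]}, {"a": [3, 4]}])
    # Phase 2: a single coordinator gathers all metadata and commits once.
    all_fragments = [FragmentMetadata.from_json(j) for r in results for j in r]
    op = lance.LanceOperation.Overwrite(schema, all_fragments)
    lance.LanceDataset.commit(data_uri, op, read_version=0)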

Distributed Append

Write New Data

Writing or appending new data is straightforward with lance.fragment.write_fragments.

import pyarrow as pa
from lance.fragment import write_fragments

# Run on each worker
data_uri = "./dist_write"
schema = pa.schema([
    ("a", pa.int32()),
    ("b", pa.string()),
])

# Run on worker 1
data1 = pa.table({
    "a": [1, 2, 3],
    "b": ["x", "y", "z"],
}, schema=schema)
fragments_1 = write_fragments(data1, data_uri, schema=schema)
print("Worker 1: ", fragments_1)

# Run on worker 2
data2 = pa.table({
    "a": [4, 5, 6],
    "b": ["u", "v", "w"],
}, schema=schema)
fragments_2 = write_fragments(data2, data_uri, schema=schema)
print("Worker 2: ", fragments_2)

Output

Worker 1:  [FragmentMetadata(id=0, files=...)]
Worker 2:  [FragmentMetadata(id=0, files=...)]

Now serialize the fragment metadata with lance.fragment.FragmentMetadata.to_json and gather all of the serialized metadata on a single worker to perform the final commit. (Both workers report id=0 above; final fragment ids are assigned when the fragments are committed.)

import json

import lance
from lance import FragmentMetadata, LanceOperation

# Serialize Fragments into JSON data
fragments_json1 = [json.dumps(fragment.to_json()) for fragment in fragments_1]
fragments_json2 = [json.dumps(fragment.to_json()) for fragment in fragments_2]

# On one worker, collect all fragments
all_fragments = [
    FragmentMetadata.from_json(f)
    for f in fragments_json1 + fragments_json2
]

# Commit the fragments into a single dataset.
# LanceOperation.Overwrite either overwrites an existing dataset or creates a new one.
op = lance.LanceOperation.Overwrite(schema, all_fragments)
read_version = 0  # the dataset does not exist yet, so there is no version to read
lance.LanceDataset.commit(
    data_uri,
    op,
    read_version=read_version,
)

# We can read the dataset using the Lance API:
dataset = lance.dataset(data_uri)
assert len(dataset.get_fragments()) == 2
assert dataset.version == 1
print(dataset.to_table().to_pandas())

Output

   a  b
0  1  x
1  2  y
2  3  z
3  4  u
4  5  v
5  6  w

Append Data

Appending more data follows a similar process: write new fragments on the workers as before, then commit them with lance.LanceOperation.Append, making sure read_version is set to the dataset version you read from.

import lance

ds = lance.dataset(data_uri)
read_version = ds.version # record the read version

# all_fragments here are the new fragments written by the workers
op = lance.LanceOperation.Append(all_fragments)
lance.LanceDataset.commit(
    data_uri,
    op,
    read_version=read_version,
)
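
As a quick sanity check after the commit, read the dataset back. A minimal sketch: with no concurrent writers, a successful commit on top of read_version produces exactly one new version.

# Verify the append created a new dataset version
dataset = lance.dataset(data_uri)
assert dataset.version == read_version + 1
print(dataset.to_table().to_pandas())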

Add New Columns

The Lance format excels at operations like adding columns. Thanks to its two-dimensional layout (see this blog post), adding a new column is very efficient because no existing data files are copied. Instead, the process writes new data files and links them to the existing dataset through a metadata-only operation.

import lance
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow import RecordBatch

dataset = lance.dataset("./add_columns_example")
assert len(dataset.get_fragments()) == 2
assert dataset.to_table().combine_chunks() == pa.Table.from_pydict({
    "name": ["alice", "bob", "charlie", "craig", "dave", "eve"],
    "age": [25, 33, 44, 55, 66, 77],
}, schema=dataset.schema)


# UDF that derives the new column from the "name" column, one batch at a time
def name_len(names: RecordBatch) -> RecordBatch:
    return RecordBatch.from_arrays(
        [pc.utf8_length(names["name"])],
        ["name_len"],
    )

# On Worker 1
frag1 = dataset.get_fragments()[0]
new_fragment1, new_schema = frag1.merge_columns(name_len, ["name"])

# On Worker 2
frag2 = dataset.get_fragments()[1]
new_fragment2, _ = frag2.merge_columns(name_len, ["name"])

# On Worker 3 - Commit
all_fragments = [new_fragment1, new_fragment2]
op = lance.LanceOperation.Merge(all_fragments, schema=new_schema)
lance.LanceDataset.commit(
    "./add_columns_example",
    op,
    read_version=dataset.version,
)

# Verify dataset
dataset = lance.dataset("./add_columns_example")
print(dataset.to_table().to_pandas())

Output

      name  age  name_len
0    alice   25         5
1      bob   33         3
2  charlie   44         7
3    craig   55         5
4     dave   66         4
5      eve   77         3