跳到内容

示例

以下是一些可供尝试的示例。更完整的使用示例请参阅 examples/ 目录。

基本读写

import pandas as pd
import ray
from lance_ray import read_lance, write_lance

# Start Ray before any Ray Data call is made.
ray.init()

# Location of the Lance dataset used throughout this example.
DATASET_URI = "sample_dataset.lance"


def is_older_than_30(row):
    """Return True when the row's ``age`` value is greater than 30."""
    return row["age"] > 30


# Build 100 synthetic user records, column by column.
user_ids = range(100)
sample_data = {
    "user_id": user_ids,
    "name": [f"User_{uid}" for uid in user_ids],
    "age": [20 + uid % 50 for uid in user_ids],
    "score": [50.0 + (uid % 100) * 0.5 for uid in user_ids],
}
df = pd.DataFrame(sample_data)

# Wrap the DataFrame in a Ray dataset and persist it in Lance format.
ds = ray.data.from_pandas(df)
write_lance(ds, DATASET_URI)

# Load the dataset that was just written.
ds = read_lance(DATASET_URI)

# Row-wise filter executed by Ray.
filtered_ds = ds.filter(is_older_than_30)
print(f"Filtered count: {filtered_ds.count()}")

# Re-read the dataset, this time passing a column selection and a filter
# expression to the reader.
selected_columns = ["user_id", "name", "score"]
ds_filtered = read_lance(
    DATASET_URI,
    columns=selected_columns,
    filter="score > 75.0",
)
print(f"Schema: {ds_filtered.schema()}")

数据演进

# Add a computed column to an existing Lance dataset addressed by URI
from lance_ray import add_columns
import pyarrow as pa

def add_computed_column(batch: pa.RecordBatch) -> pa.RecordBatch:
    """Build the new ``computed`` column for one batch of existing rows.

    The columns read here (``score`` and ``user_id``) are the ones present in
    ``sample_dataset.lance`` as written by the basic read/write example, so
    the transform can run against that dataset without a ``KeyError``.

    Args:
        batch: A record batch of rows from the dataset being extended.

    Returns:
        A record batch holding only the ``computed`` column, with one value
        per input row.
    """
    df = batch.to_pandas()
    df["computed"] = df["score"] * 2 + df["user_id"]
    # Return just the new column rather than the whole frame.
    return pa.RecordBatch.from_pandas(df[["computed"]])


# Run the transform over the dataset stored at this URI.
add_columns(
    uri="sample_dataset.lance",
    transform=add_computed_column,
    concurrency=4,
)

使用命名空间

对于具有元数据目录的企业环境,您可以使用 Lance 命名空间集成。

import ray
import lance_namespace as ln
from lance_ray import read_lance, write_lance

# Start Ray before any Ray Data call is made.
ray.init()

# Open a directory-based namespace rooted at /path/to/tables.
namespace = ln.connect("dir", {"root": "/path/to/tables"})

# Identifier of the table inside the namespace, shared by the write and read.
table_id = ["my_table"]


def with_doubled_value(row):
    """Return the row's ``id`` plus a ``value`` field equal to twice the id."""
    return {"id": row["id"], "value": row["id"] * 2}


# Build a Ray dataset from range(1000), mapping each row to ``id`` and ``value``.
data = ray.data.range(1000).map(with_doubled_value)

# Persist the dataset as a Lance table resolved through the namespace.
write_lance(data, namespace=namespace, table_id=table_id)

# Load the same table back through the namespace.
ray_dataset = read_lance(namespace=namespace, table_id=table_id)


def value_exceeds_100(row):
    """Return True when the row's ``value`` is greater than 100."""
    return row["value"] > 100


# Count the rows that pass the filter.
result = ray_dataset.filter(value_exceeds_100).count()
print(f"Filtered count: {result}")

软件包依赖项默认包含目录(dir)和 REST 两种命名空间实现。要使用其他实现,请安装相应的额外依赖项。例如,要与 AWS Glue 目录一起使用:

pip install "lance-namespace[glue]"

然后您可以这样做:

import ray
import lance_namespace as ln
from lance_ray import read_lance, write_lance

# Start Ray before any Ray Data call is made.
ray.init()

# Connect to the AWS Glue catalog; the empty config means the default
# account and region of the current AWS environment are used.
namespace = ln.connect("glue", {})

# Identifier of the table inside the catalog, shared by the write and read.
table_id = ["default", "my_table"]


def with_doubled_value(row):
    """Return the row's ``id`` plus a ``value`` field equal to twice the id."""
    return {"id": row["id"], "value": row["id"] * 2}


# Build a Ray dataset from range(1000), mapping each row to ``id`` and ``value``.
data = ray.data.range(1000).map(with_doubled_value)

# Persist the dataset in Lance format, registering it through the catalog.
write_lance(
    data,
    uri="s3://my-bucket/my-table",
    namespace=namespace,
    table_id=table_id,
)

# Load the same table back through the catalog.
ray_dataset = read_lance(namespace=namespace, table_id=table_id)


def value_exceeds_100(row):
    """Return True when the row's ``value`` is greater than 100."""
    return row["value"] > 100


# Count the rows that pass the filter.
result = ray_dataset.filter(value_exceeds_100).count()
print(f"Filtered count: {result}")