示例¶
这里有一些示例可以尝试。有关更全面的使用示例,请参阅 examples/
目录。
基本读写¶
import pandas as pd
import ray
from lance_ray import read_lance, write_lance
# Initialize Ray
ray.init()
# Create sample data
sample_data = {
"user_id": range(100),
"name": [f"User_{i}" for i in range(100)],
"age": [20 + (i % 50) for i in range(100)],
"score": [50.0 + (i % 100) * 0.5 for i in range(100)],
}
df = pd.DataFrame(sample_data)
# Create Ray dataset
ds = ray.data.from_pandas(df)
# Write to Lance format
write_lance(ds, "sample_dataset.lance")
# Read Lance dataset back
ds = read_lance("sample_dataset.lance")
# Perform distributed operations
filtered_ds = ds.filter(lambda row: row["age"] > 30)
print(f"Filtered count: {filtered_ds.count()}")
# Read with column selection and filtering
ds_filtered = read_lance(
"sample_dataset.lance",
columns=["user_id", "name", "score"],
filter="score > 75.0"
)
print(f"Schema: {ds_filtered.schema()}")
数据演进¶
# Add columns using metadata catalog
from lance_ray import add_columns
import pyarrow as pa
def add_computed_column(batch: pa.RecordBatch) -> pa.RecordBatch:
df = batch.to_pandas()
df['computed'] = df['value'] * 2 + df['id']
return pa.RecordBatch.from_pandas(df[["computed"]])
add_columns(
uri="sample_dataset.lance",
transform=add_computed_column,
concurrency=4
)
使用命名空间¶
对于具有元数据目录的企业环境,您可以使用 Lance 命名空间集成。
import ray
import lance_namespace as ln
from lance_ray import read_lance, write_lance
# Initialize Ray
ray.init()
# Connect to a metadata catalog (directory-based example)
namespace = ln.connect("dir", {"root": "/path/to/tables"})
# Create a Ray dataset
data = ray.data.range(1000).map(lambda row: {"id": row["id"], "value": row["id"] * 2})
# Write to Lance format using metadata catalog
write_lance(data, namespace=namespace, table_id=["my_table"])
# Read Lance dataset back using metadata catalog
ray_dataset = read_lance(namespace=namespace, table_id=["my_table"])
# Perform distributed operations
result = ray_dataset.filter(lambda row: row["value"] > 100).count()
print(f"Filtered count: {result}")
软件包依赖项默认包含目录和 REST 命名空间实现。要使用其他实现,请安装特定的额外依赖项。例如,要与 AWS Glue 目录一起使用:
然后您可以这样做:
import ray
import lance_namespace as ln
from lance_ray import read_lance, write_lance
# Initialize Ray
ray.init()
# Connect to AWS Glue catalog
# using the default account and region in the current AWS environment
namespace = ln.connect("glue", {})
# Create a Ray dataset
data = ray.data.range(1000).map(lambda row: {"id": row["id"], "value": row["id"] * 2})
# Write to Lance format using metadata catalog
write_lance(
data,
uri="s3://my-bucket/my-table",
namespace=namespace,
table_id=["default", "my_table"]
)
# Read Lance dataset back using metadata catalog
ray_dataset = read_lance(namespace=namespace, table_id=["default", "my_table"])
# Perform distributed operations
result = ray_dataset.filter(lambda row: row["value"] > 100).count()
print(f"Filtered count: {result}")