Creating Tables from DataFrames

Create Lance tables from DataFrames using the DataSource V2 API.
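
The writeTo(...).create() calls below assume a Spark session whose current catalog is backed by the Lance connector. A minimal sketch of such a session follows; the catalog implementation class and option keys here are assumptions that vary across lance-spark versions, so check your connector's documentation for the exact values:

from pyspark.sql import SparkSession

# Hypothetical catalog configuration -- the catalog class name is an
# assumption; consult your lance-spark release for the exact value.
spark = (
    SparkSession.builder
    .appName("lance-tables")
    .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
    .config("spark.sql.defaultCatalog", "lance")
    .getOrCreate()
)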

Basic DataFrame Creation

Python:

# Create DataFrame
data = [
(1, "Alice", "alice@example.com"),
(2, "Bob", "bob@example.com"),
(3, "Charlie", "charlie@example.com")
]
df = spark.createDataFrame(data, ["id", "name", "email"])

# Write as new table using catalog
df.writeTo("users").create()

Scala:

import spark.implicits._

// Create DataFrame
val data = Seq(
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com")
)
val df = data.toDF("id", "name", "email")

// Write as new table using catalog
df.writeTo("users").create()

Java:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

// Create DataFrame
List<Row> data = Arrays.asList(
    RowFactory.create(1L, "Alice", "alice@example.com"),
    RowFactory.create(2L, "Bob", "bob@example.com"),
    RowFactory.create(3L, "Charlie", "charlie@example.com")
);

StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.LongType, false, Metadata.empty()),
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("email", DataTypes.StringType, true, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

// Write as new table using catalog
df.writeTo("users").create();

Creating Tables with Vector Columns

Lance supports vector (embedding) columns for AI workloads. These columns are stored internally as Arrow FixedSizeList[n], where n is the vector dimension. Since Spark DataFrames have no native fixed-size array type, you add metadata to your schema fields to indicate that an ArrayType(FloatType) or ArrayType(DoubleType) column should be converted to an Arrow FixedSizeList.

The metadata key "arrow.fixed-size-list.size" with a value such as 128 tells the Lance-Spark connector to convert that array column to FixedSizeList[128] during write operations.
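
If you already have a DataFrame with a plain array column, you can also attach this metadata after the fact: in PySpark, Column.alias accepts a metadata argument. A minimal sketch, assuming an existing DataFrame df with an embeddings column of 128-dimensional float arrays:

from pyspark.sql import functions as F

# Re-alias the array column so the FixedSizeList hint travels with the schema
df_with_meta = df.withColumn(
    "embeddings",
    F.col("embeddings").alias("embeddings", metadata={"arrow.fixed-size-list.size": 128}),
)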

Supported Types

  • Element types: FloatType (float32) or DoubleType (float64)
  • Array requirements:
      • containsNull must be false
      • The column must be non-nullable
      • All arrays must have exactly the specified dimension

Example

Python:

from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, FloatType

# Create schema with vector column
vector_metadata = {"arrow.fixed-size-list.size": 128}
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("embeddings", ArrayType(FloatType(), False), False, vector_metadata)
])

# Create DataFrame with vector data
import numpy as np
data = [(i, np.random.rand(128).astype(np.float32).tolist()) for i in range(100)]
df = spark.createDataFrame(data, schema)

# Write to Lance format
df.writeTo("vectors_table").create()

Scala:

import org.apache.spark.sql.types._

// Create metadata for vector column
val vectorMetadata = new MetadataBuilder()
  .putLong("arrow.fixed-size-list.size", 128)
  .build()

// Create schema with vector column
val schema = StructType(Array(
  StructField("id", IntegerType, false),
  StructField("embeddings", ArrayType(FloatType, false), false, vectorMetadata)
))

// Create DataFrame with vector data, applying the schema so the metadata is attached
import org.apache.spark.sql.Row
import scala.util.Random

val rows = (0 until 100).map { i =>
  Row(i, Array.fill(128)(Random.nextFloat()))
}
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Write to Lance format
df.writeTo("vectors_table").create()

Java:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

// Create metadata for vector column
Metadata vectorMetadata = new MetadataBuilder()
    .putLong("arrow.fixed-size-list.size", 128)
    .build();

// Create schema with vector column
StructType schema = new StructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("embeddings", 
        DataTypes.createArrayType(DataTypes.FloatType, false),
        false, vectorMetadata)
});

// Create DataFrame with vector data
List<Row> rows = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
    float[] vector = new float[128];
    for (int j = 0; j < 128; j++) {
        vector[j] = random.nextFloat();
    }
    rows.add(RowFactory.create(i, vector));
}
Dataset<Row> df = spark.createDataFrame(rows, schema);

// Write to Lance format
df.writeTo("vectors_table").create();

Creating Multiple Vector Columns

You can create DataFrames with multiple vector columns, each with a different dimension:

import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, FloatType, DoubleType

# Create schema with multiple vector columns
text_metadata = {"arrow.fixed-size-list.size": 384}
image_metadata = {"arrow.fixed-size-list.size": 512}

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("text_embeddings", ArrayType(FloatType(), False), False, text_metadata),
    StructField("image_embeddings", ArrayType(DoubleType(), False), False, image_metadata)
])

# Create DataFrame with multiple vector columns
data = [
    (i, 
     np.random.rand(384).astype(np.float32).tolist(),
     np.random.rand(512).tolist())
    for i in range(100)
]
df = spark.createDataFrame(data, schema)

# Write to Lance format
df.writeTo("multi_vectors_table").create()

Vector Indexing

After creating a table with vector columns, you can build a vector index for efficient similarity search using the Lance Python API:

import lance
import numpy as np

# Open the dataset
ds = lance.dataset("/path/to/vectors_table.lance")

# Create a vector index on the embeddings column
ds.create_index(
    "embeddings",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)

# Perform similarity search
query_vector = np.random.rand(128).astype(np.float32)
results = ds.to_table(
    nearest={"column": "embeddings", "q": query_vector, "k": 10}
).to_pandas()

Note: When vector columns are read back into Spark DataFrames, they are automatically converted to regular ArrayType(FloatType) or ArrayType(DoubleType) columns for compatibility with Spark operations.
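
For example, reading the table back through the catalog (assuming the session setup sketched earlier) shows a plain array column in the Spark schema:

df = spark.table("vectors_table")
df.printSchema()
# root
#  |-- id: integer (nullable = false)
#  |-- embeddings: array (nullable = false)
#  |    |-- element: float (containsNull = false)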