Creating Tables from DataFrames¶
Create Lance tables from DataFrames using the DataSource V2 API.
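The examples on this page write through a Spark catalog (df.writeTo(...).create()), which assumes a Lance catalog has been registered on the session. A minimal sketch of that setup follows; the catalog name and implementation class here are assumptions, so verify them against the lance-spark connector documentation for your version:

from pyspark.sql import SparkSession

# Hypothetical session setup; the catalog class name below is an assumption,
# check the lance-spark connector docs for the exact value for your version.
spark = (
    SparkSession.builder
    .appName("lance-example")
    .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
    .getOrCreate()
)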
Basic DataFrame Creation¶
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import java.util.Arrays;
import java.util.List;

// Create DataFrame
List<Row> data = Arrays.asList(
    RowFactory.create(1L, "Alice", "alice@example.com"),
    RowFactory.create(2L, "Bob", "bob@example.com"),
    RowFactory.create(3L, "Charlie", "charlie@example.com")
);

StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.LongType, false, Metadata.empty()),
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("email", DataTypes.StringType, true, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

// Write as new table using catalog
df.writeTo("users").create();
Creating Tables with Vector Columns¶
Lance supports vector (embedding) columns for AI workloads. These columns are stored internally as Arrow FixedSizeList[n], where n is the vector dimension. Since Spark DataFrames have no native fixed-size array type, you add metadata to your schema fields to indicate that an ArrayType(FloatType) or ArrayType(DoubleType) column should be converted to an Arrow FixedSizeList.
The metadata key "arrow.fixed-size-list.size" with a value such as 128 tells the Lance-Spark connector to convert that array column to FixedSizeList[128] during write operations.
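In PySpark, for instance, this metadata is just a plain dict attached to the StructField; full examples follow below:

from pyspark.sql.types import StructField, ArrayType, FloatType

# The metadata dict marks this array column for FixedSizeList[128] conversion
vector_metadata = {"arrow.fixed-size-list.size": 128}
field = StructField("embeddings", ArrayType(FloatType(), False), False, vector_metadata)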
Supported Types¶
- Element types: FloatType (float32), DoubleType (float64)
- Array requirements:
  - Elements must have containsNull = false
  - The column itself must be non-nullable
  - Every array must have exactly the specified dimension
Example¶
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, FloatType

# Create schema with vector column (field metadata is a plain dict in PySpark)
vector_metadata = {"arrow.fixed-size-list.size": 128}
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("embeddings", ArrayType(FloatType(), False), False, vector_metadata)
])

# Create DataFrame with vector data
import numpy as np
data = [(i, np.random.rand(128).astype(np.float32).tolist()) for i in range(100)]
df = spark.createDataFrame(data, schema)

# Write to Lance format
df.writeTo("vectors_table").create()
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.util.Random

// Create metadata for vector column
val vectorMetadata = new MetadataBuilder()
  .putLong("arrow.fixed-size-list.size", 128)
  .build()

// Create schema with vector column
val schema = StructType(Array(
  StructField("id", IntegerType, false),
  StructField("embeddings", ArrayType(FloatType, false), false, vectorMetadata)
))

// Create DataFrame with vector data, applying the schema explicitly so the
// field metadata is preserved (createDataFrame on tuples would drop it)
val rows = (0 until 100).map { i =>
  Row(i, Array.fill(128)(Random.nextFloat()))
}
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Write to Lance format
df.writeTo("vectors_table").create()
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Create metadata for vector column
Metadata vectorMetadata = new MetadataBuilder()
    .putLong("arrow.fixed-size-list.size", 128)
    .build();

// Create schema with vector column
StructType schema = new StructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("embeddings",
        DataTypes.createArrayType(DataTypes.FloatType, false),
        false, vectorMetadata)
});

// Create DataFrame with vector data
List<Row> rows = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
    float[] vector = new float[128];
    for (int j = 0; j < 128; j++) {
        vector[j] = random.nextFloat();
    }
    rows.add(RowFactory.create(i, vector));
}
Dataset<Row> df = spark.createDataFrame(rows, schema);

// Write to Lance format
df.writeTo("vectors_table").create();
Creating Multiple Vector Columns¶
You can create DataFrames with multiple vector columns, each with a different dimension:
from pyspark.sql.types import DoubleType

# Continues from the previous example's imports (np, StructType, StructField, ...)
# Create schema with multiple vector columns, one dimension per column
text_metadata = {"arrow.fixed-size-list.size": 384}
image_metadata = {"arrow.fixed-size-list.size": 512}

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("text_embeddings", ArrayType(FloatType(), False), False, text_metadata),
    StructField("image_embeddings", ArrayType(DoubleType(), False), False, image_metadata)
])

# Create DataFrame with multiple vector columns
data = [
    (i,
     np.random.rand(384).astype(np.float32).tolist(),
     np.random.rand(512).tolist())
    for i in range(100)
]
df = spark.createDataFrame(data, schema)

# Write to Lance format
df.writeTo("multi_vectors_table").create()
Vector Indexing¶
After creating a table with vector columns, you can use the Lance Python API to build a vector index for efficient similarity search:
import lance
import numpy as np

# Open the dataset
ds = lance.dataset("/path/to/vectors_table.lance")

# Create a vector index on the embeddings column
ds.create_index(
    "embeddings",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)

# Perform similarity search
query_vector = np.random.rand(128).astype(np.float32)
results = ds.to_table(
    nearest={"column": "embeddings", "q": query_vector, "k": 10}
).to_pandas()
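Recall and latency can be traded off at query time. Continuing from the snippet above, a sketch that assumes the nprobes and refine_factor options of the nearest query; check the Lance Python documentation for your version:

# Probe more IVF partitions and re-rank candidates for higher recall;
# nprobes/refine_factor are assumed query options, verify for your lance version
results = ds.to_table(
    nearest={
        "column": "embeddings",
        "q": query_vector,
        "k": 10,
        "nprobes": 20,
        "refine_factor": 5,
    }
).to_pandas()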
Note: when vector columns are read back into Spark DataFrames, they are automatically converted to regular ArrayType(FloatType) or ArrayType(DoubleType) columns for compatibility with Spark operations.
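For example, reading the table written above back through Spark should show a plain array column:

# Vector columns are read back as ordinary Spark arrays
df = spark.table("vectors_table")
df.printSchema()  # embeddings appears as array<float>, not a fixed-size type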