使用 Lance 写入和读取数据集¶
在此示例中,我们将一个简单的 Lance 数据集写入磁盘。然后我们将读取它并打印出一些基本属性,例如数据集的模式和每个记录批次的尺寸。该示例仅使用一个记录批次,但它也应该适用于更大的数据集(多个记录批次)。
写入原始数据集¶
// Writes sample dataset to the given path
async fn write_dataset(data_path: &str) {
// Define new schema
let schema = Arc::new(Schema::new(vec![
Field::new("key", DataType::UInt32, false),
Field::new("value", DataType::UInt32, false),
]));
// Create new record batches
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6])),
Arc::new(UInt32Array::from(vec![6, 7, 8, 9, 10, 11])),
],
)
.unwrap();
let batches = RecordBatchIterator::new([Ok(batch)], schema.clone());
// Define write parameters (e.g. overwrite dataset)
let write_params = WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};
Dataset::write(batches, data_path, Some(write_params))
.await
.unwrap();
} // End write dataset
首先我们为数据集定义一个模式,并从该模式创建一个记录批次。接下来我们遍历记录批次(在此示例中只有一个),并将它们写入磁盘。我们还定义了写入参数(设置为覆盖),然后将数据集写入磁盘。
读取 Lance 数据集¶
现在我们已经将数据集写入新目录,我们可以将其读回并打印出一些基本属性。
// Reads dataset from the given path and prints batch size, schema for all record batches. Also extracts and prints a slice from the first batch
async fn read_dataset(data_path: &str) {
let dataset = Dataset::open(data_path).await.unwrap();
let scanner = dataset.scan();
let mut batch_stream = scanner.try_into_stream().await.unwrap().map(|b| b.unwrap());
while let Some(batch) = batch_stream.next().await {
println!("Batch size: {}, {}", batch.num_rows(), batch.num_columns()); // print size of batch
println!("Schema: {:?}", batch.schema()); // print schema of recordbatch
println!("Batch: {:?}", batch); // print the entire recordbatch (schema and data)
}
} // End read dataset
首先我们打开数据集,并创建一个扫描器对象。我们用它来创建一个 batch_stream
,它将允许我们访问数据集中的每个记录批次。然后我们遍历记录批次,并打印出每个批次的尺寸和模式。
完整示例¶
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::{RecordBatch, RecordBatchIterator};
use futures::StreamExt;
use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data_path: &str = "./temp_data.lance";
write_dataset(data_path).await;
read_dataset(data_path).await;
}