跳到内容

使用 Lance 写入和读取数据集

在此示例中,我们将一个简单的 Lance 数据集写入磁盘。然后我们将读取它并打印出一些基本属性,例如数据集的模式和每个记录批次的尺寸。该示例仅使用一个记录批次,但它也应该适用于更大的数据集(多个记录批次)。

写入原始数据集

// Writes sample dataset to the given path
async fn write_dataset(data_path: &str) {
    // Define new schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::UInt32, false),
        Field::new("value", DataType::UInt32, false),
    ]));

    // Create new record batches
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6])),
            Arc::new(UInt32Array::from(vec![6, 7, 8, 9, 10, 11])),
        ],
    )
    .unwrap();

    let batches = RecordBatchIterator::new([Ok(batch)], schema.clone());

    // Define write parameters (e.g. overwrite dataset)
    let write_params = WriteParams {
        mode: WriteMode::Overwrite,
        ..Default::default()
    };

    Dataset::write(batches, data_path, Some(write_params))
        .await
        .unwrap();
} // End write dataset

首先我们为数据集定义一个模式,并从该模式创建一个记录批次。接下来我们遍历记录批次(在此示例中只有一个),并将它们写入磁盘。我们还定义了写入参数(设置为覆盖),然后将数据集写入磁盘。

读取 Lance 数据集

现在我们已经将数据集写入新目录,我们可以将其读回并打印出一些基本属性。

// Reads dataset from the given path and prints batch size, schema for all record batches. Also extracts and prints a slice from the first batch
async fn read_dataset(data_path: &str) {
    let dataset = Dataset::open(data_path).await.unwrap();
    let scanner = dataset.scan();

    let mut batch_stream = scanner.try_into_stream().await.unwrap().map(|b| b.unwrap());

    while let Some(batch) = batch_stream.next().await {
        println!("Batch size: {}, {}", batch.num_rows(), batch.num_columns()); // print size of batch
        println!("Schema: {:?}", batch.schema()); // print schema of recordbatch

        println!("Batch: {:?}", batch); // print the entire recordbatch (schema and data)
    }
} // End read dataset

首先我们打开数据集,并创建一个扫描器对象。我们用它来创建一个 batch_stream,它将允许我们访问数据集中的每个记录批次。然后我们遍历记录批次,并打印出每个批次的尺寸和模式。

完整示例

use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::{RecordBatch, RecordBatchIterator};
use futures::StreamExt;
use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data_path: &str = "./temp_data.lance";

    write_dataset(data_path).await;
    read_dataset(data_path).await;
}