跳到内容

使用 Lance 创建 LLM 训练的文本数据集

Lance 可用于创建和缓存用于大型语言模型预训练/微调的文本(或代码)数据集。当需要在部分数据上训练模型或分块处理数据而无需一次性将所有数据下载到磁盘时,就会出现这种需求。当您只想使用 TB 或 PB 级别数据集的子集时,这会成为一个相当大的问题。

在此示例中,我们将通过分批下载文本数据集、对其进行分词并将其保存为 Lance 数据集来绕过此问题。您可以根据需要执行此操作,平均内存消耗约为 3-4 GB!

在此示例中,我们使用 wikitext 数据集,该数据集是维基百科上经过验证的优质文章和特色文章集中的超过 1 亿个标记的集合。

准备和预处理原始数据集

我们首先定义数据集和分词器

import lance
import pyarrow as pa

from datasets import load_dataset
from transformers import AutoTokenizer
from tqdm.auto import tqdm  # optional for progress tracking

tokenizer = AutoTokenizer.from_pretrained('gpt2')

dataset = load_dataset('wikitext', 'wikitext-103-raw-v1', streaming=True)['train']
dataset = dataset.shuffle(seed=1337)

load_dataset 中的 streaming 参数特别重要,因为如果您在不将其设置为 True 的情况下运行它,数据集库将首先下载整个数据集,即使您只想使用它的一个子集。将 streaming 设置为 True 后,样本将在需要时下载。

现在我们将定义一个函数来帮助我们逐个对样本进行分词。

def tokenize(sample, field='text'):
    return tokenizer(sample[field])['input_ids']

此函数将从 Hugging Face 数据集中接收一个样本,并对 field 列中的值进行分词。这是您要分词的主要文本。

创建 Lance 数据集

现在我们已经设置好原始数据集和预处理代码,接下来定义主函数,该函数接收数据集、样本数和字段,并返回一个 pyarrow 批次,该批次稍后将被写入 Lance 数据集。

def process_samples(dataset, num_samples=100_000, field='text'):
    current_sample = 0
    for sample in tqdm(dataset, total=num_samples):
        # If we have added all 5M samples, stop
        if current_sample == num_samples:
            break
        if not sample[field]:
            continue
        # Tokenize the current sample
        tokenized_sample = tokenize(sample, field)
        # Increment the counter
        current_sample += 1
        # Yield a PyArrow RecordBatch
        yield pa.RecordBatch.from_arrays(
            [tokenized_sample], 
            names=["input_ids"]
        )

此函数将遍历 Hugging Face 数据集,一次一个样本,对样本进行分词,并生成一个包含所有标记的 pyarrow RecordBatch。我们将一直这样做,直到达到 num_samples 数量的样本或数据集结束(以先发生者为准)。

请注意,我们所说的“样本”是指原始数据集中的一个示例(行)。一个示例具体意味着什么将取决于数据集本身,因为它可以是一行文本或整个文本文件。在此示例中,其长度介于一行和一段文本之间。

我们还需要定义一个模式来告诉 Lance 我们期望在表格中使用什么类型的数据。由于我们的数据集仅包含长整型标记,因此 int64 是合适的数据类型。

schema = pa.schema([
    pa.field("input_ids", pa.int64())
])

最后,我们需要定义一个 reader,它将从我们的 process_samples 函数中读取记录批次流,该函数生成由单个分词样本组成的记录批次。

reader = pa.RecordBatchReader.from_batches(
    schema, 
    process_samples(dataset, num_samples=500_000, field='text') # For 500K samples
)

最后,我们使用 lance.write_dataset,它会将数据集写入磁盘。

# Write the dataset to disk
lance.write_dataset(
    reader, 
    "wikitext_500K.lance",
    schema
)

如果您想在将标记保存到磁盘之前对其进行一些其他预处理(例如掩码等),您可以将其添加到 process_samples 函数中。

就是这样!您的数据集已分词并保存到磁盘!