跳到内容

Lance 表格式

数据集目录

一个 Lance 数据集以目录形式组织。

/path/to/dataset:
    data/*.lance  -- Data directory
    _versions/*.manifest -- Manifest file for each dataset version.
    _indices/{UUID-*}/index.idx -- Secondary index, each index per directory.
    _deletions/*.{arrow,bin} -- Deletion files, which contain IDs of rows
      that have been deleted.

一个 Manifest 文件包含描述数据集版本的元数据。

message Manifest {
  // All fields of the dataset, including the nested fields.
  repeated lance.file.Field fields = 1;

  // Fragments of the dataset.
  repeated DataFragment fragments = 2;

  // Snapshot version number.
  uint64 version = 3;

  // The file position of the version auxiliary data.
  //  * It is not inheritable between versions.
  //  * It is not loaded by default during query.
  uint64 version_aux_data = 4;

  // Schema metadata.
  map<string, bytes> metadata = 5;

  message WriterVersion {
    // The name of the library that created this file.
    string library = 1;
    // The version of the library that created this file. Because we cannot assume
    // that the library is semantically versioned, this is a string. However, if it
    // is semantically versioned, it should be a valid semver string without any 'v'
    // prefix. For example: `2.0.0`, `2.0.0-rc.1`.
    string version = 2;
  }

  // The version of the writer that created this file.
  //
  // This information may be used to detect whether the file may have known bugs
  // associated with that writer.
  WriterVersion writer_version = 13;

  // If present, the file position of the index metadata.
  optional uint64 index_section = 6;

  // Version creation Timestamp, UTC timezone
  google.protobuf.Timestamp timestamp = 7;

  // Optional version tag
  string tag = 8;

  // Feature flags for readers.
  //
  // A bitmap of flags that indicate which features are required to be able to
  // read the table. If a reader does not recognize a flag that is set, it
  // should not attempt to read the dataset.
  //
  // Known flags:
  // * 1: deletion files are present
  // * 2: row ids are stable and stored as part of the fragment metadata.
  // * 4: use v2 format (deprecated)
  // * 8: table config is present
  uint64 reader_feature_flags = 9;

  // Feature flags for writers.
  //
  // A bitmap of flags that indicate which features must be used when writing to the
  // dataset. If a writer does not recognize a flag that is set, it should not attempt to
  // write to the dataset.
  //
  // The flag identities are the same as for reader_feature_flags, but the values of
  // reader_feature_flags and writer_feature_flags are not required to be identical.
  uint64 writer_feature_flags = 10;

  // The highest fragment ID that has been used so far.
  //
  // This ID is not guaranteed to be present in the current version, but it may
  // have been used in previous versions.
  //
  // For a single fragment, will be zero. For no fragments, will be absent.
  optional uint32 max_fragment_id = 11;

  // Path to the transaction file, relative to `{root}/_transactions`. The file at that
  // location contains a wire-format serialized Transaction message representing the
  // transaction that created this version.
  //
  // This string field "transaction_file" may be empty if no transaction file was written.
  //
  // The path format is "{read_version}-{uuid}.txn" where {read_version} is the version of
  // the table the transaction read from (serialized to decimal with no padding digits),
  // and {uuid} is a hyphen-separated UUID.
  string transaction_file = 12;

  // The next unused row id. If zero, then the table does not have any rows.
  //
  // This is only used if the "stable_row_ids" feature flag is set.
  uint64 next_row_id = 14;

  message DataStorageFormat {
    // The format of the data files (e.g. "lance")
    string file_format = 1;
    // The max format version of the data files. The format of the version can vary by
    // file_format and is not required to follow semver.
    //
    // Every file in this version of the dataset has the same file_format version.
    string version = 2;
  }

  // The data storage format
  //
  // This specifies what format is used to store the data files.
  DataStorageFormat data_format = 15;

  // Table config.
  //
  // Keys with the prefix "lance." are reserved for the Lance library. Other
  // libraries may wish to similarly prefix their configuration keys
  // appropriately.
  map<string, string> config = 16;

  // The version of the blob dataset associated with this table.  Changes to
  // blob fields will modify the blob dataset and update this version in the parent
  // table.
  //
  // If this value is 0 then there are no blob fields.
  uint64 blob_dataset_version = 17;

  // The base paths of data files.
  //
  // This is used to determine the base path of a data file. In common cases data file paths are under current dataset base path.
  // But for shallow cloning, importing file and other multi-tier storage cases, the actual data files could be outside of the current dataset.
  // This field is used with the `base_id` in `lance.file.File` and `lance.file.DeletionFile`.
  //
  // For example, if we have a dataset with base path `s3://bucket/dataset`, we have a DataFile with base_id 0, we get the actual data file path by:
  // base_paths[id = 0] + /data/ + file.path
  // the key(a.k.a index) starts from 0, increased by 1 for each new base path.
  repeated BasePath base_paths = 18;

}

数据片段

DataFragment 代表数据集中的一块数据。它本身包含一个或多个 DataFile,其中每个 DataFile 可以包含该数据块中的多个列。它也可能包含一个 DeletionFile,这将在后面的章节中解释。

message DataFragment {
  // The ID of a DataFragment is unique within a dataset.
  uint64 id = 1;

  repeated DataFile files = 2;

  // File that indicates which rows, if any, should be considered deleted.
  DeletionFile deletion_file = 3;

  // TODO: What's the simplest way we can allow an inline tombstone bitmap?

  // A serialized RowIdSequence message (see rowids.proto).
  //
  // These are the row ids for the fragment, in order of the rows as they appear.
  // That is, if a fragment has 3 rows, and the row ids are [1, 42, 3], then the
  // first row is row 1, the second row is row 42, and the third row is row 3.
  oneof row_id_sequence {
    // If small (< 200KB), the row ids are stored inline.
    bytes inline_row_ids = 5;
    // Otherwise, stored as part of a file.
    ExternalFile external_row_ids = 6;
  } // row_id_sequence

  // Number of original rows in the fragment, this includes rows that are now marked with
  // deletion tombstones. To compute the current number of rows, subtract
  // `deletion_file.num_deleted_rows` from this value.
  uint64 physical_rows = 4;

}

数据片段的整体结构如下所示。一个或多个数据文件存储数据片段的列。通过添加新的数据文件,可以向数据片段中添加新列。删除文件(如果存在)存储已从数据片段中删除的行。

Fragment Structure

每一行都有一个唯一的 ID,它是一个由两个 u32 组成的 u64:数据片段 ID 和本地行 ID。本地行 ID 只是该行在数据文件中的索引。

数据集更新与数据演进

Lance 通过操作 Manifest 元数据支持快速数据集更新和 schema 演进。

追加操作通过向数据集中追加新的 Fragment 来完成。而添加列则通过向每个 Fragment 添加新列的 DataFile 来完成。最后,覆盖数据集可以通过重置 ManifestFragment 列表来完成。

Data Evolution

Schema 和字段

字段代表列的元数据。这包括名称、数据类型、ID、可空性和编码。

字段按深度优先顺序排列,可以是以下之一:

  1. 父级(结构体)
  2. 重复(列表/数组)
  3. 叶子(原始类型)

例如,schema

a: i32
b: struct {
    c: list<i32>
    d: i32
}

将表示为以下字段列表

名称 ID 类型 父 ID 逻辑类型
a 1 叶子 0 "int32"
b 2 父级 0 "struct"
b.c 3 重复 2 "list"
b.c 4 叶子 3 "int32"
b.d 5 叶子 2 "int32"

字段编码规范

列级编码配置通过 PyArrow 字段元数据指定

import pyarrow as pa

schema = pa.schema([
    pa.field(
        "compressible_strings",
        pa.string(),
        metadata={
            "lance-encoding:compression": "zstd",
            "lance-encoding:compression-level": "3",
            "lance-encoding:structural-encoding": "miniblock",
            "lance-encoding:packed": "true"
        }
    )
])
元数据键 类型 描述 示例值 示例用法 (Python)
lance-encoding:compression 压缩 指定压缩算法 zstd metadata={"lance-encoding:compression": "zstd"}
lance-encoding:compression-level 压缩 Zstd 压缩级别 (1-22) 3 metadata={"lance-encoding:compression-level": "3"}
lance-encoding:blob 存储 标记二进制数据(>4MB)进行分块存储 true/false metadata={"lance-encoding:blob": "true"}
lance-encoding:packed 优化 结构体内存布局优化 true/false metadata={"lance-encoding:packed": "true"}
lance-encoding:structural-encoding 嵌套数据 嵌套结构的编码策略 miniblock/fullzip metadata={"lance-encoding:structural-encoding": "miniblock"}

删除

通过在 _deletions 文件夹中与数据一起添加删除文件,可以将行标记为已删除。这些文件包含已从某个数据片段中删除的行的索引。对于给定版本的数据集,每个数据片段最多可以有一个删除文件。没有删除行的片段没有删除文件。

读取器在扫描或 ANN 搜索期间应过滤掉这些删除文件中包含的行 ID。

删除文件有两种类型

  1. Arrow 文件:存储带有平面索引向量的列
  2. Roaring 位图:将索引存储为压缩位图。

Roaring 位图用于较大的删除集,而 Arrow 文件用于较小的删除集。这是因为 Roaring 位图在小集合中效率较低。

删除文件的文件名结构如下:

_deletions/{fragment_id}-{read_version}-{random_id}.{arrow|bin}

其中 fragment_id 是文件对应的数据片段,read_version 是它创建时的数据集版本(通常比提交的版本小一),random_id 是一个随机的 i64,用于避免冲突。后缀由文件类型决定(Arrow 文件为 .arrow,Roaring 位图为 .bin)。

message DeletionFile {
  // Type of deletion file, which varies depending on what is the most efficient
  // way to store the deleted row offsets. If none, then will be unspecified. If there are
  // sparsely deleted rows, then ARROW_ARRAY is the most efficient. If there are
  // densely deleted rows, then BIT_MAP is the most efficient.
  enum DeletionFileType {
    // Deletion file is a single Int32Array of deleted row offsets. This is stored as
    // an Arrow IPC file with one batch and one column. Has a .arrow extension.
    ARROW_ARRAY = 0;
    // Deletion file is a Roaring Bitmap of deleted row offsets. Has a .bin extension.
    BITMAP = 1;
  }

  // Type of deletion file. If it is unspecified, then the remaining fields will be missing.
  DeletionFileType file_type = 1;
  // The version of the dataset this deletion file was built from.
  uint64 read_version = 2;
  // An opaque id used to differentiate this file from others written by concurrent
  // writers.
  uint64 id = 3;
  // The number of rows that are marked as deleted.
  uint64 num_deleted_rows = 4;
  // The base path index of the data file. Used when the file is imported or referred from another dataset.
  // Lance use it as key of the base_paths field in Manifest to determine the actual base path of the data file.
  optional uint32 base_id = 7;

}

删除可以通过重新写入删除行的文件来实现。然而,这会使行索引和 ANN 索引失效,重新计算这些索引的成本可能很高。

提交数据集

通过将新的 manifest 文件写入 _versions 目录来提交数据集的新版本。

为了防止并发写入者相互覆盖,提交过程必须对所有写入者都是原子且一致的。如果两个写入者尝试使用不同的机制提交,它们可能会相互覆盖对方的更改。对于任何原生支持原子 rename-if-not-exists 或 put-if-not-exists 的存储系统,应使用这些操作。这适用于本地文件系统和大多数云对象存储,包括 Amazon S3、Google Cloud Storage、Microsoft Azure Blob Storage。对于缺少此功能的系统,用户可以配置外部锁定机制。

Manifest 命名方案

Manifest 文件必须使用一致的命名方案。名称对应于版本。这样我们就可以打开正确版本的数据集,而无需读取所有 manifest。这也清楚地表明了下一个要写入的文件路径。

有两种命名方案可以使用

  1. V1:_versions/{version}.manifest。这是传统的命名方案。
  2. V2:_versions/{u64::MAX - version:020}.manifest。这是新的命名方案。版本用零填充(20 位),并从 u64::MAX 中减去。这使得版本可以按降序排序,从而可以通过一次列表调用在对象存储上找到最新的 manifest。

如果混合使用这两种命名方案,则会出错。

冲突解决

如果两个写入者同时尝试提交,一个会成功,另一个会失败。失败的写入者应尝试重试提交,但前提是其更改与成功写入者所做的更改兼容。

给定提交的更改记录为事务文件,位于数据集目录的 _transactions 前缀下。事务文件是序列化的 Transaction protobuf 消息。有关其定义,请参阅 transaction.proto 文件。

Conflict Resolution Flow

提交过程如下:

  1. 写入者完成所有数据文件的写入。
  2. 写入者在 _transactions 目录中创建一个事务文件。此文件描述了执行的操作,用于两个目的:(1) 检测冲突,以及 (2) 在重试期间重建 manifest。
  3. 查找自写入者开始写入以来所有新的提交。如果有,读取它们的事务文件并检查冲突。如果有任何冲突,中止提交。否则,继续。
  4. 构建一个 manifest 并尝试将其提交到下一个版本。如果提交因另一个写入者已提交而失败,则返回步骤 3。

在检查两个事务是否冲突时,应保守。如果事务文件缺失,则假定它冲突。如果事务文件有未知操作,则假定它冲突。

外部 Manifest 存储

如果后端对象存储不支持 *-if-not-exists 操作,可以使用外部 manifest 存储来允许并发写入者。外部 manifest 存储是一个支持 put-if-not-exists 操作的 KV 存储。外部 manifest 存储补充而不是替代对象存储中的 manifest。不了解外部 manifest 存储的读取器可以读取使用它的表,但它可能比表的真实最新版本落后一个版本。

External Store Commit

提交过程如下:

  1. PUT_OBJECT_STORE mydataset.lance/_versions/{version}.manifest-{uuid} 将新的 manifest 暂存到对象存储中,路径由新的 uuid 确定。
  2. PUT_EXTERNAL_STORE base_uri, version, mydataset.lance/_versions/{version}.manifest-{uuid} 将暂存的 manifest 的路径提交到外部存储。
  3. COPY_OBJECT_STORE mydataset.lance/_versions/{version}.manifest-{uuid} mydataset.lance/_versions/{version}.manifest 将暂存的 manifest 复制到最终路径。
  4. PUT_EXTERNAL_STORE base_uri, version, mydataset.lance/_versions/{version}.manifest 更新外部存储以指向最终 manifest。

请注意,提交在步骤 2 之后实际上已完成。如果写入者在步骤 2 之后失败,读取器将能够检测到外部存储和对象存储不同步,并尝试同步这两个存储。如果同步重试失败,读取器将拒绝加载。这是为了确保数据集始终可通过复制数据集目录而无需特殊工具即可移植。

External Store Reader

读取器加载过程如下:

  1. GET_EXTERNAL_STORE base_uri, version, path,如果 path 不以 UUID 结尾,则返回 path。
  2. COPY_OBJECT_STORE mydataset.lance/_versions/{version}.manifest-{uuid} mydataset.lance/_versions/{version}.manifest 重新尝试同步。
  3. PUT_EXTERNAL_STORE base_uri, version, mydataset.lance/_versions/{version}.manifest 更新外部存储以指向最终 manifest。
  4. RETURN mydataset.lance/_versions/{version}.manifest 始终返回最终路径,如果同步失败则返回错误。

功能:稳定行 ID

行 ID 功能为表中的每一行分配一个唯一的 u64 ID。此 ID 在行的整个生命周期内都是稳定的。为了加快访问速度,创建了一个辅助索引,将行 ID 映射到其在表中的位置。这些索引的相应部分存储在相应的数据片段的元数据中。

行 ID:分配给表中每一行的唯一自增 u64 ID。

行地址:行在表中的当前位置。这是一个 u64,可以看作是两个 u32 值的组合:数据片段 ID 和本地行偏移量。例如,如果行地址是 (42, 9),则该行位于第 42 个数据片段中,并且是该数据片段中的第 10 行。

行 ID 序列:数据片段中的行 ID 序列。

行 ID 索引:将行 ID 映射到行地址的辅助索引。此索引通过读取所有行 ID 序列来构建。

分配行 ID

行 ID 以单调递增的序列分配。下一个行 ID 存储在 manifest 中,字段名为 next_row_id。它从零开始。提交时,写入者使用该字段为新的数据片段分配行 ID。如果提交失败,写入者将重新读取新的 next_row_id,更新新的行 ID,然后再次尝试。这类似于 max_fragment_id 用于分配新的数据片段 ID 的方式。

当行 ID 更新时,通常会分配一个新的行 ID,而不是重用旧的。这是因为此功能没有机制来更新可能引用旧行 ID 值的辅助索引。通过删除旧的行 ID 并创建一个新的行 ID,辅助索引将避免引用过期数据。

行 ID 序列

数据片段的行 ID 值存储在 RowIdSequence protobuf 消息中。这在 protos/rowids.proto 文件中描述。行 ID 序列只是 u64 值数组,它们具有针对常见情况进行优化的表示形式,即它们已排序且可能连续。例如,一个新的数据片段将具有一个简单的范围行 ID 序列,因此它存储为 startend 值。

这些序列消息要么内联存储在数据片段元数据中,要么写入单独的文件并从数据片段元数据中引用。这种选择通常基于序列的大小。如果序列很小,则内联存储。如果序列很大,则写入单独的文件。通过将小序列内联,我们可以避免额外的 I/O 操作开销。

oneof row_id_sequence {
    // Inline sequence
    bytes inline_sequence = 1;
    // External file reference
    string external_file = 2;
} // row_id_sequence

行 ID 索引

为了确保通过行 ID 快速访问行,创建了一个辅助索引,将行 ID 映射到它们在表中的位置。此索引在加载表时构建,基于数据片段中的行 ID 序列。例如,如果数据片段 42 的行 ID 序列是 [0, 63, 10],则索引将有 0 -> (42, 0)63 -> (42, 1)10 -> (42, 2) 的条目。此索引的确切形式取决于实现,但应针对快速查找进行优化。

行 ID 掩码

由于索引文件是不可变的,它们可能包含对已删除或具有新值的行 ID 的引用。为了处理这种情况,为索引创建了一个掩码。

Index and Row ID marks

例如,考虑上图中所示的序列。它有一个包含两列 strvec 的数据集。一个字符串列和一个向量列。它们各自都有索引,字符串列有标量索引,向量列有向量索引。数据集中只有一个数据片段,行 ID 连续为 1 到 3。

当执行更新操作修改行 2 中的 vec 列时,会创建一个包含更新值的新数据片段。在原始数据片段中添加一个删除文件,标记第一个文件中的行 2 为已删除。在 str 索引中,数据片段位图会更新以反映行 ID 的新位置:{1, 2}。同时,vec 索引的数据片段位图不会更新,保持为 {1}。这是因为 vec 中的值已更新,因此索引中的数据不再反映表中的数据。