feat: add add_columns job #3180

Draft pull request: wants to merge 1 commit into base: main
22 changes: 20 additions & 2 deletions python/python/lance/_datagen.py
@@ -15,10 +15,28 @@ def is_datagen_supported():


 def rand_batches(
-    schema: pa.Schema, *, num_batches: int = None, batch_size_bytes: int = None
+    schema: pa.Schema,
+    *,
+    num_batches: int = None,
+    batch_size_bytes: int = None,
+    batch_size_rows: int = None,
 ):
     if not datagen.is_datagen_supported():
         raise NotImplementedError(
             "This version of lance was not built with the datagen feature"
         )
-    return datagen.rand_batches(schema, num_batches, batch_size_bytes)
+    return datagen.rand_batches(schema, num_batches, batch_size_bytes, batch_size_rows)
+
+
+def rand_reader(
+    schema: pa.Schema,
+    *,
+    num_batches: int = None,
+    batch_size_bytes: int = None,
+    batch_size_rows: int = None,
+):
+    if not datagen.is_datagen_supported():
+        raise NotImplementedError(
+            "This version of lance was not built with the datagen feature"
+        )
+    return datagen.rand_reader(schema, num_batches, batch_size_bytes, batch_size_rows)
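For orientation, here is a minimal sketch of how the new `batch_size_rows` keyword could be used with these helpers. The schema and sizes are illustrative, and it assumes a build of lance compiled with the datagen feature; `batches` and `reader` simply capture the generated output.

```python
import pyarrow as pa

from lance import _datagen

# Illustrative schema; any Arrow schema supported by datagen should work.
schema = pa.schema([pa.field("x", pa.int64()), pa.field("y", pa.float32())])

if _datagen.is_datagen_supported():
    # Size batches by row count (new in this PR) rather than by byte budget.
    batches = _datagen.rand_batches(schema, num_batches=4, batch_size_rows=1024)
    reader = _datagen.rand_reader(schema, num_batches=4, batch_size_rows=1024)
```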
197 changes: 197 additions & 0 deletions python/python/lance/evolution.py
@@ -0,0 +1,197 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

from typing import TYPE_CHECKING, Any, Dict, List, Tuple

import pyarrow as pa

from . import LanceDataset, LanceOperation, write_dataset
from .fragment import write_fragments
from .lance.align import (
    AlignFragmentsPlan,
)
from .types import _coerce_reader

if TYPE_CHECKING:
from .types import ReaderLike


class DataShard:
    """
    A shard of new data to be added to a dataset.

    This is returned by the ``add_data`` method of an ``AddColumnsJob``. Shards
    should be collected and passed to the ``finish_adding_data`` method of the job.

    A shard should be treated as an opaque, pickleable object.
    """

    def __init__(self, fragments):
        self._fragments = fragments


class AddColumnsJob:
    """
    A job to add new columns to a dataset.

    This job can be used to distribute the work of adding new columns to a dataset
    across multiple workers. The job is created by calling the ``create`` method.
    The job operates in two phases.

    In the first phase the new data is calculated. The new data is written to a
    temporary dataset. Once all the new data is calculated the job enters the second
    phase, where the new data is aligned with the target dataset. The alignment phase
    reads from the temporary dataset and rewrites the data in fragments aligned with
    the target dataset. Both phases can be distributed across multiple workers.

    The job itself should be pickled and sent to any number of workers. The workers
    can then call the ``add_data`` method to create a data shard containing some new
    values for the new columns. These data shards should be collected and passed to
    some finalization step which will call the ``finish_adding_data`` method to create
    an ``AlignFragmentsPlan`` which can be used to commit the new data to the dataset.
    Details on the alignment phase can be found in the documentation for
    ``AlignFragmentsPlan``.

    It is not required that a value be given for every row in the target dataset. If a
    value is not given for a row then the value for the new columns will be set to
    null. New columns created using the ``AddColumnsJob`` will be appended to the end
    of the schema of the target dataset and will always be nullable columns. If
    desired, ``LanceDataset.alter_columns`` can be used to change the nullability of
    the new columns after they are added.

    Modifications to the target dataset are tolerated while the new data is being
    calculated. Any new rows added after the job has been distributed to workers will
    have null for the new columns. New values calculated for rows that have since been
    deleted will be ignored. Note: if a row is used to calculate new values, and that
    row is then updated before the new data is committed, the new values (based on the
    old row) will still be inserted.
    """

    def __init__(
        self,
        target: LanceDataset,
        source: LanceDataset,
        join_key: str,
    ):
        self.target = target
        self.source = source
        self.join_key = join_key

    @staticmethod
    def create(
        target: LanceDataset,
        source_uri: str,
        new_schema: pa.Schema,
        join_key: str,
        *,
        overwrite_tmp: bool = False,
    ) -> "AddColumnsJob":
        """
        Creates a new AddColumnsJob instance to add new columns to ``target``.

        Parameters
        ----------
        target : LanceDataset
            The dataset to which the new columns will be added.

        source_uri : str
            The URI of the temporary dataset to which the new data will be written.

            There must not be a dataset at this URI unless ``overwrite_tmp`` is set
            to True, in which case the existing dataset will be overwritten.

            The temporary dataset must be in a location that is accessible to all
            workers that will be adding data to the job.

        new_schema : pa.Schema
            The schema of the new columns to be added. This schema must contain
            the join key column.

        join_key : str
            The column used to join the new data with the existing data. There are
            special values that can be used here that will affect the behavior of
            the job.

            If the join_key value is ``_dataset_offset`` then the new data should
            have a ``_dataset_offset`` column which indicates the offset of the data
            in the target dataset.

            If the join_key value is ``_rowid`` then the new data should have a
            ``_rowid`` column which indicates the row id of the data in the target
            dataset.

            If the join_key value is anything else then both the new data and the
            target dataset must have a column with the same name. A join will be
            performed on this column.
        """
        if new_schema.field(0).name != join_key:
            if any(f.name == join_key for f in new_schema):
                raise ValueError(
                    f"If the join_key ({join_key}) is in the new_schema it "
                    "must be the first column"
                )
            if join_key == "_rowid" or join_key == "_dataset_offset":
                new_schema = pa.schema(
                    [pa.field(join_key, pa.uint64())] + list(iter(new_schema))
                )
            else:
                # We can't infer the type so we require it to be in the schema
                raise ValueError(
                    f"join_key ({join_key}) must be included in new_schema "
                    "when not using _rowid or _dataset_offset"
                )
        source = write_dataset(
            [],
            source_uri,
            schema=new_schema,
            # Honor overwrite_tmp so an existing temporary dataset can be replaced
            mode="overwrite" if overwrite_tmp else "create",
            data_storage_version=target.data_storage_version,
        )
        return AddColumnsJob(target, source, join_key)

    @staticmethod
    def _disallow_arg(args: Dict[Any, Any], arg: str):
        if arg in args:
            raise TypeError(f"{arg} cannot be used in add_data")

    def add_data(self, data: "ReaderLike", **kwargs) -> DataShard:
        """
        Adds new data. The new data must contain the join key column.

        Returns a DataShard. This can be pickled and should be collected and
        passed to the ``finish_adding_data`` method.
        """
        # This must always be append
        self._disallow_arg(kwargs, "mode")
        # This is inherited from the base dataset
        self._disallow_arg(kwargs, "data_storage_version")
        # Soon to be deprecated
        self._disallow_arg(kwargs, "use_legacy_format")

        reader = _coerce_reader(data)
        first_col = reader.schema.field(0)
        if first_col.name != self.join_key:
            raise ValueError(
                "First column in the new data must be "
                f"the join key column {self.join_key}"
            )

        frags = write_fragments(data, self.source, **kwargs)
        return DataShard(frags)

    def finish_adding_data(
        self, shards: List[DataShard]
    ) -> Tuple[AlignFragmentsPlan, LanceDataset]:
        all_frags = []
        for shard in shards:
            all_frags.extend(shard._fragments)

        op = LanceOperation.Append(all_frags)
        # TODO: Ideally we should fail here if finish_adding_data is called twice
        # but we don't have a good way to do that yet (two appends will be allowed)
        source = LanceDataset.commit(self.source, op, read_version=1)
        return (
            AlignFragmentsPlan.create(source._ds, self.target._ds, self.join_key),
            source,
        )
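For readers skimming the diff, here is a rough end-to-end sketch of the intended workflow, pieced together from the docstrings above and the test below. The paths, chunk size, and single-process loop stand in for real distributed workers and are purely illustrative.

```python
import pyarrow as pa
import pyarrow.compute as pc

import lance
from lance.evolution import AddColumnsJob

# Hypothetical target dataset with an existing integer column "a".
dataset = lance.dataset("/data/primary")

# Phase 1: create the job, ship it (pickled) to workers, and collect the shards.
job = AddColumnsJob.create(
    dataset,
    "/data/tmp_new_cols",  # temporary dataset reachable by every worker
    new_schema=pa.schema([pa.field("b", pa.int64())]),
    join_key="_dataset_offset",
)

shards = []
for offset in range(0, dataset.count_rows(), 1000):
    chunk = dataset.to_table(offset=offset, limit=1000)
    offsets = pa.array(range(offset, offset + len(chunk)), pa.uint64())
    new_data = pa.table({"_dataset_offset": offsets, "b": pc.add(chunk["a"], 1)})
    shards.append(job.add_data(new_data))

# Phase 2: align the temporary dataset with the target and commit.
plan, source = job.finish_adding_data(shards)
results = [task.execute(source._ds, dataset._ds) for task in plan.tasks]
plan.commit(results, source._ds, dataset._ds)
```

In a real deployment the loop bodies in both phases would run on separate workers, with the job, shards, and tasks moved between processes via pickle, as exercised by the test below.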
3 changes: 3 additions & 0 deletions python/python/lance/lance/__init__.pyi
@@ -23,6 +23,9 @@ def infer_tfrecord_schema(
) -> pa.Schema: ...
def read_tfrecord(uri: str, schema: pa.Schema) -> pa.RecordBatchReader: ...

class _Dataset:
    pass

class CleanupStats:
    bytes_removed: int
    old_versions: int
26 changes: 26 additions & 0 deletions python/python/lance/lance/align/__init__.pyi
@@ -0,0 +1,26 @@
from typing import List

from ... import _Dataset

class AlignFragmentsPlan:
    tasks: List["AlignFragmentsTask"]
    @staticmethod
    def create(
        source: _Dataset, target: _Dataset, join_key: str
    ) -> AlignFragmentsPlan: ...
    def commit(
        self,
        results: List[AlignFragmentsTaskResult],
        source: _Dataset,
        target: _Dataset,
    ) -> None: ...
    def __repr__(self) -> str: ...

class AlignFragmentsTask:
    def execute(
        self, source: _Dataset, target: _Dataset
    ) -> AlignFragmentsTaskResult: ...
    def __repr__(self) -> str: ...

class AlignFragmentsTaskResult:
    def __repr__(self) -> str: ...
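The stub above describes the alignment API used by `AddColumnsJob`: a plan whose tasks are individually pickleable and executable, with the results committed back in one step. A minimal sketch of that pattern follows; `run_alignment` is a hypothetical helper, and it assumes `source` and `target` are `LanceDataset` objects whose low-level handles are exposed as `._ds`, as in the test below.

```python
import pickle


def run_alignment(plan, source, target):
    """Execute every AlignFragmentsTask (sequentially here, standing in for
    remote workers) and commit the collected results on the coordinator."""
    results = []
    for task in plan.tasks:
        # Round-trip through pickle to mimic shipping the task to a worker.
        task = pickle.loads(pickle.dumps(task))
        results.append(task.execute(source._ds, target._ds))
    plan.commit(results, source._ds, target._ds)
```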
67 changes: 67 additions & 0 deletions python/python/tests/test_schema_evolution.py
@@ -2,6 +2,8 @@
# SPDX-FileCopyrightText: Copyright The Lance Authors

import os
import pickle
import random
import uuid
from pathlib import Path

@@ -12,6 +14,7 @@
import pyarrow.compute as pc
import pytest
from lance import LanceDataset
from lance.evolution import AddColumnsJob
from lance.file import LanceFileReader, LanceFileWriter


@@ -512,3 +515,67 @@ def some_udf(batch):

    with pytest.raises(ValueError, match="A checkpoint file cannot be used"):
        frag.merge_columns(some_udf, columns=["a"])


def test_add_columns_task_dataset_offset(tmp_path: Path):
    # Create our initial dataset
    tab = pa.table({"a": range(1000)})
    dataset = lance.write_dataset(tab, tmp_path / "primary", max_rows_per_file=100)

    # Create a job to add a new column named "b"
    job = AddColumnsJob.create(
        dataset,
        tmp_path / "tmp",
        new_schema=pa.schema([pa.field("b", pa.int64())]),
        join_key="_dataset_offset",
    )

    # We divide the input into 20 tasks and only run 14 of them (in random order)
    task_ids = list(range(20))
    random.shuffle(task_ids)
    task_ids = task_ids[:14]

    # Note: we only populate data for 700 rows so 300 rows will have null values
    # Each task is responsible for adding 50 rows
    data_shards = []
    for task_id in task_ids:
        offset = task_id * 50
        old_data = dataset.to_table(offset=offset, limit=50)
        col_b = pc.add(old_data["a"], 1)
        offsets = pa.array([offset + i for i in range(50)], pa.uint64())
        new_data = pa.table({"_dataset_offset": offsets, "b": col_b})

        # Simulate pickling and unpickling the job
        job = pickle.loads(pickle.dumps(job))
        shard = job.add_data(new_data)
        shard = pickle.loads(pickle.dumps(shard))
        data_shards.append(shard)

    # Prepare for alignment
    (alignment_plan, source) = job.finish_adding_data(data_shards)
    target = dataset

    # Run the alignment tasks in random order
    tasks = list(alignment_plan.tasks)
    random.shuffle(tasks)

    aligned_frags = [
        pickle.loads(pickle.dumps(task)).execute(source._ds, target._ds)
        for task in tasks
    ]

    # Commit the aligned fragments
    alignment_plan.commit(aligned_frags, source._ds, target._ds)

    # Check the final dataset
    data = lance.dataset(tmp_path / "primary").to_table()

    a_vals = data.column("a").to_pylist()
    b_vals = data.column("b").to_pylist()

    null_count = 0
    for a_val, b_val in zip(a_vals, b_vals):
        if b_val is None:
            null_count += 1
        else:
            assert b_val == a_val + 1

    # 14 of the 20 tasks ran, so 14 * 50 = 700 rows have values and 300 are null
    assert null_count == 300