Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatic conflict resolution #3191

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 123 additions & 5 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,123 @@ message Transaction {
// The UUID that unique identifies a transaction.
string uuid = 2;

// Optional version tag.
string tag = 3;
reserved 3; // Deprecated "tag"

// The list of operations that make up this transaction.
//
// The user operations are logical groupings of actions. These actions can be
// applied in sequence to the manifest to produce the final manifest.
//
// By grouping actions together, we can provide a human-readable history of
// the table.
//
// For example, consider the SQL query:
//
// ```sql
// BEGIN TRANSACTION;
// CREATE TABLE t (a INT);
// INSERT INTO t VALUES (1);
// SET CONFIG ON TABLE t lance.compaction.max_rows_per_file=100000
// COMMIT;
// ```
//
// Would be represented as:
//
// ```yaml
// user_operations:
// - description: "CREATE TABLE t (a INT);"
// uuid: 123e4567-e89b-12d3-a456-426655440001
// read_version: 0
// actions:
// - type: replace_schema
// schema:
// - name: a
// type: leaf
// logical_type: int
// nullable: true
// - description: "INSERT INTO t VALUES (1);"
// uuid: 123e4567-e89b-12d3-a456-426655440002
// read_version: 0
// actions:
// - type: add_fragments
// fragments:
// - id: 0
// files:
// - path: data/123e4567-e89b-12d3-a456-426655440000.lance
// fields: [0]
// - description: "SET CONFIG ON TABLE t lance.compaction.max_rows_per_file=100000"
// uuid: 123e4567-e89b-12d3-a456-426655440003
// read_version: 0
// actions:
// - type: upsert_config
// upsert_values:
// - key: lance.compaction.max_rows_per_file
// value: 100000
// ```
message CompositeOperation {
repeated UserOperation user_operations = 1;
}

// A logical grouping of actions that correspond to a single user action.
message UserOperation {
uint64 read_version = 1;
string uuid = 2;
string description = 4;
repeated Action actions = 5;
}

// An action to apply to the manifest.
message Action {
message AddFragments {
repeated DataFragment fragments = 1;
}

message DeleteFragments {
repeated uint64 deleted_fragment_ids = 1;
}

message UpdateFragments {
repeated DataFragment updated_fragments = 1;
}

message ReplaceFragments {
repeated DataFragment fragments = 1;
}

message ReplaceSchema {
repeated lance.file.Field schema = 1;
map<string, string> schema_metadata = 2;
}

message UpsertConfig {
map<string, string> upsert_values = 1;
repeated string delete_keys = 2;
}

message AddIndices {
repeated IndexMetadata indices = 1;
}

message RemoveIndices {
repeated IndexMetadata indices = 1;
}

message ReserveFragments {
uint32 num_fragments = 1;
}

oneof action {
AddFragments add_fragments = 1;
DeleteFragments delete_fragments = 2;
UpdateFragments update_fragments = 3;
ReplaceFragments replace_fragments = 4;
ReplaceSchema replace_schema = 5;
UpsertConfig upsert_config = 6;
AddIndices add_indices = 8;
RemoveIndices remove_indices = 9;
ReserveFragments reserve_fragments = 10;
}
}

// Add new rows to the dataset.
message Append {
Expand All @@ -46,7 +161,7 @@ message Transaction {
repeated uint64 deleted_fragment_ids = 2;
// The predicate that was evaluated
//
// This may be used to determine whether the delete would have affected
// This may be used to determine whether the delete would have affected
// files written by a concurrent transaction.
string predicate = 3;
}
Expand Down Expand Up @@ -158,13 +273,14 @@ message Transaction {
// The new fragments where updated rows have been moved to.
repeated DataFragment new_fragments = 3;
}

// An operation that updates the table config.
message UpdateConfig {
map<string, string> upsert_values = 1;
repeated string delete_keys = 2;
}

// TODO: deprecate operation
// The operation of this transaction.
oneof operation {
Append append = 100;
Expand All @@ -178,11 +294,13 @@ message Transaction {
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
CompositeOperation composite = 111;
}

// An operation to apply to the blob dataset
oneof blob_operation {
Append blob_append = 200;
Overwrite blob_overwrite = 202;
}
CompositeOperation blob_composite = 203;
}
}
2 changes: 2 additions & 0 deletions rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
mod fragment;
mod index;
mod manifest;
mod transaction;
mod action;

Check failure on line 12 in rust/lance-table/src/format.rs

View workflow job for this annotation

GitHub Actions / linux-build (stable)

file not found for module `action`

Check failure on line 12 in rust/lance-table/src/format.rs

View workflow job for this annotation

GitHub Actions / linux-arm

file not found for module `action`

pub use fragment::*;
pub use index::Index;
Expand Down
11 changes: 0 additions & 11 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,6 @@ impl Manifest {
self.timestamp_nanos = nanos;
}

/// Set the `config` from an iterator
pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
self.config.extend(upsert_values);
}

/// Delete `config` keys using a slice of keys
pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
self.config
.retain(|key, _| !delete_keys.contains(&key.as_str()));
}

/// Check the current fragment list and update the high water mark
pub fn update_max_fragment_id(&mut self) {
let max_fragment_id = self
Expand Down
Loading
Loading