Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add initial scanner statistics #3075

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.2
rev: v0.4.1
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
54 changes: 54 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Iterator,
Expand Down Expand Up @@ -50,6 +51,7 @@
CleanupStats,
Compaction,
CompactionMetrics,
LanceScanStats,
LanceSchema,
_Dataset,
_MergeInsertBuilder,
Expand Down Expand Up @@ -322,6 +324,7 @@ def scanner(
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
) -> LanceScanner:
"""Return a Scanner that can support various pushdowns.

Expand Down Expand Up @@ -414,6 +417,17 @@ def scanner(
fast_search: bool, default False
If True, then the search will only be performed on the indexed data, which
yields faster search time.
stats_handler: 'full', 'brief', or Callable, default None
If None then stats will not be collected.

If 'full' then detailed stats will be written to the log file (with DEBUG
level). This includes the scan plan.

If 'brief' then brief stats (overall timing) will be written to the log
file.

If a callable, then the callable will be provided an instance of
LanceScanStats when the scan is complete.

Notes
-----
Expand Down Expand Up @@ -463,6 +477,7 @@ def setopt(opt, val):
setopt(builder.use_stats, use_stats)
setopt(builder.use_scalar_index, use_scalar_index)
setopt(builder.fast_search, fast_search)
setopt(builder.stats_handler, stats_handler)

# columns=None has a special meaning. we can't treat it as "user didn't specify"
if self._default_scan_options is None:
Expand Down Expand Up @@ -543,6 +558,7 @@ def to_table(
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
) -> pa.Table:
"""Read the data into memory as a :py:class:`pyarrow.Table`

Expand Down Expand Up @@ -612,6 +628,17 @@ def to_table(
currently only supports a single column in the columns list.
- query: str
The query string to search for.
stats_handler: 'full', 'brief', or Callable, default None
If None then stats will not be collected.

If 'full' then detailed stats will be written to the log file (with DEBUG
level). This includes the scan plan.

If 'brief' then brief stats (overall timing) will be written to the log
file.

If a callable, then the callable will be provided an instance of
LanceScanStats when the scan is complete.

Notes
-----
Expand Down Expand Up @@ -639,6 +666,7 @@ def to_table(
use_stats=use_stats,
fast_search=fast_search,
full_text_query=full_text_query,
stats_handler=stats_handler,
).to_table()

@property
Expand Down Expand Up @@ -723,6 +751,7 @@ def to_batches(
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
**kwargs,
) -> Iterator[pa.RecordBatch]:
"""Read the dataset as materialized record batches.
Expand Down Expand Up @@ -754,6 +783,7 @@ def to_batches(
with_row_address=with_row_address,
use_stats=use_stats,
full_text_query=full_text_query,
stats_handler=stats_handler,
).to_batches()

def sample(
Expand Down Expand Up @@ -2960,6 +2990,7 @@ def __init__(self, ds: LanceDataset):
self._fast_search = False
self._full_text_query = None
self._use_scalar_index = None
self._stats_handler = None

def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder:
for key, value in default_opts.items():
Expand Down Expand Up @@ -3251,6 +3282,28 @@ def full_text_search(
self._full_text_query = {"query": query, "columns": columns}
return self

def stats_handler(
self, handler: Union[Callable[[LanceScanStats], None], str]
) -> ScannerBuilder:
"""
Sets the handler for scan statistics

Scan statistics are gathered while the scan is run and, once the scan is
complete, these statistics are given to the handler.

If 'full' then full stats (including the plan) are collected and logged.

If 'brief' then only global scan timers are collected and logged.

If the handler is a callable then the stats will be provided to the callable.

Note: the format of the log message provided by 'full' and 'brief' is NOT
stable and subject to change. Prefer using a callable instead of parsing the
log messages.
"""
self._stats_handler = handler
return self

def to_scanner(self) -> LanceScanner:
scanner = self.ds._ds.scanner(
self._columns,
Expand All @@ -3274,6 +3327,7 @@ def to_scanner(self) -> LanceScanner:
self._full_text_query,
self._late_materialization,
self._use_scalar_index,
self._stats_handler,
)
return LanceScanner(scanner, self.ds)

Expand Down
9 changes: 9 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,12 @@ def bfloat16_array(values: List[str | None]) -> BFloat16Array: ...

__version__: str
language_model_home: Callable[[], str]

class LanceScanStats:
start: int
end: int
wall_clock_duration: float
wall_clock_throughput: float
output_rows: int
estimated_output_bytes: int
plan: Optional[str]
25 changes: 25 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,31 @@ def test_dataset_restore(tmp_path: Path):
assert dataset.count_rows() == 100


def test_scan_stats(tmp_path: Path):
data = pa.table({"a": range(100)})
dataset = lance.write_dataset(data, tmp_path)

dataset.to_table(stats_handler="full")
dataset.to_table(stats_handler="brief")

now = datetime.now()
unix_now = time.mktime(now.timetuple())

def assert_stats(stats: lance.lance.LanceScanStats):
# Start/end should be within 5 minutes of now
assert stats.start / 1000 < unix_now + 300
assert stats.start / 1000 > unix_now - 300
assert stats.end / 1000 < unix_now + 300
assert stats.end / 1000 > unix_now - 300
assert stats.wall_clock_duration > 0
assert stats.wall_clock_throughput > 0
assert stats.output_rows == 100
assert stats.estimated_output_bytes == 800
assert stats.plan is not None

dataset.to_table(stats_handler=assert_stats)


def test_mixed_mode_overwrite(tmp_path: Path):
data = pa.table({"a": range(100)})
dataset = lance.write_dataset(data, tmp_path, data_storage_version="legacy")
Expand Down
37 changes: 35 additions & 2 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use arrow_array::Array;
use futures::{StreamExt, TryFutureExt};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::refs::{Ref, TagContents};
use lance::dataset::scanner::stats::ScanStatisticsHandler;
use lance::dataset::scanner::MaterializationStyle;
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
use lance::dataset::{
Expand Down Expand Up @@ -67,7 +68,7 @@ use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
use pyo3::exceptions::{PyStopIteration, PyTypeError};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString};
use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString, PyTuple};
use pyo3::{
exceptions::{PyIOError, PyKeyError, PyValueError},
pyclass,
Expand All @@ -79,6 +80,7 @@ use snafu::{location, Location};
use crate::error::PythonErrorExt;
use crate::file::object_store_from_uri_or_path;
use crate::fragment::FileFragment;
use crate::scanner::LanceScanStats;
use crate::schema::LanceSchema;
use crate::session::Session;
use crate::utils::PyLance;
Expand Down Expand Up @@ -485,7 +487,7 @@ impl Dataset {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None))]
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, stats_handler=None))]
fn scanner(
self_: PyRef<'_, Self>,
columns: Option<Vec<String>>,
Expand All @@ -509,6 +511,7 @@ impl Dataset {
full_text_query: Option<&Bound<'_, PyDict>>,
late_materialization: Option<PyObject>,
use_scalar_index: Option<bool>,
stats_handler: Option<Bound<PyAny>>,
) -> PyResult<Scanner> {
let mut scanner: LanceScanner = self_.ds.scan();
match (columns, columns_with_transform) {
Expand Down Expand Up @@ -589,6 +592,36 @@ impl Dataset {
scanner.fragment_readahead(fragment_readahead);
}

if let Some(stats_handler) = stats_handler {
if stats_handler.downcast::<PyString>().is_ok() {
let stats_handler = stats_handler.extract::<String>()?;
scanner.stats_handler(
stats_handler
.parse::<ScanStatisticsHandler>()
.infer_error()?,
);
} else if stats_handler.is_callable() {
let stats_handler = stats_handler.unbind();
let stats_handler = ScanStatisticsHandler::Custom(Arc::new(move |stats| {
let wrapped_stats = LanceScanStats::new(stats);
Python::with_gil(|py| {
let args = PyTuple::new_bound(py, vec![wrapped_stats.into_py(py)]);
stats_handler.call1(py, args)
})
.map_err(|err| lance_core::Error::Wrapped {
error: err.into(),
location: location!(),
})?;
Comment on lines +607 to +614
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might recommend using format_python_error() error here. The nice thing it does is puts the traceback in the error message, which helps Python users a lot in identifying the source of an error.

Ok(())
}));
scanner.stats_handler(stats_handler);
} else {
return Err(PyValueError::new_err(
"stats_handler must be a string or a callable",
));
}
}

scanner.scan_in_order(scan_in_order.unwrap_or(true));

if with_row_id.unwrap_or(false) {
Expand Down
2 changes: 2 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use futures::StreamExt;
use lance_index::DatasetIndexExt;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use scanner::LanceScanStats;
use session::Session;

#[macro_use]
Expand Down Expand Up @@ -122,6 +123,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceColumnMetadata>()?;
m.add_class::<LancePageMetadata>()?;
m.add_class::<LanceBufferDescriptor>()?;
m.add_class::<LanceScanStats>()?;
m.add_class::<BFloat16>()?;
m.add_class::<CleanupStats>()?;
m.add_class::<KMeans>()?;
Expand Down
Loading
Loading