From 32ce86575646b7520bcd4228ce442a19cf211d90 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Thu, 31 Oct 2024 13:27:03 -0700
Subject: [PATCH 1/6] Add initial scan statistics

---
 python/python/lance/dataset.py          |  54 +++++++
 python/python/lance/lance/__init__.pyi  |   9 ++
 python/python/tests/test_dataset.py     |  25 +++
 python/src/dataset.rs                   |  38 ++++-
 python/src/lib.rs                       |   2 +
 python/src/scanner.rs                   | 125 +++++++++++++++
 rust/lance/src/dataset/scanner.rs       |  75 +++++++--
 rust/lance/src/dataset/scanner/stats.rs | 202 ++++++++++++++++++++++++
 rust/lance/src/dataset/take.rs          |   9 +-
 9 files changed, 524 insertions(+), 15 deletions(-)
 create mode 100644 rust/lance/src/dataset/scanner/stats.rs

diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 01358195af..70341ecf57 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -18,6 +18,7 @@ from typing import (
     TYPE_CHECKING,
     Any,
+    Callable,
     Dict,
     Iterable,
     Iterator,
@@ -50,6 +51,7 @@
     CleanupStats,
     Compaction,
     CompactionMetrics,
+    LanceScanStats,
     LanceSchema,
     _Dataset,
     _MergeInsertBuilder,
@@ -322,6 +324,7 @@ def scanner(
         io_buffer_size: Optional[int] = None,
         late_materialization: Optional[bool | List[str]] = None,
         use_scalar_index: Optional[bool] = None,
+        stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
     ) -> LanceScanner:
         """Return a Scanner that can support various pushdowns.
@@ -414,6 +417,17 @@ def scanner(
         fast_search: bool, default False
             If True, then the search will only be performed on the indexed data, which
             yields faster search time.
+        stats_handler: 'full', 'brief', or Callable, default None
+            If None then stats will not be collected.
+
+            If 'full' then detailed stats will be written to the log file (with DEBUG
+            level). This includes the scan plan.
+
+            If 'brief' then brief stats (overall timing) will be written to the log
+            file.
+
+            If a callable, then the callable will be provided an instance of
+            LanceScanStats when the scan is complete.

         Notes
         -----
@@ -463,6 +477,7 @@ def setopt(opt, val):
         setopt(builder.use_stats, use_stats)
         setopt(builder.use_scalar_index, use_scalar_index)
         setopt(builder.fast_search, fast_search)
+        setopt(builder.stats_handler, stats_handler)

         # columns=None has a special meaning. we can't treat it as "user didn't specify"
         if self._default_scan_options is None:
@@ -543,6 +558,7 @@ def to_table(
         io_buffer_size: Optional[int] = None,
         late_materialization: Optional[bool | List[str]] = None,
         use_scalar_index: Optional[bool] = None,
+        stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
     ) -> pa.Table:
         """Read the data into memory as a :py:class:`pyarrow.Table`
@@ -612,6 +628,17 @@ def to_table(
             currently only supports a single column in the columns list.
             - query: str
                 The query string to search for.
+        stats_handler: 'full', 'brief', or Callable, default None
+            If None then stats will not be collected.
+
+            If 'full' then detailed stats will be written to the log file (with DEBUG
+            level). This includes the scan plan.
+
+            If 'brief' then brief stats (overall timing) will be written to the log
+            file.
+
+            If a callable, then the callable will be provided an instance of
+            LanceScanStats when the scan is complete.
        Notes
        -----
@@ -639,6 +666,7 @@ def to_table(
             use_stats=use_stats,
             fast_search=fast_search,
             full_text_query=full_text_query,
+            stats_handler=stats_handler,
         ).to_table()

     @property
@@ -723,6 +751,7 @@ def to_batches(
         io_buffer_size: Optional[int] = None,
         late_materialization: Optional[bool | List[str]] = None,
         use_scalar_index: Optional[bool] = None,
+        stats_handler: Optional[Union[Callable[[LanceScanStats], None], str]] = None,
         **kwargs,
     ) -> Iterator[pa.RecordBatch]:
         """Read the dataset as materialized record batches.
@@ -754,6 +783,7 @@
             with_row_address=with_row_address,
             use_stats=use_stats,
             full_text_query=full_text_query,
+            stats_handler=stats_handler,
         ).to_batches()

     def sample(
@@ -2960,6 +2990,7 @@ def __init__(self, ds: LanceDataset):
         self._fast_search = False
         self._full_text_query = None
         self._use_scalar_index = None
+        self._stats_handler = None

     def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder:
         for key, value in default_opts.items():
@@ -3251,6 +3282,28 @@ def full_text_search(
         self._full_text_query = {"query": query, "columns": columns}
         return self

+    def stats_handler(
+        self, handler: Union[Callable[[LanceScanStats], None], str]
+    ) -> ScannerBuilder:
+        """
+        Sets the handler for scan statistics.
+
+        Scan statistics are gathered while the scan is run and, once the scan is
+        complete, these statistics are given to the handler.
+
+        If 'full' then full stats (including the plan) are collected and logged.
+
+        If 'brief' then only global scan timers are collected and logged.
+
+        If the handler is a callable then the stats will be provided to the callable.
+
+        Note: the format of the log message provided by 'full' and 'brief' is NOT
+        stable and subject to change. Prefer using a callable instead of parsing the
+        log messages.
+        """
+        self._stats_handler = handler
+        return self
+
     def to_scanner(self) -> LanceScanner:
         scanner = self.ds._ds.scanner(
             self._columns,
@@ -3274,6 +3327,7 @@
             self._full_text_query,
             self._late_materialization,
             self._use_scalar_index,
+            self._stats_handler,
         )
         return LanceScanner(scanner, self.ds)
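The new stats_handler parameter accepts either a string or a callable. A minimal usage sketch of the API added above (the /tmp/scan_stats_demo path is made up for illustration; the lance build must include this patch):

import lance
import pyarrow as pa

ds = lance.write_dataset(pa.table({"a": range(100)}), "/tmp/scan_stats_demo")

# Log brief stats (overall timing) at DEBUG level
ds.to_table(stats_handler="brief")

# Receive the stats object directly once the scan completes
def print_stats(stats):
    print(f"{stats.output_rows} rows in {stats.wall_clock_duration:.4f}s")

ds.to_table(stats_handler=print_stats)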
diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi
index b894dec238..50d530326c 100644
--- a/python/python/lance/lance/__init__.pyi
+++ b/python/python/lance/lance/__init__.pyi
@@ -452,3 +452,12 @@ def bfloat16_array(values: List[str | None]) -> BFloat16Array: ...

 __version__: str
 language_model_home: Callable[[], str]
+
+class LanceScanStats:
+    start: int
+    end: int
+    wall_clock_duration: float
+    wall_clock_throughput: float
+    output_rows: int
+    estimated_output_bytes: int
+    plan: Optional[str]
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index 73fae33863..b3378a7356 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -2234,6 +2234,31 @@ def test_dataset_restore(tmp_path: Path):
     assert dataset.count_rows() == 100


+def test_scan_stats(tmp_path: Path):
+    data = pa.table({"a": range(100)})
+    dataset = lance.write_dataset(data, tmp_path)
+
+    dataset.to_table(stats_handler="full")
+    dataset.to_table(stats_handler="brief")
+
+    now = datetime.now()
+    unix_now = time.mktime(now.timetuple())
+
+    def assert_stats(stats: lance.lance.LanceScanStats):
+        # Start/end should be within 5 minutes of now
+        assert stats.start / 1000 < unix_now + 300
+        assert stats.start / 1000 > unix_now - 300
+        assert stats.end / 1000 < unix_now + 300
+        assert stats.end / 1000 > unix_now - 300
+        assert stats.wall_clock_duration > 0
+        assert stats.wall_clock_throughput > 0
+        assert stats.output_rows == 100
+        assert stats.estimated_output_bytes == 800
+        assert stats.plan is not None
+
+    dataset.to_table(stats_handler=assert_stats)
+
+
 def test_mixed_mode_overwrite(tmp_path: Path):
     data = pa.table({"a": range(100)})
     dataset = lance.write_dataset(data, tmp_path, data_storage_version="legacy")
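The stub and test above pin down the field semantics: start and end are integer milliseconds since the UNIX epoch, wall_clock_duration is float seconds, and a 100-row int64 table yields estimated_output_bytes == 800. A sketch of interpreting those fields (continuing the ds from the earlier sketch):

from datetime import datetime, timezone

def show_timing(stats):
    started = datetime.fromtimestamp(stats.start / 1000, tz=timezone.utc)
    ended = datetime.fromtimestamp(stats.end / 1000, tz=timezone.utc)
    # start/end have only millisecond granularity, so this is a rough
    # cross-check against the float-seconds wall_clock_duration
    print(started, ended, (ended - started).total_seconds())
    print(stats.estimated_output_bytes, "bytes in", stats.output_rows, "rows")

ds.to_table(stats_handler=show_timing)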
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index 80475bfd82..8e93895179 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -31,6 +31,7 @@ use arrow_array::Array;
 use futures::{StreamExt, TryFutureExt};
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::refs::{Ref, TagContents};
+use lance::dataset::scanner::stats::ScanStatisticsHandler;
 use lance::dataset::scanner::MaterializationStyle;
 use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
 use lance::dataset::{
@@ -67,7 +68,7 @@ use lance_table::io::commit::CommitHandler;
 use object_store::path::Path;
 use pyo3::exceptions::{PyStopIteration, PyTypeError};
 use pyo3::prelude::*;
-use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString};
+use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString, PyTuple};
 use pyo3::{
     exceptions::{PyIOError, PyKeyError, PyValueError},
     pyclass,
@@ -78,7 +79,8 @@ use snafu::{location, Location};

 use crate::error::PythonErrorExt;
 use crate::file::object_store_from_uri_or_path;
-use crate::fragment::FileFragment;
+use crate::fragment::{FileFragment, FragmentMetadata};
+use crate::scanner::LanceScanStats;
 use crate::schema::LanceSchema;
 use crate::session::Session;
 use crate::utils::PyLance;
@@ -509,6 +511,7 @@ impl Dataset {
         full_text_query: Option<&Bound<'_, PyDict>>,
         late_materialization: Option<PyObject>,
         use_scalar_index: Option<bool>,
+        stats_handler: Option<Bound<'_, PyAny>>,
     ) -> PyResult<Scanner> {
         let mut scanner: LanceScanner = self_.ds.scan();
         match (columns, columns_with_transform) {
@@ -589,6 +592,37 @@
             scanner.fragment_readahead(fragment_readahead);
         }

+        if let Some(stats_handler) = stats_handler {
+            if stats_handler.downcast::<PyString>().is_ok() {
+                let stats_handler = stats_handler.extract::<String>()?;
+                scanner.stats_handler(
+                    stats_handler
+                        .parse::<ScanStatisticsHandler>()
+                        .infer_error()?,
+                );
+            } else if stats_handler.is_callable() {
+                let stats_handler = stats_handler.unbind();
+                let stats_handler = ScanStatisticsHandler::Custom(Arc::new(move |stats| {
+                    let wrapped_stats = LanceScanStats::new(stats);
+                    let stats_handler = stats_handler.clone();
+                    Python::with_gil(move |py| {
+                        let args = PyTuple::new(py, vec![wrapped_stats.into_py(py)]);
+                        stats_handler.call1(py, args)
+                    })
+                    .map_err(|err| lance_core::Error::Wrapped {
+                        error: err.into(),
+                        location: location!(),
+                    })?;
+                    Ok(())
+                }));
+                scanner.stats_handler(stats_handler);
+            } else {
+                return Err(PyValueError::new_err(
+                    "stats_handler must be a string or a callable",
+                ));
+            }
+        }
+
         scanner.scan_in_order(scan_in_order.unwrap_or(true));

         if with_row_id.unwrap_or(false) {
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 02d5c0e4a3..c3e7540f1b 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -44,6 +44,7 @@ use futures::StreamExt;
 use lance_index::DatasetIndexExt;
 use pyo3::exceptions::{PyIOError, PyValueError};
 use pyo3::prelude::*;
+use scanner::LanceScanStats;
 use session::Session;

 #[macro_use]
@@ -122,6 +123,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<LanceScanStats>()?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
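The binding above invokes the callable under the GIL with a single LanceScanStats argument, so any Python callable works, including a stateful object. A sketch that aggregates stats across scans (the ScanProfiler name is hypothetical):

class ScanProfiler:
    """Accumulates LanceScanStats from every scan it observes."""

    def __init__(self):
        self.runs = []

    def __call__(self, stats):
        self.runs.append(stats)

profiler = ScanProfiler()
ds.to_table(stats_handler=profiler)
ds.to_table(filter="a > 50", stats_handler=profiler)
total = sum(s.output_rows for s in profiler.runs)
print(f"{total} rows across {len(profiler.runs)} scans")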
diff --git a/python/src/scanner.rs b/python/src/scanner.rs
index d32c02ac98..1f500bda3d 100644
--- a/python/src/scanner.rs
+++ b/python/src/scanner.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;

 use arrow::pyarrow::*;
 use arrow_array::RecordBatchReader;
+use lance::dataset::scanner::stats::ScannerStats;
+use lance::dataset::scanner::stats::ThroughputUnit;
 use pyo3::prelude::*;
 use pyo3::pyclass;
@@ -28,6 +30,129 @@ use pyo3::exceptions::PyValueError;

 use crate::reader::LanceReader;
 use crate::RT;

+#[pyclass]
+pub struct LanceScanStats {
+    inner: ScannerStats,
+}
+
+impl LanceScanStats {
+    pub fn new(inner: ScannerStats) -> Self {
+        Self { inner }
+    }
+}
+
+#[pymethods]
+impl LanceScanStats {
+    /// The start of the scan
+    ///
+    /// This is when the stream is constructed, not when it is first consumed.
+    ///
+    /// Returned as milliseconds since the UNIX epoch
+    #[getter]
+    fn start(&self) -> PyResult<u64> {
+        Ok(self
+            .inner
+            .start
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .try_into()
+            .unwrap())
+    }
+
+    /// The end of the scan
+    ///
+    /// This is when the last batch is provided to the consumer which may be
+    /// well after the I/O has finished (if there is a slow consumer or expensive
+    /// decode).
+    ///
+    /// Returned as milliseconds since the UNIX epoch
+    #[getter]
+    fn end(&self) -> PyResult<u64> {
+        Ok(self
+            .inner
+            .end
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .try_into()
+            .unwrap())
+    }
+
+    /// The wall clock duration of the scan
+    ///
+    /// NOTE: This is not the time that the scanner was actually doing work, and not the amount
+    /// of time spent in I/O but simply the time from when the scanner was created to when the
+    /// last batch was provided to the consumer.
+    ///
+    /// As an example, if a consumer is slow to consume the data (e.g. they are writing the data
+    /// back out to disk or doing expensive processing) then this will be much larger than the
+    /// actual time to read the data.
+    ///
+    /// Returned as floating point seconds
+    #[getter]
+    fn wall_clock_duration(&self) -> PyResult<f64> {
+        Ok(self.inner.wall_clock_duration.as_secs_f64())
+    }
+
+    /// This is an estimate of the "wall clock throughput" in GiB/s
+    ///
+    /// Note: this is based both on :ref:`wall_clock_duration` (see note on that method) and
+    /// :ref:`estimated_output_bytes` (see note on that field).
+    ///
+    /// It is not safe, for example, to assume that this is the rate at which data was pulled down
+    /// from storage.
+    ///
+    /// Returned as floating point GiB/s
+    #[getter]
+    fn wall_clock_throughput(&self) -> PyResult<f64> {
+        Ok(self.inner.wall_clock_throughput().gigabytes_per_second())
+    }
+
+    /// The number of rows output by the scanner
+    #[getter]
+    fn output_rows(&self) -> PyResult<u64> {
+        Ok(self.inner.output_rows)
+    }
+
+    /// The estimated size of the output in bytes
+    ///
+    /// "Estimated" is used here because there may be some instances where multiple
+    /// batches will share the same underlying buffer (e.g. a dictionary) and so the
+    /// actual data size may be less than the reported size.
+    ///
+    /// Also, this is very different than "input bytes" which may be much smaller since
+    /// the input may be compressed or encoded.
+    ///
+    /// This will always be greater than or equal to the actual size.
+    #[getter]
+    fn estimated_output_bytes(&self) -> PyResult<u64> {
+        Ok(self.inner.estimated_output_bytes)
+    }
+
+    /// The plan that was used to generate the scan
+    ///
+    /// There are some instances where we generate a scan without a plan and some handlers
+    /// do not need the plan and so we may not gather it. In these cases this will be None.
+    #[getter]
+    fn plan(&self) -> PyResult<Option<String>> {
+        Ok(self.inner.plan.clone())
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "LanceScanStats(start={}, end={}, wall_clock_duration={}, wall_clock_throughput={}, output_rows={}, estimated_output_bytes={}, plan={:?})",
+            self.start().unwrap(),
+            self.end().unwrap(),
+            self.wall_clock_duration().unwrap(),
+            self.wall_clock_throughput().unwrap(),
+            self.output_rows().unwrap(),
+            self.estimated_output_bytes().unwrap(),
+            self.plan().unwrap()
+        )
+    }
+}
+
 /// This will be wrapped by a python class to provide
 /// additional functionality
 #[pyclass(name = "_Scanner", module = "_lib")]
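As the plan getter's docs note, the plan may not be gathered for every handler; per the scanner changes below, it is skipped for the brief logger but captured for callables. A sketch that inspects it:

def dump_plan(stats):
    # Callable handlers are given the plan text; the 'brief' logger is not.
    # The plan format is not stable, so only print it, don't parse it.
    if stats.plan is not None:
        print(stats.plan)

ds.to_table(stats_handler=dump_plan)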
diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index 265c9a2220..76cffdbc10 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -55,6 +55,7 @@ use lance_io::stream::RecordBatchStream;
 use lance_linalg::distance::MetricType;
 use lance_table::format::{Fragment, Index};
 use roaring::RoaringBitmap;
+use stats::{ScanStatisticsHandler, ScannerStatsCollector, DEFAULT_STATS_HANDLER};
 use tracing::{info_span, instrument, Span};

 use super::Dataset;
@@ -75,6 +76,8 @@ use snafu::{location, Location};
 #[cfg(feature = "substrait")]
 use lance_datafusion::substrait::parse_substrait;

+pub mod stats;
+
 pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192;
 // For backwards compatibility / historical reasons we re-calculate the default batch size
 // on each call
@@ -327,6 +330,9 @@ pub struct Scanner {
     /// This is essentially a weak consistency search. Users can run index or optimize index
     /// to make the index catch up with the latest data.
     fast_search: bool,
+
+    /// How should scan statistics be handled
+    statistics_handler: ScanStatisticsHandler,
 }

 fn escape_column_name(name: &str) -> String {
@@ -365,6 +371,7 @@ impl Scanner {
             fragments: None,
             fast_search: false,
             use_scalar_index: true,
+            statistics_handler: DEFAULT_STATS_HANDLER.clone(),
         }
     }

@@ -609,6 +616,17 @@ impl Scanner {
         self
     }

+    /// Set how to handle scan statistics.
+    ///
+    /// By default, statistics are not reported.
+    ///
+    /// Scan statistics are collected during the scan and will be provided at the end. This
+    /// can be useful for debugging and profiling scan performance.
+    pub fn stats_handler(&mut self, stats_handler: ScanStatisticsHandler) -> &mut Self {
+        self.statistics_handler = stats_handler;
+        self
+    }
+
     /// Set limit and offset.
     ///
     /// If offset is set, the first offset rows will be skipped. If limit is set,
@@ -1004,10 +1022,21 @@
         async move {
             let plan = self.create_plan().await?;

-            Ok(DatasetRecordBatchStream::new(execute_plan(
-                plan,
-                LanceExecutionOptions::default(),
-            )?))
+            let stats_handler = self.statistics_handler.clone();
+            let plan_str = if matches!(
+                stats_handler,
+                ScanStatisticsHandler::DoNotReport | ScanStatisticsHandler::LogBrief
+            ) {
+                None
+            } else {
+                Some(Self::plan_to_string(plan.as_ref(), /*verbose=*/ true))
+            };
+
+            Ok(DatasetRecordBatchStream::new(
+                execute_plan(plan, LanceExecutionOptions::default())?,
+                Some(stats_handler),
+                plan_str,
+            ))
         }
         .boxed()
     }
@@ -2286,12 +2315,15 @@
         ))
     }

+    fn plan_to_string(plan: &dyn ExecutionPlan, verbose: bool) -> String {
+        let display = DisplayableExecutionPlan::new(plan);
+        format!("{}", display.indent(verbose))
+    }
+
     #[instrument(level = "info", skip(self))]
     pub async fn explain_plan(&self, verbose: bool) -> Result<String> {
         let plan = self.create_plan().await?;
-        let display = DisplayableExecutionPlan::new(plan.as_ref());
-
-        Ok(format!("{}", display.indent(verbose)))
+        Ok(Self::plan_to_string(plan.as_ref(), verbose))
     }
 }

@@ -2303,12 +2335,23 @@
 pub struct DatasetRecordBatchStream {
     #[pin]
     exec_node: SendableRecordBatchStream,
     span: Span,
+    stats_collector: Option<ScannerStatsCollector>,
 }

 impl DatasetRecordBatchStream {
-    pub fn new(exec_node: SendableRecordBatchStream) -> Self {
+    pub fn new(
+        exec_node: SendableRecordBatchStream,
+        stats_handler: Option<ScanStatisticsHandler>,
+        plan: Option<String>,
+    ) -> Self {
+        let stats_handler = stats_handler.unwrap_or_else(|| DEFAULT_STATS_HANDLER.clone());
+        let stats_collector = ScannerStatsCollector::new(stats_handler, plan);
         let span = info_span!("DatasetRecordBatchStream");
-        Self { exec_node, span }
+        Self {
+            exec_node,
+            span,
+            stats_collector: Some(stats_collector),
+        }
     }
 }

@@ -2326,6 +2369,20 @@ impl Stream for DatasetRecordBatchStream {
         let _guard = this.span.enter();
         match this.exec_node.poll_next_unpin(cx) {
             Poll::Ready(result) => {
+                match &result {
+                    Some(Ok(batch)) => {
+                        if let Some(stats_collector) = this.stats_collector.as_mut() {
+                            stats_collector.observe_batch(batch);
+                        }
+                    }
+                    Some(Err(_)) | None => {
+                        if let Some(stats_collector) = this.stats_collector.take() {
+                            if let Err(err) = stats_collector.finish() {
+                                return Poll::Ready(Some(Err(err)));
+                            }
+                        }
+                    }
+                }
                 Poll::Ready(result.map(|r| r.map_err(|e| Error::io(e.to_string(), location!()))))
             }
             Poll::Pending => Poll::Pending,
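The stream wiring above only finalizes the stats when the final batch (or error) reaches the consumer, so consumer time is counted in the wall clock, exactly as the doc comments warn. A sketch that makes the inflation visible (the 0.1s sleep is arbitrary):

import time

def report(stats):
    print(f"wall clock: {stats.wall_clock_duration:.2f}s")

# A deliberately slow consumer: the reported duration includes the sleeps,
# not just the time Lance spent on I/O and decode
for batch in ds.to_batches(batch_size=10, stats_handler=report):
    time.sleep(0.1)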
diff --git a/rust/lance/src/dataset/scanner/stats.rs b/rust/lance/src/dataset/scanner/stats.rs
new file mode 100644
index 0000000000..6958cf6508
--- /dev/null
+++ b/rust/lance/src/dataset/scanner/stats.rs
@@ -0,0 +1,202 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::{
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, Instant, SystemTime},
+};
+
+use arrow_array::RecordBatch;
+use lance_core::{Error, Result};
+use snafu::{location, Location};
+
+pub struct ScannerStats {
+    /// The start of the scan
+    ///
+    /// This is when the stream is constructed, not when it is first consumed.
+    pub start: SystemTime,
+    /// The end of the scan
+    ///
+    /// This is when the last batch is provided to the consumer which may be
+    /// well after the I/O has finished (if there is a slow consumer or expensive
+    /// decode).
+    pub end: SystemTime,
+    /// The wall clock duration of the scan
+    ///
+    /// NOTE: This is not the time that the scanner was actually doing work, and not the amount
+    /// of time spent in I/O but simply the time from when the scanner was created to when the
+    /// last batch was provided to the consumer.
+    ///
+    /// As an example, if a consumer is slow to consume the data (e.g. they are writing the data
+    /// back out to disk or doing expensive processing) then this will be much larger than the
+    /// actual time to read the data.
+    pub wall_clock_duration: Duration,
+    /// The number of rows output by the scanner
+    pub output_rows: u64,
+    /// The estimated size of the output in bytes
+    ///
+    /// "Estimated" is used here because there may be some instances where multiple
+    /// batches will share the same underlying buffer (e.g. a dictionary) and so the
+    /// actual data size may be less than the reported size.
+    ///
+    /// Also, this is very different than "input bytes" which may be much smaller since
+    /// the input may be compressed or encoded.
+    ///
+    /// This will always be greater than or equal to the actual size.
+    pub estimated_output_bytes: u64,
+    /// The plan that was used to generate the scan
+    ///
+    /// There are some instances where we generate a scan without a plan and some handlers
+    /// do not need the plan and so we may not gather it. In these cases this will be None.
+    pub plan: Option<String>,
+}
+
+impl ScannerStats {
+    /// This is an estimate of the "wall clock throughput" in GiB/s
+    ///
+    /// Note: this is based both on [`Self::wall_clock_duration`] (see note on that method) and
+    /// [`Self::estimated_output_bytes`] (see note on that field).
+    ///
+    /// It is not safe, for example, to assume that this is the rate at which data was pulled down
+    /// from storage.
+    pub fn wall_clock_throughput(&self) -> impl ThroughputUnit {
+        let duration_secs = self.wall_clock_duration.as_secs_f64();
+        if duration_secs == 0.0 {
+            return 0.0;
+        }
+        self.estimated_output_bytes as f64 / duration_secs
+    }
+}
+
+pub trait ThroughputUnit {
+    fn gigabytes_per_second(&self) -> f64;
+}
+
+/// Here we assume that the throughput is in B/s
+impl ThroughputUnit for f64 {
+    fn gigabytes_per_second(&self) -> f64 {
+        self / (1024.0 * 1024.0 * 1024.0)
+    }
+}
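The ThroughputUnit impl above treats the raw figure as bytes per second and divides by 1024^3 to get GiB/s. The same arithmetic in Python, useful for sanity-checking a reported number (the sample values are illustrative):

estimated_output_bytes = 800  # from a LanceScanStats instance
wall_clock_duration = 0.002   # seconds, from the same instance

bytes_per_second = estimated_output_bytes / wall_clock_duration
gib_per_second = bytes_per_second / (1024 ** 3)
print(f"{gib_per_second:.9f} GiB/s")  # should match wall_clock_throughput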
+pub(super) struct ScannerStatsCollector {
+    start: Instant,
+    start_time: SystemTime,
+    output_rows: u64,
+    estimated_output_bytes: u64,
+    plan: Option<String>,
+    handler: ScanStatisticsHandler,
+}
+
+impl ScannerStatsCollector {
+    pub fn new(handler: ScanStatisticsHandler, plan: Option<String>) -> Self {
+        let start = Instant::now();
+        let start_time = SystemTime::now();
+        Self {
+            start,
+            start_time,
+            output_rows: 0,
+            estimated_output_bytes: 0,
+            plan,
+            handler,
+        }
+    }
+
+    pub fn observe_batch(&mut self, batch: &RecordBatch) {
+        self.output_rows += batch.num_rows() as u64;
+        self.estimated_output_bytes += batch
+            .columns()
+            .iter()
+            .map(|c| c.get_buffer_memory_size() as u64)
+            .sum::<u64>();
+    }
+
+    pub fn finish(self) -> Result<()> {
+        let end = Instant::now();
+        let end_time = SystemTime::now();
+        let stats = ScannerStats {
+            start: self.start_time,
+            end: end_time,
+            wall_clock_duration: (end - self.start),
+            output_rows: self.output_rows,
+            estimated_output_bytes: self.estimated_output_bytes,
+            plan: self.plan,
+        };
+        match self.handler {
+            ScanStatisticsHandler::DoNotReport => Ok(()),
+            ScanStatisticsHandler::LogBrief => {
+                log::debug!(
+                    "Scan wall time {}s ({} GiB/s), output {} rows, estimated output size {} bytes",
+                    stats.wall_clock_duration.as_secs_f64(),
+                    stats.wall_clock_throughput().gigabytes_per_second(),
+                    stats.output_rows,
+                    stats.estimated_output_bytes
+                );
+                Ok(())
+            }
+            ScanStatisticsHandler::LogFull => {
+                log::debug!(
+                    "Scan wall time {}s ({} GiB/s), output {} rows, estimated output size {} bytes, plan: {}",
+                    stats.wall_clock_duration.as_secs_f64(),
+                    stats.wall_clock_throughput().gigabytes_per_second(),
+                    stats.output_rows,
+                    stats.estimated_output_bytes,
+                    stats.plan.as_deref().unwrap_or("N/A")
+                );
+                Ok(())
+            }
+            ScanStatisticsHandler::Custom(handler) => handler(stats),
+        }
+    }
+}
+
+/// Describes how statistics should be handled
+#[derive(Clone)]
+pub enum ScanStatisticsHandler {
+    /// Do not report (and possibly even do not gather) any statistics
+    DoNotReport,
+    /// Log the scan statistics at the end of the scan
+    LogBrief,
+    /// Log the scan statistics (and the scan plan) at the end of the scan
+    LogFull,
+    /// Call a custom function with the statistics at the end of the scan
+    Custom(Arc<dyn Fn(ScannerStats) -> Result<()> + Send + Sync>),
+}
+
+impl std::fmt::Debug for ScanStatisticsHandler {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::DoNotReport => write!(f, "DoNotReport"),
+            Self::LogBrief => write!(f, "LogBrief"),
+            Self::LogFull => write!(f, "LogFull"),
+            Self::Custom(_) => write!(f, "Custom"),
+        }
+    }
+}
+
+impl FromStr for ScanStatisticsHandler {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s {
+            "brief" => Ok(Self::LogBrief),
+            "full" => Ok(Self::LogFull),
+            _ => Err(Error::InvalidInput {
+                source: format!("invalid value for ScanStatisticsHandler: {}", s).into(),
+                location: location!(),
+            }),
+        }
+    }
+}
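from_str above accepts only 'brief' and 'full', and an error returned by a custom handler is surfaced through the record batch stream. Both behaviors are observable from Python; a sketch (the exact exception types raised by the bindings are an assumption):

# An unrecognized string is rejected when the scanner is built
try:
    ds.to_table(stats_handler="verbose")
except ValueError as err:
    print("rejected:", err)

# An exception raised inside a handler propagates out of the scan
def bad_handler(stats):
    raise RuntimeError("boom")

try:
    ds.to_table(stats_handler=bad_handler)
except Exception as err:
    print("scan failed:", err)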
+lazy_static::lazy_static! {
+    pub(crate) static ref DEFAULT_STATS_HANDLER: ScanStatisticsHandler = match std::env::var("LANCE_SCAN_STATISTICS") {
+        Ok(val) => match val.as_str() {
+            "brief" => ScanStatisticsHandler::LogBrief,
+            "full" => ScanStatisticsHandler::LogFull,
+            _ => ScanStatisticsHandler::DoNotReport,
+        },
+        Err(_) => ScanStatisticsHandler::DoNotReport,
+    };
+}
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index cf89fa905f..b8a21ab368 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -421,10 +421,11 @@ pub fn take_scan(
         })
         .buffered(batch_readahead);

-    DatasetRecordBatchStream::new(Box::pin(RecordBatchStreamAdapter::new(
-        arrow_schema,
-        batch_stream,
-    )))
+    DatasetRecordBatchStream::new(
+        Box::pin(RecordBatchStreamAdapter::new(arrow_schema, batch_stream)),
+        /*stats_handler=*/ None, // TODO: might want to allow custom handlers
+        /*plan=*/ None,
+    )
 }

 struct RowAddressStats {
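DEFAULT_STATS_HANDLER above is a lazy static, read from the LANCE_SCAN_STATISTICS environment variable the first time a default-configured scan runs and then cached for the life of the process. One way to drive it from Python (set the variable before the first scan):

import os

# Must be in the environment before the first scan in this process
os.environ["LANCE_SCAN_STATISTICS"] = "brief"

import lance  # noqa: E402

ds = lance.dataset("/tmp/scan_stats_demo")
ds.to_table()  # brief stats are now logged at DEBUG level by default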
From bf95b5b170e918c1987e93c99b456191a34db522 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 20 Jan 2025 06:23:50 -0800
Subject: [PATCH 2/6] ruff format

---
 python/python/lance/fragment.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py
index 495e6552d1..cd4b25af94 100644
--- a/python/python/lance/fragment.py
+++ b/python/python/lance/fragment.py
@@ -692,7 +692,8 @@ def write_fragments(
     use_legacy_format: Optional[bool] = None,
     storage_options: Optional[Dict[str, str]] = None,
     enable_move_stable_row_ids: bool = False,
-) -> Transaction: ...
+) -> Transaction:
+    ...

 @overload
 def write_fragments(
@@ -710,7 +711,8 @@ def write_fragments(
     use_legacy_format: Optional[bool] = None,
     storage_options: Optional[Dict[str, str]] = None,
     enable_move_stable_row_ids: bool = False,
-) -> List[FragmentMetadata]: ...
+) -> List[FragmentMetadata]:
+    ...

 def write_fragments(

From fa1d09026d1f67491eab50f88a04cc9967558fde Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 20 Jan 2025 06:27:10 -0800
Subject: [PATCH 3/6] Address clippy warnings

---
 python/src/dataset.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index 8e93895179..0e6ff26787 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -79,7 +79,7 @@ use snafu::{location, Location};

 use crate::error::PythonErrorExt;
 use crate::file::object_store_from_uri_or_path;
-use crate::fragment::{FileFragment, FragmentMetadata};
+use crate::fragment::FileFragment;
 use crate::scanner::LanceScanStats;
 use crate::schema::LanceSchema;
 use crate::session::Session;
@@ -487,7 +487,7 @@ impl Dataset {
     }

     #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None))]
+    #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, stats_handler=None))]
     fn scanner(
         self_: PyRef<'_, Self>,
         columns: Option<Vec<String>>,
@@ -606,7 +606,7 @@ impl Dataset {
             let wrapped_stats = LanceScanStats::new(stats);
             let stats_handler = stats_handler.clone();
             Python::with_gil(move |py| {
-                let args = PyTuple::new(py, vec![wrapped_stats.into_py(py)]);
+                let args = PyTuple::new_bound(py, vec![wrapped_stats.into_py(py)]);
                 stats_handler.call1(py, args)
             })
             .map_err(|err| lance_core::Error::Wrapped {

From 308df934bae0caa0c56a3acd4f42e4e8efcd4441 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 20 Jan 2025 06:33:58 -0800
Subject: [PATCH 4/6] Upgraded ruff in precommit to match what's in the
 workflow

---
 .pre-commit-config.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a49e64a867..e31ba3b6d6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.2.2
+    rev: v0.4.1
     hooks:
       - id: ruff
         args: [--fix, --exit-non-zero-on-fix]

From 986b8c6b4559bfeae945f6f382d6a09c2ab28154 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 20 Jan 2025 08:11:31 -0800
Subject: [PATCH 5/6] Ruff format

---
 python/python/lance/fragment.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py
index cd4b25af94..495e6552d1 100644
--- a/python/python/lance/fragment.py
+++ b/python/python/lance/fragment.py
@@ -692,8 +692,7 @@ def write_fragments(
     use_legacy_format: Optional[bool] = None,
     storage_options: Optional[Dict[str, str]] = None,
     enable_move_stable_row_ids: bool = False,
-) -> Transaction:
-    ...
+) -> Transaction: ...
@overload def write_fragments( @@ -711,8 +710,7 @@ def write_fragments( use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, enable_move_stable_row_ids: bool = False, - ) -> List[FragmentMetadata]: - ... + ) -> List[FragmentMetadata]: ... def write_fragments( From fbc682bde912f499a3be36c1ec80b01325694014 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 20 Jan 2025 10:33:43 -0800 Subject: [PATCH 6/6] Avoid clone of stats_handler in a place where we don't have GIL --- python/src/dataset.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 0e6ff26787..a0f5465e4b 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -604,8 +604,7 @@ impl Dataset { let stats_handler = stats_handler.unbind(); let stats_handler = ScanStatisticsHandler::Custom(Arc::new(move |stats| { let wrapped_stats = LanceScanStats::new(stats); - let stats_handler = stats_handler.clone(); - Python::with_gil(move |py| { + Python::with_gil(|py| { let args = PyTuple::new_bound(py, vec![wrapped_stats.into_py(py)]); stats_handler.call1(py, args) })