feat: add custom Lance metrics to trace read-path scan performance #460
summaryzb wants to merge 1 commit into lance-format:main
Conversation
Change-Id: I63dd17d7e8469c27a73251d7eca3ac373d279d7f
Some review notes on the metrics approach, split into perf concerns, naming/correctness fixes, and gaps in coverage.

**Performance concerns**

The overall approach is sound: thread-confined long counters, no locks, and snapshot reads only when Spark polls `currentMetricsValues()`. One unrelated correctness win bundled in is worth noting.

**Naming / correctness fixes**

(…)

**Gaps in coverage**

The current set tells you "how long did Lance take" but not "where did the time go" or "did pushdown work." Proposed final metric set:

- Counts: (…)
- Times: (…)
- Drop: (…)

The two I'd treat as blocking are (…)
This is awesome! Appreciate the effort here; it will be super useful. I left a comment ^^^ kind of just dumping my Claude session during review (sorry for the AI comment). My takeaways: (…)
Summary
Adds custom metrics to the Lance Spark read path using Spark's DataSource V2 `CustomMetric` API, enabling per-task timing and counter instrumentation that surfaces on the Spark UI Scan node. Six metrics are tracked: fragments scanned, batches read, dataset open time, scanner creation time, batch load time, and a derived total scan time.

Motivation
Implement #459
Before this PR: (screenshot of the Spark UI Scan node)

After this PR: (screenshot of the Spark UI Scan node with the new Lance metrics)
Approach
The implementation uses Spark's `CustomMetric`/`CustomTaskMetric` API, which is the standard DataSource V2 mechanism for surfacing connector-specific metrics in the Spark UI.

**Metric definitions (`LanceCustomMetrics`):** Six `CustomSumMetric` inner classes define the metrics. Each has a public no-arg constructor (required by Spark's reflection-based instantiation). A static `allMetrics()` method returns all definitions for `LanceScan.supportedCustomMetrics()`. The `CustomSumMetric` base class handles aggregation across tasks automatically.
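As a rough illustration of that shape (the inner-class name and metric key below are assumptions for the sketch, not necessarily the ones in the PR):

```java
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomSumMetric;

public final class LanceCustomMetrics {

  // One of the six definitions. CustomSumMetric supplies the
  // sum-across-tasks aggregation, so only name/description are needed;
  // the implicit public no-arg constructor satisfies Spark's
  // reflection-based instantiation.
  public static class FragmentsScannedMetric extends CustomSumMetric {
    @Override public String name() { return "fragmentsScanned"; }
    @Override public String description() { return "number of Lance fragments scanned"; }
  }

  // Returned from LanceScan.supportedCustomMetrics() so Spark knows
  // which metrics this scan can report.
  public static CustomMetric[] allMetrics() {
    return new CustomMetric[] {
        new FragmentsScannedMetric(),
        // ...the other five definitions
    };
  }
}
```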
**Executor-side tracking (`LanceReadMetricsTracker`):** A thread-confined accumulator that lives inside each `PartitionReader`. It collects per-phase nanosecond timings and counters via simple `add*()` methods. The `currentMetricsValues()` method returns a snapshot array of `CustomTaskMetric` instances; Spark calls this after each `next()` invocation. The derived `scanTimeNs` metric is computed as `datasetOpenTimeNs + scannerCreateTimeNs + batchLoadTimeNs`.
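A minimal sketch of what such a tracker might look like, assuming plain `long` fields; only the `add*()`/`currentMetricsValues()` shape comes from the PR description, and the `taskMetric` helper plus the metric keys are illustrative:

```java
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

public final class LanceReadMetricsTracker {
  private long fragmentsScanned;
  private long batchesRead;
  private long datasetOpenTimeNs;
  private long scannerCreateTimeNs;
  private long batchLoadTimeNs;

  public void addFragmentsScanned(long n)   { fragmentsScanned += n; }
  public void addBatchesRead(long n)        { batchesRead += n; }
  public void addDatasetOpenTime(long ns)   { datasetOpenTimeNs += ns; }
  public void addScannerCreateTime(long ns) { scannerCreateTimeNs += ns; }
  public void addBatchLoadTime(long ns)     { batchLoadTimeNs += ns; }

  // Spark polls this after every PartitionReader.next() call. The array is a
  // cheap snapshot, and the tracker is confined to one task thread, so no
  // synchronization is needed.
  public CustomTaskMetric[] currentMetricsValues() {
    long scanTimeNs = datasetOpenTimeNs + scannerCreateTimeNs + batchLoadTimeNs; // derived metric
    return new CustomTaskMetric[] {
        taskMetric("fragmentsScanned", fragmentsScanned),
        taskMetric("batchesRead", batchesRead),
        taskMetric("datasetOpenTimeNs", datasetOpenTimeNs),
        taskMetric("scannerCreateTimeNs", scannerCreateTimeNs),
        taskMetric("batchLoadTimeNs", batchLoadTimeNs),
        taskMetric("scanTimeNs", scanTimeNs),
    };
  }

  // Hypothetical helper; names must match the CustomMetric definitions.
  private static CustomTaskMetric taskMetric(String name, long value) {
    return new CustomTaskMetric() {
      @Override public String name() { return name; }
      @Override public long value() { return value; }
    };
  }
}
```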
**Instrumentation points:** Timing is captured at three boundaries in the scan lifecycle (a sketch of the wrapping pattern follows the list):

- `LanceFragmentScanner.create()` wraps `Dataset.open()` and `fragment.newScan()` with `System.nanoTime()` measurements, storing the durations as instance fields.
- `LanceFragmentColumnarBatchScanner.loadNextBatch()` measures each `ArrowReader.loadNextBatch()` call.
- `LanceColumnarPartitionReader` reads these timings from the scanner and feeds them into its `LanceReadMetricsTracker`.
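The wrapping itself is plain `System.nanoTime()` bracketing; here is a compilable stand-in where every name is a placeholder rather than the PR's actual code:

```java
// Placeholder sketch of the timing pattern in LanceFragmentScanner.create().
// openDataset() and newFragmentScan() stand in for the real Lance API calls;
// the two duration fields are what LanceColumnarPartitionReader later reads
// and feeds into LanceReadMetricsTracker.
final class TimedScanSetup {
  Object dataset;
  Object scanner;
  long datasetOpenTimeNs;
  long scannerCreateTimeNs;

  void setUp() {
    long t = System.nanoTime();
    dataset = openDataset();
    datasetOpenTimeNs = System.nanoTime() - t;

    t = System.nanoTime();
    scanner = newFragmentScan(dataset);
    scannerCreateTimeNs = System.nanoTime() - t;
  }

  private Object openDataset() { return new Object(); }                   // placeholder
  private Object newFragmentScan(Object dataset) { return new Object(); } // placeholder
}
```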
The same pattern is applied to `LanceCountStarPartitionReader` (pushed-down `COUNT(*)`) and `LanceRowPartitionReader` (delegates to the columnar reader). All three reader types override `currentMetricsValues()` to report metrics to Spark.
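On the reader side the override is a one-line delegation to the tracker. An illustrative skeleton, with the batch-loading logic stubbed out and the real reader's scanner, schema, and close logic omitted:

```java
import java.io.IOException;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class IllustrativeColumnarPartitionReader implements PartitionReader<ColumnarBatch> {
  private final LanceReadMetricsTracker tracker = new LanceReadMetricsTracker();
  private ColumnarBatch currentBatch;

  @Override
  public boolean next() throws IOException {
    long start = System.nanoTime();
    boolean hasNext = loadNextBatch();  // stands in for ArrowReader.loadNextBatch()
    tracker.addBatchLoadTime(System.nanoTime() - start);
    if (hasNext) {
      tracker.addBatchesRead(1);
    }
    return hasNext;
  }

  @Override
  public ColumnarBatch get() {
    return currentBatch;
  }

  // Spark calls this after each next() to collect the task-level snapshot.
  @Override
  public CustomTaskMetric[] currentMetricsValues() {
    return tracker.currentMetricsValues();
  }

  @Override
  public void close() { }

  private boolean loadNextBatch() { return false; }  // placeholder
}
```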
Test Coverage

- `allMetrics()` returns exactly 6 metrics.
- `LanceCustomMetrics.CustomSumMetric.aggregateTaskMetrics()` correctly sums values (including an empty array).
- `add*()` calls accumulate correctly; the derived `scanTimeNs` equals the sum of the three sub-timings.
- `addFragmentsScanned()` supports multi-fragment increments.
- A `SELECT x, y` query produces non-zero values for all six metrics, and `scanTimeNs == datasetOpenTimeNs + scannerCreateTimeNs + batchLoadTimeNs` holds after Spark aggregation.
- The `LanceCountStarPartitionReader` path (pushed-down `COUNT(*)`) also produces non-zero values for all metrics.
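As one concrete example of the aggregation check, a unit test along these lines (using the hypothetical `FragmentsScannedMetric` from the sketch above; note that `CustomMetric.aggregateTaskMetrics` returns its result as a `String`):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;

class LanceCustomMetricsAggregationTest {
  @Test
  void aggregateTaskMetricsSumsPerTaskValues() {
    LanceCustomMetrics.FragmentsScannedMetric metric =
        new LanceCustomMetrics.FragmentsScannedMetric();

    // CustomSumMetric sums the per-task values and renders them as a string.
    assertEquals("6", metric.aggregateTaskMetrics(new long[] {1, 2, 3}));

    // An empty array (no tasks reported) must aggregate to zero.
    assertEquals("0", metric.aggregateTaskMetrics(new long[] {}));
  }
}
```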