
Support TsfileDataFrame#765

Open
ycycse wants to merge 5 commits into apache:develop from ycycse:tsdf

Conversation

Member

@ycycse ycycse commented Apr 2, 2026

This PR introduces TsFileDataFrame, which can read multiple TsFiles for model-training use, much like working with a dataframe.

https://apache-iotdb-project.feishu.cn/docx/SenJdxlbuoUS5Uxmq7jcOUzdnob?from=from_copylink

Build Wheel

Prerequisite: build the C++ library first, since the Python package depends on the shared library from cpp/target/build.

From the repo root:

mvn -P with-cpp clean install -DskipTests

cd python
python setup.py build_ext --inplace
python setup.py bdist_wheel

pip install dist/*.whl

Validation

  Basic dataset regression tests:

  pytest -q python/tests/test_tsfile_dataset.py

Basic Usage

Open one TsFile

  from tsfile import TsFileDataFrame

  tsdf = TsFileDataFrame("/path/to/file.tsfile")
  print(tsdf)

Open a shard directory

TsFileDataFrame will recursively collect all .tsfile files under the directory.

  from tsfile import TsFileDataFrame

  tsdf = TsFileDataFrame("/path/to/dataset_dir")
  print(len(tsdf))
  print(tsdf.list_timeseries()[:5])
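The recursive `.tsfile` discovery described above can be sketched with `pathlib` (a sketch of plausible behavior, not the PR's actual implementation; `collect_tsfiles` is a hypothetical helper):

```python
from pathlib import Path

def collect_tsfiles(root: str) -> list:
    # Recursively gather every .tsfile under root; sort for a stable order
    return sorted(str(p) for p in Path(root).rglob("*.tsfile"))
```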

Filter by metadata

  filter_df = tsdf[tsdf['field'] == 'weather']
  print(filter_df)

Access one logical series

By index:

  series = tsdf[0]
  print(series)
  print(series.name)
  print(series.stats)
  print(series[0])
  print(series[:10])

By logical series path:

  series = tsdf["weather.device_a.temperature"]
  print(series.timestamps)

Build subset views

  subset = tsdf[:10]
  print(subset)

  subset2 = tsdf[[0, 3, 5]]
  print(subset2.list_timeseries())

Time-aligned multi-series query

  aligned = tsdf.loc[0:1000, [0, 1]]
  print(aligned)
  print(aligned.timestamps)
  print(aligned.values)
  print(aligned.series_names)

Or use logical series names:

  aligned = tsdf.loc[
      0:1000,
      ["weather.device_a.temperature", "weather.device_a.humidity"],
  ]
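One plausible way such time alignment works under the hood (a sketch only, not the actual `.loc` implementation) is to take the union of the per-series timestamp axes and fill gaps with NaN:

```python
import numpy as np

def align_two(ts_a, vals_a, ts_b, vals_b):
    # Union of the two timestamp axes, sorted and deduplicated
    merged = np.union1d(ts_a, ts_b)
    values = np.full((merged.size, 2), np.nan)
    # Each original axis is a sorted subset of merged, so searchsorted
    # yields the exact positions of its timestamps
    values[np.searchsorted(merged, ts_a), 0] = vals_a
    values[np.searchsorted(merged, ts_b), 1] = vals_b
    return merged, values
```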

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a new Python-side “dataframe-like” API for reading TsFile table-model data across multiple TsFile shards, enabling series discovery, per-series access, and time-aligned multi-series queries for model training workflows.

Changes:

  • Added TsFileSeriesReader to discover series (table + tags + field) and read data via Arrow batch queries.
  • Added TsFileDataFrame (plus Timeseries / AlignedTimeseries) to unify multiple TsFiles, merge overlapping series, and support .loc aligned queries.
  • Exported the new classes from python/tsfile/__init__.py.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
python/tsfile/tsfile_series_reader.py New Arrow-based series reader with metadata discovery, timestamp caching, and range/time reads.
python/tsfile/tsfile_dataframe.py New multi-file dataframe-like abstraction with series selection, merged metadata, and .loc alignment.
python/tsfile/init.py Re-export TsFileDataFrame, Timeseries, and AlignedTimeseries from the package.


Comment on lines +402 to +407
if arrow_table.num_rows > 0:
    ts_list.append(arrow_table.column('time').to_numpy())
    for fc in field_columns:
        field_lists[fc].append(
            arrow_table.column(fc).to_numpy().astype(np.float64)
        )

Copilot AI Apr 2, 2026


In _read_arrow, values are always converted via to_numpy().astype(np.float64). This will fail for FIELD columns that are not numeric (e.g., STRING/TEXT/BLOB/DATE) and can also fail when the Arrow array contains nulls (writing code allows NaN values, which become nulls on read). Consider filtering field_columns up-front to numeric TSDataType only, and/or using Arrow casting + null-to-NaN handling before converting to NumPy (and keeping non-numeric columns out of this reader).

Contributor


+1

Member Author


Done. The implementation has since been refactored into python/tsfile/dataset/reader.py. The dataset reader now only includes numeric FIELD columns during discovery, and non-numeric fields are excluded from the dataset path. The current read path also keeps an explicit numeric-compatibility check when converting Arrow arrays.

Comment on lines +280 to +285
if series_path in self._series_data_cache:
    return self._series_data_cache[series_path][start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]


Copilot AI Apr 2, 2026


read_series_range() indexes timestamps[start] and timestamps[end - 1] without validating start/end (including start == end, negative indices, or end > length). This currently raises confusing IndexErrors and makes empty slices impossible. Add explicit range validation and return an empty result for start >= end to match the docstring’s [start, end) semantics.

Suggested change

Before:

    if series_path in self._series_data_cache:
        return self._series_data_cache[series_path][start:end].tolist()
    info = self.series_info[series_path]
    timestamps = self._timestamps_cache[series_path]

After:

    if start < 0 or end < 0:
        raise ValueError("start and end indices must be non-negative")
    # Handle cached series data first, using [start, end) semantics
    if series_path in self._series_data_cache:
        data = self._series_data_cache[series_path]
        length = len(data)
        if start >= length:
            return []
        if end > length:
            end = length
        if start >= end:
            return []
        return data[start:end].tolist()
    info = self.series_info[series_path]
    timestamps = self._timestamps_cache[series_path]
    length = len(timestamps)
    if start >= length:
        return []
    if end > length:
        end = length
    if start >= end:
        return []

Member Author


done

Comment on lines +171 to +183
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
    tuple(all_tags[tc][i] for tc in tag_columns)
    for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
    self._register_tag_group(
        table_name, tag_columns, ut,
        field_columns, all_times[mask]

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-tag grouping path builds tag_tuples in Python and then, for each unique tuple, constructs a full boolean mask with a Python loop ([t == ut for t in tag_tuples]). This is O(n * unique_tags) and will become a bottleneck for large tables. Consider using a vectorized approach (e.g., structured NumPy array + np.unique(..., return_inverse=True) or pandas/groupby) to compute groups and indices in (near) linear time.

Suggested change

Before:

    # Multiple tag columns: use structured approach
    # Convert to list of tuples for grouping
    n = len(all_times)
    tag_tuples = [
        tuple(all_tags[tc][i] for tc in tag_columns)
        for i in range(n)
    ]
    unique_tuples = list(dict.fromkeys(tag_tuples))
    for ut in unique_tuples:
        mask = np.array([t == ut for t in tag_tuples], dtype=bool)
        self._register_tag_group(
            table_name, tag_columns, ut,
            field_columns, all_times[mask]

After:

    # Multiple tag columns: use structured NumPy array for grouping
    n = len(all_times)
    # Build a structured array with one field per tag column
    dtype = [(tc, all_tags[tc].dtype) for tc in tag_columns]
    structured_tags = np.empty(n, dtype=dtype)
    for tc in tag_columns:
        structured_tags[tc] = all_tags[tc]
    # Find unique tag combinations and an inverse index for grouping
    unique_vals, inverse = np.unique(
        structured_tags, return_inverse=True
    )
    # Group rows by unique tag combination using the inverse index
    for group_id, ut in enumerate(unique_vals):
        mask = inverse == group_id
        tag_values = tuple(ut[tc] for tc in tag_columns)
        self._register_tag_group(
            table_name,
            tag_columns,
            tag_values,
            field_columns,
            all_times[mask],

Contributor


For tables with multiple tag columns, grouping is done by:

  1. Building a Python list of tuples (line 174-177): O(n)
  2. For each unique tuple, scanning the entire list to build a boolean mask (line 180): O(n * k) where k = number of unique groups

For a table with 1M rows and 1000 tag groups, this becomes ~1 billion comparisons in pure Python.

Member Author


Done. The implementation has moved to python/tsfile/dataset/reader.py. The pure-Python tuple/mask grouping path was replaced with a vectorized NumPy grouping implementation based on structured arrays and np.unique(..., return_inverse=True).

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.




raise ValueError(f"Series not found: {series_path}")
if series_path not in self._series_data_cache:
    data = self.read_series(series_path)
    self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
Contributor


read_series returns float64 values (via _read_arrow which casts to np.float64), but cache_series_data stores them as float32. When read_series is later called on a cached series, it returns float32 values silently, which can lose precision.
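The precision loss is easy to demonstrate (a generic NumPy illustration, not this PR's code):

```python
import numpy as np

x = np.float64(1.0) / 3.0              # full double precision
roundtrip = np.float64(np.float32(x))  # through a float32 cache and back
# roundtrip now differs from x in the low-order bits
```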

Member Author


Done. The old cached-series path was removed during the refactor, so this precision inconsistency no longer exists.

Comment on lines +171 to +183
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
tuple(all_tags[tc][i] for tc in tag_columns)
for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
mask = np.array([t == ut for t in tag_tuples], dtype=bool)
self._register_tag_group(
table_name, tag_columns, ut,
field_columns, all_times[mask]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For tables with multiple tag columns, grouping is done by:

  1. Building a Python list of tuples (line 174-177): O(n)
  2. For each unique tuple, scanning the entire list to build a boolean mask (line 180): O(n * k) where k = number of unique groups

For a table with 1M rows and 1000 tag groups, this becomes ~1 billion comparisons in pure Python.

Comment on lines +316 to +317
if idx < 0 or idx >= len(self._df._series_list):
    raise IndexError(f"Series index {idx} out of range")
Contributor


Negative indices are not normalized (no idx = length + idx like other __getitem__ methods in this file). A user passing df.loc[:, [-1]] would get an IndexError instead of the last series.
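The usual normalization pattern (a sketch of the idiom, not the PR's code; `normalize_index` is a hypothetical helper):

```python
def normalize_index(idx: int, length: int) -> int:
    # Map Python-style negative indices onto [0, length)
    if idx < 0:
        idx += length
    if idx < 0 or idx >= length:
        raise IndexError(f"Series index {idx} out of range")
    return idx
```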

Member Author


Done. The implementation has moved to python/tsfile/dataset/dataframe.py. Negative indices in .loc series selection are normalized now.

Comment on lines +567 to +568
merged = np.unique(np.concatenate(all_ts))
merged.sort()
Contributor


np.unique already returns a sorted array. The .sort() call is redundant.
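A quick check confirms the documented behavior (generic NumPy, unrelated to this PR's code):

```python
import numpy as np

u = np.unique(np.array([30, 10, 20, 10]))
# u comes back sorted ascending, so a subsequent sort() is a no-op
```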

Member Author


done

Comment on lines +108 to +111
if max_rows is None or n_rows <= max_rows:
    show_rows = list(range(n_rows))
else:
    show_rows = list(range(max_rows))
Contributor


When max_rows is exceeded, only the first max_rows rows are shown. In contrast, TsFileDataFrame._format_table shows head + "..." + tail. This inconsistency may confuse users expecting similar behavior from both display methods.
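A head + "..." + tail selection can be sketched as follows (`truncated_row_indices` is a hypothetical helper illustrating the style described for _format_table, not the actual code):

```python
def truncated_row_indices(n_rows, max_rows):
    # Return the row indices to display; None marks the "..." separator
    if max_rows is None or n_rows <= max_rows:
        return list(range(n_rows))
    head = max_rows // 2
    tail = max_rows - head
    return list(range(head)) + [None] + list(range(n_rows - tail, n_rows))
```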

Member Author


Done. The aligned display now uses the same head + ... + tail truncation style.

Comment on lines +837 to +846
def close(self):
    """Close all underlying readers.

    No-op for subset views (they don't own readers).
    """
    if self._is_view:
        return
    for reader in self._readers.values():
        reader.close()
    self._readers.clear()
Contributor


After close(), _readers is cleared but _series_map still holds references to closed readers. Any subsequent data access (e.g., tsdf[0][0]) will attempt to read from a closed reader, producing an unclear error.

Recommendation: Either invalidate _series_map too, or set a _closed flag and check it in data-access paths.
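The _closed-flag pattern looks like this (a generic sketch, not the PR's actual class):

```python
class ClosableDataFrame:
    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def _check_open(self):
        # Guard every data-access path with a clear error after close()
        if self._closed:
            raise ValueError("I/O operation on a closed TsFileDataFrame")

    def read(self):
        self._check_open()
        return []
```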

Member Author


Done. We have a _closed flag now.

Comment on lines +73 to +90
def add_table(
    self,
    table_name: str,
    tag_columns: Iterable[str],
    tag_types: Iterable[TSDataType],
    field_columns: Iterable[str],
) -> int:
    table_id = len(self.table_entries)
    self.table_entries.append(
        TableEntry(
            table_name=table_name,
            tag_columns=tuple(tag_columns),
            tag_types=tuple(tag_types),
            field_columns=tuple(field_columns),
        )
    )
    self.table_id_by_name[table_name] = table_id
    return table_id
Contributor

@jt2594838 jt2594838 Apr 7, 2026


Is the caller responsible for checking the existence of the table?

Notice that the same table may have different columns in different TsFiles, which should be merged.
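A field-union merge of per-file column sets could look like this (`merge_field_columns` is a hypothetical sketch of the suggested follow-up behavior, not existing code):

```python
def merge_field_columns(existing: tuple, incoming: tuple) -> tuple:
    # Union of the two column sets, keeping first-seen order
    merged = list(existing)
    for col in incoming:
        if col not in merged:
            merged.append(col)
    return tuple(merged)
```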

Member Author

@ycycse ycycse Apr 7, 2026


That's right, I missed that. In the current implementation, the same table across TsFiles is assumed to have the same schema; supporting a field-union merge will take some time. Since we have validation to reject that situation, I think we can note it down and follow up with a separate PR?

Contributor


Ok, I have no further questions then.
