Skip to content

Commit bced86b

Browse files
committed
fix typo and address close problem
1 parent d79bf32 commit bced86b

File tree

4 files changed

+57
-1
lines changed

4 files changed

+57
-1
lines changed

python/tests/test_tsfile_dataset.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,34 @@ def test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
264264
TsFileDataFrame([str(path1), str(path2)], show_progress=False)
265265

266266

267+
def test_dataset_rejects_data_access_after_close(tmp_path):
268+
path = tmp_path / "weather.tsfile"
269+
_write_weather_file(path, 0)
270+
271+
tsdf = TsFileDataFrame(str(path), show_progress=False)
272+
series = tsdf[0]
273+
tsdf.close()
274+
275+
with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
276+
_ = tsdf[0]
277+
278+
with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
279+
_ = series[0]
280+
281+
282+
def test_subset_close_warns_and_does_not_close_root(tmp_path):
283+
path = tmp_path / "weather.tsfile"
284+
_write_weather_file(path, 0)
285+
286+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
287+
subset = tsdf[:1]
288+
with pytest.warns(RuntimeWarning, match="no-op"):
289+
subset.close()
290+
291+
series = tsdf[0]
292+
assert series[0] == 20.0
293+
294+
267295
def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
268296
path1 = tmp_path / "part1.tsfile"
269297
path2 = tmp_path / "part2.tsfile"

python/tsfile/dataset/dataframe.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import os
2424
import sys
2525
from typing import Dict, List, Set, Tuple, Union
26+
import warnings
2627

2728
import numpy as np
2829

@@ -271,6 +272,7 @@ def _parse_key(self, key):
271272

272273
def _query_aligned(self, start_time: int, end_time: int, series_refs: List[SeriesRefKey], series_names: List[str]):
273274
"""Batch aligned reads by reader/device, then merge per-series fragments."""
275+
self._df._assert_open()
274276
groups = defaultdict(list)
275277
for col_idx, series_ref in enumerate(series_refs):
276278
device_idx, field_idx = series_ref
@@ -320,6 +322,7 @@ def __init__(self, paths: Union[str, List[str]], show_progress: bool = True):
320322
self._cache = _DerivedCache()
321323
self._is_view = False
322324
self._root = None
325+
self._closed = False
323326
self._load_metadata()
324327

325328
@classmethod
@@ -341,8 +344,16 @@ def _from_subset(cls, parent: "TsFileDataFrame", series_refs: List[SeriesRefKey]
341344
series_ref_set=set(series_refs),
342345
)
343346
obj._cache = _DerivedCache(devices=parent._cache.devices, field_stats=parent._cache.field_stats)
347+
obj._closed = False
344348
return obj
345349

350+
def _owner(self) -> "TsFileDataFrame":
351+
return self._root if self._is_view else self
352+
353+
def _assert_open(self):
354+
if self._owner()._closed:
355+
raise RuntimeError("Current TsFileDataFrame is closed.")
356+
346357
def _load_metadata(self):
347358
"""Build the logical cross-file index and the derived per-series caches."""
348359
from .reader import TsFileSeriesReader
@@ -474,11 +485,13 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]:
474485
return [name for name in names if name.startswith(prefix) or name == path_prefix]
475486

476487
def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
488+
self._assert_open()
477489
series_name = self._build_series_name(series_ref)
478490
return Timeseries(
479491
series_name,
480492
self._index.series_ref_map[series_ref],
481493
self._cache.field_stats[series_ref],
494+
self._assert_open,
482495
lambda: _merge_field_timestamps(series_name, self._index.series_ref_map[series_ref]),
483496
)
484497

@@ -595,10 +608,18 @@ def show(self, max_rows: int = 20):
595608

596609
def close(self):
597610
if self._is_view:
611+
warnings.warn(
612+
"close() on a subset TsFileDataFrame is a no-op; only the root dataframe owns the readers.",
613+
RuntimeWarning,
614+
stacklevel=2,
615+
)
616+
return
617+
if self._closed:
598618
return
599619
for reader in self._readers.values():
600620
reader.close()
601621
self._readers.clear()
622+
self._closed = True
602623

603624
def __del__(self):
604625
try:

python/tsfile/dataset/metadata.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ def get_field_index(self, field_name: str) -> int:
5151

5252
@dataclass(slots=True)
5353
class DeviceEntry:
54-
"""One logical device identified by table name + ordered tag values."""
54+
"""One logical device identified by table_id + ordered tag values.
55+
56+
The table_id refers to MetadataCatalog.table_entries[table_id].
57+
"""
5558

5659
table_id: int
5760
tag_values: Tuple[Any, ...]

python/tsfile/dataset/timeseries.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,13 @@ def __init__(
6868
name: str,
6969
series_refs: list,
7070
stats: dict,
71+
ensure_open: Callable[[], None],
7172
load_timestamps: Callable[[], np.ndarray],
7273
):
7374
self._name = name
7475
self._series_refs = series_refs
7576
self._stats = dict(stats)
77+
self._ensure_open = ensure_open
7678
self._load_timestamps = load_timestamps
7779
self._timestamps = None
7880

@@ -82,6 +84,7 @@ def name(self) -> str:
8284

8385
@property
8486
def timestamps(self) -> np.ndarray:
87+
self._ensure_open()
8588
if self._timestamps is None:
8689
self._timestamps = self._load_timestamps()
8790
return self._timestamps
@@ -128,6 +131,7 @@ def __getitem__(self, key):
128131
raise TypeError(f"Unsupported key type: {type(key)}")
129132

130133
def _query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
134+
self._ensure_open()
131135
time_parts = []
132136
value_parts = []
133137
for reader, device_id, field_idx in self._series_refs:

0 commit comments

Comments (0)