Skip to content

Commit 88dacfe

Browse files
committed
support metadata management and optmize implementation
1 parent 32d9332 commit 88dacfe

7 files changed

Lines changed: 1200 additions & 341 deletions

File tree

python/tests/test_tsfile_dataset.py

Lines changed: 282 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
# under the License.
1717
#
1818

19+
import numpy as np
1920
import pandas as pd
21+
import pytest
2022

2123
from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, TsFileTableWriter
2224
from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
25+
from tsfile.dataset.formatting import format_timestamp
26+
from tsfile.dataset.reader import TsFileSeriesReader
2327

2428

2529
def _write_weather_file(path, start):
@@ -43,12 +47,128 @@ def _write_weather_file(path, start):
4347
writer.write_dataframe(df)
4448

4549

50+
def _write_numeric_and_text_file(path):
51+
schema = TableSchema(
52+
"weather",
53+
[
54+
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
55+
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
56+
ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
57+
],
58+
)
59+
df = pd.DataFrame(
60+
{
61+
"time": [0, 1, 2],
62+
"device": ["device_a", "device_a", "device_a"],
63+
"temperature": [20.0, np.nan, 23.5],
64+
"status": ["ok", "warn", "ok"],
65+
}
66+
)
67+
with TsFileTableWriter(str(path), schema) as writer:
68+
writer.write_dataframe(df)
69+
70+
71+
def _write_partial_numeric_rows_file(path):
72+
schema = TableSchema(
73+
"weather",
74+
[
75+
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
76+
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
77+
ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
78+
],
79+
)
80+
df = pd.DataFrame(
81+
{
82+
"time": [0, 1],
83+
"device": ["device_a", "device_a"],
84+
"temperature": [np.nan, 21.0],
85+
"humidity": [50.0, 51.0],
86+
}
87+
)
88+
with TsFileTableWriter(str(path), schema) as writer:
89+
writer.write_dataframe(df)
90+
91+
92+
def _write_weather_with_extra_field_file(path, start):
93+
schema = TableSchema(
94+
"weather",
95+
[
96+
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
97+
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
98+
ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
99+
ColumnSchema("pressure", TSDataType.DOUBLE, ColumnCategory.FIELD),
100+
],
101+
)
102+
df = pd.DataFrame(
103+
{
104+
"time": [start, start + 1],
105+
"device": ["device_a", "device_a"],
106+
"temperature": [20.0, 21.0],
107+
"humidity": [50.0, 51.0],
108+
"pressure": [1000.0, 1001.0],
109+
}
110+
)
111+
with TsFileTableWriter(str(path), schema) as writer:
112+
writer.write_dataframe(df)
113+
114+
115+
def _write_multi_tag_file(path):
116+
schema = TableSchema(
117+
"weather",
118+
[
119+
ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
120+
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
121+
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
122+
ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
123+
ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
124+
],
125+
)
126+
df = pd.DataFrame(
127+
{
128+
"time": [0, 1, 0, 1],
129+
"city": ["beijing", "beijing", "shanghai", "shanghai"],
130+
"device": ["device_a", "device_a", "device_b", "device_b"],
131+
"temperature": [20.0, 21.0, 24.0, 25.0],
132+
"humidity": [50.0, 51.0, 60.0, 61.0],
133+
"status": ["ok", "ok", "warn", "warn"],
134+
}
135+
)
136+
with TsFileTableWriter(str(path), schema) as writer:
137+
writer.write_dataframe(df)
138+
139+
140+
def _write_special_tag_file(path):
141+
schema = TableSchema(
142+
"weather",
143+
[
144+
ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
145+
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
146+
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
147+
],
148+
)
149+
df = pd.DataFrame(
150+
{
151+
"time": [0, 1],
152+
"city": ["bei.jing", "bei.jing"],
153+
"device": [r"dev\1", r"dev\1"],
154+
"temperature": [20.0, 21.0],
155+
}
156+
)
157+
with TsFileTableWriter(str(path), schema) as writer:
158+
writer.write_dataframe(df)
159+
160+
46161
def test_dataset_top_level_imports():
47162
assert TsFileDataFrame.__module__ == "tsfile.dataset.dataframe"
48163
assert Timeseries.__module__ == "tsfile.dataset.timeseries"
49164
assert AlignedTimeseries.__module__ == "tsfile.dataset.timeseries"
50165

51166

167+
def test_format_timestamp_preserves_millisecond_precision():
168+
assert "." not in format_timestamp(1000)
169+
assert format_timestamp(1).endswith(".001")
170+
171+
52172
def test_dataset_basic_access_patterns(tmp_path, capsys):
53173
path1 = tmp_path / "part1.tsfile"
54174
path2 = tmp_path / "part2.tsfile"
@@ -64,6 +184,7 @@ def test_dataset_basic_access_patterns(tmp_path, capsys):
64184
assert len(first) == 6
65185
assert first[0] == 20.0
66186
assert first[-1] == 23.0
187+
assert "Timeseries(" in repr(first)
67188

68189
by_name = tsdf[first.name]
69190
assert isinstance(by_name, Timeseries)
@@ -81,9 +202,168 @@ def test_dataset_basic_access_patterns(tmp_path, capsys):
81202
assert isinstance(aligned, AlignedTimeseries)
82203
assert aligned.shape == (6, 2)
83204

84-
metadata = tsdf.metadata()
85-
assert list(metadata["field"]) == ["temperature", "humidity"]
205+
aligned_negative = tsdf.loc[0:5, [-1]]
206+
assert isinstance(aligned_negative, AlignedTimeseries)
207+
assert aligned_negative.shape == (6, 1)
208+
209+
assert list(tsdf["field"]) == ["temperature", "humidity"]
86210

87211
assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf)
88212
aligned.show(2)
89213
assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out
214+
215+
216+
def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
217+
path = tmp_path / "numeric_and_text.tsfile"
218+
_write_numeric_and_text_file(path)
219+
220+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
221+
assert tsdf.list_timeseries() == ["weather.device_a.temperature"]
222+
223+
series = tsdf[0]
224+
assert series.name == "weather.device_a.temperature"
225+
assert np.isnan(series[1])
226+
sliced = series[:3]
227+
assert sliced.shape == (3,)
228+
assert np.isnan(sliced[1])
229+
assert series[1:1].shape == (0,)
230+
231+
232+
def test_dataset_timeseries_supports_negative_step_slices(tmp_path):
233+
path = tmp_path / "weather.tsfile"
234+
_write_weather_file(path, 0)
235+
236+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
237+
series = tsdf[0]
238+
np.testing.assert_array_equal(series[::-1], np.array([23.0, 21.5, 20.0]))
239+
np.testing.assert_array_equal(series[::-2], np.array([23.0, 20.0]))
240+
241+
242+
def test_dataset_metadata_discovery_uses_all_numeric_fields(tmp_path):
243+
path = tmp_path / "partial_numeric_rows.tsfile"
244+
_write_partial_numeric_rows_file(path)
245+
246+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
247+
assert tsdf.list_timeseries() == [
248+
"weather.device_a.temperature",
249+
"weather.device_a.humidity",
250+
]
251+
252+
assert list(tsdf["count"]) == [2, 2]
253+
assert list(tsdf["start_time"]) == [0, 0]
254+
assert list(tsdf["end_time"]) == [1, 1]
255+
256+
257+
def test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
258+
path1 = tmp_path / "part1.tsfile"
259+
path2 = tmp_path / "part2.tsfile"
260+
_write_weather_file(path1, 0)
261+
_write_weather_file(path2, 2)
262+
263+
with pytest.raises(ValueError, match="Duplicate timestamp"):
264+
TsFileDataFrame([str(path1), str(path2)], show_progress=False)
265+
266+
267+
def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
268+
path1 = tmp_path / "part1.tsfile"
269+
path2 = tmp_path / "part2.tsfile"
270+
_write_weather_file(path1, 0)
271+
_write_weather_with_extra_field_file(path2, 2)
272+
273+
with pytest.raises(ValueError, match="Incompatible schema for table 'weather'"):
274+
TsFileDataFrame([str(path1), str(path2)], show_progress=False)
275+
276+
277+
def test_dataset_multi_tag_metadata_discovery(tmp_path):
278+
path = tmp_path / "multi_tag.tsfile"
279+
_write_multi_tag_file(path)
280+
281+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
282+
assert tsdf.list_timeseries() == [
283+
"weather.beijing.device_a.temperature",
284+
"weather.beijing.device_a.humidity",
285+
"weather.shanghai.device_b.temperature",
286+
"weather.shanghai.device_b.humidity",
287+
]
288+
289+
summary = pd.DataFrame(
290+
{
291+
"series_path": tsdf.list_timeseries(),
292+
"table": tsdf["table"],
293+
"city": tsdf["city"],
294+
"device": tsdf["device"],
295+
"field": tsdf["field"],
296+
"start_time": tsdf["start_time"],
297+
"end_time": tsdf["end_time"],
298+
"count": tsdf["count"],
299+
}
300+
).sort_values(["city", "device", "field"]).reset_index(drop=True)
301+
assert list(summary.columns) == [
302+
"series_path",
303+
"table",
304+
"city",
305+
"device",
306+
"field",
307+
"start_time",
308+
"end_time",
309+
"count",
310+
]
311+
assert list(summary["city"]) == ["beijing", "beijing", "shanghai", "shanghai"]
312+
assert list(summary["device"]) == ["device_a", "device_a", "device_b", "device_b"]
313+
assert list(summary["field"]) == ["humidity", "temperature", "humidity", "temperature"]
314+
assert list(summary["count"]) == [2, 2, 2, 2]
315+
316+
317+
def test_dataset_series_paths_escape_special_tag_values(tmp_path):
318+
path = tmp_path / "special_tag.tsfile"
319+
_write_special_tag_file(path)
320+
321+
with TsFileDataFrame(str(path), show_progress=False) as tsdf:
322+
expected_path = r"weather.bei\.jing.dev\\1.temperature"
323+
assert tsdf.list_timeseries() == [expected_path]
324+
325+
series = tsdf[expected_path]
326+
assert isinstance(series, Timeseries)
327+
assert series.name == expected_path
328+
assert list(tsdf["city"]) == ["bei.jing"]
329+
assert list(tsdf["device"]) == [r"dev\1"]
330+
331+
332+
def test_reader_series_paths_escape_special_tag_values(tmp_path):
333+
path = tmp_path / "special_tag.tsfile"
334+
_write_special_tag_file(path)
335+
336+
reader = TsFileSeriesReader(str(path), show_progress=False)
337+
try:
338+
expected_path = r"weather.bei\.jing.dev\\1.temperature"
339+
assert reader.series_paths == [expected_path]
340+
info = reader.get_series_info(expected_path)
341+
assert info["tag_values"] == {"city": "bei.jing", "device": r"dev\1"}
342+
finally:
343+
reader.close()
344+
345+
346+
def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
347+
path = tmp_path / "weather.tsfile"
348+
_write_weather_file(path, 100)
349+
350+
reader = TsFileSeriesReader(str(path), show_progress=False)
351+
try:
352+
assert reader.series_paths == [
353+
"weather.device_a.temperature",
354+
"weather.device_a.humidity",
355+
]
356+
assert len(reader.catalog.table_entries) == 1
357+
assert len(reader.catalog.device_entries) == 1
358+
assert reader.catalog.series_count == 2
359+
360+
by_path = reader.get_series_info("weather.device_a.temperature")
361+
by_ref = reader.get_series_info_by_ref(0, 0)
362+
assert by_ref == by_path
363+
assert by_ref["tag_values"] == {"device": "device_a"}
364+
365+
ts_by_path = reader.get_series_timestamps("weather.device_a.temperature")
366+
ts_by_device = reader.get_device_timestamps(0)
367+
np.testing.assert_array_equal(ts_by_path, ts_by_device)
368+
finally:
369+
reader.close()

0 commit comments

Comments
 (0)