diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 0934981f9..d9e19fb6b 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -1118,8 +1118,13 @@ int duplicate_ideviceid_to_device_fields(storage::IDeviceID* id, for (int i = 0; i < n; i++) { const std::string* ps = (static_cast(i) < segs.size()) ? segs[i] : nullptr; - const char* lit = (ps != nullptr) ? ps->c_str() : "null"; - seg_arr[i] = strdup(lit); + // A null tag segment is exposed as a NULL pointer so callers can + // distinguish a missing/null tag from the literal string "null". + if (ps == nullptr) { + seg_arr[i] = nullptr; + continue; + } + seg_arr[i] = strdup(ps->c_str()); if (seg_arr[i] == nullptr) { for (int j = 0; j < i; j++) { free(seg_arr[j]); @@ -1627,6 +1632,12 @@ TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, case TAG_FILTER_NOT_REGEXP: filter = builder.not_reg_exp(column_name, value); break; + case TAG_FILTER_IS_NULL: + filter = builder.is_null(column_name); + break; + case TAG_FILTER_IS_NOT_NULL: + filter = builder.is_not_null(column_name); + break; default: *err_code = common::E_INVALID_ARG; return nullptr; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index ae3e28eed..4471da89e 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -875,6 +875,8 @@ typedef enum { TAG_FILTER_GTEQ = 5, TAG_FILTER_REGEXP = 6, TAG_FILTER_NOT_REGEXP = 7, + TAG_FILTER_IS_NULL = 8, + TAG_FILTER_IS_NOT_NULL = 9, } TagFilterOp; /** @@ -884,7 +886,8 @@ typedef enum { * index). * @param table_name [in] Table name whose schema defines the TAG columns. * @param column_name [in] Name of the TAG column to filter on. - * @param value [in] Comparison value (string). + * @param value [in] Comparison value (string). Ignored for + * TAG_FILTER_IS_NULL / TAG_FILTER_IS_NOT_NULL (may be NULL). * @param op [in] Comparison operator (TagFilterOp). * @param err_code [out] Error code. E_OK(0) on success. * @return TagFilterHandle on success; NULL on failure. diff --git a/cpp/src/reader/filter/tag_filter.cc b/cpp/src/reader/filter/tag_filter.cc index f92c9ef86..03b8ae785 100644 --- a/cpp/src/reader/filter/tag_filter.cc +++ b/cpp/src/reader/filter/tag_filter.cc @@ -44,7 +44,8 @@ TagEq::TagEq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagEq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] == value_; } @@ -53,7 +54,8 @@ TagNeq::TagNeq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagNeq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] != value_; } @@ -62,7 +64,8 @@ TagLt::TagLt(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagLt::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] < value_; } @@ -71,7 +74,8 @@ TagLteq::TagLteq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagLteq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] <= value_; } @@ -80,7 +84,8 @@ TagGt::TagGt(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagGt::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] > value_; } @@ -89,7 +94,8 @@ TagGteq::TagGteq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagGteq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] >= value_; } @@ -105,7 +111,9 @@ TagRegExp::TagRegExp(int col_idx, std::string tag_value) } bool TagRegExp::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size() || !is_valid_pattern_) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr || + !is_valid_pattern_) + return false; try { return std::regex_search(*segments[col_idx_], pattern_); } catch (const std::regex_error&) { @@ -125,7 +133,9 @@ TagNotRegExp::TagNotRegExp(int col_idx, std::string tag_value) } bool TagNotRegExp::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size() || !is_valid_pattern_) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr || + !is_valid_pattern_) + return false; try { return !std::regex_search(*segments[col_idx_], pattern_); } catch (const std::regex_error&) { @@ -133,6 +143,22 @@ bool TagNotRegExp::satisfyRow(std::vector segments) const { } } +// TagIsNull implementation +TagIsNull::TagIsNull(int col_idx) : TagFilter(col_idx, "") {} + +bool TagIsNull::satisfyRow(std::vector segments) const { + // A tag is null when its segment is an explicit null pointer or when the + // device id omits the (trailing) segment entirely. + return col_idx_ >= segments.size() || segments[col_idx_] == nullptr; +} + +// TagIsNotNull implementation +TagIsNotNull::TagIsNotNull(int col_idx) : TagFilter(col_idx, "") {} + +bool TagIsNotNull::satisfyRow(std::vector segments) const { + return col_idx_ < segments.size() && segments[col_idx_] != nullptr; +} + // TagBetween implementation TagBetween::TagBetween(int col_idx, std::string lower_value, std::string upper_value) @@ -141,7 +167,8 @@ TagBetween::TagBetween(int col_idx, std::string lower_value, } bool TagBetween::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; const std::string& segment_value = *segments[col_idx_]; return segment_value >= value_ && segment_value <= value2_; } @@ -154,7 +181,8 @@ TagNotBetween::TagNotBetween(int col_idx, std::string lower_value, } bool TagNotBetween::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; const std::string& segment_value = *segments[col_idx_]; return segment_value < value_ || segment_value > value2_; } @@ -200,64 +228,76 @@ TagFilterBuilder::TagFilterBuilder(TableSchema* schema) Filter* TagFilterBuilder::eq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagEq(idx, value); } Filter* TagFilterBuilder::neq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNeq(idx, value); } Filter* TagFilterBuilder::lt(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagLt(idx, value); } Filter* TagFilterBuilder::lteq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagLteq(idx, value); } Filter* TagFilterBuilder::gt(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagGt(idx, value); } Filter* TagFilterBuilder::gteq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagGteq(idx, value); } Filter* TagFilterBuilder::reg_exp(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagRegExp(idx, value); } Filter* TagFilterBuilder::not_reg_exp(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNotRegExp(idx, value); } +Filter* TagFilterBuilder::is_null(const std::string& columnName) { + auto idx = get_tag_column_index(columnName); + if (idx < 0) return nullptr; + return new TagIsNull(idx); +} + +Filter* TagFilterBuilder::is_not_null(const std::string& columnName) { + auto idx = get_tag_column_index(columnName); + if (idx < 0) return nullptr; + return new TagIsNotNull(idx); +} + Filter* TagFilterBuilder::between_and(const std::string& columnName, const std::string& lower, const std::string& upper) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagBetween(idx, lower, upper); } @@ -265,7 +305,7 @@ Filter* TagFilterBuilder::between_and(const std::string& columnName, Filter* TagFilterBuilder::not_between_and(const std::string& columnName, const std::string& lower, const std::string& upper) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNotBetween(idx, lower, upper); } @@ -284,7 +324,7 @@ Filter* TagFilterBuilder::not_filter(Filter* filter) { return new TagNot(dynamic_cast(filter)); } -int TagFilterBuilder::get_id_column_index(const std::string& columnName) { +int TagFilterBuilder::get_tag_column_index(const std::string& columnName) { int idColumnOrder = table_schema_->find_id_column_order(columnName); if (idColumnOrder == -1) { return -1; diff --git a/cpp/src/reader/filter/tag_filter.h b/cpp/src/reader/filter/tag_filter.h index b858be8c9..50c750d44 100644 --- a/cpp/src/reader/filter/tag_filter.h +++ b/cpp/src/reader/filter/tag_filter.h @@ -106,6 +106,21 @@ class TagNotRegExp : public TagFilter { bool satisfyRow(std::vector segments) const override; }; +// IS NULL: tag column has no value for this device. An absent trailing +// segment (col_idx_ beyond the device's segment count) is also treated as null. +class TagIsNull : public TagFilter { + public: + explicit TagIsNull(int col_idx); + bool satisfyRow(std::vector segments) const override; +}; + +// IS NOT NULL: tag column has a concrete value for this device. +class TagIsNotNull : public TagFilter { + public: + explicit TagIsNotNull(int col_idx); + bool satisfyRow(std::vector segments) const override; +}; + // Range query [value_, value2_] class TagBetween : public TagFilter { public: @@ -171,6 +186,8 @@ class TagFilterBuilder { Filter* reg_exp(const std::string& columnName, const std::string& value); Filter* not_reg_exp(const std::string& columnName, const std::string& value); + Filter* is_null(const std::string& columnName); + Filter* is_not_null(const std::string& columnName); Filter* between_and(const std::string& columnName, const std::string& lower, const std::string& upper); Filter* not_between_and(const std::string& columnName, @@ -182,7 +199,7 @@ class TagFilterBuilder { static Filter* not_filter(Filter* filter); private: - int get_id_column_index(const std::string& columnName); + int get_tag_column_index(const std::string& columnName); }; } // namespace storage diff --git a/cpp/test/reader/filter/tag_filter_test.cc b/cpp/test/reader/filter/tag_filter_test.cc index 0274d2424..02ce64b85 100644 --- a/cpp/test/reader/filter/tag_filter_test.cc +++ b/cpp/test/reader/filter/tag_filter_test.cc @@ -414,4 +414,87 @@ TEST_F(TagFilterTest, TagRegExpEdgeCases) { delete invalid_filter; delete empty_filter; +} + +// A null tag segment must not crash comparison filters; a null value never +// satisfies a concrete-value predicate (SQL UNKNOWN -> not matched). +TEST_F(TagFilterTest, ComparisonFiltersTreatNullSegmentAsNoMatch) { + // "name" (col 1) is an explicit null pointer. + std::vector segments = {nullptr, nullptr, + new std::string("25"), + new std::string("engineering")}; + + auto eq = builder_->eq("name", "john"); + auto neq = builder_->neq("name", "john"); + auto lt = builder_->lt("name", "z"); + auto gt = builder_->gt("name", "a"); + auto between = builder_->between_and("name", "a", "z"); + auto reg = builder_->reg_exp("name", ".*"); + + EXPECT_FALSE(eq->satisfyRow(0, segments)); + EXPECT_FALSE(neq->satisfyRow(0, segments)); + EXPECT_FALSE(lt->satisfyRow(0, segments)); + EXPECT_FALSE(gt->satisfyRow(0, segments)); + EXPECT_FALSE(between->satisfyRow(0, segments)); + EXPECT_FALSE(reg->satisfyRow(0, segments)); + + delete eq; + delete neq; + delete lt; + delete gt; + delete between; + delete reg; + delete segments[2]; + delete segments[3]; +} + +// IS NULL filter +TEST_F(TagFilterTest, TagIsNullFilter) { + auto filter = builder_->is_null("name"); + ASSERT_NE(filter, nullptr); + + // "name" is an explicit null pointer. + std::vector null_seg = {nullptr, nullptr, + new std::string("25"), + new std::string("engineering")}; + EXPECT_TRUE(filter->satisfyRow(0, null_seg)); + delete null_seg[2]; + delete null_seg[3]; + + // "name" has a concrete value. + auto present = createSegments("john", "25", "engineering"); + EXPECT_FALSE(filter->satisfyRow(0, present)); + cleanupSegments(present); + + // A trailing tag column omitted from the device id (segment count too + // small) is also treated as null. + auto trailing = builder_->is_null("score"); // col_idx 5 + std::vector short_seg = {nullptr, new std::string("john")}; + EXPECT_TRUE(trailing->satisfyRow(0, short_seg)); + delete short_seg[1]; + + delete filter; + delete trailing; +} + +// IS NOT NULL filter +TEST_F(TagFilterTest, TagIsNotNullFilter) { + auto filter = builder_->is_not_null("name"); + ASSERT_NE(filter, nullptr); + + auto present = createSegments("john", "25", "engineering"); + EXPECT_TRUE(filter->satisfyRow(0, present)); + cleanupSegments(present); + + std::vector null_seg = {nullptr, nullptr}; + EXPECT_FALSE(filter->satisfyRow(0, null_seg)); + + // An omitted trailing tag is null, so IS NOT NULL is false. + auto trailing = builder_->is_not_null("score"); + std::vector short_seg = {nullptr, new std::string("john")}; + EXPECT_FALSE(trailing->satisfyRow(0, short_seg)); + delete short_seg[1]; + + delete filter; + delete trailing; } \ No newline at end of file diff --git a/python/tests/test_reader_metadata.py b/python/tests/test_reader_metadata.py index e4e7d0f24..fd00d7f74 100644 --- a/python/tests/test_reader_metadata.py +++ b/python/tests/test_reader_metadata.py @@ -63,7 +63,7 @@ def test_get_all_devices_segments(): assert d0.table_name == "root.sg" assert d0.segments == ("root.sg", "py_details") - grp = reader.get_timeseries_metadata(None)[device] + grp = reader.get_timeseries_metadata(None)[d0.segments] assert grp.table_name == "root.sg" assert grp.segments == ("root.sg", "py_details") assert len(grp.timeseries) == 1 @@ -103,8 +103,8 @@ def test_get_all_devices_and_timeseries_metadata_statistic(): assert devices[0].path == device meta_all = reader.get_timeseries_metadata(None) - assert list(meta_all.keys()) == [device] - grp = meta_all[device] + assert list(meta_all.keys()) == [devices[0].segments] + grp = meta_all[devices[0].segments] assert grp.table_name == "root.sg" assert grp.segments == ("root.sg", "py_meta") series = grp.timeseries @@ -127,11 +127,11 @@ def test_get_all_devices_and_timeseries_metadata_statistic(): assert reader.get_timeseries_metadata([]) == {} sub = reader.get_timeseries_metadata([DeviceID(device, None, ())]) - assert device in sub - assert len(sub[device].timeseries) == 1 + assert devices[0].segments in sub + assert len(sub[devices[0].segments].timeseries) == 1 sub_str = reader.get_timeseries_metadata([device]) - assert device in sub_str + assert devices[0].segments in sub_str finally: reader.close() try: @@ -163,7 +163,7 @@ def test_get_timeseries_metadata_boolean_statistic(): reader = TsFileReader(path) try: meta_all = reader.get_timeseries_metadata(None) - st = meta_all[device].timeseries[0].statistic + st = meta_all[("root.sg", "py_bool")].timeseries[0].statistic assert isinstance(st, BoolTimeseriesStatistic) assert st.has_statistic assert st.sum == pytest.approx(2.0) @@ -200,7 +200,7 @@ def test_get_timeseries_metadata_string_statistic(): reader = TsFileReader(path) try: meta_all = reader.get_timeseries_metadata(None) - m = meta_all[device].timeseries[0] + m = meta_all[("root.sg", "py_str")].timeseries[0] assert m.measurement_name == "m_str" assert m.data_type == TSDataType.STRING st = m.statistic diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index f79a6d466..d95d247c1 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -28,7 +28,7 @@ TableSchema, TsFileTableWriter, ) -from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame +from tsfile import AlignedTimeseries, SeriesPath, Timeseries, TsFileDataFrame from tsfile.dataset.formatting import format_timestamp from tsfile.dataset.metadata import ( MetadataCatalog, @@ -290,6 +290,190 @@ def test_dataset_loc_aligns_timestamp_union_and_preserves_requested_order(tmp_pa assert aligned.values[2, 1] == 30.0 +def test_dataset_reads_nullable_tag_devices_in_isolation(tmp_path): + path = tmp_path / "nullable_tags.tsfile" + schema = TableSchema( + "sensors", + [ + ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + # Non-trailing null: region IS NULL, device='alpha'. + null_region = pd.DataFrame( + { + "time": [0, 1, 2], + "region": [None, None, None], + "device": ["alpha", "alpha", "alpha"], + "temperature": [10.0, 11.0, 12.0], + } + ) + # Trailing null: region='north', device IS NULL. Shares the region prefix + # with the fully specified device below to exercise device isolation. + null_device = pd.DataFrame( + { + "time": [0, 1, 2], + "region": ["north", "north", "north"], + "device": [None, None, None], + "temperature": [20.0, 21.0, 22.0], + } + ) + full = pd.DataFrame( + { + "time": [0, 1, 2], + "region": ["north", "north", "north"], + "device": ["beta", "beta", "beta"], + "temperature": [30.0, 31.0, 32.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(null_region) + writer.write_dataframe(null_device) + writer.write_dataframe(full) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + series = tsdf.list_timeseries() + # Null tags keep their position via the \N marker; trailing nulls drop. + assert set(series) == { + "sensors.\\N.alpha.temperature", + "sensors.north.temperature", + "sensors.north.beta.temperature", + } + # list_timeseries returns SeriesPath objects carrying structured tags. + by_tags = {sp.tags: sp for sp in series} + assert (None, "alpha") in by_tags + assert ("north",) in by_tags + assert ("north", "beta") in by_tags + + ordered = sorted(series) + aligned = tsdf.loc[:, ordered] + by_name = dict(zip(aligned.series_names, aligned.values.T)) + + # Non-trailing null device reads its own data (previously crashed / NaN). + np.testing.assert_array_equal( + by_name["sensors.\\N.alpha.temperature"], np.array([10.0, 11.0, 12.0]) + ) + # Trailing-null device must NOT merge with the fully specified north.beta. + np.testing.assert_array_equal( + by_name["sensors.north.temperature"], np.array([20.0, 21.0, 22.0]) + ) + np.testing.assert_array_equal( + by_name["sensors.north.beta.temperature"], np.array([30.0, 31.0, 32.0]) + ) + + +def test_series_path_object_roundtrip_and_escaping(): + from tsfile.dataset.metadata import split_logical_series_path + + sp = SeriesPath("tbl", ("a.b", None, "x"), "f") + assert isinstance(sp, str) + assert sp.table == "tbl" + assert sp.tags == ("a.b", None, "x") + assert sp.field == "f" + # A dot in a value is escaped; a null tag uses the collision-proof \N marker. + assert str(sp) == "tbl.a\\.b.\\N.x.f" + # Splitting round-trips: the escaped dot stays in the value, \N decodes to None. + assert split_logical_series_path(str(sp)) == ["tbl", "a.b", None, "x", "f"] + # Trailing nulls are dropped (mirroring device-id normalization). + assert SeriesPath("tbl", ("a", None), "f").tags == ("a",) + assert str(SeriesPath("tbl", ("a", None), "f")) == "tbl.a.f" + + +def test_series_path_construction_forms_are_equivalent(): + explicit = SeriesPath("tbl", (None, "sensorA"), "temperature") + flat = SeriesPath(["tbl", None, "sensorA", "temperature"]) # [table, *tags, field] + from_string = SeriesPath("tbl.\\N.sensorA.temperature") + + for sp in (explicit, flat, from_string): + assert sp == "tbl.\\N.sensorA.temperature" + assert sp.table == "tbl" + assert sp.tags == (None, "sensorA") + assert sp.field == "temperature" + + # A no-tag table is just [table, field]. + assert SeriesPath(["tbl", "f"]).tags == () + with pytest.raises(ValueError): + SeriesPath(["tbl"]) + + +def test_split_logical_series_path_null_marker_only_whole_component(): + from tsfile.dataset.metadata import split_logical_series_path + + # \N is a null tag only as a complete component. + assert split_logical_series_path("a.\\N.b.f") == ["a", None, "b", "f"] + assert split_logical_series_path("a.\\N.\\N.f") == ["a", None, None, "f"] + # A real value "\N" (doubled backslash) stays a string, never null. + assert split_logical_series_path("a.\\\\N.b.f") == ["a", "\\N", "b", "f"] + + # \N mixed with other characters is invalid input and fails fast, instead of + # being silently parsed as a null tag (which could resolve the wrong device). + for bad in ( + "tbl.a\\N.b.f", # characters before the marker + "tbl.\\Nfoo.x.f", # characters after the marker + "a.\\N\\N.f", # two markers in one component + "a.\\N\\.b.f", # an escape after the marker + ): + with pytest.raises(ValueError, match="Invalid series path"): + split_logical_series_path(bad) + + +def test_dataset_null_tag_positions_and_string_null_are_distinct(tmp_path): + path = tmp_path / "null_positions.tsfile" + schema = TableSchema( + "a", + [ + ColumnSchema("t1", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("t2", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("t3", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + rows = { + (None, "b", "c"): 10.0, # null at position 1 + ("b", None, "c"): 20.0, # null at position 2 (distinct from the above) + ("null", "b", "c"): 30.0, # the literal string "null", not a real null + } + with TsFileTableWriter(str(path), schema) as writer: + for tags, base in rows.items(): + writer.write_dataframe( + pd.DataFrame( + { + "time": [0, 1], + "t1": [tags[0], tags[0]], + "t2": [tags[1], tags[1]], + "t3": [tags[2], tags[2]], + "v": [base, base + 1], + } + ) + ) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + series = tsdf.list_timeseries() + # Nothing collapses: three physically distinct devices stay distinct. + assert len(series) == 3 + by_tags = {sp.tags: sp for sp in series} + assert (None, "b", "c") in by_tags # null position 1 + assert ("b", None, "c") in by_tags # null position 2 + assert ("null", "b", "c") in by_tags # the string "null" + + # Each device reads its own data via SeriesPath and via the \N string form. + for tags, base in rows.items(): + sp = by_tags[tags] + np.testing.assert_array_equal( + tsdf.loc[:, sp].values.ravel(), np.array([base, base + 1]) + ) + np.testing.assert_array_equal( + tsdf.loc[:, str(sp)].values.ravel(), np.array([base, base + 1]) + ) + + # A hand-built SeriesPath resolves to the same null-tag device. + hand = SeriesPath("a", (None, "b", "c"), "v") + np.testing.assert_array_equal( + tsdf.loc[:, hand].values.ravel(), np.array([10.0, 11.0]) + ) + + def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_path): path = tmp_path / "weather.tsfile" _write_weather_file(path, 0) @@ -814,8 +998,43 @@ def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values(): } series_path = build_series_path(catalog, device_id, 0) - assert series_path == "weather.device_a.temperature" + # The leading null tag is preserved at its position via the \N marker. + assert series_path == "weather.\\N.device_a.temperature" + assert series_path.tags == (None, "device_a") assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0) + # The plain string form (with \N) round-trips to the same device. + assert resolve_series_path(catalog, str(series_path)) == (table_id, device_id, 0) + + +def test_resolve_series_path_rejects_wrong_tag_count(): + catalog = MetadataCatalog() + table_id = catalog.add_table( + "weather", + ("city", "device"), + (TSDataType.STRING, TSDataType.STRING), + ("temperature",), + ) + device_id = catalog.add_device(table_id, ("beijing", "d1"), 0, 1) + catalog.series_stats_by_ref[(device_id, 0)] = { + "length": 1, + "min_time": 0, + "max_time": 0, + "timeline_length": 1, + "timeline_min_time": 0, + "timeline_max_time": 0, + } + + assert resolve_series_path(catalog, "weather.beijing.d1.temperature") == ( + table_id, + device_id, + 0, + ) + # An extra tag must NOT be silently truncated into a match. + with pytest.raises(ValueError, match="Series not found"): + resolve_series_path(catalog, "weather.beijing.d1.extra.temperature") + # Too few tags has no matching device either. + with pytest.raises(ValueError, match="Series not found"): + resolve_series_path(catalog, "weather.beijing.temperature") def test_reader_metadata_tag_values_trim_trailing_none(): @@ -826,24 +1045,60 @@ class _Group: assert TsFileSeriesReader._metadata_tag_values(_Group(), 1) == ("device_a",) -def test_exact_tag_filter_rejects_none_tag_values(): - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - _build_exact_tag_filter({"device": None}) - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - _build_exact_tag_filter({"city": "beijing", "device": None}) +def test_exact_tag_filter_uses_is_null_for_none_tag_values(): + from tsfile.tag_filter import AndTagFilter, ComparisonTagFilter + + only_null = _build_exact_tag_filter({"device": None}) + assert isinstance(only_null, ComparisonTagFilter) + assert only_null.op == ComparisonTagFilter.IS_NULL + assert only_null.column_name == "device" + mixed = _build_exact_tag_filter({"city": "beijing", "device": None}) + assert isinstance(mixed, AndTagFilter) + assert isinstance(mixed.left, ComparisonTagFilter) + assert mixed.left.op == ComparisonTagFilter.EQ + assert mixed.left.value == "beijing" + assert isinstance(mixed.right, ComparisonTagFilter) + assert mixed.right.op == ComparisonTagFilter.IS_NULL + assert mixed.right.column_name == "device" + + +def _tag_filter_has_is_null(tag_filter) -> bool: + from tsfile.tag_filter import ComparisonTagFilter + + if isinstance(tag_filter, ComparisonTagFilter): + return tag_filter.op == ComparisonTagFilter.IS_NULL + for attr in ("left", "right", "filter"): + child = getattr(tag_filter, attr, None) + if child is not None and _tag_filter_has_is_null(child): + return True + return False + + +def test_reader_exact_match_with_none_tag_values_issues_is_null_query(): + captured = {} + + class _EmptyResultSet: + def __enter__(self): + return self + + def __exit__(self, *args): + return False + + def read_arrow_batch(self): + return None + + def next(self): + return False -def test_reader_exact_match_with_none_tag_values_fails_fast(): class _FakeNativeReader: def query_table(self, *args, **kwargs): - raise AssertionError( - "query should not be issued when None-tag exact matching is unsupported" - ) + captured["table"] = kwargs.get("tag_filter") + return _EmptyResultSet() def query_table_by_row(self, *args, **kwargs): - raise AssertionError( - "row query should not be issued when None-tag exact matching is unsupported" - ) + captured["row"] = kwargs.get("tag_filter") + return _EmptyResultSet() reader = object.__new__(TsFileSeriesReader) reader._reader = _FakeNativeReader() @@ -864,10 +1119,13 @@ def query_table_by_row(self, *args, **kwargs): "timeline_max_time": 1, } - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - reader.read_series_by_ref(device_id, 0, 0, 1) - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - reader.read_series_by_row(device_id, 0, 0, 2) + # Both read paths now issue a native query that encodes the null tag as + # IS NULL instead of failing fast. + reader.read_series_by_ref(device_id, 0, 0, 1) + reader.read_series_by_row(device_id, 0, 0, 2) + + assert _tag_filter_has_is_null(captured["table"]) + assert _tag_filter_has_is_null(captured["row"]) def test_dataframe_resolves_named_sparse_tag_series_path(): @@ -882,17 +1140,15 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): device_key = ("weather", (None, "device_a")) tsdf._index.device_order = [device_key] tsdf._index.device_index_by_key = {device_key: 0} - tsdf._index.tables_with_sparse_tag_values = {"weather"} - tsdf._index.sparse_device_indices_by_compressed_path = { - ("weather", ("device_a",)): [0] - } tsdf._index.device_refs = [[]] tsdf._index.series_refs_ordered = [(0, 0)] tsdf._index.series_ref_set = {(0, 0)} tsdf._index.series_ref_map = {(0, 0): []} - assert tsdf.list_timeseries() == ["weather.device_a.temperature"] - assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0) + assert tsdf.list_timeseries() == ["weather.\\N.device_a.temperature"] + # Resolvable by the \N string form and by the returned SeriesPath itself. + assert tsdf._resolve_series_name("weather.\\N.device_a.temperature") == (0, 0) + assert tsdf._resolve_series_name(tsdf.list_timeseries()[0]) == (0, 0) def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): @@ -912,17 +1168,17 @@ def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): ("weather", (None, "device_a")): 0, ("weather", ("beijing", "device_b")): 1, } - tsdf._index.tables_with_sparse_tag_values = {"weather"} - tsdf._index.sparse_device_indices_by_compressed_path = { - ("weather", ("device_a",)): [0], - ("weather", ("beijing", "device_b")): [1], - } tsdf._index.device_refs = [[], []] tsdf._index.series_refs_ordered = [(0, 0), (1, 0)] tsdf._index.series_ref_set = {(0, 0), (1, 0)} tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []} - assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"] + # Prefix matching is position-aware: "weather.\N" selects the null-city + # device, "weather.beijing" selects the fully specified one. + assert tsdf.list_timeseries("weather.\\N") == ["weather.\\N.device_a.temperature"] + assert tsdf.list_timeseries("weather.beijing") == [ + "weather.beijing.device_b.temperature" + ] def test_dataframe_list_timeseries_prefix_can_skip_full_name_build( @@ -943,7 +1199,7 @@ def fail_build_series_name(_series_ref): assert tsdf.list_timeseries("pvf") == [] -def test_series_path_resolution_reports_ambiguous_sparse_path(): +def test_series_path_resolution_distinguishes_null_position(): catalog = MetadataCatalog() table_id = catalog.add_table( "weather", @@ -951,8 +1207,8 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): (TSDataType.STRING, TSDataType.STRING), ("temperature",), ) - first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) - second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) + first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) # device IS NULL + second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) # city IS NULL for device_id in (first_id, second_id): catalog.series_stats_by_ref[(device_id, 0)] = { "length": 1, @@ -963,10 +1219,17 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): "timeline_max_time": 0, } - assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature" - assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature" - with pytest.raises(ValueError, match="Ambiguous series path"): - resolve_series_path(catalog, "weather.beijing.temperature") + # Null position is preserved, so these two devices get distinct paths + # (previously both compressed to "weather.beijing.temperature" -> ambiguous). + first_path = build_series_path(catalog, first_id, 0) + second_path = build_series_path(catalog, second_id, 0) + assert first_path == "weather.beijing.temperature" + assert second_path == "weather.\\N.beijing.temperature" + assert first_path != second_path + + # Each resolves unambiguously back to its own device. + assert resolve_series_path(catalog, first_path) == (table_id, first_id, 0) + assert resolve_series_path(catalog, second_path) == (table_id, second_id, 0) def test_reader_show_progress_reports_start_immediately(tmp_path, capsys): diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index ac8b6b853..a1c37fce1 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -81,6 +81,8 @@ def _preload_dll(path): tag_gteq, tag_regexp, tag_not_regexp, + tag_is_null, + tag_is_not_null, tag_between, tag_not_between, ) @@ -88,4 +90,4 @@ def _preload_dll(path): from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config from .tsfile_table_writer import TsFileTableWriter from .utils import to_dataframe, dataframe_to_tsfile -from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries +from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries, SeriesPath diff --git a/python/tsfile/dataset/__init__.py b/python/tsfile/dataset/__init__.py index 4072bd4c1..15c20d540 100644 --- a/python/tsfile/dataset/__init__.py +++ b/python/tsfile/dataset/__init__.py @@ -19,6 +19,7 @@ """Dataset-style TsFile accessors.""" from .dataframe import TsFileDataFrame +from .metadata import SeriesPath from .timeseries import AlignedTimeseries, Timeseries -__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries"] +__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries", "SeriesPath"] diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 40149102a..77c278289 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -30,7 +30,9 @@ from .formatting import format_dataframe_table from .metadata import ( + SeriesPath, TableEntry, + _normalize_tag_values, build_logical_series_components, build_logical_series_path, split_logical_series_path, @@ -62,15 +64,10 @@ class _LogicalIndex: # Stable logical device order, each item is (table_name, tag_values). device_order: List[DeviceKey] = field(default_factory=list) - # Map one logical device key to its dataframe-local device index. + # Map one logical device key to its dataframe-local device index. The key's + # tag tuple keeps interior nulls (None) and drops trailing ones, so every + # device -- including null-tagged ones -- resolves by a single direct lookup. device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict) - # Tables that need sparse compressed-path lookup because some devices - # contain non-trailing missing tag values. - tables_with_sparse_tag_values: Set[str] = field(default_factory=set) - # Map one compressed tree-style device path to sparse logical devices only. - sparse_device_indices_by_compressed_path: Dict[ - Tuple[str, Tuple[str, ...]], List[int] - ] = field(default_factory=dict) # For each logical device, keep the contributing reader-local device refs. device_refs: List[List[DeviceRef]] = field(default_factory=list) @@ -168,20 +165,6 @@ def _register_reader( index.device_index_by_key[device_key] = device_idx index.device_order.append(device_key) index.device_refs.append([]) - if any(value is None for value in device_entry.tag_values): - index.tables_with_sparse_tag_values.add(table_entry.table_name) - compressed_components = tuple( - build_logical_series_components( - table_entry.table_name, - device_entry.tag_values, - "", - table_entry.tag_columns, - )[1:-1] - ) - compressed_key = (table_entry.table_name, compressed_components) - index.sparse_device_indices_by_compressed_path.setdefault( - compressed_key, [] - ).append(device_idx) index.device_refs[device_idx].append((reader, device_id)) for field_idx in range(len(table_entry.field_columns)): @@ -707,7 +690,7 @@ def _get_series_components( device_key = self._index.device_order[device_idx] return device_key, self._index.table_entries[device_key[0]], field_idx - def _build_series_name(self, series_ref: SeriesRefKey) -> str: + def _build_series_name(self, series_ref: SeriesRefKey) -> SeriesPath: device_key, table_entry, field_idx = self._get_series_components(series_ref) table_name, tag_values = device_key field_name = table_entry.field_columns[field_idx] @@ -715,52 +698,39 @@ def _build_series_name(self, series_ref: SeriesRefKey) -> str: table_name, tag_values, field_name, table_entry.tag_columns ) - def _resolve_series_name(self, series_name: str) -> SeriesRefKey: - try: - parts = split_logical_series_path(series_name) - except ValueError as exc: - raise KeyError(_series_lookup_hint(series_name)) from exc - if len(parts) < 2: - raise KeyError(_series_lookup_hint(series_name)) + def _resolve_series_name(self, series_name) -> SeriesRefKey: + """Resolve a ``SeriesPath`` or path string (``\\N`` = null tag) to a ref. + + Every device has a unique position-preserving key, so this is a single + direct lookup -- no sparse/compressed fallback and no ambiguity. + """ + if isinstance(series_name, SeriesPath): + table_name, tag_parts, field_name = ( + series_name.table, + list(series_name.tags), + series_name.field, + ) + else: + try: + parts = split_logical_series_path(series_name) + except ValueError as exc: + raise KeyError(_series_lookup_hint(series_name)) from exc + if len(parts) < 2: + raise KeyError(_series_lookup_hint(series_name)) + table_name, field_name, tag_parts = parts[0], parts[-1], parts[1:-1] - table_name = parts[0] if table_name not in self._index.table_entries: raise KeyError(_series_lookup_hint(series_name)) - table_entry = self._index.table_entries[table_name] - field_name = parts[-1] try: field_idx = table_entry.get_field_index(field_name) except ValueError as exc: raise KeyError(_series_lookup_hint(series_name)) from exc - tag_parts = parts[1:-1] - direct_device_idx = self._index.device_index_by_key.get( - (table_name, tuple(tag_parts)) - ) - - if table_name not in self._index.tables_with_sparse_tag_values: - if direct_device_idx is None: - raise KeyError(_series_lookup_hint(series_name)) - device_idx = direct_device_idx - else: - compressed_key = (table_name, tuple(tag_parts)) - sparse_device_indices = ( - self._index.sparse_device_indices_by_compressed_path.get( - compressed_key, [] - ) - ) - candidate_indices = [] - if direct_device_idx is not None: - candidate_indices.append(direct_device_idx) - for device_idx in sparse_device_indices: - if device_idx not in candidate_indices: - candidate_indices.append(device_idx) - if not candidate_indices: - raise KeyError(_series_lookup_hint(series_name)) - if len(candidate_indices) > 1: - raise KeyError(f"Ambiguous series path: '{series_name}'.") - device_idx = candidate_indices[0] + device_key = (table_name, _normalize_tag_values(tag_parts)) + device_idx = self._index.device_index_by_key.get(device_key) + if device_idx is None: + raise KeyError(_series_lookup_hint(series_name)) series_ref = (device_idx, field_idx) if series_ref not in self._index.series_ref_set: @@ -784,7 +754,7 @@ def _build_series_info(self, series_ref: SeriesRefKey) -> dict: def __len__(self) -> int: return len(self._index.series_refs_ordered) - def list_timeseries(self, path_prefix: str = "") -> List[str]: + def list_timeseries(self, path_prefix: str = "") -> List[SeriesPath]: if not path_prefix: return [ self._build_series_name(series_ref) diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 125bb00c2..c6d7e5887 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -73,10 +73,6 @@ class MetadataCatalog: device_entries: List[DeviceEntry] = field(default_factory=list) table_id_by_name: Dict[str, int] = field(default_factory=dict) device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict) - tables_with_sparse_tag_values: set = field(default_factory=set) - sparse_device_ids_by_compressed_path: Dict[ - Tuple[int, Tuple[str, ...]], List[int] - ] = field(default_factory=dict) series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field( default_factory=dict ) @@ -122,15 +118,6 @@ def add_device( ) ) self.device_id_by_key[key] = device_id - if _has_sparse_tag_holes(normalized_tag_values): - self.tables_with_sparse_tag_values.add(table_id) - compressed_key = ( - table_id, - _compressed_tag_path_components(normalized_tag_values), - ) - self.sparse_device_ids_by_compressed_path.setdefault( - compressed_key, [] - ).append(device_id) return device_id @property @@ -141,6 +128,75 @@ def series_count(self) -> int: ) +# Path marker for a null tag value: a single backslash followed by N. A real +# tag value can never produce this because escaping always doubles a backslash +# (and never escapes "N"), so \N unambiguously distinguishes a null tag from the +# literal string "null". +_NULL_MARKER = "N" +_NULL_TOKEN = _PATH_ESCAPE + _NULL_MARKER + + +class SeriesPath(str): + """Logical identifier of one time series: table + ordered tag values + field. + + ``SeriesPath`` subclasses ``str``; its string value is the escaped path form + (with ``\\N`` marking a null tag), so it can be used anywhere a path string + is accepted. It additionally exposes the structured ``table`` / ``tags`` / + ``field`` components, where a ``None`` entry in ``tags`` means the tag is + null -- unambiguously distinct from the literal string value ``"null"``. + + Trailing null tags are dropped (mirroring the device-id normalization), so + ``tags`` keeps every interior null but not absent trailing ones. + + Construct it from explicit parts or from a single flat component sequence + (the same shape :func:`split_logical_series_path` returns):: + + SeriesPath("table", (None, "sensorA"), "temperature") + SeriesPath(["table", None, "sensorA", "temperature"]) # [table, *tags, field] + SeriesPath("table.\\N.sensorA.temperature") # a path string + """ + + __slots__ = ("_table", "_tags", "_field") + + def __new__(cls, *args: Any) -> "SeriesPath": + if len(args) == 3: + table, tags, field = args + elif len(args) == 1: + components = args[0] + # A bare string is a path; otherwise it is a [table, *tags, field] + # sequence (None entries are null tags). + if isinstance(components, str): + components = split_logical_series_path(components) + components = list(components) + if len(components) < 2: + raise ValueError( + f"SeriesPath needs at least [table, field]; got {components!r}" + ) + table, tags, field = components[0], components[1:-1], components[-1] + else: + raise TypeError( + "SeriesPath(table, tags, field) or " "SeriesPath([table, *tags, field])" + ) + normalized = _normalize_tag_values(tags) + obj = str.__new__(cls, _join_series_path(table, normalized, field)) + obj._table = table + obj._tags = normalized + obj._field = field + return obj + + @property + def table(self) -> str: + return self._table + + @property + def tags(self) -> Tuple[Any, ...]: + return self._tags + + @property + def field(self) -> str: + return self._field + + def _escape_path_component(value: Any) -> str: return ( str(value) @@ -149,6 +205,11 @@ def _escape_path_component(value: Any) -> str: ) +def _render_path_component(value: Any) -> str: + """Render one tag component: ``None`` -> the null marker, else escaped value.""" + return _NULL_TOKEN if value is None else _escape_path_component(value) + + def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: values = list(tag_values) while values and values[-1] is None: @@ -156,52 +217,66 @@ def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: return tuple(values) -def _compressed_tag_path_components(tag_values: Iterable[Any]) -> Tuple[str, ...]: - return tuple(str(value) for value in tag_values if value is not None) - - -def _has_sparse_tag_holes(tag_values: Iterable[Any]) -> bool: - return any(value is None for value in tag_values) +def split_logical_series_path(series_path: str) -> List[Any]: + """Split a path into components, decoding escapes in a single pass. - -def split_logical_series_path(series_path: str) -> List[str]: - parts = [] - current = [] + ``\\.`` -> ``.``, ``\\\\`` -> ``\\``, and a component that is *exactly* the null + marker ``\\N`` -> ``None``. The marker is only valid as a whole component: a + real value's backslash is always doubled, so a lone ``\\N`` never occurs in a + real value. ``\\N`` combined with any other characters (e.g. ``a\\N`` or + ``\\Nfoo``) is therefore invalid input and raises, rather than being silently + parsed as a null tag (which could otherwise resolve the wrong device). + """ + parts: List[Any] = [] + current: List[str] = [] + is_null = False escaping = False for char in series_path: if escaping: - current.append(char) + if char == _NULL_MARKER: # \N is a null tag only as a whole component + if is_null or current: + raise ValueError(f"Invalid series path: {series_path}") + is_null = True + elif is_null: # nothing may follow the null marker in a component + raise ValueError(f"Invalid series path: {series_path}") + else: # \\ -> \, \. -> ., any other \x -> x + current.append(char) escaping = False - continue - if char == _PATH_ESCAPE: + elif char == _PATH_ESCAPE: escaping = True - continue - if char == _PATH_SEPARATOR: - parts.append("".join(current)) + elif char == _PATH_SEPARATOR: + parts.append(None if is_null else "".join(current)) current = [] - continue - current.append(char) + is_null = False + elif is_null: # nothing may follow the null marker in a component + raise ValueError(f"Invalid series path: {series_path}") + else: + current.append(char) if escaping: raise ValueError(f"Invalid series path: {series_path}") - parts.append("".join(current)) + parts.append(None if is_null else "".join(current)) return parts +def _join_series_path( + table_name: str, tag_values: Iterable[Any], field_name: str +) -> str: + parts = [_escape_path_component(table_name)] + parts.extend(_render_path_component(value) for value in tag_values) + parts.append(_escape_path_component(field_name)) + return _PATH_SEPARATOR.join(parts) + + def build_logical_series_path( table_name: str, tag_values: Iterable[Any], field_name: str, tag_columns: Iterable[str] = (), -) -> str: - components = build_logical_series_components( - table_name, tag_values, field_name, tag_columns - ) - return _PATH_SEPARATOR.join( - _escape_path_component(component) for component in components - ) +) -> SeriesPath: + return SeriesPath(table_name, tag_values, field_name) def build_logical_series_components( @@ -209,9 +284,16 @@ def build_logical_series_components( tag_values: Iterable[Any], field_name: str, _tag_columns: Iterable[str] = (), -) -> List[str]: - components = [table_name, *_compressed_tag_path_components(tag_values), field_name] - return [str(component) for component in components] +) -> List[Any]: + """Position-preserving components for prefix matching; ``None`` marks a null tag.""" + return [ + str(table_name), + *( + None if value is None else str(value) + for value in _normalize_tag_values(tag_values) + ), + str(field_name), + ] def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str: @@ -242,60 +324,60 @@ def iter_series_paths(catalog: MetadataCatalog) -> Iterator[str]: def resolve_series_path( - catalog: MetadataCatalog, series_path: str + catalog: MetadataCatalog, series_path: Any ) -> Tuple[int, int, int]: - """Resolve an external path to ``(table_id, device_id, field_idx)``.""" - parts = split_logical_series_path(series_path) - if len(parts) < 2: - raise ValueError(f"Invalid series path: {series_path}") + """Resolve a path (``str`` with ``\\N``, or ``SeriesPath``) to refs. + + Returns ``(table_id, device_id, field_idx)``. Every device maps to a unique + position-preserving path, so resolution is a single direct lookup. + """ + if isinstance(series_path, SeriesPath): + table_name, tag_parts, field_name = ( + series_path.table, + list(series_path.tags), + series_path.field, + ) + coerce = False + else: + parts = split_logical_series_path(series_path) + if len(parts) < 2: + raise ValueError(f"Invalid series path: {series_path}") + table_name, field_name, tag_parts = parts[0], parts[-1], parts[1:-1] + coerce = True - table_name = parts[0] if table_name not in catalog.table_id_by_name: raise ValueError(f"Series not found: {series_path}") - table_id = catalog.table_id_by_name[table_name] table_entry = catalog.table_entries[table_id] - field_name = parts[-1] try: field_idx = table_entry.get_field_index(field_name) except ValueError as exc: raise ValueError(f"Series not found: {series_path}") from exc - tag_parts = parts[1:-1] - direct_device_id = None - direct_tag_values = _normalize_tag_values( - _coerce_path_component(raw_value, tag_type) - for raw_value, tag_type in zip(tag_parts, table_entry.tag_types) - ) - direct_key = (table_id, direct_tag_values) - if direct_key in catalog.device_id_by_key: - direct_device_id = catalog.device_id_by_key[direct_key] - - if table_id not in catalog.tables_with_sparse_tag_values: - if direct_device_id is None: - raise ValueError(f"Series not found: {series_path}") - return table_id, direct_device_id, field_idx - - compressed_key = (table_id, tuple(tag_parts)) - sparse_device_ids = catalog.sparse_device_ids_by_compressed_path.get( - compressed_key, [] - ) - candidate_ids = [] - seen_ids = set() - if direct_device_id is not None: - candidate_ids.append(direct_device_id) - seen_ids.add(direct_device_id) - for device_id in sparse_device_ids: - if device_id in seen_ids: - continue - candidate_ids.append(device_id) - seen_ids.add(device_id) - if not candidate_ids: - raise ValueError(f"Series not found: {series_path}") - if len(candidate_ids) > 1: - raise ValueError(f"Ambiguous series path: {series_path}") + if coerce: + # Coerce each part by its column's type. Parts beyond the declared tag + # columns are kept as-is (rather than truncated) so an over-specified + # path fails the lookup instead of silently matching a shorter device. + tag_types = table_entry.tag_types + tag_values = _normalize_tag_values( + ( + None + if raw_value is None + else ( + _coerce_path_component(raw_value, tag_types[idx]) + if idx < len(tag_types) + else raw_value + ) + ) + for idx, raw_value in enumerate(tag_parts) + ) + else: + tag_values = _normalize_tag_values(tag_parts) - return table_id, candidate_ids[0], field_idx + device_id = catalog.device_id_by_key.get((table_id, tag_values)) + if device_id is None: + raise ValueError(f"Series not found: {series_path}") + return table_id, device_id, field_idx def _coerce_path_component(value: str, data_type: TSDataType) -> Any: diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 4899b2bf9..831926324 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -25,7 +25,7 @@ import numpy as np from ..constants import ColumnCategory, TSDataType -from ..tag_filter import tag_eq +from ..tag_filter import tag_eq, tag_is_null from ..tsfile_reader import TsFileReaderPy from .metadata import ( MetadataCatalog, @@ -48,23 +48,36 @@ def _to_python_scalar(value): return value.item() if hasattr(value, "item") else value -def _ensure_supported_exact_tag_values(tag_values: Dict[str, object]) -> None: - if any(tag_value is None for tag_value in tag_values.values()): - raise NotImplementedError( - "Exact tag matching with None tag values is not supported yet. " - "Native tag filter support for IS NULL / IS NOT NULL is required." - ) - - def _build_exact_tag_filter(tag_values: Dict[str, object]): - _ensure_supported_exact_tag_values(tag_values) + """Build a conjunctive filter that isolates exactly one device. + + A ``None`` tag value matches the device's null/missing tag via IS NULL so + that devices sharing the same non-null tags (for example a trailing-null + device versus a fully specified one) are not conflated. + """ tag_filter = None for tag_column, tag_value in tag_values.items(): - expr = tag_eq(tag_column, str(tag_value)) + if tag_value is None: + expr = tag_is_null(tag_column) + else: + expr = tag_eq(tag_column, str(tag_value)) tag_filter = expr if tag_filter is None else tag_filter & expr return tag_filter +def _device_exact_tag_values(table_entry, device_entry) -> Dict[str, object]: + """Map every declared tag column to this device's value (None when null/missing). + + ``device_entry.tag_values`` drops trailing null tags, so columns beyond its + length are treated as null rather than omitted from the exact-match filter. + """ + device_tag_values = device_entry.tag_values + return { + column: device_tag_values[idx] if idx < len(device_tag_values) else None + for idx, column in enumerate(table_entry.tag_columns) + } + + class TsFileSeriesReader: """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch reads.""" @@ -362,7 +375,7 @@ def read_series_by_row( table_entry, device_entry, field_name = self._resolve_series_ref( device_id, field_idx ) - tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) + tag_values = _device_exact_tag_values(table_entry, device_entry) tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None # Some native row-query paths stop at an internal block boundary even @@ -416,7 +429,7 @@ def read_device_fields_by_time_range( table_entry.table_name, requested_field_columns, table_entry.tag_columns, - dict(zip(table_entry.tag_columns, device_entry.tag_values)), + _device_exact_tag_values(table_entry, device_entry), start_time, end_time, ) diff --git a/python/tsfile/tag_filter.py b/python/tsfile/tag_filter.py index a40c0c47c..ac6e46c5c 100644 --- a/python/tsfile/tag_filter.py +++ b/python/tsfile/tag_filter.py @@ -42,10 +42,17 @@ class ComparisonTagFilter(TagFilter): GTEQ = 5 REGEXP = 6 NOT_REGEXP = 7 + IS_NULL = 8 + IS_NOT_NULL = 9 + + # Operators that take no comparison value. + _NO_VALUE_OPS = (IS_NULL, IS_NOT_NULL) def __init__(self, column_name: str, value: str, op: int): self.column_name = column_name - self.value = value + # IS NULL / IS NOT NULL carry no value; the native layer ignores it but + # still expects a (possibly empty) string. + self.value = "" if value is None else value self.op = op def __repr__(self): @@ -58,7 +65,11 @@ def __repr__(self): 5: ">=", 6: "=~", 7: "!~", + 8: "IS NULL", + 9: "IS NOT NULL", } + if self.op in self._NO_VALUE_OPS: + return f"TagFilter({self.column_name} {op_names.get(self.op, '?')})" return ( f"TagFilter({self.column_name} {op_names.get(self.op, '?')} {self.value!r})" ) @@ -151,6 +162,16 @@ def tag_not_regexp(column_name: str, pattern: str) -> TagFilter: return ComparisonTagFilter(column_name, pattern, ComparisonTagFilter.NOT_REGEXP) +def tag_is_null(column_name: str) -> TagFilter: + """Create a tag IS NULL filter: the device has no value for this tag column.""" + return ComparisonTagFilter(column_name, "", ComparisonTagFilter.IS_NULL) + + +def tag_is_not_null(column_name: str) -> TagFilter: + """Create a tag IS NOT NULL filter: the device has a value for this tag column.""" + return ComparisonTagFilter(column_name, "", ComparisonTagFilter.IS_NOT_NULL) + + def tag_between(column_name: str, lower: str, upper: str) -> TagFilter: """Create a tag BETWEEN filter: lower <= column <= upper.""" return BetweenTagFilter(column_name, lower, upper, is_not=False) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 0fa570df2..4e90dd483 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -324,6 +324,8 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": TAG_FILTER_GTEQ = 5, TAG_FILTER_REGEXP = 6, TAG_FILTER_NOT_REGEXP = 7, + TAG_FILTER_IS_NULL = 8, + TAG_FILTER_IS_NOT_NULL = 9, TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, const char* table_name, diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 1aea243ed..038336a42 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -1056,18 +1056,11 @@ cdef tuple c_device_segments_to_tuple(char** segs, uint32_t n): cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap): cdef dict out = {} cdef uint32_t di, ti - cdef char* p cdef char* tnp - cdef object key cdef object table_py cdef tuple segs_py cdef list series for di in range(mmap.device_count): - p = mmap.entries[di].device.path - if p == NULL: - key = None - else: - key = p.decode('utf-8') tnp = mmap.entries[di].device.table_name if tnp == NULL: table_py = None @@ -1081,7 +1074,10 @@ cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap series.append( timeseries_metadata_c_to_py( &mmap.entries[di].timeseries[ti])) - out[key] = DeviceTimeseriesMetadataGroupPy( + # Key by the full segments tuple, not the device path string: the path + # renders a null tag as "null", so keying by it would collide a real + # null tag with the literal string "null" and silently drop one device. + out[segs_py] = DeviceTimeseriesMetadataGroupPy( table_py, segs_py, series) return out diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 9193e2c61..341a7493d 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -497,11 +497,15 @@ cdef class TsFileReaderPy: def get_timeseries_metadata( self, device_ids: Optional[List] = None - ) -> Dict[str, DeviceTimeseriesMetadataGroup]: + ) -> Dict[tuple, DeviceTimeseriesMetadataGroup]: """ - Return map device path -> :class:`tsfile.schema.DeviceTimeseriesMetadataGroup` + Return map device-segments-tuple -> :class:`tsfile.schema.DeviceTimeseriesMetadataGroup` (table name, segments, and list of :class:`tsfile.schema.TimeseriesMetadata`). + The key is the device's full segment tuple (a null tag is ``None``), not + the dotted path string, so a real null tag and the literal string + ``"null"`` map to distinct entries instead of colliding. + ``device_ids is None``: all devices. ``device_ids == []``: empty map. Non-empty list restricts to those devices (only existing devices appear). """