Skip to content

Commit 75e0664

Browse files
authored
Perf: optimize Tablet write with columnar string storage and lazy DeviceID construction (~10x throughput) (#748)
1 parent f621373 commit 75e0664

File tree

6 files changed

+529
-65
lines changed

6 files changed

+529
-65
lines changed

cpp/src/common/tablet.cc

Lines changed: 142 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,15 @@ int Tablet::init() {
9898
case BLOB:
9999
case TEXT:
100100
case STRING: {
101-
value_matrix_[c].string_data =
102-
static_cast<common::String*>(common::mem_alloc(
103-
sizeof(String) * max_row_num_, common::MOD_TABLET));
104-
if (value_matrix_[c].string_data == nullptr) return E_OOM;
101+
auto* sc = static_cast<StringColumn*>(common::mem_alloc(
102+
sizeof(StringColumn), common::MOD_TABLET));
103+
if (sc == nullptr) return E_OOM;
104+
new (sc) StringColumn();
105+
// 8 bytes/row is a conservative initial estimate for short
106+
// string columns (e.g. device IDs, tags). The buffer grows
107+
// automatically on demand via mem_realloc.
108+
sc->init(max_row_num_, max_row_num_ * 8);
109+
value_matrix_[c].string_col = sc;
105110
break;
106111
}
107112
default:
@@ -150,7 +155,8 @@ void Tablet::destroy() {
150155
case BLOB:
151156
case TEXT:
152157
case STRING:
153-
common::mem_free(value_matrix_[c].string_data);
158+
value_matrix_[c].string_col->destroy();
159+
common::mem_free(value_matrix_[c].string_col);
154160
break;
155161
default:
156162
break;
@@ -240,17 +246,51 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data,
240246
return E_TYPE_NOT_SUPPORTED;
241247
}
242248

249+
std::memcpy(dst, data, count * elem_size);
243250
if (bitmap == nullptr) {
244-
// All valid: bulk copy + mark all as non-null
245-
std::memcpy(dst, data, count * elem_size);
246251
bitmaps_[schema_index].clear_all();
247252
} else {
248-
// Bulk copy all data (null positions will have garbage but won't be
249-
// read).
250-
std::memcpy(dst, data, count * elem_size);
253+
char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
254+
uint32_t bm_bytes = (count + 7) / 8;
255+
std::memcpy(tsfile_bm, bitmap, bm_bytes);
256+
}
257+
cur_row_size_ = std::max(count, cur_row_size_);
258+
return E_OK;
259+
}
260+
261+
int Tablet::set_column_string_values(uint32_t schema_index,
262+
const int32_t* offsets, const char* data,
263+
const uint8_t* bitmap, uint32_t count) {
264+
if (err_code_ != E_OK) {
265+
return err_code_;
266+
}
267+
if (UNLIKELY(schema_index >= schema_vec_->size())) {
268+
return E_OUT_OF_RANGE;
269+
}
270+
if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
271+
return E_OUT_OF_RANGE;
272+
}
273+
274+
StringColumn* sc = value_matrix_[schema_index].string_col;
275+
if (sc == nullptr) {
276+
return E_INVALID_ARG;
277+
}
278+
279+
uint32_t total_bytes = static_cast<uint32_t>(offsets[count]);
280+
if (total_bytes > sc->buf_capacity) {
281+
sc->buf_capacity = total_bytes;
282+
sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity);
283+
}
251284

252-
// bitmap uses TsFile convention (1=null, 0=valid), same as
253-
// internal BitMap, so copy directly.
285+
if (total_bytes > 0) {
286+
std::memcpy(sc->buffer, data, total_bytes);
287+
}
288+
std::memcpy(sc->offsets, offsets, (count + 1) * sizeof(int32_t));
289+
sc->buf_used = total_bytes;
290+
291+
if (bitmap == nullptr) {
292+
bitmaps_[schema_index].clear_all();
293+
} else {
254294
char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
255295
uint32_t bm_bytes = (count + 7) / 8;
256296
std::memcpy(tsfile_bm, bitmap, bm_bytes);
@@ -292,9 +332,10 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
292332
double* double_values = column_values.double_data;
293333
return &double_values[row_index];
294334
}
335+
case TEXT:
336+
case BLOB:
295337
case STRING: {
296-
auto string_values = column_values.string_data;
297-
return &string_values[row_index];
338+
return &column_values.string_col->get_string_view(row_index);
298339
}
299340
default:
300341
return nullptr;
@@ -304,8 +345,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
304345
template <>
305346
void Tablet::process_val(uint32_t row_index, uint32_t schema_index,
306347
common::String str) {
307-
value_matrix_[schema_index].string_data[row_index].dup_from(str,
308-
page_arena_);
348+
value_matrix_[schema_index].string_col->append(row_index, str.buf_,
349+
str.len_);
309350
bitmaps_[schema_index].clear(row_index); /* mark as non-null */
310351
}
311352

@@ -450,6 +491,91 @@ void Tablet::set_column_categories(
450491
}
451492
}
452493

494+
void Tablet::reset_string_columns() {
495+
size_t schema_count = schema_vec_->size();
496+
for (size_t c = 0; c < schema_count; c++) {
497+
const MeasurementSchema& schema = schema_vec_->at(c);
498+
if (schema.data_type_ == STRING || schema.data_type_ == TEXT ||
499+
schema.data_type_ == BLOB) {
500+
value_matrix_[c].string_col->reset();
501+
}
502+
}
503+
}
504+
505+
// Find all row indices where the device ID changes. A device ID is the
506+
// composite key formed by all id columns (e.g. region + sensor_id). Row i
507+
// is a boundary when at least one id column differs between row i-1 and row i.
508+
//
509+
// Example (2 id columns: region, sensor_id):
510+
// row 0: "A", "s1"
511+
// row 1: "A", "s2" <- boundary: sensor_id changed
512+
// row 2: "B", "s1" <- boundary: region changed
513+
// row 3: "B", "s1"
514+
// row 4: "B", "s2" <- boundary: sensor_id changed
515+
// result: [1, 2, 4]
516+
//
517+
// Boundaries are computed in one shot at flush time rather than maintained
518+
// incrementally during add_value / set_column_*. The total work is similar
519+
// either way, but batch computation here is far more CPU-friendly: the inner
520+
// loop is a tight memcmp scan over contiguous buffers with good cache
521+
// locality, and the CPU can pipeline comparisons without the branch overhead
522+
// and cache thrashing of per-row bookkeeping spread across the write path.
523+
std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
524+
const uint32_t row_count = get_cur_row_size();
525+
if (row_count <= 1) return {};
526+
527+
const uint32_t nwords = (row_count + 63) / 64;
528+
std::vector<uint64_t> boundary(nwords, 0);
529+
530+
uint32_t boundary_count = 0;
531+
const uint32_t max_boundaries = row_count - 1;
532+
for (auto it = id_column_indexes_.rbegin(); it != id_column_indexes_.rend();
533+
++it) {
534+
const StringColumn& sc = *value_matrix_[*it].string_col;
535+
const int32_t* off = sc.offsets;
536+
const char* buf = sc.buffer;
537+
for (uint32_t i = 1; i < row_count; i++) {
538+
if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
539+
int32_t len_a = off[i] - off[i - 1];
540+
int32_t len_b = off[i + 1] - off[i];
541+
if (len_a != len_b ||
542+
(len_a > 0 && memcmp(buf + off[i - 1], buf + off[i],
543+
static_cast<uint32_t>(len_a)) != 0)) {
544+
boundary[i >> 6] |= (1ULL << (i & 63));
545+
if (++boundary_count >= max_boundaries) break;
546+
}
547+
}
548+
if (boundary_count >= max_boundaries) break;
549+
}
550+
551+
// Sweep the bitmap word by word, extracting set bit positions in order.
552+
// Each word covers 64 consecutive rows: word w covers rows [w*64, w*64+63].
553+
//
554+
// For each word we use two standard bit tricks:
555+
// __builtin_ctzll(bits) — count trailing zeros = index of lowest set bit
556+
// bits &= bits - 1 — clear the lowest set bit
557+
//
558+
// Example: w=1, bits=0b...00010100 (bits 2 and 4 set)
559+
// iter 1: ctzll=2 → idx=1*64+2=66, bits becomes 0b...00010000
560+
// iter 2: ctzll=4 → idx=1*64+4=68, bits becomes 0b...00000000 → exit
561+
//
562+
// Guards: idx>0 because row 0 can never be a boundary (no predecessor);
563+
// idx<row_count trims padding bits in the last word when row_count%64 != 0.
564+
std::vector<uint32_t> result;
565+
for (uint32_t w = 0; w < nwords; w++) {
566+
uint64_t bits = boundary[w];
567+
while (bits) {
568+
uint32_t bit = __builtin_ctzll(bits);
569+
uint32_t idx = w * 64 + bit;
570+
if (idx > 0 && idx < row_count) {
571+
result.push_back(idx);
572+
}
573+
bits &= bits - 1;
574+
}
575+
}
576+
return result;
577+
}
578+
453579
std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
454580
std::vector<std::string*> id_array;
455581
id_array.push_back(new std::string(insert_target_name_));

cpp/src/common/tablet.h

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,79 @@ class TabletColIterator;
4646
* with their associated metadata such as column names and types.
4747
*/
4848
class Tablet {
49+
// Arrow-style string column: offsets + contiguous buffer.
50+
// string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
51+
struct StringColumn {
52+
int32_t* offsets; // length: max_rows + 1 (Arrow-compatible)
53+
char* buffer; // contiguous string data
54+
uint32_t buf_capacity; // allocated buffer size
55+
uint32_t buf_used; // bytes written so far
56+
57+
StringColumn()
58+
: offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {}
59+
60+
void init(uint32_t max_rows, uint32_t init_buf_capacity) {
61+
offsets = (int32_t*)common::mem_alloc(
62+
sizeof(int32_t) * (max_rows + 1), common::MOD_DEFAULT);
63+
offsets[0] = 0;
64+
buf_capacity = init_buf_capacity;
65+
buffer =
66+
(char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT);
67+
buf_used = 0;
68+
}
69+
70+
void destroy() {
71+
if (offsets) common::mem_free(offsets);
72+
offsets = nullptr;
73+
if (buffer) common::mem_free(buffer);
74+
buffer = nullptr;
75+
buf_capacity = buf_used = 0;
76+
}
77+
78+
void reset() {
79+
buf_used = 0;
80+
if (offsets) offsets[0] = 0;
81+
}
82+
83+
void append(uint32_t row, const char* data, uint32_t len) {
84+
// Grow buffer if needed
85+
if (buf_used + len > buf_capacity) {
86+
buf_capacity = buf_capacity * 2 + len;
87+
buffer = (char*)common::mem_realloc(buffer, buf_capacity);
88+
}
89+
memcpy(buffer + buf_used, data, len);
90+
offsets[row] = static_cast<int32_t>(buf_used);
91+
offsets[row + 1] = static_cast<int32_t>(buf_used + len);
92+
buf_used += len;
93+
}
94+
95+
const char* get_str(uint32_t row) const {
96+
return buffer + offsets[row];
97+
}
98+
uint32_t get_len(uint32_t row) const {
99+
return static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
100+
}
101+
// Return a String view for a given row. The returned reference is
102+
// valid until the next call to get_string_view on this column.
103+
common::String& get_string_view(uint32_t row) {
104+
view_cache_.buf_ = buffer + offsets[row];
105+
view_cache_.len_ =
106+
static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
107+
return view_cache_;
108+
}
109+
110+
private:
111+
common::String view_cache_;
112+
};
113+
49114
// Storage handle for one column of the tablet. All members share the same
// bytes (union), so only one pointer is meaningful for a given column;
// presumably the column's data type selects which one -- confirm in
// Tablet::init.
struct ValueMatrixEntry {
50115
union {
51116
int32_t* int32_data;
52117
int64_t* int64_data;
53118
float* float_data;
54119
double* double_data;
55120
bool* bool_data;
56-
common::String* string_data;
121+
StringColumn* string_col;  // set for BLOB / TEXT / STRING columns
57122
};
58123
};
59124

@@ -220,6 +285,16 @@ class Tablet {
220285
void set_column_categories(
221286
const std::vector<common::ColumnCategory>& column_categories);
222287
std::shared_ptr<IDeviceID> get_device_id(int i) const;
288+
std::vector<uint32_t> find_all_device_boundaries() const;
289+
290+
// Bulk copy string column data (offsets + data buffer).
291+
// offsets has count+1 entries and must start from 0 (offsets[0] == 0).
292+
// bitmap follows TsFile convention (bit=1 means null, nullptr means all
293+
// valid). Callers using Arrow convention (bit=1 means valid) must invert
294+
// before calling.
295+
int set_column_string_values(uint32_t schema_index, const int32_t* offsets,
296+
const char* data, const uint8_t* bitmap,
297+
uint32_t count);
223298
/**
224299
* @brief Template function to add a value of type T to the specified row
225300
* and column by name.
@@ -253,6 +328,8 @@ class Tablet {
253328
schema_map_ = schema_map;
254329
}
255330

331+
void reset_string_columns();
332+
256333
friend class TabletColIterator;
257334
friend class TsFileWriter;
258335
friend struct MeasurementNamesFromTablet;
@@ -265,7 +342,6 @@ class Tablet {
265342
private:
266343
template <typename T>
267344
void process_val(uint32_t row_index, uint32_t schema_index, T val);
268-
common::PageArena page_arena_{common::MOD_TABLET};
269345
uint32_t max_row_num_;
270346
uint32_t cur_row_size_;
271347
std::string insert_target_name_;

0 commit comments

Comments
 (0)