Skip to content

Commit bd63348

Browse files
committed
Rework logic introduced in f733c56 so generated column filtering happens at the table cache level instead of per-row operation.
1 parent f733c56 commit bd63348

7 files changed

Lines changed: 131 additions & 152 deletions

File tree

dml_events.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e
174174

175175
query := "INSERT IGNORE INTO " +
176176
QuotedTableNameFromString(schemaName, tableName) +
177-
" (" + strings.Join(quotedColumnNamesForInsert(e.table), ",") + ")" +
178-
" VALUES (" + buildStringListForInsertValues(e.table, e.newValues) + ")"
177+
" (" + strings.Join(quotedColumnNames(e.table), ",") + ")" +
178+
" VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")"
179179

180180
return query, nil
181181
}
@@ -278,13 +278,30 @@ func (e *BinlogDeleteEvent) PaginationKey() (string, error) {
278278
return paginationKeyFromEventData(e.table, e.oldValues)
279279
}
280280

281+
func filterGeneratedColumnsOnRowsEvent(ts *TableSchema, rev *replication.RowsEvent) {
282+
for _, idx := range ts.GeneratedColumnIndecesOnRawTable {
283+
for _, row := range rev.Rows {
284+
j := 0
285+
for i := range row {
286+
if i == idx {
287+
continue
288+
}
289+
row[j] = row[i]
290+
j++
291+
}
292+
row = row[:j]
293+
}
294+
}
295+
}
296+
281297
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) {
282298
rowsEvent := ev.Event.(*replication.RowsEvent)
299+
filterGeneratedColumnsOnRowsEvent(table, rowsEvent)
283300

284301
for _, row := range rowsEvent.Rows {
285302
if len(row) != len(table.Columns) {
286303
return nil, fmt.Errorf(
287-
"table %s.%s has %d columns but event has %d columns instead",
304+
"table %s.%s has %d updatable columns but event has %d columns instead",
288305
table.Schema,
289306
table.Name,
290307
len(table.Columns),
@@ -323,14 +340,10 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
323340
}
324341
}
325342

326-
func quotedColumnNamesForInsert(table *TableSchema) []string {
327-
cols := []string{}
328-
329-
for _, c := range table.Columns {
330-
if c.IsVirtual {
331-
continue
332-
}
333-
cols = append(cols, QuoteField(c.Name))
343+
func quotedColumnNames(table *TableSchema) []string {
344+
cols := make([]string, len(table.Columns))
345+
for i, column := range table.Columns {
346+
cols[i] = QuoteField(column.Name)
334347
}
335348

336349
return cols
@@ -351,18 +364,15 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData
351364
return nil
352365
}
353366

354-
func buildStringListForInsertValues(table *TableSchema, values []interface{}) string {
367+
func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string {
355368
var buffer []byte
356369

357370
for i, value := range values {
358-
if table.Columns[i].IsVirtual {
359-
continue
360-
}
361-
362-
if len(buffer) != 0 {
371+
if i > 0 {
363372
buffer = append(buffer, ',')
364373
}
365-
buffer = appendEscapedValue(buffer, value, table.Columns[i])
374+
375+
buffer = appendEscapedValue(buffer, value, columns[i])
366376
}
367377

368378
return string(buffer)

row_batch.go

Lines changed: 6 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -64,77 +64,23 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface
6464
return "", nil, err
6565
}
6666

67-
vcm := e.virtualColumnsMap()
68-
valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)"
67+
valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
6968
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr
7069

7170
query := "INSERT IGNORE INTO " +
7271
QuotedTableNameFromString(schemaName, tableName) +
73-
" (" + e.quotedFields(vcm) + ") VALUES " + valuesStr
72+
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
7473

75-
return query, e.flattenRowData(vcm), nil
74+
return query, e.flattenRowData(), nil
7675
}
7776

78-
// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated).
79-
func (e *RowBatch) virtualColumnsMap() map[int]bool {
80-
res := map[int]bool{}
81-
82-
for i, name := range e.columns {
83-
isVirtual := false
84-
for _, c := range e.table.Columns {
85-
if name == c.Name && c.IsVirtual {
86-
isVirtual = true
87-
break
88-
}
89-
}
90-
91-
res[i] = isVirtual
92-
}
93-
94-
return res
95-
}
96-
97-
// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch.
98-
func (e *RowBatch) activeColumnCount(vcm map[int]bool) int {
99-
if vcm == nil {
100-
return len(e.columns)
101-
}
102-
103-
count := 0
104-
for _, isVirtual := range vcm {
105-
if !isVirtual {
106-
count++
107-
}
108-
}
109-
return count
110-
}
111-
112-
// quotedFields returns a string with comma-separated quoted field names for INSERTs.
113-
func (e *RowBatch) quotedFields(vcm map[int]bool) string {
114-
cols := []string{}
115-
for i, name := range e.columns {
116-
if vcm != nil && vcm[i] {
117-
continue
118-
}
119-
cols = append(cols, name)
120-
}
121-
122-
return strings.Join(QuoteFields(cols), ",")
123-
}
124-
125-
// flattenRowData flattens RowData values into a single array for INSERTs.
126-
func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} {
127-
rowSize := e.activeColumnCount(vcm)
77+
func (e *RowBatch) flattenRowData() []interface{} {
78+
rowSize := len(e.values[0])
12879
flattened := make([]interface{}, rowSize*len(e.values))
12980

13081
for rowIdx, row := range e.values {
131-
i := 0
13282
for colIdx, col := range row {
133-
if vcm != nil && vcm[colIdx] {
134-
continue
135-
}
136-
flattened[rowIdx*rowSize+i] = col
137-
i++
83+
flattened[rowIdx*rowSize+colIdx] = col
13884
}
13985
}
14086

table_schema_cache.go

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,21 @@ type TableSchema struct {
4242
ForcedIndexForVerification string // Forced index name
4343
PaginationKeyColumn *schema.TableColumn
4444
PaginationKeyIndex int
45+
GeneratedColumnIndecesOnRawTable []int
4546

4647
rowMd5Query string
4748
}
4849

50+
// isColumnGenerated evaluates wheter a go_myslq.schema.TableColumn is generated or not.
51+
func isColumnGenerated(tc *schema.TableColumn) bool {
52+
return tc.IsVirtual || tc.IsStored
53+
}
54+
55+
// isColumnIndexGenerated evaluetes whether a TableSchema column is generated, by index.
56+
func (t *TableSchema) IsColumnIndexGenerated(idx int) bool {
57+
return isColumnGenerated(&t.Columns[idx])
58+
}
59+
4960
// This query returns the MD5 hash for a row on this table. This query is valid
5061
// for both the source and the target shard.
5162
//
@@ -90,11 +101,12 @@ func (t *TableSchema) RowMd5Query() string {
90101
}
91102

92103
columns := make([]schema.TableColumn, 0, len(t.Columns))
93-
for _, column := range t.Columns {
104+
for i, column := range t.Columns {
94105
_, isCompressed := t.CompressedColumnsForVerification[column.Name]
95106
_, isIgnored := t.IgnoredColumnsForVerification[column.Name]
107+
isGenerated := t.IsColumnIndexGenerated(i)
96108

97-
if isCompressed || isIgnored || column.IsVirtual {
109+
if isCompressed || isIgnored || isGenerated {
98110
continue
99111
}
100112

@@ -151,6 +163,53 @@ func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry)
151163
return tablesWithData, emptyTables, nil
152164
}
153165

166+
// removeInvisibleIndeces removes all invisible idx references from a go_mysql.schema.Table.
167+
func removeInvisibleIndeces(ts *schema.Table) {
168+
j := 0
169+
for i, index := range ts.Indexes {
170+
if !index.Visible {
171+
continue
172+
}
173+
ts.Indexes[j] = ts.Indexes[i]
174+
j++
175+
}
176+
ts.Indexes = ts.Indexes[:j]
177+
}
178+
179+
// removeGeneratedColumns removes all generated (virtual, stored) columns from a go_mysql.schema.Table.
180+
// Returns a slice removed column indeces.
181+
func removeGeneratedColumns(ts *schema.Table) []int {
182+
removedIdxs := []int{}
183+
184+
newColumns := make([]schema.TableColumn, 0, len(ts.Columns))
185+
newPKColumns := make([]int, 0, len(ts.Columns))
186+
newUnsignedColumns := make([]int, 0, len(ts.Columns))
187+
188+
j := 0
189+
for i, col := range ts.Columns {
190+
if isColumnGenerated(&col) {
191+
removedIdxs = append(removedIdxs, i)
192+
continue
193+
}
194+
195+
// regenerate PK and unsigned column indeces
196+
newColumns = append(newColumns, col)
197+
if ts.IsPrimaryKey(i) {
198+
newPKColumns = append(newPKColumns, j)
199+
}
200+
if col.IsUnsigned {
201+
newUnsignedColumns = append(newUnsignedColumns, j)
202+
}
203+
j++
204+
}
205+
206+
ts.Columns = newColumns
207+
ts.PKColumns = newPKColumns
208+
ts.UnsignedColumns = newUnsignedColumns
209+
210+
return removedIdxs
211+
}
212+
154213
func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error) {
155214
logger := logrus.WithField("tag", "table_schema_cache")
156215

@@ -189,20 +248,16 @@ func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig Col
189248
return tableSchemaCache, err
190249
}
191250

192-
// Filter out invisible indexes
193-
visibleIndexes := make([]*schema.Index, 0, len(tableSchema.Indexes))
194-
for _, index := range tableSchema.Indexes {
195-
if index.Visible {
196-
visibleIndexes = append(visibleIndexes, index)
197-
}
198-
}
199-
tableSchema.Indexes = visibleIndexes
251+
// filter out unwanted indeces and columns
252+
removeInvisibleIndeces(tableSchema)
253+
generatedColumnIdxs := removeGeneratedColumns(tableSchema)
200254

201255
tableSchemas = append(tableSchemas, &TableSchema{
202256
Table: tableSchema,
203257
CompressedColumnsForVerification: columnCompressionConfig.CompressedColumnsFor(dbname, table),
204258
IgnoredColumnsForVerification: columnIgnoreConfig.IgnoredColumnsFor(dbname, table),
205259
ForcedIndexForVerification: forceIndexConfig.IndexFor(dbname, table),
260+
GeneratedColumnIndecesOnRawTable: generatedColumnIdxs,
206261
})
207262
}
208263

@@ -448,7 +503,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
448503
if err != nil {
449504
break
450505
}
451-
506+
452507
var binValue []byte
453508
switch v := val.(type) {
454509
case []byte:
@@ -458,7 +513,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
458513
default:
459514
err = fmt.Errorf("expected binary/string for max key, got %T", val)
460515
}
461-
516+
462517
if err == nil {
463518
result = NewBinaryKeyWithColumn(primaryKeyColumn.Name, binValue)
464519
}

test/go/dml_events_test.go

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -83,36 +83,6 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
8383
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3)
8484
}
8585

86-
func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() {
87-
rowsEvent := &replication.RowsEvent{
88-
Table: this.tableMapEvent,
89-
Rows: [][]interface{}{
90-
{1000, []byte("val1"), true},
91-
{1001, []byte("val2"), false},
92-
{1002, "{\"val\": 42.0}", false},
93-
},
94-
}
95-
96-
// column 'col1' (#0) is generated so we should not insert into it.
97-
this.targetTable.Columns[0].IsVirtual = true
98-
99-
dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent)
100-
this.Require().Nil(err)
101-
this.Require().Equal(3, len(dmlEvents))
102-
103-
q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
104-
this.Require().Nil(err)
105-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1)
106-
107-
q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
108-
this.Require().Nil(err)
109-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2)
110-
111-
q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
112-
this.Require().Nil(err)
113-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3)
114-
}
115-
11686
func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() {
11787
rowsEvent := &replication.RowsEvent{
11888
Table: this.tableMapEvent,

test/go/row_batch_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,32 +73,6 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() {
7373
this.Require().Equal(expected, v1)
7474
}
7575

76-
func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() {
77-
vals := []ghostferry.RowData{
78-
ghostferry.RowData{1000, []byte("val1"), true},
79-
ghostferry.RowData{1001, []byte("val2"), true},
80-
ghostferry.RowData{1002, []byte("val3"), true},
81-
}
82-
83-
// column 'col2' (#1) is generated so we should not insert into it.
84-
this.targetTable.Columns[1].IsVirtual = true
85-
86-
batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0)
87-
this.Require().Equal(vals, batch.Values())
88-
89-
q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name)
90-
this.Require().Nil(err)
91-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1)
92-
93-
expected := []interface{}{
94-
1000, true,
95-
1001, true,
96-
1002, true,
97-
}
98-
99-
this.Require().Equal(expected, v1)
100-
}
101-
10276
func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() {
10377
vals := []ghostferry.RowData{
10478
ghostferry.RowData{1000, []byte("val0"), true},

0 commit comments

Comments
 (0)