Skip to content

Commit f8d6242

Browse files
committed
Rework logic introduced in f733c56 so generated column filtering happens
at the table cache level instead of per-row operation.
1 parent f733c56 commit f8d6242

8 files changed

Lines changed: 131 additions & 184 deletions

File tree

dml_events.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ type RowData []interface{}
4242
// https://github.com/Shopify/ghostferry/issues/165.
4343
//
4444
// In summary:
45-
// - This code receives values from both go-sql-driver/mysql and
46-
// go-mysql-org/go-mysql.
47-
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
48-
// slice for unsigned integer.
49-
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
50-
// unsigned integer.
51-
// - We currently make this function deal with both cases. In the future we can
52-
// investigate alternative solutions.
45+
// - This code receives values from both go-sql-driver/mysql and
46+
// go-mysql-org/go-mysql.
47+
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
48+
// slice for unsigned integer.
49+
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
50+
// unsigned integer.
51+
// - We currently make this function deal with both cases. In the future we can
52+
// investigate alternative solutions.
5353
func (r RowData) GetUint64(colIdx int) (uint64, error) {
5454
u64, ok := Uint64Value(r[colIdx])
5555
if ok {
@@ -174,8 +174,8 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e
174174

175175
query := "INSERT IGNORE INTO " +
176176
QuotedTableNameFromString(schemaName, tableName) +
177-
" (" + strings.Join(quotedColumnNamesForInsert(e.table), ",") + ")" +
178-
" VALUES (" + buildStringListForInsertValues(e.table, e.newValues) + ")"
177+
" (" + strings.Join(quotedColumnNames(e.table), ",") + ")" +
178+
" VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")"
179179

180180
return query, nil
181181
}
@@ -323,14 +323,10 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
323323
}
324324
}
325325

326-
func quotedColumnNamesForInsert(table *TableSchema) []string {
327-
cols := []string{}
328-
329-
for _, c := range table.Columns {
330-
if c.IsVirtual {
331-
continue
332-
}
333-
cols = append(cols, QuoteField(c.Name))
326+
func quotedColumnNames(table *TableSchema) []string {
327+
cols := make([]string, len(table.Columns))
328+
for i, column := range table.Columns {
329+
cols[i] = QuoteField(column.Name)
334330
}
335331

336332
return cols
@@ -351,18 +347,15 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData
351347
return nil
352348
}
353349

354-
func buildStringListForInsertValues(table *TableSchema, values []interface{}) string {
350+
func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string {
355351
var buffer []byte
356352

357353
for i, value := range values {
358-
if table.Columns[i].IsVirtual {
359-
continue
360-
}
361-
362-
if len(buffer) != 0 {
354+
if i > 0 {
363355
buffer = append(buffer, ',')
364356
}
365-
buffer = appendEscapedValue(buffer, value, table.Columns[i])
357+
358+
buffer = appendEscapedValue(buffer, value, columns[i])
366359
}
367360

368361
return string(buffer)
@@ -511,10 +504,10 @@ func Int64Value(value interface{}) (int64, bool) {
511504
//
512505
// This is specifically mentioned in the the below link:
513506
//
514-
// When BINARY values are stored, they are right-padded with the pad value
515-
// to the specified length. The pad value is 0x00 (the zero byte). Values
516-
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
517-
// for retrievals.
507+
// When BINARY values are stored, they are right-padded with the pad value
508+
// to the specified length. The pad value is 0x00 (the zero byte). Values
509+
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
510+
// for retrievals.
518511
//
519512
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
520513
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {

row_batch.go

Lines changed: 6 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -64,77 +64,23 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface
6464
return "", nil, err
6565
}
6666

67-
vcm := e.virtualColumnsMap()
68-
valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)"
67+
valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
6968
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr
7069

7170
query := "INSERT IGNORE INTO " +
7271
QuotedTableNameFromString(schemaName, tableName) +
73-
" (" + e.quotedFields(vcm) + ") VALUES " + valuesStr
72+
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
7473

75-
return query, e.flattenRowData(vcm), nil
74+
return query, e.flattenRowData(), nil
7675
}
7776

78-
// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated).
79-
func (e *RowBatch) virtualColumnsMap() map[int]bool {
80-
res := map[int]bool{}
81-
82-
for i, name := range e.columns {
83-
isVirtual := false
84-
for _, c := range e.table.Columns {
85-
if name == c.Name && c.IsVirtual {
86-
isVirtual = true
87-
break
88-
}
89-
}
90-
91-
res[i] = isVirtual
92-
}
93-
94-
return res
95-
}
96-
97-
// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch.
98-
func (e *RowBatch) activeColumnCount(vcm map[int]bool) int {
99-
if vcm == nil {
100-
return len(e.columns)
101-
}
102-
103-
count := 0
104-
for _, isVirtual := range vcm {
105-
if !isVirtual {
106-
count++
107-
}
108-
}
109-
return count
110-
}
111-
112-
// quotedFields returns a string with comma-separated quoted field names for INSERTs.
113-
func (e *RowBatch) quotedFields(vcm map[int]bool) string {
114-
cols := []string{}
115-
for i, name := range e.columns {
116-
if vcm != nil && vcm[i] {
117-
continue
118-
}
119-
cols = append(cols, name)
120-
}
121-
122-
return strings.Join(QuoteFields(cols), ",")
123-
}
124-
125-
// flattenRowData flattens RowData values into a single array for INSERTs.
126-
func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} {
127-
rowSize := e.activeColumnCount(vcm)
77+
func (e *RowBatch) flattenRowData() []interface{} {
78+
rowSize := len(e.values[0])
12879
flattened := make([]interface{}, rowSize*len(e.values))
12980

13081
for rowIdx, row := range e.values {
131-
i := 0
13282
for colIdx, col := range row {
133-
if vcm != nil && vcm[colIdx] {
134-
continue
135-
}
136-
flattened[rowIdx*rowSize+i] = col
137-
i++
83+
flattened[rowIdx*rowSize+colIdx] = col
13884
}
13985
}
14086

table_schema_cache.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ func (t *TableSchema) RowMd5Query() string {
9494
_, isCompressed := t.CompressedColumnsForVerification[column.Name]
9595
_, isIgnored := t.IgnoredColumnsForVerification[column.Name]
9696

97-
if isCompressed || isIgnored || column.IsVirtual {
97+
fmt.Printf(">>>>> col %q compressed:%v, ignored:%v, virtual:%v, stored:%v, generated:%v\n", column.Name, isCompressed, isIgnored, column.IsVirtual, column.IsStored, column.IsGenerated())
98+
if isCompressed || isIgnored || column.IsGenerated() {
9899
continue
99100
}
100101

@@ -189,14 +190,9 @@ func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig Col
189190
return tableSchemaCache, err
190191
}
191192

192-
// Filter out invisible indexes
193-
visibleIndexes := make([]*schema.Index, 0, len(tableSchema.Indexes))
194-
for _, index := range tableSchema.Indexes {
195-
if index.Visible {
196-
visibleIndexes = append(visibleIndexes, index)
197-
}
198-
}
199-
tableSchema.Indexes = visibleIndexes
193+
// filter out unwanted indeces and columns
194+
tableSchema.RemoveInvisibleIndeces()
195+
tableSchema.RemoveGeneratedColumns()
200196

201197
tableSchemas = append(tableSchemas, &TableSchema{
202198
Table: tableSchema,
@@ -448,7 +444,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
448444
if err != nil {
449445
break
450446
}
451-
447+
452448
var binValue []byte
453449
switch v := val.(type) {
454450
case []byte:
@@ -458,7 +454,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
458454
default:
459455
err = fmt.Errorf("expected binary/string for max key, got %T", val)
460456
}
461-
457+
462458
if err == nil {
463459
result = NewBinaryKeyWithColumn(primaryKeyColumn.Name, binValue)
464460
}

test/go/dml_events_test.go

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -83,36 +83,6 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
8383
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3)
8484
}
8585

86-
func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() {
87-
rowsEvent := &replication.RowsEvent{
88-
Table: this.tableMapEvent,
89-
Rows: [][]interface{}{
90-
{1000, []byte("val1"), true},
91-
{1001, []byte("val2"), false},
92-
{1002, "{\"val\": 42.0}", false},
93-
},
94-
}
95-
96-
// column 'col1' (#0) is generated so we should not insert into it.
97-
this.targetTable.Columns[0].IsVirtual = true
98-
99-
dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent)
100-
this.Require().Nil(err)
101-
this.Require().Equal(3, len(dmlEvents))
102-
103-
q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
104-
this.Require().Nil(err)
105-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1)
106-
107-
q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
108-
this.Require().Nil(err)
109-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2)
110-
111-
q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
112-
this.Require().Nil(err)
113-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3)
114-
}
115-
11686
func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() {
11787
rowsEvent := &replication.RowsEvent{
11888
Table: this.tableMapEvent,

test/go/row_batch_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,32 +73,6 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() {
7373
this.Require().Equal(expected, v1)
7474
}
7575

76-
func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() {
77-
vals := []ghostferry.RowData{
78-
ghostferry.RowData{1000, []byte("val1"), true},
79-
ghostferry.RowData{1001, []byte("val2"), true},
80-
ghostferry.RowData{1002, []byte("val3"), true},
81-
}
82-
83-
// column 'col2' (#1) is generated so we should not insert into it.
84-
this.targetTable.Columns[1].IsVirtual = true
85-
86-
batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0)
87-
this.Require().Equal(vals, batch.Values())
88-
89-
q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name)
90-
this.Require().Nil(err)
91-
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1)
92-
93-
expected := []interface{}{
94-
1000, true,
95-
1001, true,
96-
1002, true,
97-
}
98-
99-
this.Require().Equal(expected, v1)
100-
}
101-
10276
func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() {
10377
vals := []ghostferry.RowData{
10478
ghostferry.RowData{1000, []byte("val0"), true},

test/go/table_schema_cache_test.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (this *TableSchemaCacheTestSuite) TestFingerprintQuery() {
324324
this.Require().Equal("SELECT `id`,MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5,`data` FROM `s`.`t` WHERE `id` IN (?,?,?,?,?,?,?,?,?,?)", query)
325325
}
326326

327-
func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() {
327+
func (this *TableSchemaCacheTestSuite) TestTableRowMd7Query() {
328328
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil)
329329
this.Require().Nil(err)
330330

@@ -333,17 +333,32 @@ func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() {
333333
query := table.RowMd5Query()
334334
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query)
335335

336-
table = tables[0]
337-
table.Columns[0].IsVirtual = true
338-
query = table.RowMd5Query()
339-
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query)
340-
341336
table = tables[1]
342337
table.CompressedColumnsForVerification = map[string]string{"data": "SNAPPY"}
343338
query = table.RowMd5Query()
344339
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query)
345340
}
346341

342+
func (this *TableSchemaCacheTestSuite) TestTableRowMd7QueryWithVirtualField() {
343+
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil)
344+
this.Require().Nil(err)
345+
346+
tables := tableSchemaCache.AsSlice()
347+
table := tables[0]
348+
table.Columns[0].IsVirtual = true
349+
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query())
350+
}
351+
352+
func (this *TableSchemaCacheTestSuite) TestTableRowMd7QueryWithStoredField() {
353+
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil)
354+
this.Require().Nil(err)
355+
356+
tables := tableSchemaCache.AsSlice()
357+
table := tables[0]
358+
table.Columns[1].IsStored = true
359+
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query())
360+
}
361+
347362
func (this *TableSchemaCacheTestSuite) TestFingerprintQueryWithIgnoredColumns() {
348363
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil)
349364
this.Require().Nil(err)

test/helpers/db_helper.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_
125125

126126
connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}")
127127
connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")
128+
connection.query(```
129+
CREATE TABLE IF NOT EXISTS #{dbtable} (
130+
id BIGINT(20) NOT NULL AUTO_INCREMENT,
131+
data TEXT,
132+
length BIGINT(20) AS (LENGTH(data)) VIRTUAL, /* generated columns should be ignored */
133+
summary VARCHAR(32) AS (MD5(data)) STORED,
134+
PRIMARY KEY(id)
135+
)
136+
```)
128137

129138
return if number_of_rows == 0
130139

0 commit comments

Comments
 (0)