Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ jobs:
env:
CI: "true"
MYSQL_VERSION: ${{ matrix.mysql }}
# Emit full goroutine stacks (all goroutines, not just the crashing one)
# when the test binary panics or times out. This is the primary
# diagnostic for stuck / deadlocked tests.
GOTRACEBACK: all

steps:
- uses: actions/checkout@v6.0.2
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ test-go:
curl -sL $(GOTESTSUM_URL) | tar -xz -C ./bin gotestsum; \
fi

ulimit -n 1024 && ./bin/gotestsum --format short-verbose ./test/go ./copydb/test ./sharding/test -count 1 -p 1 -failfast
go test -v
ulimit -n 1024 && ./bin/gotestsum --format short-verbose ./test/go ./copydb/test ./sharding/test -count 1 -p 1 -failfast -timeout 5m
go test -v -timeout 5m

test-ruby:
bundle install
Expand Down
95 changes: 94 additions & 1 deletion slog_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ghostferry

import (
"context"
"fmt"
"log/slog"
"reflect"
)

// loggerSlogHandler implements slog.Handler on top of a ghostferry Logger.
Expand Down Expand Up @@ -99,7 +101,98 @@ func applySlogAttrToLogger(l Logger, a slog.Attr, prefix string) Logger {
if a.Key == "" {
return l
}
return l.WithField(joinSlogPrefix(prefix, a.Key), a.Value.Any())
return l.WithField(joinSlogPrefix(prefix, a.Key), safeFieldValue(a.Value.Any()))
}

// safeFieldValue returns a value safe to hand to a structured-logging backend.
//
// Some third-party libraries log values that cannot be JSON-encoded. For
// example, go-mysql's BinlogSyncer logs its entire config via
// slog.Any("config", cfg), and that config contains func fields
// (Option func(*client.Conn) error, Dialer). logrus's JSON formatter calls
// json.Marshal on each field and fails on such values, printing
// "Failed to obtain reader, failed to marshal fields to JSON, json:
// unsupported type: func(*client.Conn) error" to stderr for every binlog
// syncer created — which floods the test logs.
//
// To stay backend-agnostic, any value that contains a kind encoding/json
// rejects by type (func, chan, unsafe.Pointer, complex64 or complex128) is
// rendered to a string with fmt's %+v verb instead of being passed through as
// a live Go value. Ordinary values — including nil — are returned unchanged so
// normal structured fields are unaffected.
func safeFieldValue(v any) any {
	if v == nil {
		return nil
	}
	if containsUnmarshalableKind(reflect.ValueOf(v), 0) {
		return fmt.Sprintf("%+v", v)
	}
	return v
}

// maxSafeFieldDepth bounds how deep containsUnmarshalableKind descends into a
// value before giving up and reporting it as unmarshalable.
const maxSafeFieldDepth = 8

// containsUnmarshalableKind reports whether v contains a func, chan,
// unsafe.Pointer or complex number anywhere in its (possibly nested)
// structure.
//
// depth is the current nesting level; callers start at 0. Recursion is bounded
// by maxSafeFieldDepth, which also terminates on cyclic pointer graphs:
// anything nested deeper than the limit is treated as unmarshalable so the
// value is stringified rather than risk a marshal failure. An invalid (zero)
// reflect.Value is reported as safe.
func containsUnmarshalableKind(v reflect.Value, depth int) bool {
	if !v.IsValid() {
		return false
	}
	if depth > maxSafeFieldDepth {
		return true
	}

	switch v.Kind() {
	case reflect.Func, reflect.Chan, reflect.UnsafePointer, reflect.Complex64, reflect.Complex128:
		// Rejected by kind alone, even when the value itself is nil/zero.
		return true
	case reflect.Ptr, reflect.Interface:
		if v.IsNil() {
			return false
		}
		return containsUnmarshalableKind(v.Elem(), depth+1)
	case reflect.Struct:
		// Every field is inspected, exported or not; only kinds are read, so
		// this never needs Interface() on an unexported field.
		for i := 0; i < v.NumField(); i++ {
			if containsUnmarshalableKind(v.Field(i), depth+1) {
				return true
			}
		}
		return false
	case reflect.Slice, reflect.Array:
		elem := v.Type().Elem()
		if isUnmarshalableType(elem) {
			return true
		}
		if isScalarFieldKind(elem.Kind()) {
			// Scalars cannot hide an unmarshalable kind; skip the per-element
			// walk, which matters for large []byte payloads.
			return false
		}
		for i := 0; i < v.Len(); i++ {
			if containsUnmarshalableKind(v.Index(i), depth+1) {
				return true
			}
		}
		return false
	case reflect.Map:
		elem := v.Type().Elem()
		if isUnmarshalableType(elem) || isUnmarshalableType(v.Type().Key()) {
			return true
		}
		if isScalarFieldKind(elem.Kind()) {
			return false
		}
		// Only map values are walked; keys are checked by type above.
		for it := v.MapRange(); it.Next(); {
			if containsUnmarshalableKind(it.Value(), depth+1) {
				return true
			}
		}
		return false
	default:
		return false
	}
}

// isUnmarshalableType reports whether a type is itself a kind that cannot be
// JSON-encoded. Used to short-circuit empty containers whose element (or key)
// type alone makes them unsafe.
func isUnmarshalableType(t reflect.Type) bool {
	switch t.Kind() {
	case reflect.Func, reflect.Chan, reflect.UnsafePointer, reflect.Complex64, reflect.Complex128:
		return true
	default:
		return false
	}
}

// isScalarFieldKind reports whether k is a leaf kind (bool, integer, float or
// string) that holds no nested values and is not one of the kinds
// isUnmarshalableType rejects.
func isScalarFieldKind(k reflect.Kind) bool {
	switch k {
	case reflect.Bool,
		reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr,
		reflect.Float32, reflect.Float64,
		reflect.String:
		return true
	default:
		return false
	}
}

// joinSlogPrefix concatenates prefix and key with a dot, eliding the dot when
Expand Down
45 changes: 45 additions & 0 deletions test/go/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,51 @@ func (s *LoggerTestSuite) TestNewSlogLoggerOutputIsValidJSON() {
}
}

// configWithFunc mimics the shape of go-mysql's BinlogSyncerConfig, which is
// logged via slog.Any("config", cfg) and contains func fields that cannot be
// JSON-encoded. See safeFieldValue in slog_handler.go.
//
// It mixes plain data fields with one func field so a test can check both
// that the func no longer breaks JSON output and that the plain data is still
// present in the logged value.
type configWithFunc struct {
// ServerID is an ordinary numeric field.
ServerID uint32
// Host is an ordinary string field; the test below looks for its value in
// the logged output.
Host string
// Option is the func-typed field that encoding/json cannot encode.
Option func(conn *struct{}) error
}

// TestNewSlogLoggerHandlesUnmarshalableFields guards against the
// "Failed to obtain reader, failed to marshal fields to JSON, json:
// unsupported type: func(...)" noise that appeared whenever a value holding
// func fields (such as go-mysql's BinlogSyncer config) was logged through the
// slog bridge with the JSON formatter enabled.
//
// For every backend it logs a configWithFunc via slog.Any and checks that the
// output carries no marshal error, parses as JSON, keeps the message, and
// still exposes the non-func data of the config.
func (s *LoggerTestSuite) TestNewSlogLoggerHandlesUnmarshalableFields() {
	for _, backend := range backends {
		s.Run(string(backend), func() {
			var sink bytes.Buffer
			useBackend(backend, &sink)
			ghostferry.SetLogJSONFormatter()

			tagged := ghostferry.LogWithField("tag", "slog_unmarshalable")
			logger := ghostferry.NewSlogLogger(tagged)
			logger.Info("create BinlogSyncer", slog.Any("config", configWithFunc{
				ServerID: 42,
				Host:     "localhost",
				Option:   func(_ *struct{}) error { return nil },
			}))

			line := sink.String()
			req := s.Require()

			// Neither fragment of the marshal-failure message may appear.
			req.NotContains(line, "failed to marshal fields to JSON", "log line: %s", line)
			req.NotContains(line, "unsupported type", "log line: %s", line)

			var fields map[string]any
			req.NoError(json.Unmarshal(sink.Bytes(), &fields), "output should be valid JSON: %s", line)
			req.Equal("create BinlogSyncer", fields["msg"])

			// Whatever form the config field takes, it must still carry the
			// plain (non-func) data.
			rendered := fmt.Sprintf("%v", fields["config"])
			req.Contains(rendered, "localhost")
		})
	}
}

func TestLoggerTestSuite(t *testing.T) {
suite.Run(t, new(LoggerTestSuite))
}
73 changes: 73 additions & 0 deletions test/helpers/blocking_gate_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
require "thread"

module BlockingGateHelper
  # A one-shot, time-bounded gate for integration tests.
  #
  # Tests sometimes need to hold Ghostferry at a given point by blocking
  # inside a status handler until another event lets it continue. The old
  # approach was a bare, argument-less `sleep` in the handler, with a later
  # handler (e.g. AFTER_BINLOG_APPLY) expected to TERM the process. If that
  # later event never came (a legitimate ordering race) the handler slept
  # forever: the Go side hit its 30s HTTP client timeout and panicked, while
  # the Ruby WEBrick handler thread stayed asleep, blocked server shutdown and
  # hung the suite until the CI job-level timeout.
  #
  # A BlockingGate instead offers a #wait that
  #   - returns as soon as #release has been called, and
  #   - raises a descriptive error once `timeout` seconds have passed, so a
  #     wrong ordering assumption fails fast instead of wedging CI.
  class BlockingGate
    class Error < StandardError; end
    class TimeoutError < Error; end

    # label   - name included in the timeout error message.
    # timeout - maximum number of seconds #wait may block (default 20).
    def initialize(label:, timeout: 20)
      @label = label
      @timeout = timeout
      @released = false
      @mutex = Mutex.new
      @cond = ConditionVariable.new
    end

    # Blocks the caller until #release is called or `timeout` seconds have
    # elapsed since this call started. Raises BlockingGate::TimeoutError when
    # the deadline passes first.
    def wait
      expires_at = monotonic_now + @timeout
      @mutex.synchronize do
        loop do
          break if @released

          left = expires_at - monotonic_now
          raise TimeoutError, "BlockingGate(#{@label}) timed out after #{@timeout}s waiting to be released" if left <= 0

          # The flag is re-checked on every wake-up, so early or spurious
          # wake-ups simply loop with the remaining time.
          @cond.wait(@mutex, left)
        end
      end
    end

    # Opens the gate: wakes every thread blocked in #wait and makes later
    # #wait calls return immediately. Safe to call more than once.
    def release
      @mutex.synchronize do
        @released = true
        @cond.broadcast
      end
    end

    # Whether #release has been called.
    def released?
      @mutex.synchronize do
        @released
      end
    end

    private

    # Current reading of the monotonic clock, in seconds.
    def monotonic_now
      ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
    end
  end

  # Factory shortcut so tests can write `gate = blocking_gate(...)`.
  def blocking_gate(label:, timeout: 20)
    BlockingGate.new(label: label, timeout: timeout)
  end
end
Loading
Loading