diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 93ca4e3ee525..d8f7dd6026de 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -20,7 +20,6 @@ package stats import ( "bytes" "container/heap" - "context" "encoding/gob" "encoding/json" "hash/crc32" @@ -48,8 +47,15 @@ func init() { register.Function1x2(fixedKey) register.Function2x1(makeWeightedElement) + register.Combiner3[*compactors, weightedElement, *compactors](&approximateQuantilesInputFn{}) + register.Combiner3[*compactors, *compactors, *compactors](&approximateQuantilesMergeOnlyFn{}) + register.Combiner3[*compactors, *compactors, []beam.T](&approximateQuantilesOutputFn{}) + register.DoFn1x2[beam.T, int, beam.T](&shardElementsFn{}) + register.Function0x1[error](setupResultShim) } +func setupResultShim() error { return nil } + // Opts contains settings used to configure how approximate quantiles are computed. type Opts struct { // Controls the memory used and approximation error (difference between the quantile returned and the true quantile.) @@ -493,7 +499,7 @@ func (f *approximateQuantilesOutputFn) AddInput(compactors *compactors, element return compactors } -func (f *approximateQuantilesOutputFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors { +func (f *approximateQuantilesOutputFn) MergeAccumulators(a, b *compactors) *compactors { a.merge(b, f.State.less) return a } @@ -559,7 +565,7 @@ func toWeightedSlice(compactor compactor, less reflectx.Func2x1, weight int) []w } return weightedElements } -func (f *approximateQuantilesOutputFn) ExtractOutput(ctx context.Context, compactors *compactors) []beam.T { +func (f *approximateQuantilesOutputFn) ExtractOutput(compactors *compactors) []beam.T { sorted := toWeightedSlice(compactors.Compactors[0], f.State.less, 1) for level, compactor := range compactors.Compactors[1:] { sorted = mergeSortedWeighted(sorted, toWeightedSlice(compactor, f.State.less, 1<