Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions sdks/go/pkg/beam/transforms/stats/quantiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package stats
import (
"bytes"
"container/heap"
"context"
"encoding/gob"
"encoding/json"
"hash/crc32"
Expand Down Expand Up @@ -48,8 +47,15 @@ func init() {

register.Function1x2(fixedKey)
register.Function2x1(makeWeightedElement)
register.Combiner3[*compactors, weightedElement, *compactors](&approximateQuantilesInputFn{})
register.Combiner3[*compactors, *compactors, *compactors](&approximateQuantilesMergeOnlyFn{})
register.Combiner3[*compactors, *compactors, []beam.T](&approximateQuantilesOutputFn{})
register.DoFn1x2[beam.T, int, beam.T](&shardElementsFn{})
register.Function0x1[error](setupResultShim)
}

func setupResultShim() error { return nil }

// Opts contains settings used to configure how approximate quantiles are computed.
type Opts struct {
// Controls the memory used and approximation error (difference between the quantile returned and the true quantile.)
Expand Down Expand Up @@ -493,7 +499,7 @@ func (f *approximateQuantilesOutputFn) AddInput(compactors *compactors, element
return compactors
}

func (f *approximateQuantilesOutputFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
func (f *approximateQuantilesOutputFn) MergeAccumulators(a, b *compactors) *compactors {
a.merge(b, f.State.less)
return a
}
Expand Down Expand Up @@ -559,7 +565,7 @@ func toWeightedSlice(compactor compactor, less reflectx.Func2x1, weight int) []w
}
return weightedElements
}
func (f *approximateQuantilesOutputFn) ExtractOutput(ctx context.Context, compactors *compactors) []beam.T {
func (f *approximateQuantilesOutputFn) ExtractOutput(compactors *compactors) []beam.T {
sorted := toWeightedSlice(compactors.Compactors[0], f.State.less, 1)
for level, compactor := range compactors.Compactors[1:] {
sorted = mergeSortedWeighted(sorted, toWeightedSlice(compactor, f.State.less, 1<<uint(level)), func(a, b any) bool {
Expand Down Expand Up @@ -605,12 +611,12 @@ func (f *approximateQuantilesInputFn) AddInput(compactors *compactors, element w
return compactors
}

func (f *approximateQuantilesInputFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
func (f *approximateQuantilesInputFn) MergeAccumulators(a, b *compactors) *compactors {
a.merge(b, f.State.less)
return a
}

func (f *approximateQuantilesInputFn) ExtractOutput(ctx context.Context, compactors *compactors) *compactors {
func (f *approximateQuantilesInputFn) ExtractOutput(compactors *compactors) *compactors {
for i := range compactors.Compactors {
// Sort the compactors here so when we're merging them for the final output, they're already sorted and we can merge elements in order.
compactors.Compactors[i].sort(f.State.less)
Expand All @@ -634,12 +640,12 @@ func (f *approximateQuantilesMergeOnlyFn) AddInput(compactors *compactors, eleme
return compactors
}

func (f *approximateQuantilesMergeOnlyFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
func (f *approximateQuantilesMergeOnlyFn) MergeAccumulators(a, b *compactors) *compactors {
a.merge(b, f.State.less)
return a
}

func (f *approximateQuantilesMergeOnlyFn) ExtractOutput(ctx context.Context, compactors *compactors) *compactors {
func (f *approximateQuantilesMergeOnlyFn) ExtractOutput(compactors *compactors) *compactors {
for i := range compactors.Compactors {
// Sort the compactors here so when we're merging them for the final output, they're already sorted and we can merge elements in order.
compactors.Compactors[i].sort(f.State.less)
Expand Down
Loading