caio.co/de/go-tdigest

Get rid of the centroid abstraction

This was only being used to pack {float64,uint32}; all the
other functionality was skipped or became unused over
time for performance reasons. Away it goes.
Id
1f970eee21252fb9ac9d33e0f9e17afa88814329
Author
Caio
Commit time
2017-10-25T17:05:51+02:00

Modified serialization.go

@@ -35,9 +35,9
}

var x float64
- t.summary.Iterate(func(item centroid) bool {
- delta := item.mean - x
- x = item.mean
+ t.summary.ForEach(func(mean float64, count uint32) bool {
+ delta := mean - x
+ x = mean
err = binary.Write(buffer, endianess, float32(delta))

return err == nil
@@ -46,8 +46,8
return nil, err
}

- t.summary.Iterate(func(item centroid) bool {
- err = encodeUint(buffer, item.count)
+ t.summary.ForEach(func(mean float64, count uint32) bool {
+ err = encodeUint(buffer, count)
return err == nil
})
if err != nil {

Modified summary.go

@@ -6,18 +6,6
"sort"
)

-type centroid struct {
- mean float64
- count uint32
- index int
-}
-
-func (c centroid) isValid() bool {
- return !math.IsNaN(c.mean) && c.count > 0
-}
-
-var invalidCentroid = centroid{mean: math.NaN(), count: 0}
-
type summary struct {
means []float64
counts []uint32
@@ -96,23 +84,6
return s.counts[uncheckedIndex]
}

-func (s summary) Iterate(f func(c centroid) bool) {
- for i := 0; i < s.Len(); i++ {
- if !f(centroid{s.means[i], s.counts[i], i}) {
- break
- }
- }
-}
-
-func (s summary) Data() []centroid {
- data := make([]centroid, 0, s.Len())
- s.Iterate(func(c centroid) bool {
- data = append(data, c)
- return true
- })
- return data
-}
-
// return the index of the last item which the sum of counts
// of items before it is less than or equal to `sum`. -1 in
// case no centroid satisfies the requirement.
@@ -152,5 +123,20
for i := index - 1; i >= 0 && s.means[i] > s.means[i+1]; i-- {
s.means[i], s.means[i+1] = s.means[i+1], s.means[i]
s.counts[i], s.counts[i+1] = s.counts[i+1], s.counts[i]
+ }
+}
+
+func (s summary) ForEach(f func(float64, uint32) bool) {
+ for i := 0; i < len(s.means); i++ {
+ if !f(s.means[i], s.counts[i]) {
+ break
+ }
+ }
+}
+
+func (s summary) Clone() *summary {
+ return &summary{
+ means: append([]float64{}, s.means...),
+ counts: append([]uint32{}, s.counts...),
}
}

Modified summary_test.go

@@ -96,7 +96,7

}

-func TestIterate(t *testing.T) {
+func TestForEach(t *testing.T) {

s := newSummary(10)
for _, i := range []uint32{1, 2, 3, 4, 5, 6} {
@@ -104,23 +104,23
}

c := 0
- s.Iterate(func(i centroid) bool {
+ s.ForEach(func(mean float64, count uint32) bool {
c++
return false
})

if c != 1 {
- t.Errorf("Iterate must exit early if the closure returns false")
+ t.Errorf("ForEach must exit early if the closure returns false")
}

var tot uint32
- s.Iterate(func(i centroid) bool {
- tot += i.count
+ s.ForEach(func(mean float64, count uint32) bool {
+ tot += count
return true
})

if tot != 210 {
- t.Errorf("Iterate must walk through the whole data if it always returns true")
+ t.Errorf("ForEach must walk through the whole data if it always returns true")
}
}

Modified tdigest.go

@@ -195,7 +195,7
// Compression trades off accuracy for performance and happens
// automatically after a certain amount of distinct samples have been
// stored.
-func (t *TDigest) Compress() error {
+func (t *TDigest) Compress() (err error) {
if t.Len() <= 1 {
return nil
}
@@ -204,17 +204,13
t.summary = newSummary(uint(t.Len()))
t.count = 0

- nodes := oldTree.Data()
- shuffle(nodes, t.rng)
+ shuffle(oldTree.means, oldTree.counts, t.rng)
+ oldTree.ForEach(func(mean float64, count uint32) bool {
+ err = t.AddWeighted(mean, count)
+ return err == nil
+ })

- for _, item := range nodes {
- err := t.AddWeighted(item.mean, item.count)
- if err != nil {
- return err
- }
- }
-
- return nil
+ return err
}

// Merge joins a given digest into itself.
@@ -222,22 +218,20
// in separate threads and you want to compute quantiles over all the
// samples. This is particularly important on a scatter-gather/map-reduce
// scenario.
-func (t *TDigest) Merge(other *TDigest) error {
+func (t *TDigest) Merge(other *TDigest) (err error) {
if other.Len() == 0 {
return nil
}

- nodes := other.summary.Data()
- shuffle(nodes, t.rng)
+ // We must keep the other digest intact
+ data := other.summary.Clone()
+ shuffle(data.means, data.counts, t.rng)

- for _, item := range nodes {
- err := t.AddWeighted(item.mean, item.count)
- if err != nil {
- return err
- }
- }
-
- return nil
+ data.ForEach(func(mean float64, count uint32) bool {
+ err = t.AddWeighted(mean, count)
+ return err == nil
+ })
+ return err
}

// Len returns the number of centroids in the TDigest.
@@ -247,18 +241,13
// Iteration stops when the supplied function returns false, or when all
// centroids have been iterated.
func (t *TDigest) ForEachCentroid(f func(mean float64, count uint32) bool) {
- s := t.summary
- for i := 0; i < s.Len(); i++ {
- if !f(s.means[i], s.counts[i]) {
- break
- }
- }
+ t.summary.ForEach(f)
}

-func shuffle(data []centroid, rng TDigestRNG) {
- for i := len(data) - 1; i > 1; i-- {
+func shuffle(means []float64, counts []uint32, rng TDigestRNG) {
+ for i := len(means) - 1; i > 1; i-- {
j := rng.Intn(i + 1)
- data[i], data[j] = data[j], data[i]
+ means[i], means[j], counts[i], counts[j] = means[j], means[i], counts[j], counts[i]
}
}

Modified tdigest_test.go

@@ -246,8 +246,8
}

var tot uint32
- tdigest.summary.Iterate(func(item centroid) bool {
- tot += item.count
+ tdigest.ForEachCentroid(func(mean float64, count uint32) bool {
+ tot += count
return true
})