caio.co/de/go-tdigest

Fix forever-incrementing count which caused inaccurate results after Compress(); eliminate memory allocation when calling findNearestCentroids(); make tests deterministic by removing t.Parallel() (see comments); add more tests

Id
011e706ed7c61fb18b5bffd923b6b030025f97b2
Author
Ian Wilkes
Commit time
2016-04-11T16:39:40-07:00

Modified summary.go

@@ -158,7 +158,7

func (s summary) sumUntilMean(mean float64) uint32 {
var cumSum uint32
- for i := 0; i < len(s.keys); i++ {
+ for i := range s.keys {
if s.keys[i] < mean {
cumSum += s.counts[i]
} else {

Modified tdigest.go

@@ -99,35 +99,41
return fmt.Errorf("Illegal datapoint <value: %.4f, count: %d>", value, count)
}

- t.count += count
-
if t.summary.Len() == 0 {
t.summary.Add(value, count)
+ t.count = count
return nil
}

- candidates := t.findNearestCentroids(value)
-
+ // Avoid allocation for our slice by using a local array here.
+ ar := [2]centroid{}
+ candidates := ar[:]
+ candidates[0], candidates[1] = t.findNearestCentroids(value)
+ if !candidates[1].isValid() {
+ candidates = candidates[:1]
+ }
for len(candidates) > 0 && count > 0 {
- j := rand.Intn(len(candidates))
+ j := 0
+ if len(candidates) > 1 {
+ j = rand.Intn(len(candidates))
+ }
chosen := candidates[j]

- quantile := t.computeCentroidQuantile(chosen)
+ quantile := t.computeCentroidQuantile(&chosen)

if float64(chosen.count+count) > t.threshold(quantile) {
candidates = append(candidates[:j], candidates[j+1:]...)
continue
}

- deltaW := math.Min(t.threshold(quantile)-float64(chosen.count), float64(count))
- t.summary.updateAt(chosen.index, value, uint32(deltaW))
- count -= uint32(deltaW)
-
- candidates = append(candidates[:j], candidates[j+1:]...)
+ t.summary.updateAt(chosen.index, value, uint32(count))
+ t.count += count
+ count = 0
}

if count > 0 {
t.summary.Add(value, count)
+ t.count += count
}

if float64(t.summary.Len()) > 20*t.compression {
@@ -213,7 +219,7
return (float64(c.count)/2.0 + float64(cumSum)) / float64(t.count)
}

-func (t *TDigest) findNearestCentroids(mean float64) []*centroid {
+func (t *TDigest) findNearestCentroids(mean float64) (centroid, centroid) {
ceil, floor := t.summary.ceilingAndFloorItems(mean)

if !ceil.isValid() && !floor.isValid() {
@@ -221,18 +227,18
}

if !ceil.isValid() {
- return []*centroid{&floor}
+ return floor, invalidCentroid
}

if !floor.isValid() {
- return []*centroid{&ceil}
+ return ceil, invalidCentroid
}

if math.Abs(floor.mean-mean) < math.Abs(ceil.mean-mean) {
- return []*centroid{&floor}
+ return floor, invalidCentroid
} else if math.Abs(floor.mean-mean) == math.Abs(ceil.mean-mean) && floor.mean != ceil.mean {
- return []*centroid{&floor, &ceil}
+ return floor, ceil
} else {
- return []*centroid{&ceil}
+ return ceil, invalidCentroid
}
}

Modified tdigest_test.go

@@ -7,9 +7,13
"testing"
)

-func TestTInternals(t *testing.T) {
- t.Parallel()
+// Test of tdigest internals and accuracy. Note no t.Parallel():
+// during tests the default random seed is consistent, but varying
+// concurrency scheduling mixes up the random values used in each test.
+// Since there's a random number call inside tdigest, this breaks repeatability
+// for all tests. So, no test concurrency here.

+func TestTInternals(t *testing.T) {
tdigest := New(100)

if !math.IsNaN(tdigest.Quantile(0.1)) {
@@ -72,10 +76,6
}

func TestUniformDistribution(t *testing.T) {
- t.Parallel()
-
- rand.Seed(0xDEADBEEF)
-
tdigest := New(100)

for i := 0; i < 10000; i++ {
@@ -91,18 +91,138
assertDifferenceSmallerThan(tdigest, 0.999, 0.001, t)
}

-func TestSequentialInsertion(t *testing.T) {
- t.Parallel()
- tdigest := New(10)
+// Asserts quantile p is no greater than absolute m off from "true"
+// fractional quantile for supplied data. So m must be scaled
+// appropriately for source data range.
+func assertDifferenceFromQuantile(data []float64, tdigest *TDigest, p float64, m float64, t *testing.T) {
+ q := quantile(p, data)
+ tp := tdigest.Quantile(p)

- // FIXME Timeout after X seconds of something?
- for i := 0; i < 10000; i++ {
- tdigest.Add(float64(i), 1)
+ if math.Abs(tp-q) >= m {
+ t.Fatalf("T-Digest.Quantile(%.4f) = %.4f vs actual %.4f. Diff (%.4f) >= %.4f", p, tp, q, math.Abs(tp-q), m)
}
}

+func TestSequentialInsertion(t *testing.T) {
+ tdigest := New(10)
+
+ data := make([]float64, 10000)
+ for i := 0; i < len(data); i++ {
+ data[i] = float64(i)
+ }
+
+ for i := 0; i < len(data); i++ {
+ tdigest.Add(data[i], 1)
+
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.001, 1.0+0.001*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.01, 1.0+0.005*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.05, 1.0+0.01*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.25, 1.0+0.03*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.5, 1.0+0.03*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.75, 1.0+0.03*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.95, 1.0+0.01*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.99, 1.0+0.005*float64(i), t)
+ assertDifferenceFromQuantile(data[:i+1], tdigest, 0.999, 1.0+0.001*float64(i), t)
+ }
+}
+
+func TestNonUniformDistribution(t *testing.T) {
+ tdigest := New(10)
+
+ // Not quite a uniform distribution, but close.
+ data := make([]float64, 1000)
+ for i := 0; i < 500; i++ {
+ data[i] = 700.0 + rand.Float64()*100.0
+ }
+ for i := 500; i < 750; i++ {
+ data[i] = 100.0 + rand.Float64()*100.0
+ }
+ for i := 750; i < 1000; i++ {
+ data[i] = 600.0 + rand.Float64()*10.0
+ }
+
+ for i := 0; i < len(data); i++ {
+ tdigest.Add(data[i], 1)
+ }
+
+ max := float64(len(data))
+ sort.Float64s(data)
+ assertDifferenceFromQuantile(data, tdigest, 0.001, 1.0+0.001*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.01, 1.0+0.005*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.05, 1.0+0.01*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.25, 1.0+0.01*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.5, 1.0+0.05*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.75, 1.0+0.01*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.95, 1.0+0.01*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.99, 1.0+0.005*max, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.999, 1.0+0.001*max, t)
+}
+
+func TestNonSequentialInsertion(t *testing.T) {
+ tdigest := New(10)
+
+	// A deterministic permutation of 0..999: uniformly distributed, but inserted non-sequentially.
+ data := make([]float64, 1000)
+ for i := 0; i < len(data); i++ {
+ tmp := (i * 1627) % len(data)
+ data[i] = float64(tmp)
+ }
+
+ sorted := make([]float64, 0, len(data))
+
+ for i := 0; i < len(data); i++ {
+ tdigest.Add(data[i], 1)
+ sorted = append(sorted, data[i])
+
+ // Estimated quantiles are all over the place for low counts, which is
+ // OK given that something like P99 is not very meaningful when there are
+ // 25 samples. To account for this, increase the error tolerance for
+ // smaller counts.
+ if i == 0 {
+ continue
+ }
+
+ max := float64(len(data))
+ fac := 1.0 + max/float64(i)
+
+ sort.Float64s(sorted)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.001, fac+0.001*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.01, fac+0.005*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.05, fac+0.01*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.25, fac+0.01*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.5, fac+0.02*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.75, fac+0.01*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.95, fac+0.01*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.99, fac+0.005*max, t)
+ assertDifferenceFromQuantile(sorted, tdigest, 0.999, fac+0.001*max, t)
+ }
+}
+
+func TestWeights(t *testing.T) {
+ tdigest := New(10)
+
+ // Create data slice with repeats matching weights we gave to tdigest
+ data := []float64{}
+ for i := 0; i < 100; i++ {
+ tdigest.Add(float64(i), uint32(i))
+
+ for j := 0; j < i; j++ {
+ data = append(data, float64(i))
+ }
+ }
+
+ assertDifferenceFromQuantile(data, tdigest, 0.001, 1.0+0.001*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.01, 1.0+0.005*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.05, 1.0+0.01*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.25, 1.0+0.01*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.5, 1.0+0.02*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.75, 1.0+0.01*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.95, 1.0+0.01*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.99, 1.0+0.005*100.0, t)
+ assertDifferenceFromQuantile(data, tdigest, 0.999, 1.0+0.001*100.0, t)
+}
+
func TestIntegers(t *testing.T) {
- t.Parallel()
tdigest := New(100)

tdigest.Add(1, 1)
@@ -144,13 +264,10
}

index := q * (float64(len(data)) - 1)
-
return data[int(index)+1]*(index-float64(int(index))) + data[int(index)]*(float64(int(index)+1)-index)
}

func TestMerge(t *testing.T) {
- t.Parallel()
-
if testing.Short() {
t.Skipf("Skipping merge test. Short flag is on")
}