merge upstream
- Id
- 3af59afc9af5ba387c6a7231aa301fb53a491981
- Author
- Ian Wilkes
- Commit time
- 2019-10-29T15:02:17-07:00
Modified README.md
# T-Digest
-A map-reduce and parallel streaming friendly data-structure for accurate
+A fast map-reduce and parallel streaming friendly data-structure for accurate
quantile approximation.
-This package provides a very crude implementation of Ted Dunning's t-digest
-data structure in Go.
+This package provides an implementation of Ted Dunning's t-digest data
+structure in Go.
-[](https://travis-ci.org/caio/go-tdigest)
[](http://godoc.org/github.com/caio/go-tdigest)
-[](http://gocover.io/github.com/caio/go-tdigest)
[](https://goreportcard.com/report/github.com/caio/go-tdigest)
+
+## Project Status
+
+This project is actively maintained. We are happy to collaborate on features
+and issues if/when they arrive.
## Installation
+Our releases are tagged and signed following the [Semantic Versioning][semver]
+scheme. If you are using a dependency manager such as [dep][], the recommended
+way is to go about your business normally:
+
go get github.com/caio/go-tdigest
-## Usage
+Otherwise we recommend using the following so that you don't risk breaking
+your build because of an API change:
- package main
+ go get gopkg.in/caio/go-tdigest.v2
- import (
- "fmt"
- "math/rand"
+[semver]: http://semver.org/
+[dep]: https://github.com/golang/dep
- "github.com/caio/go-tdigest"
- )
+## Example Usage
- func main() {
- var t = tdigest.New(100)
+```go
+package main
- for i := 0; i < 10000; i++ {
- t.Add(rand.Float64(), 1)
- }
+import (
+ "fmt"
+ "math/rand"
- fmt.Printf("p(.5) = %.6f\n", t.Quantile(0.5))
- }
+ "github.com/caio/go-tdigest"
+)
-## Disclaimer
+func main() {
+ // Analogue to tdigest.New(tdigest.Compression(100))
+ t, _ := tdigest.New()
-I've written this solely with the purpose of understanding how the
-data-structure works, it hasn't been throughly verified nor battle tested
-in a production environment.
+ for i := 0; i < 10000; i++ {
+ // Analogue to t.AddWeighted(rand.Float64(), 1)
+ t.Add(rand.Float64())
+ }
+
+ fmt.Printf("p(.5) = %.6f\n", t.Quantile(0.5))
+ fmt.Printf("CDF(Quantile(.5)) = %.6f\n", t.CDF(t.Quantile(0.5)))
+}
+```
+
+## Configuration
+
+You can configure your digest upon creation with options documented
+at [options.go](options.go). Example:
+
+```go
+// Construct a digest with compression=200 and its own
+// (thread-unsafe) RNG seeded with 0xCA10:
+digest, _ := tdigest.New(
+ tdigest.Compression(200),
+ tdigest.LocalRandomNumberGenerator(0xCA10),
+)
+```
+
+## Porting Existing Code to the v2 API
+
+It's very easy to migrate to the new API:
+
+- Replace `tdigest.New(100)` with `tdigest.New()`
+- Replace `tdigest.New(number)` with `tdigest.New(tdigest.Compression(number))`
+- Replace `Add(x,1)` with `Add(x)`
+- Replace `Add(x, weight)` with `AddWeighted(x, weight)`
+- Remove any use of `tdigest.Len()` (or [open an issue][issues])
+
+[issues]: https://github.com/caio/go-tdigest/issues/new
## References
-This is a very simple port of the [reference][1] implementation with some
-ideas borrowed from the [python version][2]. If you wanna get a quick grasp of
-how it works and why it's useful, [this video and companion article is pretty
-helpful][3].
+This is a port of the [reference][1] implementation with some ideas borrowed
+from the [python version][2]. If you wanna get a quick grasp of how it works
+and why it's useful, [this video and companion article is pretty helpful][3].
[1]: https://github.com/tdunning/t-digest
[2]: https://github.com/CamDavidsonPilon/tdigest
Modified serialization.go
}
var x float64
- t.summary.Iterate(func(item centroid) bool {
- delta := item.mean - x
- x = item.mean
+ t.summary.ForEach(func(mean float64, count uint64) bool {
+ delta := mean - x
+ x = mean
err = binary.Write(buffer, endianess, float32(delta))
return err == nil
return nil, err
}
- t.summary.Iterate(func(item centroid) bool {
- err = encodeUint(buffer, item.count)
+ t.summary.ForEach(func(mean float64, count uint64) bool {
+ err = encodeUint(buffer, count)
return err == nil
})
if err != nil {
// ToBytes serializes into the supplied slice, avoiding allocation if the slice
// is large enough. The result slice is returned.
func (t *TDigest) ToBytes(b []byte) []byte {
- requiredSize := 16 + (4 * len(t.summary.keys)) + (len(t.summary.counts) * binary.MaxVarintLen64)
+ requiredSize := 16 + (4 * len(t.summary.means)) + (len(t.summary.counts) * binary.MaxVarintLen64)
if cap(b) < requiredSize {
b = make([]byte, requiredSize)
// we'll return it with the actual encoded length.
b = b[:cap(b)]
- endianess.PutUint32(b[0:], uint32(smallEncoding))
- endianess.PutUint64(b[4:], math.Float64bits(t.compression))
- endianess.PutUint32(b[12:], uint32(t.summary.Len()))
+ endianess.PutUint32(b[0:4], uint32(smallEncoding))
+ endianess.PutUint64(b[4:12], math.Float64bits(t.compression))
+ endianess.PutUint32(b[12:16], uint32(t.summary.Len()))
var x float64
idx := 16
- for _, mean := range t.summary.keys {
+ for _, mean := range t.summary.means {
delta := mean - x
x = mean
endianess.PutUint32(b[idx:], math.Float32bits(float32(delta)))
// FromBytes reads a byte buffer with a serialized digest (from AsBytes)
// and deserializes it.
-func FromBytes(buf *bytes.Reader) (*TDigest, error) {
+//
+// This function creates a new tdigest instance with the provided options,
+// but ignores the compression setting since the correct value comes
+// from the buffer.
+func FromBytes(buf *bytes.Reader, options ...tdigestOption) (*TDigest, error) {
var encoding int32
err := binary.Read(buf, endianess, &encoding)
if err != nil {
return nil, fmt.Errorf("Unsupported encoding version: %d", encoding)
}
+ t, err := newWithoutSummary(options...)
+
+ if err != nil {
+ return nil, err
+ }
+
var compression float64
err = binary.Read(buf, endianess, &compression)
if err != nil {
return nil, err
}
- t := New(compression)
+ t.compression = compression
var numCentroids int32
err = binary.Read(buf, endianess, &numCentroids)
return nil, errors.New("bad number of centroids in serialization")
}
- means := make([]float64, numCentroids)
- var delta float32
+ t.summary = newSummary(int(numCentroids))
+ t.summary.means = t.summary.means[:numCentroids]
+ t.summary.counts = t.summary.counts[:numCentroids]
+
var x float64
for i := 0; i < int(numCentroids); i++ {
+ var delta float32
err = binary.Read(buf, endianess, &delta)
if err != nil {
return nil, err
}
x += float64(delta)
- means[i] = x
+ t.summary.means[i] = x
}
for i := 0; i < int(numCentroids); i++ {
- decUint, err := decodeUint(buf)
+ count, err := decodeUint(buf)
if err != nil {
return nil, err
}
-
- t.Add(means[i], decUint)
+ t.summary.counts[i] = count
+ t.count += count
}
return t, nil
}
-// FromBytes deserializes into the supplied TDigest struct, re-using and
-// overwriting any existing buffers.
+// FromBytes deserializes into the supplied TDigest struct, re-using
+// and overwriting any existing buffers.
+//
+// This method reinitializes the digest from the provided buffer
+// discarding any previously collected data. Notice that in case
+// of errors this may leave the digest in an unusable state.
func (t *TDigest) FromBytes(buf []byte) error {
if len(buf) < 16 {
return errors.New("buffer too small for deserialization")
}
- encoding := int32(endianess.Uint32(buf[0:]))
+ encoding := int32(endianess.Uint32(buf))
if encoding != smallEncoding {
return fmt.Errorf("unsupported encoding version: %d", encoding)
}
- compression := math.Float64frombits(endianess.Uint64(buf[4:]))
- numCentroids := int(endianess.Uint32(buf[12:]))
+ compression := math.Float64frombits(endianess.Uint64(buf[4:12]))
+ numCentroids := int(endianess.Uint32(buf[12:16]))
if numCentroids < 0 || numCentroids > 1<<22 {
return errors.New("bad number of centroids in serialization")
}
t.count = 0
t.compression = compression
- if t.summary == nil || cap(t.summary.keys) < numCentroids || cap(t.summary.counts) < numCentroids {
- t.summary = newSummary(uint(numCentroids))
+ if t.summary == nil ||
+ cap(t.summary.means) < numCentroids ||
+ cap(t.summary.counts) < numCentroids {
+ t.summary = newSummary(numCentroids)
}
- t.summary.keys = t.summary.keys[:numCentroids]
+ t.summary.means = t.summary.means[:numCentroids]
t.summary.counts = t.summary.counts[:numCentroids]
idx := 16
- var delta float32
var x float64
- for i := 0; i < int(numCentroids); i++ {
- delta = math.Float32frombits(endianess.Uint32(buf[idx:]))
+ for i := 0; i < numCentroids; i++ {
+ delta := math.Float32frombits(endianess.Uint32(buf[idx:]))
idx += 4
x += float64(delta)
- t.summary.keys[i] = x
+ t.summary.means[i] = x
}
- for i := 0; i < int(numCentroids); i++ {
+ for i := 0; i < numCentroids; i++ {
count, read := binary.Uvarint(buf[idx:])
if read < 1 {
return errors.New("error decoding varint, this TDigest is now invalid")
t.count += count
}
+ if idx != len(buf) {
+ return errors.New("buffer has unread data")
+ }
return nil
}
l := binary.PutUvarint(b[:], n)
- buf.Write(b[:l])
+ _, err := buf.Write(b[:l])
- return nil
+ return err
}
func decodeUint(buf *bytes.Reader) (uint64, error) {
Modified serialization_test.go
buf := new(bytes.Buffer)
for _, i := range testUints {
- encodeUint(buf, i)
+ err := encodeUint(buf, i)
+ if err != nil {
+ t.Error(err)
+ }
}
readBuf := bytes.NewReader(buf.Bytes())
for _, i := range testUints {
- j, _ := decodeUint(readBuf)
+ j, err := decodeUint(readBuf)
+ if err != nil {
+ t.Error(err)
+ }
if i != j {
t.Errorf("Basic encode/decode failed. Got %d, wanted %d", j, i)
}
func TestSerialization(t *testing.T) {
- // NOTE Using a high compression value and adding few items
- // so we don't end up compressing automatically
- t1 := New(100)
+ t1, _ := New()
for i := 0; i < 100; i++ {
- t1.Add(rand.Float64(), 1)
+ _ = t1.Add(rand.Float64())
}
serialized, _ := t1.AsBytes()
- t2, _ := FromBytes(bytes.NewReader(serialized))
-
- if t1.count != t2.count || t1.summary.Len() != t2.summary.Len() || t1.compression != t2.compression {
- t.Errorf("Deserialized to something different. t1=%v t2=%v serialized=%v", t1, t2, serialized)
+ t2, err := FromBytes(bytes.NewReader(serialized))
+ if err != nil {
+ t.Fatal(err)
}
+ assertSerialization(t, t1, t2)
+
+ err = t2.FromBytes(serialized)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assertSerialization(t, t1, t2)
var toBuf []byte
toBuf = t1.ToBytes(toBuf)
t.Errorf("ToBytes serialized to something else")
}
- t3 := &TDigest{}
- err := t3.FromBytes(serialized)
+ t3, _ := New()
+ err = t3.FromBytes(serialized)
if err != nil {
t.Error(err)
}
- if t1.count != t3.count || t1.summary.Len() != t3.summary.Len() || t1.compression != t3.compression {
- t.Errorf("Deserialized to something different. t1=%v t3=%v serialized=%v", t1, t3, serialized)
- }
- if !reflect.DeepEqual(t2, t3) {
- t.Errorf("FromBytes method deserialized to something different from FromBytes function")
- }
+
+ assertSerialization(t, t1, t3)
// Mess up t3's internal state, deserialize again.
t3.compression = 2
t3.count = 1000
- t3.summary.keys = append(t3.summary.keys, 2.0)
+ t3.summary.means = append(t3.summary.means, 2.0)
t3.summary.counts[0] = 0
err = t3.FromBytes(serialized)
if err != nil {
t.Error(err)
}
- if !reflect.DeepEqual(t2, t3) {
- t.Errorf("FromBytes method deserialized to something different from FromBytes function")
- }
+
+ assertSerialization(t, t1, t3)
wrong := serialized[:50]
err = t3.FromBytes(wrong)
}
}
+func assertSerialization(t *testing.T, t1, t2 *TDigest) {
+ if t1.Count() != t2.Count() ||
+ t1.summary.Len() != t2.summary.Len() ||
+ t1.compression != t2.compression {
+ t.Errorf("Deserialized to something different. t1=%v t2=%v", t1, t2)
+ }
+
+ b1, err := t1.AsBytes()
+ if err != nil {
+ t.Error(err)
+ }
+
+ b2, err := t2.AsBytes()
+ if err != nil {
+ t.Error(err)
+ }
+
+ if !bytes.Equal(b1, b2) {
+ t.Errorf("Deserialized to something different. b1=%q b2=%q", b1, b2)
+ }
+
+ // t2 is fully functional.
+
+ err = t2.Add(rand.Float64())
+ if err != nil {
+ t.Error(err)
+ }
+
+ err = t2.Compress()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestFromBytesIgnoresCompression(t *testing.T) {
+ digest := uncheckedNew(Compression(42))
+
+ // Instructing FromBytes to use a compression different
+ // than the one in the payload should be ignored
+ payload, err := digest.AsBytes()
+
+ if err != nil {
+ t.Error(err)
+ }
+
+ other, err := FromBytes(bytes.NewReader(payload), Compression(100))
+
+ if err != nil {
+ t.Error(err)
+ }
+
+ if other.Compression() != 42 {
+ t.Errorf("Expected compression to be 42, got %f", other.Compression())
+ }
+}
+
func TestLargeSerializaton(t *testing.T) {
- t1 := New(10)
+ t1, err := New(Compression(10))
+ if err != nil {
+ t.Error(err)
+ }
for i := 0; i < 100000; i++ {
- t1.Add(rand.Float64(), 1)
+ t1.AddWeighted(rand.Float64(), 1000000000)
}
serialized, _ := t1.AsBytes()
t.Error(err)
}
- var t3 TDigest
+ t3, _ := New()
err = t3.FromBytes(serialized2)
if err != nil {
t.Error(err)
}
- if t1.count != t2.count || t1.summary.Len() != t2.summary.Len() || t1.compression != t2.compression {
- t.Errorf("Deserialized to something different. t1=%v t2=%v serialized=%v", t1, t2, serialized)
- }
- if t1.count != t3.count || t1.summary.Len() != t3.summary.Len() || t1.compression != t3.compression {
- t.Errorf("Deserialized to something different. t1=%v t3=%v serialized=%v", t1, t3, serialized)
- }
+
+ assertSerialization(t, t1, t2)
+ assertSerialization(t, t1, t3)
}
func TestJavaSmallBytesCompat(t *testing.T) {
t.Fatalf(err.Error())
}
- if tdigest.count != 100000 {
- t.Fatalf("Expected deserialized t-digest to have a count of 100_000. Got %d", tdigest.count)
+ if tdigest.Count() != 100000 {
+ t.Fatalf("Expected deserialized t-digest to have a count of 100_000. Got %d", tdigest.Count())
}
assertDifferenceSmallerThan(tdigest, 0.5, 0.02, t)
func BenchmarkAsBytes(b *testing.B) {
b.ReportAllocs()
- t1 := New(100)
+ t1, _ := New(Compression(100))
for i := 0; i < 100; i++ {
- t1.Add(rand.Float64(), 1)
+ t1.Add(rand.Float64())
}
b.ResetTimer()
func BenchmarkToBytes(b *testing.B) {
b.ReportAllocs()
- t1 := New(100)
+ t1, _ := New(Compression(100))
for i := 0; i < 100; i++ {
- t1.Add(rand.Float64(), 1)
+ t1.Add(rand.Float64())
}
b.ResetTimer()
func BenchmarkFromBytes(b *testing.B) {
b.ReportAllocs()
- t1 := New(100)
+ t1, _ := New(Compression(100))
for i := 0; i < 100; i++ {
- t1.Add(rand.Float64(), 1)
+ t1.Add(rand.Float64())
}
buf, _ := t1.AsBytes()
func BenchmarkFromBytesMethod(b *testing.B) {
b.ReportAllocs()
- t1 := New(100)
+ t1, _ := New(Compression(100))
for i := 0; i < 100; i++ {
- t1.Add(rand.Float64(), 1)
+ t1.Add(rand.Float64())
}
buf, _ := t1.AsBytes()
Modified summary.go
package tdigest
import (
"fmt"
"math"
- "math/rand"
"sort"
)
-type centroid struct {
- mean float64
- count uint64
- index int
-}
-
-func (c centroid) isValid() bool {
- return !math.IsNaN(c.mean) && c.count > 0
-}
-
-func (c *centroid) Update(x float64, weight uint64) {
- c.count += weight
- c.mean += float64(weight) * (x - c.mean) / float64(c.count)
-}
-
-var invalidCentroid = centroid{mean: math.NaN(), count: 0}
-
type summary struct {
- keys []float64
+ means []float64
counts []uint64
}
-func newSummary(initialCapacity uint) *summary {
- return &summary{
- keys: make([]float64, 0, initialCapacity),
+func newSummary(initialCapacity int) *summary {
+ s := &summary{
+ means: make([]float64, 0, initialCapacity),
counts: make([]uint64, 0, initialCapacity),
}
+ return s
}
-func (s summary) Len() int {
- return len(s.keys)
+func (s *summary) Len() int {
+ return len(s.means)
}
func (s *summary) Add(key float64, value uint64) error {
-
if math.IsNaN(key) {
return fmt.Errorf("Key must not be NaN")
}
-
if value == 0 {
return fmt.Errorf("Count must be >0")
}
- idx := s.FindIndex(key)
+ idx := s.findInsertionIndex(key)
- if s.meanAtIndexIs(idx, key) {
- s.updateAt(idx, key, value)
- return nil
- }
-
- s.keys = append(s.keys, math.NaN())
+ s.means = append(s.means, math.NaN())
s.counts = append(s.counts, 0)
- copy(s.keys[idx+1:], s.keys[idx:])
+ copy(s.means[idx+1:], s.means[idx:])
copy(s.counts[idx+1:], s.counts[idx:])
- s.keys[idx] = key
+ s.means[idx] = key
s.counts[idx] = value
return nil
}
-func (s summary) Find(x float64) centroid {
- idx := s.FindIndex(x)
-
- if idx < s.Len() && s.keys[idx] == x {
- return centroid{x, s.counts[idx], idx}
- }
-
- return invalidCentroid
-}
-
-func (s summary) FindIndex(x float64) int {
+// Always insert to the right
+func (s *summary) findInsertionIndex(x float64) int {
// Binary search is only worthwhile if we have a lot of keys.
- if len(s.keys) < 250 {
- for i, item := range s.keys {
- if item >= x {
+ if len(s.means) < 250 {
+ for i, mean := range s.means {
+ if mean > x {
return i
}
}
- return len(s.keys)
+ return len(s.means)
}
- return sort.SearchFloat64s(s.keys, x)
-}
-
-func (s summary) At(index int) centroid {
- if s.Len()-1 < index || index < 0 {
- return invalidCentroid
- }
-
- return centroid{s.keys[index], s.counts[index], index}
-}
-
-func (s summary) Iterate(f func(c centroid) bool) {
- for i := 0; i < s.Len(); i++ {
- if !f(centroid{s.keys[i], s.counts[i], i}) {
- break
- }
- }
-}
-
-func (s summary) Min() centroid {
- return s.At(0)
-}
-
-func (s summary) Max() centroid {
- return s.At(s.Len() - 1)
-}
-
-func (s summary) successorAndPredecessorItems(mean float64) (centroid, centroid) {
- idx := s.FindIndex(mean)
- return s.At(idx + 1), s.At(idx - 1)
-}
-
-func (s summary) ceilingAndFloorItems(mean float64) (centroid, centroid) {
- idx := s.FindIndex(mean)
-
- // Case 1: item is greater than all items in the summary
- if idx == s.Len() {
- return invalidCentroid, s.Max()
- }
-
- item := s.At(idx)
-
- // Case 2: item exists in the summary
- if item.isValid() && mean == item.mean {
- return item, item
- }
-
- // Case 3: item is smaller than all items in the summary
- if idx == 0 {
- return s.Min(), invalidCentroid
- }
-
- return item, s.At(idx - 1)
+ return sort.Search(len(s.means), func(i int) bool {
+ return s.means[i] > x
+ })
}
// This method is the hotspot when calling Add(), which in turn is called by
-// Compress() and Merge(). A simple loop unroll saves a surprising amount of
-// time.
-func (s summary) sumUntilIndex(idx int) uint64 {
- var cumSum uint64
- var i int
- for i = idx - 1; i >= 3; i -= 4 {
- cumSum += s.counts[i]
- cumSum += s.counts[i-1]
- cumSum += s.counts[i-2]
- cumSum += s.counts[i-3]
- }
- for ; i >= 0; i-- {
- cumSum += s.counts[i]
- }
-
- return cumSum
+// Compress() and Merge().
+func (s *summary) HeadSum(idx int) (sum float64) {
+ return float64(sumUntilIndex(s.counts, idx))
}
-func (s *summary) updateAt(index int, mean float64, count uint64) {
- c := centroid{s.keys[index], s.counts[index], index}
- c.Update(mean, count)
+func (s *summary) Floor(x float64) int {
+ return s.findIndex(x) - 1
+}
- oldMean := s.keys[index]
- s.keys[index] = c.mean
- s.counts[index] = c.count
-
- if c.mean > oldMean {
- s.adjustRight(index)
- } else if c.mean < oldMean {
- s.adjustLeft(index)
+func (s *summary) findIndex(x float64) int {
+ // Binary search is only worthwhile if we have a lot of keys.
+ if len(s.means) < 250 {
+ for i, mean := range s.means {
+ if mean >= x {
+ return i
+ }
+ }
+ return len(s.means)
}
+
+ return sort.Search(len(s.means), func(i int) bool {
+ return s.means[i] >= x
+ })
+}
+
+func (s *summary) Mean(uncheckedIndex int) float64 {
+ return s.means[uncheckedIndex]
+}
+
+func (s *summary) Count(uncheckedIndex int) uint64 {
+ return s.counts[uncheckedIndex]
+}
+
+// return the index of the last item for which the sum of counts
+// of items before it is less than or equal to `sum`. -1 in
+// case no centroid satisfies the requirement.
+// Since it's cheap, this also returns the `HeadSum` until
+// the found index (i.e. cumSum = HeadSum(FloorSum(x)))
+func (s *summary) FloorSum(sum float64) (index int, cumSum float64) {
+ index = -1
+ for i, count := range s.counts {
+ if cumSum <= sum {
+ index = i
+ } else {
+ break
+ }
+ cumSum += float64(count)
+ }
+ if index != -1 {
+ cumSum -= float64(s.counts[index])
+ }
+ return index, cumSum
+}
+
+func (s *summary) setAt(index int, mean float64, count uint64) {
+ s.means[index] = mean
+ s.counts[index] = count
+ s.adjustRight(index)
+ s.adjustLeft(index)
}
func (s *summary) adjustRight(index int) {
- for i := index + 1; i < len(s.keys) && s.keys[i-1] > s.keys[i]; i++ {
- s.keys[i-1], s.keys[i] = s.keys[i], s.keys[i-1]
+ for i := index + 1; i < len(s.means) && s.means[i-1] > s.means[i]; i++ {
+ s.means[i-1], s.means[i] = s.means[i], s.means[i-1]
s.counts[i-1], s.counts[i] = s.counts[i], s.counts[i-1]
}
}
func (s *summary) adjustLeft(index int) {
- for i := index - 1; i >= 0 && s.keys[i] > s.keys[i+1]; i-- {
- s.keys[i], s.keys[i+1] = s.keys[i+1], s.keys[i]
+ for i := index - 1; i >= 0 && s.means[i] > s.means[i+1]; i-- {
+ s.means[i], s.means[i+1] = s.means[i+1], s.means[i]
s.counts[i], s.counts[i+1] = s.counts[i+1], s.counts[i]
}
}
-func (s summary) meanAtIndexIs(index int, mean float64) bool {
- return index < len(s.keys) && s.keys[index] == mean
+func (s *summary) ForEach(f func(float64, uint64) bool) {
+ for i, mean := range s.means {
+ if !f(mean, s.counts[i]) {
+ break
+ }
+ }
+}
+
+func (s *summary) Perm(rng RNG, f func(float64, uint64) bool) {
+ for _, i := range perm(rng, s.Len()) {
+ if !f(s.means[i], s.counts[i]) {
+ break
+ }
+ }
+}
+
+func (s *summary) Clone() *summary {
+ return &summary{
+ means: append([]float64{}, s.means...),
+ counts: append([]uint64{}, s.counts...),
+ }
}
// Randomly shuffles summary contents, so they can be added to another summary
// without being pathological. Renders summary invalid.
-func (s *summary) shuffle() {
- for i := len(s.keys) - 1; i > 1; i-- {
- s.Swap(i, rand.Intn(i+1))
+func (s *summary) shuffle(rng RNG) {
+ for i := len(s.means) - 1; i > 1; i-- {
+ s.Swap(i, rng.Intn(i+1))
}
-}
-
-// Re-sorts summary, repairing the damage done by shuffle().
-func (s *summary) unshuffle() {
- sort.Sort(s)
}
// for sort.Interface
func (s *summary) Swap(i, j int) {
- s.keys[i], s.keys[j] = s.keys[j], s.keys[i]
+ s.means[i], s.means[j] = s.means[j], s.means[i]
s.counts[i], s.counts[j] = s.counts[j], s.counts[i]
}
func (s *summary) Less(i, j int) bool {
- return s.keys[i] < s.keys[j]
+ return s.means[i] < s.means[j]
+}
+
+// A simple loop unroll saves a surprising amount of time.
+func sumUntilIndex(s []uint64, idx int) uint64 {
+ var cumSum uint64
+ var i int
+ for i = idx - 1; i >= 3; i -= 4 {
+ cumSum += uint64(s[i])
+ cumSum += uint64(s[i-1])
+ cumSum += uint64(s[i-2])
+ cumSum += uint64(s[i-3])
+ }
+ for ; i >= 0; i-- {
+ cumSum += uint64(s[i])
+ }
+ return cumSum
+}
+
+func perm(rng RNG, n int) []int {
+ m := make([]int, n)
+ for i := 1; i < n; i++ {
+ j := rng.Intn(i + 1)
+ m[i] = m[j]
+ m[j] = i
+ }
+ return m
}
Modified summary_test.go
func TestBasics(t *testing.T) {
s := newSummary(2)
- for _, n := range []float64{12, 13, 14, 15} {
- item := s.Find(n)
-
- if item.isValid() {
- t.Errorf("Found something for non existing key %.0f: %v", n, item)
- }
- }
-
err := s.Add(1, 1)
if err != nil {
}
func checkSorted(s *summary, t *testing.T) {
- if !sort.Float64sAreSorted(s.keys) {
- t.Fatalf("Keys are not sorted! %v", s.keys)
+ if !sort.Float64sAreSorted(s.means) {
+ t.Fatalf("Keys are not sorted! %v", s.means)
}
}
t.Errorf("Initial size should be zero regardless of capacity. Got %d", s.Len())
}
+ // construct a summary made of unique items only
for i := 0; i < maxDataSize; i++ {
k := rand.Float64()
v := rand.Uint64()
- err := s.Add(k, v)
-
- if err != nil {
- _, exists := testData[k]
- if !exists {
- t.Errorf("Failed to insert %.2f even though it doesn't exist yet", k)
- }
+ _, exists := testData[k]
+ if !exists {
+ _ = s.Add(k, v)
+ testData[k] = v
}
-
- testData[k] = v
}
checkSorted(s, t)
}
for k, v := range testData {
- c := s.Find(k)
- if !c.isValid() || c.count != v {
- t.Errorf("Find(%.0f) returned %d, expected %d", k, c.count, v)
+ i := s.findIndex(k)
+
+ if i == s.Len() {
+ t.Errorf("Couldn't find previously added key on summary")
+ continue
+ }
+
+ if s.means[i] != k || s.counts[i] != v {
+ t.Errorf("Wanted to find {%.4f,%d}, but found {%.4f,%d} instead", k, v, s.means[i], s.counts[i])
}
}
}
-func TestGetAt(t *testing.T) {
- data := make(map[int]uint64)
- const maxDataSize = 1000
+func TestSetAtNeverBreaksSorting(t *testing.T) {
+ s := newSummary(10)
- s := newSummary(maxDataSize)
-
- c := s.At(0)
-
- if c.isValid() {
- t.Errorf("At() on an empty structure should give invalid data. Got %v", c)
+ for _, i := range []float64{10, 10, 10, 10, 10} {
+ _ = s.Add(i, 1)
}
- for i := 0; i < maxDataSize; i++ {
- data[i] = rand.Uint64()
- s.Add(float64(i), data[i])
- }
+ s.setAt(0, 30, 1)
+ checkSorted(s, t)
- for i, v := range data {
- c := s.At(i)
- if !c.isValid() || c.count != v {
- t.Errorf("At(%d) = %d. Should've been %d", i, c.count, v)
- }
- }
+ s.setAt(s.Len()-1, 0, 1)
+ checkSorted(s, t)
- c = s.At(s.Len())
+ s.setAt(3, 10.1, 1)
+ checkSorted(s, t)
- if c.isValid() {
- t.Errorf("At() past the slice length should give invalid data")
- }
+ s.setAt(3, 9.9, 1)
+ checkSorted(s, t)
- c = s.At(-10)
-
- if c.isValid() {
- t.Errorf("At() with negative index should give invalid data")
- }
}
-func TestIterate(t *testing.T) {
+func TestForEach(t *testing.T) {
s := newSummary(10)
for _, i := range []uint64{1, 2, 3, 4, 5, 6} {
- s.Add(float64(i), i*10)
+ _ = s.Add(float64(i), i*10)
}
c := 0
- s.Iterate(func(i centroid) bool {
+ s.ForEach(func(mean float64, count uint64) bool {
c++
return false
})
if c != 1 {
- t.Errorf("Iterate must exit early if the closure returns false")
+ t.Errorf("ForEach must exit early if the closure returns false")
}
var tot uint64
- s.Iterate(func(i centroid) bool {
- tot += i.count
+ s.ForEach(func(mean float64, count uint64) bool {
+ tot += count
return true
})
if tot != 210 {
- t.Errorf("Iterate must walk through the whole data if it always returns true")
+ t.Errorf("ForEach must walk through the whole data if it always returns true")
}
}
-func TestCeilingAndFloor(t *testing.T) {
+func TestFloorSum(t *testing.T) {
s := newSummary(100)
-
- ceil, floor := s.ceilingAndFloorItems(1)
-
- if ceil.isValid() || floor.isValid() {
- t.Errorf("Empty centroids must return invalid ceiling and floor items")
+ var total uint64
+ for i := 0; i < 100; i++ {
+ count := uint64(rand.Intn(10)) + 1
+ _ = s.Add(rand.Float64(), count)
+ total += count
}
- s.Add(0.4, 1)
-
- ceil, floor = s.ceilingAndFloorItems(0.3)
-
- if floor.isValid() || ceil.mean != 0.4 {
- t.Errorf("Expected to find a ceil and NOT find a floor. ceil=%v, floor=%v", ceil, floor)
+ idx, _ := s.FloorSum(-1)
+ if idx != -1 {
+ t.Errorf("Expected no centroid to satisfy -1 but got index=%d", idx)
}
- ceil, floor = s.ceilingAndFloorItems(0.5)
+ for i := float64(0); i < float64(total)+10; i++ {
+ node, _ := s.FloorSum(i)
+ if s.HeadSum(node) > i {
+ t.Errorf("headSum(%d)=%.0f (>%.0f)", node, s.HeadSum(node), i)
+ }
+ if node+1 < s.Len() && s.HeadSum(node+1) <= i {
+ t.Errorf("headSum(%d)=%.0f (>%.0f)", node+1, s.HeadSum(node+1), i)
+ }
+ }
+}
- if ceil.isValid() || floor.mean != 0.4 {
- t.Errorf("Expected to find a floor and NOT find a ceiling. ceil=%v, floor=%v", ceil, floor)
+func TestFloor(t *testing.T) {
+ s := newSummary(200)
+ for i := float64(0); i < 101; i++ {
+ _ = s.Add(i/2.0, 1)
}
- s.Add(0.1, 2)
-
- ceil, floor = s.ceilingAndFloorItems(0.2)
-
- if ceil.mean != 0.4 || floor.mean != 0.1 {
- t.Errorf("Expected to find a ceiling and a floor. ceil=%v, floor=%v", ceil, floor)
+ if s.Floor(-30) != -1 {
+ t.Errorf("Shouldn't have found a floor index. Got %d", s.Floor(-30))
}
- s.Add(0.21, 3)
-
- ceil, floor = s.ceilingAndFloorItems(0.2)
-
- if ceil.mean != 0.21 || floor.mean != 0.1 {
- t.Errorf("Ceil should've shrunk. ceil=%v, floor=%v", ceil, floor)
- }
-
- s.Add(0.1999, 1)
-
- ceil, floor = s.ceilingAndFloorItems(0.2)
-
- if ceil.mean != 0.21 || floor.mean != 0.1999 {
- t.Errorf("Floor should've shrunk. ceil=%v, floor=%v", ceil, floor)
- }
-
- ceil, floor = s.ceilingAndFloorItems(10)
-
- if ceil.isValid() {
- t.Errorf("Expected an invalid ceil. Got %v", ceil)
- }
-
- ceil, floor = s.ceilingAndFloorItems(0.0001)
-
- if floor.isValid() {
- t.Errorf("Expected an invalid floor. Got %v", floor)
- }
-
- m := float64(0.42)
- s.Add(m, 1)
- ceil, floor = s.ceilingAndFloorItems(m)
-
- if ceil.mean != m || floor.mean != m {
- t.Errorf("ceiling and floor of an existing item should be the item itself")
+ for i := 0; i < s.Len(); i++ {
+ m := s.means[i]
+ f := s.means[s.Floor(m+0.1)]
+ if m != f {
+ t.Errorf("Erm, %.4f != %.4f", m, f)
+ }
}
}
keys := []float64{1, 2, 3, 4, 9, 5, 6, 7, 8}
counts := []uint64{1, 2, 3, 4, 9, 5, 6, 7, 8}
- s := summary{keys: keys, counts: counts}
+ s := summary{means: keys, counts: counts}
s.adjustRight(4)
- if !sort.Float64sAreSorted(s.keys) || s.counts[4] != 5 {
- t.Errorf("adjustRight should have fixed the keys/counts state. %v %v", s.keys, s.counts)
+ if !sort.Float64sAreSorted(s.means) || s.counts[4] != 5 {
+ t.Errorf("adjustRight should have fixed the keys/counts state. %v %v", s.means, s.counts)
}
keys = []float64{1, 2, 3, 4, 0, 5, 6, 7, 8}
counts = []uint64{1, 2, 3, 4, 0, 5, 6, 7, 8}
- s = summary{keys: keys, counts: counts}
+ s = summary{means: keys, counts: counts}
s.adjustLeft(4)
- if !sort.Float64sAreSorted(s.keys) || s.counts[4] != 4 {
- t.Errorf("adjustLeft should have fixed the keys/counts state. %v %v", s.keys, s.counts)
+ if !sort.Float64sAreSorted(s.means) || s.counts[4] != 4 {
+ t.Errorf("adjustLeft should have fixed the keys/counts state. %v %v", s.means, s.counts)
}
}
Modified tdigest.go
// Package tdigest provides a highly accurate mergeable data-structure
// for quantile estimation.
-package tdigest
-
-import (
- "fmt"
- "math"
- "math/rand"
-)
-
-// TDigest is a quantile approximation data structure.
+//
// Typical T-Digest use cases involve accumulating metrics on several
// distinct nodes of a cluster and then merging them together to get
// a system-wide quantile overview. Things such as: sensory data from
// IoT devices, quantiles over enormous document datasets (think
// ElasticSearch), performance metrics for distributed systems, etc.
+//
+// After you create (and configure, if desired) the digest:
+// digest, err := tdigest.New(tdigest.Compression(100))
+//
+// You can then use it for registering measurements:
+// digest.Add(number)
+//
+// Estimating quantiles:
+// digest.Quantile(0.99)
+//
+// And merging with another digest:
+// digest.Merge(otherDigest)
+package tdigest
+
+import (
+ "fmt"
+ "math"
+)
+
+// TDigest is a quantile approximation data structure.
type TDigest struct {
summary *summary
compression float64
count uint64
+ rng RNG
}
// New creates a new digest.
-// The compression parameter rules the threshold in which samples are
-// merged together - the more often distinct samples are merged the more
-// precision is lost. Compression should be tuned according to your data
-// distribution, but a value of 100 is often good enough. A higher
-// compression value means holding more centroids in memory (thus: better
-// precision), which means a bigger serialization payload and higher
-// memory footprint.
-// Compression must be a value greater of equal to 1, will panic
-// otherwise.
-func New(compression float64) *TDigest {
- if compression < 1 {
- panic("Compression must be >= 1.0")
+//
+// By default the digest is constructed with a configuration that
+// should be useful for most use-cases. It comes with compression
+// set to 100 and uses a local random number generator for
+// performance reasons.
+func New(options ...tdigestOption) (*TDigest, error) {
+ tdigest, err := newWithoutSummary(options...)
+
+ if err != nil {
+ return nil, err
}
- return &TDigest{
- compression: compression,
- summary: newSummary(estimateCapacity(compression)),
+
+ tdigest.summary = newSummary(estimateCapacity(tdigest.compression))
+ return tdigest, nil
+}
+
+// Creates a tdigest instance without allocating a summary.
+func newWithoutSummary(options ...tdigestOption) (*TDigest, error) {
+ tdigest := &TDigest{
+ compression: 100,
count: 0,
+ rng: newLocalRNG(1),
}
+
+ for _, option := range options {
+ err := option(tdigest)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return tdigest, nil
+}
+
+func _quantile(index float64, previousIndex float64, nextIndex float64, previousMean float64, nextMean float64) float64 {
+ delta := nextIndex - previousIndex
+ previousWeight := (nextIndex - index) / delta
+ nextWeight := (index - previousIndex) / delta
+ return previousMean*previousWeight + nextMean*nextWeight
+}
+
+// Compression returns the TDigest compression.
+func (t *TDigest) Compression() float64 {
+ return t.compression
}
// Quantile returns the desired percentile estimation.
+//
// Values of p must be between 0 and 1 (inclusive), will panic otherwise.
func (t *TDigest) Quantile(q float64) float64 {
if q < 0 || q > 1 {
if t.summary.Len() == 0 {
return math.NaN()
} else if t.summary.Len() == 1 {
- return t.summary.Min().mean
+ return t.summary.Mean(0)
}
- q *= float64(t.count)
- var total float64
- i := 0
+ index := q * float64(t.count-1)
+ previousMean := math.NaN()
+ previousIndex := float64(0)
+ next, total := t.summary.FloorSum(index)
- found := false
- var result float64
+ if next > 0 {
+ previousMean = t.summary.Mean(next - 1)
+ previousIndex = total - float64(t.summary.Count(next-1)+1)/2
+ }
- t.summary.Iterate(func(item centroid) bool {
- k := float64(item.count)
-
- if q < total+k {
- if i == 0 || i+1 == t.summary.Len() {
- result = item.mean
- found = true
- return false
+ for {
+ nextIndex := total + float64(t.summary.Count(next)-1)/2
+ if nextIndex >= index {
+ if math.IsNaN(previousMean) {
+ // the index is before the 1st centroid
+ if nextIndex == previousIndex {
+ return t.summary.Mean(next)
+ }
+ // assume linear growth
+ nextIndex2 := total + float64(t.summary.Count(next)) + float64(t.summary.Count(next+1)-1)/2
+ previousMean = (nextIndex2*t.summary.Mean(next) - nextIndex*t.summary.Mean(next+1)) / (nextIndex2 - nextIndex)
}
- succ, pred := t.summary.successorAndPredecessorItems(item.mean)
- delta := (succ.mean - pred.mean) / 2
- result = item.mean + ((q-total)/k-0.5)*delta
- found = true
- return false
+		// common case: two centroids found, the result is in between
+ return _quantile(index, previousIndex, nextIndex, previousMean, t.summary.Mean(next))
+ } else if next+1 == t.summary.Len() {
+ // the index is after the last centroid
+ nextIndex2 := float64(t.count - 1)
+ nextMean2 := (t.summary.Mean(next)*(nextIndex2-previousIndex) - previousMean*(nextIndex2-nextIndex)) / (nextIndex - previousIndex)
+ return _quantile(index, nextIndex, nextIndex2, t.summary.Mean(next), nextMean2)
}
-
- i++
- total += k
- return true
- })
-
- if found {
- return result
+ total += float64(t.summary.Count(next))
+ previousMean = t.summary.Mean(next)
+ previousIndex = nextIndex
+ next++
}
- return t.summary.Max().mean
+ // unreachable
}
-// Add registers a new sample in the digest.
+// boundedWeightedAverage computes the weighted average of two
+// centroids guaranteeing that the result will be between x1 and x2,
+// inclusive.
+//
+// Refer to https://github.com/caio/go-tdigest/pull/19 for more details
+func boundedWeightedAverage(x1 float64, w1 float64, x2 float64, w2 float64) float64 {
+ if x1 > x2 {
+ x1, x2, w1, w2 = x2, x1, w2, w1
+ }
+ result := (x1*w1 + x2*w2) / (w1 + w2)
+ return math.Max(x1, math.Min(result, x2))
+}
+
+// AddWeighted registers a new sample in the digest.
+//
// It's the main entry point for the digest and very likely the only
// method to be used for collecting samples. The count parameter is for
// when you are registering a sample that occurred multiple times - the
// most common value for this is 1.
-func (t *TDigest) Add(value float64, count uint64) error {
-
+//
+// This will emit an error if `value` is NaN or if `count` is zero.
+func (t *TDigest) AddWeighted(value float64, count uint64) (err error) {
if count == 0 {
return fmt.Errorf("Illegal datapoint <value: %.4f, count: %d>", value, count)
}
if t.summary.Len() == 0 {
- t.summary.Add(value, count)
- t.count = count
- return nil
+ err = t.summary.Add(value, count)
+ t.count = uint64(count)
+ return err
}
- // Avoid allocation for our slice by using a local array here.
- ar := [2]centroid{}
- candidates := ar[:]
- candidates[0], candidates[1] = t.findNearestCentroids(value)
- if !candidates[1].isValid() {
- candidates = candidates[:1]
+ begin := t.summary.Floor(value)
+ if begin == -1 {
+ begin = 0
}
- for len(candidates) > 0 && count > 0 {
- j := 0
- if len(candidates) > 1 {
- j = rand.Intn(len(candidates))
+
+ begin, end := t.findNeighbors(begin, value)
+
+ closest := t.chooseMergeCandidate(begin, end, value, count)
+
+ if closest == t.summary.Len() {
+ err = t.summary.Add(value, count)
+ if err != nil {
+ return err
}
- chosen := candidates[j]
-
- quantile := t.computeCentroidQuantile(&chosen)
-
- if float64(chosen.count+count) > t.threshold(quantile) {
- candidates = append(candidates[:j], candidates[j+1:]...)
- continue
- }
-
- t.summary.updateAt(chosen.index, value, uint64(count))
- t.count += count
- count = 0
+ } else {
+ c := float64(t.summary.Count(closest))
+ newMean := boundedWeightedAverage(t.summary.Mean(closest), c, value, float64(count))
+ t.summary.setAt(closest, newMean, uint64(c)+count)
}
-
- if count > 0 {
- t.summary.Add(value, count)
- t.count += count
- }
+ t.count += uint64(count)
if float64(t.summary.Len()) > 20*t.compression {
- t.Compress()
+ err = t.Compress()
}
- return nil
+ return err
+}
+
+// Count returns the total number of samples this digest represents
+//
+// The result represents how many times Add() was called on a digest
+// plus how many samples the digests it has been merged with had.
+// This is useful mainly for two scenarios:
+//
+// - Knowing if there is enough data so you can trust the quantiles
+//
+// - Knowing if you've registered too many samples already and
+// deciding what to do about it.
+//
+// For the second case one approach would be to create a side empty
+// digest and start registering samples on it as well as on the old
+// (big) one and then discard the bigger one after a certain criterion
+// is reached (say, minimum number of samples or a small relative
+// error between new and old digests).
+func (t TDigest) Count() uint64 {
+ return t.count
+}
+
+// Add is an alias for AddWeighted(x,1)
+// Read the documentation for AddWeighted for more details.
+func (t *TDigest) Add(value float64) error {
+ return t.AddWeighted(value, 1)
}
// Compress tries to reduce the number of individual centroids stored
// in the digest.
+//
// Compression trades off accuracy for performance and happens
// automatically after a certain amount of distinct samples have been
// stored.
-func (t *TDigest) Compress() {
+//
+// At any point in time you may call Compress on a digest, but you
+// may completely ignore this and it will compress itself automatically
+// after it grows too much. If you are minimizing network traffic
+// it might be a good idea to compress before serializing.
+func (t *TDigest) Compress() (err error) {
if t.summary.Len() <= 1 {
- return
+ return nil
}
oldTree := t.summary
- oldTree.shuffle()
t.summary = newSummary(estimateCapacity(t.compression))
t.count = 0
- for i := range oldTree.keys {
- t.Add(oldTree.keys[i], oldTree.counts[i])
- }
+ oldTree.shuffle(t.rng)
+ oldTree.ForEach(func(mean float64, count uint64) bool {
+ err = t.AddWeighted(mean, count)
+ return err == nil
+ })
+ return err
}
// Merge joins a given digest into itself.
+//
// Merging is useful when you have multiple TDigest instances running
// in separate threads and you want to compute quantiles over all the
// samples. This is particularly important on a scatter-gather/map-reduce
// scenario.
-func (t *TDigest) Merge(other *TDigest) {
- t.MergeDestructive(other)
-
- other.summary.unshuffle()
-}
-
-// As Merge, above, but leaves other in a scrambled state
-func (t *TDigest) MergeDestructive(other *TDigest) {
+func (t *TDigest) Merge(other *TDigest) (err error) {
if other.summary.Len() == 0 {
- return
+ return nil
}
- other.summary.shuffle()
+ other.summary.Perm(t.rng, func(mean float64, count uint64) bool {
+ err = t.AddWeighted(mean, count)
+ return err == nil
+ })
+ return err
+}
- for i := range other.summary.keys {
- t.Add(other.summary.keys[i], other.summary.counts[i])
+// MergeDestructive joins a given digest into itself rendering
+// the other digest invalid.
+//
+// This works as Merge above but it's faster. Using this method
+// requires caution as it makes 'other' useless - you must make
+// sure you discard it without making further uses of it.
+func (t *TDigest) MergeDestructive(other *TDigest) (err error) {
+ if other.summary.Len() == 0 {
+ return nil
+ }
+
+ other.summary.shuffle(t.rng)
+ other.summary.ForEach(func(mean float64, count uint64) bool {
+ err = t.AddWeighted(mean, count)
+ return err == nil
+ })
+ return err
+}
+
+// CDF computes the fraction in which all samples are less than
+// or equal to the given value.
+func (t *TDigest) CDF(value float64) float64 {
+ if t.summary.Len() == 0 {
+ return math.NaN()
+ } else if t.summary.Len() == 1 {
+ if value < t.summary.Mean(0) {
+ return 0
+ }
+ return 1
+ }
+
+ // We have at least 2 centroids
+ left := (t.summary.Mean(1) - t.summary.Mean(0)) / 2
+ right := left
+ tot := 0.0
+
+ for i := 1; i < t.summary.Len()-1; i++ {
+ prevMean := t.summary.Mean(i - 1)
+ if value < prevMean+right {
+ v := (tot + float64(t.summary.Count(i-1))*interpolate(value, prevMean-left, prevMean+right)) / float64(t.Count())
+ if v > 0 {
+ return v
+ }
+ return 0
+ }
+
+ tot += float64(t.summary.Count(i - 1))
+ left = right
+ right = (t.summary.Mean(i+1) - t.summary.Mean(i)) / 2
+ }
+
+ // last centroid, the summary length is at least two
+ aIdx := t.summary.Len() - 2
+ aMean := t.summary.Mean(aIdx)
+ if value < aMean+right {
+ aCount := float64(t.summary.Count(aIdx))
+ return (tot + aCount*interpolate(value, aMean-left, aMean+right)) / float64(t.Count())
+ }
+ return 1
+}
+
+// Clone returns a deep copy of a TDigest.
+func (t *TDigest) Clone() *TDigest {
+ return &TDigest{
+ summary: t.summary.Clone(),
+ compression: t.compression,
+ count: t.count,
+ rng: t.rng,
}
}
-// Len returns the number of centroids in the TDigest.
-func (t *TDigest) Len() int { return t.summary.Len() }
+func interpolate(x, x0, x1 float64) float64 {
+ return (x - x0) / (x1 - x0)
+}
// ForEachCentroid calls the specified function for each centroid.
+//
// Iteration stops when the supplied function returns false, or when all
// centroids have been iterated.
func (t *TDigest) ForEachCentroid(f func(mean float64, count uint64) bool) {
- s := t.summary
- for i := 0; i < s.Len(); i++ {
- if !f(s.keys[i], s.counts[i]) {
+ t.summary.ForEach(f)
+}
+
+func (t TDigest) findNeighbors(start int, value float64) (int, int) {
+ minDistance := math.MaxFloat64
+ lastNeighbor := t.summary.Len()
+ for neighbor := start; neighbor < t.summary.Len(); neighbor++ {
+ z := math.Abs(t.summary.Mean(neighbor) - value)
+ if z < minDistance {
+ start = neighbor
+ minDistance = z
+ } else if z > minDistance {
+ lastNeighbor = neighbor
break
}
}
+ return start, lastNeighbor
}
-func estimateCapacity(compression float64) uint {
- return uint(compression) * 10
+func (t TDigest) chooseMergeCandidate(begin, end int, value float64, count uint64) int {
+ closest := t.summary.Len()
+ sum := t.summary.HeadSum(begin)
+ var n float32
+
+ for neighbor := begin; neighbor != end; neighbor++ {
+ c := float64(t.summary.Count(neighbor))
+ var q float64
+ if t.count == 1 {
+ q = 0.5
+ } else {
+ q = (sum + (c-1)/2) / float64(t.count-1)
+ }
+ k := 4 * float64(t.count) * q * (1 - q) / t.compression
+
+ if c+float64(count) <= k {
+ n++
+ if t.rng.Float32() < 1/n {
+ closest = neighbor
+ }
+ }
+ sum += c
+ }
+ return closest
}
-func (t *TDigest) threshold(q float64) float64 {
- return (4 * float64(t.count) * q * (1 - q)) / t.compression
+// TrimmedMean returns the mean of the distribution between the two
+// percentiles p1 and p2.
+//
+// Values of p1 and p2 must be between 0 and 1 (inclusive) and p1
+// must be less than p2. Will panic otherwise.
+func (t *TDigest) TrimmedMean(p1, p2 float64) float64 {
+ if p1 < 0 || p1 > 1 {
+ panic("p1 must be between 0 and 1 (inclusive)")
+ }
+ if p2 < 0 || p2 > 1 {
+ panic("p2 must be between 0 and 1 (inclusive)")
+ }
+ if p1 >= p2 {
+ panic("p1 must be lower than p2")
+ }
+
+ minCount := p1 * float64(t.count)
+ maxCount := p2 * float64(t.count)
+
+ var trimmedSum, trimmedCount, currCount float64
+ for i, mean := range t.summary.means {
+ count := float64(t.summary.counts[i])
+
+ nextCount := currCount + count
+ if nextCount <= minCount {
+ currCount = nextCount
+ continue
+ }
+
+ if currCount < minCount {
+ count = nextCount - minCount
+ }
+ if nextCount > maxCount {
+ count -= nextCount - maxCount
+ }
+
+ trimmedSum += count * mean
+ trimmedCount += count
+
+ if nextCount >= maxCount {
+ break
+ }
+ currCount = nextCount
+ }
+
+ if trimmedCount == 0 {
+ return 0
+ }
+ return trimmedSum / trimmedCount
}
-func (t *TDigest) computeCentroidQuantile(c *centroid) float64 {
- cumSum := t.summary.sumUntilIndex(c.index)
- return (float64(c.count)/2.0 + float64(cumSum)) / float64(t.count)
-}
-
-func (t *TDigest) findNearestCentroids(mean float64) (centroid, centroid) {
- ceil, floor := t.summary.ceilingAndFloorItems(mean)
-
- if !ceil.isValid() && !floor.isValid() {
- panic("findNearestCentroids called on an empty tree")
- }
-
- if !ceil.isValid() {
- return floor, invalidCentroid
- }
-
- if !floor.isValid() {
- return ceil, invalidCentroid
- }
-
- if math.Abs(floor.mean-mean) < math.Abs(ceil.mean-mean) {
- return floor, invalidCentroid
- } else if math.Abs(floor.mean-mean) == math.Abs(ceil.mean-mean) && floor.mean != ceil.mean {
- return floor, ceil
- } else {
- return ceil, invalidCentroid
- }
+func estimateCapacity(compression float64) int {
+ return int(compression) * 10
}
Modified tdigest_test.go
package tdigest
import (
+ "fmt"
"math"
"math/rand"
- "reflect"
"sort"
"testing"
+
+ "github.com/leesper/go_rng"
+ "gonum.org/v1/gonum/stat"
)
+
+func init() {
+ rand.Seed(0xDEADBEE)
+}
+
+func uncheckedNew(options ...tdigestOption) *TDigest {
+ t, _ := New(options...)
+ return t
+}
// Test of tdigest internals and accuracy. Note no t.Parallel():
// during tests the default random seed is consistent, but varying
// for all tests. So, no test concurrency here.
func TestTInternals(t *testing.T) {
- tdigest := New(100)
+ tdigest := uncheckedNew()
if !math.IsNaN(tdigest.Quantile(0.1)) {
t.Errorf("Quantile() on an empty digest should return NaN. Got: %.4f", tdigest.Quantile(0.1))
}
- tdigest.Add(0.4, 1)
+ if !math.IsNaN(tdigest.CDF(1)) {
+ t.Errorf("CDF() on an empty digest should return NaN. Got: %.4f", tdigest.CDF(1))
+ }
+
+ _ = tdigest.Add(0.4)
if tdigest.Quantile(0.1) != 0.4 {
t.Errorf("Quantile() on a single-sample digest should return the samples's mean. Got %.4f", tdigest.Quantile(0.1))
}
- tdigest.Add(0.5, 1)
+ if tdigest.CDF(0.3) != 0 {
+ t.Errorf("CDF(x) on digest with a single centroid should return 0 if x < mean")
+ }
+
+ if tdigest.CDF(0.5) != 1 {
+ t.Errorf("CDF(x) on digest with a single centroid should return 1 if x >= mean")
+ }
+
+ _ = tdigest.Add(0.5)
if tdigest.summary.Len() != 2 {
t.Errorf("Expected size 2, got %d", tdigest.summary.Len())
}
- if tdigest.summary.Min().mean != 0.4 {
- t.Errorf("Min() returned an unexpected centroid: %v", tdigest.summary.Min())
- }
-
- if tdigest.summary.Max().mean != 0.5 {
- t.Errorf("Min() returned an unexpected centroid: %v", tdigest.summary.Min())
- }
-
- tdigest.Add(0.4, 2)
- tdigest.Add(0.4, 3)
-
- if tdigest.summary.Len() != 2 {
- t.Errorf("Adding centroids of same mean shouldn't change size")
- }
-
- y := tdigest.summary.Find(0.4)
-
- if y.count != 6 || y.mean != 0.4 {
- t.Errorf("Adding centroids with same mean should increment the count only. Got %v", y)
- }
-
- err := tdigest.Add(0, 0)
+ err := tdigest.AddWeighted(0, 0)
if err == nil {
- t.Errorf("Expected Add() to error out with input (0,0)")
+ t.Errorf("Expected AddWeighted() to error out with input (0,0)")
}
+}
- if tdigest.Quantile(0.9999999) != tdigest.summary.Max().mean {
- t.Errorf("High quantiles with little data should give out the MAX recorded mean")
+func closeEnough(a float64, b float64) bool {
+ const EPS = 0.000001
+ if (a-b < EPS) && (b-a < EPS) {
+ return true
}
-
- if tdigest.Quantile(0.0000001) != tdigest.summary.Min().mean {
- t.Errorf("Low quantiles with little data should give out the MIN recorded mean")
- }
+ return false
}
func assertDifferenceSmallerThan(tdigest *TDigest, p float64, m float64, t *testing.T) {
}
func TestUniformDistribution(t *testing.T) {
- tdigest := New(100)
+ tdigest := uncheckedNew()
- for i := 0; i < 10000; i++ {
- tdigest.Add(rand.Float64(), 1)
+ for i := 0; i < 100000; i++ {
+ _ = tdigest.Add(rand.Float64())
}
assertDifferenceSmallerThan(tdigest, 0.5, 0.02, t)
}
func TestSequentialInsertion(t *testing.T) {
- tdigest := New(10)
+ tdigest := uncheckedNew()
data := make([]float64, 10000)
for i := 0; i < len(data); i++ {
}
for i := 0; i < len(data); i++ {
- tdigest.Add(data[i], 1)
+ _ = tdigest.Add(data[i])
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.001, 1.0+0.001*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.01, 1.0+0.005*float64(i), t)
}
}
-func TestNonUniformDistribution(t *testing.T) {
- tdigest := New(10)
-
- // Not quite a uniform distribution, but close.
- data := make([]float64, 1000)
- for i := 0; i < 500; i++ {
- data[i] = 700.0 + rand.Float64()*100.0
- }
- for i := 500; i < 750; i++ {
- data[i] = 100.0 + rand.Float64()*100.0
- }
- for i := 750; i < 1000; i++ {
- data[i] = 600.0 + rand.Float64()*10.0
- }
-
- for i := 0; i < len(data); i++ {
- tdigest.Add(data[i], 1)
- }
-
- max := float64(len(data))
- sort.Float64s(data)
- assertDifferenceFromQuantile(data, tdigest, 0.001, 1.0+0.001*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.01, 1.0+0.005*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.05, 1.0+0.01*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.25, 1.0+0.01*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.5, 1.0+0.05*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.75, 1.0+0.01*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.95, 1.0+0.01*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.99, 1.0+0.005*max, t)
- assertDifferenceFromQuantile(data, tdigest, 0.999, 1.0+0.001*max, t)
-}
-
func TestNonSequentialInsertion(t *testing.T) {
- tdigest := New(10)
+ tdigest := uncheckedNew()
// Not quite a uniform distribution, but close.
data := make([]float64, 1000)
sorted := make([]float64, 0, len(data))
for i := 0; i < len(data); i++ {
- tdigest.Add(data[i], 1)
+ _ = tdigest.Add(data[i])
sorted = append(sorted, data[i])
// Estimated quantiles are all over the place for low counts, which is
}
}
+func TestSingletonInACrowd(t *testing.T) {
+ tdigest := uncheckedNew()
+ for i := 0; i < 10000; i++ {
+ _ = tdigest.Add(10)
+ }
+ _ = tdigest.Add(20)
+ _ = tdigest.Compress()
+
+ for _, q := range []float64{0, 0.5, 0.8, 0.9, 0.99, 0.999} {
+ if q == 0.999 {
+ // Test for 0.999 disabled since it doesn't
+ // pass in the reference implementation
+ continue
+ }
+ result := tdigest.Quantile(q)
+ if !closeEnough(result, 10) {
+ t.Errorf("Expected Quantile(%.3f) = 10, but got %.4f (size=%d)", q, result, tdigest.summary.Len())
+ }
+ }
+
+ result := tdigest.Quantile(1)
+ if result != 20 {
+ t.Errorf("Expected Quantile(1) = 20, but got %.4f (size=%d)", result, tdigest.summary.Len())
+ }
+}
+
+func TestRespectBounds(t *testing.T) {
+ tdigest := uncheckedNew(Compression(10))
+
+ data := []float64{0, 279, 2, 281}
+ for _, f := range data {
+ _ = tdigest.Add(f)
+ }
+
+ quantiles := []float64{0.01, 0.25, 0.5, 0.75, 0.999}
+ for _, q := range quantiles {
+ result := tdigest.Quantile(q)
+ if result < 0 {
+ t.Errorf("q(%.3f) = %.4f < 0", q, result)
+ }
+ if tdigest.Quantile(q) > 281 {
+ t.Errorf("q(%.3f) = %.4f > 281", q, result)
+ }
+ }
+}
+
func TestWeights(t *testing.T) {
- tdigest := New(10)
+ tdigest := uncheckedNew(Compression(10))
// Create data slice with repeats matching weights we gave to tdigest
data := []float64{}
for i := 0; i < 100; i++ {
- tdigest.Add(float64(i), uint64(i))
+ _ = tdigest.AddWeighted(float64(i), uint64(i))
for j := 0; j < i; j++ {
data = append(data, float64(i))
}
func TestIntegers(t *testing.T) {
- tdigest := New(100)
+ tdigest := uncheckedNew()
- tdigest.Add(1, 1)
- tdigest.Add(2, 1)
- tdigest.Add(3, 1)
+ _ = tdigest.Add(1)
+ _ = tdigest.Add(2)
+ _ = tdigest.Add(3)
if tdigest.Quantile(0.5) != 2 {
t.Errorf("Expected p(0.5) = 2, Got %.2f instead", tdigest.Quantile(0.5))
}
- tdigest = New(100)
+ tdigest = uncheckedNew()
for _, i := range []float64{1, 2, 2, 2, 2, 2, 2, 2, 3} {
- tdigest.Add(i, 1)
+ _ = tdigest.Add(i)
}
if tdigest.Quantile(0.5) != 2 {
}
var tot uint64
- tdigest.summary.Iterate(func(item centroid) bool {
- tot += item.count
+ tdigest.ForEachCentroid(func(mean float64, count uint64) bool {
+ tot += count
return true
})
if tot != 9 {
t.Errorf("Expected the centroid count to be 9, Got %d instead", tot)
}
+}
+
+func cdf(x float64, data []float64) float64 {
+ var n1, n2 int
+ for i := 0; i < len(data); i++ {
+ if data[i] < x {
+ n1++
+ }
+ if data[i] <= x {
+ n2++
+ }
+ }
+ return float64(n1+n2) / 2.0 / float64(len(data))
}
func quantile(q float64, data []float64) float64 {
return data[int(index)+1]*(index-float64(int(index))) + data[int(index)]*(float64(int(index)+1)-index)
}
-func TestMerge(t *testing.T) {
+func TestMergeNormal(t *testing.T) {
+ testMerge(t, false)
+}
+
+func TestMergeDescructive(t *testing.T) {
+ testMerge(t, true)
+}
+
+func testMerge(t *testing.T, destructive bool) {
if testing.Short() {
t.Skipf("Skipping merge test. Short flag is on")
}
- const numItems = 10000
- const numSubs = 5
+ const numItems = 100000
- data := make([]float64, numItems)
- var subs [numSubs]*TDigest
+ for _, numSubs := range []int{2, 5, 10, 20, 50, 100} {
+ data := make([]float64, numItems)
- dist1 := New(10)
-
- for i := 0; i < numSubs; i++ {
- subs[i] = New(10)
- }
-
- for i := 0; i < numItems; i++ {
- num := rand.Float64()
-
- data[i] = num
- dist1.Add(num, 1)
- for j := 0; j < numSubs; j++ {
- subs[j].Add(num, 1)
+ subs := make([]*TDigest, numSubs)
+ for i := 0; i < numSubs; i++ {
+ subs[i] = uncheckedNew()
}
- }
- subzeroSummary := &summary{
- keys: make([]float64, len(subs[0].summary.keys)),
- counts: make([]uint64, len(subs[0].summary.counts)),
- }
- copy(subzeroSummary.keys, subs[0].summary.keys)
- copy(subzeroSummary.counts, subs[0].summary.counts)
+ dist := uncheckedNew()
+ for i := 0; i < numItems; i++ {
+ num := rand.Float64()
- dist2 := New(10)
- for i := 0; i < numSubs; i++ {
- dist2.Merge(subs[i])
- }
-
- // Make sure merge didn't scramble the summaries
- if !reflect.DeepEqual(subs[0].summary, subzeroSummary) {
- t.Error("summary changed by being merged")
- }
-
- // Merge empty. Should be no-op
- dist2.Merge(New(10))
-
- sort.Float64s(data)
-
- for _, p := range []float64{0.001, 0.01, 0.1, 0.2, 0.3, 0.5} {
- q := quantile(p, data)
- p1 := dist1.Quantile(p)
- p2 := dist2.Quantile(p)
-
- e1 := math.Abs(p1 - q)
- e2 := math.Abs(p1 - q)
-
- if e2/p >= 0.3 {
- t.Errorf("Relative error for %f above threshold. q=%f p1=%f p2=%f e1=%f e2=%f", p, q, p1, p2, e1, e2)
+ data[i] = num
+ _ = dist.Add(num)
+ _ = subs[i%numSubs].Add(num)
}
- if e2 >= 0.015 {
- t.Errorf("Absolute error for %f above threshold. q=%f p1=%f p2=%f e1=%f e2=%f", p, q, p1, p2, e1, e2)
+
+ _ = dist.Compress()
+
+ dist2 := uncheckedNew()
+ for i := 0; i < numSubs; i++ {
+ if destructive {
+ _ = dist2.MergeDestructive(subs[i])
+ } else {
+ _ = dist2.Merge(subs[i])
+ }
+
+ }
+
+ if dist.Count() != dist2.Count() {
+ t.Errorf("Expected the number of centroids to be the same. %d != %d", dist.Count(), dist2.Count())
+ }
+
+ if dist2.Count() != numItems {
+ t.Errorf("Items shouldn't have disappeared. %d != %d", dist2.Count(), numItems)
+ }
+
+ sort.Float64s(data)
+
+ for _, q := range []float64{0.001, 0.01, 0.1, 0.2, 0.3, 0.5} {
+ z := quantile(q, data)
+ p1 := dist.Quantile(q)
+ p2 := dist2.Quantile(q)
+
+ e1 := p1 - z
+ e2 := p2 - z
+
+ if math.Abs(e2)/q >= 0.3 {
+ t.Errorf("rel >= 0.3: parts=%3d q=%.3f e1=%.4f e2=%.4f rel=%.3f real=%.3f",
+ numSubs, q, e1, e2, math.Abs(e2)/q, z-q)
+ }
+ if math.Abs(e2) >= 0.015 {
+ t.Errorf("e2 >= 0.015: parts=%3d q=%.3f e1=%.4f e2=%.4f rel=%.3f real=%.3f",
+ numSubs, q, e1, e2, math.Abs(e2)/q, z-q)
+ }
+
+ z = cdf(q, data)
+ e1 = dist.CDF(q) - z
+ e2 = dist2.CDF(q) - z
+
+ if math.Abs(e2)/q > 0.3 {
+ t.Errorf("CDF e2 < 0.015: parts=%3d q=%.3f e1=%.4f e2=%.4f rel=%.3f",
+ numSubs, q, e1, e2, math.Abs(e2)/q)
+ }
+
+ if math.Abs(e2) >= 0.015 {
+ t.Errorf("CDF e2 < 0.015: parts=%3d q=%.3f e1=%.4f e2=%.4f rel=%.3f",
+ numSubs, q, e1, e2, math.Abs(e2)/q)
+ }
}
}
}
func TestCompressDoesntChangeCount(t *testing.T) {
- tdigest := New(100)
+ tdigest := uncheckedNew()
for i := 0; i < 1000; i++ {
- tdigest.Add(rand.Float64(), 1)
+ _ = tdigest.Add(rand.Float64())
}
- initialCount := tdigest.count
+ initialCount := tdigest.Count()
- tdigest.Compress()
+ err := tdigest.Compress()
+ if err != nil {
+ t.Errorf("Compress() triggered an unexpected error: %s", err)
+ }
- if tdigest.count != initialCount {
- t.Errorf("Compress() should not change count. Wanted %d, got %d", initialCount, tdigest.count)
+ if tdigest.Count() != initialCount {
+ t.Errorf("Compress() should not change count. Wanted %d, got %d", initialCount, tdigest.Count())
+ }
+}
+
+func TestGammaDistribution(t *testing.T) {
+ const numItems = 100000
+
+ digest := uncheckedNew()
+ gammaRNG := rng.NewGammaGenerator(0xDEADBEE)
+
+ data := make([]float64, numItems)
+ for i := 0; i < numItems; i++ {
+ data[i] = gammaRNG.Gamma(0.1, 0.1)
+ _ = digest.Add(data[i])
+ }
+
+ sort.Float64s(data)
+
+ softErrors := 0
+ for _, q := range []float64{0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999} {
+
+ ix := float64(len(data))*q - 0.5
+ index := int(math.Floor(ix))
+ p := ix - float64(index)
+ realQuantile := data[index]*(1-p) + data[index+1]*p
+
+ // estimated cdf of real quantile(x)
+ if math.Abs(digest.CDF(realQuantile)-q) > 0.005 {
+ t.Errorf("Error in estimated CDF too high")
+ }
+
+ // real cdf of estimated quantile(x)
+ error := math.Abs(q - cdf(digest.Quantile(q), data))
+ if error > 0.005 {
+ softErrors++
+ }
+
+ if error > 0.012 {
+ t.Errorf("Error in estimated Quantile too high")
+ }
+ }
+
+ if softErrors >= 3 {
+ t.Errorf("Too many soft errors")
+ }
+
+ // Issue #17, verify that we are hitting the extreme CDF case
+ // XXX Maybe test this properly instead of having a hardcoded value
+ extreme := digest.CDF(0.71875)
+ if !closeEnough(extreme, 1) {
+ t.Errorf("Expected something close to 1 but got %.4f instead", extreme)
}
}
}
func TestPanic(t *testing.T) {
- shouldPanic(func() {
- New(0.5)
- }, t, "Compression < 1 should panic!")
-
- tdigest := New(100)
+ tdigest := uncheckedNew()
shouldPanic(func() {
tdigest.Quantile(-42)
shouldPanic(func() {
tdigest.Quantile(42)
}, t, "Quantile > 1 should panic!")
-
- shouldPanic(func() {
- tdigest.findNearestCentroids(0.2)
- }, t, "findNearestCentroids on empty summary should panic!")
}
func TestForEachCentroid(t *testing.T) {
- t.Parallel()
- tdigest := New(10)
+ tdigest := uncheckedNew(Compression(10))
for i := 0; i < 100; i++ {
- tdigest.Add(float64(i), 1)
+ _ = tdigest.Add(float64(i))
}
// Iterate limited number.
means := []float64{}
tdigest.ForEachCentroid(func(mean float64, count uint64) bool {
means = append(means, mean)
- if len(means) == 3 {
- return false
- }
- return true
+ return len(means) != 3
})
if len(means) != 3 {
t.Errorf("ForEachCentroid handled incorrect number of data items")
means = append(means, mean)
return true
})
- if len(means) != tdigest.Len() {
+ if len(means) != tdigest.summary.Len() {
t.Errorf("ForEachCentroid did not handle all data")
}
}
+// Regression guard: total weight slightly above math.MaxUint32 must not
+// overflow the digest's internal count (issue for uint32-based counts).
func TestQuantilesDontOverflow(t *testing.T) {
-	tdigest := New(100)
+	tdigest := uncheckedNew(Compression(100))
	// Add slightly more than math.MaxUint32 samples uniformly in the range
	// [0, 1). This would overflow a uint32-based implementation.
-	tdigest.Add(1, 1)
+	tdigest.Add(1)
	for i := 0; i < 1024; i++ {
-		tdigest.Add(float64(i)/1024, 4194304)
+		tdigest.AddWeighted(float64(i)/1024, 4194304)
	}
	assertDifferenceSmallerThan(tdigest, 0.5, .02, t)
}
-func benchmarkAdd(compression float64, b *testing.B) {
- t := New(compression)
+// TestCDFInsideLastCentroid is a regression test: querying CDF with a
+// value that lands inside the last centroid must not return > 1.
+func TestCDFInsideLastCentroid(t *testing.T) {
+	// values pulled from a live digest. sorry it's a lot!
+	td := &TDigest{
+		summary: &summary{
+			means: []float64{2120.75048828125, 2260.3844299316406, 3900.490264892578, 3937.495807647705, 5390.479816436768, 10450.335285186768, 14152.897296905518, 16442.676349639893, 24303.143146514893, 56961.87361526489, 63891.24959182739, 73982.55232620239, 86477.50447463989, 110746.62556838989, 175479.7388496399, 300492.3404121399, 440452.5279121399, 515611.7700996399, 535827.0025215149, 546241.6822090149, 556965.3648262024, 569791.2124824524, 587320.6870918274, 603969.4175605774, 613751.6177558899, 624708.7593574524, 635060.0718574524, 641924.2007637024, 650656.4302558899, 660653.1714668274, 671380.9009590149, 687094.3667793274, 716595.8824043274, 740870.9800605774, 760276.2437324524, 768857.5786933899, 775021.0025215149, 787686.0337715149, 801473.4624824524, 815225.1255683899, 832358.6997871399, 852438.4751777649, 866134.2935371399, 1.10661549666214e+06, 1.1212118980293274e+06, 1.2230108433418274e+06, 1.5446490620918274e+06, 4.306712312091827e+06, 5.487582562091827e+06, 6.306383562091827e+06, 7.089308312091827e+06, 7.520797593341827e+06},
+			counts: []uint64{0x1, 0x1, 0x1, 0x1, 0x1, 0x2, 0x1, 0x4, 0x5, 0x6, 0x3, 0x3, 0x4, 0x11, 0x23, 0x2f, 0x1e, 0x1b, 0x36, 0x31, 0x33, 0x4e, 0x5f, 0x61, 0x48, 0x2e, 0x26, 0x28, 0x2a, 0x31, 0x39, 0x51, 0x32, 0x2b, 0x12, 0x8, 0xb, 0xa, 0x11, 0xa, 0x11, 0x9, 0x7, 0x1, 0x1, 0x1, 0x3, 0x2, 0x1, 0x1, 0x1, 0x1},
+		},
+		compression: 5,
+		count:       1250,
+		rng:         globalRNG{},
+	}
+
+	// The probe value sits between the two largest centroid means.
+	if cdf := td.CDF(7.144560976650238e+06); cdf > 1 {
+		t.Fatalf("invalid: %v", cdf)
+	}
+}
+
+// TestTrimmedMean compares TDigest.TrimmedMean against an exact trimmed
+// mean computed from the raw samples, across several sample sizes and
+// quantile windows.
+func TestTrimmedMean(t *testing.T) {
+	tests := []struct {
+		p1, p2 float64
+	}{
+		{0, 1},
+		{0.1, 0.9},
+		{0.2, 0.8},
+		{0.25, 0.75},
+		{0, 0.5},
+		{0.5, 1},
+		{0.1, 0.7},
+		{0.3, 0.9},
+	}
+
+	for _, size := range []int{100, 1000, 10000} {
+		for _, test := range tests {
+			td := uncheckedNew(Compression(100))
+
+			data := make([]float64, 0, size)
+			for i := 0; i < size; i++ {
+				f := rand.Float64()
+				data = append(data, f)
+				err := td.Add(f)
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
+
+			// The digest estimate must track the exact trimmed mean within 0.01.
+			got := td.TrimmedMean(test.p1, test.p2)
+			wanted := trimmedMean(data, test.p1, test.p2)
+			if math.Abs(got-wanted) > 0.01 {
+				t.Fatalf("got %f, wanted %f (size=%d p1=%f p2=%f)",
+					got, wanted, size, test.p1, test.p2)
+			}
+
+			// Pile on some large outliers; the trimmed mean of uniform [0,1)
+			// data plus non-negative outliers must stay non-negative.
+			for i := 0; i < 10; i++ {
+				err := td.Add(float64(i * 100))
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
+			mean := td.TrimmedMean(0.1, 0.999)
+			if mean < 0 {
+				t.Fatalf("mean < 0")
+			}
+		}
+	}
+}
+
+// TestTrimmedMeanCornerCases covers the empty, single-sample and
+// two-sample digests.
+func TestTrimmedMeanCornerCases(t *testing.T) {
+	td := uncheckedNew(Compression(100))
+
+	// Empty digest: the trimmed mean is defined as 0.
+	mean := td.TrimmedMean(0, 1)
+	if mean != 0 {
+		t.Fatalf("got %f, wanted 0", mean)
+	}
+
+	// Single sample: the trimmed mean over [0, 1] is that sample.
+	x := 1.0
+	err := td.Add(x)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mean = td.TrimmedMean(0, 1)
+	if mean != 1 {
+		t.Fatalf("got %f, wanted %f", mean, x)
+	}
+
+	// Two samples: expect their plain average, (1 + 1000) / 2 = 500.5.
+	err = td.Add(1000)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mean = td.TrimmedMean(0, 1)
+	wanted := 500.5
+	if !closeEnough(mean, wanted) {
+		t.Fatalf("got %f, wanted %f", mean, wanted)
+	}
+}
+
+// trimmedMean computes the exact mean of the samples that fall inside
+// the empirical [p1, p2] quantile window (inclusive on both ends),
+// serving as the reference for TestTrimmedMean.
+// NOTE: sorts ff in place.
+func trimmedMean(ff []float64, p1, p2 float64) float64 {
+	sort.Float64s(ff)
+	x1 := stat.Quantile(p1, stat.Empirical, ff, nil)
+	x2 := stat.Quantile(p2, stat.Empirical, ff, nil)
+
+	var sum float64
+	var count int
+	for _, f := range ff {
+		if f >= x1 && f <= x2 {
+			sum += f
+			count++
+		}
+	}
+	return sum / float64(count)
+}
+
+// TestClone verifies that Clone produces an independent copy: equal
+// observable state at clone time, no sharing with the source afterwards,
+// and a clone that still accepts new samples.
+func TestClone(t *testing.T) {
+	// seed adds 100 random samples, failing the test on any Add error.
+	seed := func(td *TDigest) {
+		for i := 0; i < 100; i++ {
+			err := td.Add(rand.Float64())
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+
+	td := uncheckedNew(Compression(42))
+	seed(td)
+	clone := td.Clone()
+
+	// Clone behaves like td.
+
+	if clone.Compression() != td.Compression() {
+		t.Fatalf("got %f, wanted %f", clone.Compression(), td.Compression())
+	}
+
+	cloneCount := clone.Count()
+	if cloneCount != td.Count() {
+		t.Fatalf("got %d, wanted %d", cloneCount, td.Count())
+	}
+
+	cloneQuantile := clone.Quantile(1)
+	if cloneQuantile != td.Quantile(1) {
+		t.Fatalf("got %f, wanted %f", cloneQuantile, td.Quantile(1))
+	}
+
+	seed(td)
+	if td.Count() == clone.Count() {
+		t.Fatal("seed does not work")
+	}
+
+	// Clone is not changed after td is changed.
+
+	if clone.Count() != cloneCount {
+		t.Fatalf("got %d, wanted %d", clone.Count(), cloneCount)
+	}
+
+	if clone.Quantile(1) != cloneQuantile {
+		t.Fatalf("got %f, wanted %f", clone.Quantile(1), cloneQuantile)
+	}
+
+	// Clone is fully functional.
+
+	err := clone.Add(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// compressions are the settings exercised by the TDigest benchmarks.
+var compressions = []float64{1, 10, 20, 30, 50, 100}
+
+// BenchmarkTDigestAddOnce measures single-sample Add throughput for
+// each compression setting as a named sub-benchmark.
+func BenchmarkTDigestAddOnce(b *testing.B) {
+	for _, compression := range compressions {
+		compression := compression // capture per-iteration value for the closure
+		b.Run(fmt.Sprintf("compression=%.0f", compression), func(b *testing.B) {
+			benchmarkAddOnce(b, compression)
+		})
+	}
+}
+
+func benchmarkAddOnce(b *testing.B, compression float64) {
+ t := uncheckedNew(Compression(compression))
data := make([]float64, b.N)
for n := 0; n < b.N; n++ {
data[n] = rand.Float64()
}
+ b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
- err := t.Add(data[n], 1)
+ err := t.Add(data[n])
if err != nil {
b.Error(err)
}
b.StopTimer()
}
-func BenchmarkAdd1(b *testing.B) {
-	benchmarkAdd(1, b)
+// BenchmarkTDigestAddMulti measures filling a fresh digest with n
+// samples, across several compression settings and batch sizes.
+func BenchmarkTDigestAddMulti(b *testing.B) {
+	for _, compression := range compressions {
+		compression := compression // capture for the closure below
+		for _, n := range []int{10, 100, 1000, 10000} {
+			n := n // capture for the closure below
+			name := fmt.Sprintf("compression=%.0f n=%d", compression, n)
+			b.Run(name, func(b *testing.B) {
+				benchmarkAddMulti(b, compression, n)
+			})
+		}
+	}
}
-func BenchmarkAdd10(b *testing.B) {
-	benchmarkAdd(10, b)
+// benchmarkAddMulti times building a brand-new digest of `times`
+// samples per benchmark iteration; digest construction is deliberately
+// inside the timed loop.
+func benchmarkAddMulti(b *testing.B, compression float64, times int) {
+	data := make([]float64, times)
+	for i := 0; i < times; i++ {
+		data[i] = rand.Float64()
+	}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		t := uncheckedNew(Compression(compression))
+		for i := 0; i < times; i++ {
+			err := t.AddWeighted(data[i], 1)
+			if err != nil {
+				b.Error(err)
+			}
+		}
+	}
+	b.StopTimer()
}
-func BenchmarkAdd100(b *testing.B) {
-	benchmarkAdd(100, b)
+// BenchmarkTDigestMerge measures merging n pre-built random digests
+// into a fresh destination, per compression setting.
+func BenchmarkTDigestMerge(b *testing.B) {
+	for _, compression := range compressions {
+		compression := compression // capture for the closure below
+		for _, n := range []int{1, 10, 100} {
+			name := fmt.Sprintf("compression=%.0f n=%d", compression, n)
+			b.Run(name, func(b *testing.B) {
+				benchmarkMerge(b, compression, n)
+			})
+		}
+	}
+}
+
+// benchmarkMerge merges `times` pre-built digests into a fresh
+// destination and compresses the result, once per benchmark iteration.
+// The source digests are built outside the timed region.
+func benchmarkMerge(b *testing.B, compression float64, times int) {
+	ts := make([]*TDigest, times)
+	for i := 0; i < times; i++ {
+		ts[i] = randomTDigest(compression)
+	}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		dst := uncheckedNew(Compression(compression))
+
+		for i := 0; i < times; i++ {
+			err := dst.Merge(ts[i])
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+
+		err := dst.Compress()
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+// randomTDigest builds a digest filled with 20*compression uniform
+// samples. It panics on Add errors since it is only used from
+// benchmark setup, where *testing.T is not available.
+func randomTDigest(compression float64) *TDigest {
+	t := uncheckedNew(Compression(compression))
+	n := 20 * int(compression)
+	for i := 0; i < n; i++ {
+		err := t.Add(rand.Float64())
+		if err != nil {
+			panic(err)
+		}
+	}
+	return t
+}
+
+// sumSizes are the slice lengths exercised by the SumLoop benchmarks.
+var sumSizes = []int{10, 100, 1000, 10000}
+
+// BenchmarkSumLoopSimple times the naive prefix-sum loop per size.
+func BenchmarkSumLoopSimple(b *testing.B) {
+	for _, size := range sumSizes {
+		size := size // capture for the closure below
+		b.Run(fmt.Sprint(size), func(b *testing.B) {
+			benchmarkSumLoopSimple(b, size)
+		})
+	}
+}
+
+// benchmarkSumLoopSimple runs sumUntilIndexSimple over a fixed set of
+// random indexes; fixture generation is excluded via ResetTimer.
+func benchmarkSumLoopSimple(b *testing.B, size int) {
+	counts := generateCounts(size)
+	indexes := generateIndexes(size)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		for _, idx := range indexes {
+			_ = sumUntilIndexSimple(counts, idx)
+		}
+	}
+}
+
+// BenchmarkSumLoopUnrolled times the unrolled prefix-sum loop per size.
+func BenchmarkSumLoopUnrolled(b *testing.B) {
+	for _, size := range sumSizes {
+		size := size // capture for the closure below
+		b.Run(fmt.Sprint(size), func(b *testing.B) {
+			benchmarkSumLoopUnrolled(b, size)
+		})
+	}
+}
+
+// benchmarkSumLoopUnrolled runs sumUntilIndex over the same shape of
+// fixture as benchmarkSumLoopSimple, for a like-for-like comparison.
+func benchmarkSumLoopUnrolled(b *testing.B, size int) {
+	counts := generateCounts(size)
+	indexes := generateIndexes(size)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		for _, idx := range indexes {
+			_ = sumUntilIndex(counts, idx)
+		}
+	}
+}
+
+// generateCounts returns size random uint64 values for benchmarking.
+func generateCounts(size int) []uint64 {
+	counts := make([]uint64, size)
+	for i := 0; i < size; i++ {
+		counts[i] = rand.Uint64()
+	}
+	return counts
+}
+
+// generateIndexes returns 100 random indexes in the range [0, size).
+func generateIndexes(size int) []int {
+	const num = 100
+
+	indexes := make([]int, num)
+	for i := 0; i < num; i++ {
+		indexes[i] = rand.Intn(size)
+	}
+	return indexes
+}
+
+// sumUntilIndexSimple sums counts[:idx] with a plain loop. It is the
+// naive baseline the SumLoop benchmarks compare against the unrolled
+// sumUntilIndex.
+// NOTE(review): the previous version ignored idx and summed the whole
+// slice (via a redundant uint64 conversion), so the benchmark was not
+// measuring sum-until-index at all and its runtime was independent of
+// idx. Assumes sumUntilIndex uses the same exclusive upper bound —
+// confirm against its definition.
+func sumUntilIndexSimple(counts []uint64, idx int) uint64 {
+	var sum uint64
+	for _, c := range counts[:idx] {
+		sum += c
+	}
+	return sum
}
// Pathological ordered-input case.
func BenchmarkAddOrdered(b *testing.B) {
- t := New(100)
+ t, _ := New(Compression(100))
for n := 0; n < b.N; n++ {
- err := t.Add(float64(n), 1)
+ err := t.Add(float64(n))
if err != nil {
b.Error(err)
}
func BenchmarkMerge(b *testing.B) {
b.ReportAllocs()
- t := New(100)
+ t, _ := New(Compression(100))
for n := 0; n < 1000; n++ {
- t.Add(rand.Float64(), uint64(rand.Intn(100)))
+ t.AddWeighted(rand.Float64(), uint64(rand.Intn(100)))
}
- dest := New(100)
+ dest, _ := New(Compression(100))
b.ResetTimer()
for n := 0; n < b.N; n++ {
func BenchmarkMergeDestructive(b *testing.B) {
b.ReportAllocs()
- t := New(100)
+ t, _ := New(Compression(100))
for n := 0; n < 1000; n++ {
- t.Add(rand.Float64(), uint64(rand.Intn(100)))
+ t.AddWeighted(rand.Float64(), uint64(rand.Intn(100)))
}
- dest := New(100)
+ dest, _ := New(Compression(100))
b.ResetTimer()
Created .gitignore
+vendor/
+go-tdigest.test
Deleted .travis.yml
-language: go
-
-go:
- - 1.4
- - 1.5
- - 1.6
- - tip
-
-before_script:
- - go vet ./...
Created CONTRIBUTING.md
+# Contributing
+
+First and foremost: **thank you very much** for your interest in this
+project. Feel free to skip all this and open your issue / pull request
+if reading contribution guidelines is too much for you at this point.
+We value your contribution a lot more than we value your ability to
+follow rules (and thankfully we can afford to take this approach given
+this project's demand).
+
+Any kind of contribution is welcome. We can always use better docs and
+tests (and code, of course). If you think you can improve this project
+in any dimension _let's talk_ :-)
+
+## Guidelines
+
+Be kind and respectful in all your interactions with people inside
+(outside too!) this community; there is no excuse for not showing
+basic decency. Sarcasm and generally unconstructive remarks are **not
+welcome**.
+
+### Issues
+
+When opening and interacting with issues please:
+
+- Be as clear as possible
+- Provide examples if you can
+
+### Pull Requests
+
+We expect that pull requests:
+
+- Have [good commit messages][commits]
+- Contain tests for new features
+- Target and can be cleanly merged with the `master` branch
+- Pass the tests
+
+[commits]: https://www.git-scm.com/book/en/v2/Distributed-Git-Contributing-to-a-Project#_commit_guidelines
+
+### Project Management
+
+Don't bother with labels, milestones, assignments, etc. We don't make
+use of those.
Created Gopkg.lock
+# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
+
+
+[[projects]]
+ digest = "1:cf63454c1e81409484ded047413228de0f7a3031f0fcd36d4e1db7620c3c7d1b"
+ name = "github.com/leesper/go_rng"
+ packages = ["."]
+ pruneopts = ""
+ revision = "5344a9259b21627d94279721ab1f27eb029194e7"
+
+[[projects]]
+ branch = "master"
+ digest = "1:ad6d9b2cce40c7c44952d49a6a324a2110db43b4279d9e599db74e45de5ae80c"
+ name = "gonum.org/v1/gonum"
+ packages = [
+ "blas",
+ "blas/blas64",
+ "blas/gonum",
+ "floats",
+ "internal/asm/c128",
+ "internal/asm/f32",
+ "internal/asm/f64",
+ "internal/math32",
+ "lapack",
+ "lapack/gonum",
+ "lapack/lapack64",
+ "mat",
+ "stat",
+ ]
+ pruneopts = ""
+ revision = "f0982070f509ee139841ca385c44dc22a77c8da8"
+
+[solve-meta]
+ analyzer-name = "dep"
+ analyzer-version = 1
+ input-imports = [
+ "github.com/leesper/go_rng",
+ "gonum.org/v1/gonum/stat",
+ ]
+ solver-name = "gps-cdcl"
+ solver-version = 1
Created Gopkg.toml
+
+# Gopkg.toml example
+#
+# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
+# for detailed Gopkg.toml documentation.
+#
+# required = ["github.com/user/thing/cmd/thing"]
+# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
+#
+# [[constraint]]
+# name = "github.com/user/project"
+# version = "1.0.0"
+#
+# [[constraint]]
+# name = "github.com/user/project2"
+# branch = "dev"
+# source = "github.com/myfork/project2"
+#
+# [[override]]
+# name = "github.com/x/y"
+# version = "2.4.0"
Created options.go
+package tdigest
+
+import "errors"
+
+// tdigestOption mutates a TDigest during construction; New applies each
+// option in order and aborts on the first error.
+type tdigestOption func(*TDigest) error
+
+// Compression sets the digest compression
+//
+// The compression parameter rules the threshold in which samples are
+// merged together - the more often distinct samples are merged the more
+// precision is lost. Compression should be tuned according to your data
+// distribution, but a value of 100 (the default) is often good enough.
+//
+// A higher compression value means holding more centroids in memory
+// (thus: better precision), which means a bigger serialization payload,
+// higher memory footprint and slower addition of new samples.
+//
+// Compression must be a value greater than or equal to 1; any other
+// value yields an error.
+func Compression(compression float64) tdigestOption { // nolint
+	return func(t *TDigest) error {
+		if compression < 1 {
+			return errors.New("Compression should be >= 1")
+		}
+		t.compression = compression
+		return nil
+	}
+}
+
+// RandomNumberGenerator sets the RNG to be used internally
+//
+// This allows changing which random number source is used when using
+// the TDigest structure. RNGs are used when deciding which candidate
+// centroid to merge with and when compressing or merging with another
+// digest, since randomization increases accuracy. This functionality
+// is particularly useful for testing or when you want to disconnect
+// your sample collection from the (default) shared random source
+// to minimize lock contention.
+func RandomNumberGenerator(rng RNG) tdigestOption { // nolint
+	return func(t *TDigest) error {
+		t.rng = rng
+		return nil
+	}
+}
+
+// LocalRandomNumberGenerator makes the TDigest use the default
+// `math/rand` functions but with an unshared source that is
+// seeded with the given `seed` parameter.
+func LocalRandomNumberGenerator(seed int64) tdigestOption { // nolint
+	return RandomNumberGenerator(newLocalRNG(seed))
+}
Created options_test.go
+package tdigest
+
+import "testing"
+
+// TestDefaults checks that New() with no options succeeds and uses the
+// documented default compression of 100.
+func TestDefaults(t *testing.T) {
+	digest, err := New()
+
+	if err != nil {
+		t.Errorf("Creating a default TDigest should never error out. Got %s", err)
+	}
+
+	if digest.compression != 100 {
+		t.Errorf("The default compression should be 100")
+	}
+}
+
+// TestCompression checks that the Compression option is applied and
+// that values below 1 are rejected with a nil digest and an error.
+func TestCompression(t *testing.T) {
+	digest, _ := New(Compression(40))
+	if digest.compression != 40 {
+		t.Errorf("The compression option should change the new digest compression")
+	}
+
+	digest, err := New(Compression(0))
+	if err == nil || digest != nil {
+		t.Errorf("Trying to create a digest with bad compression should give an error")
+	}
+}
+
+// TestRandomNumberGenerator checks that RandomNumberGenerator and
+// LocalRandomNumberGenerator with the same seed produce distinct RNG
+// instances that emit identical sequences.
+func TestRandomNumberGenerator(t *testing.T) {
+	const numTests = 100
+
+	// Create two digests with unshared rngs seeded with
+	// the same seed
+	t1, _ := New(RandomNumberGenerator(newLocalRNG(0xDEADBEE)))
+	t2, _ := New(LocalRandomNumberGenerator(0xDEADBEE))
+
+	// So that they should emit the same values when called
+	// at the same frequency
+	for i := 0; i < numTests; i++ {
+		if t1.rng.Float32() != t2.rng.Float32() ||
+			t1.rng.Intn(10) != t2.rng.Intn(10) {
+			t.Errorf("r1 and r2 should be distinct RNGs returning the same values")
+		}
+	}
+}
Created rng.go
+package tdigest
+
+import (
+ "math/rand"
+)
+
+// RNG is an interface that wraps the needed random number
+// generator calls that tdigest uses during its runtime
+type RNG interface {
+	// Float32 returns a pseudo-random number in [0.0, 1.0).
+	Float32() float32
+	// Intn returns a non-negative pseudo-random number in [0, n).
+	Intn(int) int
+}
+
+// globalRNG adapts the package-level math/rand functions (the shared,
+// internally locked default source) to the RNG interface.
+type globalRNG struct{}
+
+// Float32 delegates to the global math/rand source.
+func (r globalRNG) Float32() float32 {
+	return rand.Float32()
+}
+
+// Intn delegates to the global math/rand source.
+func (r globalRNG) Intn(i int) int {
+	return rand.Intn(i)
+}
+
+// localRNG wraps an unshared *rand.Rand so a digest can avoid the
+// lock contention of the global math/rand source.
+type localRNG struct {
+	localRand *rand.Rand
+}
+
+// newLocalRNG builds a localRNG deterministically seeded with seed.
+func newLocalRNG(seed int64) *localRNG {
+	return &localRNG{
+		localRand: rand.New(rand.NewSource(seed)),
+	}
+}
+
+// Float32 delegates to the wrapped private source.
+func (r *localRNG) Float32() float32 {
+	return r.localRand.Float32()
+}
+
+// Intn delegates to the wrapped private source.
+func (r *localRNG) Intn(i int) int {
+	return r.localRand.Intn(i)
+}