caio.co/de/go-tdigest

Add TDigest.FromBytes and cleanup RNG interface

Id
4cd6ffbcfa938077a3a19b66b2a09520b2e89f93
Author
Vladimir Mihailenco
Commit time
2018-11-09T11:13:15+02:00

Modified options_test.go

@@ -37,7 +37,8
// So that they should emit the same values when called
// at the same frequency
for i := 0; i < numTests; i++ {
- if t1.rng.Float32() != t2.rng.Float32() || t1.rng.Intn(10) != t2.rng.Intn(10) {
+ if t1.rng.Float32() != t2.rng.Float32() ||
+ t1.rng.Intn(10) != t2.rng.Intn(10) {
t.Errorf("r1 and r2 should be distinct RNGs returning the same values")
}
}

Modified rng.go

@@ -9,21 +9,16
type RNG interface {
Float32() float32
Intn(int) int
- Perm(n int) []int
}

type globalRNG struct{}

-func (r *globalRNG) Float32() float32 {
+func (r globalRNG) Float32() float32 {
return rand.Float32()
}

-func (r *globalRNG) Intn(i int) int {
+func (r globalRNG) Intn(i int) int {
return rand.Intn(i)
-}
-
-func (r *globalRNG) Perm(n int) []int {
- return rand.Perm(n)
}

type localRNG struct {
@@ -42,8 +37,4

func (r *localRNG) Intn(i int) int {
return r.localRand.Intn(i)
-}
-
-func (r *localRNG) Perm(n int) []int {
- return rand.Perm(n)
}

Modified serialization.go

@@ -5,6 +5,7
"encoding/binary"
"errors"
"fmt"
+ "math"
)

const smallEncoding int32 = 2
@@ -116,6 +117,65
}

return t, nil
+}
+
+// FromBytes deserializes into the supplied TDigest struct, re-using and
+// overwriting any existing buffers.
+func (t *TDigest) FromBytes(buf []byte) error {
+ if len(buf) < 16 {
+ return errors.New("buffer too small for deserialization")
+ }
+
+ encoding := int32(endianess.Uint32(buf))
+ if encoding != smallEncoding {
+ return fmt.Errorf("unsupported encoding version: %d", encoding)
+ }
+
+ compression := math.Float64frombits(endianess.Uint64(buf[4:12]))
+ numCentroids := int(endianess.Uint32(buf[12:16]))
+ if numCentroids < 0 || numCentroids > 1<<22 {
+ return errors.New("bad number of centroids in serialization")
+ }
+
+ if len(buf) < 16+(4*numCentroids) {
+ return errors.New("buffer too small for deserialization")
+ }
+
+ t.count = 0
+ t.compression = compression
+ if t.summary == nil ||
+ cap(t.summary.means) < numCentroids ||
+ cap(t.summary.counts) < numCentroids {
+ t.summary = newSummary(numCentroids)
+ }
+ t.summary.means = t.summary.means[:numCentroids]
+ t.summary.counts = t.summary.counts[:numCentroids]
+
+ idx := 16
+ var x float64
+ for i := 0; i < numCentroids; i++ {
+ delta := math.Float32frombits(endianess.Uint32(buf[idx:]))
+ idx += 4
+ x += float64(delta)
+ t.summary.means[i] = x
+ }
+
+ for i := 0; i < numCentroids; i++ {
+ count, read := binary.Uvarint(buf[idx:])
+ if read < 1 {
+ return errors.New("error decoding varint, this TDigest is now invalid")
+ }
+
+ idx += read
+
+ t.summary.counts[i] = uint32(count)
+ t.count += count
+ }
+
+ if idx != len(buf) {
+ return errors.New("buffer has unread data")
+ }
+ return nil
}

func encodeUint(buf *bytes.Buffer, n uint32) error {

Modified serialization_test.go

@@ -33,10 +33,24

serialized, _ := t1.AsBytes()

- t2, _ := FromBytes(bytes.NewReader(serialized))
+ t2, err := FromBytes(bytes.NewReader(serialized))
+ if err != nil {
+ t.Fatal(err)
+ }
+ assertEqual(t, t1, t2)

- if t1.Count() != t2.Count() || t1.summary.Len() != t2.summary.Len() || t1.compression != t2.compression {
- t.Errorf("Deserialized to something different. t1=%v t2=%v serialized=%v", t1, t2, serialized)
+ err = t2.FromBytes(serialized)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assertEqual(t, t1, t2)
+}
+
+func assertEqual(t *testing.T, t1, t2 *TDigest) {
+ if t1.Count() != t2.Count() ||
+ t1.summary.Len() != t2.summary.Len() ||
+ t1.compression != t2.compression {
+ t.Errorf("Deserialized to something different. t1=%v t2=%v", t1, t2)
}
}

Modified summary.go

@@ -1,9 +1,8
package tdigest

import (
"fmt"
"math"
- "math/rand"
"sort"
)

@@ -148,7 +147,7
}

func (s *summary) Perm(rng RNG, f func(float64, uint32) bool) {
- for _, i := range rng.Perm(s.Len()) {
+ for _, i := range perm(rng, s.Len()) {
if !f(s.means[i], s.counts[i]) {
break
}
@@ -166,7 +165,7
// with being pathological. Renders summary invalid.
func (s *summary) shuffle(rng RNG) {
for i := len(s.means) - 1; i > 1; i-- {
- s.Swap(i, rand.Intn(i+1))
+ s.Swap(i, rng.Intn(i+1))
}
}

@@ -194,4 +193,14
cumSum += uint64(s[i])
}
return cumSum
+}
+
+func perm(rng RNG, n int) []int {
+ m := make([]int, n)
+ for i := 1; i < n; i++ {
+ j := rng.Intn(i + 1)
+ m[i] = m[j]
+ m[j] = i
+ }
+ return m
}

Modified tdigest.go

@@ -43,7 +43,7
tdigest := &TDigest{
compression: 100,
count: 0,
- rng: &globalRNG{},
+ rng: globalRNG{},
}

for _, option := range options {

Modified tdigest_test.go

@@ -514,7 +514,7
},
compression: 5,
count: 1250,
- rng: &globalRNG{},
+ rng: globalRNG{},
}

if cdf := td.CDF(7.144560976650238e+06); cdf > 1 {