caio.co/de/go-tdigest


Make AsBytes a convenience wrapper around ToBytes (Caio, 5 years ago)
Before:

> BenchmarkAsBytes-4  90274	    13426 ns/op	   1952 B/op	    210 allocs/op

After:

> BenchmarkAsBytes-4  877605	     1348 ns/op	   1536 B/op	      1 allocs/op

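The before/after figures read like output from go test -bench. A minimal sketch of a benchmark in that shape follows; the import path, tdigest.New, tdigest.Compression and Add come from the library's public API rather than from this commit, so treat the setup as an assumption:

package tdigest_test

import (
	"math/rand"
	"testing"

	"github.com/caio/go-tdigest"
)

// BenchmarkAsBytes repeatedly serializes a populated digest, so the
// allocs/op figure is dominated by AsBytes itself. (Sketch: the digest
// setup below is an assumption, not taken from this commit.)
func BenchmarkAsBytes(b *testing.B) {
	t, _ := tdigest.New(tdigest.Compression(100))
	for i := 0; i < 10000; i++ {
		_ = t.Add(rand.Float64())
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := t.AsBytes(); err != nil {
			b.Fatal(err)
		}
	}
}

With AsBytes delegating to ToBytes over a freshly sized buffer, the single remaining allocation per iteration is the output slice itself, which matches the 1 allocs/op above.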
serialization.go

package tdigest

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

const smallEncoding int32 = 2

var endianess = binary.BigEndian

// AsBytes serializes the digest into a byte array so it can be
// saved to disk or sent over the wire.
func (t TDigest) AsBytes() ([]byte, error) {
	// TODO get rid of the (now) useless error
	return t.ToBytes(make([]byte, t.requiredSize())), nil
}

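// requiredSize is a worst-case estimate: a 16-byte header (encoding version,
// compression and centroid count), 4 bytes per float32 mean delta, and up to
// binary.MaxVarintLen64 bytes per varint-encoded count.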
func (t *TDigest) requiredSize() int {
	return 16 + (4 * len(t.summary.means)) + (len(t.summary.counts) * binary.MaxVarintLen64)
}

// ToBytes serializes into the supplied slice, avoiding allocation if the slice
// is large enough. The result slice is returned.
func (t *TDigest) ToBytes(b []byte) []byte {
	requiredSize := t.requiredSize()
	if cap(b) < requiredSize {
		b = make([]byte, requiredSize)
	}

	// The binary.Put* functions helpfully don't extend the slice for you, they
	// just panic if it's not already long enough. So pre-set the slice length;
	// we'll return it with the actual encoded length.
	b = b[:cap(b)]

	endianess.PutUint32(b[0:4], uint32(smallEncoding))
	endianess.PutUint64(b[4:12], math.Float64bits(t.compression))
	endianess.PutUint32(b[12:16], uint32(t.summary.Len()))

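	// Means are written as float32 deltas from the previous (sorted) mean:
	// consecutive differences stay small, so float32 precision is acceptable
	// at half the size of a raw float64.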
	var x float64
	idx := 16
	for _, mean := range t.summary.means {
		delta := mean - x
		x = mean
		endianess.PutUint32(b[idx:], math.Float32bits(float32(delta)))
		idx += 4
	}

	for _, count := range t.summary.counts {
		idx += binary.PutUvarint(b[idx:], count)
	}
	return b[:idx]
}

// FromBytes reads a byte buffer with a serialized digest (from AsBytes)
// and deserializes it.
//
// This function creates a new tdigest instance with the provided options,
// but ignores the compression setting since the correct value comes
// from the buffer.
func FromBytes(buf *bytes.Reader, options ...tdigestOption) (*TDigest, error) {
	var encoding int32
	err := binary.Read(buf, endianess, &encoding)
	if err != nil {
		return nil, err
	}

	if encoding != smallEncoding {
		return nil, fmt.Errorf("unsupported encoding version: %d", encoding)
	}

	t, err := newWithoutSummary(options...)
	if err != nil {
		return nil, err
	}

	var compression float64
	err = binary.Read(buf, endianess, &compression)
	if err != nil {
		return nil, err
	}

	t.compression = compression

	var numCentroids int32
	err = binary.Read(buf, endianess, &numCentroids)
	if err != nil {
		return nil, err
	}

	if numCentroids < 0 || numCentroids > 1<<22 {
		return nil, errors.New("bad number of centroids in serialization")
	}

	t.summary = newSummary(int(numCentroids))
	t.summary.means = t.summary.means[:numCentroids]
	t.summary.counts = t.summary.counts[:numCentroids]

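	// Means were stored as float32 deltas; accumulate them to recover the
	// absolute values.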
	var x float64
	for i := 0; i < int(numCentroids); i++ {
		var delta float32
		err = binary.Read(buf, endianess, &delta)
		if err != nil {
			return nil, err
		}
		x += float64(delta)
		t.summary.means[i] = x
	}

	for i := 0; i < int(numCentroids); i++ {
		count, err := decodeUint(buf)
		if err != nil {
			return nil, err
		}
		t.summary.counts[i] = count
		t.count += count
	}

	return t, nil
}

// FromBytes deserializes the buffer into the receiver, reusing and
// overwriting any existing internal buffers.
//
// This method reinitializes the digest from the provided buffer,
// discarding any previously collected data. Note that in case of
// errors this may leave the digest in an unusable state.
func (t *TDigest) FromBytes(buf []byte) error {
	if len(buf) < 16 {
		return errors.New("buffer too small for deserialization")
	}

	encoding := int32(endianess.Uint32(buf))
	if encoding != smallEncoding {
		return fmt.Errorf("unsupported encoding version: %d", encoding)
	}

	compression := math.Float64frombits(endianess.Uint64(buf[4:12]))
	numCentroids := int(endianess.Uint32(buf[12:16]))
	if numCentroids < 0 || numCentroids > 1<<22 {
		return errors.New("bad number of centroids in serialization")
	}

	if len(buf) < 16+(4*numCentroids) {
		return errors.New("buffer too small for deserialization")
	}

	t.count = 0
	t.compression = compression
	if t.summary == nil ||
		cap(t.summary.means) < numCentroids ||
		cap(t.summary.counts) < numCentroids {
		t.summary = newSummary(numCentroids)
	}
	t.summary.means = t.summary.means[:numCentroids]
	t.summary.counts = t.summary.counts[:numCentroids]

	idx := 16
	var x float64
	for i := 0; i < numCentroids; i++ {
		delta := math.Float32frombits(endianess.Uint32(buf[idx:]))
		idx += 4
		x += float64(delta)
		t.summary.means[i] = x
	}

	for i := 0; i < numCentroids; i++ {
		count, read := binary.Uvarint(buf[idx:])
		if read < 1 {
			return errors.New("error decoding varint, this TDigest is now invalid")
		}

		idx += read

		t.summary.counts[i] = count
		t.count += count
	}

	if idx != len(buf) {
		return errors.New("buffer has unread data")
	}
	return nil
}

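// encodeUint writes n to buf as an unsigned varint.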
func encodeUint(buf *bytes.Buffer, n uint64) error {
	var b [binary.MaxVarintLen64]byte

	l := binary.PutUvarint(b[:], n)

	_, err := buf.Write(b[:l])

	return err
}

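// decodeUint reads a single unsigned varint from buf.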
func decodeUint(buf *bytes.Reader) (uint64, error) {
	return binary.ReadUvarint(buf)
}
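
Putting the pieces together, a round trip through the API above looks roughly like this. This is a sketch: digest construction (tdigest.New, Compression, Add) and Quantile follow the library's public API and are assumptions beyond the file shown here.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/caio/go-tdigest"
)

func main() {
	// Assumed setup, per the library's README.
	t, err := tdigest.New(tdigest.Compression(100))
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 1000; i++ {
		_ = t.Add(float64(i))
	}

	// AsBytes allocates a fresh buffer; ToBytes reuses one you provide.
	blob, _ := t.AsBytes()
	reused := t.ToBytes(make([]byte, 0, len(blob)))

	// Deserialize into a new digest...
	fresh, err := tdigest.FromBytes(bytes.NewReader(blob))
	if err != nil {
		log.Fatal(err)
	}

	// ...or overwrite an existing one, reusing its internal buffers.
	if err := fresh.FromBytes(reused); err != nil {
		log.Fatal(err)
	}

	fmt.Println(fresh.Quantile(0.99))
}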