mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
* move calls to logstore, implement s3

closes #482

the basic motivation is that logs and calls will be stored with a very high write rate, while apps and routes will be relatively infrequently updated; it follows that we should likely split up their storage location, to back them with appropriate storage facilities. s3 is a good candidate for ingesting higher write rate data than a sql database, and will make it easier to manage that data set. see #482 for more detailed justification.

summary:

* calls api moved from datastore to logstore
* logstore used in front-end to serve calls endpoints
* agent now throws calls into logstore instead of datastore
* s3 implementation of calls api for logstore
* s3 logs key changed (nobody using / nbd?)
* removed UpdateCall api (not in use)
* moved call tests from datastore to logstore tests
* mock logstore now tested (prev. sqlite3 only)
* logstore tests run against every datastore (mysql, pg; prev. only sqlite3)
* simplify NewMock in tests

commentary:

brunt of the work is implementing the listing of calls in GetCalls for the s3 logstore implementation. the GetCalls API requires returning items in newest to oldest order, while the s3 api only lists items in ascending lexicographic order of their keys. An easy thing to do here seemed to be to reverse the encoding of our id format to get a lexicographically descending order, since ids are time based, reasonably encoded to be lexicographically sortable, and de-duped (unlike created_at). This seems to work pretty well. It's not perfect around the boundaries of to_time and from_time, and a tiny number of results may be omitted, but to me it doesn't seem like a deal breaker to get 6999 results instead of 7000 when trying to get calls between 3:00pm and 4:00pm Monday 3 weeks ago. Of course, without to_time and from_time, there are no issues in listing results.
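To illustrate the reversed encoding described above, here is a minimal, standalone sketch. It does not use the id package: `descending` and `newestFirst` are hypothetical helpers, the sample ids are made up, and `sort.Strings` merely stands in for the ascending key order of an s3 listing.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// alphabet is the base32 alphabet used for id strings.
const alphabet = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// descending maps every character to its mirror in the alphabet
// ('0' <-> 'Z', '1' <-> 'Y', ...). Characters outside the alphabet are
// left unchanged in this sketch.
func descending(s string) string {
	out := []byte(s)
	for i := 0; i < len(out); i++ {
		if j := strings.IndexByte(alphabet, out[i]); j >= 0 {
			out[i] = alphabet[len(alphabet)-1-j]
		}
	}
	return string(out)
}

// newestFirst returns ids in the order an ascending key listing would
// yield them if they were stored under their descending encoding.
func newestFirst(ids []string) []string {
	keys := make([]string, len(ids))
	for i, id := range ids {
		keys[i] = descending(id)
	}
	sort.Strings(keys) // stand-in for the ascending order of a listing

	out := make([]string, len(keys))
	for i, k := range keys {
		out[i] = descending(k) // the mapping is its own inverse
	}
	return out
}

func main() {
	// made-up ids, oldest to newest
	ids := []string{"01AN4Z07BY", "01AN4Z07BZ", "01AN4Z07C0"}
	fmt.Println(newestFirst(ids)) // [01AN4Z07C0 01AN4Z07BZ 01AN4Z07BY]
}
```

Because the mapping reverses the order of every character, equal-length ids sort in exactly the opposite order once encoded, which is what lets an ascending listing return the newest call first.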
We could use created_at and encode it, but that would require an additional marker for point lookups (GetCall), since we would have to search for a created_at stamp, then search for ids around that until we find the matching one, just to do a point lookup. So, the tradeoff here seems worth it. There is additional optimization around to_time to seek over newer results (since we have descending order).

The other complication in GetCalls is returning a list of calls for a given path. Since the keys to do point lookups are only app_id + call_id, and we need listing across an app as well, this leads us to the 'marker' collection which is sorted by app_id + path + call_id, to allow quick listing by path.

All in all, it should be pretty straightforward to follow the implementation and I tried to be lavish with the comments, please let me know if anything needs further clarification in the code.

The implementation itself has some glaring inefficiencies, but they're relatively minute: json encoding is kinda lazy, but workable; s3 doesn't offer batch retrieval, so we point look up each call one by one in get call; not re-using buffers -- but the seeking around the keys should all be relatively fast, not too worried about performance really and this isn't a hot path for reads (need to make a cut point and turn this in!).

Interestingly, in testing, minio performs significantly worse than pg for storing both logs and calls (or just logs, I tested that too). minio seems to have really high cpu consumption, but in any event, we won't be using minio, we'll be using a cloud object store that implements the s3 api. Anyway, mostly a knock on using minio for high performance, not really anything to do with this, just thought it was interesting.

I think it's safe to remove UpdateCall, admittedly this made implementing the s3 api a lot easier.
This operation may also be something we never need, it was unused at present and was only in the cards for a previous hybrid implementation, which we've now abandoned. If we need, we can always resurrect from git.

Also not worried about changing the log key, we need to put a prefix on this thing anyway, but I don't think anybody is using this anyway. in any event, it simply means old logs won't show up through the API, but aside from nobody using this yet, that doesn't seem a big deal breaker really -- new logs will appear fine.

future:

TODO make logstore implementation optional for datastore, check in front-end at runtime and offer a nil logstore that errors appropriately
TODO low hanging fruit optimizations of json encoding, re-using buffers for download, get multiple calls at a time, id reverse encoding could be optimized like normal encoding to not be n^2
TODO api for range removal of logs and calls

* address review comments
* push id to_time magic into id package
* add note about s3 key sizes
* fix validation check
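The 'marker' collection mentioned in the commentary (keys sorted by app_id + path + call_id, next to point-lookup keys of app_id + call_id) can be sketched roughly as follows. Everything about the key format here is an assumption made for illustration: the "m/" and "c/" prefixes, the "/" separator and the hex-encoded path are not taken from the real implementation, and `listPrefix` only imitates a prefix listing over an in-memory slice.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// markerPrefix is the listing prefix for one app and path (hypothetical
// layout). The path is hex encoded so that "/hello" is never a key prefix
// of "/hello/world".
func markerPrefix(appID, path string) string {
	return "m/" + appID + "/" + hex.EncodeToString([]byte(path)) + "/"
}

// markerKey sorts by app, then path, then (descending-encoded) call id.
func markerKey(appID, path, descCallID string) string {
	return markerPrefix(appID, path) + descCallID
}

// callKey is the point-lookup key: app id plus call id only.
func callKey(appID, descCallID string) string {
	return "c/" + appID + "/" + descCallID
}

// listPrefix stands in for a prefix listing: matching keys, ascending.
func listPrefix(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	sort.Strings(out)
	return out
}

// callKeysForPath lists the markers for a path and converts each one into
// the key used to fetch the call itself, one point lookup per result.
func callKeysForPath(keys []string, appID, path string) []string {
	prefix := markerPrefix(appID, path)
	var out []string
	for _, m := range listPrefix(keys, prefix) {
		out = append(out, callKey(appID, strings.TrimPrefix(m, prefix)))
	}
	return out
}

func main() {
	// "AAA" and "BBB" are placeholders for descending-encoded call ids.
	keys := []string{
		markerKey("app1", "/hello", "BBB"),
		markerKey("app1", "/hello/world", "AAA"),
		markerKey("app1", "/hello", "AAA"),
	}
	fmt.Println(callKeysForPath(keys, "app1", "/hello")) // [c/app1/AAA c/app1/BBB]
}
```

The marker carries no call data of its own; it only gives a listing order per path, which is why each listed call still costs one point lookup.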
289 lines
9.7 KiB
Go
package id

import (
	"errors"
	"net"
	"strings"
	"sync/atomic"
	"time"
)

type Id [16]byte

var (
	machineID uint64
	counter   uint32
)

// SetMachineId may only be called by one thread before any id generation
// is done. It must be set if multiple machines are generating ids in order
// to avoid collisions. Only the least significant 48 bits are used.
func SetMachineId(ID uint64) {
	machineID = ID
}

// SetMachineIdHost is a convenience wrapper that hides the bit twiddling of
// calling SetMachineId. It has the same constraints as SetMachineId, with the
// addition that addr must be an IPv4 address.
func SetMachineIdHost(addr net.IP, port uint16) {
	// net.ParseIP returns IPv4 addresses in 16-byte form, so normalize to
	// the 4-byte form before indexing the octets.
	if ip4 := addr.To4(); ip4 != nil {
		addr = ip4
	}

	var machineID uint64 // 48 bits: 32 bits of address, 16 bits of port
	machineID |= uint64(addr[0]) << 40
	machineID |= uint64(addr[1]) << 32
	machineID |= uint64(addr[2]) << 24
	machineID |= uint64(addr[3]) << 16
	machineID |= uint64(port)

	SetMachineId(machineID)
}

// New will generate a new Id for use. New is safe to be called from
// concurrent threads. SetMachineId should be called once before any calls to
// New are made. 2^32 calls to New per millisecond will be unique, provided
// machine id is seeded correctly across machines.
//
// binary format: [ [ 48 bits time ] [ 48 bits machineID ] [ 32 bits counter ] ]
//
// Ids are sortable within (not between, thanks to clocks) each machine, with
// a modified base32 encoding exposed for convenience in API usage.
func New() Id {
	// NewWithTime will be inlined
	return NewWithTime(time.Now())
}

// NewWithTime returns an id that uses the milliseconds from the given time.
// New is identical to NewWithTime(time.Now())
func NewWithTime(t time.Time) Id {
	// NOTE compiler optimizes out division by constant for us
	ms := uint64(t.Unix())*1000 + uint64(t.Nanosecond()/int(time.Millisecond))
	count := atomic.AddUint32(&counter, 1)
	return newID(ms, machineID, count)
}

func newID(ms, machineID uint64, count uint32) Id {
	var id Id

	id[0] = byte(ms >> 40)
	id[1] = byte(ms >> 32)
	id[2] = byte(ms >> 24)
	id[3] = byte(ms >> 16)
	id[4] = byte(ms >> 8)
	id[5] = byte(ms)

	id[6] = byte(machineID >> 40)
	id[7] = byte(machineID >> 32)
	id[8] = byte(machineID >> 24)
	id[9] = byte(machineID >> 16)
	id[10] = byte(machineID >> 8)
	id[11] = byte(machineID)

	id[12] = byte(count >> 24)
	id[13] = byte(count >> 16)
	id[14] = byte(count >> 8)
	id[15] = byte(count)

	return id
}

// The following encodings are slightly modified from https://github.com/oklog/ulid

// String returns a lexicographically sortable string encoded Id
// (26 characters, non-standard base 32) e.g. 01AN4Z07BY79KA1307SR9X4MV3
// Format: ttttttttttmmmmmmmmmmcccccc where t is time, m is machine id
// and c is a counter
func (id Id) String() string {
	var b [EncodedSize]byte
	_ = id.MarshalTextTo(b[:])
	return string(b[:])
}

// MarshalBinary implements the encoding.BinaryMarshaler interface by
// returning the Id as a byte slice.
func (id Id) MarshalBinary() ([]byte, error) {
	// The buffer must be exactly len(id) bytes: MarshalBinaryTo rejects any
	// other size, including EncodedSize (the length of the text form).
	b := make([]byte, len(id))
	return b, id.MarshalBinaryTo(b)
}

// MarshalBinaryTo writes the binary encoding of the Id to the given buffer.
// An error is returned when len(dst) != 16.
func (id Id) MarshalBinaryTo(dst []byte) error {
	if len(dst) != len(id) {
		return errors.New("provided buffer not large enough to marshal id")
	}

	copy(dst, id[:])
	return nil
}

// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface by
// copying the passed data and converting it to an Id. An error is
// returned if the data length is different from the Id length.
func (id *Id) UnmarshalBinary(data []byte) error {
	if len(data) != len(*id) {
		return errors.New("can't unmarshal id from unexpected byte slice size")
	}

	copy((*id)[:], data)
	return nil
}

// Encoding is the base 32 encoding alphabet used in Id strings.
const Encoding = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// MarshalText implements the encoding.TextMarshaler interface by
// returning the string encoded Id.
func (id Id) MarshalText() ([]byte, error) {
	var b [EncodedSize]byte
	return b[:], id.MarshalTextTo(b[:])
}

// MarshalTextTo writes the Id as a string to the given buffer.
// An error is returned when len(dst) != 26.
func (id Id) MarshalTextTo(dst []byte) error {
	// Optimized unrolled loop ahead.
	// From https://github.com/RobThree/NUlid

	if len(dst) != EncodedSize {
		return errors.New("not enough bytes to marshal id to")
	}

	// 10 characters of timestamp (48 bits)
	dst[0] = Encoding[(id[0]&224)>>5]
	dst[1] = Encoding[id[0]&31]
	dst[2] = Encoding[(id[1]&248)>>3]
	dst[3] = Encoding[((id[1]&7)<<2)|((id[2]&192)>>6)]
	dst[4] = Encoding[(id[2]&62)>>1]
	dst[5] = Encoding[((id[2]&1)<<4)|((id[3]&240)>>4)]
	dst[6] = Encoding[((id[3]&15)<<1)|((id[4]&128)>>7)]
	dst[7] = Encoding[(id[4]&124)>>2]
	dst[8] = Encoding[((id[4]&3)<<3)|((id[5]&224)>>5)]
	dst[9] = Encoding[id[5]&31]

	// 16 characters of machine id and counter (80 bits)
	dst[10] = Encoding[(id[6]&248)>>3]
	dst[11] = Encoding[((id[6]&7)<<2)|((id[7]&192)>>6)]
	dst[12] = Encoding[(id[7]&62)>>1]
	dst[13] = Encoding[((id[7]&1)<<4)|((id[8]&240)>>4)]
	dst[14] = Encoding[((id[8]&15)<<1)|((id[9]&128)>>7)]
	dst[15] = Encoding[(id[9]&124)>>2]
	dst[16] = Encoding[((id[9]&3)<<3)|((id[10]&224)>>5)]
	dst[17] = Encoding[id[10]&31]
	dst[18] = Encoding[(id[11]&248)>>3]
	dst[19] = Encoding[((id[11]&7)<<2)|((id[12]&192)>>6)]
	dst[20] = Encoding[(id[12]&62)>>1]
	dst[21] = Encoding[((id[12]&1)<<4)|((id[13]&240)>>4)]
	dst[22] = Encoding[((id[13]&15)<<1)|((id[14]&128)>>7)]
	dst[23] = Encoding[(id[14]&124)>>2]
	dst[24] = Encoding[((id[14]&3)<<3)|((id[15]&224)>>5)]
	dst[25] = Encoding[id[15]&31]

	return nil
}

// Byte to index table for O(1) lookups when unmarshaling.
// We use 0xFF as sentinel value for invalid indexes.
var dec = [...]byte{
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x01,
	0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
	0x0F, 0x10, 0x11, 0xFF, 0x12, 0x13, 0xFF, 0x14, 0x15, 0xFF,
	0x16, 0x17, 0x18, 0x19, 0x1A, 0xFF, 0x1B, 0x1C, 0x1D, 0x1E,
	0x1F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, 0x0B, 0x0C,
	0x0D, 0x0E, 0x0F, 0x10, 0x11, 0xFF, 0x12, 0x13, 0xFF, 0x14,
	0x15, 0xFF, 0x16, 0x17, 0x18, 0x19, 0x1A, 0xFF, 0x1B, 0x1C,
	0x1D, 0x1E, 0x1F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
	0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
}

// EncodedSize is the length of a text encoded Id.
const EncodedSize = 26

// UnmarshalText implements the encoding.TextUnmarshaler interface by
// parsing the data as a string encoded Id.
//
// An error is returned if len(v) is different from an encoded
// Id's length. Invalid encodings produce undefined Ids.
func (id *Id) UnmarshalText(v []byte) error {
	// Optimized unrolled loop ahead.
	// From https://github.com/RobThree/NUlid
	if len(v) != EncodedSize {
		return errors.New("id to unmarshal is of unexpected size")
	}

	// 6 bytes of timestamp (48 bits)
	(*id)[0] = ((dec[v[0]] << 5) | dec[v[1]])
	(*id)[1] = ((dec[v[2]] << 3) | (dec[v[3]] >> 2))
	(*id)[2] = ((dec[v[3]] << 6) | (dec[v[4]] << 1) | (dec[v[5]] >> 4))
	(*id)[3] = ((dec[v[5]] << 4) | (dec[v[6]] >> 1))
	(*id)[4] = ((dec[v[6]] << 7) | (dec[v[7]] << 2) | (dec[v[8]] >> 3))
	(*id)[5] = ((dec[v[8]] << 5) | dec[v[9]])

	// 10 bytes of machine id and counter (80 bits)
	(*id)[6] = ((dec[v[10]] << 3) | (dec[v[11]] >> 2))
	(*id)[7] = ((dec[v[11]] << 6) | (dec[v[12]] << 1) | (dec[v[13]] >> 4))
	(*id)[8] = ((dec[v[13]] << 4) | (dec[v[14]] >> 1))
	(*id)[9] = ((dec[v[14]] << 7) | (dec[v[15]] << 2) | (dec[v[16]] >> 3))
	(*id)[10] = ((dec[v[16]] << 5) | dec[v[17]])
	(*id)[11] = ((dec[v[18]] << 3) | (dec[v[19]] >> 2))
	(*id)[12] = ((dec[v[19]] << 6) | (dec[v[20]] << 1) | (dec[v[21]] >> 4))
	(*id)[13] = ((dec[v[21]] << 4) | (dec[v[22]] >> 1))
	(*id)[14] = ((dec[v[22]] << 7) | (dec[v[23]] << 2) | (dec[v[24]] >> 3))
	(*id)[15] = ((dec[v[24]] << 5) | dec[v[25]])

	return nil
}

// rEncoding is the reversed alphabet, useful for sorting in descending order
var rEncoding = reverseString(Encoding)

func reverseString(input string) string {
	// rsc: http://groups.google.com/group/golang-nuts/browse_thread/thread/a0fb81698275eede

	// Get Unicode code points.
	n := 0
	runes := make([]rune, len(input))
	for _, r := range input {
		runes[n] = r
		n++
	}
	runes = runes[0:n]

	// Reverse
	for i := 0; i < n/2; i++ {
		runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
	}

	// Convert back to UTF-8.
	return string(runes)
}

// EncodeDescending returns a lexicographically sortable descending encoding
// of a given id, e.g. 000 -> ZZZ, which allows reversing the sort order when stored
// contiguously since ids are lexicographically sortable. The returned string will
// be of len(src), and assumes src is from the base32 crockford alphabet; any
// byte outside that alphabet is encoded as 0xFF.
func EncodeDescending(src string) string {
	// copying src keeps the result at len(src) without bounding the input
	buf := []byte(src)
	for i, s := range buf {
		// XXX(reed): optimize as dec is
		j := strings.IndexByte(Encoding, s)
		if j < 0 {
			buf[i] = 0xFF // not in the alphabet
			continue
		}
		buf[i] = rEncoding[j]
	}
	return string(buf)
}
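As a quick illustration of the binary format documented on New (48 bits of time, 48 bits of machine id, 32 bits of counter), the following standalone sketch packs and unpacks the three fields. It does not import this package: `ID`, `pack` and `unpack` are hypothetical stand-ins written for the example, and the package itself exposes no unpacking helper in the code above.

```go
package main

import (
	"fmt"
	"time"
)

// ID mirrors the 16-byte layout: [48 bits time][48 bits machine][32 bits counter].
type ID [16]byte

// pack writes the three fields big-endian, as newID does.
func pack(ms, machine uint64, count uint32) ID {
	var id ID
	for i := 0; i < 6; i++ {
		shift := uint(40 - 8*i)
		id[i] = byte(ms >> shift)
		id[6+i] = byte(machine >> shift)
	}
	for i := 0; i < 4; i++ {
		id[12+i] = byte(count >> uint(24-8*i))
	}
	return id
}

// unpack is a hypothetical inverse of pack, shown only to make the layout
// concrete.
func unpack(id ID) (ms, machine uint64, count uint32) {
	for i := 0; i < 6; i++ {
		ms = ms<<8 | uint64(id[i])
		machine = machine<<8 | uint64(id[6+i])
	}
	for i := 12; i < 16; i++ {
		count = count<<8 | uint32(id[i])
	}
	return ms, machine, count
}

func main() {
	// 10.0.0.1:8080 folded into 48 bits the way SetMachineIdHost does it.
	machine := uint64(10)<<40 | uint64(1)<<16 | 8080

	t := time.Date(2017, 11, 20, 15, 0, 0, 0, time.UTC)
	ms := uint64(t.UnixNano() / int64(time.Millisecond))

	gotMs, gotMachine, gotCount := unpack(pack(ms, machine, 7))
	created := time.Unix(0, int64(gotMs)*int64(time.Millisecond)).UTC()
	fmt.Println(created, gotMachine == machine, gotCount)
}
```

Because the timestamp occupies the most significant bytes, comparing two ids byte by byte compares their creation times first, which is the property the text encodings rely on for sortability.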