fn-serverless/api/logs/s3/s3.go
Reed Allman 56a2861748 move calls to logstore, implement s3 (#911)
* move calls to logstore, implement s3

closes #482

the basic motivation is that logs and calls will be written at a very high
rate, while apps and routes are updated relatively infrequently; it follows
that we should split up their storage locations so each can be backed by an
appropriate storage facility. s3 is a better candidate than a sql database for
ingesting high write rate data, and it will make that data set easier to
manage. see #482 for more detailed justification.

summary:

* calls api moved from datastore to logstore
* logstore used in front-end to serve calls endpoints
* agent now throws calls into logstore instead of datastore
* s3 implementation of calls api for logstore
* s3 logs key changed (nobody is using it yet, so no big deal?)
* removed UpdateCall api (not in use)
* moved call tests from datastore to logstore tests
* mock logstore now tested (prev. sqlite3 only)
* logstore tests run against every datastore (mysql, pg; prev. only sqlite3)
* simplify NewMock in tests

commentary:

the brunt of the work is implementing the listing of calls in GetCalls for the
s3 logstore implementation. the GetCalls API requires returning items in
newest to oldest order, while the s3 api lists keys in ascending lexicographic
order. the easy thing to do here seemed to be to reverse the encoding of our
id format so that it sorts in lexicographically descending order, since ids
are time based, already encoded to be lexicographically sortable, and de-duped
(unlike created_at). this seems to work pretty well. it's not perfect around
the boundaries of to_time and from_time, where a tiny number of results may be
omitted, but getting 6999 results instead of 7000 when listing calls between
3:00pm and 4:00pm Monday three weeks ago doesn't seem like a deal breaker to
me. without to_time and from_time, there are no issues in listing results. we
could key on an encoded created_at instead, but then a point lookup (GetCall)
would need an additional marker: we would have to search for the created_at
stamp and then scan the ids around it until we find the matching one, just to
do a point lookup. so the tradeoff here seems worth it. there is also an
optimization around to_time to seek past newer results (since we have
descending order).
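
for illustration, here's a rough sketch of the reverse encoding idea; the real
logic lives in the id package (id.EncodeDescending below), and the alphabet
here is just an assumed stand-in:

	package main

	import (
		"fmt"
		"strings"
	)

	// assumed stand-in alphabet; the real id package defines its own encoding
	const alphabet = "0123456789abcdefghijklmnopqrstuv"

	// encodeDescending maps each character to its mirror position in the
	// alphabet, so an ascending lexicographic listing of the results walks the
	// original ids newest-first.
	func encodeDescending(asc string) string {
		out := []byte(asc)
		for i := range out {
			if j := strings.IndexByte(alphabet, out[i]); j >= 0 {
				out[i] = alphabet[len(alphabet)-1-j]
			}
		}
		return string(out)
	}

	func main() {
		fmt.Println(encodeDescending("01abc")) // "vulkj" under the assumed alphabet
	}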

The other complication in GetCalls is returning the list of calls for a given
path. Since the point-lookup keys are only app_id + call_id and also have to
serve listing across an app, listing by path needs its own 'marker' collection,
keyed by app_id + path + call_id to allow quick listing by path. All in all,
the implementation should be pretty straightforward to follow and I tried to be
lavish with the comments; please let me know if anything needs further
clarification in the code.
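
to make the key layouts concrete, a tiny illustration with made-up values (the
real construction is callKey / callMarkerKey / logKey in the file below):

	package main

	import (
		"encoding/base64"
		"fmt"
	)

	func main() {
		app, path := "app_123", "/hello"
		descID := "vulkj" // the call id after reverse encoding (see sketch above)
		// the route path is base64url-encoded because it contains '/', which s3 keys use as a delimiter
		encPath := base64.RawURLEncoding.EncodeToString([]byte(path))

		fmt.Println("call key:   c/" + app + "/" + descID)                 // point lookups + listing across an app
		fmt.Println("marker key: m/" + app + "/" + encPath + "/" + descID) // listing by path within an app
		fmt.Println("log key:    l/" + app + "/call_456")                  // raw log body for a call
	}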

The implementation itself has some glaring inefficiencies, but they're
relatively minor: the json encoding is kinda lazy, but workable; s3 doesn't
offer batch retrieval, so GetCalls point looks up each call one by one; and we
aren't re-using buffers. the seeking around the keys should all be relatively
fast, though, and this isn't a hot path for reads, so I'm not too worried
about performance (need to make a cut point and turn this in!).
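
for reference, a rough sketch of what bounded-concurrency point lookups could
look like; this is not part of this change, it leans on the store and
getCallByKey defined in the file below (plus a "sync" import), and the
concurrency cap is arbitrary:

	// getCallsByKeys fetches one page of calls concurrently while preserving
	// the listing order; lookups that fail are dropped, mirroring GetCalls.
	func (s *store) getCallsByKeys(ctx context.Context, keys []string) []*models.Call {
		results := make([]*models.Call, len(keys))
		sem := make(chan struct{}, 8) // arbitrary concurrency cap
		var wg sync.WaitGroup
		for i, key := range keys {
			wg.Add(1)
			go func(i int, key string) {
				defer wg.Done()
				sem <- struct{}{}
				defer func() { <-sem }()
				if call, err := s.getCallByKey(ctx, key); err == nil {
					results[i] = call
				}
			}(i, key)
		}
		wg.Wait()
		calls := make([]*models.Call, 0, len(results))
		for _, call := range results {
			if call != nil {
				calls = append(calls, call)
			}
		}
		return calls
	}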

Interestingly, in testing, minio performs significantly worse than pg for
storing both logs and calls (or just logs; I tested that too). minio seems to
have really high cpu consumption, but in any event we won't be using minio in
production, we'll be using a cloud object store that implements the s3 api.
this is mostly a knock on using minio for high performance and has nothing to
do with this change; I just thought it was interesting.

I think it's safe to remove UpdateCall; admittedly, this made implementing the
s3 api a lot easier. It may also be an operation we never need: it's unused at
present and was only in the cards for a previous hybrid implementation, which
we've now abandoned. If we need it, we can always resurrect it from git.

Also not worried about changing the log key. we need to put a prefix on it
anyway, and I don't think anybody is using logs yet. in any event, the change
simply means old logs won't show up through the API, which doesn't seem like a
deal breaker -- new logs will appear fine.

future:

TODO make the logstore implementation optional for the datastore, check in the
front-end at runtime, and offer a nil logstore that errors appropriately

TODO low hanging fruit optimizations: the json encoding, re-using buffers for
downloads (see the sketch after this list), getting multiple calls at a time,
and optimizing the id reverse encoding, like the normal encoding, so it isn't n^2

TODO api for range removal of logs and calls
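
for the buffer re-use item above, a minimal sketch assuming we pool byte slices
and finish decoding before the buffer goes back to the pool (getCallByKeyPooled
and callBufPool are hypothetical names, not in this change):

	var callBufPool = sync.Pool{
		New: func() interface{} { return make([]byte, 0, 16*1024) }, // arbitrary starting capacity
	}

	func (s *store) getCallByKeyPooled(ctx context.Context, key string) (*models.Call, error) {
		buf := callBufPool.Get().([]byte)
		target := aws.NewWriteAtBuffer(buf[:0])
		_, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			callBufPool.Put(buf)
			// the same not-found translation as getCallByKey would apply here
			return nil, err
		}
		var call models.Call
		err = json.Unmarshal(target.Bytes(), &call)
		callBufPool.Put(target.Bytes()[:0]) // return the (possibly grown) buffer to the pool
		if err != nil {
			return nil, err
		}
		return &call, nil
	}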

* address review comments

* push id to_time magic into id package
* add note about s3 key sizes
* fix validation check
2018-04-05 10:49:25 -07:00


// Package s3 implements an s3 api compatible log store
package s3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/models"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)
// TODO we should encrypt these; the user will have to supply a key though (or all
// OSS users' logs will be encrypted with the same key unless they change it, which
// just seems mean...)
// TODO do we need to use the v2 API? can't find BMC object store docs :/
const (
// key prefixes
callKeyPrefix = "c/"
callMarkerPrefix = "m/"
logKeyPrefix = "l/"
)
type store struct {
client *s3.S3
uploader *s3manager.Uploader
downloader *s3manager.Downloader
bucket string
}
// decorator around the Reader interface that keeps track of the number of bytes read
// in order to avoid double buffering and track Reader size
type countingReader struct {
r io.Reader
count int
}
func (cr *countingReader) Read(p []byte) (n int, err error) {
n, err = cr.r.Read(p)
cr.count += n
return n, err
}
func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
config := &aws.Config{
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
Endpoint: aws.String(endpoint),
Region: aws.String(region),
DisableSSL: aws.Bool(!useSSL),
S3ForcePathStyle: aws.Bool(true),
}
client := s3.New(session.Must(session.NewSession(config)))
return &store{
client: client,
uploader: s3manager.NewUploaderWithClient(client),
downloader: s3manager.NewDownloaderWithClient(client),
bucket: bucketName,
}
}
// New returns an s3 api compatible log store.
// url format: s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
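// Example (illustrative values only):
//
//	u, _ := url.Parse("s3://my_access_key:my_secret_key@s3.example.com/us-east-1/fn-logs?ssl=true")
//	ls, err := New(u)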
func New(u *url.URL) (models.LogStore, error) {
endpoint := u.Host
var accessKeyID, secretAccessKey string
if u.User != nil {
accessKeyID = u.User.Username()
secretAccessKey, _ = u.User.Password()
}
useSSL := u.Query().Get("ssl") == "true"
strs := strings.SplitN(u.Path, "/", 3)
if len(strs) < 3 {
return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}
region := strs[1]
bucketName := strs[2]
if region == "" {
return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
} else if bucketName == "" {
return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}
logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)
// ensure the bucket exists, creating if it does not
_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
// bucket already exists, NO-OP
default:
return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
}
} else {
return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
}
}
return store, nil
}
func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error {
ctx, span := trace.StartSpan(ctx, "s3_insert_log")
defer span.End()
// wrap original reader in a decorator to keep track of read bytes without buffering
cr := &countingReader{r: callLog}
objectName := logKey(appID, callID)
params := &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
Body: cr,
ContentType: aws.String("text/plain"),
}
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
_, err := s.uploader.UploadWithContext(ctx, params)
if err != nil {
return fmt.Errorf("failed to write log, %v", err)
}
stats.Record(ctx, uploadSizeMeasure.M(int64(cr.count)))
return nil
}
func (s *store) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
ctx, span := trace.StartSpan(ctx, "s3_get_log")
defer span.End()
objectName := logKey(appID, callID)
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")
// stream the logs to an in-memory buffer
target := &aws.WriteAtBuffer{}
size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
})
if err != nil {
aerr, ok := err.(awserr.Error)
if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, models.ErrCallLogNotFound
}
return nil, fmt.Errorf("failed to read log, %v", err)
}
stats.Record(ctx, downloadSizeMeasure.M(size))
return bytes.NewReader(target.Bytes()), nil
}
func (s *store) InsertCall(ctx context.Context, call *models.Call) error {
ctx, span := trace.StartSpan(ctx, "s3_insert_call")
defer span.End()
byts, err := json.Marshal(call)
if err != nil {
return err
}
objectName := callKey(call.AppID, call.ID)
params := &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
Body: bytes.NewReader(byts),
ContentType: aws.String("text/plain"),
}
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call")
_, err = s.uploader.UploadWithContext(ctx, params)
if err != nil {
return fmt.Errorf("failed to insert call, %v", err)
}
// at this point, a point lookup of the call will work. now we can try to upload
// the marker key. if the marker key upload fails, the user simply won't see
// this entry when listing with a route path specified. (NOTE: this behavior
// will go away if we stop listing by route -> triggers)
objectName = callMarkerKey(call.AppID, call.Path, call.ID)
params = &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
Body: bytes.NewReader([]byte{}),
ContentType: aws.String("text/plain"),
}
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker")
_, err = s.uploader.UploadWithContext(ctx, params)
if err != nil {
// XXX(reed): we could just log this?
return fmt.Errorf("failed to write marker key for log, %v", err)
}
return nil
}
// GetCall returns a call at a certain id and app name.
func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
ctx, span := trace.StartSpan(ctx, "s3_get_call")
defer span.End()
objectName := callKey(appID, callID)
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call")
return s.getCallByKey(ctx, objectName)
}
func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) {
// stream the call to an in-memory buffer
var target aws.WriteAtBuffer
_, err := s.downloader.DownloadWithContext(ctx, &target, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
aerr, ok := err.(awserr.Error)
if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, models.ErrCallNotFound
}
return nil, fmt.Errorf("failed to read log, %v", err)
}
var call models.Call
err = json.Unmarshal(target.Bytes(), &call)
if err != nil {
return nil, err
}
return &call, nil
}
func flipCursor(oid string) string {
if oid == "" {
return ""
}
return id.EncodeDescending(oid)
}
func callMarkerKey(app, path, id string) string {
id = flipCursor(id)
// s3 keys use / as a delimiter and appear in urls; we need to encode the path since paths contain / themselves
// NOTE: s3 keys are a max of 1024 bytes. path is the only non-fixed-size piece in here,
// but it is capped at 256 chars in sql (by chance, mostly). further validation may be needed if weirdness ensues.
path = base64.RawURLEncoding.EncodeToString([]byte(path))
return callMarkerPrefix + app + "/" + path + "/" + id
}
func callKey(app, id string) string {
id = flipCursor(id)
return callKeyFlipped(app, id)
}
func callKeyFlipped(app, id string) string {
return callKeyPrefix + app + "/" + id
}
func logKey(appID, callID string) string {
return logKeyPrefix + appID + "/" + callID
}
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
// calls exist, an empty list and a nil error are returned.
// NOTE: this relies on call ids being lexicographically sortable and <= 16 bytes
func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
ctx, span := trace.StartSpan(ctx, "s3_get_calls")
defer span.End()
if filter.AppID == "" {
return nil, errors.New("s3 store does not support listing across all apps")
}
// NOTE:
// if filter.Path != ""
//   find the starting marker among the marker keys, list from there, get the next marker from the results
// else
//   list the call keys directly, using the marker
//
// NOTE we need marker keys to support (app is REQUIRED):
// 1) quick iteration per path
// 2) sorting by id across all paths
// marker key: m / {app} / {base64(path)} / {id}
// call key:   c / {app} / {id}
//
// also, the s3 api returns keys sorted in ascending lexicographic order and we
// need the reverse of this, hence the descending id encoding.
// the marker is either the provided cursor, or a key we build from the filter:
// it always contains app_id, it is a marker key if a path is provided, and it
// may embed a time guesstimate if to_time is provided.
var marker string
// filter.Cursor is a call id, translate to our key format. if a path is
// provided, we list keys from markers instead.
if filter.Cursor != "" {
marker = callKey(filter.AppID, filter.Cursor)
if filter.Path != "" {
marker = callMarkerKey(filter.AppID, filter.Path, filter.Cursor)
}
} else if t := time.Time(filter.ToTime); !t.IsZero() {
// get a fake id that has the most significant bits set to the to_time (first 48 bits)
fako := id.NewWithTime(t)
//var buf [id.EncodedSize]byte
//fakoId.MarshalTextTo(buf)
//mid := string(buf[:10])
mid := fako.String()
marker = callKey(filter.AppID, mid)
if filter.Path != "" {
marker = callMarkerKey(filter.AppID, filter.Path, mid)
}
}
// prefix prevents leaving bounds of app or path marker keys
prefix := callKey(filter.AppID, "")
if filter.Path != "" {
prefix = callMarkerKey(filter.AppID, filter.Path, "")
}
input := &s3.ListObjectsInput{
Bucket: aws.String(s.bucket),
MaxKeys: aws.Int64(int64(filter.PerPage)),
Marker: aws.String(marker),
Prefix: aws.String(prefix),
}
result, err := s.client.ListObjects(input)
if err != nil {
return nil, fmt.Errorf("failed to list logs: %v", err)
}
calls := make([]*models.Call, 0, len(result.Contents))
for _, obj := range result.Contents {
if len(calls) == filter.PerPage {
break
}
// extract the app and id from the key to lookup the object, this also
// validates we aren't reading strangely keyed objects from the bucket.
var app, id string
if filter.Path != "" {
fields := strings.Split(*obj.Key, "/")
if len(fields) != 4 {
return calls, fmt.Errorf("invalid key in call markers: %v", *obj.Key)
}
app = fields[1]
id = fields[3]
} else {
fields := strings.Split(*obj.Key, "/")
if len(fields) != 3 {
return calls, fmt.Errorf("invalid key in calls: %v", *obj.Key)
}
app = fields[1]
id = fields[2]
}
// the id here is already reverse encoded, keep it that way.
objectName := callKeyFlipped(app, id)
// NOTE: s3 doesn't have a way to get multiple objects so just use GetCall
// TODO we should reuse the buffer to decode these
call, err := s.getCallByKey(ctx, objectName)
if err != nil {
common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app": app, "id": id}).Error("error filling call object")
continue
}
// ensure: from_time < created_at < to_time
fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond)
if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) {
// NOTE could break, ids and created_at aren't necessarily in perfect order
continue
}
toTime := time.Time(filter.ToTime).Truncate(time.Millisecond)
if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) {
continue
}
calls = append(calls, call)
}
return calls, nil
}
var (
uploadSizeMeasure *stats.Int64Measure
downloadSizeMeasure *stats.Int64Measure
)
func init() {
// TODO(reed): do we have to do this? the measurements will be tagged on the context; will they be
// propagated, or do we have to whitelist them in the view for them to show up? test...
var err error
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
{
uploadSizeMeasure, err = stats.Int64("s3_log_upload_size", "uploaded log size", "byte")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"s3_log_upload_size",
"uploaded log size",
[]tag.Key{appKey, pathKey},
uploadSizeMeasure,
view.Distribution(),
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
downloadSizeMeasure, err = stats.Int64("s3_log_download_size", "downloaded log size", "byte")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"s3_log_download_size",
"downloaded log size",
[]tag.Key{appKey, pathKey},
downloadSizeMeasure,
view.Distribution(),
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
}