Mirror of https://github.com/fnproject/fn.git, synced 2022-10-28 21:29:17 +03:00
* fn: stats view/distribution improvements
  *) View latency distribution is now an argument in the view creation functions; this makes it easier to override with custom buckets. It is simplistic and assumes all latency views use the same set, but in practice this is already the case.
  *) Moved API view creation to main; it should not be enabled for all node types. This is consistent with the rest of the system.
* fn: Docker samples of cpu/mem/disk with specific buckets
451 lines
14 KiB
Go
// Package s3 implements an s3 api compatible log store
package s3

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/id"
	"github.com/fnproject/fn/api/logs"
	"github.com/fnproject/fn/api/models"
	"github.com/sirupsen/logrus"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/trace"
)

|
|
// TODO we should encrypt these, user will have to supply a key though (or all
|
|
// OSS users logs will be encrypted with same key unless they change it which
|
|
// just seems mean...)
|
|
|
|
// TODO do we need to use the v2 API? can't find BMC object store docs :/
|
|
|
|
const (
|
|
// key prefixes
|
|
callKeyPrefix = "c/"
|
|
callMarkerPrefix = "m/"
|
|
logKeyPrefix = "l/"
|
|
)
|
|
|
|
type store struct {
|
|
client *s3.S3
|
|
uploader *s3manager.Uploader
|
|
downloader *s3manager.Downloader
|
|
bucket string
|
|
}
|
|
|
|
type s3StoreProvider int
|
|
|
|
// decorator around the Reader interface that keeps track of the number of bytes read
|
|
// in order to avoid double buffering and track Reader size
|
|
type countingReader struct {
|
|
r io.Reader
|
|
count int
|
|
}
|
|
|
|
func (cr *countingReader) Read(p []byte) (n int, err error) {
|
|
n, err = cr.r.Read(p)
|
|
cr.count += n
|
|
return n, err
|
|
}
|
|
|
|
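// A minimal sketch of the wrapper in isolation (values are illustrative, not
// part of this package): after draining the reader, count holds the number of
// bytes that passed through.
//
//	cr := &countingReader{r: strings.NewReader("hello")}
//	_, _ = io.Copy(io.Discard, cr)
//	// cr.count == 5
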
func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
	config := &aws.Config{
		Credentials:      credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
		Endpoint:         aws.String(endpoint),
		Region:           aws.String(region),
		DisableSSL:       aws.Bool(!useSSL),
		S3ForcePathStyle: aws.Bool(true),
	}
	client := s3.New(session.Must(session.NewSession(config)))

	return &store{
		client:     client,
		uploader:   s3manager.NewUploaderWithClient(client),
		downloader: s3manager.NewDownloaderWithClient(client),
		bucket:     bucketName,
	}
}

func (s3StoreProvider) String() string {
	return "s3"
}

func (s3StoreProvider) Supports(u *url.URL) bool {
	return u.Scheme == "s3"
}

// New returns an s3 api compatible log store.
// url format: s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
func (s3StoreProvider) New(ctx context.Context, u *url.URL) (models.LogStore, error) {
	endpoint := u.Host

	var accessKeyID, secretAccessKey string
	if u.User != nil {
		accessKeyID = u.User.Username()
		secretAccessKey, _ = u.User.Password()
	}
	useSSL := u.Query().Get("ssl") == "true"

	strs := strings.SplitN(u.Path, "/", 3)
	if len(strs) < 3 {
		return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	}
	region := strs[1]
	bucketName := strs[2]
	if region == "" {
		return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	} else if bucketName == "" {
		return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	}

	logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
	store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)

	// ensure the bucket exists, creating it if it does not
	_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
				// bucket already exists, no-op
			default:
				return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
			}
		} else {
			return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
		}
	}

	return store, nil
}

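// An illustrative decomposition (all values hypothetical): the url
//
//	s3://AKIDEXAMPLE:sEcReT@s3.example.com/us-east-1/fn-logs?ssl=true
//
// parses to endpoint "s3.example.com", region "us-east-1", bucket "fn-logs",
// static credentials AKIDEXAMPLE/sEcReT, and SSL enabled.
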
func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error {
	ctx, span := trace.StartSpan(ctx, "s3_insert_log")
	defer span.End()

	// wrap the original reader in a decorator to keep track of read bytes without buffering
	cr := &countingReader{r: callLog}
	objectName := logKey(appID, callID)
	params := &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        cr,
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
	_, err := s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		return fmt.Errorf("failed to write log, %v", err)
	}

	stats.Record(ctx, uploadSizeMeasure.M(int64(cr.count)))
	return nil
}

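// Usage sketch (ids are hypothetical): any io.Reader works as the log body.
//
//	err := store.InsertLog(ctx, "app-id", "call-id", strings.NewReader("stdout of the call"))
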
func (s *store) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_log")
	defer span.End()

	objectName := logKey(appID, callID)
	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")

	// stream the log into an in-memory buffer
	target := &aws.WriteAtBuffer{}
	size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(objectName),
	})
	if err != nil {
		aerr, ok := err.(awserr.Error)
		if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return nil, models.ErrCallLogNotFound
		}
		return nil, fmt.Errorf("failed to read log, %v", err)
	}

	stats.Record(ctx, downloadSizeMeasure.M(size))
	return bytes.NewReader(target.Bytes()), nil
}

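// The returned reader is backed by memory, so a caller can consume it freely
// after the download completes, e.g. (sketch, hypothetical ids):
//
//	r, err := store.GetLog(ctx, "app-id", "call-id")
//	if err == nil {
//		b, _ := io.ReadAll(r)
//		fmt.Println(string(b))
//	}
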
func (s *store) InsertCall(ctx context.Context, call *models.Call) error {
	ctx, span := trace.StartSpan(ctx, "s3_insert_call")
	defer span.End()

	byts, err := json.Marshal(call)
	if err != nil {
		return err
	}

	objectName := callKey(call.AppID, call.ID)
	params := &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        bytes.NewReader(byts),
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call")
	_, err = s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		return fmt.Errorf("failed to insert call, %v", err)
	}

	// at this point, a point lookup of the call will succeed. now try to
	// upload the marker key; if that upload fails, the entry simply won't
	// appear in listings that filter by route path. (NOTE: this behavior
	// will go away if we stop listing by route -> triggers)

	objectName = callMarkerKey(call.AppID, call.Path, call.ID)
	params = &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        bytes.NewReader([]byte{}),
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker")
	_, err = s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		// XXX(reed): we could just log this?
		return fmt.Errorf("failed to write marker key for log, %v", err)
	}

	return nil
}

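// To make the two writes concrete (hypothetical values): for a call with
// AppID "app1", Path "/hello" and ID "01ABC", InsertCall stores
//
//	c/app1/<descending("01ABC")>                  // the JSON call body
//	m/app1/L2hlbGxv/<descending("01ABC")>         // empty marker; "L2hlbGxv" = base64("/hello")
//
// so point lookups hit the c/ key while per-path listings scan the m/ prefix.
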
// GetCall returns a call at a certain id and app name.
func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_call")
	defer span.End()

	objectName := callKey(appID, callID)
	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call")

	return s.getCallByKey(ctx, objectName)
}

func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) {
	// stream the call record into an in-memory buffer
	var target aws.WriteAtBuffer
	_, err := s.downloader.DownloadWithContext(ctx, &target, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		aerr, ok := err.(awserr.Error)
		if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return nil, models.ErrCallNotFound
		}
		return nil, fmt.Errorf("failed to read call, %v", err)
	}

	var call models.Call
	err = json.Unmarshal(target.Bytes(), &call)
	if err != nil {
		return nil, err
	}

	return &call, nil
}

func flipCursor(oid string) string {
	if oid == "" {
		return ""
	}

	return id.EncodeDescending(oid)
}

func callMarkerKey(app, path, id string) string {
	id = flipCursor(id)
	// s3 keys use / as a delimiter and must be url safe; paths contain / so
	// we base64-encode them.
	// NOTE: s3 keys are a max of 1024 chars. path is the only non-fixed-size
	// piece in here, but it happens to be capped at 256 chars in sql. further
	// validation may be needed if weirdness ensues.
	path = base64.RawURLEncoding.EncodeToString([]byte(path))
	return callMarkerPrefix + app + "/" + path + "/" + id
}

func callKey(app, id string) string {
	id = flipCursor(id)
	return callKeyFlipped(app, id)
}

func callKeyFlipped(app, id string) string {
	return callKeyPrefix + app + "/" + id
}

func logKey(appID, callID string) string {
	return logKeyPrefix + appID + "/" + callID
}

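// Why ids are flipped: the s3 api lists keys in ascending lexicographic
// order, but call listings want newest first. id.EncodeDescending maps an id
// to a string that sorts in the reverse of the original order, so an
// ascending listing over flipped keys walks calls newest-to-oldest. A sketch
// (ids hypothetical; newer fn ids sort higher than older ones):
//
//	callKey("app1", newerID) < callKey("app1", olderID) // flipped order
//
// logKey deliberately does not flip: logs are only point-looked-up, never listed.
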
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
// calls exist, an empty list and a nil error are returned.
// NOTE: this relies on call ids being lexicographically sortable and <= 16 bytes
func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_calls")
	defer span.End()

	if filter.AppID == "" {
		return nil, errors.New("s3 store does not support listing across all apps")
	}

	// NOTE:
	// if filter.Path != ""
	//   find marker from marker keys, start there, list keys, get next marker from there
	// else
	//   use marker for keys

	// NOTE we need marker keys to support (app is REQUIRED):
	// 1) quick iteration per path
	// 2) sorted by id across all paths
	// marker key: m : {app} : {path} : {id}
	// key:        c : {app} : {id}
	//
	// also, the s3 api returns keys sorted in ascending lexicographic order;
	// we need the reverse of this.

	// marker is either a provided cursor, or a key we create from the filter:
	// it always contains the app_id, is a marker key if a path is provided,
	// and may embed a time guesstimate if a to_time is provided.

	var marker string

	// filter.Cursor is a call id; translate it to our key format. if a path
	// is provided, we list keys from markers instead.
	if filter.Cursor != "" {
		marker = callKey(filter.AppID, filter.Cursor)
		if filter.Path != "" {
			marker = callMarkerKey(filter.AppID, filter.Path, filter.Cursor)
		}
	} else if t := time.Time(filter.ToTime); !t.IsZero() {
		// get a fake id that has the most significant bits set to the to_time (first 48 bits)
		fako := id.NewWithTime(t)
		//var buf [id.EncodedSize]byte
		//fakoId.MarshalTextTo(buf)
		//mid := string(buf[:10])
		mid := fako.String()
		marker = callKey(filter.AppID, mid)
		if filter.Path != "" {
			marker = callMarkerKey(filter.AppID, filter.Path, mid)
		}
	}

	// the prefix keeps the listing inside this app's keys (or this path's marker keys)
	prefix := callKey(filter.AppID, "")
	if filter.Path != "" {
		prefix = callMarkerKey(filter.AppID, filter.Path, "")
	}

	input := &s3.ListObjectsInput{
		Bucket:  aws.String(s.bucket),
		MaxKeys: aws.Int64(int64(filter.PerPage)),
		Marker:  aws.String(marker),
		Prefix:  aws.String(prefix),
	}

	result, err := s.client.ListObjects(input)
	if err != nil {
		return nil, fmt.Errorf("failed to list logs: %v", err)
	}

	calls := make([]*models.Call, 0, len(result.Contents))

	for _, obj := range result.Contents {
		if len(calls) == filter.PerPage {
			break
		}

		// extract the app and id from the key to look up the object; this also
		// validates we aren't reading strangely keyed objects from the bucket.
		var app, id string
		if filter.Path != "" {
			fields := strings.Split(*obj.Key, "/")
			if len(fields) != 4 {
				return calls, fmt.Errorf("invalid key in call markers: %v", *obj.Key)
			}
			app = fields[1]
			id = fields[3]
		} else {
			fields := strings.Split(*obj.Key, "/")
			if len(fields) != 3 {
				return calls, fmt.Errorf("invalid key in calls: %v", *obj.Key)
			}
			app = fields[1]
			id = fields[2]
		}

		// the id here is already reverse encoded, keep it that way.
		objectName := callKeyFlipped(app, id)

		// NOTE: s3 doesn't have a way to get multiple objects, so just fetch them one by one
		// TODO we should reuse the buffer to decode these
		call, err := s.getCallByKey(ctx, objectName)
		if err != nil {
			common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app": app, "id": id}).Error("error filling call object")
			continue
		}

		// ensure: from_time < created_at < to_time
		fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond)
		if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) {
			// NOTE could break, ids and created_at aren't necessarily in perfect order
			continue
		}

		toTime := time.Time(filter.ToTime).Truncate(time.Millisecond)
		if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) {
			continue
		}

		calls = append(calls, call)
	}

	return calls, nil
}

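// Pagination sketch (filter values are hypothetical): callers page through
// results by feeding the last returned call id back in as the cursor.
//
//	filter := &models.CallFilter{AppID: "app1", PerPage: 30}
//	for {
//		page, err := s.GetCalls(ctx, filter)
//		if err != nil || len(page) == 0 {
//			break
//		}
//		// ... consume page ...
//		filter.Cursor = page[len(page)-1].ID
//	}
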
func (s *store) Close() error {
	return nil
}

const (
	uploadSizeMetricName   = "s3_log_upload_size"
	downloadSizeMetricName = "s3_log_download_size"
)

var (
	uploadSizeMeasure   = common.MakeMeasure(uploadSizeMetricName, "uploaded log size", "byte")
	downloadSizeMeasure = common.MakeMeasure(downloadSizeMetricName, "downloaded log size", "byte")
)

// RegisterViews registers views for s3 measures
func RegisterViews(tagKeys []string, dist []float64) {
	err := view.Register(
		common.CreateView(uploadSizeMeasure, view.Distribution(dist...), tagKeys),
		common.CreateView(downloadSizeMeasure, view.Distribution(dist...), tagKeys),
	)
	if err != nil {
		logrus.WithError(err).Fatal("cannot create view")
	}
}

func init() {
	logs.Register(s3StoreProvider(0))
}
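
// Example wiring at startup (a sketch; the tag keys and byte-scale bucket
// boundaries here are arbitrary choices, not defaults of this package):
//
//	// assuming this package is imported under an alias such as s3logs
//	s3logs.RegisterViews([]string{"app_id"}, []float64{1024, 64 * 1024, 1 << 20, 16 << 20})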