// Package s3 implements an s3 api compatible log store
package s3

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/id"
	"github.com/fnproject/fn/api/logs"
	"github.com/fnproject/fn/api/models"
	"github.com/sirupsen/logrus"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"go.opencensus.io/trace"
)

// TODO we should encrypt these; the user will have to supply a key though (or all
// OSS users' logs will be encrypted with the same key unless they change it, which
// just seems mean...)

// TODO do we need to use the v2 API? can't find BMC object store docs :/

const (
	// key prefixes
	callKeyPrefix    = "c/"
	callMarkerPrefix = "m/"
	logKeyPrefix     = "l/"
)

type store struct {
	client     *s3.S3
	uploader   *s3manager.Uploader
	downloader *s3manager.Downloader
	bucket     string
}

type s3StoreProvider int

// countingReader decorates an io.Reader, keeping track of the number of bytes
// read in order to avoid double buffering and to track the reader's size.
type countingReader struct {
	r     io.Reader
	count int
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
	n, err = cr.r.Read(p)
	cr.count += n
	return n, err
}

func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
	config := &aws.Config{
		Credentials:      credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
		Endpoint:         aws.String(endpoint),
		Region:           aws.String(region),
		DisableSSL:       aws.Bool(!useSSL),
		S3ForcePathStyle: aws.Bool(true),
	}
	client := s3.New(session.Must(session.NewSession(config)))

	return &store{
		client:     client,
		uploader:   s3manager.NewUploaderWithClient(client),
		downloader: s3manager.NewDownloaderWithClient(client),
		bucket:     bucketName,
	}
}

func (s3StoreProvider) String() string {
	return "s3"
}

func (s3StoreProvider) Supports(u *url.URL) bool {
	return u.Scheme == "s3"
}

// New returns an s3 api compatible log store.
// url format: s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
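//
// For example, a url like the following (the credentials, endpoint and bucket
// here are hypothetical, shown purely for illustration):
//
//	s3://AKIAEXAMPLE:super%2Fsecret@s3.example.com/us-east-1/fn-logs?ssl=true
//
// parses to endpoint "s3.example.com", region "us-east-1", bucket "fn-logs",
// access key id "AKIAEXAMPLE", secret "super/secret" (url decoded) and SSL
// enabled.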
func (s3StoreProvider) New(ctx context.Context, u *url.URL) (models.LogStore, error) {
	endpoint := u.Host

	var accessKeyID, secretAccessKey string
	if u.User != nil {
		accessKeyID = u.User.Username()
		secretAccessKey, _ = u.User.Password()
	}
	useSSL := u.Query().Get("ssl") == "true"

	strs := strings.SplitN(u.Path, "/", 3)
	if len(strs) < 3 {
		return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	}
	region := strs[1]
	bucketName := strs[2]
	if region == "" {
		return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	} else if bucketName == "" {
		return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
	}

	logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
	store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)

	// ensure the bucket exists, creating it if it does not
	_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
				// bucket already exists, NO-OP
			default:
				return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
			}
		} else {
			return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
		}
	}

	return store, nil
}

func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error {
	ctx, span := trace.StartSpan(ctx, "s3_insert_log")
	defer span.End()

	// wrap the original reader in a decorator to keep track of read bytes without buffering
	cr := &countingReader{r: callLog}
	objectName := logKey(appID, callID)
	params := &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        cr,
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
	_, err := s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		return fmt.Errorf("failed to write log, %v", err)
	}

	stats.Record(ctx, uploadSizeMeasure.M(int64(cr.count)))
	return nil
}

func (s *store) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_log")
	defer span.End()

	objectName := logKey(appID, callID)
	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")

	// stream the log into an in-memory buffer
	target := &aws.WriteAtBuffer{}
	size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(objectName),
	})
	if err != nil {
		aerr, ok := err.(awserr.Error)
		if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return nil, models.ErrCallLogNotFound
		}
		return nil, fmt.Errorf("failed to read log, %v", err)
	}

	stats.Record(ctx, downloadSizeMeasure.M(size))
	return bytes.NewReader(target.Bytes()), nil
}

func (s *store) InsertCall(ctx context.Context, call *models.Call) error {
	ctx, span := trace.StartSpan(ctx, "s3_insert_call")
	defer span.End()

	byts, err := json.Marshal(call)
	if err != nil {
		return err
	}

	objectName := callKey(call.AppID, call.ID)
	params := &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        bytes.NewReader(byts),
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call")
	_, err = s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		return fmt.Errorf("failed to insert call, %v", err)
	}

	// at this point, a point lookup of the call will work. now we can try to
	// upload the marker key. if the marker key upload fails, the user simply
	// will not see this entry when listing with a route path specified. (NOTE:
	// this behavior will go away if we stop listing by route -> triggers)

	objectName = callMarkerKey(call.AppID, call.Path, call.ID)
	params = &s3manager.UploadInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(objectName),
		Body:        bytes.NewReader([]byte{}),
		ContentType: aws.String("text/plain"),
	}

	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker")
	_, err = s.uploader.UploadWithContext(ctx, params)
	if err != nil {
		// XXX(reed): we could just log this?
		return fmt.Errorf("failed to write marker key for call, %v", err)
	}

	return nil
}

// GetCall returns a call at a certain id and app name.
func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_call")
	defer span.End()

	objectName := callKey(appID, callID)
	logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call")

	return s.getCallByKey(ctx, objectName)
}

func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) {
	// stream the call object into an in-memory buffer
	var target aws.WriteAtBuffer
	_, err := s.downloader.DownloadWithContext(ctx, &target, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		aerr, ok := err.(awserr.Error)
		if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return nil, models.ErrCallNotFound
		}
		return nil, fmt.Errorf("failed to read call, %v", err)
	}

	var call models.Call
	err = json.Unmarshal(target.Bytes(), &call)
	if err != nil {
		return nil, err
	}

	return &call, nil
}

// flipCursor re-encodes a call id so that the lexicographically ascending
// order the s3 list api returns yields ids in descending order, i.e. newest
// calls first.
func flipCursor(oid string) string {
	if oid == "" {
		return ""
	}

	return id.EncodeDescending(oid)
}

func callMarkerKey(app, path, id string) string {
	id = flipCursor(id)
	// s3 keys use / as a delimiter and appear in urls, so we need to encode the
	// path, since paths have / in them.
	// NOTE: s3 keys are a max of 1024 chars. path is the only non-fixed-size
	// piece in here, but it is fixed to 256 chars in sql (by chance, mostly).
	// further validation may be needed if weirdness ensues.
	path = base64.RawURLEncoding.EncodeToString([]byte(path))
	return callMarkerPrefix + app + "/" + path + "/" + id
}

func callKey(app, id string) string {
	id = flipCursor(id)
	return callKeyFlipped(app, id)
}

func callKeyFlipped(app, id string) string {
	return callKeyPrefix + app + "/" + id
}

func logKey(appID, callID string) string {
	return logKeyPrefix + appID + "/" + callID
}

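// To illustrate the key layout produced by the helpers above (an informal
// sketch; the app id, path and call id are made up, and ids are shown
// un-flipped for readability), a call with app id "app1", route path "/hello"
// and call id "01ABC" lands under keys shaped like:
//
//	c/app1/<flip("01ABC")>                        the call object (json)
//	m/app1/<base64url("/hello")>/<flip("01ABC")>  an empty marker for per-path listing
//	l/app1/01ABC                                  the call log
//
// where flip is flipCursor and base64url is base64.RawURLEncoding.
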
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
// calls exist, an empty list and a nil error are returned.
// NOTE: this relies on call ids being lexicographically sortable and <= 16 bytes
func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
	ctx, span := trace.StartSpan(ctx, "s3_get_calls")
	defer span.End()

	if filter.AppID == "" {
		return nil, errors.New("s3 store does not support listing across all apps")
	}

	// NOTE:
	// if filter.Path != ""
	//   find the marker among the marker keys, start there, list keys, get the next marker from there
	// else
	//   use a marker for the call keys directly

	// NOTE we need marker keys to support (app is REQUIRED):
	// 1) quick iteration per path
	// 2) sorted by id across all paths
	// marker key: m/{app}/{path}/{id}
	// call key:   c/{app}/{id}
	//
	// also the s3 api returns keys sorted in lexicographic order, and we need
	// the reverse of this (see flipCursor).

	// marker is either a provided marker, or a key we create based on parameters:
	// it contains app_id, may be a marker key if a path is provided, and may
	// have a time guesstimate if a to_time is provided.

	var marker string

	// filter.Cursor is a call id; translate it to our key format. if a path is
	// provided, we list keys from the markers instead.
	if filter.Cursor != "" {
		marker = callKey(filter.AppID, filter.Cursor)
		if filter.Path != "" {
			marker = callMarkerKey(filter.AppID, filter.Path, filter.Cursor)
		}
	} else if t := time.Time(filter.ToTime); !t.IsZero() {
		// get a fake id that has the most significant bits set to the to_time (first 48 bits)
		fako := id.NewWithTime(t)
		mid := fako.String()
		marker = callKey(filter.AppID, mid)
		if filter.Path != "" {
			marker = callMarkerKey(filter.AppID, filter.Path, mid)
		}
	}

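	// with a to_time marker, listing starts just past the synthetic id above:
	// since call ids embed their creation time in the high bits and flipCursor
	// reverses the sort order, the keys that follow belong to calls created at
	// or before to_time, newest first. this is only an approximation, hence
	// the "guesstimate" above.
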
	// the prefix prevents leaving the bounds of the app or path marker keys
	prefix := callKey(filter.AppID, "")
	if filter.Path != "" {
		prefix = callMarkerKey(filter.AppID, filter.Path, "")
	}

	input := &s3.ListObjectsInput{
		Bucket:  aws.String(s.bucket),
		MaxKeys: aws.Int64(int64(filter.PerPage)),
		Marker:  aws.String(marker),
		Prefix:  aws.String(prefix),
	}

	result, err := s.client.ListObjects(input)
	if err != nil {
		return nil, fmt.Errorf("failed to list logs: %v", err)
	}

	calls := make([]*models.Call, 0, len(result.Contents))

	for _, obj := range result.Contents {
		if len(calls) == filter.PerPage {
			break
		}

		// extract the app and id from the key to look up the object; this also
		// validates that we aren't reading strangely keyed objects from the bucket.
		var app, id string
		if filter.Path != "" {
			fields := strings.Split(*obj.Key, "/")
			if len(fields) != 4 {
				return calls, fmt.Errorf("invalid key in call markers: %v", *obj.Key)
			}
			app = fields[1]
			id = fields[3]
		} else {
			fields := strings.Split(*obj.Key, "/")
			if len(fields) != 3 {
				return calls, fmt.Errorf("invalid key in calls: %v", *obj.Key)
			}
			app = fields[1]
			id = fields[2]
		}

		// the id here is already reverse encoded, keep it that way.
		objectName := callKeyFlipped(app, id)

		// NOTE: s3 doesn't have a way to get multiple objects, so just use getCallByKey
		// TODO we should reuse the buffer to decode these
		call, err := s.getCallByKey(ctx, objectName)
		if err != nil {
			common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app": app, "id": id}).Error("error filling call object")
			continue
		}

		// ensure: from_time < created_at < to_time
		fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond)
		if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) {
			// NOTE we could break here, but ids and created_at aren't necessarily in perfect order
			continue
		}

		toTime := time.Time(filter.ToTime).Truncate(time.Millisecond)
		if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) {
			continue
		}

		calls = append(calls, call)
	}

	return calls, nil
}

func (s *store) Close() error {
	return nil
}

const (
	uploadSizeMetricName   = "s3_log_upload_size"
	downloadSizeMetricName = "s3_log_download_size"
)

var (
	uploadSizeMeasure   = stats.Int64(uploadSizeMetricName, "uploaded log size", "byte")
	downloadSizeMeasure = stats.Int64(downloadSizeMetricName, "downloaded log size", "byte")
)

// RegisterViews registers views for s3 measures
func RegisterViews(tagKeys []string) {
	err := view.Register(
		createView(uploadSizeMeasure, view.Distribution(), tagKeys),
		createView(downloadSizeMeasure, view.Distribution(), tagKeys),
	)
	if err != nil {
		logrus.WithError(err).Fatal("cannot create view")
	}
}

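// As a usage sketch (the tag key names here are purely illustrative):
//
//	RegisterViews([]string{"app_id", "path"})
//
// registers a distribution view over each of the upload and download size
// measures, broken down by the given tag keys.
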
func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View {
	return &view.View{
		Name:        measure.Name(),
		Description: measure.Description(),
		Measure:     measure,
		TagKeys:     makeKeys(tagKeys),
		Aggregation: agg,
	}
}

func makeKeys(names []string) []tag.Key {
	tagKeys := make([]tag.Key, len(names))
	for i, name := range names {
		key, err := tag.NewKey(name)
		if err != nil {
			logrus.Fatal(err)
		}
		tagKeys[i] = key
	}
	return tagKeys
}

func init() {
	logs.Register(s3StoreProvider(0))
}