mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* update vendor directory, add go.opencensus.io * update imports * oops * s/opentracing/opencensus/ & remove prometheus / zipkin stuff & remove old stats * the dep train rides again * fix gin build * deps from last guy * start in on the agent metrics * she builds * remove tags for now, cardinality error is fussing. subscribe instead of register * update to patched version of opencensus to proceed for now TODO switch to a release * meh fix imports * println debug the bad boys * lace it with the tags * update deps again * fix all inconsistent cardinality errors * add our own logger * fix init * fix oom measure * remove bugged removal code * fix s3 measures * fix prom handler nil
236 lines
6.8 KiB
Go
236 lines
6.8 KiB
Go
// Package s3 implements an S3 API compatible log store.
|
|
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/stats"
|
|
"go.opencensus.io/stats/view"
|
|
"go.opencensus.io/tag"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// TODO we should encrypt these; the user will have to supply a key though (or
// all OSS users' logs will be encrypted with the same key unless they change
// it, which just seems mean...)
|
|
|
|
// TODO do we need to use the v2 API? can't find BMC object store docs :/
|
|
|
|
// contentType is the MIME type set on every log object uploaded by InsertLog.
const contentType = "text/plain"
|
|
|
|
type store struct {
|
|
client *s3.S3
|
|
uploader *s3manager.Uploader
|
|
downloader *s3manager.Downloader
|
|
bucket string
|
|
}
|
|
|
|
// countingReader decorates an io.Reader and keeps a running total of the bytes
// that have been read through it. This lets a caller learn the size of a
// stream after it has been consumed without buffering it a second time.
type countingReader struct {
	r     io.Reader // wrapped reader that all reads are delegated to
	count int       // total number of bytes returned by r so far
}

// Read delegates to the wrapped reader and adds the number of bytes it
// returned to the running count. The byte count and error from the wrapped
// reader are passed through unchanged.
func (cr *countingReader) Read(p []byte) (int, error) {
	read, err := cr.r.Read(p)
	cr.count += read
	return read, err
}
|
|
|
|
func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
|
|
config := &aws.Config{
|
|
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
|
|
Endpoint: aws.String(endpoint),
|
|
Region: aws.String(region),
|
|
DisableSSL: aws.Bool(!useSSL),
|
|
S3ForcePathStyle: aws.Bool(true),
|
|
}
|
|
session := session.Must(session.NewSession(config))
|
|
client := s3.New(session)
|
|
|
|
return &store{
|
|
client: client,
|
|
uploader: s3manager.NewUploaderWithClient(client),
|
|
downloader: s3manager.NewDownloaderWithClient(client),
|
|
bucket: bucketName,
|
|
}
|
|
}
|
|
|
|
// s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
|
|
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
|
|
func New(u *url.URL) (models.LogStore, error) {
|
|
endpoint := u.Host
|
|
|
|
var accessKeyID, secretAccessKey string
|
|
if u.User != nil {
|
|
accessKeyID = u.User.Username()
|
|
secretAccessKey, _ = u.User.Password()
|
|
}
|
|
useSSL := u.Query().Get("ssl") == "true"
|
|
|
|
strs := strings.SplitN(u.Path, "/", 3)
|
|
if len(strs) < 3 {
|
|
return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
|
|
}
|
|
region := strs[1]
|
|
bucketName := strs[2]
|
|
if region == "" {
|
|
return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
|
|
} else if bucketName == "" {
|
|
return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
|
|
store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)
|
|
|
|
// ensure the bucket exists, creating if it does not
|
|
_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
|
|
if err != nil {
|
|
if aerr, ok := err.(awserr.Error); ok {
|
|
switch aerr.Code() {
|
|
case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
|
|
// bucket already exists, NO-OP
|
|
default:
|
|
return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
|
|
}
|
|
} else {
|
|
return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
|
|
}
|
|
}
|
|
|
|
return store, nil
|
|
}
|
|
|
|
// path returns the object key under which a call's log is stored: the app
// name encoded as unpadded URL-safe base64, then "/", then the call ID. The
// call ID is used as-is.
func path(appName, callID string) string {
	// the app name is encoded b/c s3 does not like: & $ @ = : ; + , ?
	// TODO optimize..
	return base64.RawURLEncoding.EncodeToString([]byte(appName)) + "/" + callID
}
|
|
|
|
func (s *store) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
|
|
ctx, span := trace.StartSpan(ctx, "s3_insert_log")
|
|
defer span.End()
|
|
|
|
// wrap original reader in a decorator to keep track of read bytes without buffering
|
|
cr := &countingReader{r: callLog}
|
|
|
|
objectName := path(appName, callID)
|
|
params := &s3manager.UploadInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(objectName),
|
|
Body: cr,
|
|
ContentType: aws.String(contentType),
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
|
|
_, err := s.uploader.UploadWithContext(ctx, params)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write log, %v", err)
|
|
}
|
|
|
|
stats.Record(ctx, uploadSizeMeasure.M(int64(cr.count)))
|
|
return nil
|
|
}
|
|
|
|
func (s *store) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
|
|
ctx, span := trace.StartSpan(ctx, "s3_get_log")
|
|
defer span.End()
|
|
|
|
objectName := path(appName, callID)
|
|
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")
|
|
|
|
// stream the logs to an in-memory buffer
|
|
target := &aws.WriteAtBuffer{}
|
|
size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(objectName),
|
|
})
|
|
if err != nil {
|
|
aerr, ok := err.(awserr.Error)
|
|
if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
|
|
return nil, models.ErrCallLogNotFound
|
|
}
|
|
return nil, fmt.Errorf("failed to read log, %v", err)
|
|
}
|
|
|
|
stats.Record(ctx, downloadSizeMeasure.M(size))
|
|
return bytes.NewReader(target.Bytes()), nil
|
|
}
|
|
|
|
var (
|
|
uploadSizeMeasure *stats.Int64Measure
|
|
downloadSizeMeasure *stats.Int64Measure
|
|
)
|
|
|
|
func init() {
|
|
// TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated
|
|
// or we have to white list them in the view for them to show up? test...
|
|
var err error
|
|
appKey, err := tag.NewKey("fn_appname")
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
pathKey, err := tag.NewKey("fn_path")
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
|
|
{
|
|
uploadSizeMeasure, err = stats.Int64("s3_log_upload_size", "uploaded log size", "byte")
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
v, err := view.New(
|
|
"s3_log_upload_size",
|
|
"uploaded log size",
|
|
[]tag.Key{appKey, pathKey},
|
|
uploadSizeMeasure,
|
|
view.DistributionAggregation{},
|
|
)
|
|
if err != nil {
|
|
logrus.Fatalf("cannot create view: %v", err)
|
|
}
|
|
if err := v.Subscribe(); err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
}
|
|
|
|
{
|
|
downloadSizeMeasure, err = stats.Int64("s3_log_download_size", "downloaded log size", "byte")
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
v, err := view.New(
|
|
"s3_log_download_size",
|
|
"downloaded log size",
|
|
[]tag.Key{appKey, pathKey},
|
|
downloadSizeMeasure,
|
|
view.DistributionAggregation{},
|
|
)
|
|
if err != nil {
|
|
logrus.Fatalf("cannot create view: %v", err)
|
|
}
|
|
if err := v.Subscribe(); err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
}
|
|
}
|