mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
The V2 Calls api endpoints have been added beneath fns: (#1203)
/fns/{fnID}/calls
/fns/{fnID}/calls/{callID}
The S3 implementation forces our hand as we if we want to list Calls
under a Fn, we have to use the FnID as a prefix on the object names,
which mean we need it to look up any Call. It also makes sense in
terms of resource hierarchy.
These endpoints can optionally be disabled (as other endpoints), if a
service provider needs to provide this functionality via other means.
The 'calls' test has been fully migrated to fn calls. This has been
done to reduce the copy pasta a bit, and on balance is ok as the
routes calls will be removed soon.
This commit is contained in:
@@ -195,6 +195,9 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error {
|
||||
}
|
||||
|
||||
objectName := callKey(call.AppID, call.ID)
|
||||
if call.FnID != "" {
|
||||
objectName = callKey2(call.FnID, call.ID)
|
||||
}
|
||||
params := &s3manager.UploadInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(objectName),
|
||||
@@ -213,26 +216,28 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error {
|
||||
// see this entry when listing only when specifying a route path. (NOTE: this
|
||||
// behavior will go away if we stop listing by route -> triggers)
|
||||
|
||||
objectName = callMarkerKey(call.AppID, call.Path, call.ID)
|
||||
params = &s3manager.UploadInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(objectName),
|
||||
Body: bytes.NewReader([]byte{}),
|
||||
ContentType: aws.String("text/plain"),
|
||||
}
|
||||
if call.FnID == "" {
|
||||
objectName = callMarkerKey(call.AppID, call.Path, call.ID)
|
||||
params = &s3manager.UploadInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(objectName),
|
||||
Body: bytes.NewReader([]byte{}),
|
||||
ContentType: aws.String("text/plain"),
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker")
|
||||
_, err = s.uploader.UploadWithContext(ctx, params)
|
||||
if err != nil {
|
||||
// XXX(reed): we could just log this?
|
||||
return fmt.Errorf("failed to write marker key for log, %v", err)
|
||||
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker")
|
||||
_, err = s.uploader.UploadWithContext(ctx, params)
|
||||
if err != nil {
|
||||
// XXX(reed): we could just log this?
|
||||
return fmt.Errorf("failed to write marker key for log, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetCall returns a call at a certain id and app name.
|
||||
func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
|
||||
// GetCall1 returns a call at a certain id and app name.
|
||||
func (s *store) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "s3_get_call")
|
||||
defer span.End()
|
||||
|
||||
@@ -242,6 +247,17 @@ func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call
|
||||
return s.getCallByKey(ctx, objectName)
|
||||
}
|
||||
|
||||
// GetCall returns a call at a certain id
|
||||
func (s *store) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "s3_get_call")
|
||||
defer span.End()
|
||||
|
||||
objectName := callKey2(fnID, callID)
|
||||
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call")
|
||||
|
||||
return s.getCallByKey(ctx, objectName)
|
||||
}
|
||||
|
||||
func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) {
|
||||
// stream the logs to an in-memory buffer
|
||||
var target aws.WriteAtBuffer
|
||||
@@ -292,14 +308,23 @@ func callKeyFlipped(app, id string) string {
|
||||
return callKeyPrefix + app + "/" + id
|
||||
}
|
||||
|
||||
func callKey2(fnID, id string) string {
|
||||
id = flipCursor(id)
|
||||
return callKeyPrefix + fnID + "/" + id
|
||||
}
|
||||
|
||||
func logKey(appID, callID string) string {
|
||||
return logKeyPrefix + appID + "/" + callID
|
||||
}
|
||||
|
||||
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
|
||||
func logKey2(callID string) string {
|
||||
return logKeyPrefix + callID
|
||||
}
|
||||
|
||||
// GetCalls1 returns a list of calls that satisfy the given CallFilter. If no
|
||||
// calls exist, an empty list and a nil error are returned.
|
||||
// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte
|
||||
func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
||||
func (s *store) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "s3_get_calls")
|
||||
defer span.End()
|
||||
|
||||
@@ -420,6 +445,116 @@ func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mod
|
||||
return calls, nil
|
||||
}
|
||||
|
||||
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
|
||||
// calls exist, an empty list and a nil error are returned.
|
||||
// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte
|
||||
func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "s3_get_calls")
|
||||
defer span.End()
|
||||
|
||||
if filter.FnID == "" {
|
||||
return nil, errors.New("s3 store does not support listing across all fns")
|
||||
}
|
||||
|
||||
if filter.Cursor != "" {
|
||||
cursor, err := base64.RawURLEncoding.DecodeString(filter.Cursor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filter.Cursor = string(cursor)
|
||||
}
|
||||
|
||||
// NOTE we need marker keys to support (fn is REQUIRED):
|
||||
// 1) quick iteration per path
|
||||
// 2) sorted by id across all path
|
||||
// key: s: {fn} : {id}
|
||||
//
|
||||
// also s3 api returns sorted in lexicographic order, we need the reverse of this.
|
||||
var key string
|
||||
|
||||
// filter.Cursor is a call id, translate to our key format. if a path is
|
||||
// provided, we list keys from markers instead.
|
||||
if filter.Cursor != "" {
|
||||
key = callKey2(filter.FnID, filter.Cursor)
|
||||
} else if t := time.Time(filter.ToTime); !t.IsZero() {
|
||||
// get a fake id that has the most significant bits set to the to_time (first 48 bits)
|
||||
fako := id.NewWithTime(t)
|
||||
//var buf [id.EncodedSize]byte
|
||||
//fakoId.MarshalTextTo(buf)
|
||||
//mid := string(buf[:10])
|
||||
mid := fako.String()
|
||||
key = callKey(filter.FnID, mid)
|
||||
}
|
||||
|
||||
// prefix prevents leaving bounds of app or path marker keys
|
||||
prefix := callKey2(filter.FnID, "")
|
||||
|
||||
input := &s3.ListObjectsInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
MaxKeys: aws.Int64(int64(filter.PerPage)),
|
||||
Marker: aws.String(key),
|
||||
Prefix: aws.String(prefix),
|
||||
}
|
||||
|
||||
result, err := s.client.ListObjects(input)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list logs: %v", err)
|
||||
}
|
||||
|
||||
calls := make([]*models.Call, 0, len(result.Contents))
|
||||
|
||||
for _, obj := range result.Contents {
|
||||
if len(calls) == filter.PerPage {
|
||||
break
|
||||
}
|
||||
|
||||
// extract the app and id from the key to lookup the object, this also
|
||||
// validates we aren't reading strangely keyed objects from the bucket.
|
||||
var fnID, id string
|
||||
|
||||
fields := strings.Split(*obj.Key, "/")
|
||||
if len(fields) != 3 {
|
||||
return nil, fmt.Errorf("invalid key in calls: %v", *obj.Key)
|
||||
}
|
||||
fnID = fields[1]
|
||||
id = fields[2]
|
||||
|
||||
// the id here is already reverse encoded, keep it that way.
|
||||
objectName := callKeyFlipped(fnID, id)
|
||||
|
||||
// NOTE: s3 doesn't have a way to get multiple objects so just use GetCall
|
||||
// TODO we should reuse the buffer to decode these
|
||||
call, err := s.getCallByKey(ctx, objectName)
|
||||
if err != nil {
|
||||
common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"fnID": fnID, "id": id}).Error("error filling call object")
|
||||
continue
|
||||
}
|
||||
|
||||
// ensure: from_time < created_at < to_time
|
||||
fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond)
|
||||
if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) {
|
||||
// NOTE could break, ids and created_at aren't necessarily in perfect order
|
||||
continue
|
||||
}
|
||||
|
||||
toTime := time.Time(filter.ToTime).Truncate(time.Millisecond)
|
||||
if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
calls = append(calls, call)
|
||||
}
|
||||
|
||||
callList := &models.CallList{Items: calls}
|
||||
|
||||
if len(calls) > 0 && len(calls) == filter.PerPage {
|
||||
last := []byte(calls[len(calls)-1].ID)
|
||||
callList.NextCursor = base64.RawURLEncoding.EncodeToString(last)
|
||||
}
|
||||
|
||||
return callList, nil
|
||||
}
|
||||
|
||||
func (s *store) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user