Copy logs up to v2 endpoints (#1207)

Copies the log endpoints up to the V2 endpoints, in a similar way to
the call endpoints.

The main change is to when logs are inserted into S3. The signature of
the function has been changed to take the whole call object, rather
than just the app and call id's. This allows the function to switch
between calls for Routes and those for Fns. Obviously this switching
can be removed when v1 is removed.

In the sql implementation it inserts with both appID and fnID, this
allows the two get's to work, and the down grade of the
migration. When the v1 logs are removed, the appId can be dropped.

The log fetch test and error messages have been changed to be FnID specific.
This commit is contained in:
Tom Coupland
2018-09-13 10:30:10 +01:00
committed by GitHub
parent ac11d42e56
commit a0ccc4d7c4
15 changed files with 186 additions and 58 deletions

View File

@@ -192,7 +192,7 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }
if err := da.ls.InsertLog(ctx, mCall.AppID, mCall.ID, stderr); err != nil { if err := da.ls.InsertLog(ctx, mCall, stderr); err != nil {
common.Logger(ctx).WithError(err).Error("error uploading log") common.Logger(ctx).WithError(err).Error("error uploading log")
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }

View File

@@ -0,0 +1,46 @@
package migrations
import (
"context"
"github.com/fnproject/fn/api/datastore/sql/migratex"
"github.com/jmoiron/sqlx"
)
func up19(ctx context.Context, tx *sqlx.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE logs ADD fn_id varchar(256);")
switch tx.DriverName() {
case "mysql":
_, err := tx.ExecContext(ctx, "ALTER TABLE logs MODIFY app_id varchar(256) NULL;")
return err
case "postgres", "pgx":
_, err = tx.ExecContext(ctx, "ALTER TABLE logs ALTER COLUMN app_id DROP NOT NULL;")
return err
}
return err
}
func down19(ctx context.Context, tx *sqlx.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE logs DROP COLUMN fn_id;")
switch tx.DriverName() {
case "mysql":
_, err := tx.ExecContext(ctx, "ALTER TABLE logs MODIFY app_id varchar(256) NOT NULL;")
return err
case "postgres", "pgx":
_, err = tx.ExecContext(ctx, "ALTER TABLE logs ALTER COLUMN app_id SET NOT NULL;")
return err
}
return err
}
func init() {
Migrations = append(Migrations, &migratex.MigFields{
VersionFunc: vfunc(19),
UpFunc: up19,
DownFunc: down19,
})
}

View File

@@ -95,7 +95,8 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
`CREATE TABLE IF NOT EXISTS logs ( `CREATE TABLE IF NOT EXISTS logs (
id varchar(256) NOT NULL PRIMARY KEY, id varchar(256) NOT NULL PRIMARY KEY,
app_id varchar(256) NOT NULL, app_id varchar(256),
fn_id varchar(256),
log text NOT NULL log text NOT NULL
);`, );`,
@@ -1059,7 +1060,7 @@ func (ds *SQLStore) GetCalls1(ctx context.Context, filter *models.CallFilter) ([
return res, nil return res, nil
} }
func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error { func (ds *SQLStore) InsertLog(ctx context.Context, call *models.Call, logR io.Reader) error {
// coerce this into a string for sql // coerce this into a string for sql
var log string var log string
if stringer, ok := logR.(fmt.Stringer); ok { if stringer, ok := logR.(fmt.Stringer); ok {
@@ -1072,13 +1073,12 @@ func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io
log = b.String() log = b.String()
} }
query := ds.db.Rebind(`INSERT INTO logs (id, app_id, log) VALUES (?, ?, ?);`) query := ds.db.Rebind(`INSERT INTO logs (id, app_id, fn_id, log) VALUES (?, ?, ?, ?);`)
_, err := ds.db.ExecContext(ctx, query, callID, appID, log) _, err := ds.db.ExecContext(ctx, query, call.ID, call.AppID, call.FnID, log)
return err return err
} }
func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { func (ds *SQLStore) GetLog1(ctx context.Context, appID, callID string) (io.Reader, error) {
query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`) query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`)
row := ds.db.QueryRowContext(ctx, query, callID, appID) row := ds.db.QueryRowContext(ctx, query, callID, appID)
@@ -1094,6 +1094,22 @@ func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader
return strings.NewReader(log), nil return strings.NewReader(log), nil
} }
func (ds *SQLStore) GetLog(ctx context.Context, fnID, callID string) (io.Reader, error) {
query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND fn_id=?`)
row := ds.db.QueryRowContext(ctx, query, callID, fnID)
var log string
err := row.Scan(&log)
if err != nil {
if err == sql.ErrNoRows {
return nil, models.ErrCallLogNotFound
}
return nil, err
}
return strings.NewReader(log), nil
}
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) { func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
if filter == nil { if filter == nil {
return "", nil return "", nil

View File

@@ -46,10 +46,10 @@ func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) (*mo
return m.ls.GetCalls(ctx, filter) return m.ls.GetCalls(ctx, filter)
} }
func (m *metricls) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error { func (m *metricls) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error {
ctx, span := trace.StartSpan(ctx, "ls_insert_log") ctx, span := trace.StartSpan(ctx, "ls_insert_log")
defer span.End() defer span.End()
return m.ls.InsertLog(ctx, appName, callID, callLog) return m.ls.InsertLog(ctx, call, callLog)
} }
func (m *metricls) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) { func (m *metricls) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {

View File

@@ -32,13 +32,21 @@ func NewMock(args ...interface{}) models.LogStore {
return &mocker return &mocker
} }
func (m *mock) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { func (m *mock) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error {
bytes, err := ioutil.ReadAll(callLog) bytes, err := ioutil.ReadAll(callLog)
m.Logs[callID] = bytes m.Logs[call.ID] = bytes
return err return err
} }
func (m *mock) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { func (m *mock) GetLog1(ctx context.Context, appID, callID string) (io.Reader, error) {
logEntry, ok := m.Logs[callID]
if !ok {
return nil, models.ErrCallLogNotFound
}
return bytes.NewReader(logEntry), nil
}
func (m *mock) GetLog(ctx context.Context, fnID, callID string) (io.Reader, error) {
logEntry, ok := m.Logs[callID] logEntry, ok := m.Logs[callID]
if !ok { if !ok {
return nil, models.ErrCallLogNotFound return nil, models.ErrCallLogNotFound

View File

@@ -1,8 +1,9 @@
package logs package logs
import ( import (
logTesting "github.com/fnproject/fn/api/logs/testing"
"testing" "testing"
logTesting "github.com/fnproject/fn/api/logs/testing"
) )
func TestMock(t *testing.T) { func TestMock(t *testing.T) {

View File

@@ -136,13 +136,20 @@ func (s3StoreProvider) New(ctx context.Context, u *url.URL) (models.LogStore, er
return store, nil return store, nil
} }
func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { func (s *store) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error {
ctx, span := trace.StartSpan(ctx, "s3_insert_log") ctx, span := trace.StartSpan(ctx, "s3_insert_log")
defer span.End() defer span.End()
// wrap original reader in a decorator to keep track of read bytes without buffering // wrap original reader in a decorator to keep track of read bytes without buffering
cr := &countingReader{r: callLog} cr := &countingReader{r: callLog}
objectName := logKey(appID, callID)
objectName := ""
if call.FnID != "" {
objectName = logKey(call.FnID, call.ID)
} else {
objectName = logKey(call.AppID, call.ID)
}
params := &s3manager.UploadInput{ params := &s3manager.UploadInput{
Bucket: aws.String(s.bucket), Bucket: aws.String(s.bucket),
Key: aws.String(objectName), Key: aws.String(objectName),
@@ -317,10 +324,6 @@ func logKey(appID, callID string) string {
return logKeyPrefix + appID + "/" + callID return logKeyPrefix + appID + "/" + callID
} }
func logKey2(callID string) string {
return logKeyPrefix + callID
}
// GetCalls1 returns a list of calls that satisfy the given CallFilter. If no // GetCalls1 returns a list of calls that satisfy the given CallFilter. If no
// calls exist, an empty list and a nil error are returned. // calls exist, an empty list and a nil error are returned.
// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte // NOTE: this relies on call ids being lexicographically sortable and <= 16 byte

View File

@@ -145,11 +145,11 @@ func Test(t *testing.T, fnl models.LogStore) {
call.ID = id.New().String() call.ID = id.New().String()
logText := "test" logText := "test"
log := strings.NewReader(logText) log := strings.NewReader(logText)
err := fnl.InsertLog(ctx, call.AppID, call.ID, log) err := fnl.InsertLog(ctx, call, log)
if err != nil { if err != nil {
t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err)
} }
logEntry, err := fnl.GetLog(ctx, call.AppID, call.ID) logEntry, err := fnl.GetLog(ctx, call.FnID, call.ID)
var b bytes.Buffer var b bytes.Buffer
io.Copy(&b, logEntry) io.Copy(&b, logEntry)
if !strings.Contains(b.String(), logText) { if !strings.Contains(b.String(), logText) {
@@ -160,7 +160,7 @@ func Test(t *testing.T, fnl models.LogStore) {
t.Run("call-log-not-found", func(t *testing.T) { t.Run("call-log-not-found", func(t *testing.T) {
call.ID = id.New().String() call.ID = id.New().String()
_, err := fnl.GetLog(ctx, call.AppID, call.ID) _, err := fnl.GetLog(ctx, call.FnID, call.ID)
if err != models.ErrCallLogNotFound { if err != models.ErrCallLogNotFound {
t.Fatal("GetLog should return not found, but got:", err) t.Fatal("GetLog should return not found, but got:", err)
} }
@@ -172,7 +172,7 @@ func Test(t *testing.T, fnl models.LogStore) {
call.Error = "ya dun goofed" call.Error = "ya dun goofed"
call.StartedAt = common.DateTime(time.Now()) call.StartedAt = common.DateTime(time.Now())
call.CompletedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now())
call.AppID = testApp.Name call.AppID = testApp.ID
call.FnID = testFn.ID call.FnID = testFn.ID
t.Run("call-insert", func(t *testing.T) { t.Run("call-insert", func(t *testing.T) {
@@ -212,7 +212,7 @@ func Test(t *testing.T, fnl models.LogStore) {
t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt)
} }
if call.AppID != newCall.AppID { if call.AppID != newCall.AppID {
t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppID, newCall.AppID) t.Fatalf("Test GetCall: fn id mismatch `%v` `%v`", call.FnID, newCall.FnID)
} }
if call.Path != newCall.Path { if call.Path != newCall.Path {
t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path)

View File

@@ -16,14 +16,14 @@ type validator struct {
} }
// callID or appID will never be empty. // callID or appID will never be empty.
func (v *validator) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { func (v *validator) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error {
if callID == "" { if call.ID == "" {
return models.ErrDatastoreEmptyCallID return models.ErrDatastoreEmptyCallID
} }
if appID == "" { if call.AppID == "" && call.FnID == "" {
return models.ErrMissingAppID return models.ErrMissingFnID
} }
return v.LogStore.InsertLog(ctx, appID, callID, callLog) return v.LogStore.InsertLog(ctx, call, callLog)
} }
// callID or appID will never be empty. // callID or appID will never be empty.
@@ -32,7 +32,7 @@ func (v *validator) GetLog(ctx context.Context, appID, callID string) (io.Reader
return nil, models.ErrDatastoreEmptyCallID return nil, models.ErrDatastoreEmptyCallID
} }
if appID == "" { if appID == "" {
return nil, models.ErrMissingAppID return nil, models.ErrMissingFnID
} }
return v.LogStore.GetLog(ctx, appID, callID) return v.LogStore.GetLog(ctx, appID, callID)
} }

View File

@@ -43,6 +43,9 @@ var (
ErrMissingAppID = err{ ErrMissingAppID = err{
code: http.StatusBadRequest, code: http.StatusBadRequest,
error: errors.New("Missing App ID")} error: errors.New("Missing App ID")}
ErrMissingFnID = err{
code: http.StatusBadRequest,
error: errors.New("Missing Fn ID")}
ErrMissingName = err{ ErrMissingName = err{
code: http.StatusBadRequest, code: http.StatusBadRequest,
error: errors.New("Missing Name")} error: errors.New("Missing Name")}

View File

@@ -8,11 +8,11 @@ import (
type LogStore interface { type LogStore interface {
// InsertLog will insert the log at callID, overwriting if it previously // InsertLog will insert the log at callID, overwriting if it previously
// existed. // existed.
InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error InsertLog(ctx context.Context, call *Call, callLog io.Reader) error
// GetLog will return the log at callID, an error will be returned if the log // GetLog will return the log at callID, an error will be returned if the log
// cannot be found. // cannot be found.
GetLog(ctx context.Context, appID, callID string) (io.Reader, error) GetLog(ctx context.Context, fnID, callID string) (io.Reader, error)
// TODO we should probably allow deletion of a range of logs (also calls)? // TODO we should probably allow deletion of a range of logs (also calls)?
// common cases for deletion will be: // common cases for deletion will be:

View File

@@ -34,7 +34,7 @@ func writeJSON(c *gin.Context, callID string, logReader io.Reader) {
}}) }})
} }
func (s *Server) handleCallLogGet(c *gin.Context) { func (s *Server) handleCallLogGet1(c *gin.Context) {
ctx := c.Request.Context() ctx := c.Request.Context()
appID := c.MustGet(api.AppID).(string) appID := c.MustGet(api.AppID).(string)
@@ -73,3 +73,43 @@ func (s *Server) handleCallLogGet(c *gin.Context) {
handleV1ErrorResponse(c, models.NewAPIError(http.StatusNotAcceptable, handleV1ErrorResponse(c, models.NewAPIError(http.StatusNotAcceptable,
errors.New("unable to respond within acceptable response content types"))) errors.New("unable to respond within acceptable response content types")))
} }
func (s *Server) handleCallLogGet(c *gin.Context) {
ctx := c.Request.Context()
fnID := c.Param(api.ParamFnID)
callID := c.Param(api.ParamCallID)
logReader, err := s.logstore.GetLog(ctx, fnID, callID)
if err != nil {
handleV1ErrorResponse(c, err)
return
}
mimeTypes, _ := c.Request.Header["Accept"]
if len(mimeTypes) == 0 {
writeJSON(c, callID, logReader)
return
}
for _, mimeType := range mimeTypes {
if strings.Contains(mimeType, "application/json") {
writeJSON(c, callID, logReader)
return
}
if strings.Contains(mimeType, "text/plain") {
io.Copy(c.Writer, logReader)
return
}
if strings.Contains(mimeType, "*/*") {
writeJSON(c, callID, logReader)
return
}
}
// if we've reached this point it means that Fn didn't recognize Accepted content type
handleV1ErrorResponse(c, models.NewAPIError(http.StatusNotAcceptable,
errors.New("unable to respond within acceptable response content types")))
}

View File

@@ -7,12 +7,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http"
"path"
"github.com/fnproject/fn/api" "github.com/fnproject/fn/api"
"github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http"
"path"
) )
func (s *Server) handleRunnerEnqueue(c *gin.Context) { func (s *Server) handleRunnerEnqueue(c *gin.Context) {
@@ -171,7 +172,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) {
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }
if err := s.logstore.InsertLog(ctx, call.AppID, call.ID, strings.NewReader(body.Log)); err != nil { if err := s.logstore.InsertLog(ctx, &call, strings.NewReader(body.Log)); err != nil {
common.Logger(ctx).WithError(err).Error("error uploading log") common.Logger(ctx).WithError(err).Error("error uploading log")
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }

View File

@@ -1110,7 +1110,7 @@ func (s *Server) bindHandlers(ctx context.Context) {
withAppCheck.PATCH("/routes/*route", s.handleRoutesPatch) withAppCheck.PATCH("/routes/*route", s.handleRoutesPatch)
withAppCheck.DELETE("/routes/*route", s.handleRouteDelete) withAppCheck.DELETE("/routes/*route", s.handleRouteDelete)
withAppCheck.GET("/calls/:call", s.handleCallGet1) withAppCheck.GET("/calls/:call", s.handleCallGet1)
withAppCheck.GET("/calls/:call/log", s.handleCallLogGet) withAppCheck.GET("/calls/:call/log", s.handleCallLogGet1)
withAppCheck.GET("/calls", s.handleCallList1) withAppCheck.GET("/calls", s.handleCallList1)
} }
@@ -1145,9 +1145,11 @@ func (s *Server) bindHandlers(ctx context.Context) {
if !s.noCallEndpoints { if !s.noCallEndpoints {
v2.GET("/fns/:fnID/calls", s.handleCallList) v2.GET("/fns/:fnID/calls", s.handleCallList)
v2.GET("/fns/:fnID/calls/:callID", s.handleCallGet) v2.GET("/fns/:fnID/calls/:callID", s.handleCallGet)
v2.GET("/fns/:fnID/calls/:callID/log", s.handleCallLogGet)
} else { } else {
v2.GET("/fns/:fnID/calls", s.goneResponse) v2.GET("/fns/:fnID/calls", s.goneResponse)
v2.GET("/fns/:fnID/calls/:callID", s.goneResponse) v2.GET("/fns/:fnID/calls/:callID", s.goneResponse)
v2.GET("/fns/:fnID/calls/:callID/log", s.goneResponse)
} }
if !s.noHybridAPI { // Hybrid API - this should only be enabled on API servers if !s.noHybridAPI { // Hybrid API - this should only be enabled on API servers

View File

@@ -453,27 +453,26 @@ paths:
schema: schema:
$ref: '#/definitions/Error' $ref: '#/definitions/Error'
# Implementing next /fns/{fnID}/calls/{callID}/log:
# /fns/{fnID}/calls/{callID}/log: get:
# get: operationId: "GetCallLogs"
# operationId: "GetCallLogs" summary: "Get logs for a call."
# summary: "Get logs for a call." description: "Get logs for a call."
# description: "Get logs for a call." tags:
# tags: - Call
# - Call - Log
# - Log parameters:
# parameters: - $ref: '#/parameters/FnID'
# - $ref: '#/parameters/FnIDQuery' - $ref: '#/parameters/CallID'
# - $ref: '#/parameters/CallID' responses:
# responses: 200:
# 200: description: Log found.
# description: Log found. schema:
# schema: $ref: '#/definitions/Log'
# $ref: '#/definitions/Log' 404:
# 404: description: Log not found.
# description: Log not found. schema:
# schema: $ref: '#/definitions/Error'
# $ref: '#/definitions/Error'
definitions: definitions:
App: App:
@@ -662,6 +661,15 @@ definitions:
type: string type: string
readOnly: true readOnly: true
Log:
type: object
properties:
call_id:
type: string
description: Call UUID ID
log:
type: string # maybe bytes, long logs wouldn't fit into string type
Call: Call:
type: object type: object
properties: properties: