From a0ccc4d7c4759a06606f441b731dc9cc662cf33c Mon Sep 17 00:00:00 2001 From: Tom Coupland Date: Thu, 13 Sep 2018 10:30:10 +0100 Subject: [PATCH] Copy logs up to v2 endpoints (#1207) Copies the log endpoints up to the V2 endpoints, in a similar way to the call endpoints. The main change is to when logs are inserted into S3. The signature of the function has been changed to take the whole call object, rather than just the app and call id's. This allows the function to switch between calls for Routes and those for Fns. Obviously this switching can be removed when v1 is removed. In the sql implementation it inserts with both appID and fnID, this allows the two get's to work, and the down grade of the migration. When the v1 logs are removed, the appId can be dropped. The log fetch test and error messages have been changed to be FnID specific. --- api/agent/data_access.go | 2 +- .../sql/migrations/19_add_fnid_logs.go | 46 +++++++++++++++++ api/datastore/sql/sql.go | 28 ++++++++--- api/logs/metrics/metrics.go | 4 +- api/logs/mock.go | 14 ++++-- api/logs/mock_test.go | 3 +- api/logs/s3/s3.go | 15 +++--- api/logs/testing/test.go | 10 ++-- api/logs/validator/validator.go | 12 ++--- api/models/error.go | 3 ++ api/models/logs.go | 4 +- api/server/call_logs.go | 42 +++++++++++++++- api/server/hybrid.go | 7 +-- api/server/server.go | 4 +- docs/swagger_v2.yml | 50 +++++++++++-------- 15 files changed, 186 insertions(+), 58 deletions(-) create mode 100644 api/datastore/sql/migrations/19_add_fnid_logs.go diff --git a/api/agent/data_access.go b/api/agent/data_access.go index 197765655..5536e4ce4 100644 --- a/api/agent/data_access.go +++ b/api/agent/data_access.go @@ -192,7 +192,7 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde // note: Not returning err here since the job could have already finished successfully. } - if err := da.ls.InsertLog(ctx, mCall.AppID, mCall.ID, stderr); err != nil { + if err := da.ls.InsertLog(ctx, mCall, stderr); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") // note: Not returning err here since the job could have already finished successfully. } diff --git a/api/datastore/sql/migrations/19_add_fnid_logs.go b/api/datastore/sql/migrations/19_add_fnid_logs.go new file mode 100644 index 000000000..cfd05dc9d --- /dev/null +++ b/api/datastore/sql/migrations/19_add_fnid_logs.go @@ -0,0 +1,46 @@ +package migrations + +import ( + "context" + + "github.com/fnproject/fn/api/datastore/sql/migratex" + "github.com/jmoiron/sqlx" +) + +func up19(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE logs ADD fn_id varchar(256);") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE logs MODIFY app_id varchar(256) NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE logs ALTER COLUMN app_id DROP NOT NULL;") + return err + } + + return err +} + +func down19(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE logs DROP COLUMN fn_id;") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE logs MODIFY app_id varchar(256) NOT NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE logs ALTER COLUMN app_id SET NOT NULL;") + return err + } + + return err +} + +func init() { + Migrations = append(Migrations, &migratex.MigFields{ + VersionFunc: vfunc(19), + UpFunc: up19, + DownFunc: down19, + }) +} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 225c45bdd..97a3acb05 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -95,7 +95,8 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( `CREATE TABLE IF NOT EXISTS logs ( id varchar(256) NOT NULL PRIMARY KEY, - app_id varchar(256) NOT NULL, + app_id varchar(256), + fn_id varchar(256), log text NOT NULL );`, @@ -1059,7 +1060,7 @@ func (ds *SQLStore) GetCalls1(ctx context.Context, filter *models.CallFilter) ([ return res, nil } -func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error { +func (ds *SQLStore) InsertLog(ctx context.Context, call *models.Call, logR io.Reader) error { // coerce this into a string for sql var log string if stringer, ok := logR.(fmt.Stringer); ok { @@ -1072,13 +1073,12 @@ func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io log = b.String() } - query := ds.db.Rebind(`INSERT INTO logs (id, app_id, log) VALUES (?, ?, ?);`) - _, err := ds.db.ExecContext(ctx, query, callID, appID, log) - + query := ds.db.Rebind(`INSERT INTO logs (id, app_id, fn_id, log) VALUES (?, ?, ?, ?);`) + _, err := ds.db.ExecContext(ctx, query, call.ID, call.AppID, call.FnID, log) return err } -func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { +func (ds *SQLStore) GetLog1(ctx context.Context, appID, callID string) (io.Reader, error) { query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`) row := ds.db.QueryRowContext(ctx, query, callID, appID) @@ -1094,6 +1094,22 @@ func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader return strings.NewReader(log), nil } +func (ds *SQLStore) GetLog(ctx context.Context, fnID, callID string) (io.Reader, error) { + query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND fn_id=?`) + row := ds.db.QueryRowContext(ctx, query, callID, fnID) + + var log string + err := row.Scan(&log) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrCallLogNotFound + } + return nil, err + } + + return strings.NewReader(log), nil +} + func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) { if filter == nil { return "", nil diff --git a/api/logs/metrics/metrics.go b/api/logs/metrics/metrics.go index 08bf2dad5..b15dfa63f 100644 --- a/api/logs/metrics/metrics.go +++ b/api/logs/metrics/metrics.go @@ -46,10 +46,10 @@ func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) (*mo return m.ls.GetCalls(ctx, filter) } -func (m *metricls) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error { +func (m *metricls) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error { ctx, span := trace.StartSpan(ctx, "ls_insert_log") defer span.End() - return m.ls.InsertLog(ctx, appName, callID, callLog) + return m.ls.InsertLog(ctx, call, callLog) } func (m *metricls) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) { diff --git a/api/logs/mock.go b/api/logs/mock.go index df0191e9e..c03247797 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -32,13 +32,21 @@ func NewMock(args ...interface{}) models.LogStore { return &mocker } -func (m *mock) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { +func (m *mock) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error { bytes, err := ioutil.ReadAll(callLog) - m.Logs[callID] = bytes + m.Logs[call.ID] = bytes return err } -func (m *mock) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { +func (m *mock) GetLog1(ctx context.Context, appID, callID string) (io.Reader, error) { + logEntry, ok := m.Logs[callID] + if !ok { + return nil, models.ErrCallLogNotFound + } + return bytes.NewReader(logEntry), nil +} + +func (m *mock) GetLog(ctx context.Context, fnID, callID string) (io.Reader, error) { logEntry, ok := m.Logs[callID] if !ok { return nil, models.ErrCallLogNotFound diff --git a/api/logs/mock_test.go b/api/logs/mock_test.go index 705be84dc..fc38b11b7 100644 --- a/api/logs/mock_test.go +++ b/api/logs/mock_test.go @@ -1,8 +1,9 @@ package logs import ( - logTesting "github.com/fnproject/fn/api/logs/testing" "testing" + + logTesting "github.com/fnproject/fn/api/logs/testing" ) func TestMock(t *testing.T) { diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index eec3f4695..082f76371 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -136,13 +136,20 @@ func (s3StoreProvider) New(ctx context.Context, u *url.URL) (models.LogStore, er return store, nil } -func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { +func (s *store) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error { ctx, span := trace.StartSpan(ctx, "s3_insert_log") defer span.End() // wrap original reader in a decorator to keep track of read bytes without buffering cr := &countingReader{r: callLog} - objectName := logKey(appID, callID) + + objectName := "" + if call.FnID != "" { + objectName = logKey(call.FnID, call.ID) + } else { + objectName = logKey(call.AppID, call.ID) + } + params := &s3manager.UploadInput{ Bucket: aws.String(s.bucket), Key: aws.String(objectName), @@ -317,10 +324,6 @@ func logKey(appID, callID string) string { return logKeyPrefix + appID + "/" + callID } -func logKey2(callID string) string { - return logKeyPrefix + callID -} - // GetCalls1 returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. // NOTE: this relies on call ids being lexicographically sortable and <= 16 byte diff --git a/api/logs/testing/test.go b/api/logs/testing/test.go index b04a9beaa..fcbce8500 100644 --- a/api/logs/testing/test.go +++ b/api/logs/testing/test.go @@ -145,11 +145,11 @@ func Test(t *testing.T, fnl models.LogStore) { call.ID = id.New().String() logText := "test" log := strings.NewReader(logText) - err := fnl.InsertLog(ctx, call.AppID, call.ID, log) + err := fnl.InsertLog(ctx, call, log) if err != nil { t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) } - logEntry, err := fnl.GetLog(ctx, call.AppID, call.ID) + logEntry, err := fnl.GetLog(ctx, call.FnID, call.ID) var b bytes.Buffer io.Copy(&b, logEntry) if !strings.Contains(b.String(), logText) { @@ -160,7 +160,7 @@ func Test(t *testing.T, fnl models.LogStore) { t.Run("call-log-not-found", func(t *testing.T) { call.ID = id.New().String() - _, err := fnl.GetLog(ctx, call.AppID, call.ID) + _, err := fnl.GetLog(ctx, call.FnID, call.ID) if err != models.ErrCallLogNotFound { t.Fatal("GetLog should return not found, but got:", err) } @@ -172,7 +172,7 @@ func Test(t *testing.T, fnl models.LogStore) { call.Error = "ya dun goofed" call.StartedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now()) - call.AppID = testApp.Name + call.AppID = testApp.ID call.FnID = testFn.ID t.Run("call-insert", func(t *testing.T) { @@ -212,7 +212,7 @@ func Test(t *testing.T, fnl models.LogStore) { t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) } if call.AppID != newCall.AppID { - t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppID, newCall.AppID) + t.Fatalf("Test GetCall: fn id mismatch `%v` `%v`", call.FnID, newCall.FnID) } if call.Path != newCall.Path { t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) diff --git a/api/logs/validator/validator.go b/api/logs/validator/validator.go index d39b870d4..5c463c2bb 100644 --- a/api/logs/validator/validator.go +++ b/api/logs/validator/validator.go @@ -16,14 +16,14 @@ type validator struct { } // callID or appID will never be empty. -func (v *validator) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { - if callID == "" { +func (v *validator) InsertLog(ctx context.Context, call *models.Call, callLog io.Reader) error { + if call.ID == "" { return models.ErrDatastoreEmptyCallID } - if appID == "" { - return models.ErrMissingAppID + if call.AppID == "" && call.FnID == "" { + return models.ErrMissingFnID } - return v.LogStore.InsertLog(ctx, appID, callID, callLog) + return v.LogStore.InsertLog(ctx, call, callLog) } // callID or appID will never be empty. @@ -32,7 +32,7 @@ func (v *validator) GetLog(ctx context.Context, appID, callID string) (io.Reader return nil, models.ErrDatastoreEmptyCallID } if appID == "" { - return nil, models.ErrMissingAppID + return nil, models.ErrMissingFnID } return v.LogStore.GetLog(ctx, appID, callID) } diff --git a/api/models/error.go b/api/models/error.go index 5f3ae1901..d64833e2e 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -43,6 +43,9 @@ var ( ErrMissingAppID = err{ code: http.StatusBadRequest, error: errors.New("Missing App ID")} + ErrMissingFnID = err{ + code: http.StatusBadRequest, + error: errors.New("Missing Fn ID")} ErrMissingName = err{ code: http.StatusBadRequest, error: errors.New("Missing Name")} diff --git a/api/models/logs.go b/api/models/logs.go index 35a748934..3fb2decef 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -8,11 +8,11 @@ import ( type LogStore interface { // InsertLog will insert the log at callID, overwriting if it previously // existed. - InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error + InsertLog(ctx context.Context, call *Call, callLog io.Reader) error // GetLog will return the log at callID, an error will be returned if the log // cannot be found. - GetLog(ctx context.Context, appID, callID string) (io.Reader, error) + GetLog(ctx context.Context, fnID, callID string) (io.Reader, error) // TODO we should probably allow deletion of a range of logs (also calls)? // common cases for deletion will be: diff --git a/api/server/call_logs.go b/api/server/call_logs.go index 899706e1e..cd5045708 100644 --- a/api/server/call_logs.go +++ b/api/server/call_logs.go @@ -34,7 +34,7 @@ func writeJSON(c *gin.Context, callID string, logReader io.Reader) { }}) } -func (s *Server) handleCallLogGet(c *gin.Context) { +func (s *Server) handleCallLogGet1(c *gin.Context) { ctx := c.Request.Context() appID := c.MustGet(api.AppID).(string) @@ -73,3 +73,43 @@ func (s *Server) handleCallLogGet(c *gin.Context) { handleV1ErrorResponse(c, models.NewAPIError(http.StatusNotAcceptable, errors.New("unable to respond within acceptable response content types"))) } + +func (s *Server) handleCallLogGet(c *gin.Context) { + ctx := c.Request.Context() + + fnID := c.Param(api.ParamFnID) + callID := c.Param(api.ParamCallID) + + logReader, err := s.logstore.GetLog(ctx, fnID, callID) + if err != nil { + handleV1ErrorResponse(c, err) + return + } + + mimeTypes, _ := c.Request.Header["Accept"] + + if len(mimeTypes) == 0 { + writeJSON(c, callID, logReader) + return + } + + for _, mimeType := range mimeTypes { + if strings.Contains(mimeType, "application/json") { + writeJSON(c, callID, logReader) + return + } + if strings.Contains(mimeType, "text/plain") { + io.Copy(c.Writer, logReader) + return + + } + if strings.Contains(mimeType, "*/*") { + writeJSON(c, callID, logReader) + return + } + } + + // if we've reached this point it means that Fn didn't recognize Accepted content type + handleV1ErrorResponse(c, models.NewAPIError(http.StatusNotAcceptable, + errors.New("unable to respond within acceptable response content types"))) +} diff --git a/api/server/hybrid.go b/api/server/hybrid.go index e44017b3e..1dd961af1 100644 --- a/api/server/hybrid.go +++ b/api/server/hybrid.go @@ -7,12 +7,13 @@ import ( "errors" "fmt" + "net/http" + "path" + "github.com/fnproject/fn/api" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/gin-gonic/gin" - "net/http" - "path" ) func (s *Server) handleRunnerEnqueue(c *gin.Context) { @@ -171,7 +172,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { // note: Not returning err here since the job could have already finished successfully. } - if err := s.logstore.InsertLog(ctx, call.AppID, call.ID, strings.NewReader(body.Log)); err != nil { + if err := s.logstore.InsertLog(ctx, &call, strings.NewReader(body.Log)); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") // note: Not returning err here since the job could have already finished successfully. } diff --git a/api/server/server.go b/api/server/server.go index be441888b..1334e7587 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -1110,7 +1110,7 @@ func (s *Server) bindHandlers(ctx context.Context) { withAppCheck.PATCH("/routes/*route", s.handleRoutesPatch) withAppCheck.DELETE("/routes/*route", s.handleRouteDelete) withAppCheck.GET("/calls/:call", s.handleCallGet1) - withAppCheck.GET("/calls/:call/log", s.handleCallLogGet) + withAppCheck.GET("/calls/:call/log", s.handleCallLogGet1) withAppCheck.GET("/calls", s.handleCallList1) } @@ -1145,9 +1145,11 @@ func (s *Server) bindHandlers(ctx context.Context) { if !s.noCallEndpoints { v2.GET("/fns/:fnID/calls", s.handleCallList) v2.GET("/fns/:fnID/calls/:callID", s.handleCallGet) + v2.GET("/fns/:fnID/calls/:callID/log", s.handleCallLogGet) } else { v2.GET("/fns/:fnID/calls", s.goneResponse) v2.GET("/fns/:fnID/calls/:callID", s.goneResponse) + v2.GET("/fns/:fnID/calls/:callID/log", s.goneResponse) } if !s.noHybridAPI { // Hybrid API - this should only be enabled on API servers diff --git a/docs/swagger_v2.yml b/docs/swagger_v2.yml index 8d4fb7db8..e6918b1a0 100644 --- a/docs/swagger_v2.yml +++ b/docs/swagger_v2.yml @@ -453,27 +453,26 @@ paths: schema: $ref: '#/definitions/Error' - # Implementing next - # /fns/{fnID}/calls/{callID}/log: - # get: - # operationId: "GetCallLogs" - # summary: "Get logs for a call." - # description: "Get logs for a call." - # tags: - # - Call - # - Log - # parameters: - # - $ref: '#/parameters/FnIDQuery' - # - $ref: '#/parameters/CallID' - # responses: - # 200: - # description: Log found. - # schema: - # $ref: '#/definitions/Log' - # 404: - # description: Log not found. - # schema: - # $ref: '#/definitions/Error' + /fns/{fnID}/calls/{callID}/log: + get: + operationId: "GetCallLogs" + summary: "Get logs for a call." + description: "Get logs for a call." + tags: + - Call + - Log + parameters: + - $ref: '#/parameters/FnID' + - $ref: '#/parameters/CallID' + responses: + 200: + description: Log found. + schema: + $ref: '#/definitions/Log' + 404: + description: Log not found. + schema: + $ref: '#/definitions/Error' definitions: App: @@ -662,6 +661,15 @@ definitions: type: string readOnly: true + Log: + type: object + properties: + call_id: + type: string + description: Call UUID ID + log: + type: string # maybe bytes, long logs wouldn't fit into string type + Call: type: object properties: