mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Largely a removal job; however, many tests, particularly system-level ones, relied on Routes. These have been migrated to use Fns. * Add 410 response to swagger * No app names in log tags * Adding constraint in GetCall for FnID * Adding test to check FnID is required on call * Add fn_id to call selector * Fix text in docker mem warning * Correct buildConfig func name * Test fix up * Removing CPU setting from Agent test. The CPU setting has been deprecated, but the code base is still riddled with it. This just removes it from this layer. Really we need to remove it from Call. * Remove fn id check on calls * Reintroduce fn id required on call * Adding fnID to calls for execute test * Correct setting of app id in middleware * Removes root middleware's ability to redirect function invocations * Add oversized test check * Removing call fn id check
220 lines
6.6 KiB
Go
220 lines
6.6 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
|
|
"github.com/fnproject/fn/api"
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
func (s *Server) handleRunnerEnqueue(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
// TODO make this a list & let Push take a list!
|
|
var call models.Call
|
|
err := c.BindJSON(&call)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// XXX (reed): validate the call struct
|
|
|
|
// TODO/NOTE: if this endpoint is called multiple times for the same call we
|
|
// need to figure out the behavior we want. as it stands, there will be N
|
|
// messages for 1 call which only clogs up the mq with spurious messages
|
|
// (possibly useful if things get wedged, not the point), the task will still
|
|
// just run once by the first runner to set it to status=running. we may well
|
|
// want to push msg only if inserting the call fails, but then we have a call
|
|
// in queued state with no message (much harder to handle). having this
|
|
// endpoint be retry safe seems ideal and runners likely won't spam it, so current
|
|
// behavior is okay [but beware of implications].
|
|
call.Status = "queued"
|
|
_, err = s.mq.Push(ctx, &call)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
|
|
// TODO once update call is hooked up, do this
|
|
// at this point, the message is on the queue and could be picked up by a
|
|
// runner and enter into 'running' state before we can insert it in the db as
|
|
// 'queued' state. we can ignore any error inserting into db here and Start
|
|
// will ensure the call exists in the db in 'running' state there.
|
|
// s.datastore.InsertCall(ctx, &call)
|
|
|
|
c.String(http.StatusNoContent, "")
|
|
}
|
|
|
|
func (s *Server) handleRunnerDequeue(c *gin.Context) {
|
|
ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
var resp struct {
|
|
M []*models.Call `json:"calls"`
|
|
}
|
|
var m [1]*models.Call // avoid alloc
|
|
resp.M = m[:0]
|
|
|
|
// long poll until ctx expires / we find a message
|
|
var b common.Backoff
|
|
for {
|
|
call, err := s.mq.Reserve(ctx)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
if call != nil {
|
|
resp.M = append(resp.M, call)
|
|
c.JSON(200, resp)
|
|
return
|
|
}
|
|
|
|
b.Sleep(ctx)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
c.JSON(200, resp) // TODO assert this return `[]` & not 'nil'
|
|
return
|
|
default: // poll until we find a cookie
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleRunnerStart(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
var call models.Call
|
|
err := c.BindJSON(&call)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// TODO validate call?
|
|
|
|
// TODO hook up update. we really just want it to set status to running iff
|
|
// status=queued, but this must be in a txn in Update with behavior:
|
|
// queued->running
|
|
// running->error (returning error)
|
|
// error->error (returning error)
|
|
// success->success (returning error)
|
|
// timeout->timeout (returning error)
|
|
//
|
|
// there is nuance for running->error as in theory it could be the correct machine retrying
|
|
// and we risk not running a task [ever]. needs further thought, but marking as error will
|
|
// cover our tracks since if the db is down we can't run anything anyway (treat as such).
|
|
// TODO do this client side and validate it here?
|
|
//call.Status = "running"
|
|
//call.StartedAt = common.DateTime(time.Now())
|
|
//err := s.datastore.UpdateCall(c.Request.Context(), &call)
|
|
//if err != nil {
|
|
//if err == InvalidStatusChange {
|
|
//// TODO we could either let UpdateCall handle setting to error or do it
|
|
//// here explicitly
|
|
|
|
// TODO change this to only delete message if the status change fails b/c it already ran
|
|
// after messaging semantics change
|
|
if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
//}
|
|
//handleV1ErrorResponse(c, err)
|
|
//return
|
|
//}
|
|
|
|
c.String(http.StatusNoContent, "")
|
|
}
|
|
|
|
func (s *Server) handleRunnerFinish(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
var body struct {
|
|
Call models.Call `json:"call"`
|
|
Log string `json:"log"` // TODO use multipart so that we don't have to serialize/deserialize this? measure..
|
|
}
|
|
err := c.BindJSON(&body)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// TODO validate?
|
|
call := body.Call
|
|
|
|
// TODO this needs UpdateCall functionality to work for async and should only work if:
|
|
// running->error|timeout|success
|
|
// TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted*
|
|
if err := s.logstore.InsertCall(ctx, &call); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
if err := s.logstore.InsertLog(ctx, &call, strings.NewReader(body.Log)); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error uploading log")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
// TODO open this up after we change messaging semantics.
|
|
// TODO we don't know whether a call is async or sync. we likely need an additional
|
|
// arg in params for a message id and can detect based on this. for now, delete messages
|
|
// for sync and async even though sync doesn't have any (ignore error)
|
|
//if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
|
|
//common.Logger(ctx).WithError(err).Error("error deleting mq msg")
|
|
//// note: Not returning err here since the job could have already finished successfully.
|
|
//}
|
|
|
|
c.String(http.StatusNoContent, "")
|
|
}
|
|
|
|
func (s *Server) handleRunnerGetTriggerBySource(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
appId := c.MustGet(api.AppID).(string)
|
|
|
|
triggerType := c.Param(api.ParamTriggerType)
|
|
if triggerType == "" {
|
|
handleErrorResponse(c, errors.New("no trigger type in request"))
|
|
return
|
|
}
|
|
triggerSource := strings.TrimPrefix(c.Param(api.ParamTriggerSource), "/")
|
|
|
|
trigger, err := s.datastore.GetTriggerBySource(ctx, appId, triggerType, triggerSource)
|
|
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
// Not clear that we really need to annotate the trigger here but ... lets do it just in case.
|
|
app, err := s.datastore.GetAppByID(ctx, trigger.AppID)
|
|
|
|
if err != nil {
|
|
handleErrorResponse(c, fmt.Errorf("unexpected error - trigger app not available: %s", err))
|
|
}
|
|
|
|
s.triggerAnnotator.AnnotateTrigger(c, app, trigger)
|
|
|
|
c.JSON(http.StatusOK, trigger)
|
|
}
|