mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* add DateTime sans mgo * change all uses of strfmt.DateTime to common.DateTime, remove test strfmt usage * remove api tests, system-test dep on api test multiple reasons to remove the api tests: * awkward dependency with fn_go meant generating bindings on a branched fn to vendor those to test new stuff. this is at a minimum not at all intuitive, worth it, nor a fun way to spend the finite amount of time we have to live. * api tests only tested a subset of functionality that the server/ api tests already test, and we risk having tests where one tests some thing and the other doesn't. let's not. we have too many test suites as it is, and these pretty much only test that we updated the fn_go bindings, which is actually a hassle as noted above and the cli will pretty quickly figure out anyway. * fn_go relies on openapi, which relies on mgo, which is deprecated and we'd like to remove as a dependency. openapi is a _huge_ dep built in a NIH fashion, that cannot simply remove the mgo dep as users may be using it. we've now stolen their date time and otherwise killed usage of it in fn core, for fn_go it still exists but that's less of a problem. * update deps removals: * easyjson * mgo * go-openapi * mapstructure * fn_go * purell * go-validator also, had to lock docker. we shouldn't use docker on master anyway, they strongly advise against that. had no luck with latest version rev, so i locked it to what we were using before. until next time. the rest is just playing dep roulette, those end up removing a ton tho * fix exec test to work * account for john le cache
191 lines
5.8 KiB
Go
191 lines
5.8 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
func (s *Server) handleRunnerEnqueue(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
// TODO make this a list & let Push take a list!
|
|
var call models.Call
|
|
err := c.BindJSON(&call)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// XXX (reed): validate the call struct
|
|
|
|
// TODO/NOTE: if this endpoint is called multiple times for the same call we
|
|
// need to figure out the behavior we want. as it stands, there will be N
|
|
// messages for 1 call which only clogs up the mq with spurious messages
|
|
// (possibly useful if things get wedged, not the point), the task will still
|
|
// just run once by the first runner to set it to status=running. we may well
|
|
// want to push msg only if inserting the call fails, but then we have a call
|
|
// in queued state with no message (much harder to handle). having this
|
|
// endpoint be retry safe seems ideal and runners likely won't spam it, so current
|
|
// behavior is okay [but beware of implications].
|
|
call.Status = "queued"
|
|
_, err = s.mq.Push(ctx, &call)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
|
|
// TODO once update call is hooked up, do this
|
|
// at this point, the message is on the queue and could be picked up by a
|
|
// runner and enter into 'running' state before we can insert it in the db as
|
|
// 'queued' state. we can ignore any error inserting into db here and Start
|
|
// will ensure the call exists in the db in 'running' state there.
|
|
// s.datastore.InsertCall(ctx, &call)
|
|
|
|
c.JSON(200, struct {
|
|
M string `json:"msg"`
|
|
}{M: "enqueued call"})
|
|
}
|
|
|
|
func (s *Server) handleRunnerDequeue(c *gin.Context) {
|
|
ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
var resp struct {
|
|
M []*models.Call `json:"calls"`
|
|
}
|
|
var m [1]*models.Call // avoid alloc
|
|
resp.M = m[:0]
|
|
|
|
// long poll until ctx expires / we find a message
|
|
var b common.Backoff
|
|
for {
|
|
call, err := s.mq.Reserve(ctx)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
if call != nil {
|
|
resp.M = append(resp.M, call)
|
|
c.JSON(200, resp)
|
|
return
|
|
}
|
|
|
|
b.Sleep(ctx)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
c.JSON(200, resp) // TODO assert this return `[]` & not 'nil'
|
|
return
|
|
default: // poll until we find a cookie
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleRunnerStart(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
var call models.Call
|
|
err := c.BindJSON(&call)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// TODO validate call?
|
|
|
|
// TODO hook up update. we really just want it to set status to running iff
|
|
// status=queued, but this must be in a txn in Update with behavior:
|
|
// queued->running
|
|
// running->error (returning error)
|
|
// error->error (returning error)
|
|
// success->success (returning error)
|
|
// timeout->timeout (returning error)
|
|
//
|
|
// there is nuance for running->error as in theory it could be the correct machine retrying
|
|
// and we risk not running a task [ever]. needs further thought, but marking as error will
|
|
// cover our tracks since if the db is down we can't run anything anyway (treat as such).
|
|
// TODO do this client side and validate it here?
|
|
//call.Status = "running"
|
|
//call.StartedAt = common.DateTime(time.Now())
|
|
//err := s.datastore.UpdateCall(c.Request.Context(), &call)
|
|
//if err != nil {
|
|
//if err == InvalidStatusChange {
|
|
//// TODO we could either let UpdateCall handle setting to error or do it
|
|
//// here explicitly
|
|
|
|
// TODO change this to only delete message if the status change fails b/c it already ran
|
|
// after messaging semantics change
|
|
if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
//}
|
|
//handleErrorResponse(c, err)
|
|
//return
|
|
//}
|
|
|
|
c.JSON(200, struct {
|
|
M string `json:"msg"`
|
|
}{M: "slingshot: engage"})
|
|
}
|
|
|
|
func (s *Server) handleRunnerFinish(c *gin.Context) {
|
|
ctx := c.Request.Context()
|
|
|
|
var body struct {
|
|
Call models.Call `json:"call"`
|
|
Log string `json:"log"` // TODO use multipart so that we don't have to serialize/deserialize this? measure..
|
|
}
|
|
err := c.BindJSON(&body)
|
|
if err != nil {
|
|
if models.IsAPIError(err) {
|
|
handleErrorResponse(c, err)
|
|
} else {
|
|
handleErrorResponse(c, models.ErrInvalidJSON)
|
|
}
|
|
return
|
|
}
|
|
|
|
// TODO validate?
|
|
call := body.Call
|
|
|
|
// TODO this needs UpdateCall functionality to work for async and should only work if:
|
|
// running->error|timeout|success
|
|
// TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted*
|
|
if err := s.logstore.InsertCall(ctx, &call); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
if err := s.logstore.InsertLog(ctx, call.AppID, call.ID, strings.NewReader(body.Log)); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error uploading log")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
// TODO open this up after we change messaging semantics.
|
|
// TODO we don't know whether a call is async or sync. we likely need an additional
|
|
// arg in params for a message id and can detect based on this. for now, delete messages
|
|
// for sync and async even though sync doesn't have any (ignore error)
|
|
//if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
|
|
//common.Logger(ctx).WithError(err).Error("error deleting mq msg")
|
|
//// note: Not returning err here since the job could have already finished successfully.
|
|
//}
|
|
|
|
c.JSON(200, struct {
|
|
M string `json:"msg"`
|
|
}{M: "good night, sweet prince"})
|
|
}
|