mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* so it begins * add clarification to /dequeue, change response to list to future proof * Specify that runner endpoints are also under /v1 * Add a flag to choose operation mode (node type). This is specified using the `FN_NODE_TYPE` environment variable. The default is the existing behaviour, where the server supports all operations (full API plus asynchronous and synchronous runners). The additional modes are: * API - the full API is available, but no functions are executed by the node. Async calls are placed into a message queue, and synchronous calls are not supported (invoking them results in an API error). * Runner - only the invocation/route API is present. Asynchronous and synchronous invocation requests are supported, but asynchronous requests are placed onto the message queue, so might be handled by another runner. * Add agent type and checks on Submit * Sketch of a factored out data access abstraction for api/runner agents * Fix tests, adding node/agent types to constructors * Add tests for full, API, and runner server modes. * Added atomic UpdateCall to datastore * adds in server side endpoints * Made ServerNodeType public because tests use it * Made ServerNodeType public because tests use it * fix test build * add hybrid runner client pretty simple go api client that covers surface area needed for hybrid, returning structs from models that the agent can use directly. not exactly sure where to put this, so put it in `/clients/hybrid` but maybe we should make `/api/runner/client` or something and shove it in there. want to get integration tests set up and use the real endpoints next and then wrap this up in the DataAccessLayer stuff. * gracefully handles errors from fn * handles backoff & retry on 500s * will add to existing spans for debuggo action * minor fixes * meh
135 lines
3.9 KiB
Go
135 lines
3.9 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"net/http"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api"
|
|
"github.com/fnproject/fn/api/agent"
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type runnerResponse struct {
|
|
RequestID string `json:"request_id,omitempty"`
|
|
Error *models.ErrorBody `json:"error,omitempty"`
|
|
}
|
|
|
|
// handleFunctionCall executes the function.
|
|
// Requires the following in the context:
|
|
// * "app_name"
|
|
// * "path"
|
|
func (s *Server) handleFunctionCall(c *gin.Context) {
|
|
// @treeder: Is this necessary? An app could have this prefix too. Leaving here for review.
|
|
// if strings.HasPrefix(c.Request.URL.Path, "/v1") {
|
|
// c.Status(http.StatusNotFound)
|
|
// return
|
|
// }
|
|
|
|
ctx := c.Request.Context()
|
|
var p string
|
|
r := ctx.Value(api.Path)
|
|
if r == nil {
|
|
p = "/"
|
|
} else {
|
|
p = r.(string)
|
|
}
|
|
|
|
var a string
|
|
ai := ctx.Value(api.AppName)
|
|
if ai == nil {
|
|
handleErrorResponse(c, errors.New("app name not set"))
|
|
return
|
|
}
|
|
a = ai.(string)
|
|
|
|
s.serve(c, a, path.Clean(p))
|
|
}
|
|
|
|
// convert gin.Params to agent.Params to avoid introducing gin
|
|
// dependency to agent
|
|
func parseParams(params gin.Params) agent.Params {
|
|
out := make(agent.Params, 0, len(params))
|
|
for _, val := range params {
|
|
out = append(out, agent.Param{Key: val.Key, Value: val.Value})
|
|
}
|
|
return out
|
|
}
|
|
|
|
// TODO it would be nice if we could make this have nothing to do with the gin.Context but meh
|
|
// TODO make async store an *http.Request? would be sexy until we have different api format...
|
|
func (s *Server) serve(c *gin.Context, appName, path string) {
|
|
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
|
// strip params, etc.
|
|
call, err := s.Agent.GetCall(
|
|
agent.WithWriter(c.Writer), // XXX (reed): order matters [for now]
|
|
agent.FromRequest(appName, path, c.Request, parseParams(c.Params)),
|
|
)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
|
|
model := call.Model()
|
|
{ // scope this, to disallow ctx use outside of this scope. add id for handleErrorResponse logger
|
|
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
|
c.Request = c.Request.WithContext(ctx)
|
|
}
|
|
|
|
if model.Type == "async" {
|
|
// TODO we should push this into GetCall somehow (CallOpt maybe) or maybe agent.Queue(Call) ?
|
|
contentLength := c.Request.ContentLength
|
|
if contentLength < 128 { // contentLength could be -1 or really small, sanitize
|
|
contentLength = 128
|
|
}
|
|
buf := bytes.NewBuffer(make([]byte, int(contentLength))[:0]) // TODO sync.Pool me
|
|
_, err := buf.ReadFrom(c.Request.Body)
|
|
if err != nil {
|
|
handleErrorResponse(c, models.ErrInvalidPayload)
|
|
return
|
|
}
|
|
model.Payload = buf.String()
|
|
|
|
// TODO we should probably add this to the datastore too. consider the plumber!
|
|
_, err = s.MQ.Push(c.Request.Context(), model)
|
|
if err != nil {
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusAccepted, map[string]string{"call_id": model.ID})
|
|
return
|
|
}
|
|
|
|
// Don't serve sync requests from API nodes
|
|
if s.nodeType != ServerTypeAPI {
|
|
err = s.Agent.Submit(call)
|
|
if err != nil {
|
|
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
|
// we could filter that error out here too as right now it yells a little
|
|
if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout {
|
|
// TODO maneuver
|
|
// add this, since it means that start may not have been called [and it's relevant]
|
|
c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String())
|
|
}
|
|
// NOTE: if the task wrote the headers already then this will fail to write
|
|
// a 5xx (and log about it to us) -- that's fine (nice, even!)
|
|
handleErrorResponse(c, err)
|
|
return
|
|
}
|
|
} else {
|
|
handleErrorResponse(c, models.ErrSyncCallNotSupported)
|
|
}
|
|
|
|
// TODO plumb FXLB-WAIT somehow (api?)
|
|
|
|
// TODO we need to watch the response writer and if no bytes written
|
|
// then write a 200 at this point?
|
|
// c.Data(http.StatusOK)
|
|
}
|