unexport all data abstractions on Server (#618)

this patch has no behavior changes, changes are:

* server.Datastore() -> server.datastore
* server.MQ -> server.mq
* server.LogDB -> server.logstore
* server.Agent -> server.agent

these were at a minimum not uniform. further, it's probably better to force
configuration through initialization in `server.New` to ensure thread safety
of referencing if someone does want to modify these as well as forcing things
into our initialization path and reducing the surface area of the Server
abstraction.
This commit is contained in:
Reed Allman
2017-12-21 13:21:02 -06:00
committed by GitHub
parent 6d71f2acc2
commit a8a3e143c7
20 changed files with 57 additions and 54 deletions

View File

@@ -34,7 +34,7 @@ func (s *Server) handleAppCreate(c *gin.Context) {
return return
} }
app, err := s.Datastore().InsertApp(ctx, wapp.App) app, err := s.datastore.InsertApp(ctx, wapp.App)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -17,7 +17,7 @@ func (s *Server) handleAppDelete(c *gin.Context) {
err := s.FireBeforeAppDelete(ctx, app) err := s.FireBeforeAppDelete(ctx, app)
err = s.Datastore().RemoveApp(ctx, app.Name) err = s.datastore.RemoveApp(ctx, app.Name)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -18,7 +18,7 @@ func (s *Server) handleAppGet(c *gin.Context) {
return return
} }
app, err := s.Datastore().GetApp(ctx, appName) app, err := s.datastore.GetApp(ctx, appName)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -20,7 +20,7 @@ func (s *Server) handleAppList(c *gin.Context) {
return return
} }
apps, err := s.Datastore().GetApps(ctx, filter) apps, err := s.datastore.GetApps(ctx, filter)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -37,7 +37,7 @@ func (s *Server) handleAppUpdate(c *gin.Context) {
return return
} }
app, err := s.Datastore().UpdateApp(ctx, wapp.App) app, err := s.datastore.UpdateApp(ctx, wapp.App)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -12,7 +12,7 @@ func (s *Server) handleCallGet(c *gin.Context) {
appName := c.MustGet(api.AppName).(string) appName := c.MustGet(api.AppName).(string)
callID := c.Param(api.Call) callID := c.Param(api.Call)
callObj, err := s.Datastore().GetCall(ctx, appName, callID) callObj, err := s.datastore.GetCall(ctx, appName, callID)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -27,11 +27,11 @@ func (s *Server) handleCallList(c *gin.Context) {
return return
} }
calls, err := s.Datastore().GetCalls(ctx, &filter) calls, err := s.datastore.GetCalls(ctx, &filter)
if len(calls) == 0 { if len(calls) == 0 {
// TODO this should be done in front of this handler to even get here... // TODO this should be done in front of this handler to even get here...
_, err = s.Datastore().GetApp(c, appName) _, err = s.datastore.GetApp(c, appName)
} }
if err != nil { if err != nil {

View File

@@ -15,7 +15,7 @@ func (s *Server) handleCallLogGet(c *gin.Context) {
appName := c.MustGet(api.AppName).(string) appName := c.MustGet(api.AppName).(string)
callID := c.Param(api.Call) callID := c.Param(api.Call)
logReader, err := s.LogDB.GetLog(ctx, appName, callID) logReader, err := s.logstore.GetLog(ctx, appName, callID)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -20,7 +20,7 @@ func (s *Server) apiAppHandlerWrapperFunc(apiHandler fnext.ApiAppHandler) gin.Ha
return func(c *gin.Context) { return func(c *gin.Context) {
// get the app // get the app
appName := c.Param(api.CApp) appName := c.Param(api.CApp)
app, err := s.Datastore().GetApp(c.Request.Context(), appName) app, err := s.datastore.GetApp(c.Request.Context(), appName)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
c.Abort() c.Abort()
@@ -41,7 +41,7 @@ func (s *Server) apiRouteHandlerWrapperFunc(apiHandler fnext.ApiRouteHandler) gi
context := c.Request.Context() context := c.Request.Context()
// get the app // get the app
appName := c.Param(api.CApp) appName := c.Param(api.CApp)
app, err := s.Datastore().GetApp(context, appName) app, err := s.datastore.GetApp(context, appName)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
c.Abort() c.Abort()
@@ -54,7 +54,7 @@ func (s *Server) apiRouteHandlerWrapperFunc(apiHandler fnext.ApiRouteHandler) gi
} }
// get the route TODO // get the route TODO
routePath := "/" + c.Param(api.CRoute) routePath := "/" + c.Param(api.CRoute)
route, err := s.Datastore().GetRoute(context, appName, routePath) route, err := s.datastore.GetRoute(context, appName, routePath)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
c.Abort() c.Abort()

View File

@@ -25,7 +25,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) {
// TODO/NOTE: if this endpoint is called multiple times for the same call we // TODO/NOTE: if this endpoint is called multiple times for the same call we
// need to figure out the behavior we want. as it stands, there will be N // need to figure out the behavior we want. as it stands, there will be N
// messages for 1 call which only clogs up the MQ with spurious messages // messages for 1 call which only clogs up the mq with spurious messages
// (possibly useful if things get wedged, not the point), the task will still // (possibly useful if things get wedged, not the point), the task will still
// just run once by the first runner to set it to status=running. we may well // just run once by the first runner to set it to status=running. we may well
// want to push msg only if inserting the call fails, but then we have a call // want to push msg only if inserting the call fails, but then we have a call
@@ -33,7 +33,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) {
// endpoint be retry safe seems ideal and runners likely won't spam it, so current // endpoint be retry safe seems ideal and runners likely won't spam it, so current
// behavior is okay [but beware of implications]. // behavior is okay [but beware of implications].
call.Status = "queued" call.Status = "queued"
_, err = s.MQ.Push(ctx, &call) _, err = s.mq.Push(ctx, &call)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
@@ -44,7 +44,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) {
// runner and enter into 'running' state before we can insert it in the db as // runner and enter into 'running' state before we can insert it in the db as
// 'queued' state. we can ignore any error inserting into db here and Start // 'queued' state. we can ignore any error inserting into db here and Start
// will ensure the call exists in the db in 'running' state there. // will ensure the call exists in the db in 'running' state there.
// s.Datastore().InsertCall(ctx, &call) // s.datastore.InsertCall(ctx, &call)
c.JSON(200, struct { c.JSON(200, struct {
M string `json:"msg"` M string `json:"msg"`
@@ -64,7 +64,7 @@ func (s *Server) handleRunnerDequeue(c *gin.Context) {
// long poll until ctx expires / we find a message // long poll until ctx expires / we find a message
var b common.Backoff var b common.Backoff
for { for {
call, err := s.MQ.Reserve(ctx) call, err := s.mq.Reserve(ctx)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
@@ -112,7 +112,7 @@ func (s *Server) handleRunnerStart(c *gin.Context) {
// TODO do this client side and validate it here? // TODO do this client side and validate it here?
//call.Status = "running" //call.Status = "running"
//call.StartedAt = strfmt.DateTime(time.Now()) //call.StartedAt = strfmt.DateTime(time.Now())
//err := s.Datastore().UpdateCall(c.Request.Context(), &call) //err := s.datastore.UpdateCall(c.Request.Context(), &call)
//if err != nil { //if err != nil {
//if err == InvalidStatusChange { //if err == InvalidStatusChange {
//// TODO we could either let UpdateCall handle setting to error or do it //// TODO we could either let UpdateCall handle setting to error or do it
@@ -120,7 +120,7 @@ func (s *Server) handleRunnerStart(c *gin.Context) {
// TODO change this to only delete message if the status change fails b/c it already ran // TODO change this to only delete message if the status change fails b/c it already ran
// after messaging semantics change // after messaging semantics change
if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
} }
@@ -153,12 +153,12 @@ func (s *Server) handleRunnerFinish(c *gin.Context) {
// TODO this needs UpdateCall functionality to work for async and should only work if: // TODO this needs UpdateCall functionality to work for async and should only work if:
// running->error|timeout|success // running->error|timeout|success
// TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted* // TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted*
if err := s.Datastore().InsertCall(ctx, &call); err != nil { if err := s.datastore.InsertCall(ctx, &call); err != nil {
common.Logger(ctx).WithError(err).Error("error inserting call into datastore") common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }
if err := s.LogDB.InsertLog(ctx, call.AppName, call.ID, strings.NewReader(body.Log)); err != nil { if err := s.logstore.InsertLog(ctx, call.AppName, call.ID, strings.NewReader(body.Log)); err != nil {
common.Logger(ctx).WithError(err).Error("error uploading log") common.Logger(ctx).WithError(err).Error("error uploading log")
// note: Not returning err here since the job could have already finished successfully. // note: Not returning err here since the job could have already finished successfully.
} }
@@ -167,7 +167,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) {
// TODO we don't know whether a call is async or sync. we likely need an additional // TODO we don't know whether a call is async or sync. we likely need an additional
// arg in params for a message id and can detect based on this. for now, delete messages // arg in params for a message id and can detect based on this. for now, delete messages
// for sync and async even though sync doesn't have any (ignore error) // for sync and async even though sync doesn't have any (ignore error)
//if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call //if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call
//common.Logger(ctx).WithError(err).Error("error deleting mq msg") //common.Logger(ctx).WithError(err).Error("error deleting mq msg")
//// note: Not returning err here since the job could have already finished successfully. //// note: Not returning err here since the job could have already finished successfully.
//} //}

View File

@@ -4,5 +4,5 @@ import "github.com/fnproject/fn/fnext"
// AddCallListener adds a listener that will be fired before and after a function is executed. // AddCallListener adds a listener that will be fired before and after a function is executed.
func (s *Server) AddCallListener(listener fnext.CallListener) { func (s *Server) AddCallListener(listener fnext.CallListener) {
s.Agent.AddCallListener(listener) s.agent.AddCallListener(listener)
} }

View File

@@ -5,5 +5,5 @@ import (
) )
func (s *Server) handlePrometheusMetrics(c *gin.Context) { func (s *Server) handlePrometheusMetrics(c *gin.Context) {
s.Agent.PromHandler().ServeHTTP(c.Writer, c.Request) s.agent.PromHandler().ServeHTTP(c.Writer, c.Request)
} }

View File

@@ -57,7 +57,7 @@ func (s *Server) submitRoute(ctx context.Context, wroute *models.RouteWrapper) e
if err != nil { if err != nil {
return err return err
} }
r, err := s.Datastore().InsertRoute(ctx, wroute.Route) r, err := s.datastore.InsertRoute(ctx, wroute.Route)
if err != nil { if err != nil {
return err return err
} }
@@ -66,7 +66,7 @@ func (s *Server) submitRoute(ctx context.Context, wroute *models.RouteWrapper) e
} }
func (s *Server) changeRoute(ctx context.Context, wroute *models.RouteWrapper) error { func (s *Server) changeRoute(ctx context.Context, wroute *models.RouteWrapper) error {
r, err := s.Datastore().UpdateRoute(ctx, wroute.Route) r, err := s.datastore.UpdateRoute(ctx, wroute.Route)
if err != nil { if err != nil {
return err return err
} }
@@ -86,7 +86,7 @@ func (s *Server) ensureRoute(ctx context.Context, method string, wroute *models.
} }
return routeResponse{"Route successfully created", wroute.Route}, nil return routeResponse{"Route successfully created", wroute.Route}, nil
case http.MethodPut: case http.MethodPut:
_, err := s.Datastore().GetRoute(ctx, wroute.Route.AppName, wroute.Route.Path) _, err := s.datastore.GetRoute(ctx, wroute.Route.AppName, wroute.Route.Path)
if err != nil && err == models.ErrRoutesNotFound { if err != nil && err == models.ErrRoutesNotFound {
err := s.submitRoute(ctx, wroute) err := s.submitRoute(ctx, wroute)
if err != nil { if err != nil {
@@ -111,7 +111,7 @@ func (s *Server) ensureRoute(ctx context.Context, method string, wroute *models.
// ensureApp will only execute if it is on post or put. Patch is not allowed to create apps. // ensureApp will only execute if it is on post or put. Patch is not allowed to create apps.
func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, method string) error { func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, method string) error {
app, err := s.Datastore().GetApp(ctx, wroute.Route.AppName) app, err := s.datastore.GetApp(ctx, wroute.Route.AppName)
if err != nil && err != models.ErrAppsNotFound { if err != nil && err != models.ErrAppsNotFound {
return err return err
} else if app == nil { } else if app == nil {
@@ -126,7 +126,7 @@ func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, met
return err return err
} }
_, err = s.Datastore().InsertApp(ctx, newapp) _, err = s.datastore.InsertApp(ctx, newapp)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -14,12 +14,12 @@ func (s *Server) handleRouteDelete(c *gin.Context) {
appName := c.MustGet(api.AppName).(string) appName := c.MustGet(api.AppName).(string)
routePath := path.Clean(c.MustGet(api.Path).(string)) routePath := path.Clean(c.MustGet(api.Path).(string))
if _, err := s.Datastore().GetRoute(ctx, appName, routePath); err != nil { if _, err := s.datastore.GetRoute(ctx, appName, routePath); err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
} }
if err := s.Datastore().RemoveRoute(ctx, appName, routePath); err != nil { if err := s.datastore.RemoveRoute(ctx, appName, routePath); err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
} }

View File

@@ -13,7 +13,7 @@ func (s *Server) handleRouteGet(c *gin.Context) {
appName := c.MustGet(api.AppName).(string) appName := c.MustGet(api.AppName).(string)
routePath := path.Clean("/" + c.MustGet(api.Path).(string)) routePath := path.Clean("/" + c.MustGet(api.Path).(string))
route, err := s.Datastore().GetRoute(ctx, appName, routePath) route, err := s.datastore.GetRoute(ctx, appName, routePath)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return

View File

@@ -19,13 +19,13 @@ func (s *Server) handleRouteList(c *gin.Context) {
// filter.PathPrefix = c.Query("path_prefix") TODO not hooked up // filter.PathPrefix = c.Query("path_prefix") TODO not hooked up
filter.Cursor, filter.PerPage = pageParams(c, true) filter.Cursor, filter.PerPage = pageParams(c, true)
routes, err := s.Datastore().GetRoutesByApp(ctx, appName, &filter) routes, err := s.datastore.GetRoutesByApp(ctx, appName, &filter)
// if there are no routes for the app, check if the app exists to return // if there are no routes for the app, check if the app exists to return
// 404 if it does not // 404 if it does not
// TODO this should be done in front of this handler to even get here... // TODO this should be done in front of this handler to even get here...
if err == nil && len(routes) == 0 { if err == nil && len(routes) == 0 {
_, err = s.Datastore().GetApp(ctx, appName) _, err = s.datastore.GetApp(ctx, appName)
} }
if err != nil { if err != nil {

View File

@@ -55,7 +55,7 @@ func parseParams(params gin.Params) agent.Params {
func (s *Server) serve(c *gin.Context, appName, path string) { func (s *Server) serve(c *gin.Context, appName, path string) {
// GetCall can mod headers, assign an id, look up the route/app (cached), // GetCall can mod headers, assign an id, look up the route/app (cached),
// strip params, etc. // strip params, etc.
call, err := s.Agent.GetCall( call, err := s.agent.GetCall(
agent.WithWriter(c.Writer), // XXX (reed): order matters [for now] agent.WithWriter(c.Writer), // XXX (reed): order matters [for now]
agent.FromRequest(appName, path, c.Request, parseParams(c.Params)), agent.FromRequest(appName, path, c.Request, parseParams(c.Params)),
) )
@@ -85,7 +85,7 @@ func (s *Server) serve(c *gin.Context, appName, path string) {
model.Payload = buf.String() model.Payload = buf.String()
// TODO idk where to put this, but agent is all runner really has... // TODO idk where to put this, but agent is all runner really has...
err = s.Agent.Enqueue(c.Request.Context(), model) err = s.agent.Enqueue(c.Request.Context(), model)
if err != nil { if err != nil {
handleErrorResponse(c, err) handleErrorResponse(c, err)
return return
@@ -95,7 +95,7 @@ func (s *Server) serve(c *gin.Context, appName, path string) {
return return
} }
err = s.Agent.Submit(call) err = s.agent.Submit(call)
if err != nil { if err != nil {
// NOTE if they cancel the request then it will stop the call (kind of cool), // NOTE if they cancel the request then it will stop the call (kind of cool),
// we could filter that error out here too as right now it yells a little // we could filter that error out here too as right now it yells a little

View File

@@ -17,10 +17,10 @@ func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr agent.Agen
ctx := context.Background() ctx := context.Background()
s := &Server{ s := &Server{
Agent: rnr, agent: rnr,
Router: gin.New(), Router: gin.New(),
datastore: ds, datastore: ds,
MQ: mq, mq: mq,
nodeType: ServerTypeFull, nodeType: ServerTypeFull,
} }

View File

@@ -64,11 +64,13 @@ func (s ServerNodeType) String() string {
} }
type Server struct { type Server struct {
// TODO this one maybe we have `AddRoute` in extensions?
Router *gin.Engine Router *gin.Engine
Agent agent.Agent
agent agent.Agent
datastore models.Datastore datastore models.Datastore
MQ models.MessageQueue mq models.MessageQueue
LogDB models.LogStore logstore models.LogStore
nodeType ServerNodeType nodeType ServerNodeType
appListeners []fnext.AppListener appListeners []fnext.AppListener
rootMiddlewares []fnext.Middleware rootMiddlewares []fnext.Middleware
@@ -168,15 +170,15 @@ func WithDatastore(ds models.Datastore) ServerOption {
} }
func WithMQ(mq models.MessageQueue) ServerOption { func WithMQ(mq models.MessageQueue) ServerOption {
return func(s *Server) { s.MQ = mq } return func(s *Server) { s.mq = mq }
} }
func WithLogstore(ls models.LogStore) ServerOption { func WithLogstore(ls models.LogStore) ServerOption {
return func(s *Server) { s.LogDB = ls } return func(s *Server) { s.logstore = ls }
} }
func WithAgent(agent agent.Agent) ServerOption { func WithAgent(agent agent.Agent) ServerOption {
return func(s *Server) { s.Agent = agent } return func(s *Server) { s.agent = agent }
} }
// New creates a new Functions server with the opts given. For convenience, users may // New creates a new Functions server with the opts given. For convenience, users may
@@ -194,28 +196,28 @@ func New(ctx context.Context, opts ...ServerOption) *Server {
opt(s) opt(s)
} }
if s.LogDB == nil { // TODO seems weird? if s.logstore == nil { // TODO seems weird?
s.LogDB = s.Datastore() s.logstore = s.datastore
} }
// TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side? // TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side?
switch s.nodeType { switch s.nodeType {
case ServerTypeAPI: case ServerTypeAPI:
s.Agent = nil s.agent = nil
case ServerTypeRunner: case ServerTypeRunner:
if s.Agent == nil { if s.agent == nil {
logrus.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.") logrus.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.")
} }
default: default:
s.nodeType = ServerTypeFull s.nodeType = ServerTypeFull
if s.Datastore() == nil || s.LogDB == nil || s.MQ == nil { if s.datastore == nil || s.logstore == nil || s.mq == nil {
logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
} }
// TODO force caller to use WithAgent option ? // TODO force caller to use WithAgent option ?
// TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it... // TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it...
s.Agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.Datastore(), s.LogDB, s.MQ))) s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq)))
} }
setMachineID() setMachineID()
@@ -358,8 +360,8 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
logrus.WithError(err).Error("server shutdown error") logrus.WithError(err).Error("server shutdown error")
} }
if s.Agent != nil { if s.agent != nil {
s.Agent.Close() // after we stop taking requests, wait for all tasks to finish s.agent.Close() // after we stop taking requests, wait for all tasks to finish
} }
} }
@@ -434,6 +436,7 @@ func (s *Server) bindHandlers(ctx context.Context) {
}) })
} }
// implements fnext.ExtServer
func (s *Server) Datastore() models.Datastore { func (s *Server) Datastore() models.Datastore {
return s.datastore return s.datastore
} }

View File

@@ -7,5 +7,5 @@ import (
) )
func (s *Server) handleStats(c *gin.Context) { func (s *Server) handleStats(c *gin.Context) {
c.JSON(http.StatusOK, s.Agent.Stats()) c.JSON(http.StatusOK, s.agent.Stats())
} }