diff --git a/api/server/apps_create.go b/api/server/apps_create.go index 599da89b7..97ffe5b44 100644 --- a/api/server/apps_create.go +++ b/api/server/apps_create.go @@ -34,7 +34,7 @@ func (s *Server) handleAppCreate(c *gin.Context) { return } - app, err := s.Datastore().InsertApp(ctx, wapp.App) + app, err := s.datastore.InsertApp(ctx, wapp.App) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/apps_delete.go b/api/server/apps_delete.go index c6d9075ac..e1116505d 100644 --- a/api/server/apps_delete.go +++ b/api/server/apps_delete.go @@ -17,7 +17,7 @@ func (s *Server) handleAppDelete(c *gin.Context) { err := s.FireBeforeAppDelete(ctx, app) - err = s.Datastore().RemoveApp(ctx, app.Name) + err = s.datastore.RemoveApp(ctx, app.Name) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/apps_get.go b/api/server/apps_get.go index b425f71f8..5f8eed367 100644 --- a/api/server/apps_get.go +++ b/api/server/apps_get.go @@ -18,7 +18,7 @@ func (s *Server) handleAppGet(c *gin.Context) { return } - app, err := s.Datastore().GetApp(ctx, appName) + app, err := s.datastore.GetApp(ctx, appName) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/apps_list.go b/api/server/apps_list.go index 1577d055c..04097aac6 100644 --- a/api/server/apps_list.go +++ b/api/server/apps_list.go @@ -20,7 +20,7 @@ func (s *Server) handleAppList(c *gin.Context) { return } - apps, err := s.Datastore().GetApps(ctx, filter) + apps, err := s.datastore.GetApps(ctx, filter) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/apps_update.go b/api/server/apps_update.go index 8be7eaa16..8f4161b9f 100644 --- a/api/server/apps_update.go +++ b/api/server/apps_update.go @@ -37,7 +37,7 @@ func (s *Server) handleAppUpdate(c *gin.Context) { return } - app, err := s.Datastore().UpdateApp(ctx, wapp.App) + app, err := s.datastore.UpdateApp(ctx, wapp.App) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/call_get.go b/api/server/call_get.go index f5345c65d..767e13b87 100644 --- a/api/server/call_get.go +++ b/api/server/call_get.go @@ -12,7 +12,7 @@ func (s *Server) handleCallGet(c *gin.Context) { appName := c.MustGet(api.AppName).(string) callID := c.Param(api.Call) - callObj, err := s.Datastore().GetCall(ctx, appName, callID) + callObj, err := s.datastore.GetCall(ctx, appName, callID) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/call_list.go b/api/server/call_list.go index 8b18fc0a6..e81172dca 100644 --- a/api/server/call_list.go +++ b/api/server/call_list.go @@ -27,11 +27,11 @@ func (s *Server) handleCallList(c *gin.Context) { return } - calls, err := s.Datastore().GetCalls(ctx, &filter) + calls, err := s.datastore.GetCalls(ctx, &filter) if len(calls) == 0 { // TODO this should be done in front of this handler to even get here... - _, err = s.Datastore().GetApp(c, appName) + _, err = s.datastore.GetApp(c, appName) } if err != nil { diff --git a/api/server/call_logs.go b/api/server/call_logs.go index 11f65f850..62745bcf2 100644 --- a/api/server/call_logs.go +++ b/api/server/call_logs.go @@ -15,7 +15,7 @@ func (s *Server) handleCallLogGet(c *gin.Context) { appName := c.MustGet(api.AppName).(string) callID := c.Param(api.Call) - logReader, err := s.LogDB.GetLog(ctx, appName, callID) + logReader, err := s.logstore.GetLog(ctx, appName, callID) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/extension_points.go b/api/server/extension_points.go index 277c500a2..4fa1a7bc8 100644 --- a/api/server/extension_points.go +++ b/api/server/extension_points.go @@ -20,7 +20,7 @@ func (s *Server) apiAppHandlerWrapperFunc(apiHandler fnext.ApiAppHandler) gin.Ha return func(c *gin.Context) { // get the app appName := c.Param(api.CApp) - app, err := s.Datastore().GetApp(c.Request.Context(), appName) + app, err := s.datastore.GetApp(c.Request.Context(), appName) if err != nil { handleErrorResponse(c, err) c.Abort() @@ -41,7 +41,7 @@ func (s *Server) apiRouteHandlerWrapperFunc(apiHandler fnext.ApiRouteHandler) gi context := c.Request.Context() // get the app appName := c.Param(api.CApp) - app, err := s.Datastore().GetApp(context, appName) + app, err := s.datastore.GetApp(context, appName) if err != nil { handleErrorResponse(c, err) c.Abort() @@ -54,7 +54,7 @@ func (s *Server) apiRouteHandlerWrapperFunc(apiHandler fnext.ApiRouteHandler) gi } // get the route TODO routePath := "/" + c.Param(api.CRoute) - route, err := s.Datastore().GetRoute(context, appName, routePath) + route, err := s.datastore.GetRoute(context, appName, routePath) if err != nil { handleErrorResponse(c, err) c.Abort() diff --git a/api/server/hybrid.go b/api/server/hybrid.go index e9971cb0d..ed2f23275 100644 --- a/api/server/hybrid.go +++ b/api/server/hybrid.go @@ -25,7 +25,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) { // TODO/NOTE: if this endpoint is called multiple times for the same call we // need to figure out the behavior we want. as it stands, there will be N - // messages for 1 call which only clogs up the MQ with spurious messages + // messages for 1 call which only clogs up the mq with spurious messages // (possibly useful if things get wedged, not the point), the task will still // just run once by the first runner to set it to status=running. we may well // want to push msg only if inserting the call fails, but then we have a call @@ -33,7 +33,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) { // endpoint be retry safe seems ideal and runners likely won't spam it, so current // behavior is okay [but beware of implications]. call.Status = "queued" - _, err = s.MQ.Push(ctx, &call) + _, err = s.mq.Push(ctx, &call) if err != nil { handleErrorResponse(c, err) return @@ -44,7 +44,7 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) { // runner and enter into 'running' state before we can insert it in the db as // 'queued' state. we can ignore any error inserting into db here and Start // will ensure the call exists in the db in 'running' state there. - // s.Datastore().InsertCall(ctx, &call) + // s.datastore.InsertCall(ctx, &call) c.JSON(200, struct { M string `json:"msg"` @@ -64,7 +64,7 @@ func (s *Server) handleRunnerDequeue(c *gin.Context) { // long poll until ctx expires / we find a message var b common.Backoff for { - call, err := s.MQ.Reserve(ctx) + call, err := s.mq.Reserve(ctx) if err != nil { handleErrorResponse(c, err) return @@ -112,7 +112,7 @@ func (s *Server) handleRunnerStart(c *gin.Context) { // TODO do this client side and validate it here? //call.Status = "running" //call.StartedAt = strfmt.DateTime(time.Now()) - //err := s.Datastore().UpdateCall(c.Request.Context(), &call) + //err := s.datastore.UpdateCall(c.Request.Context(), &call) //if err != nil { //if err == InvalidStatusChange { //// TODO we could either let UpdateCall handle setting to error or do it @@ -120,7 +120,7 @@ func (s *Server) handleRunnerStart(c *gin.Context) { // TODO change this to only delete message if the status change fails b/c it already ran // after messaging semantics change - if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call + if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call handleErrorResponse(c, err) return } @@ -153,12 +153,12 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { // TODO this needs UpdateCall functionality to work for async and should only work if: // running->error|timeout|success // TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted* - if err := s.Datastore().InsertCall(ctx, &call); err != nil { + if err := s.datastore.InsertCall(ctx, &call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") // note: Not returning err here since the job could have already finished successfully. } - if err := s.LogDB.InsertLog(ctx, call.AppName, call.ID, strings.NewReader(body.Log)); err != nil { + if err := s.logstore.InsertLog(ctx, call.AppName, call.ID, strings.NewReader(body.Log)); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") // note: Not returning err here since the job could have already finished successfully. } @@ -167,7 +167,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { // TODO we don't know whether a call is async or sync. we likely need an additional // arg in params for a message id and can detect based on this. for now, delete messages // for sync and async even though sync doesn't have any (ignore error) - //if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call + //if err := s.mq.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call //common.Logger(ctx).WithError(err).Error("error deleting mq msg") //// note: Not returning err here since the job could have already finished successfully. //} diff --git a/api/server/listeners.go b/api/server/listeners.go index d63abb2fb..5c606c858 100644 --- a/api/server/listeners.go +++ b/api/server/listeners.go @@ -4,5 +4,5 @@ import "github.com/fnproject/fn/fnext" // AddCallListener adds a listener that will be fired before and after a function is executed. func (s *Server) AddCallListener(listener fnext.CallListener) { - s.Agent.AddCallListener(listener) + s.agent.AddCallListener(listener) } diff --git a/api/server/prometheus_metrics.go b/api/server/prometheus_metrics.go index 5ab6be40e..f023056a3 100644 --- a/api/server/prometheus_metrics.go +++ b/api/server/prometheus_metrics.go @@ -5,5 +5,5 @@ import ( ) func (s *Server) handlePrometheusMetrics(c *gin.Context) { - s.Agent.PromHandler().ServeHTTP(c.Writer, c.Request) + s.agent.PromHandler().ServeHTTP(c.Writer, c.Request) } diff --git a/api/server/routes_create_update.go b/api/server/routes_create_update.go index a3af69f32..98cfbdc20 100644 --- a/api/server/routes_create_update.go +++ b/api/server/routes_create_update.go @@ -57,7 +57,7 @@ func (s *Server) submitRoute(ctx context.Context, wroute *models.RouteWrapper) e if err != nil { return err } - r, err := s.Datastore().InsertRoute(ctx, wroute.Route) + r, err := s.datastore.InsertRoute(ctx, wroute.Route) if err != nil { return err } @@ -66,7 +66,7 @@ func (s *Server) submitRoute(ctx context.Context, wroute *models.RouteWrapper) e } func (s *Server) changeRoute(ctx context.Context, wroute *models.RouteWrapper) error { - r, err := s.Datastore().UpdateRoute(ctx, wroute.Route) + r, err := s.datastore.UpdateRoute(ctx, wroute.Route) if err != nil { return err } @@ -86,7 +86,7 @@ func (s *Server) ensureRoute(ctx context.Context, method string, wroute *models. } return routeResponse{"Route successfully created", wroute.Route}, nil case http.MethodPut: - _, err := s.Datastore().GetRoute(ctx, wroute.Route.AppName, wroute.Route.Path) + _, err := s.datastore.GetRoute(ctx, wroute.Route.AppName, wroute.Route.Path) if err != nil && err == models.ErrRoutesNotFound { err := s.submitRoute(ctx, wroute) if err != nil { @@ -111,7 +111,7 @@ func (s *Server) ensureRoute(ctx context.Context, method string, wroute *models. // ensureApp will only execute if it is on post or put. Patch is not allowed to create apps. func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, method string) error { - app, err := s.Datastore().GetApp(ctx, wroute.Route.AppName) + app, err := s.datastore.GetApp(ctx, wroute.Route.AppName) if err != nil && err != models.ErrAppsNotFound { return err } else if app == nil { @@ -126,7 +126,7 @@ func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, met return err } - _, err = s.Datastore().InsertApp(ctx, newapp) + _, err = s.datastore.InsertApp(ctx, newapp) if err != nil { return err } diff --git a/api/server/routes_delete.go b/api/server/routes_delete.go index b457fffac..cebe03c65 100644 --- a/api/server/routes_delete.go +++ b/api/server/routes_delete.go @@ -14,12 +14,12 @@ func (s *Server) handleRouteDelete(c *gin.Context) { appName := c.MustGet(api.AppName).(string) routePath := path.Clean(c.MustGet(api.Path).(string)) - if _, err := s.Datastore().GetRoute(ctx, appName, routePath); err != nil { + if _, err := s.datastore.GetRoute(ctx, appName, routePath); err != nil { handleErrorResponse(c, err) return } - if err := s.Datastore().RemoveRoute(ctx, appName, routePath); err != nil { + if err := s.datastore.RemoveRoute(ctx, appName, routePath); err != nil { handleErrorResponse(c, err) return } diff --git a/api/server/routes_get.go b/api/server/routes_get.go index 959d0ffe4..f508d56f9 100644 --- a/api/server/routes_get.go +++ b/api/server/routes_get.go @@ -13,7 +13,7 @@ func (s *Server) handleRouteGet(c *gin.Context) { appName := c.MustGet(api.AppName).(string) routePath := path.Clean("/" + c.MustGet(api.Path).(string)) - route, err := s.Datastore().GetRoute(ctx, appName, routePath) + route, err := s.datastore.GetRoute(ctx, appName, routePath) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/routes_list.go b/api/server/routes_list.go index ce222d109..e88204785 100644 --- a/api/server/routes_list.go +++ b/api/server/routes_list.go @@ -19,13 +19,13 @@ func (s *Server) handleRouteList(c *gin.Context) { // filter.PathPrefix = c.Query("path_prefix") TODO not hooked up filter.Cursor, filter.PerPage = pageParams(c, true) - routes, err := s.Datastore().GetRoutesByApp(ctx, appName, &filter) + routes, err := s.datastore.GetRoutesByApp(ctx, appName, &filter) // if there are no routes for the app, check if the app exists to return // 404 if it does not // TODO this should be done in front of this handler to even get here... if err == nil && len(routes) == 0 { - _, err = s.Datastore().GetApp(ctx, appName) + _, err = s.datastore.GetApp(ctx, appName) } if err != nil { diff --git a/api/server/runner.go b/api/server/runner.go index f40b192be..21231a09e 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -55,7 +55,7 @@ func parseParams(params gin.Params) agent.Params { func (s *Server) serve(c *gin.Context, appName, path string) { // GetCall can mod headers, assign an id, look up the route/app (cached), // strip params, etc. - call, err := s.Agent.GetCall( + call, err := s.agent.GetCall( agent.WithWriter(c.Writer), // XXX (reed): order matters [for now] agent.FromRequest(appName, path, c.Request, parseParams(c.Params)), ) @@ -85,7 +85,7 @@ func (s *Server) serve(c *gin.Context, appName, path string) { model.Payload = buf.String() // TODO idk where to put this, but agent is all runner really has... - err = s.Agent.Enqueue(c.Request.Context(), model) + err = s.agent.Enqueue(c.Request.Context(), model) if err != nil { handleErrorResponse(c, err) return @@ -95,7 +95,7 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - err = s.Agent.Submit(call) + err = s.agent.Submit(call) if err != nil { // NOTE if they cancel the request then it will stop the call (kind of cool), // we could filter that error out here too as right now it yells a little diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 9516d968d..943cf96fb 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -17,10 +17,10 @@ func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr agent.Agen ctx := context.Background() s := &Server{ - Agent: rnr, + agent: rnr, Router: gin.New(), datastore: ds, - MQ: mq, + mq: mq, nodeType: ServerTypeFull, } diff --git a/api/server/server.go b/api/server/server.go index c9617913c..2c931d96f 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -64,11 +64,13 @@ func (s ServerNodeType) String() string { } type Server struct { - Router *gin.Engine - Agent agent.Agent + // TODO this one maybe we have `AddRoute` in extensions? + Router *gin.Engine + + agent agent.Agent datastore models.Datastore - MQ models.MessageQueue - LogDB models.LogStore + mq models.MessageQueue + logstore models.LogStore nodeType ServerNodeType appListeners []fnext.AppListener rootMiddlewares []fnext.Middleware @@ -168,15 +170,15 @@ func WithDatastore(ds models.Datastore) ServerOption { } func WithMQ(mq models.MessageQueue) ServerOption { - return func(s *Server) { s.MQ = mq } + return func(s *Server) { s.mq = mq } } func WithLogstore(ls models.LogStore) ServerOption { - return func(s *Server) { s.LogDB = ls } + return func(s *Server) { s.logstore = ls } } func WithAgent(agent agent.Agent) ServerOption { - return func(s *Server) { s.Agent = agent } + return func(s *Server) { s.agent = agent } } // New creates a new Functions server with the opts given. For convenience, users may @@ -194,28 +196,28 @@ func New(ctx context.Context, opts ...ServerOption) *Server { opt(s) } - if s.LogDB == nil { // TODO seems weird? - s.LogDB = s.Datastore() + if s.logstore == nil { // TODO seems weird? + s.logstore = s.datastore } // TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side? switch s.nodeType { case ServerTypeAPI: - s.Agent = nil + s.agent = nil case ServerTypeRunner: - if s.Agent == nil { + if s.agent == nil { logrus.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.") } default: s.nodeType = ServerTypeFull - if s.Datastore() == nil || s.LogDB == nil || s.MQ == nil { + if s.datastore == nil || s.logstore == nil || s.mq == nil { logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") } // TODO force caller to use WithAgent option ? // TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it... - s.Agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.Datastore(), s.LogDB, s.MQ))) + s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq))) } setMachineID() @@ -358,8 +360,8 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { logrus.WithError(err).Error("server shutdown error") } - if s.Agent != nil { - s.Agent.Close() // after we stop taking requests, wait for all tasks to finish + if s.agent != nil { + s.agent.Close() // after we stop taking requests, wait for all tasks to finish } } @@ -434,6 +436,7 @@ func (s *Server) bindHandlers(ctx context.Context) { }) } +// implements fnext.ExtServer func (s *Server) Datastore() models.Datastore { return s.datastore } diff --git a/api/server/stats.go b/api/server/stats.go index 9256cc87d..f51d7104e 100644 --- a/api/server/stats.go +++ b/api/server/stats.go @@ -7,5 +7,5 @@ import ( ) func (s *Server) handleStats(c *gin.Context) { - c.JSON(http.StatusOK, s.Agent.Stats()) + c.JSON(http.StatusOK, s.agent.Stats()) }