diff --git a/api/agent/agent.go b/api/agent/agent.go index 06dbe61df..dfb533703 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -24,7 +24,6 @@ import ( "github.com/sirupsen/logrus" ) -// TODO make sure some errors that user should see (like image doesn't exist) bubble up // TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution) // TODO async calls need to add route.Headers as well // TODO need to shut off reads/writes in dispatch to the pipes when call times out so that @@ -32,15 +31,8 @@ import ( // TODO add spans back around container launching for hot (follows from?) + other more granular spans // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status) // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?) -// TODO herd launch prevention part deux // TODO storing logs / call can push call over the timeout -// TODO all Datastore methods need to take unit of tenancy (app or route) at least (e.g. not just call id) // TODO discuss concrete policy for hot launch or timeout / timeout vs time left -// TODO it may be nice to have an interchange type for Dispatch that can have -// all the info we need to build e.g. http req, grpc req, json, etc. so that -// we can easily do e.g. http->grpc, grpc->http, http->json. ofc grpc<->http is -// weird for proto specifics like e.g. proto version, method, headers, et al. -// discuss. // TODO if we don't cap the number of any one container we could get into a situation // where the machine is full but all the containers are idle up to the idle timeout. meh. // TODO async is still broken, but way less so. we need to modify mq semantics @@ -49,9 +41,7 @@ import ( // dies). need coordination w/ db. // TODO if a cold call times out but container is created but hasn't replied, could // end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?) -// TODO the call api should fill in all the fields // TODO the log api should be plaintext (or at least offer it) -// TODO we should probably differentiate ran-but-timeout vs timeout-before-run // TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness. // TODO if async would store requests (or interchange format) it would be slick, but // if we're going to store full calls in db maybe we should only queue pointers to ids? @@ -113,20 +103,15 @@ type Agent interface { // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler AddCallListener(fnext.CallListener) + + // Enqueue is to use the agent's sweet sweet client bindings to remotely + // queue async tasks and should be removed from Agent interface ASAP. + Enqueue(context.Context, *models.Call) error } -type AgentNodeType int32 - -const ( - AgentTypeFull AgentNodeType = iota - AgentTypeAPI - AgentTypeRunner -) - type agent struct { da DataAccess callListeners []fnext.CallListener - tp AgentNodeType driver drivers.Driver @@ -146,13 +131,12 @@ type agent struct { promHandler http.Handler } -func New(ds models.Datastore, ls models.LogStore, mq models.MessageQueue, tp AgentNodeType) Agent { +func New(da DataAccess) Agent { // TODO: Create drivers.New(runnerConfig) driver := docker.NewDocker(drivers.Config{}) a := &agent{ - tp: tp, - da: NewDirectDataAccess(ds, ls, mq), + da: da, driver: driver, hot: make(map[string]chan slot), resources: NewResourceTracker(), @@ -160,16 +144,17 @@ func New(ds models.Datastore, ls models.LogStore, mq models.MessageQueue, tp Age promHandler: promhttp.Handler(), } - switch tp { - case AgentTypeAPI: - // Don't start dequeuing - default: - go a.asyncDequeue() // safe shutdown can nanny this fine - } + // TODO assert that agent doesn't get started for API nodes up above ? + go a.asyncDequeue() // safe shutdown can nanny this fine return a } +// TODO shuffle this around somewhere else (maybe) +func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { + return a.da.Enqueue(ctx, call) +} + func (a *agent) Close() error { select { case <-a.shutdown: @@ -191,10 +176,6 @@ func transformTimeout(e error, isRetriable bool) error { } func (a *agent) Submit(callI Call) error { - if a.tp == AgentTypeAPI { - return errors.New("API agent cannot execute calls") - } - a.wg.Add(1) defer a.wg.Done() diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 3574edc59..b075bb141 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -49,7 +49,7 @@ func TestCallConfigurationRequest(t *testing.T) { }, nil, ) - a := New(ds, ds, new(mqs.Mock), AgentTypeFull) + a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) defer a.Close() w := httptest.NewRecorder() @@ -247,7 +247,7 @@ func TestCallConfigurationModel(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(ds, ds, new(mqs.Mock), AgentTypeFull) + a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) defer a.Close() callI, err := a.GetCall(FromModel(cm)) @@ -353,7 +353,7 @@ func TestSubmitError(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(ds, ds, new(mqs.Mock), AgentTypeFull) + a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) defer a.Close() callI, err := a.GetCall(FromModel(cm)) diff --git a/api/agent/async.go b/api/agent/async.go index 267582e94..f160db83b 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" ) @@ -11,6 +12,10 @@ func (a *agent) asyncDequeue() { a.wg.Add(1) defer a.wg.Done() // we can treat this thread like one big task and get safe shutdown fo free + // this is just so we can hang up the dequeue request if we get shut down + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { select { case <-a.shutdown: @@ -22,42 +27,64 @@ func (a *agent) asyncDequeue() { // out of RAM so.. } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // TODO ??? - model, err := a.da.Dequeue(ctx) - cancel() - if err != nil || model == nil { - if err != nil { - logrus.WithError(err).Error("error fetching queued calls") + // we think we can get a cookie now, so go get a cookie + select { + case <-a.shutdown: + return + case model, ok := <-a.asyncChew(ctx): + if ok { + a.wg.Add(1) // need to add 1 in this thread to ensure safe shutdown + go func(model *models.Call) { + a.asyncRun(model) + a.wg.Done() // can shed it after this is done, Submit will add 1 too but it's fine + }(model) } - time.Sleep(1 * time.Second) // backoff a little - continue } - - // TODO output / logger should be here too... - - a.wg.Add(1) // need to add 1 in this thread to ensure safe shutdown - go func() { - defer a.wg.Done() // can shed it after this is done, Submit will add 1 too but it's fine - - call, err := a.GetCall(FromModel(model)) - if err != nil { - logrus.WithError(err).Error("error getting async call") - return - } - - // TODO if the task is cold and doesn't require reading STDIN, it could - // run but we may not listen for output since the task timed out. these - // are at least once semantics, which is really preferable to at most - // once, so let's do it for now - - err = a.Submit(call) - if err != nil { - // NOTE: these could be errors / timeouts from the call that we're - // logging here (i.e. not our fault), but it's likely better to log - // these than suppress them so... - id := call.Model().ID - logrus.WithFields(logrus.Fields{"id": id}).WithError(err).Error("error running async call") - } - }() + } +} + +func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call { + ch := make(chan *models.Call, 1) + + go func() { + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + call, err := a.da.Dequeue(ctx) + if call != nil { + ch <- call + } else { // call is nil / error + if err != nil && err != context.DeadlineExceeded { + logrus.WithError(err).Error("error fetching queued calls") + } + // queue may be empty / unavailable + time.Sleep(1 * time.Second) // backoff a little before sending no cookie message + close(ch) + } + }() + + return ch +} + +func (a *agent) asyncRun(model *models.Call) { + // TODO output / logger should be here too... + call, err := a.GetCall(FromModel(model)) + if err != nil { + logrus.WithError(err).Error("error getting async call") + return + } + + // TODO if the task is cold and doesn't require reading STDIN, it could + // run but we may not listen for output since the task timed out. these + // are at least once semantics, which is really preferable to at most + // once, so let's do it for now + + err = a.Submit(call) + if err != nil { + // NOTE: these could be errors / timeouts from the call that we're + // logging here (i.e. not our fault), but it's likely better to log + // these than suppress them so... + id := call.Model().ID + logrus.WithFields(logrus.Fields{"id": id}).WithError(err).Error("error running async call") } } diff --git a/api/agent/call.go b/api/agent/call.go index 60d1c953b..6a169d854 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -350,6 +350,9 @@ func (c *call) End(ctx context.Context, errIn error) error { // note: Not returning err here since the job could have already finished successfully. } + // NOTE call this after InsertLog or the buffer will get reset + c.stderr.Close() + if err := c.ct.fireAfterCall(ctx, c.Model()); err != nil { return fmt.Errorf("AfterCall: %v", err) } diff --git a/api/agent/data_access.go b/api/agent/data_access.go index 308ccc66f..564f1ef4f 100644 --- a/api/agent/data_access.go +++ b/api/agent/data_access.go @@ -2,9 +2,13 @@ package agent import ( "context" - "github.com/fnproject/fn/api/common" - "github.com/fnproject/fn/api/models" "io" + "time" + + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/common/singleflight" + "github.com/fnproject/fn/api/models" + "github.com/patrickmn/go-cache" ) // DataAccess abstracts the datastore and message queue operations done by the @@ -19,7 +23,7 @@ type DataAccess interface { GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) // Enqueue will add a Call to the queue (ultimately forwards to mq.Push). - Enqueue(ctx context.Context, mCall *models.Call) (*models.Call, error) + Enqueue(ctx context.Context, mCall *models.Call) error // Dequeue will query the queue for the next available Call that can be run // by this Agent, and reserve it (ultimately forwards to mq.Reserve). @@ -31,7 +35,71 @@ type DataAccess interface { // Finish will notify the system that the Call has been processed, and // fulfill the reservation in the queue if the call came from a queue. - Finish(ctx context.Context, mCall *models.Call, stderr io.ReadWriteCloser, async bool) error + Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error +} + +// CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute. +type CachedDataAccess struct { + DataAccess + + cache *cache.Cache + singleflight singleflight.SingleFlight +} + +func NewCachedDataAccess(da DataAccess) DataAccess { + cda := &CachedDataAccess{ + DataAccess: da, + cache: cache.New(5*time.Second, 1*time.Minute), + } + return cda +} + +func routeCacheKey(appname, path string) string { + return "r:" + appname + "\x00" + path +} + +func appCacheKey(appname string) string { + return "a:" + appname +} + +func (da *CachedDataAccess) GetApp(ctx context.Context, appName string) (*models.App, error) { + key := appCacheKey(appName) + app, ok := da.cache.Get(key) + if ok { + return app.(*models.App), nil + } + + resp, err := da.singleflight.Do(key, + func() (interface{}, error) { + return da.DataAccess.GetApp(ctx, appName) + }) + + if err != nil { + return nil, err + } + app = resp.(*models.App) + da.cache.Set(key, app, cache.DefaultExpiration) + return app.(*models.App), nil +} + +func (da *CachedDataAccess) GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) { + key := routeCacheKey(appName, routePath) + r, ok := da.cache.Get(key) + if ok { + return r.(*models.Route), nil + } + + resp, err := da.singleflight.Do(key, + func() (interface{}, error) { + return da.DataAccess.GetRoute(ctx, appName, routePath) + }) + + if err != nil { + return nil, err + } + r = resp.(*models.Route) + da.cache.Set(key, r, cache.DefaultExpiration) + return r.(*models.Route), nil } type directDataAccess struct { @@ -57,8 +125,9 @@ func (da *directDataAccess) GetRoute(ctx context.Context, appName string, routeP return da.ds.GetRoute(ctx, appName, routePath) } -func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) (*models.Call, error) { - return da.mq.Push(ctx, mCall) +func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) error { + _, err := da.mq.Push(ctx, mCall) + return err // TODO: Insert a call in the datastore with the 'queued' state } @@ -77,7 +146,7 @@ func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error return da.mq.Delete(ctx, mCall) } -func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.ReadWriteCloser, async bool) error { +func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error { // this means that we could potentially store an error / timeout status for a // call that ran successfully [by a user's perspective] // TODO: this should be update, really @@ -90,8 +159,6 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde common.Logger(ctx).WithError(err).Error("error uploading log") // note: Not returning err here since the job could have already finished successfully. } - // NOTE call this after InsertLog or the buffer will get reset - stderr.Close() if async { // XXX (reed): delete MQ message, eventually diff --git a/clients/hybrid/client.go b/api/agent/hybrid/client.go similarity index 81% rename from clients/hybrid/client.go rename to api/agent/hybrid/client.go index 1677288ac..b54d9918e 100644 --- a/clients/hybrid/client.go +++ b/api/agent/hybrid/client.go @@ -14,30 +14,19 @@ import ( "strings" "time" + "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" opentracing "github.com/opentracing/opentracing-go" ) -type Client interface { - Enqueue(context.Context, *models.Call) error - Dequeue(context.Context) ([]*models.Call, error) - Start(context.Context, *models.Call) error - Finish(context.Context, *models.Call, io.Reader) error - - // TODO we could/should make GetAppAndRoute endpoint? saves a round trip... - GetApp(ctx context.Context, appName string) (*models.App, error) - GetRoute(ctx context.Context, appName, route string) (*models.Route, error) -} - -var _ Client = new(client) - +// client implements agent.DataAccess type client struct { base string http *http.Client } -func New(u string) (Client, error) { +func NewClient(u string) (agent.DataAccess, error) { uri, err := url.Parse(u) if err != nil { return nil, err @@ -84,7 +73,7 @@ func (cl *client) Enqueue(ctx context.Context, c *models.Call) error { return err } -func (cl *client) Dequeue(ctx context.Context) ([]*models.Call, error) { +func (cl *client) Dequeue(ctx context.Context) (*models.Call, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_dequeue") defer span.Finish() @@ -92,7 +81,10 @@ func (cl *client) Dequeue(ctx context.Context) ([]*models.Call, error) { C []*models.Call `json:"calls"` } err := cl.do(ctx, nil, &c, "GET", "runner", "async") - return c.C, err + if len(c.C) > 0 { + return c.C[0], nil + } + return nil, err } func (cl *client) Start(ctx context.Context, c *models.Call) error { @@ -103,7 +95,7 @@ func (cl *client) Start(ctx context.Context, c *models.Call) error { return err } -func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader) error { +func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader, async bool) error { span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_end") defer span.Finish() @@ -120,6 +112,7 @@ func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader) error L: b.String(), } + // TODO add async bit to query params or body err = cl.do(ctx, bod, nil, "POST", "runner", "finish") return err } @@ -128,18 +121,23 @@ func (cl *client) GetApp(ctx context.Context, appName string) (*models.App, erro span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_app") defer span.Finish() - var app models.App - err := cl.do(ctx, nil, &app, "GET", "apps", appName) - return &app, err + var a struct { + A models.App `json:"app"` + } + err := cl.do(ctx, nil, &a, "GET", "apps", appName) + return &a.A, err } func (cl *client) GetRoute(ctx context.Context, appName, route string) (*models.Route, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_route") defer span.Finish() - var r models.Route - err := cl.do(ctx, nil, &r, "GET", "apps", appName, "routes", route) - return &r, err + // TODO trim prefix is pretty odd here eh? + var r struct { + R models.Route `json:"route"` + } + err := cl.do(ctx, nil, &r, "GET", "apps", appName, "routes", strings.TrimPrefix(route, "/")) + return &r.R, err } type httpErr struct { @@ -151,6 +149,7 @@ func (cl *client) do(ctx context.Context, request, result interface{}, method st // TODO determine policy (should we count to infinity?) var b common.Backoff + var err error for i := 0; i < 5; i++ { select { case <-ctx.Done(): @@ -159,24 +158,26 @@ func (cl *client) do(ctx context.Context, request, result interface{}, method st } // TODO this isn't re-using buffers very efficiently, but retries should be rare... - err := cl.once(ctx, request, result, method, url...) + err = cl.once(ctx, request, result, method, url...) switch err := err.(type) { case nil: return err - case httpErr: + case *httpErr: if err.code < 500 { return err } - common.Logger(ctx).WithError(err).Error("error from API server, retrying") // retry 500s... default: // this error wasn't from us [most likely], probably a conn refused/timeout, just retry it out } + common.Logger(ctx).WithError(err).Error("error from API server, retrying") + b.Sleep(ctx) } - return context.DeadlineExceeded // basically, right? + // return last error + return err } func (cl *client) once(ctx context.Context, request, result interface{}, method string, url ...string) error { @@ -226,7 +227,7 @@ func (cl *client) once(ctx context.Context, request, result interface{}, method } if result != nil { - err := json.NewDecoder(resp.Body).Decode(result) + err := json.NewDecoder(resp.Body).Decode(&result) if err != nil { return err } diff --git a/api/datastore/cache/cache.go b/api/datastore/cache/cache.go deleted file mode 100644 index e951c1161..000000000 --- a/api/datastore/cache/cache.go +++ /dev/null @@ -1,71 +0,0 @@ -package cache - -import ( - "context" - "time" - - "github.com/fnproject/fn/api/common/singleflight" - "github.com/fnproject/fn/api/models" - "github.com/patrickmn/go-cache" -) - -type cacheDB struct { - models.Datastore - - cache *cache.Cache - singleflight singleflight.SingleFlight // singleflight assists Datastore -} - -// Wrap implements models.Datastore by wrapping an existing datastore and -// adding caching around certain methods. At present, GetApp and GetRoute add -// caching. -func Wrap(ds models.Datastore) models.Datastore { - return &cacheDB{ - Datastore: ds, - cache: cache.New(5*time.Second, 1*time.Minute), // TODO configurable from env - } -} - -func (c *cacheDB) GetApp(ctx context.Context, appName string) (*models.App, error) { - key := appCacheKey(appName) - app, ok := c.cache.Get(key) - if ok { - return app.(*models.App), nil - } - - resp, err := c.singleflight.Do(key, - func() (interface{}, error) { return c.Datastore.GetApp(ctx, appName) }, - ) - if err != nil { - return nil, err - } - app = resp.(*models.App) - c.cache.Set(key, app, cache.DefaultExpiration) - return app.(*models.App), nil -} - -func (c *cacheDB) GetRoute(ctx context.Context, appName, path string) (*models.Route, error) { - key := routeCacheKey(appName, path) - route, ok := c.cache.Get(key) - if ok { - return route.(*models.Route), nil - } - - resp, err := c.singleflight.Do(key, - func() (interface{}, error) { return c.Datastore.GetRoute(ctx, appName, path) }, - ) - if err != nil { - return nil, err - } - route = resp.(*models.Route) - c.cache.Set(key, route, cache.DefaultExpiration) - return route.(*models.Route), nil -} - -func routeCacheKey(appname, path string) string { - return "r:" + appname + "\x00" + path -} - -func appCacheKey(appname string) string { - return "a:" + appname -} diff --git a/api/models/error.go b/api/models/error.go index fda5d1038..5114297e2 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -172,9 +172,17 @@ var ( code: http.StatusNotFound, error: errors.New("Call log not found"), } - ErrSyncCallNotSupported = err{ + ErrInvokeNotSupported = err{ code: http.StatusBadRequest, - error: errors.New("Invoking routes of type sync is not supported on nodes configured as type API"), + error: errors.New("Invoking routes /r/ is not supported on nodes configured as type API"), + } + ErrAPINotSupported = err{ + code: http.StatusBadRequest, + error: errors.New("Invoking api /v1/ requests is not supported on nodes configured as type Runner"), + } + ErrPathNotFound = err{ + code: http.StatusNotFound, + error: errors.New("Path not found"), } ) diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 7bcbfe015..64b89a0a6 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -206,7 +206,7 @@ func (mq *BoltDbMQ) delayCall(job *models.Call) (*models.Call, error) { func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Call) (*models.Call, error) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("Pushed to MQ") + log.Debugln("Pushed to MQ") if job.Delay > 0 { return mq.delayCall(job) @@ -316,7 +316,7 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Call, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("Reserved") + log.Debugln("Reserved") return &job, nil } @@ -326,7 +326,7 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Call, error) { func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Call) error { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - defer log.Println("Deleted") + defer log.Debugln("Deleted") return mq.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(timeoutName(int(*job.Priority))) diff --git a/api/mqs/memory.go b/api/mqs/memory.go index e5577c9f4..ccaf1053c 100644 --- a/api/mqs/memory.go +++ b/api/mqs/memory.go @@ -113,7 +113,7 @@ func (ji *callItem) Less(than btree.Item) bool { func (mq *MemoryMQ) Push(ctx context.Context, job *models.Call) (*models.Call, error) { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("Pushed to MQ") + log.Debugln("Pushed to MQ") // It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation. // job.MsgId = randSeq(20) @@ -174,7 +174,7 @@ func (mq *MemoryMQ) Reserve(ctx context.Context) (*models.Call, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("Reserved") + log.Debugln("Reserved") return job, mq.pushTimeout(job) } @@ -189,6 +189,6 @@ func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Call) error { } delete(mq.Timeouts, job.ID) - log.Println("Deleted") + log.Debugln("Deleted") return nil } diff --git a/api/mqs/redis.go b/api/mqs/redis.go index f950fbff9..80a371ef2 100644 --- a/api/mqs/redis.go +++ b/api/mqs/redis.go @@ -211,7 +211,7 @@ func (mq *RedisMQ) delayCall(conn redis.Conn, job *models.Call) (*models.Call, e func (mq *RedisMQ) Push(ctx context.Context, job *models.Call) (*models.Call, error) { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - defer log.Println("Pushed to MQ") + defer log.Debugln("Pushed to MQ") conn := mq.pool.Get() defer conn.Close() @@ -281,14 +281,14 @@ func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Call, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("Reserved") + log.Debugln("Reserved") return &job, nil } func (mq *RedisMQ) Delete(ctx context.Context, job *models.Call) error { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - defer log.Println("Deleted") + defer log.Debugln("Deleted") conn := mq.pool.Get() defer conn.Close() diff --git a/api/server/hybrid.go b/api/server/hybrid.go index 47a620ae6..612d140b4 100644 --- a/api/server/hybrid.go +++ b/api/server/hybrid.go @@ -8,7 +8,6 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/gin-gonic/gin" - "github.com/go-openapi/strfmt" ) func (s *Server) handleRunnerEnqueue(c *gin.Context) { @@ -40,11 +39,12 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) { return } + // TODO once update call is hooked up, do this // at this point, the message is on the queue and could be picked up by a // runner and enter into 'running' state before we can insert it in the db as // 'queued' state. we can ignore any error inserting into db here and Start // will ensure the call exists in the db in 'running' state there. - s.Datastore.InsertCall(ctx, &call) + // s.Datastore.InsertCall(ctx, &call) c.JSON(200, struct { M string `json:"msg"` @@ -55,25 +55,23 @@ func (s *Server) handleRunnerDequeue(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second) defer cancel() - // TODO finalize (return whole call?) and move - type m struct { - AppName string `json:"app_name"` - Path string `json:"path"` - } - type resp struct { - M []m `json:"calls"` + var resp struct { + M []*models.Call `json:"calls"` } + var m [1]*models.Call // avoid alloc + resp.M = m[:0] // long poll until ctx expires / we find a message var b common.Backoff for { - msg, err := s.MQ.Reserve(ctx) + call, err := s.MQ.Reserve(ctx) if err != nil { handleErrorResponse(c, err) return } - if msg != nil { - c.JSON(200, resp{M: []m{{AppName: msg.AppName, Path: msg.Path}}}) + if call != nil { + resp.M = append(resp.M, call) + c.JSON(200, resp) return } @@ -81,26 +79,25 @@ func (s *Server) handleRunnerDequeue(c *gin.Context) { select { case <-ctx.Done(): - c.JSON(200, resp{M: make([]m, 0)}) + c.JSON(200, resp) // TODO assert this return `[]` & not 'nil' return - default: + default: // poll until we find a cookie } } } func (s *Server) handleRunnerStart(c *gin.Context) { - var body struct { - AppName string `json:"app_name"` - CallID string `json:"id"` - } + ctx := c.Request.Context() - // TODO just take a whole call here? maybe the runner wants to mark it as error? - err := c.BindJSON(&body) + var call models.Call + err := c.BindJSON(&call) if err != nil { handleErrorResponse(c, models.ErrInvalidJSON) return } + // TODO validate call? + // TODO hook up update. we really just want it to set status to running iff // status=queued, but this must be in a txn in Update with behavior: // queued->running @@ -112,21 +109,21 @@ func (s *Server) handleRunnerStart(c *gin.Context) { // there is nuance for running->error as in theory it could be the correct machine retrying // and we risk not running a task [ever]. needs further thought, but marking as error will // cover our tracks since if the db is down we can't run anything anyway (treat as such). - var call models.Call - call.AppName = body.AppName - call.ID = body.CallID - call.Status = "running" - call.StartedAt = strfmt.DateTime(time.Now()) + // TODO do this client side and validate it here? + //call.Status = "running" + //call.StartedAt = strfmt.DateTime(time.Now()) //err := s.Datastore.UpdateCall(c.Request.Context(), &call) //if err != nil { //if err == InvalidStatusChange { //// TODO we could either let UpdateCall handle setting to error or do it //// here explicitly - //if err := s.MQ.Delete(&call); err != nil { // TODO change this to take some string(s), not a whole call - //logrus.WithFields(logrus.Fields{"id": call.Id}).WithError(err).Error("error deleting mq message") - //// just log this one, return error from update call - //} + // TODO change this to only delete message if the status change fails b/c it already ran + // after messaging semantics change + if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call + handleErrorResponse(c, err) + return + } //} //handleErrorResponse(c, err) //return @@ -166,13 +163,14 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { // note: Not returning err here since the job could have already finished successfully. } + // TODO open this up after we change messaging semantics. // TODO we don't know whether a call is async or sync. we likely need an additional // arg in params for a message id and can detect based on this. for now, delete messages // for sync and async even though sync doesn't have any (ignore error) - if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call - common.Logger(ctx).WithError(err).Error("error deleting mq msg") - // note: Not returning err here since the job could have already finished successfully. - } + //if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call + //common.Logger(ctx).WithError(err).Error("error deleting mq msg") + //// note: Not returning err here since the job could have already finished successfully. + //} c.JSON(200, struct { M string `json:"msg"` diff --git a/api/server/runner.go b/api/server/runner.go index 3efbb5690..f40b192be 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -15,22 +15,11 @@ import ( "github.com/sirupsen/logrus" ) -type runnerResponse struct { - RequestID string `json:"request_id,omitempty"` - Error *models.ErrorBody `json:"error,omitempty"` -} - // handleFunctionCall executes the function. // Requires the following in the context: // * "app_name" // * "path" func (s *Server) handleFunctionCall(c *gin.Context) { - // @treeder: Is this necessary? An app could have this prefix too. Leaving here for review. - // if strings.HasPrefix(c.Request.URL.Path, "/v1") { - // c.Status(http.StatusNotFound) - // return - // } - ctx := c.Request.Context() var p string r := ctx.Value(api.Path) @@ -95,8 +84,8 @@ func (s *Server) serve(c *gin.Context, appName, path string) { } model.Payload = buf.String() - // TODO we should probably add this to the datastore too. consider the plumber! - _, err = s.MQ.Push(c.Request.Context(), model) + // TODO idk where to put this, but agent is all runner really has... + err = s.Agent.Enqueue(c.Request.Context(), model) if err != nil { handleErrorResponse(c, err) return @@ -106,24 +95,19 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - // Don't serve sync requests from API nodes - if s.nodeType != ServerTypeAPI { - err = s.Agent.Submit(call) - if err != nil { - // NOTE if they cancel the request then it will stop the call (kind of cool), - // we could filter that error out here too as right now it yells a little - if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout { - // TODO maneuver - // add this, since it means that start may not have been called [and it's relevant] - c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) - } - // NOTE: if the task wrote the headers already then this will fail to write - // a 5xx (and log about it to us) -- that's fine (nice, even!) - handleErrorResponse(c, err) - return + err = s.Agent.Submit(call) + if err != nil { + // NOTE if they cancel the request then it will stop the call (kind of cool), + // we could filter that error out here too as right now it yells a little + if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout { + // TODO maneuver + // add this, since it means that start may not have been called [and it's relevant] + c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) } - } else { - handleErrorResponse(c, models.ErrSyncCallNotSupported) + // NOTE: if the task wrote the headers already then this will fail to write + // a 5xx (and log about it to us) -- that's fine (nice, even!) + handleErrorResponse(c, err) + return } // TODO plumb FXLB-WAIT somehow (api?) diff --git a/api/server/runner_test.go b/api/server/runner_test.go index a58a620f6..a5cd6dd66 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -27,7 +27,7 @@ func testRunner(t *testing.T, args ...interface{}) (agent.Agent, context.CancelF mq = arg } } - r := agent.New(ds, ds, mq, agent.AgentTypeFull) + r := agent.New(agent.NewDirectDataAccess(ds, ds, mq)) return r, func() { r.Close() } } diff --git a/api/server/server.go b/api/server/server.go index 7dc92a4b8..247267772 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -4,18 +4,18 @@ import ( "bytes" "context" "encoding/base64" - "errors" "fmt" "net" "net/http" "os" "path" "strconv" + "strings" "syscall" "github.com/fnproject/fn/api/agent" + "github.com/fnproject/fn/api/agent/hybrid" "github.com/fnproject/fn/api/datastore" - "github.com/fnproject/fn/api/datastore/cache" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -37,6 +37,7 @@ const ( EnvMQURL = "FN_MQ_URL" EnvDBURL = "FN_DB_URL" EnvLOGDBURL = "FN_LOGSTORE_URL" + EnvRunnerURL = "FN_RUNNER_URL" EnvNodeType = "FN_NODE_TYPE" EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port" EnvAPICORS = "FN_API_CORS" @@ -80,122 +81,182 @@ func nodeTypeFromString(value string) ServerNodeType { // NewFromEnv creates a new Functions server based on env vars. func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server { - - return NewFromURLs(ctx, - getEnv(EnvDBURL, fmt.Sprintf("sqlite3://%s/data/fn.db", currDir)), - getEnv(EnvMQURL, fmt.Sprintf("bolt://%s/data/fn.mq", currDir)), - getEnv(EnvLOGDBURL, ""), - nodeTypeFromString(getEnv(EnvNodeType, "")), - opts..., - ) + var defaultDB, defaultMQ string + nodeType := nodeTypeFromString(getEnv(EnvNodeType, "")) // default to full + if nodeType != ServerTypeRunner { + // only want to activate these for full and api nodes + defaultDB = fmt.Sprintf("sqlite3://%s/data/fn.db", currDir) + defaultMQ = fmt.Sprintf("bolt://%s/data/fn.mq", currDir) + } + opts = append(opts, WithZipkin(getEnv(EnvZipkinURL, ""))) + opts = append(opts, WithDBURL(getEnv(EnvDBURL, defaultDB))) + opts = append(opts, WithMQURL(getEnv(EnvMQURL, defaultMQ))) + opts = append(opts, WithLogURL(getEnv(EnvLOGDBURL, ""))) + opts = append(opts, WithRunnerURL(getEnv(EnvRunnerURL, ""))) + opts = append(opts, WithType(nodeType)) + return New(ctx, opts...) } -// Create a new server based on the string URLs for each service. -// Sits in the middle of NewFromEnv and New -func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, nodeType ServerNodeType, opts ...ServerOption) *Server { - ds, err := datastore.New(dbURL) - if err != nil { - logrus.WithError(err).Fatalln("Error initializing datastore.") +func WithDBURL(dbURL string) ServerOption { + if dbURL != "" { + ds, err := datastore.New(dbURL) + if err != nil { + logrus.WithError(err).Fatalln("Error initializing datastore.") + } + return WithDatastore(ds) } + return noop +} - mq, err := mqs.New(mqURL) - if err != nil { - logrus.WithError(err).Fatal("Error initializing message queue.") +func WithMQURL(mqURL string) ServerOption { + if mqURL != "" { + mq, err := mqs.New(mqURL) + if err != nil { + logrus.WithError(err).Fatal("Error initializing message queue.") + } + return WithMQ(mq) } + return noop +} - var logDB models.LogStore = ds - if ldb := logstoreURL; ldb != "" && ldb != dbURL { - logDB, err = logs.New(logstoreURL) +func WithLogURL(logstoreURL string) ServerOption { + if ldb := logstoreURL; ldb != "" { + logDB, err := logs.New(logstoreURL) if err != nil { logrus.WithError(err).Fatal("Error initializing logs store.") } + return WithLogstore(logDB) } - - return New(ctx, ds, mq, logDB, nodeType, opts...) + return noop } -// New creates a new Functions server with the passed in datastore, message queue and API URL -func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls models.LogStore, nodeType ServerNodeType, opts ...ServerOption) *Server { - setTracer() - - var tp agent.AgentNodeType - switch nodeType { - case ServerTypeAPI: - tp = agent.AgentTypeAPI - case ServerTypeRunner: - tp = agent.AgentTypeRunner - default: - tp = agent.AgentTypeFull +func WithRunnerURL(runnerURL string) ServerOption { + if runnerURL != "" { + cl, err := hybrid.NewClient(runnerURL) + if err != nil { + logrus.WithError(err).Fatal("Error initializing runner API client.") + } + return WithAgent(agent.New(agent.NewCachedDataAccess(cl))) } + return noop +} +func noop(s *Server) {} + +func WithType(t ServerNodeType) ServerOption { + return func(s *Server) { s.nodeType = t } +} + +func WithDatastore(ds models.Datastore) ServerOption { + return func(s *Server) { s.Datastore = ds } +} + +func WithMQ(mq models.MessageQueue) ServerOption { + return func(s *Server) { s.MQ = mq } +} + +func WithLogstore(ls models.LogStore) ServerOption { + return func(s *Server) { s.LogDB = ls } +} + +func WithAgent(agent agent.Agent) ServerOption { + return func(s *Server) { s.Agent = agent } +} + +// New creates a new Functions server with the opts given. For convenience, users may +// prefer to use NewFromEnv but New is more flexible if needed. +func New(ctx context.Context, opts ...ServerOption) *Server { s := &Server{ - Agent: agent.New(cache.Wrap(ds), ls, mq, tp), // only add datastore caching to agent - Router: gin.New(), - Datastore: ds, - MQ: mq, - LogDB: ls, - nodeType: nodeType, + Router: gin.New(), + // Almost everything else is configured through opts (see NewFromEnv for ex.) or below } - // NOTE: testServer() in tests doesn't use these - setMachineID() - s.Router.Use(loggerWrap, traceWrap, panicWrap) - optionalCorsWrap(s.Router) - s.bindHandlers(ctx) - for _, opt := range opts { if opt == nil { continue } opt(s) } + + if s.LogDB == nil { // TODO seems weird? + s.LogDB = s.Datastore + } + + // TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side? + + switch s.nodeType { + case ServerTypeAPI: + s.Agent = nil + case ServerTypeRunner: + if s.Agent == nil { + logrus.Fatal("No agent started for a runner node, add FN_RUNNER_URL to configuration.") + } + default: + s.nodeType = ServerTypeFull + if s.Datastore == nil || s.LogDB == nil || s.MQ == nil { + logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") + } + + // TODO force caller to use WithAgent option ? + // TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it... + s.Agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.Datastore, s.LogDB, s.MQ))) + } + + setMachineID() + s.Router.Use(loggerWrap, traceWrap, panicWrap) // TODO should be opts + optionalCorsWrap(s.Router) // TODO should be an opt + s.bindHandlers(ctx) return s } -func setTracer() { - var ( - debugMode = false - serviceName = "fnserver" - serviceHostPort = "localhost:8080" // meh - zipkinHTTPEndpoint = getEnv(EnvZipkinURL, "") - // ex: "http://zipkin:9411/api/v1/spans" - ) +// TODO this doesn't need to be an option necessarily since it's just setting +// globals but it makes things uniform. change if you've a better idear +func WithZipkin(zipkinURL string) ServerOption { + return func(s *Server) { + var ( + debugMode = false + serviceName = "fnserver" + serviceHostPort = "localhost:8080" // meh + zipkinHTTPEndpoint = zipkinURL + // ex: "http://zipkin:9411/api/v1/spans" + ) - var collector zipkintracer.Collector + var collector zipkintracer.Collector - // custom Zipkin collector to send tracing spans to Prometheus - promCollector, promErr := NewPrometheusCollector() - if promErr != nil { - logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector") - } - - logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil }) - - if zipkinHTTPEndpoint != "" { - // Custom PrometheusCollector and Zipkin HTTPCollector - httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger)) - if zipErr != nil { - logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector") + // custom Zipkin collector to send tracing spans to Prometheus + promCollector, promErr := NewPrometheusCollector() + if promErr != nil { + logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector") } - collector = zipkintracer.MultiCollector{httpCollector, promCollector} - } else { - // Custom PrometheusCollector only - collector = promCollector + + logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil }) + + if zipkinHTTPEndpoint != "" { + // Custom PrometheusCollector and Zipkin HTTPCollector + httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger)) + if zipErr != nil { + logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector") + } + collector = zipkintracer.MultiCollector{httpCollector, promCollector} + } else { + // Custom PrometheusCollector only + collector = promCollector + } + + ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName), + zipkintracer.ClientServerSameSpan(true), + zipkintracer.TraceID128Bit(true), + ) + if err != nil { + logrus.WithError(err).Fatalln("couldn't start tracer") + } + + // wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus + fntracer := NewFnTracer(ziptracer) + + opentracing.SetGlobalTracer(fntracer) + logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer") } - - ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName), - zipkintracer.ClientServerSameSpan(true), - zipkintracer.TraceID128Bit(true), - ) - if err != nil { - logrus.WithError(err).Fatalln("couldn't start tracer") - } - - // wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus - fntracer := NewFnTracer(ziptracer) - - opentracing.SetGlobalTracer(fntracer) - logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer") } func setMachineID() { @@ -284,7 +345,9 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { logrus.WithError(err).Error("server shutdown error") } - s.Agent.Close() // after we stop taking requests, wait for all tasks to finish + if s.Agent != nil { + s.Agent.Close() // after we stop taking requests, wait for all tasks to finish + } } func (s *Server) bindHandlers(ctx context.Context) { @@ -335,7 +398,7 @@ func (s *Server) bindHandlers(ctx context.Context) { } } - { + if s.nodeType != ServerTypeAPI { runner := engine.Group("/r") runner.Use(appWrap) runner.Any("/:app", s.handleFunctionCall) @@ -343,8 +406,17 @@ func (s *Server) bindHandlers(ctx context.Context) { } engine.NoRoute(func(c *gin.Context) { - logrus.Debugln("not found", c.Request.URL.Path) - c.JSON(http.StatusNotFound, simpleError(errors.New("Path not found"))) + var err error + switch { + case s.nodeType == ServerTypeAPI && strings.HasPrefix(c.Request.URL.Path, "/r/"): + err = models.ErrInvokeNotSupported + case s.nodeType == ServerTypeRunner && strings.HasPrefix(c.Request.URL.Path, "/v1/"): + err = models.ErrAPINotSupported + default: + var e models.APIError = models.ErrPathNotFound + err = models.NewAPIError(e.Code(), fmt.Errorf("%v: %s", e.Error(), c.Request.URL.Path)) + } + handleErrorResponse(c, err) }) } diff --git a/api/server/server_test.go b/api/server/server_test.go index ea53f3886..89d930d56 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -24,23 +24,13 @@ import ( var tmpDatastoreTests = "/tmp/func_test_datastore.db" func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, rnr agent.Agent, nodeType ServerNodeType) *Server { - ctx := context.Background() - - s := &Server{ - Agent: rnr, - Router: gin.New(), - Datastore: ds, - LogDB: logDB, - MQ: mq, - nodeType: nodeType, - } - - r := s.Router - r.Use(gin.Logger()) - - s.Router.Use(loggerWrap) - s.bindHandlers(ctx) - return s + return New(context.Background(), + WithDatastore(ds), + WithMQ(mq), + WithLogstore(logDB), + WithAgent(rnr), + WithType(nodeType), + ) } func routerRequest(t *testing.T, router *gin.Engine, method, path string, body io.Reader) (*http.Request, *httptest.ResponseRecorder) { @@ -181,14 +171,14 @@ func TestRunnerNode(t *testing.T) { {"execute async route succeeds", "POST", "/r/myapp/myasyncroute", `{ "name": "Teste" }`, http.StatusAccepted, 1}, // All other API functions should not be available on runner nodes - {"create app not found", "POST", "/v1/apps", `{ "app": { "name": "myapp" } }`, http.StatusNotFound, 0}, - {"list apps not found", "GET", "/v1/apps", ``, http.StatusNotFound, 0}, - {"get app not found", "GET", "/v1/apps/myapp", ``, http.StatusNotFound, 0}, + {"create app not found", "POST", "/v1/apps", `{ "app": { "name": "myapp" } }`, http.StatusBadRequest, 0}, + {"list apps not found", "GET", "/v1/apps", ``, http.StatusBadRequest, 0}, + {"get app not found", "GET", "/v1/apps/myapp", ``, http.StatusBadRequest, 0}, - {"add route not found", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myroute", "path": "/myroute", "image": "fnproject/hello", "type": "sync" } }`, http.StatusNotFound, 0}, - {"get route not found", "GET", "/v1/apps/myapp/routes/myroute", ``, http.StatusNotFound, 0}, - {"get all routes not found", "GET", "/v1/apps/myapp/routes", ``, http.StatusNotFound, 0}, - {"delete app not found", "DELETE", "/v1/apps/myapp", ``, http.StatusNotFound, 0}, + {"add route not found", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myroute", "path": "/myroute", "image": "fnproject/hello", "type": "sync" } }`, http.StatusBadRequest, 0}, + {"get route not found", "GET", "/v1/apps/myapp/routes/myroute", ``, http.StatusBadRequest, 0}, + {"get all routes not found", "GET", "/v1/apps/myapp/routes", ``, http.StatusBadRequest, 0}, + {"delete app not found", "DELETE", "/v1/apps/myapp", ``, http.StatusBadRequest, 0}, } { _, rec := routerRequest(t, srv.Router, test.method, test.path, bytes.NewBuffer([]byte(test.body))) @@ -231,12 +221,10 @@ func TestApiNode(t *testing.T) { {"get myroute2", "GET", "/v1/apps/myapp/routes/myroute2", ``, http.StatusOK, 0}, {"get all routes", "GET", "/v1/apps/myapp/routes", ``, http.StatusOK, 0}, - // Don't support calling sync + // Don't support calling sync or async {"execute myroute", "POST", "/r/myapp/myroute", `{ "name": "Teste" }`, http.StatusBadRequest, 1}, {"execute myroute2", "POST", "/r/myapp/myroute2", `{ "name": "Teste" }`, http.StatusBadRequest, 2}, - - // Do support calling async - {"execute myasyncroute", "POST", "/r/myapp/myasyncroute", `{ "name": "Teste" }`, http.StatusAccepted, 1}, + {"execute myasyncroute", "POST", "/r/myapp/myasyncroute", `{ "name": "Teste" }`, http.StatusBadRequest, 1}, {"get myroute2", "GET", "/v1/apps/myapp/routes/myroute2", ``, http.StatusOK, 2}, {"delete myroute", "DELETE", "/v1/apps/myapp/routes/myroute", ``, http.StatusOK, 1}, diff --git a/test.sh b/test.sh index 449bb2644..4711f733f 100755 --- a/test.sh +++ b/test.sh @@ -48,7 +48,7 @@ export MYSQL_URL="mysql://root:root@tcp(${MYSQL_HOST}:${MYSQL_PORT})/funcs" export MINIO_URL="s3://admin:password@${MINIO_HOST}:${MINIO_PORT}/us-east-1/fnlogs" go test -v $(go list ./... | grep -v vendor | grep -v examples | grep -v test/fn-api-tests) -go vet -v $(go list ./... | grep -v vendor) +go vet $(go list ./... | grep -v vendor) docker rm --force func-postgres-test docker rm --force func-mysql-test docker rm --force func-minio-test diff --git a/test/fn-api-tests/utils.go b/test/fn-api-tests/utils.go index b75c3e8ea..2647facf4 100644 --- a/test/fn-api-tests/utils.go +++ b/test/fn-api-tests/utils.go @@ -71,7 +71,7 @@ func getServerWithCancel() (*server.Server, context.CancelFunc) { dbURL = fmt.Sprintf("sqlite3://%s", tmpDb) } - s = server.NewFromURLs(ctx, dbURL, mqURL, "", server.ServerTypeFull) + s = server.New(ctx, server.WithDBURL(dbURL), server.WithMQURL(mqURL)) go s.Start(ctx) started := false