diff --git a/api/agent/agent.go b/api/agent/agent.go index f366a8a83..06dbe61df 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -115,12 +115,18 @@ type Agent interface { AddCallListener(fnext.CallListener) } +type AgentNodeType int32 + +const ( + AgentTypeFull AgentNodeType = iota + AgentTypeAPI + AgentTypeRunner +) + type agent struct { - // TODO maybe these should be on GetCall? idk. was getting bloated. - mq models.MessageQueue - ds models.Datastore - ls models.LogStore + da DataAccess callListeners []fnext.CallListener + tp AgentNodeType driver drivers.Driver @@ -140,14 +146,13 @@ type agent struct { promHandler http.Handler } -func New(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) Agent { +func New(ds models.Datastore, ls models.LogStore, mq models.MessageQueue, tp AgentNodeType) Agent { // TODO: Create drivers.New(runnerConfig) driver := docker.NewDocker(drivers.Config{}) a := &agent{ - ds: ds, - ls: ls, - mq: mq, + tp: tp, + da: NewDirectDataAccess(ds, ls, mq), driver: driver, hot: make(map[string]chan slot), resources: NewResourceTracker(), @@ -155,7 +160,12 @@ func New(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) Agent promHandler: promhttp.Handler(), } - go a.asyncDequeue() // safe shutdown can nanny this fine + switch tp { + case AgentTypeAPI: + // Don't start dequeuing + default: + go a.asyncDequeue() // safe shutdown can nanny this fine + } return a } @@ -181,6 +191,10 @@ func transformTimeout(e error, isRetriable bool) error { } func (a *agent) Submit(callI Call) error { + if a.tp == AgentTypeAPI { + return errors.New("API agent cannot execute calls") + } + a.wg.Add(1) defer a.wg.Done() diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 0e87bd609..3574edc59 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -49,7 +49,7 @@ func TestCallConfigurationRequest(t *testing.T) { }, nil, ) - a := New(ds, ds, new(mqs.Mock)) + a := New(ds, ds, new(mqs.Mock), AgentTypeFull) defer a.Close() w := httptest.NewRecorder() @@ -247,7 +247,7 @@ func TestCallConfigurationModel(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(ds, ds, new(mqs.Mock)) + a := New(ds, ds, new(mqs.Mock), AgentTypeFull) defer a.Close() callI, err := a.GetCall(FromModel(cm)) @@ -353,7 +353,7 @@ func TestSubmitError(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(ds, ds, new(mqs.Mock)) + a := New(ds, ds, new(mqs.Mock), AgentTypeFull) defer a.Close() callI, err := a.GetCall(FromModel(cm)) diff --git a/api/agent/async.go b/api/agent/async.go index 7ac324da7..267582e94 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -23,7 +23,7 @@ func (a *agent) asyncDequeue() { } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // TODO ??? 
- model, err := a.mq.Reserve(ctx) + model, err := a.da.Dequeue(ctx) cancel() if err != nil || model == nil { if err != nil { diff --git a/api/agent/call.go b/api/agent/call.go index a2c4c845f..60d1c953b 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -50,12 +50,12 @@ type Params []Param func FromRequest(appName, path string, req *http.Request, params Params) CallOpt { return func(a *agent, c *call) error { - app, err := a.ds.GetApp(req.Context(), appName) + app, err := a.da.GetApp(req.Context(), appName) if err != nil { return err } - route, err := a.ds.GetRoute(req.Context(), appName, path) + route, err := a.da.GetRoute(req.Context(), appName, path) if err != nil { return err } @@ -248,9 +248,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } - c.ds = a.ds - c.ls = a.ls - c.mq = a.mq + c.da = a.da c.ct = a ctx, _ := common.LoggerWithFields(c.req.Context(), @@ -272,9 +270,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { type call struct { *models.Call - ds models.Datastore - ls models.LogStore - mq models.MessageQueue + da DataAccess w io.Writer req *http.Request stderr io.ReadWriteCloser @@ -316,7 +312,7 @@ func (c *call) Start(ctx context.Context) error { // running to avoid running the call twice and potentially mark it as // errored (built in long running task detector, so to speak...) - err := c.mq.Delete(ctx, c.Call) + err := c.da.Start(ctx, c.Model()) if err != nil { return err // let another thread try this } @@ -346,29 +342,14 @@ func (c *call) End(ctx context.Context, errIn error) error { c.Error = errIn.Error() } - if c.Type == models.TypeAsync { - // XXX (reed): delete MQ message, eventually - } - // ensure stats histogram is reasonably bounded c.Call.Stats = drivers.Decimate(240, c.Call.Stats) - // this means that we could potentially store an error / timeout status for a - // call that ran successfully [by a user's perspective] - // TODO: this should be update, really - if err := c.ds.InsertCall(ctx, c.Call); err != nil { - common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + if err := c.da.Finish(ctx, c.Model(), c.stderr, c.Type == models.TypeAsync); err != nil { + common.Logger(ctx).WithError(err).Error("error finalizing call on datastore/mq") // note: Not returning err here since the job could have already finished successfully. } - if err := c.ls.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { - common.Logger(ctx).WithError(err).Error("error uploading log") - // note: Not returning err here since the job could have already finished successfully. - } - - // NOTE call this after InsertLog or the buffer will get reset - c.stderr.Close() - if err := c.ct.fireAfterCall(ctx, c.Model()); err != nil { return fmt.Errorf("AfterCall: %v", err) } diff --git a/api/agent/data_access.go b/api/agent/data_access.go new file mode 100644 index 000000000..308ccc66f --- /dev/null +++ b/api/agent/data_access.go @@ -0,0 +1,102 @@ +package agent + +import ( + "context" + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" + "io" +) + +// DataAccess abstracts the datastore and message queue operations done by the +// agent, so that API nodes and runner nodes can work with the same interface +// but actually operate on the data in different ways (by direct access or by +// mediation through an API node). +type DataAccess interface { + // GetApp abstracts querying the datastore for an app. 
+ GetApp(ctx context.Context, appName string) (*models.App, error) + + // GetRoute abstracts querying the datastore for a route within an app. + GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) + + // Enqueue will add a Call to the queue (ultimately forwards to mq.Push). + Enqueue(ctx context.Context, mCall *models.Call) (*models.Call, error) + + // Dequeue will query the queue for the next available Call that can be run + // by this Agent, and reserve it (ultimately forwards to mq.Reserve). + Dequeue(ctx context.Context) (*models.Call, error) + + // Start will attempt to start the provided Call within an appropriate + // context. + Start(ctx context.Context, mCall *models.Call) error + + // Finish will notify the system that the Call has been processed, and + // fulfill the reservation in the queue if the call came from a queue. + Finish(ctx context.Context, mCall *models.Call, stderr io.ReadWriteCloser, async bool) error +} + +type directDataAccess struct { + mq models.MessageQueue + ds models.Datastore + ls models.LogStore +} + +func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess { + da := &directDataAccess{ + mq: mq, + ds: ds, + ls: ls, + } + return da +} + +func (da *directDataAccess) GetApp(ctx context.Context, appName string) (*models.App, error) { + return da.ds.GetApp(ctx, appName) +} + +func (da *directDataAccess) GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) { + return da.ds.GetRoute(ctx, appName, routePath) +} + +func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) (*models.Call, error) { + return da.mq.Push(ctx, mCall) + // TODO: Insert a call in the datastore with the 'queued' state +} + +func (da *directDataAccess) Dequeue(ctx context.Context) (*models.Call, error) { + return da.mq.Reserve(ctx) +} + +func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error { + // TODO Access datastore and try a Compare-And-Swap to set the call to + // 'running'. If it fails, delete the message from the MQ and return an + // error. If it is successful, don't do anything - the message will be + // removed when the call Finish'es. + + // At the moment we don't have the queued/running/finished mechanics so we + // remove the message here. + return da.mq.Delete(ctx, mCall) +} + +func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.ReadWriteCloser, async bool) error { + // this means that we could potentially store an error / timeout status for a + // call that ran successfully [by a user's perspective] + // TODO: this should be update, really + if err := da.ds.InsertCall(ctx, mCall); err != nil { + common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + // note: Not returning err here since the job could have already finished successfully. + } + + if err := da.ls.InsertLog(ctx, mCall.AppName, mCall.ID, stderr); err != nil { + common.Logger(ctx).WithError(err).Error("error uploading log") + // note: Not returning err here since the job could have already finished successfully. 
+ } + // NOTE call this after InsertLog or the buffer will get reset + stderr.Close() + + if async { + // XXX (reed): delete MQ message, eventually + // YYY (hhexo): yes, once we have the queued/running/finished mechanics + // return da.mq.Delete(ctx, mCall) + } + return nil +} diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go index 82b24f9bc..a94e9373e 100644 --- a/api/agent/drivers/docker/docker_client.go +++ b/api/agent/drivers/docker/docker_client.go @@ -113,7 +113,7 @@ func (d *dockerWrap) retry(ctx context.Context, f func() error) error { err := filter(ctx, f()) if common.IsTemporary(err) || isDocker50x(err) { logger.WithError(err).Warn("docker temporary error, retrying") - b.Sleep() + b.Sleep(ctx) span.LogFields(log.String("task", "tmperror.docker")) continue } diff --git a/api/common/backoff.go b/api/common/backoff.go index bd7966734..d10c764eb 100644 --- a/api/common/backoff.go +++ b/api/common/backoff.go @@ -1,6 +1,7 @@ package common import ( + "context" "math" "math/rand" "sync" @@ -15,26 +16,23 @@ func (BoxTime) After(d time.Duration) <-chan time.Time { return time.After(d) } type Backoff int -func (b *Backoff) Sleep() { b.RandomSleep(nil, nil) } - -func (b *Backoff) RandomSleep(rng *rand.Rand, clock Clock) { +func (b *Backoff) Sleep(ctx context.Context) { const ( maxexp = 7 interval = 25 * time.Millisecond ) - if rng == nil { - rng = defaultRNG - } - if clock == nil { - clock = defaultClock - } + rng := defaultRNG + clock := defaultClock // 25-50ms, 50-100ms, 100-200ms, 200-400ms, 400-800ms, 800-1600ms, 1600-3200ms, 3200-6400ms d := time.Duration(math.Pow(2, float64(*b))) * interval d += (d * time.Duration(rng.Float64())) - clock.Sleep(d) + select { + case <-ctx.Done(): + case <-clock.After(d): + } if *b < maxexp { (*b)++ diff --git a/api/datastore/internal/datastoretest/test.go b/api/datastore/internal/datastoretest/test.go index 6d0b20c5c..247f87c8b 100644 --- a/api/datastore/internal/datastoretest/test.go +++ b/api/datastore/internal/datastoretest/test.go @@ -53,6 +53,86 @@ func Test(t *testing.T, dsf func(t *testing.T) models.Datastore) { } }) + t.Run("call-atomic-update", func(t *testing.T) { + ds := dsf(t) + call.ID = id.New().String() + err := ds.InsertCall(ctx, call) + if err != nil { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + newCall := new(models.Call) + *newCall = *call + newCall.Status = "success" + newCall.Error = "" + err = ds.UpdateCall(ctx, call, newCall) + if err != nil { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + dbCall, err := ds.GetCall(ctx, call.AppName, call.ID) + if err != nil { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + if dbCall.ID != newCall.ID { + t.Fatalf("Test GetCall: id mismatch `%v` `%v`", call.ID, newCall.ID) + } + if dbCall.Status != newCall.Status { + t.Fatalf("Test GetCall: status mismatch `%v` `%v`", call.Status, newCall.Status) + } + if dbCall.Error != newCall.Error { + t.Fatalf("Test GetCall: error mismatch `%v` `%v`", call.Error, newCall.Error) + } + if time.Time(dbCall.CreatedAt).Unix() != time.Time(newCall.CreatedAt).Unix() { + t.Fatalf("Test GetCall: created_at mismatch `%v` `%v`", call.CreatedAt, newCall.CreatedAt) + } + if time.Time(dbCall.StartedAt).Unix() != time.Time(newCall.StartedAt).Unix() { + t.Fatalf("Test GetCall: started_at mismatch `%v` `%v`", call.StartedAt, newCall.StartedAt) + } + if time.Time(dbCall.CompletedAt).Unix() != time.Time(newCall.CompletedAt).Unix() { + t.Fatalf("Test GetCall: completed_at 
mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) + } + if dbCall.AppName != newCall.AppName { + t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppName, newCall.AppName) + } + if dbCall.Path != newCall.Path { + t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) + } + }) + + t.Run("call-atomic-update-no-existing-call", func(t *testing.T) { + ds := dsf(t) + call.ID = id.New().String() + // Do NOT insert the call + newCall := new(models.Call) + *newCall = *call + newCall.Status = "success" + newCall.Error = "" + err := ds.UpdateCall(ctx, call, newCall) + if err != models.ErrCallNotFound { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + }) + + t.Run("call-atomic-update-unexpected-existing-call", func(t *testing.T) { + ds := dsf(t) + call.ID = id.New().String() + err := ds.InsertCall(ctx, call) + if err != nil { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + // Now change the 'from' call so it becomes different from the db + badFrom := new(models.Call) + *badFrom = *call + badFrom.Status = "running" + newCall := new(models.Call) + *newCall = *call + newCall.Status = "success" + newCall.Error = "" + err = ds.UpdateCall(ctx, badFrom, newCall) + if err != models.ErrDatastoreCannotUpdateCall { + t.Fatalf("Test UpdateCall: unexpected error `%v`", err) + } + }) + t.Run("call-get", func(t *testing.T) { ds := dsf(t) call.ID = id.New().String() diff --git a/api/datastore/internal/datastoreutil/metrics.go b/api/datastore/internal/datastoreutil/metrics.go index 2a8a25628..445e40232 100644 --- a/api/datastore/internal/datastoreutil/metrics.go +++ b/api/datastore/internal/datastoreutil/metrics.go @@ -83,6 +83,12 @@ func (m *metricds) InsertCall(ctx context.Context, call *models.Call) error { return m.ds.InsertCall(ctx, call) } +func (m *metricds) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "ds_update_call") + defer span.Finish() + return m.ds.UpdateCall(ctx, from, to) +} + func (m *metricds) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "ds_get_call") defer span.Finish() diff --git a/api/datastore/mock.go b/api/datastore/mock.go index 0add9108a..44c713a99 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -182,6 +182,35 @@ func (m *mock) InsertCall(ctx context.Context, call *models.Call) error { return nil } +// This equivalence only makes sense in the context of the datastore, so it's +// not in the model. +func equivalentCalls(expected *models.Call, actual *models.Call) bool { + equivalentFields := expected.ID == actual.ID && + time.Time(expected.CreatedAt).Unix() == time.Time(actual.CreatedAt).Unix() && + time.Time(expected.StartedAt).Unix() == time.Time(actual.StartedAt).Unix() && + time.Time(expected.CompletedAt).Unix() == time.Time(actual.CompletedAt).Unix() && + expected.Status == actual.Status && + expected.AppName == actual.AppName && + expected.Path == actual.Path && + expected.Error == actual.Error && + len(expected.Stats) == len(actual.Stats) + // TODO: We don't do comparisons of individual Stats. We probably should. 
+ return equivalentFields +} + +func (m *mock) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { + for _, t := range m.Calls { + if t.ID == from.ID && t.AppName == from.AppName { + if equivalentCalls(from, t) { + *t = *to + return nil + } + return models.ErrDatastoreCannotUpdateCall + } + } + return models.ErrCallNotFound +} + func (m *mock) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { for _, t := range m.Calls { if t.ID == callID && t.AppName == appName { diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 57f880d5c..6bab49e13 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -607,6 +607,81 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { return err } +// This equivalence only makes sense in the context of the datastore, so it's +// not in the model. +func equivalentCalls(expected *models.Call, actual *models.Call) bool { + equivalentFields := expected.ID == actual.ID && + time.Time(expected.CreatedAt).Unix() == time.Time(actual.CreatedAt).Unix() && + time.Time(expected.StartedAt).Unix() == time.Time(actual.StartedAt).Unix() && + time.Time(expected.CompletedAt).Unix() == time.Time(actual.CompletedAt).Unix() && + expected.Status == actual.Status && + expected.AppName == actual.AppName && + expected.Path == actual.Path && + expected.Error == actual.Error && + len(expected.Stats) == len(actual.Stats) + // TODO: We don't do comparisons of individual Stats. We probably should. + return equivalentFields +} + +func (ds *sqlStore) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { + // Assert that from and to are supposed to be the same call + if from.ID != to.ID || from.AppName != to.AppName { + return errors.New("assertion error: 'from' and 'to' calls refer to different app/ID") + } + + // Atomic update + err := ds.Tx(func(tx *sqlx.Tx) error { + var call models.Call + query := tx.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)) + row := tx.QueryRowxContext(ctx, query, from.ID, from.AppName) + + err := row.StructScan(&call) + if err == sql.ErrNoRows { + return models.ErrCallNotFound + } else if err != nil { + return err + } + + // Only do the update if the existing call is exactly what we expect. + // If something has modified it in the meantime, we must fail the + // transaction. + if !equivalentCalls(from, &call) { + return models.ErrDatastoreCannotUpdateCall + } + + query = tx.Rebind(`UPDATE calls SET + id = :id, + created_at = :created_at, + started_at = :started_at, + completed_at = :completed_at, + status = :status, + app_name = :app_name, + path = :path, + stats = :stats, + error = :error + WHERE id=:id AND app_name=:app_name;`) + + res, err := tx.NamedExecContext(ctx, query, to) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + // inside of the transaction, we are querying for the row, so we know that it exists + return nil + } + + return nil + }) + + if err != nil { + return err + } + return nil +} + func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { query := fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector) query = ds.db.Rebind(query) diff --git a/api/models/datastore.go b/api/models/datastore.go index 2322b8cdd..5b23aa101 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -57,6 +57,11 @@ type Datastore interface { // exists. 
InsertCall(ctx context.Context, call *Call) error + // UpdateCall atomically updates a call into the datastore to the "to" value if it finds an existing call equivalent + // to "from", otherwise it will error. ErrCallNotFound is returned if the call was not found, and + // ErrDatastoreCannotUpdateCall is returned if a call with the right AppName/ID exists but is different from "from". + UpdateCall(ctx context.Context, from *Call, to *Call) error + // GetCall returns a call at a certain id and app name. GetCall(ctx context.Context, appName, callID string) (*Call, error) diff --git a/api/models/error.go b/api/models/error.go index a5fb7fa6c..fda5d1038 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -80,6 +80,10 @@ var ( code: http.StatusBadRequest, error: errors.New("Missing call ID"), } + ErrDatastoreCannotUpdateCall = err{ + code: http.StatusConflict, + error: errors.New("Call to be updated is different from expected"), + } ErrInvalidPayload = err{ code: http.StatusBadRequest, error: errors.New("Invalid payload"), @@ -168,6 +172,10 @@ var ( code: http.StatusNotFound, error: errors.New("Call log not found"), } + ErrSyncCallNotSupported = err{ + code: http.StatusBadRequest, + error: errors.New("Invoking routes of type sync is not supported on nodes configured as type API"), + } ) // APIError any error that implements this interface will return an API response diff --git a/api/server/apps_test.go b/api/server/apps_test.go index f643b5d3b..3522711d9 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -56,7 +56,7 @@ func TestAppCreate(t *testing.T) { {datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull) router := srv.Router body := bytes.NewBuffer([]byte(test.body)) @@ -103,7 +103,7 @@ func TestAppDelete(t *testing.T) { ), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) @@ -144,7 +144,7 @@ func TestAppList(t *testing.T) { nil, // no calls ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) a1b := base64.RawURLEncoding.EncodeToString([]byte("myapp")) a2b := base64.RawURLEncoding.EncodeToString([]byte("myapp2")) @@ -209,7 +209,7 @@ func TestAppGet(t *testing.T) { defer cancel() ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -271,7 +271,7 @@ func TestAppUpdate(t *testing.T) { ), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, srv.Router, "PATCH", test.path, body) diff --git a/api/server/calls_test.go b/api/server/calls_test.go index f45a022f1..7be311182 100644 --- a/api/server/calls_test.go +++ b/api/server/calls_test.go @@ -49,7 +49,7 @@ func TestCallGet(t *testing.T) { 
[]*models.Call{call}, ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -124,7 +124,7 @@ func TestCallList(t *testing.T) { []*models.Call{call, &c2, &c3}, ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) // add / sub 1 second b/c unix time will lop off millis and mess up our comparisons rangeTest := fmt.Sprintf("from_time=%d&to_time=%d", diff --git a/api/server/hybrid.go b/api/server/hybrid.go new file mode 100644 index 000000000..47a620ae6 --- /dev/null +++ b/api/server/hybrid.go @@ -0,0 +1,180 @@ +package server + +import ( + "context" + "strings" + "time" + + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" + "github.com/gin-gonic/gin" + "github.com/go-openapi/strfmt" +) + +func (s *Server) handleRunnerEnqueue(c *gin.Context) { + ctx := c.Request.Context() + + // TODO make this a list & let Push take a list! + var call models.Call + err := c.BindJSON(&call) + if err != nil { + handleErrorResponse(c, models.ErrInvalidJSON) + return + } + + // XXX (reed): validate the call struct + + // TODO/NOTE: if this endpoint is called multiple times for the same call we + // need to figure out the behavior we want. as it stands, there will be N + // messages for 1 call which only clogs up the MQ with spurious messages + // (possibly useful if things get wedged, not the point), the task will still + // just run once by the first runner to set it to status=running. we may well + // want to push msg only if inserting the call fails, but then we have a call + // in queued state with no message (much harder to handle). having this + // endpoint be retry safe seems ideal and runners likely won't spam it, so current + // behavior is okay [but beware of implications]. + call.Status = "queued" + _, err = s.MQ.Push(ctx, &call) + if err != nil { + handleErrorResponse(c, err) + return + } + + // at this point, the message is on the queue and could be picked up by a + // runner and enter into 'running' state before we can insert it in the db as + // 'queued' state. we can ignore any error inserting into db here and Start + // will ensure the call exists in the db in 'running' state there. + s.Datastore.InsertCall(ctx, &call) + + c.JSON(200, struct { + M string `json:"msg"` + }{M: "enqueued call"}) +} + +func (s *Server) handleRunnerDequeue(c *gin.Context) { + ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second) + defer cancel() + + // TODO finalize (return whole call?) and move + type m struct { + AppName string `json:"app_name"` + Path string `json:"path"` + } + type resp struct { + M []m `json:"calls"` + } + + // long poll until ctx expires / we find a message + var b common.Backoff + for { + msg, err := s.MQ.Reserve(ctx) + if err != nil { + handleErrorResponse(c, err) + return + } + if msg != nil { + c.JSON(200, resp{M: []m{{AppName: msg.AppName, Path: msg.Path}}}) + return + } + + b.Sleep(ctx) + + select { + case <-ctx.Done(): + c.JSON(200, resp{M: make([]m, 0)}) + return + default: + } + } +} + +func (s *Server) handleRunnerStart(c *gin.Context) { + var body struct { + AppName string `json:"app_name"` + CallID string `json:"id"` + } + + // TODO just take a whole call here? maybe the runner wants to mark it as error? + err := c.BindJSON(&body) + if err != nil { + handleErrorResponse(c, models.ErrInvalidJSON) + return + } + + // TODO hook up update. 
we really just want it to set status to running iff + // status=queued, but this must be in a txn in Update with behavior: + // queued->running + // running->error (returning error) + // error->error (returning error) + // success->success (returning error) + // timeout->timeout (returning error) + // + // there is nuance for running->error as in theory it could be the correct machine retrying + // and we risk not running a task [ever]. needs further thought, but marking as error will + // cover our tracks since if the db is down we can't run anything anyway (treat as such). + var call models.Call + call.AppName = body.AppName + call.ID = body.CallID + call.Status = "running" + call.StartedAt = strfmt.DateTime(time.Now()) + //err := s.Datastore.UpdateCall(c.Request.Context(), &call) + //if err != nil { + //if err == InvalidStatusChange { + //// TODO we could either let UpdateCall handle setting to error or do it + //// here explicitly + + //if err := s.MQ.Delete(&call); err != nil { // TODO change this to take some string(s), not a whole call + //logrus.WithFields(logrus.Fields{"id": call.Id}).WithError(err).Error("error deleting mq message") + //// just log this one, return error from update call + //} + //} + //handleErrorResponse(c, err) + //return + //} + + c.JSON(200, struct { + M string `json:"msg"` + }{M: "slingshot: engage"}) +} + +func (s *Server) handleRunnerFinish(c *gin.Context) { + ctx := c.Request.Context() + + var body struct { + Call models.Call `json:"call"` + Log string `json:"log"` // TODO use multipart so that we don't have to serialize/deserialize this? measure.. + } + err := c.BindJSON(&body) + if err != nil { + handleErrorResponse(c, models.ErrInvalidJSON) + return + } + + // TODO validate? + call := body.Call + + // TODO this needs UpdateCall functionality to work for async and should only work if: + // running->error|timeout|success + // TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted* + if err := s.Datastore.InsertCall(ctx, &call); err != nil { + common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + // note: Not returning err here since the job could have already finished successfully. + } + + if err := s.LogDB.InsertLog(ctx, call.AppName, call.ID, strings.NewReader(body.Log)); err != nil { + common.Logger(ctx).WithError(err).Error("error uploading log") + // note: Not returning err here since the job could have already finished successfully. + } + + // TODO we don't know whether a call is async or sync. we likely need an additional + // arg in params for a message id and can detect based on this. for now, delete messages + // for sync and async even though sync doesn't have any (ignore error) + if err := s.MQ.Delete(ctx, &call); err != nil { // TODO change this to take some string(s), not a whole call + common.Logger(ctx).WithError(err).Error("error deleting mq msg") + // note: Not returning err here since the job could have already finished successfully. 
+ } + + c.JSON(200, struct { + M string `json:"msg"` + }{M: "good night, sweet prince"}) +} diff --git a/api/server/middleware_test.go b/api/server/middleware_test.go index 861efb160..cf19d4fbf 100644 --- a/api/server/middleware_test.go +++ b/api/server/middleware_test.go @@ -85,7 +85,7 @@ func TestRootMiddleware(t *testing.T) { defer cancelrnr() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) srv.AddRootMiddlewareFunc(func(next http.Handler) http.Handler { // this one will override a call to the API based on a header return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/api/server/routes_test.go b/api/server/routes_test.go index e786423f6..e7205ec70 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -26,7 +26,7 @@ type routeTestCase struct { func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, srv.Router, test.method, test.path, body) @@ -123,7 +123,7 @@ func TestRouteDelete(t *testing.T) { {datastore.NewMockInit(apps, routes, nil), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) if rec.Code != test.expectedCode { @@ -178,7 +178,7 @@ func TestRouteList(t *testing.T) { r2b := base64.RawURLEncoding.EncodeToString([]byte("/myroute1")) r3b := base64.RawURLEncoding.EncodeToString([]byte("/myroute2")) - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -242,7 +242,7 @@ func TestRouteGet(t *testing.T) { ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string diff --git a/api/server/runner.go b/api/server/runner.go index 6f7e1942f..3efbb5690 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -106,19 +106,24 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - err = s.Agent.Submit(call) - if err != nil { - // NOTE if they cancel the request then it will stop the call (kind of cool), - // we could filter that error out here too as right now it yells a little - if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout { - // TODO maneuver - // add this, since it means that start may not have been called [and it's relevant] - c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) + // Don't serve sync requests from API nodes + if s.nodeType != ServerTypeAPI { + err = s.Agent.Submit(call) + if err != nil { + // NOTE if they cancel the request then it will stop the call (kind of cool), + // we could filter that error out here too as right now it yells a little + if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout { + // TODO maneuver + // add this, since it means that start may not have been called [and it's relevant] + c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) + } 
+ // NOTE: if the task wrote the headers already then this will fail to write + // a 5xx (and log about it to us) -- that's fine (nice, even!) + handleErrorResponse(c, err) + return } - // NOTE: if the task wrote the headers already then this will fail to write - // a 5xx (and log about it to us) -- that's fine (nice, even!) - handleErrorResponse(c, err) - return + } else { + handleErrorResponse(c, models.ErrSyncCallNotSupported) } // TODO plumb FXLB-WAIT somehow (api?) diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 7da951ffd..8c38dfadd 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -21,6 +21,7 @@ func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr agent.Agen Router: gin.New(), Datastore: ds, MQ: mq, + nodeType: ServerTypeFull, } r := s.Router diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 99c400184..a58a620f6 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -27,7 +27,7 @@ func testRunner(t *testing.T, args ...interface{}) (agent.Agent, context.CancelF mq = arg } } - r := agent.New(ds, ds, mq) + r := agent.New(ds, ds, mq, agent.AgentTypeFull) return r, func() { r.Close() } } @@ -42,7 +42,7 @@ func TestRouteRunnerGet(t *testing.T) { rnr, cancel := testRunner(t, ds) defer cancel() logDB := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, logDB, rnr) + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -87,7 +87,7 @@ func TestRouteRunnerPost(t *testing.T) { defer cancel() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -141,7 +141,8 @@ func TestRouteRunnerExecution(t *testing.T) { defer cancelrnr() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string @@ -211,7 +212,7 @@ func TestFailedEnqueue(t *testing.T) { rnr, cancelrnr := testRunner(t, ds, mq) defer cancelrnr() - srv := testServer(ds, mq, fnl, rnr) + srv := testServer(ds, mq, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string body string @@ -252,7 +253,7 @@ func TestRouteRunnerTimeout(t *testing.T) { defer cancelrnr() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { path string diff --git a/api/server/server.go b/api/server/server.go index 1d0b980a4..7dc92a4b8 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -37,6 +37,7 @@ const ( EnvMQURL = "FN_MQ_URL" EnvDBURL = "FN_DB_URL" EnvLOGDBURL = "FN_LOGSTORE_URL" + EnvNodeType = "FN_NODE_TYPE" EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port" EnvAPICORS = "FN_API_CORS" EnvZipkinURL = "FN_ZIPKIN_URL" @@ -46,31 +47,52 @@ const ( DefaultPort = 8080 ) -type Server struct { - Router *gin.Engine - Agent agent.Agent - Datastore models.Datastore - MQ models.MessageQueue - LogDB models.LogStore +type ServerNodeType int32 +const ( + ServerTypeFull ServerNodeType = iota + ServerTypeAPI + ServerTypeRunner +) + +type Server struct { + Router *gin.Engine + Agent agent.Agent + Datastore models.Datastore + MQ models.MessageQueue + LogDB models.LogStore + nodeType ServerNodeType appListeners []fnext.AppListener rootMiddlewares []fnext.Middleware apiMiddlewares []fnext.Middleware } +func 
nodeTypeFromString(value string) ServerNodeType { + switch value { + case "api": + return ServerTypeAPI + case "runner": + return ServerTypeRunner + default: + return ServerTypeFull + } +} + // NewFromEnv creates a new Functions server based on env vars. func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server { + return NewFromURLs(ctx, getEnv(EnvDBURL, fmt.Sprintf("sqlite3://%s/data/fn.db", currDir)), getEnv(EnvMQURL, fmt.Sprintf("bolt://%s/data/fn.mq", currDir)), getEnv(EnvLOGDBURL, ""), + nodeTypeFromString(getEnv(EnvNodeType, "")), opts..., ) } // Create a new server based on the string URLs for each service. // Sits in the middle of NewFromEnv and New -func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, opts ...ServerOption) *Server { +func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, nodeType ServerNodeType, opts ...ServerOption) *Server { ds, err := datastore.New(dbURL) if err != nil { logrus.WithError(err).Fatalln("Error initializing datastore.") @@ -89,19 +111,30 @@ func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, opts ... } } - return New(ctx, ds, mq, logDB, opts...) + return New(ctx, ds, mq, logDB, nodeType, opts...) } // New creates a new Functions server with the passed in datastore, message queue and API URL -func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls models.LogStore, opts ...ServerOption) *Server { +func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls models.LogStore, nodeType ServerNodeType, opts ...ServerOption) *Server { setTracer() + var tp agent.AgentNodeType + switch nodeType { + case ServerTypeAPI: + tp = agent.AgentTypeAPI + case ServerTypeRunner: + tp = agent.AgentTypeRunner + default: + tp = agent.AgentTypeFull + } + s := &Server{ - Agent: agent.New(cache.Wrap(ds), ls, mq), // only add datastore caching to agent + Agent: agent.New(cache.Wrap(ds), ls, mq, tp), // only add datastore caching to agent Router: gin.New(), Datastore: ds, MQ: mq, LogDB: ls, + nodeType: nodeType, } // NOTE: testServer() in tests doesn't use these @@ -265,7 +298,7 @@ func (s *Server) bindHandlers(ctx context.Context) { engine.GET("/stats", s.handleStats) engine.GET("/metrics", s.handlePrometheusMetrics) - { + if s.nodeType != ServerTypeRunner { v1 := engine.Group("/v1") v1.Use(s.apiMiddlewareWrapper()) v1.GET("/apps", s.handleAppList) @@ -291,6 +324,15 @@ func (s *Server) bindHandlers(ctx context.Context) { apps.GET("/calls/:call", s.handleCallGet) apps.GET("/calls/:call/log", s.handleCallLogGet) } + + { + runner := v1.Group("/runner") + runner.PUT("/async", s.handleRunnerEnqueue) + runner.GET("/async", s.handleRunnerDequeue) + + runner.POST("/start", s.handleRunnerStart) + runner.POST("/finish", s.handleRunnerFinish) + } } { diff --git a/api/server/server_test.go b/api/server/server_test.go index 31cb36519..ea53f3886 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -9,10 +9,13 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/datastore" + "github.com/fnproject/fn/api/id" + "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/mqs" "github.com/gin-gonic/gin" @@ -20,7 +23,7 @@ import ( var tmpDatastoreTests = "/tmp/func_test_datastore.db" -func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, rnr agent.Agent) *Server { +func testServer(ds models.Datastore, mq 
models.MessageQueue, logDB models.LogStore, rnr agent.Agent, nodeType ServerNodeType) *Server { ctx := context.Background() s := &Server{ @@ -29,6 +32,7 @@ func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogSto Datastore: ds, LogDB: logDB, MQ: mq, + nodeType: nodeType, } r := s.Router @@ -102,7 +106,7 @@ func TestFullStack(t *testing.T) { rnr, rnrcancel := testRunner(t, ds) defer rnrcancel() - srv := testServer(ds, &mqs.Mock{}, logDB, rnr) + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeFull) for _, test := range []struct { name string @@ -139,3 +143,169 @@ func TestFullStack(t *testing.T) { } } } + +func TestRunnerNode(t *testing.T) { + ctx := context.Background() + buf := setLogBuffer() + ds, logDB, close := prepareDB(ctx, t) + defer close() + + rnr, rnrcancel := testRunner(t, ds) + defer rnrcancel() + + // Add route with an API server using the same DB + { + apiServer := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeAPI) + _, rec := routerRequest(t, apiServer.Router, "POST", "/v1/apps/myapp/routes", bytes.NewBuffer([]byte(`{ "route": { "name": "myroute", "path": "/myroute", "image": "fnproject/hello", "type": "sync" } }`))) + if rec.Code != http.StatusOK { + t.Errorf("Expected status code 200 when creating sync route, but got %d", rec.Code) + } + _, rec = routerRequest(t, apiServer.Router, "POST", "/v1/apps/myapp/routes", bytes.NewBuffer([]byte(`{ "route": { "name": "myasyncroute", "path": "/myasyncroute", "image": "fnproject/hello", "type": "async" } }`))) + if rec.Code != http.StatusOK { + t.Errorf("Expected status code 200 when creating async route, but got %d", rec.Code) + } + } + + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeRunner) + + for _, test := range []struct { + name string + method string + path string + body string + expectedCode int + expectedCacheSize int // TODO kill me + }{ + // Support sync and async API calls + {"execute sync route succeeds", "POST", "/r/myapp/myroute", `{ "name": "Teste" }`, http.StatusOK, 1}, + {"execute async route succeeds", "POST", "/r/myapp/myasyncroute", `{ "name": "Teste" }`, http.StatusAccepted, 1}, + + // All other API functions should not be available on runner nodes + {"create app not found", "POST", "/v1/apps", `{ "app": { "name": "myapp" } }`, http.StatusNotFound, 0}, + {"list apps not found", "GET", "/v1/apps", ``, http.StatusNotFound, 0}, + {"get app not found", "GET", "/v1/apps/myapp", ``, http.StatusNotFound, 0}, + + {"add route not found", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myroute", "path": "/myroute", "image": "fnproject/hello", "type": "sync" } }`, http.StatusNotFound, 0}, + {"get route not found", "GET", "/v1/apps/myapp/routes/myroute", ``, http.StatusNotFound, 0}, + {"get all routes not found", "GET", "/v1/apps/myapp/routes", ``, http.StatusNotFound, 0}, + {"delete app not found", "DELETE", "/v1/apps/myapp", ``, http.StatusNotFound, 0}, + } { + _, rec := routerRequest(t, srv.Router, test.method, test.path, bytes.NewBuffer([]byte(test.body))) + + if rec.Code != test.expectedCode { + t.Log(buf.String()) + t.Errorf("Test \"%s\": Expected status code to be %d but was %d", + test.name, test.expectedCode, rec.Code) + } + } +} + +func TestApiNode(t *testing.T) { + ctx := context.Background() + buf := setLogBuffer() + ds, logDB, close := prepareDB(ctx, t) + defer close() + + rnr, rnrcancel := testRunner(t, ds) + defer rnrcancel() + + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeAPI) + + for _, test := range []struct { + name string + method string 
+ path string + body string + expectedCode int + expectedCacheSize int // TODO kill me + }{ + // All routes should be supported + {"create my app", "POST", "/v1/apps", `{ "app": { "name": "myapp" } }`, http.StatusOK, 0}, + {"list apps", "GET", "/v1/apps", ``, http.StatusOK, 0}, + {"get app", "GET", "/v1/apps/myapp", ``, http.StatusOK, 0}, + + {"add myroute", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myroute", "path": "/myroute", "image": "fnproject/hello", "type": "sync" } }`, http.StatusOK, 0}, + {"add myroute2", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myroute2", "path": "/myroute2", "image": "fnproject/error", "type": "sync" } }`, http.StatusOK, 0}, + {"add myasyncroute", "POST", "/v1/apps/myapp/routes", `{ "route": { "name": "myasyncroute", "path": "/myasyncroute", "image": "fnproject/hello", "type": "async" } }`, http.StatusOK, 0}, + {"get myroute", "GET", "/v1/apps/myapp/routes/myroute", ``, http.StatusOK, 0}, + {"get myroute2", "GET", "/v1/apps/myapp/routes/myroute2", ``, http.StatusOK, 0}, + {"get all routes", "GET", "/v1/apps/myapp/routes", ``, http.StatusOK, 0}, + + // Don't support calling sync + {"execute myroute", "POST", "/r/myapp/myroute", `{ "name": "Teste" }`, http.StatusBadRequest, 1}, + {"execute myroute2", "POST", "/r/myapp/myroute2", `{ "name": "Teste" }`, http.StatusBadRequest, 2}, + + // Do support calling async + {"execute myasyncroute", "POST", "/r/myapp/myasyncroute", `{ "name": "Teste" }`, http.StatusAccepted, 1}, + + {"get myroute2", "GET", "/v1/apps/myapp/routes/myroute2", ``, http.StatusOK, 2}, + {"delete myroute", "DELETE", "/v1/apps/myapp/routes/myroute", ``, http.StatusOK, 1}, + {"delete myroute2", "DELETE", "/v1/apps/myapp/routes/myroute2", ``, http.StatusOK, 0}, + {"delete app (success)", "DELETE", "/v1/apps/myapp", ``, http.StatusOK, 0}, + {"get deleted app", "GET", "/v1/apps/myapp", ``, http.StatusNotFound, 0}, + {"get deleted route on deleted app", "GET", "/v1/apps/myapp/routes/myroute", ``, http.StatusNotFound, 0}, + } { + _, rec := routerRequest(t, srv.Router, test.method, test.path, bytes.NewBuffer([]byte(test.body))) + if rec.Code != test.expectedCode { + t.Log(buf.String()) + t.Errorf("Test \"%s\": Expected status code to be %d but was %d", + test.name, test.expectedCode, rec.Code) + } + } +} + +func TestHybridEndpoints(t *testing.T) { + buf := setLogBuffer() + ds := datastore.NewMockInit( + []*models.App{{ + Name: "myapp", + }}, + []*models.Route{{ + AppName: "myapp", + Path: "yodawg", + }}, nil, + ) + + logDB := logs.NewMock() + + srv := testServer(ds, &mqs.Mock{}, logDB, nil /* TODO */, ServerTypeAPI) + + newCallBody := func() string { + call := &models.Call{ + ID: id.New().String(), + AppName: "myapp", + Path: "yodawg", + // TODO ? 
+ } + var b bytes.Buffer + json.NewEncoder(&b).Encode(&call) + return b.String() + } + + for _, test := range []struct { + name string + method string + path string + body string + expectedCode int + }{ + // TODO change all these tests to just do an async task in normal order once plumbing is done + + {"post async call", "PUT", "/v1/runner/async", newCallBody(), http.StatusOK}, + + // TODO this one only works if it's not the same as the first since update isn't hooked up + {"finish call", "POST", "/v1/runner/finish", newCallBody(), http.StatusOK}, + + // TODO these won't work until update works and the agent gets shut off + //{"get async call", "GET", "/v1/runner/async", "", http.StatusOK}, + //{"start call", "POST", "/v1/runner/start", "TODO", http.StatusOK}, + } { + _, rec := routerRequest(t, srv.Router, test.method, test.path, strings.NewReader(test.body)) + + if rec.Code != test.expectedCode { + t.Log(buf.String()) + t.Errorf("Test \"%s\": Expected status code to be %d but was %d", + test.name, test.expectedCode, rec.Code) + } + } +} diff --git a/clients/hybrid/client.go b/clients/hybrid/client.go new file mode 100644 index 000000000..1677288ac --- /dev/null +++ b/clients/hybrid/client.go @@ -0,0 +1,240 @@ +package hybrid + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" + opentracing "github.com/opentracing/opentracing-go" +) + +type Client interface { + Enqueue(context.Context, *models.Call) error + Dequeue(context.Context) ([]*models.Call, error) + Start(context.Context, *models.Call) error + Finish(context.Context, *models.Call, io.Reader) error + + // TODO we could/should make GetAppAndRoute endpoint? saves a round trip... 
+ GetApp(ctx context.Context, appName string) (*models.App, error) + GetRoute(ctx context.Context, appName, route string) (*models.Route, error) +} + +var _ Client = new(client) + +type client struct { + base string + http *http.Client +} + +func New(u string) (Client, error) { + uri, err := url.Parse(u) + if err != nil { + return nil, err + } + + if uri.Host == "" { + return nil, errors.New("no host specified for client") + } + if uri.Scheme == "" { + uri.Scheme = "http" + } + host := uri.Scheme + "://" + uri.Host + "/v1/" + + httpClient := &http.Client{ + Timeout: 60 * time.Second, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + MaxIdleConns: 512, + MaxIdleConnsPerHost: 128, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + ClientSessionCache: tls.NewLRUClientSessionCache(8096), + }, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + return &client{ + base: host, + http: httpClient, + }, nil +} + +func (cl *client) Enqueue(ctx context.Context, c *models.Call) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_enqueue") + defer span.Finish() + + err := cl.do(ctx, c, nil, "PUT", "runner", "async") + return err +} + +func (cl *client) Dequeue(ctx context.Context) ([]*models.Call, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_dequeue") + defer span.Finish() + + var c struct { + C []*models.Call `json:"calls"` + } + err := cl.do(ctx, nil, &c, "GET", "runner", "async") + return c.C, err +} + +func (cl *client) Start(ctx context.Context, c *models.Call) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_start") + defer span.Finish() + + err := cl.do(ctx, c, nil, "POST", "runner", "start") + return err +} + +func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_end") + defer span.Finish() + + var b bytes.Buffer // TODO pool / we should multipart this? + _, err := io.Copy(&b, r) + if err != nil { + return err + } + bod := struct { + C *models.Call `json:"call"` + L string `json:"log"` + }{ + C: c, + L: b.String(), + } + + err = cl.do(ctx, bod, nil, "POST", "runner", "finish") + return err +} + +func (cl *client) GetApp(ctx context.Context, appName string) (*models.App, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_app") + defer span.Finish() + + var app models.App + err := cl.do(ctx, nil, &app, "GET", "apps", appName) + return &app, err +} + +func (cl *client) GetRoute(ctx context.Context, appName, route string) (*models.Route, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_route") + defer span.Finish() + + var r models.Route + err := cl.do(ctx, nil, &r, "GET", "apps", appName, "routes", route) + return &r, err +} + +type httpErr struct { + code int + error +} + +func (cl *client) do(ctx context.Context, request, result interface{}, method string, url ...string) error { + // TODO determine policy (should we count to infinity?) + + var b common.Backoff + for i := 0; i < 5; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // TODO this isn't re-using buffers very efficiently, but retries should be rare... + err := cl.once(ctx, request, result, method, url...) 
+		switch err := err.(type) {
+		case nil:
+			return err
+		case httpErr:
+			if err.code < 500 {
+				return err
+			}
+			common.Logger(ctx).WithError(err).Error("error from API server, retrying")
+			// retry 500s...
+		default:
+			// this error wasn't from us [most likely], probably a conn refused/timeout, just retry it out
+		}
+
+		b.Sleep(ctx)
+	}
+
+	return context.DeadlineExceeded // basically, right?
+}
+
+func (cl *client) once(ctx context.Context, request, result interface{}, method string, url ...string) error {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_http_do")
+	defer span.Finish()
+
+	var b bytes.Buffer // TODO pool
+	if request != nil {
+		err := json.NewEncoder(&b).Encode(request)
+		if err != nil {
+			return err
+		}
+	}
+
+	req, err := http.NewRequest(method, cl.url(url...), &b)
+	if err != nil {
+		return err
+	}
+	req = req.WithContext(ctx)
+	// shove the span headers in so that the server will continue this span
+	opentracing.GlobalTracer().Inject(
+		span.Context(),
+		opentracing.HTTPHeaders,
+		opentracing.HTTPHeadersCarrier(req.Header))
+
+	resp, err := cl.http.Do(req)
+	if err != nil {
+		return err
+	}
+	defer func() { io.Copy(ioutil.Discard, resp.Body); resp.Body.Close() }()
+
+	if resp.StatusCode >= 300 {
+		// one of our errors
+		var msg struct {
+			Err *struct {
+				Msg string `json:"message"`
+			} `json:"error"`
+		}
+		// copy into a buffer in case it wasn't from us
+		var b bytes.Buffer
+		io.Copy(&b, resp.Body)
+		json.Unmarshal(b.Bytes(), &msg)
+		if msg.Err != nil {
+			return &httpErr{code: resp.StatusCode, error: errors.New(msg.Err.Msg)}
+		}
+		return &httpErr{code: resp.StatusCode, error: errors.New(b.String())}
+	}
+
+	if result != nil {
+		err := json.NewDecoder(resp.Body).Decode(result)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (cl *client) url(args ...string) string {
+	return cl.base + strings.Join(args, "/")
+}
diff --git a/docs/developers/hybrid.md b/docs/developers/hybrid.md
new file mode 100644
index 000000000..7c4029f75
--- /dev/null
+++ b/docs/developers/hybrid.md
@@ -0,0 +1,126 @@
+# Hybrid API Proposal
+
+TODO fill in auth information herein (possibly do w/o first?)
+
+Hybrid API will consist of a few endpoints that encapsulate all functionality
+required for `fn` to run tasks using split API and 'runner' nodes. These
+endpoints live under the `/v1/runner/` prefix. In addition to these
+endpoints, the runner has access to any `/v1/` endpoints it needs as well
+(namely, `GetApp` and `GetRoute`).
+
+API nodes are responsible for interacting with an MQ and DB [on behalf of the
+runner], as well as handling all requests under the `/v1/` routes.
+
+Runner nodes are responsible for receiving requests under the `/r/` endpoints
+from the fnlb and for sending requests to the `/v1/runner/` endpoints on API nodes.
+Their duties are:
+
+* enqueueing async calls
+* dequeueing async calls when there is spare capacity
+* executing calls (both sync and async)
+* management of message lifecycle
+* reporting call status & logs
+
+## Endpoints
+
+All functionality listed here will be implemented in the API nodes under the
+given endpoint. The runner is responsible for calling each of these endpoints
+with the given input.
+
+##### POST /v1/runner/async
+
+this is called when a runner receives a request for an async route. the
+request contains an entire constructed `models.Call` object, as well as an
+identifier for this runner node to queue this call to a specific partition in
+kafka [mapping to the runner node]`***`. returns success/fail.
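As an illustration of the runner side of this endpoint (not part of this change; the URL, app name, and route path below are made up), a minimal sketch using the `clients/hybrid` client added in this diff might look like the following. Note that the client currently issues the enqueue as a `PUT` to `/v1/runner/async`:

```go
package main

import (
	"context"
	"log"

	"github.com/fnproject/fn/api/id"
	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/clients/hybrid"
)

func main() {
	// assumed address of a reachable API node
	cl, err := hybrid.New("http://localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	call := &models.Call{
		ID:      id.New().String(),
		AppName: "myapp",         // hypothetical app
		Path:    "/myasyncroute", // hypothetical async route
		Type:    models.TypeAsync,
	}

	// Enqueue sends the call to the API node, which pushes it onto the MQ
	// and (best effort) records it in the datastore as 'queued'.
	if err := cl.Enqueue(context.Background(), call); err != nil {
		log.Fatal(err)
	}
}
```

The list that follows summarizes what the API node does when it receives this request.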
+* enqueue an async call to an MQ
+* insert a call to the DB with 'queued' state
+
+special cases:
+
+* if enqueue to MQ fails, the request fails and the runner will
+reply with a 500 error to the client as if this call never existed
+* if insert fails, we ignore this error, which will be handled in Start
+
+##### GET /v1/runner/async
+
+the runner long polls for a call to run. the request contains an identifier for
+this runner node to pull from the partition in kafka for this runner node`***`.
+the response contains a list of {app_name, route_name} (the runner will cache apps
+and routes, otherwise looking this up at respective API call positions),
+possibly an empty list. This call will time out and return an empty list after
+30 seconds if no messages are found. For now, it should return the first
+message it finds immediately.
+
+* dequeue a message from the MQ if there is some capacity
+
+##### POST /v1/runner/start
+
+the runner calls this endpoint immediately before starting a task, only
+starting the task if this endpoint returns a success. the request contains the
+app name and the call id. the response returns a success/fail code. this
+transition _could_ be done in the dequeue portion of the call lifecycle since
+it's only for async, however this exists because the time between dequeueing
+and being prepared to execute a task may be long (or the task may never actually start).
+
+sync:
+
+* noop, the runner will not call this endpoint.
+
+async:
+
+* update call in db to status=running, conditionally. if the status is already
+  running, set the status to error as this means that the task has been
+  started successfully previously and we don't want to run it twice, and after
+  successfully setting the status to error, delete the mq message, and return
+  a failure status code. if the status is a final state (error | timeout |
+  success), delete the mq message and return a failure status code. if the
+  update to status=running succeeds, return a success status code.
+
+##### POST /v1/runner/finish
+
+the runner calls this endpoint after a call has completed, either because of
+an error, a timeout, or because it ran successfully. the request must contain
+an entire completed call object as well as its log (multipart?). it will
+always return a success code as the call is completed at this point; the
+runner may retry this endpoint if it fails (timeout, etc).
+
+sync:
+
+* insert the call model into the db (ignore error, retry)
+* insert the log into the log store (ignore error, retry)
+
+async:
+
+* insert the call model into the db (ignore error, retry)
+* insert the log into the log store (ignore error, retry)
+* delete the MQ message (ignore error, retry, failure is handled in start)
+
+## Additional notes & changes required
+
+* All runner requests and responses will contain a header `XXX-RUNNER-LOAD`
+  that API server nodes and FNLB nodes can use to determine how to distribute
+  load to that node. This will keep a relatively up to date view of each of
+  the runner nodes to the API and FNLB, assuming that each of those are small
+  sets of nodes. The API nodes can use this to determine whether to distribute
+  messages for async calls to runner nodes as well as fnlb for routing async
+  or sync requests.
+* Each runner node will have a partition in Kafka that maps to messages that
+  it enqueues. This will allow us to use distribution information, based on
+  load, from the load balancer, since the load balancer will send requests to
+  queue async tasks optimistically to runner nodes. The runner then only
+  processes messages on its partition. This is likely fraught with some
+  danger; however, kafka messaging semantics have no idea of timeouts and we
+  make no real SLAs about time between enqueue and start, so it's somewhat sexy
+  to think that runners don't have to think about maneuvering timeouts. This
+  likely needs further fleshing out, as noted in `***`.
+
+`***` current understanding of kafka consumer groups semantics is largely
+incomplete and this is making the assumption that if a runner fails, consumer
+groups will allow another runner node to cover for this one as well as its
+own. distribution should also be considered, and sending in partition ids is
+important so that FNLB will dictate the distribution of async functions across
+nodes as their load increases, this is also why `XXX-RUNNER-LOAD` is required,
+since async tasks aren't returning wait information up stack to the lb. this
+likely requires further thought, but is possibly correct as proposed (1% odds)
diff --git a/test/fn-api-tests/utils.go b/test/fn-api-tests/utils.go
index f234a91ee..b75c3e8ea 100644
--- a/test/fn-api-tests/utils.go
+++ b/test/fn-api-tests/utils.go
@@ -71,7 +71,7 @@ func getServerWithCancel() (*server.Server, context.CancelFunc) {
 		dbURL = fmt.Sprintf("sqlite3://%s", tmpDb)
 	}
-	s = server.NewFromURLs(ctx, dbURL, mqURL, "")
+	s = server.NewFromURLs(ctx, dbURL, mqURL, "", server.ServerTypeFull)
 	go s.Start(ctx)
 	started := false
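To show how the pieces of this change fit together end to end, here is a rough, hypothetical sketch (not part of this diff) of the loop a runner node could drive against the `/v1/runner/` endpoints via the `clients/hybrid` client. The API node address and the `executeCall` helper are placeholders, and it glosses over details such as the dequeue response currently carrying only the app name and path:

```go
package main

import (
	"context"
	"log"
	"strings"

	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/clients/hybrid"
)

// executeCall stands in for actually running the function container;
// it returns the captured log output.
func executeCall(ctx context.Context, c *models.Call) string {
	return "hello from " + c.Path
}

func main() {
	cl, err := hybrid.New("http://localhost:8080") // assumed API node address
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	for {
		// GET /v1/runner/async: long poll for queued calls (may return an empty list).
		calls, err := cl.Dequeue(ctx)
		if err != nil {
			log.Println("dequeue error:", err)
			continue
		}

		for _, c := range calls {
			// POST /v1/runner/start: only run the call if the API node agrees
			// it can transition to 'running'.
			if err := cl.Start(ctx, c); err != nil {
				log.Println("not starting call:", err)
				continue
			}

			logOutput := executeCall(ctx, c)
			c.Status = "success"

			// POST /v1/runner/finish: report the completed call and its log;
			// the API node persists both and cleans up the MQ message.
			if err := cl.Finish(ctx, c, strings.NewReader(logOutput)); err != nil {
				log.Println("finish error:", err)
			}
		}
	}
}
```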