mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
This implements a "detached" mechanism to get an ack from the runner once it actually starts running a function. In this scenario the response returned is just a 202 if we placed the function within a specific time frame. If we hit errors or fail to place the function in time, we return different errors.
65 lines
1.4 KiB
Go
65 lines
1.4 KiB
Go
package runnerpool
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/models"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type naivePlacer struct {
|
|
cfg PlacerConfig
|
|
rrIndex uint64
|
|
}
|
|
|
|
func NewNaivePlacer(cfg *PlacerConfig) Placer {
|
|
logrus.Infof("Creating new naive runnerpool placer with config=%+v", cfg)
|
|
return &naivePlacer{
|
|
cfg: *cfg,
|
|
rrIndex: uint64(time.Now().Nanosecond()),
|
|
}
|
|
}
|
|
|
|
func (sp *naivePlacer) GetPlacerConfig() PlacerConfig {
|
|
return sp.cfg
|
|
}
|
|
|
|
func (sp *naivePlacer) PlaceCall(ctx context.Context, rp RunnerPool, call RunnerCall) error {
|
|
state := NewPlacerTracker(ctx, &sp.cfg, call)
|
|
defer state.HandleDone()
|
|
|
|
var runnerPoolErr error
|
|
for {
|
|
var runners []Runner
|
|
runners, runnerPoolErr = rp.Runners(call)
|
|
|
|
for j := 0; j < len(runners) && !state.IsDone(); j++ {
|
|
|
|
i := atomic.AddUint64(&sp.rrIndex, uint64(1))
|
|
r := runners[int(i)%len(runners)]
|
|
|
|
placed, err := state.TryRunner(r, call)
|
|
if placed {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !state.RetryAllBackoff(len(runners)) {
|
|
break
|
|
}
|
|
}
|
|
|
|
if runnerPoolErr != nil {
|
|
// If we haven't been able to place the function and we got an error
|
|
// from the runner pool, return that error (since we don't have
|
|
// enough runners to handle the current load and the runner pool is
|
|
// having trouble).
|
|
state.HandleFindRunnersFailure(runnerPoolErr)
|
|
return runnerPoolErr
|
|
}
|
|
return models.ErrCallTimeoutServerBusy
|
|
}
|