mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* fn: reduce lbagent and agent dependency. lbagent and agent code is too interdependent; this causes any change in agent to break lbagent. In reality, for LB there should be no delegated agent. Splitting these two will cause some code duplication, but it reduces dependency and complexity (e.g. agent without docker) * fn: post rebase fixup * fn: runner/runnercall should use lbDeadline * fn: fixup lb agent test * fn: remove agent create option for common.WaitGroup
66 lines
1.4 KiB
Go
66 lines
1.4 KiB
Go
package runnerpool
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
	// retryWaitInterval is the pause taken after a placement attempt across
	// all runners has failed, before the next attempt is made.
	retryWaitInterval = 10 * time.Millisecond
)
|
|
|
|
// naivePlacer is the Placer implementation returned by NewNaivePlacer. It
// carries no state; its PlaceCall method offers a call to each runner in the
// order the pool returns them, retrying until the call is placed or times out.
type naivePlacer struct{}
|
|
|
|
func NewNaivePlacer() Placer {
|
|
logrus.Info("Creating new naive runnerpool placer")
|
|
return &naivePlacer{}
|
|
}
|
|
|
|
func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
|
|
timeout := time.After(call.LbDeadline().Sub(time.Now()))
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return models.ErrCallTimeoutServerBusy
|
|
case <-timeout:
|
|
return models.ErrCallTimeoutServerBusy
|
|
default:
|
|
runners, err := rp.Runners(call)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Failed to find runners for call")
|
|
} else {
|
|
for _, r := range runners {
|
|
placed, err := r.TryExec(ctx, call)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Failed during call placement")
|
|
}
|
|
if placed {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
remaining := call.LbDeadline().Sub(time.Now())
|
|
if remaining <= 0 {
|
|
return models.ErrCallTimeoutServerBusy
|
|
}
|
|
|
|
// backoff
|
|
select {
|
|
case <-ctx.Done():
|
|
return models.ErrCallTimeoutServerBusy
|
|
case <-timeout:
|
|
return models.ErrCallTimeoutServerBusy
|
|
case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
|
|
}
|
|
}
|
|
}
|
|
}
|