Travis Reeder
2017-11-17 15:27:59 -08:00
7 changed files with 132 additions and 99 deletions

View File

@@ -126,14 +126,8 @@ type agent struct {
 	hMu sync.RWMutex // protects hot
 	hot map[string]chan slot

-	// TODO we could make a separate struct for the memory stuff
-	// cond protects access to ramUsed
-	cond *sync.Cond
-	// ramTotal is the total accessible memory by this process
-	ramTotal uint64
-	// ramUsed is ram reserved for running containers. idle hot containers
-	// count against ramUsed.
-	ramUsed uint64
+	// track usage
+	resources ResourceTracker

 	// used to track running calls / safe shutdown
 	wg sync.WaitGroup // TODO rename
@@ -154,8 +148,7 @@ func New(ds models.Datastore, mq models.MessageQueue) Agent {
 		mq:          mq,
 		driver:      driver,
 		hot:         make(map[string]chan slot),
-		cond:        sync.NewCond(new(sync.Mutex)),
-		ramTotal:    getAvailableMemory(),
+		resources:   NewResourceTracker(),
 		shutdown:    make(chan struct{}),
 		promHandler: promhttp.Handler(),
 	}
@@ -279,7 +272,7 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	select {
 	case s := <-slots:
 		return s, nil
-	case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle
+	case tok := <-a.resources.GetResourceToken(ctx, call):
 		errCh = a.launch(ctx, slots, call, tok) // TODO mangle
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -363,80 +356,6 @@ func hotKey(call *call) string {
 	return string(hash.Sum(buf[:0]))
 }

-// TODO we could rename this more appropriately (ideas?)
-type Token interface {
-	// Close must be called by any thread that receives a token.
-	io.Closer
-}
-
-type token struct {
-	decrement func()
-}
-
-func (t *token) Close() error {
-	t.decrement()
-	return nil
-}
-
-// the received token should be passed directly to launch (unconditionally), launch
-// will close this token (i.e. the receiver should not call Close)
-func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
-	c := a.cond
-	ch := make(chan Token)
-
-	go func() {
-		c.L.Lock()
-		for (a.ramUsed + memory) > a.ramTotal {
-			select {
-			case <-ctx.Done():
-				c.L.Unlock()
-				return
-			default:
-			}
-			c.Wait()
-		}
-		a.ramUsed += memory
-		c.L.Unlock()
-
-		t := &token{decrement: func() {
-			c.L.Lock()
-			a.ramUsed -= memory
-			c.L.Unlock()
-			c.Broadcast()
-		}}
-
-		select {
-		case ch <- t:
-		case <-ctx.Done():
-			// if we can't send b/c nobody is waiting anymore, need to decrement here
-			t.Close()
-		}
-	}()
-
-	return ch
-}
-
-// asyncRAM will send a signal on the returned channel when at least half of
-// the available RAM on this machine is free.
-func (a *agent) asyncRAM() chan struct{} {
-	ch := make(chan struct{})
-	c := a.cond
-	go func() {
-		c.L.Lock()
-		for (a.ramTotal/2)-a.ramUsed < 0 {
-			c.Wait()
-		}
-		c.L.Unlock()
-		ch <- struct{}{}
-		// TODO this could leak forever (only in shutdown, blech)
-	}()
-	return ch
-}
-
 type slot interface {
 	exec(ctx context.Context, call *call) error
 	io.Closer
@@ -445,7 +364,7 @@ type slot interface {
 // implements Slot
 type coldSlot struct {
 	cookie drivers.Cookie
-	tok    Token
+	tok    ResourceToken
 }

 func (s *coldSlot) exec(ctx context.Context, call *call) error {
@@ -523,7 +442,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
 // this will work for hot & cold (woo)
 // if launch encounters a non-nil error it will send it on the returned channel,
 // this can be useful if an image doesn't exist, e.g.
-func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
+func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) <-chan error {
 	ch := make(chan error, 1)

 	if !protocol.IsStreamable(protocol.Protocol(call.Format)) {
@@ -546,7 +465,7 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T
 	return ch
 }

-func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	container := &container{
 		id:    id.New().String(), // XXX we could just let docker generate ids...
 		image: call.Image,
@@ -575,7 +494,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
 	return nil
 }

-func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	// We must be careful to only use ctxArg for logs/spans
 	// create a span from ctxArg but ignore the new Context
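The token contract noted above ("the received token should be passed directly to launch... launch will close this token") is easy to get wrong: exactly one owner closes the token, and ownership ends up with the slot that launch creates. A minimal sketch of the release path, assuming a coldSlot.Close roughly like the one elsewhere in agent.go (not shown in this diff):

// sketch only: the real coldSlot.Close is not part of this commit
func (s *coldSlot) Close() error {
	// ... tear down the container/cookie first (elided) ...
	// closing the token returns the reserved RAM to the ResourceTracker
	// and wakes any goroutines blocked in GetResourceToken
	return s.tok.Close()
}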

View File

@@ -15,7 +15,7 @@ func (a *agent) asyncDequeue() {
 	select {
 	case <-a.shutdown:
 		return
-	case <-a.asyncRAM():
+	case <-a.resources.WaitAsyncResource():
 		// TODO we _could_ return a token here to reserve the ram so that there's
 		// not a race between here and Submit but we're single threaded
 		// dequeueing and retries handled gracefully inside of Submit if we run

View File

@@ -126,7 +126,7 @@ func (d *retryWrap) RoundTrip(req *http.Request) (*http.Response, error) {
 	// and then retry it (it will get authed and the challenge then accepted).
 	// why the docker distribution transport doesn't do this for you is
 	// a real testament to what sadists those docker people are.
-	if resp.StatusCode == http.StatusUnauthorized {
+	if resp != nil && resp.StatusCode == http.StatusUnauthorized {
 		pingPath := req.URL.Path
 		if v2Root := strings.Index(req.URL.Path, "/v2/"); v2Root != -1 {
 			pingPath = pingPath[:v2Root+4]
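The added nil guard is load-bearing: an http.RoundTripper returns a nil *http.Response whenever the transport fails before a response arrives, so an unguarded resp.StatusCode check is a nil-pointer panic waiting for the first DNS or connection error. A toy illustration (hypothetical host, not from the repo):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	req, _ := http.NewRequest("GET", "http://registry.invalid/v2/", nil)
	// .invalid never resolves, so resp comes back nil with a non-nil err;
	// touching resp.StatusCode without the nil check would panic here
	resp, err := http.DefaultTransport.RoundTrip(req)
	if resp != nil && resp.StatusCode == http.StatusUnauthorized {
		fmt.Println("got auth challenge")
	}
	if err != nil {
		fmt.Println("transport error, resp is nil:", err)
	}
}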

View File

@@ -2,17 +2,124 @@ package agent

 import (
 	"bufio"
+	"context"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"runtime"
 	"strconv"
 	"strings"
+	"sync"

 	"github.com/sirupsen/logrus"
 )

+// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
+// TODO: improve memory implementation
+// TODO: add cpu, disk, network IO in the future
+type ResourceTracker interface {
+	WaitAsyncResource() chan struct{}
+	GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken
+}
+
+type resourceTracker struct {
+	// cond protects access to ramUsed
+	cond *sync.Cond
+	// ramTotal is the total memory accessible to this process
+	ramTotal uint64
+	// ramUsed is ram reserved for running containers. idle hot containers
+	// count against ramUsed.
+	ramUsed uint64
+}
+
+func NewResourceTracker() ResourceTracker {
+	obj := &resourceTracker{
+		cond:     sync.NewCond(new(sync.Mutex)),
+		ramTotal: getAvailableMemory(),
+	}
+	return obj
+}
+
+type ResourceToken interface {
+	// Close must be called by any thread that receives a token.
+	io.Closer
+}
+
+type resourceToken struct {
+	decrement func()
+}
+
+func (t *resourceToken) Close() error {
+	t.decrement()
+	return nil
+}
+
+// the received token should be passed directly to launch (unconditionally);
+// launch will close this token (i.e. the receiver should not call Close)
+func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
+	memory := call.Memory * 1024 * 1024 // call.Memory is in MB; reserve bytes
+	c := a.cond
+	ch := make(chan ResourceToken)
+
+	go func() {
+		c.L.Lock()
+		for (a.ramUsed + memory) > a.ramTotal {
+			select {
+			case <-ctx.Done():
+				c.L.Unlock()
+				return
+			default:
+			}
+			c.Wait()
+		}
+		a.ramUsed += memory
+		c.L.Unlock()
+
+		t := &resourceToken{decrement: func() {
+			c.L.Lock()
+			a.ramUsed -= memory
+			c.L.Unlock()
+			c.Broadcast()
+		}}
+
+		select {
+		case ch <- t:
+		case <-ctx.Done():
+			// if we can't send b/c nobody is waiting anymore, need to decrement here
+			t.Close()
+		}
+	}()
+
+	return ch
+}
+
+// WaitAsyncResource will send a signal on the returned channel when at least
+// half of the available RAM on this machine is free.
+func (a *resourceTracker) WaitAsyncResource() chan struct{} {
+	ch := make(chan struct{})
+	c := a.cond
+	go func() {
+		c.L.Lock()
+		// wait until no more than half of total RAM is reserved
+		// (ramUsed and ramTotal are uint64, so a subtraction can never go negative)
+		for a.ramUsed > a.ramTotal/2 {
+			c.Wait()
+		}
+		c.L.Unlock()
+		ch <- struct{}{}
+		// TODO this could leak forever (only in shutdown, blech)
+	}()
+	return ch
+}
+
 func getAvailableMemory() uint64 {
 	const tooBig = 322122547200 // 300GB or 0, biggest aws instance is 244GB
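Taken together, the new surface is: block on GetResourceToken until enough RAM is reservable, then Close the token to give the reservation back. A minimal usage sketch; reserveAndRun is a hypothetical helper assumed to live inside package agent, where the internal call type (Memory in MB) and ResourceTracker are visible:

// hypothetical helper, not part of the commit
func reserveAndRun(ctx context.Context, rt ResourceTracker, c *call) error {
	select {
	case tok := <-rt.GetResourceToken(ctx, c):
		// this sketch owns the token, so it closes it itself; in the agent
		// proper the token is handed to launch, which then owns the Close
		defer tok.Close()
		// ... start the container while the reservation is held ...
		return nil
	case <-ctx.Done():
		return ctx.Err() // gave up waiting; no token was sent, nothing to release
	}
}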

View File

@@ -6,13 +6,14 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )

 // PrometheusCollector is a custom Collector
 // which sends ZipKin traces to Prometheus
 type PrometheusCollector struct {
+	lock sync.Mutex

 	// Each span name is published as a separate Histogram metric
 	// Using metric names of the form fn_span_<span-name>_duration_seconds
@@ -27,12 +28,15 @@ type PrometheusCollector struct {

 // NewPrometheusCollector returns a new PrometheusCollector
 func NewPrometheusCollector() (zipkintracer.Collector, error) {
-	pc := &PrometheusCollector{make(map[string]*prometheus.HistogramVec), make(map[string][]string)}
+	pc := &PrometheusCollector{
+		histogramVecMap:        make(map[string]*prometheus.HistogramVec),
+		registeredLabelKeysMap: make(map[string][]string),
+	}
 	return pc, nil
 }

 // PrometheusCollector implements Collector.
-func (pc PrometheusCollector) Collect(span *zipkincore.Span) error {
+func (pc *PrometheusCollector) Collect(span *zipkincore.Span) error {
 	spanName := span.GetName()
@@ -61,12 +65,15 @@ func (pc PrometheusCollector) Collect(span *zipkincore.Span) error {
 }

 // Return (and create, if necessary) a HistogramVec for the specified Prometheus metric
-func (pc PrometheusCollector) getHistogramVec(
+func (pc *PrometheusCollector) getHistogramVec(
 	metricName string, metricHelp string, labelKeysFromSpan []string, labelValuesFromSpan map[string]string) (
 	*prometheus.HistogramVec, map[string]string) {

 	var labelValuesToUse map[string]string

+	pc.lock.Lock()
+	defer pc.lock.Unlock()
+
 	histogramVec, found := pc.histogramVecMap[metricName]
 	if !found {
 		// create a new HistogramVec
@@ -143,4 +150,4 @@ func getLoggedMetrics(span *zipkincore.Span) map[string]uint64 {
 }

 // PrometheusCollector implements Collector.
-func (PrometheusCollector) Close() error { return nil }
+func (*PrometheusCollector) Close() error { return nil }
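The pointer receivers and the new lock travel together: a value receiver copies the struct on every call, so a mutex field would be a per-call copy that guards nothing, while the maps underneath (reference types, shared across copies) are still written concurrently, and Go maps are not safe for concurrent writes. A self-contained demonstration of the fixed pattern:

package main

import "sync"

type collector struct {
	lock sync.Mutex
	m    map[string]int
}

// pointer receiver: every call locks the same mutex; with a value
// receiver each call would lock its own copy and the map write would race
func (c *collector) record(k string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.m[k]++
}

func main() {
	c := &collector{m: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.record("span")
		}()
	}
	wg.Wait()
}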

View File

@@ -1,4 +1,4 @@
 package version

 // Version of Functions
-var Version = "0.3.189"
+var Version = "0.3.191"

View File

@@ -17,12 +17,12 @@ import (
 	"github.com/sirupsen/logrus"
 )

-const VERSION = "0.0.154"
+const VERSION = "0.0.155"

 func main() {
 	// XXX (reed): normalize
 	fnodes := flag.String("nodes", "", "comma separated list of functions nodes")
-	minAPIVersion := flag.String("min-api-version", "0.0.121", "minimal node API to accept")
+	minAPIVersion := flag.String("min-api-version", "0.0.122", "minimal node API to accept")

 	var conf lb.Config
 	flag.StringVar(&conf.DBurl, "db", "sqlite3://:memory:", "backend to store nodes, default to in memory")