From 5693f7dc53c3a024776b94063d9b122617f07f2a Mon Sep 17 00:00:00 2001
From: CI
Date: Fri, 17 Nov 2017 20:01:24 +0000
Subject: [PATCH 1/5] fn-server: 0.3.190 release [skip ci]

---
 api/version/version.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/version/version.go b/api/version/version.go
index a749a9637..646417f60 100644
--- a/api/version/version.go
+++ b/api/version/version.go
@@ -1,4 +1,4 @@
 package version

 // Version of Functions
-var Version = "0.3.189"
+var Version = "0.3.190"

From 57b24d63c3a5a444a03a68a5f90d339717d0378e Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Fri, 17 Nov 2017 12:46:53 -0800
Subject: [PATCH 2/5] fn: prometheus collector concurrent map access (#510)

* fn: prometheus collector concurrent map access

*) Added mutex to guard against concurrent access to maps

* fn: prometheus collector method receivers should be ptr

* fn: prometheus collector concurrent map access

*) Moved the mutex into getHistogramVec()
---
 api/server/prom_zip_collector.go | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/api/server/prom_zip_collector.go b/api/server/prom_zip_collector.go
index 17e0a78bb..d11f9e9f0 100644
--- a/api/server/prom_zip_collector.go
+++ b/api/server/prom_zip_collector.go
@@ -6,13 +6,14 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )

 // PrometheusCollector is a custom Collector
 // which sends ZipKin traces to Prometheus
 type PrometheusCollector struct {
-
+	lock sync.Mutex
 	// Each span name is published as a separate Histogram metric
 	// Using metric names of the form fn_span_<spanname>_duration_seconds

 // NewPrometheusCollector returns a new PrometheusCollector
 func NewPrometheusCollector() (zipkintracer.Collector, error) {
-	pc := &PrometheusCollector{make(map[string]*prometheus.HistogramVec), make(map[string][]string)}
+	pc := &PrometheusCollector{
+		histogramVecMap:        make(map[string]*prometheus.HistogramVec),
+		registeredLabelKeysMap: make(map[string][]string),
+	}
 	return pc, nil
 }

 // PrometheusCollector implements Collector.
-func (pc PrometheusCollector) Collect(span *zipkincore.Span) error {
+func (pc *PrometheusCollector) Collect(span *zipkincore.Span) error {

 	spanName := span.GetName()

 // Return (and create, if necessary) a HistogramVec for the specified Prometheus metric
-func (pc PrometheusCollector) getHistogramVec(
+func (pc *PrometheusCollector) getHistogramVec(
 	metricName string, metricHelp string, labelKeysFromSpan []string, labelValuesFromSpan map[string]string) (
 	*prometheus.HistogramVec, map[string]string) {

 	var labelValuesToUse map[string]string

+	pc.lock.Lock()
+	defer pc.lock.Unlock()
+
 	histogramVec, found := pc.histogramVecMap[metricName]
 	if !found {
 		// create a new HistogramVec

 func getLoggedMetrics(span *zipkincore.Span) map[string]uint64 {
 }

 // PrometheusCollector implements Collector.
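For readers unfamiliar with the race this patch fixes: Go maps are not safe for concurrent use, so two goroutines calling Collect at once could panic with "concurrent map read and map write" on histogramVecMap. The sketch below mirrors the guarded lazy-init pattern the patch adopts inside getHistogramVec; it is a minimal illustration only, with histogramRegistry and its string values standing in for the real prometheus types. The receiver change to pointers matters for the same reason: a value receiver copies the struct, and locking a copy of a sync.Mutex protects nothing.

package main

import (
	"fmt"
	"sync"
)

// histogramRegistry is a stand-in for PrometheusCollector: one mutex
// serializes every read and write of the lazily populated map.
type histogramRegistry struct {
	lock sync.Mutex
	vecs map[string]string // stand-in for map[string]*prometheus.HistogramVec
}

// getOrCreate mirrors the shape of getHistogramVec: lock, look up,
// create-and-register on first use, unlock via defer.
func (r *histogramRegistry) getOrCreate(name string) string {
	r.lock.Lock()
	defer r.lock.Unlock()

	v, found := r.vecs[name]
	if !found {
		v = "histogram:" + name
		r.vecs[name] = v
	}
	return v
}

func main() {
	r := &histogramRegistry{vecs: make(map[string]string)}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.getOrCreate("fn_span_exec_duration_seconds")
		}()
	}
	wg.Wait()
	fmt.Println(r.getOrCreate("fn_span_exec_duration_seconds"))
}

Running this with -race is a quick way to confirm the pattern: remove the Lock/Unlock pair and the race detector flags the map access immediately.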
-func (PrometheusCollector) Close() error { return nil }
+func (*PrometheusCollector) Close() error { return nil }

From f1d4387f233fdedb915ee4e255f467b2700b66a3 Mon Sep 17 00:00:00 2001
From: CI
Date: Fri, 17 Nov 2017 21:02:20 +0000
Subject: [PATCH 3/5] fn-server: 0.3.191 release [skip ci]

---
 api/version/version.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/version/version.go b/api/version/version.go
index 646417f60..f915ada7c 100644
--- a/api/version/version.go
+++ b/api/version/version.go
@@ -1,4 +1,4 @@
 package version

 // Version of Functions
-var Version = "0.3.190"
+var Version = "0.3.191"

From 1acb1e99b48db30b43d5a807fbcd61178d4676b3 Mon Sep 17 00:00:00 2001
From: CI
Date: Fri, 17 Nov 2017 21:03:42 +0000
Subject: [PATCH 4/5] fn-lb: 0.0.155 release [skip ci]

---
 fnlb/main.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fnlb/main.go b/fnlb/main.go
index 996e5fb23..afb38475e 100644
--- a/fnlb/main.go
+++ b/fnlb/main.go
@@ -17,12 +17,12 @@ import (
 	"github.com/sirupsen/logrus"
 )

-const VERSION = "0.0.154"
+const VERSION = "0.0.155"

 func main() {
 	// XXX (reed): normalize
 	fnodes := flag.String("nodes", "", "comma separated list of functions nodes")

-	minAPIVersion := flag.String("min-api-version", "0.0.121", "minimal node API to accept")
+	minAPIVersion := flag.String("min-api-version", "0.0.122", "minimal node API to accept")

 	var conf lb.Config
 	flag.StringVar(&conf.DBurl, "db", "sqlite3://:memory:", "backend to store nodes, default to in memory")
From 17d4271ffb10979606dde99fd72ab3f536225ff6 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Fri, 17 Nov 2017 15:25:53 -0800
Subject: [PATCH 5/5] fn: move memory/token code into resource (#512)

*) bugfix: fix nil ptr access in docker registry RoundTrip
*) move async and ram token related code into resource.go
---
 api/agent/agent.go                   |  97 ++----------------------
 api/agent/async.go                   |   2 +-
 api/agent/drivers/docker/registry.go |   2 +-
 api/agent/{mem.go => resource.go}    | 107 +++++++++++++++++++++++++++
 4 files changed, 117 insertions(+), 91 deletions(-)
 rename api/agent/{mem.go => resource.go} (53%)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index 3508bd31d..4d702a069 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -126,14 +126,8 @@ type agent struct {
 	hMu sync.RWMutex // protects hot
 	hot map[string]chan slot

-	// TODO we could make a separate struct for the memory stuff
-	// cond protects access to ramUsed
-	cond *sync.Cond
-	// ramTotal is the total accessible memory by this process
-	ramTotal uint64
-	// ramUsed is ram reserved for running containers. idle hot containers
-	// count against ramUsed.
-	ramUsed uint64
+	// track usage
+	resources ResourceTracker

 	// used to track running calls / safe shutdown
 	wg sync.WaitGroup // TODO rename

 func New(ds models.Datastore, mq models.MessageQueue) Agent {
 		mq:          mq,
 		driver:      driver,
 		hot:         make(map[string]chan slot),
-		cond:        sync.NewCond(new(sync.Mutex)),
-		ramTotal:    getAvailableMemory(),
+		resources:   NewResourceTracker(),
 		shutdown:    make(chan struct{}),
 		promHandler: promhttp.Handler(),
 	}

 func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	select {
 	case s := <-slots:
 		return s, nil
-	case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle
+	case tok := <-a.resources.GetResourceToken(ctx, call):
 		errCh = a.launch(ctx, slots, call, tok) // TODO mangle
 	case <-ctx.Done():
 		return nil, ctx.Err()

 func hotKey(call *call) string {
 	return string(hash.Sum(buf[:0]))
 }

-// TODO we could rename this more appropriately (ideas?)
-type Token interface {
-	// Close must be called by any thread that receives a token.
-	io.Closer
-}
-
-type token struct {
-	decrement func()
-}
-
-func (t *token) Close() error {
-	t.decrement()
-	return nil
-}
-
-// the received token should be passed directly to launch (unconditionally), launch
-// will close this token (i.e. the receiver should not call Close)
-func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
-	c := a.cond
-	ch := make(chan Token)
-
-	go func() {
-		c.L.Lock()
-		for (a.ramUsed + memory) > a.ramTotal {
-			select {
-			case <-ctx.Done():
-				c.L.Unlock()
-				return
-			default:
-			}
-
-			c.Wait()
-		}
-
-		a.ramUsed += memory
-		c.L.Unlock()
-
-		t := &token{decrement: func() {
-			c.L.Lock()
-			a.ramUsed -= memory
-			c.L.Unlock()
-			c.Broadcast()
-		}}
-
-		select {
-		case ch <- t:
-		case <-ctx.Done():
-			// if we can't send b/c nobody is waiting anymore, need to decrement here
-			t.Close()
-		}
-	}()
-
-	return ch
-}
-
-// asyncRAM will send a signal on the returned channel when at least half of
-// the available RAM on this machine is free.
-func (a *agent) asyncRAM() chan struct{} {
-	ch := make(chan struct{})
-
-	c := a.cond
-	go func() {
-		c.L.Lock()
-		for (a.ramTotal/2)-a.ramUsed < 0 {
-			c.Wait()
-		}
-		c.L.Unlock()
-		ch <- struct{}{}
-		// TODO this could leak forever (only in shutdown, blech)
-	}()
-
-	return ch
-}
-
 type slot interface {
 	exec(ctx context.Context, call *call) error
 	io.Closer
 }

 // implements Slot
 type coldSlot struct {
 	cookie drivers.Cookie
-	tok    Token
+	tok    ResourceToken
 }

 func (s *coldSlot) exec(ctx context.Context, call *call) error {

 func (s *hotSlot) exec(ctx context.Context, call *call) error {
 // this will work for hot & cold (woo)
 // if launch encounters a non-nil error it will send it on the returned channel,
 // this can be useful if an image doesn't exist, e.g.
-func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
+func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) <-chan error {
 	ch := make(chan error, 1)

 	if !protocol.IsStreamable(protocol.Protocol(call.Format)) {

 	return ch
 }

-func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	container := &container{
 		id:    id.New().String(), // XXX we could just let docker generate ids...
 		image: call.Image,

 	return nil
 }

-func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	// We must be careful to only use ctxArg for logs/spans
 	// create a span from ctxArg but ignore the new Context

diff --git a/api/agent/async.go b/api/agent/async.go
index 14f1b5e52..7ac324da7 100644
--- a/api/agent/async.go
+++ b/api/agent/async.go
@@ -15,7 +15,7 @@ func (a *agent) asyncDequeue() {
 		select {
 		case <-a.shutdown:
 			return
-		case <-a.asyncRAM():
+		case <-a.resources.WaitAsyncResource():
 			// TODO we _could_ return a token here to reserve the ram so that there's
 			// not a race between here and Submit but we're single threaded
 			// dequeueing and retries handled gracefully inside of Submit if we run
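To see how callers interact with the relocated code: launchOrSlot races an idle hot slot against a RAM grant from the tracker, and whichever goroutine receives a token owns it and must Close it to release the reservation. Below is a minimal, runnable sketch of that select pattern. Only the ResourceToken name mirrors the patch; the toy tracker that grants immediately and every other name are invented for illustration.

package main

import (
	"context"
	"fmt"
	"io"
	"time"
)

// ResourceToken mirrors the interface the patch introduces.
type ResourceToken interface{ io.Closer }

type grantToken struct{}

func (grantToken) Close() error { return nil } // would release reserved RAM

// getResourceToken is a toy stand-in for resourceTracker.GetResourceToken:
// it grants immediately, but keeps the "send, or give up on cancel" shape
// of the real goroutine so the token is never leaked.
func getResourceToken(ctx context.Context) <-chan ResourceToken {
	ch := make(chan ResourceToken)
	go func() {
		select {
		case ch <- grantToken{}: // receiver now owns the token and must Close it
		case <-ctx.Done(): // nobody is waiting anymore
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	slots := make(chan string) // stand-in for the agent's hot-slot channel

	// The launchOrSlot pattern: reuse a hot slot if one frees up first,
	// otherwise wait for a RAM grant (or for the request to be canceled).
	select {
	case s := <-slots:
		fmt.Println("reused slot:", s)
	case tok := <-getResourceToken(ctx):
		defer tok.Close() // tokens must always be closed to release the RAM
		fmt.Println("got resource token, launching container")
	case <-ctx.Done():
		fmt.Println("canceled:", ctx.Err())
	}
}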
diff --git a/api/agent/drivers/docker/registry.go b/api/agent/drivers/docker/registry.go
index 2e4f7ab3b..1f170ae0d 100644
--- a/api/agent/drivers/docker/registry.go
+++ b/api/agent/drivers/docker/registry.go
@@ -126,7 +126,7 @@ func (d *retryWrap) RoundTrip(req *http.Request) (*http.Response, error) {
 	// and then retry it (it will get authed and the challenge then accepted).
 	// why the docker distribution transport doesn't do this for you is
 	// a real testament to what sadists those docker people are.
-	if resp.StatusCode == http.StatusUnauthorized {
+	if resp != nil && resp.StatusCode == http.StatusUnauthorized {
 		pingPath := req.URL.Path
 		if v2Root := strings.Index(req.URL.Path, "/v2/"); v2Root != -1 {
 			pingPath = pingPath[:v2Root+4]
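The registry fix follows directly from the http.RoundTripper contract: when RoundTrip returns a non-nil error, the *http.Response may be nil, so reading resp.StatusCode unconditionally can panic with a nil pointer dereference. Here is a small self-contained sketch of the guard; failingTransport and retryWrap are illustrative stand-ins, not the fn code.

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// failingTransport simulates a transport error: per the http.RoundTripper
// contract, when the error is non-nil the *http.Response may be nil.
type failingTransport struct{}

func (failingTransport) RoundTrip(*http.Request) (*http.Response, error) {
	return nil, errors.New("connection refused")
}

// retryWrap shows the guarded shape of the fixed code: check resp for nil
// before touching resp.StatusCode.
type retryWrap struct{ next http.RoundTripper }

func (d retryWrap) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := d.next.RoundTrip(req)
	if resp != nil && resp.StatusCode == http.StatusUnauthorized {
		// re-auth and retry would go here; skipped in this sketch
	}
	return resp, err
}

func main() {
	req, _ := http.NewRequest("GET", "http://registry.example/v2/", nil)
	_, err := retryWrap{next: failingTransport{}}.RoundTrip(req)
	fmt.Println("no panic on nil response; err:", err)
}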
diff --git a/api/agent/mem.go b/api/agent/resource.go
similarity index 53%
rename from api/agent/mem.go
rename to api/agent/resource.go
index d185dc427..4f14c3bf8 100644
--- a/api/agent/mem.go
+++ b/api/agent/resource.go
@@ -2,17 +2,124 @@ package agent

 import (
 	"bufio"
+	"context"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"runtime"
 	"strconv"
 	"strings"
+	"sync"

 	"github.com/sirupsen/logrus"
 )

+// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
+// TODO: improve memory implementation
+// TODO: add cpu, disk, network IO for future
+type ResourceTracker interface {
+	WaitAsyncResource() chan struct{}
+	GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken
+}
+
+type resourceTracker struct {
+	// cond protects access to ramUsed
+	cond *sync.Cond
+	// ramTotal is the total accessible memory by this process
+	ramTotal uint64
+	// ramUsed is ram reserved for running containers. idle hot containers
+	// count against ramUsed.
+	ramUsed uint64
+}
+
+func NewResourceTracker() ResourceTracker {
+
+	obj := &resourceTracker{
+		cond:     sync.NewCond(new(sync.Mutex)),
+		ramTotal: getAvailableMemory(),
+	}
+
+	return obj
+}
+
+type ResourceToken interface {
+	// Close must be called by any thread that receives a token.
+	io.Closer
+}
+
+type resourceToken struct {
+	decrement func()
+}
+
+func (t *resourceToken) Close() error {
+	t.decrement()
+	return nil
+}
+
+// the received token should be passed directly to launch (unconditionally), launch
+// will close this token (i.e. the receiver should not call Close)
+func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
+
+	memory := call.Memory * 1024 * 1024
+
+	c := a.cond
+	ch := make(chan ResourceToken)
+
+	go func() {
+		c.L.Lock()
+		for (a.ramUsed + memory) > a.ramTotal {
+			select {
+			case <-ctx.Done():
+				c.L.Unlock()
+				return
+			default:
+			}
+
+			c.Wait()
+		}
+
+		a.ramUsed += memory
+		c.L.Unlock()
+
+		t := &resourceToken{decrement: func() {
+			c.L.Lock()
+			a.ramUsed -= memory
+			c.L.Unlock()
+			c.Broadcast()
+		}}
+
+		select {
+		case ch <- t:
+		case <-ctx.Done():
+			// if we can't send b/c nobody is waiting anymore, need to decrement here
+			t.Close()
+		}
+	}()
+
+	return ch
+}
+
+// WaitAsyncResource will send a signal on the returned channel when at least half of
+// the available RAM on this machine is free.
+func (a *resourceTracker) WaitAsyncResource() chan struct{} {
+	ch := make(chan struct{})
+
+	c := a.cond
+	go func() {
+		c.L.Lock()
+		for a.ramUsed > a.ramTotal/2 {
+			c.Wait()
+		}
+		c.L.Unlock()
+		ch <- struct{}{}
+		// TODO this could leak forever (only in shutdown, blech)
+	}()
+
+	return ch
+}
+
 func getAvailableMemory() uint64 {
 	const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB
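The wait condition in WaitAsyncResource deserves a note. A naive translation of "less than half the RAM is free" as (ramTotal/2)-ramUsed < 0 is always false when the operands are uint64, because unsigned subtraction wraps around instead of going negative, and the loop would then never wait; hence the equivalent a.ramUsed > a.ramTotal/2 form above. A short, runnable demonstration of the pitfall:

package main

import "fmt"

func main() {
	// Why the wait condition is written as ramUsed > ramTotal/2:
	// with unsigned integers, a subtraction that "should" be negative
	// wraps around to a huge value, so a "< 0" test can never fire.
	var ramTotal, ramUsed uint64 = 1024, 900

	fmt.Println(ramTotal/2 - ramUsed)   // 18446744073709551228, not -388
	fmt.Println(ramTotal/2-ramUsed < 0) // always false for uint64
	fmt.Println(ramUsed > ramTotal/2)   // true: less than half the RAM is free
}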