From 17d4271ffb10979606dde99fd72ab3f536225ff6 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 17 Nov 2017 15:25:53 -0800 Subject: [PATCH] fn: move memory/token code into resource (#512) *) bugfix: fix nil ptr access in docker registry RoundTrip *) move async and ram token related code into resource.go --- api/agent/agent.go | 97 ++---------------------- api/agent/async.go | 2 +- api/agent/drivers/docker/registry.go | 2 +- api/agent/{mem.go => resource.go} | 107 +++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 91 deletions(-) rename api/agent/{mem.go => resource.go} (53%) diff --git a/api/agent/agent.go b/api/agent/agent.go index 3508bd31d..4d702a069 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -126,14 +126,8 @@ type agent struct { hMu sync.RWMutex // protects hot hot map[string]chan slot - // TODO we could make a separate struct for the memory stuff - // cond protects access to ramUsed - cond *sync.Cond - // ramTotal is the total accessible memory by this process - ramTotal uint64 - // ramUsed is ram reserved for running containers. idle hot containers - // count against ramUsed. 
- ramUsed uint64 + // track usage + resources ResourceTracker // used to track running calls / safe shutdown wg sync.WaitGroup // TODO rename @@ -154,8 +148,7 @@ func New(ds models.Datastore, mq models.MessageQueue) Agent { mq: mq, driver: driver, hot: make(map[string]chan slot), - cond: sync.NewCond(new(sync.Mutex)), - ramTotal: getAvailableMemory(), + resources: NewResourceTracker(), shutdown: make(chan struct{}), promHandler: promhttp.Handler(), } @@ -279,7 +272,7 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) ( select { case s := <-slots: return s, nil - case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle + case tok := <-a.resources.GetResourceToken(ctx, call): errCh = a.launch(ctx, slots, call, tok) // TODO mangle case <-ctx.Done(): return nil, ctx.Err() @@ -363,80 +356,6 @@ func hotKey(call *call) string { return string(hash.Sum(buf[:0])) } -// TODO we could rename this more appropriately (ideas?) -type Token interface { - // Close must be called by any thread that receives a token. - io.Closer -} - -type token struct { - decrement func() -} - -func (t *token) Close() error { - t.decrement() - return nil -} - -// the received token should be passed directly to launch (unconditionally), launch -// will close this token (i.e. 
the receiver should not call Close) -func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token { - c := a.cond - ch := make(chan Token) - - go func() { - c.L.Lock() - for (a.ramUsed + memory) > a.ramTotal { - select { - case <-ctx.Done(): - c.L.Unlock() - return - default: - } - - c.Wait() - } - - a.ramUsed += memory - c.L.Unlock() - - t := &token{decrement: func() { - c.L.Lock() - a.ramUsed -= memory - c.L.Unlock() - c.Broadcast() - }} - - select { - case ch <- t: - case <-ctx.Done(): - // if we can't send b/c nobody is waiting anymore, need to decrement here - t.Close() - } - }() - - return ch -} - -// asyncRAM will send a signal on the returned channel when at least half of -// the available RAM on this machine is free. -func (a *agent) asyncRAM() chan struct{} { - ch := make(chan struct{}) - - c := a.cond - go func() { - c.L.Lock() - for (a.ramTotal/2)-a.ramUsed < 0 { - c.Wait() - } - c.L.Unlock() - ch <- struct{}{} - // TODO this could leak forever (only in shutdown, blech) - }() - - return ch -} - type slot interface { exec(ctx context.Context, call *call) error io.Closer @@ -445,7 +364,7 @@ type slot interface { // implements Slot type coldSlot struct { cookie drivers.Cookie - tok Token + tok ResourceToken } func (s *coldSlot) exec(ctx context.Context, call *call) error { @@ -523,7 +442,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { // this will work for hot & cold (woo) // if launch encounters a non-nil error it will send it on the returned channel, // this can be useful if an image doesn't exist, e.g. 
-func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error { +func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) <-chan error { ch := make(chan error, 1) if !protocol.IsStreamable(protocol.Protocol(call.Format)) { @@ -546,7 +465,7 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T return ch } -func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error { +func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) error { container := &container{ id: id.New().String(), // XXX we could just let docker generate ids... image: call.Image, @@ -575,7 +494,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok return nil } -func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error { +func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok ResourceToken) error { // We must be careful to only use ctxArg for logs/spans // create a span from ctxArg but ignore the new Context diff --git a/api/agent/async.go b/api/agent/async.go index 14f1b5e52..7ac324da7 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -15,7 +15,7 @@ func (a *agent) asyncDequeue() { select { case <-a.shutdown: return - case <-a.asyncRAM(): + case <-a.resources.WaitAsyncResource(): // TODO we _could_ return a token here to reserve the ram so that there's // not a race between here and Submit but we're single threaded // dequeueing and retries handled gracefully inside of Submit if we run diff --git a/api/agent/drivers/docker/registry.go b/api/agent/drivers/docker/registry.go index 2e4f7ab3b..1f170ae0d 100644 --- a/api/agent/drivers/docker/registry.go +++ b/api/agent/drivers/docker/registry.go @@ -126,7 +126,7 @@ func (d *retryWrap) RoundTrip(req *http.Request) (*http.Response, error) { // and then retry it (it will get 
authed and the challenge then accepted). // why the docker distribution transport doesn't do this for you is // a real testament to what sadists those docker people are. - if resp.StatusCode == http.StatusUnauthorized { + if resp != nil && resp.StatusCode == http.StatusUnauthorized { pingPath := req.URL.Path if v2Root := strings.Index(req.URL.Path, "/v2/"); v2Root != -1 { pingPath = pingPath[:v2Root+4] diff --git a/api/agent/mem.go b/api/agent/resource.go similarity index 53% rename from api/agent/mem.go rename to api/agent/resource.go index d185dc427..4f14c3bf8 100644 --- a/api/agent/mem.go +++ b/api/agent/resource.go @@ -2,17 +2,124 @@ package agent import ( "bufio" + "context" "errors" "fmt" + "io" "io/ioutil" "os" "runtime" "strconv" "strings" + "sync" "github.com/sirupsen/logrus" ) +// ResourceTracker is a simple resource (memory, cpu, disk, etc.) tracker for scheduling. +// TODO: improve memory implementation +// TODO: add cpu, disk, network IO for future +type ResourceTracker interface { + WaitAsyncResource() chan struct{} + GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken +} + +type resourceTracker struct { + // cond protects access to ramUsed; waiters block on it until a released token broadcasts + cond *sync.Cond + // ramTotal is the total memory accessible by this process; set once in NewResourceTracker + ramTotal uint64 + // ramUsed is ram reserved for running containers. idle hot containers + // count against ramUsed. + ramUsed uint64 +} + +func NewResourceTracker() ResourceTracker { + + obj := &resourceTracker{ + cond: sync.NewCond(new(sync.Mutex)), + ramTotal: getAvailableMemory(), + } + + return obj +} + +type ResourceToken interface { + // Close must be called by any thread that receives a token. + io.Closer +} + +type resourceToken struct { + decrement func() +} + +func (t *resourceToken) Close() error { + t.decrement() + return nil +} + +// the received token should be passed directly to launch (unconditionally), launch +// will close this token (i.e. 
the receiver should not call Close)
+func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
+
+	memory := call.Memory * 1024 * 1024 // convert MB to bytes
+
+	c := a.cond
+	ch := make(chan ResourceToken)
+
+	go func() {
+		c.L.Lock()
+		for (a.ramUsed + memory) > a.ramTotal { // wait until the reservation fits
+			select {
+			case <-ctx.Done():
+				c.L.Unlock()
+				return
+			default:
+			}
+
+			c.Wait() // woken by the Broadcast issued when a token is closed
+		}
+
+		a.ramUsed += memory
+		c.L.Unlock()
+
+		t := &resourceToken{decrement: func() {
+			c.L.Lock()
+			a.ramUsed -= memory
+			c.L.Unlock()
+			c.Broadcast()
+		}}
+
+		select {
+		case ch <- t:
+		case <-ctx.Done():
+			// if we can't send b/c nobody is waiting anymore, need to decrement here
+			t.Close()
+		}
+	}()
+
+	return ch
+}
+
+// WaitAsyncResource will send a signal on the returned channel when at least half of
+// the available RAM on this machine is free, i.e. once ramUsed <= ramTotal/2.
+func (a *resourceTracker) WaitAsyncResource() chan struct{} {
+	ch := make(chan struct{})
+
+	c := a.cond
+	go func() {
+		c.L.Lock()
+		for a.ramUsed > a.ramTotal/2 { // compare directly: a uint64 difference wraps and is never < 0
+			c.Wait()
+		}
+		c.L.Unlock()
+		ch <- struct{}{}
+		// TODO this could leak forever (only in shutdown, blech)
+	}()
+
+	return ch
+}
+
 func getAvailableMemory() uint64 {
 	const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB