fn: move memory/token code into resource (#512)

*) bugfix: fix nil ptr access in docker registry RoundTrip
*) move async and ram token related code into resource.go
Tolga Ceylan
2017-11-17 15:25:53 -08:00
committed by Travis Reeder
parent 1acb1e99b4
commit 17d4271ffb
4 changed files with 117 additions and 91 deletions


@@ -126,14 +126,8 @@ type agent struct {
 	hMu sync.RWMutex // protects hot
 	hot map[string]chan slot
 
-	// TODO we could make a separate struct for the memory stuff
-	// cond protects access to ramUsed
-	cond *sync.Cond
-	// ramTotal is the total accessible memory by this process
-	ramTotal uint64
-	// ramUsed is ram reserved for running containers. idle hot containers
-	// count against ramUsed.
-	ramUsed uint64
+	// track usage
+	resources ResourceTracker
 
 	// used to track running calls / safe shutdown
 	wg sync.WaitGroup // TODO rename
@@ -154,8 +148,7 @@ func New(ds models.Datastore, mq models.MessageQueue) Agent {
 		mq:          mq,
 		driver:      driver,
 		hot:         make(map[string]chan slot),
-		cond:        sync.NewCond(new(sync.Mutex)),
-		ramTotal:    getAvailableMemory(),
+		resources:   NewResourceTracker(),
 		shutdown:    make(chan struct{}),
 		promHandler: promhttp.Handler(),
 	}
@@ -279,7 +272,7 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	select {
 	case s := <-slots:
 		return s, nil
-	case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle
+	case tok := <-a.resources.GetResourceToken(ctx, call):
 		errCh = a.launch(ctx, slots, call, tok) // TODO mangle
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -363,80 +356,6 @@ func hotKey(call *call) string {
 	return string(hash.Sum(buf[:0]))
 }
 
-// TODO we could rename this more appropriately (ideas?)
-type Token interface {
-	// Close must be called by any thread that receives a token.
-	io.Closer
-}
-
-type token struct {
-	decrement func()
-}
-
-func (t *token) Close() error {
-	t.decrement()
-	return nil
-}
-
-// the received token should be passed directly to launch (unconditionally), launch
-// will close this token (i.e. the receiver should not call Close)
-func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
-	c := a.cond
-	ch := make(chan Token)
-
-	go func() {
-		c.L.Lock()
-		for (a.ramUsed + memory) > a.ramTotal {
-			select {
-			case <-ctx.Done():
-				c.L.Unlock()
-				return
-			default:
-			}
-			c.Wait()
-		}
-		a.ramUsed += memory
-		c.L.Unlock()
-
-		t := &token{decrement: func() {
-			c.L.Lock()
-			a.ramUsed -= memory
-			c.L.Unlock()
-			c.Broadcast()
-		}}
-
-		select {
-		case ch <- t:
-		case <-ctx.Done():
-			// if we can't send b/c nobody is waiting anymore, need to decrement here
-			t.Close()
-		}
-	}()
-
-	return ch
-}
-
-// asyncRAM will send a signal on the returned channel when at least half of
-// the available RAM on this machine is free.
-func (a *agent) asyncRAM() chan struct{} {
-	ch := make(chan struct{})
-	c := a.cond
-
-	go func() {
-		c.L.Lock()
-		for (a.ramTotal/2)-a.ramUsed < 0 {
-			c.Wait()
-		}
-		c.L.Unlock()
-		ch <- struct{}{}
-		// TODO this could leak forever (only in shutdown, blech)
-	}()
-
-	return ch
-}
-
 type slot interface {
 	exec(ctx context.Context, call *call) error
 	io.Closer
@@ -445,7 +364,7 @@ type slot interface {
 // implements Slot
 type coldSlot struct {
 	cookie drivers.Cookie
-	tok    Token
+	tok    ResourceToken
 }
 
 func (s *coldSlot) exec(ctx context.Context, call *call) error {
@@ -523,7 +442,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
 // this will work for hot & cold (woo)
 // if launch encounters a non-nil error it will send it on the returned channel,
 // this can be useful if an image doesn't exist, e.g.
-func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
+func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) <-chan error {
 	ch := make(chan error, 1)
 
 	if !protocol.IsStreamable(protocol.Protocol(call.Format)) {
@@ -546,7 +465,7 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T
 	return ch
 }
 
-func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	container := &container{
 		id:    id.New().String(), // XXX we could just let docker generate ids...
 		image: call.Image,
@@ -575,7 +494,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
 	return nil
 }
 
-func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error {
+func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok ResourceToken) error {
 	// We must be careful to only use ctxArg for logs/spans
 	// create a span from ctxArg but ignore the new Context


@@ -15,7 +15,7 @@ func (a *agent) asyncDequeue() {
 		select {
 		case <-a.shutdown:
 			return
-		case <-a.asyncRAM():
+		case <-a.resources.WaitAsyncResource():
 			// TODO we _could_ return a token here to reserve the ram so that there's
 			// not a race between here and Submit but we're single threaded
 			// dequeueing and retries handled gracefully inside of Submit if we run
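The TODO above notes that WaitAsyncResource could hand back a reservation instead of a bare signal, which would close the window between the wakeup here and the later token acquisition in Submit. A hedged sketch of that alternative, not what this commit implements, reusing the resourceTracker internals from resource.go below:

// hypothetical variant: block until the requested memory fits and at least
// half of RAM is free, then return the reservation itself as a token so
// another caller cannot claim the memory before Submit runs.
func (a *resourceTracker) waitAsyncToken(memory uint64) ResourceToken {
    c := a.cond
    c.L.Lock()
    for a.ramUsed+memory > a.ramTotal || a.ramUsed > a.ramTotal/2 {
        c.Wait()
    }
    a.ramUsed += memory
    c.L.Unlock()
    return &resourceToken{decrement: func() {
        c.L.Lock()
        a.ramUsed -= memory
        c.L.Unlock()
        c.Broadcast()
    }}
}

As the comment says, the single-threaded dequeue loop keeps the race benign in practice, so the commit sticks with the simpler signal-only channel.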


@@ -126,7 +126,7 @@ func (d *retryWrap) RoundTrip(req *http.Request) (*http.Response, error) {
 	// and then retry it (it will get authed and the challenge then accepted).
 	// why the docker distribution transport doesn't do this for you is
 	// a real testament to what sadists those docker people are.
-	if resp.StatusCode == http.StatusUnauthorized {
+	if resp != nil && resp.StatusCode == http.StatusUnauthorized {
 		pingPath := req.URL.Path
 		if v2Root := strings.Index(req.URL.Path, "/v2/"); v2Root != -1 {
 			pingPath = pingPath[:v2Root+4]
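This guard is the nil-pointer fix named in the commit message: an http.RoundTripper may return a nil *http.Response together with a non-nil error (DNS failure, connection refused, and so on), so a wrapper must establish that resp is non-nil before touching its fields. A minimal, self-contained sketch of the pattern; the wrapper type is hypothetical, not the fn code:

import "net/http"

// nilSafeWrap illustrates the guard: inspect resp only after checking it,
// since a failed transport returns (nil, err).
type nilSafeWrap struct {
    next http.RoundTripper
}

func (w *nilSafeWrap) RoundTrip(req *http.Request) (*http.Response, error) {
    resp, err := w.next.RoundTrip(req)
    if resp != nil && resp.StatusCode == http.StatusUnauthorized {
        // safe to read resp here; re-auth and retry logic would go in this branch
    }
    return resp, err
}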


@@ -2,17 +2,124 @@ package agent
 
 import (
 	"bufio"
+	"context"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"runtime"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/sirupsen/logrus"
 )
 
+// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
+// TODO: improve memory implementation
+// TODO: add cpu, disk, network IO for future
+type ResourceTracker interface {
+	WaitAsyncResource() chan struct{}
+	GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken
+}
+
+type resourceTracker struct {
+	// cond protects access to ramUsed
+	cond *sync.Cond
+	// ramTotal is the total accessible memory by this process
+	ramTotal uint64
+	// ramUsed is ram reserved for running containers. idle hot containers
+	// count against ramUsed.
+	ramUsed uint64
+}
+
+func NewResourceTracker() ResourceTracker {
+	obj := &resourceTracker{
+		cond:     sync.NewCond(new(sync.Mutex)),
+		ramTotal: getAvailableMemory(),
+	}
+	return obj
+}
+
+type ResourceToken interface {
+	// Close must be called by any thread that receives a token.
+	io.Closer
+}
+
+type resourceToken struct {
+	decrement func()
+}
+
+func (t *resourceToken) Close() error {
+	t.decrement()
+	return nil
+}
+
+// the received token should be passed directly to launch (unconditionally), launch
+// will close this token (i.e. the receiver should not call Close)
+func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
+	memory := call.Memory * 1024 * 1024 // convert MB to bytes
+	c := a.cond
+	ch := make(chan ResourceToken)
+
+	go func() {
+		c.L.Lock()
+		for (a.ramUsed + memory) > a.ramTotal {
+			select {
+			case <-ctx.Done():
+				c.L.Unlock()
+				return
+			default:
+			}
+			c.Wait()
+		}
+		a.ramUsed += memory
+		c.L.Unlock()
+
+		t := &resourceToken{decrement: func() {
+			c.L.Lock()
+			a.ramUsed -= memory
+			c.L.Unlock()
+			c.Broadcast()
+		}}
+
+		select {
+		case ch <- t:
+		case <-ctx.Done():
+			// if we can't send b/c nobody is waiting anymore, need to decrement here
+			t.Close()
+		}
+	}()
+
+	return ch
+}
+
+// WaitAsyncResource will send a signal on the returned channel when at least half of
+// the available RAM on this machine is free.
+func (a *resourceTracker) WaitAsyncResource() chan struct{} {
+	ch := make(chan struct{})
+	c := a.cond
+
+	go func() {
+		c.L.Lock()
+		for (a.ramTotal/2)-a.ramUsed < 0 {
+			c.Wait()
+		}
+		c.L.Unlock()
+		ch <- struct{}{}
+		// TODO this could leak forever (only in shutdown, blech)
+	}()
+
+	return ch
+}
+
 func getAvailableMemory() uint64 {
 	const tooBig = 322122547200 // 300GB or 0, biggest aws instance is 244GB
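
For reference, a hedged sketch of how a caller drives the new interface end to end: reserve RAM for a call, run it, and release the reservation by closing the token (runContainer and the 30-second budget are hypothetical; context and time imports assumed). One caveat worth flagging: because ramTotal and ramUsed are uint64, the expression (a.ramTotal/2)-a.ramUsed in WaitAsyncResource wraps around rather than going negative, so the < 0 loop condition is never true and the wait never blocks; a signed-safe form of the intended check would be a.ramUsed > a.ramTotal/2.

// hedged usage sketch (hypothetical caller): reserve, run, release.
func submitWithReservation(ctx context.Context, rt ResourceTracker, c *call) error {
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // hypothetical budget
    defer cancel()

    select {
    case tok := <-rt.GetResourceToken(ctx, c):
        // normally launch owns and closes the token; this standalone
        // sketch closes it itself once the call finishes.
        defer tok.Close()
        return runContainer(ctx, c) // hypothetical helper
    case <-ctx.Done():
        return ctx.Err() // capacity did not free up in time
    }
}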