fn: I/O related improvements (#809)

*) I/O protocol parse issues now shut the container down, since the container
is left in an inconsistent state between calls (e.g. the next call may receive
the previous call's leftovers).
*) Move ghost read/write code into io_utils in common.
*) Clean up the unused error return from the docker Wait().
*) We can catch one such case in JSON: if there is unparsed data remaining in
the decoder buffer, we shut the container down (see the sketch after this list).
*) stdout/stderr are now blocked while the container is not handling a request,
provided the freezer is also enabled.
*) If a fatal error is set on a slot, we do not requeue it and instead proceed to shutdown.
*) Added a test function covering a few cases of the strict freezer behavior.
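A minimal, self-contained sketch of the JSON case above (hypothetical names such as decodeOne and errExcessData; this is not the actual protocol code): after decoding one response object, anything still buffered in the decoder means the function wrote more than one value, so the container should be shut down rather than reused.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errExcessData stands in for protocol.ErrExcessData: leftover bytes in the
// decoder mean the next call would read this call's leftovers.
var errExcessData = errors.New("excess data after JSON response")

// decodeOne decodes exactly one JSON value and flags any trailing data.
func decodeOne(r io.Reader, out interface{}) error {
	dec := json.NewDecoder(r)
	if err := dec.Decode(out); err != nil {
		return err
	}
	if dec.More() { // unparsed data still buffered -> container state is suspect
		return errExcessData
	}
	return nil
}

func main() {
	var v map[string]string
	err := decodeOne(strings.NewReader(`{"ok":"1"} {"junk":"2"}`), &v)
	fmt.Println(v, err) // map[ok:1] excess data after JSON response
}
```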
Tolga Ceylan authored on 2018-03-07 15:09:24 -08:00; committed by GitHub
parent f044e509fc
commit 7677aad450
11 changed files with 568 additions and 202 deletions

@@ -443,13 +443,13 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 // implements Slot
 type coldSlot struct {
-    cookie drivers.Cookie
-    tok    ResourceToken
-    err    error
+    cookie   drivers.Cookie
+    tok      ResourceToken
+    fatalErr error
 }
 func (s *coldSlot) Error() error {
-    return s.err
+    return s.fatalErr
 }
 func (s *coldSlot) exec(ctx context.Context, call *call) error {
@@ -464,10 +464,8 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error {
         return err
     }
-    res, err := waiter.Wait(ctx)
-    if err != nil {
-        return err
-    } else if res.Error() != nil {
+    res := waiter.Wait(ctx)
+    if res.Error() != nil {
         // check for call error (oom/exit) and beam it up
         return res.Error()
     }
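The coldSlot.exec change above tracks the "clean up the unused error return from the docker Wait()" item: Wait now yields only a result whose Error() carries any failure. A minimal sketch of the shape the new call sites assume (names here are assumptions, not the real drivers package):

```go
package drivers_sketch

import "context"

// WaitResult carries the outcome of a finished container run.
type WaitResult interface {
	Error() error // non-nil on OOM / bad exit code, nil on success
}

// Waiter is the shape the new call sites assume: the separate error return
// was unused, so Wait now returns just the result.
type Waiter interface {
	Wait(ctx context.Context) WaitResult
}
```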
@@ -496,7 +494,7 @@ type hotSlot struct {
     errC        <-chan error // container error
     container   *container   // TODO mask this
     maxRespSize uint64       // TODO boo.
-    err         error
+    fatalErr    error
 }
 func (s *hotSlot) Close(ctx context.Context) error {
@@ -505,7 +503,7 @@ func (s *hotSlot) Close(ctx context.Context) error {
 }
 func (s *hotSlot) Error() error {
-    return s.err
+    return s.fatalErr
 }
 func (s *hotSlot) exec(ctx context.Context, call *call) error {
@@ -542,6 +540,15 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
     case err := <-s.errC: // error from container
         return err
     case err := <-errApp: // from dispatch
+        if s.fatalErr == nil && err != nil {
+            if models.IsAPIError(err) {
+                s.fatalErr = err
+            } else if err == protocol.ErrExcessData {
+                s.fatalErr = err
+                // suppress excess data error, but do shutdown the container
+                return nil
+            }
+        }
         return err
     case <-ctx.Done(): // call timeout
         return ctx.Err()
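The case added above decides which dispatch errors taint the container: API errors and protocol.ErrExcessData set fatalErr, and the excess-data error is additionally suppressed toward the caller while the container is still torn down. The requeue side of that rule is not in this hunk; here is a hypothetical, self-contained sketch of the consumer behavior described in the commit message ("if a fatal error is set on a slot, we do not requeue it"):

```go
package agent_sketch

// Slot mirrors the minimal surface used here: Error() reports a fatal,
// container-tainting error if one was recorded during exec.
type Slot interface {
	Error() error
}

// recycleOrShutdown is a hypothetical helper, not the real slot queue code:
// a slot with a fatal error is never requeued; the hot container behind it
// is shut down instead of being reused.
func recycleOrShutdown(requeue func(Slot), shutdown func(Slot), s Slot) {
	if s.Error() != nil {
		shutdown(s) // e.g. protocol out of sync after excess data
		return
	}
	requeue(s) // healthy slot goes back to the queue for the next call
}
```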
@@ -604,42 +611,25 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
     state.UpdateState(ctx, ContainerStateStart, call.slots)
     defer state.UpdateState(ctx, ContainerStateDone, call.slots)
-    cid := id.New().String()
-    // set up the stderr to capture any logs before the slot is executed and
-    // between hot functions
-    stderr := newLineWriter(&logWriter{
-        logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}),
-    })
-    // between calls we need a reader that doesn't do anything
-    stdin := &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)}
-    defer stdin.Close()
-    container := &container{
-        id:     cid, // XXX we could just let docker generate ids...
-        image:  call.Image,
-        env:    map[string]string(call.Config),
-        memory: call.Memory,
-        cpus:   uint64(call.CPUs),
-        stdin:  stdin,
-        stdout: &ghostWriter{inner: stderr},
-        stderr: &ghostWriter{inner: stderr},
-    }
+    // if freezer is enabled, be consistent with freezer behavior and
+    // block stdout and stderr between calls.
+    isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdleMsecs
+    container, closer := NewHotContainer(call, isBlockIdleIO)
+    defer closer()
     logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout})
     ctx = common.WithLogger(ctx, logger)
     cookie, err := a.driver.Prepare(ctx, container)
     if err != nil {
-        call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err})
+        call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
         return
     }
     defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out
     waiter, err := cookie.Run(ctx)
     if err != nil {
-        call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err})
+        call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
         return
     }
@@ -670,18 +660,20 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
             // wait for this call to finish
             // NOTE do NOT select with shutdown / other channels. slot handles this.
             <-slot.done
+            if slot.fatalErr != nil {
+                logger.WithError(slot.fatalErr).Info("hot function terminating")
+                return
+            }
         }
     }()
-    res, err := waiter.Wait(ctx)
-    if err != nil {
-        errC <- err
-    } else if res.Error() != nil {
-        err = res.Error()
-        errC <- err
+    res := waiter.Wait(ctx)
+    if res.Error() != nil {
+        errC <- res.Error() // TODO: race condition, no guaranteed delivery fix this...
     }
-    logger.WithError(err).Info("hot function terminated")
+    logger.WithError(res.Error()).Info("hot function terminated")
 }
 // runHotReq enqueues a free slot to slot queue manager and watches various timers and the consumer until
@@ -790,11 +782,45 @@ type container struct {
     stats *drivers.Stats
 }
+func NewHotContainer(call *call, isBlockIdleIO bool) (*container, func()) {
+    id := id.New().String()
+    stdin := common.NewGhostReader()
+    stderr := common.NewGhostWriter()
+    stdout := common.NewGhostWriter()
+    // when not processing a request, do we block IO?
+    if !isBlockIdleIO {
+        stderr.Swap(newLineWriter(&logWriter{
+            logrus.WithFields(logrus.Fields{"tag": "stderr", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}),
+        }))
+        stdout.Swap(newLineWriter(&logWriter{
+            logrus.WithFields(logrus.Fields{"tag": "stdout", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}),
+        }))
+    }
+    return &container{
+        id:     id, // XXX we could just let docker generate ids...
+        image:  call.Image,
+        env:    map[string]string(call.Config),
+        memory: call.Memory,
+        cpus:   uint64(call.CPUs),
+        stdin:  stdin,
+        stdout: stdout,
+        stderr: stderr,
+    }, func() {
+        stdin.Close()
+        stderr.Close()
+        stdout.Close()
+    }
+}
 func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() {
     // if tests don't catch this, then fuck me
-    ostdin := c.stdin.(*ghostReader).swap(stdin)
-    ostdout := c.stdout.(*ghostWriter).swap(stdout)
-    ostderr := c.stderr.(*ghostWriter).swap(stderr)
+    ostdin := c.stdin.(common.GhostReader).Swap(stdin)
+    ostdout := c.stdout.(common.GhostWriter).Swap(stdout)
+    ostderr := c.stderr.(common.GhostWriter).Swap(stderr)
     c.statsMu.Lock()
     ocs := c.stats
@@ -802,9 +828,9 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.
     c.statsMu.Unlock()
     return func() {
-        c.stdin.(*ghostReader).swap(ostdin)
-        c.stdout.(*ghostWriter).swap(ostdout)
-        c.stderr.(*ghostWriter).swap(ostderr)
+        c.stdin.(common.GhostReader).Swap(ostdin)
+        c.stdout.(common.GhostWriter).Swap(ostdout)
+        c.stderr.(common.GhostWriter).Swap(ostderr)
         c.statsMu.Lock()
         c.stats = ocs
         c.statsMu.Unlock()
@@ -880,101 +906,3 @@ func init() {
 // Implementing the docker.AuthConfiguration interface.
 // TODO per call could implement this stored somewhere (vs. configured on host)
 //}
-// ghostWriter is an io.Writer who will pass writes to an inner writer
-// that may be changed at will. it is thread safe to swap or write.
-type ghostWriter struct {
-    sync.Mutex
-    inner io.Writer
-}
-func (g *ghostWriter) swap(w io.Writer) (old io.Writer) {
-    g.Lock()
-    old = g.inner
-    g.inner = w
-    g.Unlock()
-    return old
-}
-func (g *ghostWriter) Write(b []byte) (int, error) {
-    // we don't need to serialize writes but swapping g.inner could be a race if unprotected
-    g.Lock()
-    w := g.inner
-    g.Unlock()
-    n, err := w.Write(b)
-    if err == io.ErrClosedPipe {
-        // NOTE: we need to mask this error so that docker does not get an error
-        // from writing the output stream and shut down the container.
-        err = nil
-    }
-    return n, err
-}
-// ghostReader is an io.ReadCloser who will pass reads to an inner reader
-// that may be changed at will. it is thread safe to swap or read.
-// Read will wait for a 'real' reader if inner is of type *waitReader.
-// Close must be called to prevent any pending readers from leaking.
-type ghostReader struct {
-    cond   *sync.Cond
-    inner  io.Reader
-    closed bool
-}
-func (g *ghostReader) swap(r io.Reader) (old io.Reader) {
-    g.cond.L.Lock()
-    old = g.inner
-    g.inner = r
-    g.cond.L.Unlock()
-    g.cond.Broadcast()
-    return old
-}
-func (g *ghostReader) Close() {
-    g.cond.L.Lock()
-    g.closed = true
-    g.cond.L.Unlock()
-    g.cond.Broadcast()
-}
-func (g *ghostReader) awaitRealReader() (io.Reader, bool) {
-    // wait for a real reader
-    g.cond.L.Lock()
-    for {
-        if g.closed { // check this first
-            g.cond.L.Unlock()
-            return nil, false
-        }
-        if _, ok := g.inner.(*waitReader); ok {
-            g.cond.Wait()
-        } else {
-            break
-        }
-    }
-    // we don't need to serialize reads but swapping g.inner could be a race if unprotected
-    r := g.inner
-    g.cond.L.Unlock()
-    return r, true
-}
-func (g *ghostReader) Read(b []byte) (int, error) {
-    r, ok := g.awaitRealReader()
-    if !ok {
-        return 0, io.EOF
-    }
-    n, err := r.Read(b)
-    if err == io.ErrClosedPipe {
-        // NOTE: we need to mask this error so that docker does not get an error
-        // from reading the input stream and shut down the container.
-        err = nil
-    }
-    return n, err
-}
-// waitReader returns io.EOF if anyone calls Read. don't call Read, this is a sentinel type
-type waitReader struct{}
-func (e *waitReader) Read([]byte) (int, error) {
-    panic("read on waitReader should not happen")
-}
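The ghostReader/ghostWriter code deleted above now lives in common (io_utils), which the agent reaches through common.NewGhostReader / common.NewGhostWriter and the Swap/Close calls seen in the hunks above. A rough sketch of the surface the agent code appears to rely on (assumed shape; the real common package may differ, e.g. Close may return an error):

```go
package common_sketch

import "io"

// GhostWriter passes writes to an inner writer that can be swapped at will.
// Its zero state either blocks or drops writes until a real sink is swapped
// in; the commit uses this to gate idle stdout/stderr when the freezer is on.
type GhostWriter interface {
	io.Writer
	Swap(w io.Writer) io.Writer // install a new sink, return the previous one
	Close()
}

// GhostReader blocks reads until a real reader is swapped in (or Close is
// called), so an idle hot container never sees stray stdin between calls.
type GhostReader interface {
	io.Reader
	Swap(r io.Reader) io.Reader // install a new source, return the previous one
	Close()
}
```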