pipe swapparoo each slot (#721)

* pipe swapparoo each slot

previously, we made a pair of pipes for stdin and stdout for each container,
and then handed them out to each call (slot) to use. this meant that, from
fn's perspective, multiple calls could hold a handle on the same stdin and
stdout pipes to read from and write to, and could mix input/output and get
garbage. it also meant that each call was blocked on the previous call's
reads.
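
to make the old failure mode concrete, here's a tiny standalone example (not
fn code) of two calls sharing one container pipe; with io.Pipe every Write
blocks until the reader consumes it, and the reader has no way to tell whose
bytes it is getting:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"sync"
)

func main() {
	pr, pw := io.Pipe() // one shared "stdin" pipe for the whole container

	var wg sync.WaitGroup
	for _, payload := range []string{`{"call":1}`, `{"call":2}`} {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			// both calls write into the same pipe; each Write blocks until
			// the single reader drains it, so call 2 waits on call 1's reads,
			// and the reader sees the payloads in whatever order they land.
			io.WriteString(pw, p+"\n")
		}(payload)
	}

	go func() { wg.Wait(); pw.Close() }()

	out, _ := ioutil.ReadAll(pr)
	fmt.Print(string(out))
}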

now we make a new pair of pipes every time we hand out a slot, and swap them
in for the previous ones. calls are no longer blocked from fn's perspective,
and we don't have to worry about timing out dispatch for any hot format. there
is still the issue that if a function does not finish reading the input from
the previous call, from its perspective, and reads into the next call's input,
it can error out the second call. with fn deadline we provide the necessary
tools to skirt this, but without some additional coordination I am not sure
this is a closable hole with our current protocols, since terminating a
previous call's input requires some protocol-specific bytes to go in (json in
particular is tricky). anyway, the pipes were definitely a hole worth fixing
on fn's side, but this client-side hole is still hanging out. there was an
attempt to send an io.EOF, but the issue is that doing so shuts down docker's
read on the stdin pipe (and the container). poop.
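
the shape of the fix, as a minimal sketch (stand-in names; the real thing is
hotSlot.exec in the diff below, where swap is container.swap and dispatch is
the protocol's Dispatch):

package agent // stand-in package name for this sketch

import (
	"context"
	"io"
)

// execSketch: every call gets its own pipes, and the container's swappable
// stdin/stdout point at them only while the call runs. closing this call's
// pipe ends on the way out unblocks any lingering dispatch goroutine with
// ErrClosedPipe instead of letting it touch the next call's data.
func execSketch(
	ctx context.Context,
	swap func(stdin io.Reader, stdout io.Writer) (restore func()),
	dispatch func(stdin io.Writer, stdout io.Reader) error,
) error {
	stdinRead, stdinWrite := io.Pipe()
	stdoutRead, stdoutWrite := io.Pipe()
	defer stdinRead.Close() // container-facing ends
	defer stdoutWrite.Close()

	restore := swap(stdinRead, stdoutWrite) // container now uses this call's pipes
	defer restore()                         // defers are LIFO: runs before the pipes close

	errApp := make(chan error, 1)
	go func() { errApp <- dispatch(stdinWrite, stdoutRead) }()

	select {
	case err := <-errApp:
		return err
	case <-ctx.Done(): // call timeout: return and let the deferred restore/close run
		return ctx.Err()
	}
}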

this adds a test for this behavior, and makes sure 2 containers don't get
launched.

this also closes the response writer header race a little, but not entirely:
I think there's still a chance that we read a full response from a function
and then get a timeout while we're changing the headers. I guess we need a
thread-safe header bucket; otherwise we have to rely on timings (racy).
thinking on it.
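
for the record, a rough sketch of what a thread-safe header bucket could look
like (purely an idea, not part of this change; the type and package names are
made up):

package server // made-up package name

import (
	"net/http"
	"sync"
)

// syncHeaderWriter guards header mutation and the first write with one mutex,
// so a timeout path and a dispatch path can't race on the headers.
type syncHeaderWriter struct {
	mu        sync.Mutex
	committed bool // set once WriteHeader or the first Write has fired
	inner     http.ResponseWriter
}

func (w *syncHeaderWriter) SetHeader(k, v string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.committed { // changing headers after commit is a no-op anyway
		w.inner.Header().Set(k, v)
	}
}

func (w *syncHeaderWriter) WriteHeader(code int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.committed {
		w.committed = true
		w.inner.WriteHeader(code)
	}
}

func (w *syncHeaderWriter) Write(b []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.committed {
		w.committed = true
		w.inner.WriteHeader(http.StatusOK)
	}
	return w.inner.Write(b)
}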

* fix stats mu race
Reed Allman
2018-01-31 17:25:24 -08:00
parent 4e989b0789
commit 3b261fc144
5 changed files with 469 additions and 60 deletions

@@ -23,8 +23,6 @@ import (
// TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution)
// TODO async calls need to add route.Headers as well
// TODO need to shut off reads/writes in dispatch to the pipes when call times out so that
// 2 calls don't have the same container's pipes...
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
// TODO storing logs / call can push call over the timeout
@@ -36,7 +34,6 @@ import (
// dies). need coordination w/ db.
// TODO if a cold call times out but container is created but hasn't replied, could
// end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?)
// TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness.
// TODO if async would store requests (or interchange format) it would be slick, but
// if we're going to store full calls in db maybe we should only queue pointers to ids?
// TODO examine cases where hot can't start a container and the user would never see an error
@@ -517,9 +514,8 @@ func (s *coldSlot) Close() error {
// implements Slot
type hotSlot struct {
done chan<- struct{} // signal we are done with slot
proto protocol.ContainerIO
errC <-chan error // container error
container *container // TODO mask this
errC <-chan error // container error
container *container // TODO mask this
err error
}
@@ -541,16 +537,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
// link the container id and id in the logs [for us!]
common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
// swap in the new stderr logger & stat accumulator
oldStderr := s.container.swap(call.stderr, &call.Stats)
defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races
// swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out]
stdinRead, stdinWrite := io.Pipe()
stdoutRead, stdoutWrite := io.Pipe()
defer stdinRead.Close()
defer stdoutWrite.Close()
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
errApp := make(chan error, 1)
go func() {
// TODO make sure stdin / stdout not blocked if container dies or we leak goroutine
// we have to make sure this gets shut down or 2 threads will be reading/writing in/out
ci := protocol.NewCallInfo(call.Call, call.req)
errApp <- s.proto.Dispatch(ctx, ci, call.w)
errApp <- proto.Dispatch(ctx, ci, call.w)
}()
select {
@@ -561,8 +562,6 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
case <-ctx.Done(): // call timeout
return ctx.Err()
}
// TODO we REALLY need to wait for dispatch to return before conceding our slot
}
func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
@@ -618,31 +617,29 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
defer span.Finish()
defer tok.Close() // IMPORTANT: this MUST get called
// TODO we have to make sure we flush these pipes or we will deadlock
stdinRead, stdinWrite := io.Pipe()
stdoutRead, stdoutWrite := io.Pipe()
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
state.UpdateState(ctx, ContainerStateStart, call.slots)
defer state.UpdateState(ctx, ContainerStateDone, call.slots)
cid := id.New().String()
// set up the stderr for the first one to capture any logs before the slot is
// executed and between hot functions TODO this is still a little tobias funke
// set up the stderr to capture any logs before the slot is executed and
// between hot functions
stderr := newLineWriter(&logWriter{
logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}),
})
// between calls we need a reader that doesn't do anything
stdin := &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)}
defer stdin.Close()
container := &container{
id: cid, // XXX we could just let docker generate ids...
image: call.Image,
env: map[string]string(call.Config),
memory: call.Memory,
cpus: uint64(call.CPUs),
stdin: stdinRead,
stdout: stdoutWrite,
stdin: stdin,
stdout: &ghostWriter{inner: stderr},
stderr: &ghostWriter{inner: stderr},
}
@@ -684,7 +681,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
done := make(chan struct{})
state.UpdateState(ctx, ContainerStateIdle, call.slots)
s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil})
s := call.slots.queueSlot(&hotSlot{done, errC, container, nil})
select {
case <-s.trigger:
@@ -720,7 +717,8 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
if err != nil {
errC <- err
} else if res.Error() != nil {
errC <- res.Error()
err = res.Error()
errC <- err
}
logger.WithError(err).Info("hot function terminated")
@@ -742,24 +740,34 @@ type container struct {
stdout io.Writer
stderr io.Writer
// lock protects the swap and any fields that need to be swapped
sync.Mutex
stats *drivers.Stats
// lock protects the stats swapping
statsMu sync.Mutex
stats *drivers.Stats
}
func (c *container) swap(stderr io.Writer, cs *drivers.Stats) (old io.Writer) {
c.Lock()
defer c.Unlock()
func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() {
ostdin := c.stdin.(*ghostReader).inner
ostdout := c.stdout.(*ghostWriter).inner
ostderr := c.stderr.(*ghostWriter).inner
// TODO meh, maybe shouldn't bury this
old = c.stderr
gw, ok := c.stderr.(*ghostWriter)
if ok {
old = gw.swap(stderr)
}
// if tests don't catch this, then fuck me
c.stdin.(*ghostReader).swap(stdin)
c.stdout.(*ghostWriter).swap(stdout)
c.stderr.(*ghostWriter).swap(stderr)
c.statsMu.Lock()
ocs := c.stats
c.stats = cs
return old
c.statsMu.Unlock()
return func() {
c.stdin.(*ghostReader).swap(ostdin)
c.stdout.(*ghostWriter).swap(ostdout)
c.stderr.(*ghostWriter).swap(ostderr)
c.statsMu.Lock()
c.stats = ocs
c.statsMu.Unlock()
}
}
func (c *container) Id() string { return c.id }
@@ -788,11 +796,11 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
common.PublishHistograms(ctx, metrics)
c.Lock()
defer c.Unlock()
c.statsMu.Lock()
if c.stats != nil {
*(c.stats) = append(*(c.stats), stat)
}
c.statsMu.Unlock()
}
//func (c *container) DockerAuth() (docker.AuthConfiguration, error) {
@@ -800,8 +808,8 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
// TODO per call could implement this stored somewhere (vs. configured on host)
//}
// ghostWriter is a writer who will pass writes to an inner writer
// (that may be changed at will).
// ghostWriter is an io.Writer who will pass writes to an inner writer
// that may be changed at will. it is thread safe to swap or write.
type ghostWriter struct {
sync.Mutex
inner io.Writer
@@ -820,5 +828,78 @@ func (g *ghostWriter) Write(b []byte) (int, error) {
g.Lock()
w := g.inner
g.Unlock()
return w.Write(b)
n, err := w.Write(b)
if err == io.ErrClosedPipe {
// NOTE: we need to mask this error so that docker does not get an error
// from writing the output stream and shut down the container.
err = nil
}
return n, err
}
// ghostReader is an io.ReadCloser who will pass reads to an inner reader
// that may be changed at will. it is thread safe to swap or read.
// Read will wait for a 'real' reader if inner is of type *waitReader.
// Close must be called to prevent any pending readers from leaking.
type ghostReader struct {
cond *sync.Cond
inner io.Reader
closed bool
}
func (g *ghostReader) swap(r io.Reader) {
g.cond.L.Lock()
g.inner = r
g.cond.L.Unlock()
g.cond.Broadcast()
}
func (g *ghostReader) Close() {
g.cond.L.Lock()
g.closed = true
g.cond.L.Unlock()
g.cond.Broadcast()
}
func (g *ghostReader) awaitRealReader() (io.Reader, bool) {
// wait for a real reader
g.cond.L.Lock()
for {
if g.closed { // check this first
g.cond.L.Unlock()
return nil, false
}
if _, ok := g.inner.(*waitReader); ok {
g.cond.Wait()
} else {
break
}
}
// we don't need to serialize reads but swapping g.inner could be a race if unprotected
r := g.inner
g.cond.L.Unlock()
return r, true
}
func (g *ghostReader) Read(b []byte) (int, error) {
r, ok := g.awaitRealReader()
if !ok {
return 0, io.EOF
}
n, err := r.Read(b)
if err == io.ErrClosedPipe {
// NOTE: we need to mask this error so that docker does not get an error
// from reading the input stream and shut down the container.
err = nil
}
return n, err
}
// waitReader returns io.EOF if anyone calls Read. don't call Read, this is a sentinel type
type waitReader struct{}
func (e *waitReader) Read([]byte) (int, error) {
panic("read on waitReader should not happen")
}
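
a tiny usage sketch of the ghostReader contract, assuming the ghostReader and
waitReader types above are in scope (plus fmt, strings, sync imports): a Read
issued between calls parks on the waitReader sentinel until swap hands it a
real reader, and Close wakes any parked Read with io.EOF so goroutines don't
leak.

func ghostReaderDemo() {
	g := &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)}
	defer g.Close() // wakes any parked Read with io.EOF

	got := make(chan string, 1)
	go func() {
		buf := make([]byte, 16)
		n, _ := g.Read(buf) // parks: inner is still the waitReader sentinel
		got <- string(buf[:n])
	}()

	g.swap(strings.NewReader("hello")) // a call arrived; hand over its stdin
	fmt.Println(<-got)                 // prints "hello"
}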