mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: check context timeout when waiting for non-blocking attach (#1201)
* fn: check context timeout when waiting for non-blocking attach With this change, we no longer allow docker client AttachToContainerNonBlocking to block on Success channel more than our context deadline/timeout. * fn: move nbio chan handling in attach to docker from docker-client
This commit is contained in:
@@ -368,16 +368,44 @@ func dockerMsg(derr *docker.Error) string {
|
||||
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
|
||||
|
||||
mwOut, mwErr := task.Logger()
|
||||
waiter, err := drv.docker.AttachToContainer(ctx, docker.AttachToContainerOptions{
|
||||
successChan := make(chan struct{})
|
||||
|
||||
waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{
|
||||
Container: container,
|
||||
InputStream: task.Input(),
|
||||
OutputStream: mwOut,
|
||||
ErrorStream: mwErr,
|
||||
Success: successChan,
|
||||
Stream: true,
|
||||
Stdout: true,
|
||||
Stderr: true,
|
||||
Stdin: true,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
mon := make(chan struct{})
|
||||
|
||||
// We block here, since we would like to have stdin/stdout/stderr
|
||||
// streams already attached before starting task and I/O.
|
||||
// if AttachToContainerNonBlocking() returns no error, then we'll
|
||||
// sync up with NB Attacher above before starting the task. However,
|
||||
// we might leak our go-routine if AttachToContainerNonBlocking()
|
||||
// Dial/HTTP does not honor the Success channel contract.
|
||||
// Here we assume that if our context times out, then underlying
|
||||
// go-routines in AttachToContainerNonBlocking() will unlock
|
||||
// (or eventually timeout) once we tear down the container.
|
||||
go func() {
|
||||
<-successChan
|
||||
successChan <- struct{}{}
|
||||
close(mon)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-mon:
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil && ctx.Err() == nil {
|
||||
// ignore if ctx has errored, rewrite status lay below
|
||||
return nil, err
|
||||
|
||||
@@ -31,7 +31,7 @@ const (
|
||||
type dockerClient interface {
|
||||
// Each of these are github.com/fsouza/go-dockerclient methods
|
||||
|
||||
AttachToContainer(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error)
|
||||
AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error)
|
||||
WaitContainerWithContext(id string, ctx context.Context) (int, error)
|
||||
StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error
|
||||
KillContainer(opts docker.KillContainerOptions) error
|
||||
@@ -252,29 +252,11 @@ func (d *dockerWrap) Info(ctx context.Context) (info *docker.DockerInfo, err err
|
||||
return d.docker.Info()
|
||||
}
|
||||
|
||||
func (d *dockerWrap) AttachToContainer(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) {
|
||||
func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) {
|
||||
ctx, closer := makeTracker(ctx, "docker_attach_container")
|
||||
defer closer()
|
||||
|
||||
if opts.Success != nil {
|
||||
logrus.Fatal("BUG: Invalid AttachToContainerOptions, Success channel must not be set")
|
||||
}
|
||||
opts.Success = make(chan struct{})
|
||||
|
||||
// We use non-blocking here since we need the CloseWaiter
|
||||
waiter, err := d.docker.AttachToContainerNonBlocking(opts)
|
||||
|
||||
// Sync up with NB Attacher above before starting the task
|
||||
if err == nil {
|
||||
// WARNING: the I/O below requires docker hijack function to honor
|
||||
// the contract below, specifically if an error is not returned
|
||||
// from AttachToContainerNonBlocking, then max blocking time
|
||||
// here should be what drv.docker dialer/client config was set to.
|
||||
<-opts.Success
|
||||
opts.Success <- struct{}{}
|
||||
}
|
||||
|
||||
return waiter, err
|
||||
return d.docker.AttachToContainerNonBlocking(opts)
|
||||
}
|
||||
|
||||
func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (code int, err error) {
|
||||
|
||||
Reference in New Issue
Block a user