mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: safer hand over between monitoring and main processing (#1316)
In runHot(), it's safer to use a separate channel between monitoring go-routine and processing go-routine to handle cancellations triggered by monitorin go-routine.
This commit is contained in:
@@ -771,8 +771,9 @@ func (a *agent) runHot(ctx context.Context, caller slotCaller, call *call, tok R
|
|||||||
logger := logrus.WithFields(logrus.Fields{"id": id, "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "idle_timeout": call.IdleTimeout})
|
logger := logrus.WithFields(logrus.Fields{"id": id, "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "idle_timeout": call.IdleTimeout})
|
||||||
ctx, cancel := context.WithCancel(common.WithLogger(ctx, logger))
|
ctx, cancel := context.WithCancel(common.WithLogger(ctx, logger))
|
||||||
|
|
||||||
udsWait := make(chan error, 1) // track UDS state and errors
|
initialized := make(chan struct{}) // when closed, container is ready to handle requests
|
||||||
errQueue := make(chan error, 1) // errors to be reflected back to the slot queue
|
udsWait := make(chan error, 1) // track UDS state and errors
|
||||||
|
errQueue := make(chan error, 1) // errors to be reflected back to the slot queue
|
||||||
|
|
||||||
evictor := a.evictor.CreateEvictToken(call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
|
evictor := a.evictor.CreateEvictToken(call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
|
||||||
|
|
||||||
@@ -784,8 +785,7 @@ func (a *agent) runHot(ctx context.Context, caller slotCaller, call *call, tok R
|
|||||||
// IMPORTANT: we ignore any errors due to eviction and do not reflect these to clients.
|
// IMPORTANT: we ignore any errors due to eviction and do not reflect these to clients.
|
||||||
if !evictor.isEvicted() {
|
if !evictor.isEvicted() {
|
||||||
select {
|
select {
|
||||||
case err := <-udsWait:
|
case <-initialized:
|
||||||
tryQueueErr(err, errQueue)
|
|
||||||
default:
|
default:
|
||||||
tryQueueErr(models.ErrContainerInitFail, errQueue)
|
tryQueueErr(models.ErrContainerInitFail, errQueue)
|
||||||
}
|
}
|
||||||
@@ -825,13 +825,16 @@ func (a *agent) runHot(ctx context.Context, caller slotCaller, call *call, tok R
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Monitor initialization and evictability.
|
// Monitor initialization and evictability. Closes 'initialized' channel
|
||||||
|
// to hand over the processing to main request processing go-routine
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-udsWait:
|
case err := <-udsWait:
|
||||||
if tryQueueErr(err, errQueue) != nil {
|
if tryQueueErr(err, errQueue) != nil {
|
||||||
cancel()
|
cancel()
|
||||||
|
} else {
|
||||||
|
close(initialized)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
case <-ctx.Done(): // container shutdown
|
case <-ctx.Done(): // container shutdown
|
||||||
@@ -886,15 +889,13 @@ func (a *agent) runHot(ctx context.Context, caller slotCaller, call *call, tok R
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Main request processing go-routine
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel() // also close if we get an agent shutdown / idle timeout
|
defer cancel() // also close if we get an agent shutdown / idle timeout
|
||||||
|
|
||||||
// INIT BARRIER HERE.
|
// INIT BARRIER HERE. Wait for the initialization go-routine signal
|
||||||
select {
|
select {
|
||||||
case err := <-udsWait:
|
case <-initialized:
|
||||||
if tryQueueErr(err, errQueue) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-a.shutWg.Closer(): // agent shutdown
|
case <-a.shutWg.Closer(): // agent shutdown
|
||||||
return
|
return
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|||||||
Reference in New Issue
Block a user