mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Use context logging more to ensure context vars are present in log lines (#1039)
This commit is contained in:
@@ -1006,7 +1006,7 @@ func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*contai
|
||||
syslogConns, err := syslogConns(ctx, call.SyslogURL)
|
||||
if err != nil {
|
||||
// TODO we could write this to between stderr but between stderr doesn't go to user either. kill me.
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}).Error("error dialing syslog urls")
|
||||
common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}).Error("error dialing syslog urls")
|
||||
}
|
||||
|
||||
// for use if no freezer (or we ever make up our minds)
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -41,7 +42,7 @@ func setupLogger(ctx context.Context, maxSize uint64, c *models.Call) io.ReadWri
|
||||
limitw := &nopCloser{newLimitWriter(int(maxSize), dbuf)}
|
||||
|
||||
// accumulate all line writers, wrap in same line writer (to re-use buffer)
|
||||
stderrLogger := logrus.WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "path": c.Path, "image": c.Image, "call_id": c.ID})
|
||||
stderrLogger := common.Logger(ctx).WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "path": c.Path, "image": c.Image, "call_id": c.ID})
|
||||
loggo := &nopCloser{&logWriter{stderrLogger}}
|
||||
|
||||
// we don't need to limit the log writer(s), but we do need it to dispense lines
|
||||
|
||||
@@ -181,8 +181,9 @@ func (a *lbAgent) Submit(callI Call) error {
|
||||
if buf != nil {
|
||||
defer bufPool.Put(buf)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to process call body")
|
||||
common.Logger(call.req.Context()).WithError(err).Error("Failed to process call body")
|
||||
return a.handleCallEnd(ctx, call, err, true)
|
||||
}
|
||||
|
||||
@@ -192,7 +193,7 @@ func (a *lbAgent) Submit(callI Call) error {
|
||||
// isStarted=true means we will call Call.End().
|
||||
err = a.placer.PlaceCall(a.rp, ctx, call)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to place call")
|
||||
common.Logger(call.req.Context()).WithError(err).Error("Failed to place call")
|
||||
}
|
||||
|
||||
return a.handleCallEnd(ctx, call, err, true)
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"time"
|
||||
|
||||
runner "github.com/fnproject/fn/api/agent/grpc"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/fnext"
|
||||
"github.com/go-openapi/strfmt"
|
||||
@@ -142,7 +143,7 @@ func (ch *callHandle) shutdown(err error) {
|
||||
ch.closePipeToFn()
|
||||
|
||||
ch.shutOnce.Do(func() {
|
||||
logrus.WithError(err).Debugf("Shutting down call handle")
|
||||
common.Logger(ch.ctx).WithError(err).Debugf("Shutting down call handle")
|
||||
|
||||
// try to queue an error message if it's not already queued.
|
||||
if err != nil {
|
||||
@@ -215,7 +216,7 @@ func (ch *callHandle) enqueueCallResponse(err error) {
|
||||
details = ch.c.Model().ID
|
||||
}
|
||||
|
||||
logrus.Debugf("Sending Call Finish details=%v", details)
|
||||
common.Logger(ch.ctx).Debugf("Sending Call Finish details=%v", details)
|
||||
|
||||
errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
|
||||
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
|
||||
@@ -226,13 +227,13 @@ func (ch *callHandle) enqueueCallResponse(err error) {
|
||||
}}})
|
||||
|
||||
if errTmp != nil {
|
||||
logrus.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
return
|
||||
}
|
||||
|
||||
errTmp = ch.finalize()
|
||||
if errTmp != nil {
|
||||
logrus.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -557,14 +558,15 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
|
||||
atomic.AddInt32(&pr.inflight, 1)
|
||||
defer atomic.AddInt32(&pr.inflight, -1)
|
||||
|
||||
log := common.Logger(engagement.Context())
|
||||
pv, ok := peer.FromContext(engagement.Context())
|
||||
logrus.Debug("Starting engagement")
|
||||
log.Debug("Starting engagement")
|
||||
if ok {
|
||||
logrus.Debug("Peer is ", pv)
|
||||
log.Debug("Peer is ", pv)
|
||||
}
|
||||
md, ok := metadata.FromIncomingContext(engagement.Context())
|
||||
if ok {
|
||||
logrus.Debug("MD is ", md)
|
||||
log.Debug("MD is ", md)
|
||||
}
|
||||
|
||||
state := NewCallHandle(engagement)
|
||||
|
||||
@@ -101,7 +101,9 @@ func isTooBusy(err error) bool {
|
||||
}
|
||||
|
||||
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
|
||||
logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
|
||||
log := common.Logger(ctx).WithField("runner_addr", r.address)
|
||||
|
||||
log.Debug("Attempting to place call")
|
||||
if !r.shutWg.AddSession(1) {
|
||||
// try another runner if this one is closed.
|
||||
return false, ErrorRunnerClosed
|
||||
@@ -111,14 +113,14 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
||||
// extract the call's model data to pass on to the pure runner
|
||||
modelJSON, err := json.Marshal(call.Model())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to encode model as JSON")
|
||||
log.WithError(err).Error("Failed to encode model as JSON")
|
||||
// If we can't encode the model, no runner will ever be able to run this. Give up.
|
||||
return true, err
|
||||
}
|
||||
|
||||
runnerConnection, err := r.client.Engage(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Unable to create client to runner node")
|
||||
log.WithError(err).Error("Unable to create client to runner node")
|
||||
// Try on next runner
|
||||
return false, err
|
||||
}
|
||||
@@ -128,7 +130,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
||||
SlotHashId: hex.EncodeToString([]byte(call.SlotHashId())),
|
||||
}}})
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to send message to runner node")
|
||||
log.WithError(err).Error("Failed to send message to runner node")
|
||||
// Try on next runner
|
||||
return false, err
|
||||
}
|
||||
@@ -138,12 +140,12 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
||||
|
||||
recvDone := make(chan error, 1)
|
||||
|
||||
go receiveFromRunner(runnerConnection, call, recvDone)
|
||||
go sendToRunner(runnerConnection, call)
|
||||
go receiveFromRunner(ctx, runnerConnection, call, recvDone)
|
||||
go sendToRunner(ctx, runnerConnection, call)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logrus.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
|
||||
log.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
|
||||
return true, ctx.Err()
|
||||
case recvErr := <-recvDone:
|
||||
if isTooBusy(recvErr) {
|
||||
@@ -154,10 +156,11 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
||||
}
|
||||
}
|
||||
|
||||
func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
|
||||
func sendToRunner(ctx context.Context, protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
|
||||
bodyReader := call.RequestBody()
|
||||
writeBuffer := make([]byte, MaxDataChunk)
|
||||
|
||||
log := common.Logger(ctx)
|
||||
// IMPORTANT: IO Read below can fail in multiple go-routine cases (in retry
|
||||
// case especially if receiveFromRunner go-routine receives a NACK while sendToRunner is
|
||||
// already blocked on a read) or in the case of reading the http body multiple times (retries.)
|
||||
@@ -170,14 +173,14 @@ func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.Runne
|
||||
// WARNING: blocking read.
|
||||
n, err := bodyReader.Read(writeBuffer)
|
||||
if err != nil && err != io.EOF {
|
||||
logrus.WithError(err).Error("Failed to receive data from http client body")
|
||||
log.WithError(err).Error("Failed to receive data from http client body")
|
||||
}
|
||||
|
||||
// any IO error or n == 0 is an EOF for pure-runner
|
||||
isEOF := err != nil || n == 0
|
||||
data := writeBuffer[:n]
|
||||
|
||||
logrus.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
|
||||
log.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
|
||||
sendErr := protocolClient.Send(&pb.ClientMsg{
|
||||
Body: &pb.ClientMsg_Data{
|
||||
Data: &pb.DataFrame{
|
||||
@@ -190,7 +193,7 @@ func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.Runne
|
||||
// It's often normal to receive an EOF here as we optimistically start sending body until a NACK
|
||||
// from the runner. Let's ignore EOF and rely on recv side to catch premature EOF.
|
||||
if sendErr != io.EOF {
|
||||
logrus.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
|
||||
log.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -219,17 +222,18 @@ func tryQueueError(err error, done chan error) {
|
||||
}
|
||||
}
|
||||
|
||||
func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
|
||||
func receiveFromRunner(ctx context.Context, protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
|
||||
w := c.ResponseWriter()
|
||||
defer close(done)
|
||||
|
||||
log := common.Logger(ctx)
|
||||
isPartialWrite := false
|
||||
|
||||
DataLoop:
|
||||
for {
|
||||
msg, err := protocolClient.Recv()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Info("Receive error from runner")
|
||||
log.WithError(err).Info("Receive error from runner")
|
||||
tryQueueError(err, done)
|
||||
return
|
||||
}
|
||||
@@ -241,7 +245,7 @@ DataLoop:
|
||||
case *pb.RunnerMsg_ResultStart:
|
||||
switch meta := body.ResultStart.Meta.(type) {
|
||||
case *pb.CallResultStart_Http:
|
||||
logrus.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
|
||||
log.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
|
||||
for _, header := range meta.Http.Headers {
|
||||
w.Header().Set(header.Key, header.Value)
|
||||
}
|
||||
@@ -249,18 +253,18 @@ DataLoop:
|
||||
w.WriteHeader(int(meta.Http.StatusCode))
|
||||
}
|
||||
default:
|
||||
logrus.Errorf("Unhandled meta type in start message: %v", meta)
|
||||
log.Errorf("Unhandled meta type in start message: %v", meta)
|
||||
}
|
||||
|
||||
// May arrive if function has output. We ignore EOF.
|
||||
case *pb.RunnerMsg_Data:
|
||||
logrus.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
|
||||
log.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
|
||||
if !isPartialWrite {
|
||||
// WARNING: blocking write
|
||||
n, err := w.Write(body.Data.Data)
|
||||
if n != len(body.Data.Data) {
|
||||
isPartialWrite = true
|
||||
logrus.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
|
||||
log.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
|
||||
if err == nil {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
@@ -270,7 +274,7 @@ DataLoop:
|
||||
|
||||
// Finish messages required for finish/finalize the processing.
|
||||
case *pb.RunnerMsg_Finished:
|
||||
logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
|
||||
log.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
|
||||
if !body.Finished.Success {
|
||||
err := parseError(body.Finished)
|
||||
tryQueueError(err, done)
|
||||
@@ -278,7 +282,7 @@ DataLoop:
|
||||
break DataLoop
|
||||
|
||||
default:
|
||||
logrus.Error("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
|
||||
log.Error("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,14 +293,14 @@ DataLoop:
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
logrus.WithError(err).Infof("Call Waiting EOF received error")
|
||||
log.WithError(err).Infof("Call Waiting EOF received error")
|
||||
tryQueueError(err, done)
|
||||
break
|
||||
}
|
||||
|
||||
switch body := msg.Body.(type) {
|
||||
default:
|
||||
logrus.Infof("Call Waiting EOF ignoring message %T", body)
|
||||
log.Infof("Call Waiting EOF ignoring message %T", body)
|
||||
}
|
||||
tryQueueError(ErrorPureRunnerNoEOF, done)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/fnproject/fn/api/models"
|
||||
|
||||
"github.com/dchest/siphash"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -28,6 +29,9 @@ func NewCHPlacer() Placer {
|
||||
// Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use
|
||||
// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
|
||||
func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
|
||||
|
||||
log := common.Logger(ctx)
|
||||
|
||||
// The key is just the path in this case
|
||||
key := call.Model().Path
|
||||
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
|
||||
@@ -35,7 +39,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
|
||||
for {
|
||||
runners, err := rp.Runners(call)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to find runners for call")
|
||||
log.WithError(err).Error("Failed to find runners for call")
|
||||
} else {
|
||||
i := int(jumpConsistentHash(sum64, int32(len(runners))))
|
||||
for j := 0; j < len(runners); j++ {
|
||||
@@ -53,7 +57,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
|
||||
tryCancel()
|
||||
|
||||
if err != nil && err != models.ErrCallTimeoutServerBusy {
|
||||
logrus.WithError(err).Error("Failed during call placement")
|
||||
log.WithError(err).Error("Failed during call placement")
|
||||
}
|
||||
if placed {
|
||||
return err
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -26,10 +27,11 @@ func NewNaivePlacer() Placer {
|
||||
|
||||
func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
|
||||
|
||||
log := common.Logger(ctx)
|
||||
for {
|
||||
runners, err := rp.Runners(call)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to find runners for call")
|
||||
log.WithError(err).Error("Failed to find runners for call")
|
||||
} else {
|
||||
for j := 0; j < len(runners); j++ {
|
||||
|
||||
@@ -47,7 +49,7 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner
|
||||
tryCancel()
|
||||
|
||||
if err != nil && err != models.ErrCallTimeoutServerBusy {
|
||||
logrus.WithError(err).Error("Failed during call placement")
|
||||
log.WithError(err).Error("Failed during call placement")
|
||||
}
|
||||
if placed {
|
||||
return err
|
||||
|
||||
@@ -10,8 +10,7 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
@@ -23,23 +22,24 @@ func DialWithBackoff(ctx context.Context, address string, creds credentials.Tran
|
||||
|
||||
// uses grpc connection backoff protocol https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
|
||||
func dial(ctx context.Context, address string, creds credentials.TransportCredentials, timeoutDialer time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
|
||||
dialer := func(address string, timeout time.Duration) (net.Conn, error) {
|
||||
log := common.Logger(ctx).WithField("grpc_addr", address)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
conn, err := (&net.Dialer{Cancel: ctx.Done(), Timeout: timeoutDialer}).Dial("tcp", address)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("grpc_addr", address).Warn("Failed to dial grpc connection")
|
||||
log.WithError(err).Warn("Failed to dial grpc connection")
|
||||
return nil, err
|
||||
}
|
||||
if creds == nil {
|
||||
logrus.WithField("grpc_addr", address).Warn("Created insecure grpc connection")
|
||||
log.Warn("Created insecure grpc connection")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
conn, _, err = creds.ClientHandshake(ctx, address, conn)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("grpc_addr", address).Warn("Failed grpc TLS handshake")
|
||||
log.Warn("Failed grpc TLS handshake")
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
|
||||
Reference in New Issue
Block a user