mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Adding a way to inject a request ID (#1046)
* Adding a way to inject a request ID It is very useful to associate a request ID to each incoming request, this change allows to provide a function to do that via Server Option. The change comes with a default function which will generate a new request ID. The request ID is put in the request context along with a common logger which always logs the request-id We add gRPC interceptors to the server so it can get the request ID out of the gRPC metadata and put it in the common logger stored in the context so as all the log lines using the common logger from the context will have the request ID logged
This commit is contained in:
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/fnext"
|
||||
"github.com/fnproject/fn/grpcutil"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -206,6 +207,7 @@ func (ch *callHandle) enqueueCallResponse(err error) {
|
||||
var details string
|
||||
var errCode int
|
||||
var errStr string
|
||||
log := common.Logger(ch.ctx)
|
||||
|
||||
if err != nil {
|
||||
errCode = models.GetAPIErrorCode(err)
|
||||
@@ -215,8 +217,7 @@ func (ch *callHandle) enqueueCallResponse(err error) {
|
||||
if ch.c != nil {
|
||||
details = ch.c.Model().ID
|
||||
}
|
||||
|
||||
common.Logger(ch.ctx).Debugf("Sending Call Finish details=%v", details)
|
||||
log.Debugf("Sending Call Finish details=%v", details)
|
||||
|
||||
errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
|
||||
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
|
||||
@@ -227,13 +228,13 @@ func (ch *callHandle) enqueueCallResponse(err error) {
|
||||
}}})
|
||||
|
||||
if errTmp != nil {
|
||||
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
log.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
return
|
||||
}
|
||||
|
||||
errTmp = ch.finalize()
|
||||
if errTmp != nil {
|
||||
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
log.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -553,22 +554,21 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
|
||||
// Handles a client engagement
|
||||
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
|
||||
grpc.EnableTracing = false
|
||||
|
||||
ctx := engagement.Context()
|
||||
log := common.Logger(ctx)
|
||||
// Keep lightweight tabs on what this runner is doing: for draindown tests
|
||||
atomic.AddInt32(&pr.inflight, 1)
|
||||
defer atomic.AddInt32(&pr.inflight, -1)
|
||||
|
||||
log := common.Logger(engagement.Context())
|
||||
pv, ok := peer.FromContext(engagement.Context())
|
||||
pv, ok := peer.FromContext(ctx)
|
||||
log.Debug("Starting engagement")
|
||||
if ok {
|
||||
log.Debug("Peer is ", pv)
|
||||
}
|
||||
md, ok := metadata.FromIncomingContext(engagement.Context())
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
log.Debug("MD is ", md)
|
||||
}
|
||||
|
||||
state := NewCallHandle(engagement)
|
||||
|
||||
tryMsg := state.getTryMsg()
|
||||
@@ -713,11 +713,16 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials
|
||||
|
||||
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
|
||||
var srv *grpc.Server
|
||||
var opts []grpc.ServerOption
|
||||
|
||||
sInterceptor := grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor)
|
||||
uInterceptor := grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor)
|
||||
opts = append(opts, sInterceptor)
|
||||
opts = append(opts, uInterceptor)
|
||||
if creds != nil {
|
||||
srv = grpc.NewServer(grpc.Creds(creds))
|
||||
} else {
|
||||
srv = grpc.NewServer()
|
||||
opts = append(opts, grpc.Creds(creds))
|
||||
}
|
||||
srv = grpc.NewServer(opts...)
|
||||
|
||||
pr := &pureRunner{
|
||||
gRPCServer: srv,
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
pb "github.com/fnproject/fn/api/agent/grpc"
|
||||
@@ -118,6 +119,12 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
||||
return true, err
|
||||
}
|
||||
|
||||
rid := common.RequestIDFromContext(ctx)
|
||||
if rid != "" {
|
||||
// Create a new gRPC metadata where we store the request ID
|
||||
mp := metadata.Pairs(common.RequestIDContextKey, rid)
|
||||
ctx = metadata.NewOutgoingContext(ctx, mp)
|
||||
}
|
||||
runnerConnection, err := r.client.Engage(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to create client to runner node")
|
||||
|
||||
@@ -9,6 +9,14 @@ import (
|
||||
|
||||
type contextKey string
|
||||
|
||||
// RequestIDContextKey is the name of the key used to store the request ID into the context
|
||||
const RequestIDContextKey = "fn_request_id"
|
||||
|
||||
//WithRequestID stores a request ID into the context
|
||||
func WithRequestID(ctx context.Context, rid string) context.Context {
|
||||
return context.WithValue(ctx, contextKey(RequestIDContextKey), rid)
|
||||
}
|
||||
|
||||
// WithLogger stores the logger.
|
||||
func WithLogger(ctx context.Context, l logrus.FieldLogger) context.Context {
|
||||
return context.WithValue(ctx, contextKey("logger"), l)
|
||||
|
||||
21
api/common/request_id_util.go
Normal file
21
api/common/request_id_util.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/fnproject/fn/api/id"
|
||||
)
|
||||
|
||||
// FnRequestID returns the passed value if that is not empty otherwise it generates a new unique ID
|
||||
func FnRequestID(ridFound string) string {
|
||||
if ridFound == "" {
|
||||
return id.New().String()
|
||||
}
|
||||
return ridFound
|
||||
}
|
||||
|
||||
//RequestIDFromContext extract the request id from the context
|
||||
func RequestIDFromContext(ctx context.Context) string {
|
||||
rid, _ := ctx.Value(contextKey(RequestIDContextKey)).(string)
|
||||
return rid
|
||||
}
|
||||
@@ -62,6 +62,8 @@ const (
|
||||
EnvCert = "FN_NODE_CERT"
|
||||
EnvCertKey = "FN_NODE_CERT_KEY"
|
||||
EnvCertAuth = "FN_NODE_CERT_AUTHORITY"
|
||||
// The header name of the incoming request which holds the request ID
|
||||
EnvRidHeader = "FN_RID_HEADER"
|
||||
|
||||
EnvProcessCollectorList = "FN_PROCESS_COLLECTOR_LIST"
|
||||
EnvLBPlacementAlg = "FN_PLACER"
|
||||
|
||||
@@ -5,11 +5,38 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ServerOption func(context.Context, *Server) error
|
||||
|
||||
//RIDProvider is used to manage request ID
|
||||
type RIDProvider struct {
|
||||
HeaderName string //The name of the header where the reques id is stored in the incoming request
|
||||
RIDGenerator func(string) string // Function to generate the requestID
|
||||
}
|
||||
|
||||
func WithRIDProvider(ridProvider *RIDProvider) ServerOption {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.Router.Use(withRIDProvider(ridProvider))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func withRIDProvider(ridp *RIDProvider) func(c *gin.Context) {
|
||||
return func(c *gin.Context) {
|
||||
rid := ridp.RIDGenerator(c.Request.Header.Get(ridp.HeaderName))
|
||||
ctx := common.WithRequestID(c.Request.Context(), rid)
|
||||
// We set the rid in the common logger so it is always logged when the common logger is used
|
||||
l := common.Logger(ctx).WithFields(logrus.Fields{common.RequestIDContextKey: rid})
|
||||
ctx = common.WithLogger(ctx, l)
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func EnableShutdownEndpoint(ctx context.Context, halt context.CancelFunc) ServerOption {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.Router.GET("/shutdown", s.handleShutdown(halt))
|
||||
|
||||
Reference in New Issue
Block a user