mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Fns now annotated with invoke urls, as per triggers (#1172)
Clone of the trigger work to inject invoke urls into the annotations on a fn when it is returned from the server. Small changes to trigges code following code review of the fn code.
This commit is contained in:
63
api/server/fn_annotator.go
Normal file
63
api/server/fn_annotator.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
//FnAnnotator Is used to inject trigger context (such as request URLs) into outbound trigger resources
|
||||
type FnAnnotator interface {
|
||||
// Annotates a trigger on read
|
||||
AnnotateFn(ctx *gin.Context, a *models.App, fn *models.Fn) (*models.Fn, error)
|
||||
}
|
||||
|
||||
type requestBasedFnAnnotator struct{}
|
||||
|
||||
func annotateFnWithBaseURL(baseURL string, app *models.App, fn *models.Fn) (*models.Fn, error) {
|
||||
|
||||
baseURL = strings.TrimSuffix(baseURL, "/")
|
||||
src := strings.TrimPrefix(fn.ID, "/")
|
||||
triggerPath := fmt.Sprintf("%s/invoke/%s", baseURL, src)
|
||||
|
||||
newT := fn.Clone()
|
||||
newAnnotations, err := newT.Annotations.With(models.FnInvokeEndpointAnnotation, triggerPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newT.Annotations = newAnnotations
|
||||
return newT, nil
|
||||
}
|
||||
|
||||
func (tp *requestBasedFnAnnotator) AnnotateFn(ctx *gin.Context, app *models.App, t *models.Fn) (*models.Fn, error) {
|
||||
|
||||
//No, I don't feel good about myself either
|
||||
scheme := "http"
|
||||
if ctx.Request.TLS != nil {
|
||||
scheme = "https"
|
||||
}
|
||||
|
||||
return annotateFnWithBaseURL(fmt.Sprintf("%s://%s", scheme, ctx.Request.Host), app, t)
|
||||
}
|
||||
|
||||
//NewRequestBasedFnAnnotator creates a FnAnnotator that inspects the incoming request host and port, and uses this to generate fn invoke endpoint URLs based on those
|
||||
func NewRequestBasedFnAnnotator() FnAnnotator {
|
||||
return &requestBasedFnAnnotator{}
|
||||
}
|
||||
|
||||
type staticURLFnAnnotator struct {
|
||||
baseURL string
|
||||
}
|
||||
|
||||
//NewStaticURLFnAnnotator annotates triggers bases on a given, specified URL base - e.g. "https://my.domain" ---> "https://my.domain/t/app/source"
|
||||
func NewStaticURLFnAnnotator(baseURL string) FnAnnotator {
|
||||
|
||||
return &staticURLFnAnnotator{baseURL: baseURL}
|
||||
}
|
||||
|
||||
func (s *staticURLFnAnnotator) AnnotateFn(ctx *gin.Context, app *models.App, trigger *models.Fn) (*models.Fn, error) {
|
||||
return annotateFnWithBaseURL(s.baseURL, app, trigger)
|
||||
|
||||
}
|
||||
133
api/server/fn_annotator_test.go
Normal file
133
api/server/fn_annotator_test.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
bytes2 "bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestAnnotateFnDefaultProvider(t *testing.T) {
|
||||
|
||||
app := &models.App{
|
||||
ID: "app_id",
|
||||
Name: "myApp",
|
||||
}
|
||||
|
||||
tr := &models.Fn{
|
||||
ID: "fnID",
|
||||
Name: "myFn",
|
||||
AppID: app.ID,
|
||||
}
|
||||
|
||||
// defaults the fn endpoint to the base URL if it's not already set
|
||||
tep := NewRequestBasedFnAnnotator()
|
||||
|
||||
c, _ := gin.CreateTestContext(httptest.NewRecorder())
|
||||
c.Request = httptest.NewRequest("GET", "/v2/foo/bar", bytes2.NewBuffer([]byte{}))
|
||||
c.Request.Host = "my-server.com:8192"
|
||||
newT, err := tep.AnnotateFn(c, app, tr)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %s", err)
|
||||
}
|
||||
|
||||
bytes, got := newT.Annotations.Get(models.FnInvokeEndpointAnnotation)
|
||||
if !got {
|
||||
t.Fatalf("Expecting annotation to be present but got %v", newT.Annotations)
|
||||
}
|
||||
|
||||
var annot string
|
||||
err = json.Unmarshal(bytes, &annot)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't get annotation")
|
||||
}
|
||||
|
||||
expected := "http://my-server.com:8192/invoke/fnID"
|
||||
if annot != expected {
|
||||
t.Errorf("expected annotation to be %s but was %s", expected, annot)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHttpsFn(t *testing.T) {
|
||||
|
||||
app := &models.App{
|
||||
ID: "app_id",
|
||||
Name: "myApp",
|
||||
}
|
||||
|
||||
tr := &models.Fn{
|
||||
ID: "fnID",
|
||||
Name: "myFn",
|
||||
AppID: app.ID,
|
||||
}
|
||||
|
||||
// defaults the Fn endpoint to the base URL if it's not already set
|
||||
tep := NewRequestBasedFnAnnotator()
|
||||
|
||||
c, _ := gin.CreateTestContext(httptest.NewRecorder())
|
||||
c.Request = httptest.NewRequest("GET", "/v2/foo/bar", bytes2.NewBuffer([]byte{}))
|
||||
c.Request.Host = "my-server.com:8192"
|
||||
c.Request.TLS = &tls.ConnectionState{}
|
||||
|
||||
newT, err := tep.AnnotateFn(c, app, tr)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %s", err)
|
||||
}
|
||||
|
||||
bytes, got := newT.Annotations.Get(models.FnInvokeEndpointAnnotation)
|
||||
if !got {
|
||||
t.Fatalf("Expecting annotation to be present but got %v", newT.Annotations)
|
||||
}
|
||||
var annot string
|
||||
err = json.Unmarshal(bytes, &annot)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't get annotation")
|
||||
}
|
||||
|
||||
expected := "https://my-server.com:8192/invoke/fnID"
|
||||
if annot != expected {
|
||||
t.Errorf("expected annotation to be %s but was %s", expected, annot)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStaticUrlFnAnnotator(t *testing.T) {
|
||||
a := NewStaticURLFnAnnotator("http://foo.bar.com/somewhere")
|
||||
|
||||
app := &models.App{
|
||||
ID: "app_id",
|
||||
Name: "myApp",
|
||||
}
|
||||
|
||||
tr := &models.Fn{
|
||||
ID: "fnID",
|
||||
Name: "myFn",
|
||||
AppID: app.ID,
|
||||
}
|
||||
|
||||
newT, err := a.AnnotateFn(nil, app, tr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed when should have succeeded: %s", err)
|
||||
}
|
||||
|
||||
bytes, got := newT.Annotations.Get(models.FnInvokeEndpointAnnotation)
|
||||
if !got {
|
||||
t.Fatalf("Expecting annotation to be present but got %v", newT.Annotations)
|
||||
}
|
||||
var annot string
|
||||
err = json.Unmarshal(bytes, &annot)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't get annotation")
|
||||
}
|
||||
|
||||
expected := "http://foo.bar.com/somewhere/invoke/fnID"
|
||||
if annot != expected {
|
||||
t.Errorf("expected annotation to be %s but was %s", expected, annot)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
@@ -9,11 +10,24 @@ import (
|
||||
|
||||
func (s *Server) handleFnGet(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
f, err := s.datastore.GetFnByID(ctx, c.Param(api.ParamFnID))
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
app, err := s.datastore.GetAppByID(ctx, f.AppID)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, fmt.Errorf("unexpected error - fn app not available: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
f, err = s.fnAnnotator.AnnotateFn(c, app, f)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, f)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
@@ -21,5 +22,30 @@ func (s *Server) handleFnList(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Annotate the outbound fns
|
||||
|
||||
// this is fairly cludgy bit hard to do in datastore middleware confidently
|
||||
appCache := make(map[string]*models.App)
|
||||
|
||||
for idx, f := range fns.Items {
|
||||
app, ok := appCache[f.AppID]
|
||||
if !ok {
|
||||
gotApp, err := s.Datastore().GetAppByID(ctx, f.AppID)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, fmt.Errorf("failed to get app for fn %s", err))
|
||||
return
|
||||
}
|
||||
app = gotApp
|
||||
appCache[app.ID] = gotApp
|
||||
}
|
||||
|
||||
newF, err := s.fnAnnotator.AnnotateFn(c, app, f)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
fns.Items[idx] = newF
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, fns)
|
||||
}
|
||||
|
||||
@@ -358,3 +358,53 @@ func TestFnGet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFnInvokeEndpointAnnotations(t *testing.T) {
|
||||
a := &models.App{ID: "app_id", Name: "myapp"}
|
||||
fn := &models.Fn{ID: "fnid", AppID: a.ID, Name: "fnname"}
|
||||
|
||||
commonDS := datastore.NewMockInit([]*models.App{a}, []*models.Fn{fn})
|
||||
|
||||
srv := testServer(commonDS, &mqs.Mock{}, logs.NewMock(), nil, ServerTypeAPI)
|
||||
|
||||
_, rec := routerRequest(t, srv.Router, "GET", "/v2/fns/fnid", bytes.NewBuffer([]byte("")))
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected code %d != 200", rec.Code)
|
||||
}
|
||||
var fnGet models.Fn
|
||||
err := json.NewDecoder(rec.Body).Decode(&fnGet)
|
||||
if err != nil {
|
||||
t.Fatalf("Invalid json from server %s", err)
|
||||
}
|
||||
|
||||
const fnEndpoint = "fnproject.io/fn/invokeEndpoint"
|
||||
v, err := fnGet.Annotations.GetString(fnEndpoint)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get fn %s", err)
|
||||
}
|
||||
if v != "http://127.0.0.1:8080/invoke/fnid" {
|
||||
t.Errorf("unexpected fn val %s", v)
|
||||
}
|
||||
|
||||
_, rec = routerRequest(t, srv.Router, "GET", fmt.Sprintf("/v2/fns?app_id=%s", a.ID), nil)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected code %d != 200", rec.Code)
|
||||
}
|
||||
|
||||
var resp models.FnList
|
||||
err = json.NewDecoder(rec.Body).Decode(&resp)
|
||||
if err != nil {
|
||||
t.Fatalf("Invalid json from server %s : %s", err, string(rec.Body.Bytes()))
|
||||
}
|
||||
|
||||
if len(resp.Items) != 1 {
|
||||
t.Fatalf("Unexpected fn list result, %v", resp)
|
||||
}
|
||||
|
||||
v, err = resp.Items[0].Annotations.GetString(fnEndpoint)
|
||||
if v != "http://127.0.0.1:8080/invoke/fnid" {
|
||||
t.Errorf("unexpected fn val %s", v)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,6 +205,7 @@ type Server struct {
|
||||
apiMiddlewares []fnext.Middleware
|
||||
promExporter *prometheus.Exporter
|
||||
triggerAnnotator TriggerAnnotator
|
||||
fnAnnotator FnAnnotator
|
||||
// Extensions can append to this list of contexts so that cancellations are properly handled.
|
||||
extraCtxs []context.Context
|
||||
}
|
||||
@@ -257,8 +258,10 @@ func NewFromEnv(ctx context.Context, opts ...Option) *Server {
|
||||
if publicLBURL != "" {
|
||||
logrus.Infof("using LB Base URL: '%s'", publicLBURL)
|
||||
opts = append(opts, WithTriggerAnnotator(NewStaticURLTriggerAnnotator(publicLBURL)))
|
||||
opts = append(opts, WithFnAnnotator(NewStaticURLFnAnnotator(publicLBURL)))
|
||||
} else {
|
||||
opts = append(opts, WithTriggerAnnotator(NewRequestBasedTriggerAnnotator()))
|
||||
opts = append(opts, WithFnAnnotator(NewRequestBasedFnAnnotator()))
|
||||
}
|
||||
|
||||
// Agent handling depends on node type and several other options so it must be the last processed option.
|
||||
@@ -580,6 +583,14 @@ func WithTriggerAnnotator(provider TriggerAnnotator) Option {
|
||||
}
|
||||
}
|
||||
|
||||
//WithFnAnnotator adds a fnEndpoint provider to the server
|
||||
func WithFnAnnotator(provider FnAnnotator) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.fnAnnotator = provider
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAdminServer starts the admin server on the specified port.
|
||||
func WithAdminServer(port int) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
|
||||
@@ -38,6 +38,7 @@ func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogSto
|
||||
WithAgent(rnr),
|
||||
WithType(nodeType),
|
||||
WithTriggerAnnotator(NewRequestBasedTriggerAnnotator()),
|
||||
WithFnAnnotator(NewRequestBasedFnAnnotator()),
|
||||
)...)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,10 @@ package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (s *Server) handleTriggerGet(c *gin.Context) {
|
||||
@@ -20,6 +21,7 @@ func (s *Server) handleTriggerGet(c *gin.Context) {
|
||||
|
||||
if err != nil {
|
||||
handleErrorResponse(c, fmt.Errorf("unexpected error - trigger app not available: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
trigger, err = s.triggerAnnotator.AnnotateTrigger(c, app, trigger)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
@@ -30,10 +31,8 @@ func (s *Server) handleTriggerList(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Annotate the outbound triggers
|
||||
|
||||
// this is fairly cludgy bit hard to do in datastore middleware confidently
|
||||
appCache := make(map[string]*models.App)
|
||||
newTriggers := make([]*models.Trigger, len(triggers.Items))
|
||||
|
||||
for idx, t := range triggers.Items {
|
||||
app, ok := appCache[t.AppID]
|
||||
@@ -52,9 +51,8 @@ func (s *Server) handleTriggerList(c *gin.Context) {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
newTriggers[idx] = newT
|
||||
triggers.Items[idx] = newT
|
||||
}
|
||||
|
||||
triggers.Items = newTriggers
|
||||
c.JSON(http.StatusOK, triggers)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user