diff --git a/api/server/runner.go b/api/server/runner.go index c6c7e872c..2e01301f2 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -152,20 +152,23 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]* } // TODO: Should remove *gin.Context from these functions, should use only context.Context -func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) +func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, route *models.Route, app *models.App, path, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": route.Path, "image": route.Image}) - params, match := matchRoute(found.Path, route) + params, match := matchRoute(route.Path, path) if !match { return false } var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader + if route.Format == "" { + route.Format = "default" + } envVars := map[string]string{ "METHOD": c.Request.Method, "APP_NAME": appName, - "ROUTE": found.Path, + "ROUTE": route.Path, "REQUEST_URL": fmt.Sprintf("%v//%v%v", func() string { if c.Request.TLS == nil { return "http" @@ -173,13 +176,14 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun return "https" }(), c.Request.Host, c.Request.URL.String()), "CALL_ID": reqID, + "FORMAT": route.Format, } // app config for k, v := range app.Config { envVars[toEnvName("", k)] = v } - for k, v := range found.Config { + for k, v := range route.Config { envVars[toEnvName("", k)] = v } @@ -195,16 +199,16 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun cfg := &task.Config{ AppName: appName, - Path: found.Path, + Path: route.Path, Env: envVars, - Format: found.Format, + Format: route.Format, ID: reqID, - Image: found.Image, - Memory: found.Memory, + Image: route.Image, + Memory: route.Memory, Stdin: payload, Stdout: &stdout, - Timeout: time.Duration(found.Timeout) * time.Second, - IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, + Timeout: time.Duration(route.Timeout) * time.Second, + IdleTimeout: time.Duration(route.IdleTimeout) * time.Second, ReceivedTime: time.Now(), Ready: make(chan struct{}), } @@ -223,11 +227,11 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun newTask.Image = &cfg.Image newTask.ID = cfg.ID newTask.CreatedAt = createdAt - newTask.Path = found.Path + newTask.Path = route.Path newTask.EnvVars = cfg.Env newTask.AppName = cfg.AppName - switch found.Type { + switch route.Type { case "async": // Read payload pl, err := ioutil.ReadAll(cfg.Stdin) @@ -263,7 +267,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun break } - for k, v := range found.Headers { + for k, v := range route.Headers { c.Header(k, v[0]) } diff --git a/docs/writing.md b/docs/writing.md index 49ad8d138..a989352d3 100644 --- a/docs/writing.md +++ b/docs/writing.md @@ -35,6 +35,8 @@ You will also have access to a set of environment variables. * APP_NAME - the name of the application that matched this route, eg: `myapp` * ROUTE - the matched route, eg: `/hello` * METHOD - the HTTP method for the request, eg: `GET` or `POST` +* CALL_ID - a unique ID for each function execution. +* FORMAT - a string representing one of the [function formats](function-format.md), currently either `default` or `http`. Default is `default`. * HEADER_X - the HTTP headers that were set for this request. Replace X with the upper cased name of the header and replace dashes in the header with underscores. * X - any [configuration values](https://gitlab.oracledx.com/odx/functions/blob/master/fn/README.md#application-level-configuration) you've set for the Application or the Route. Replace X with the upper cased name of the config variable you set. Ex: `minio_secret=secret` will be exposed via MINIO_SECRET env var. diff --git a/fn/lambda/node-4/bootstrap.js b/fn/lambda/node-4/bootstrap.js index e51c74643..9802e8ae2 100644 --- a/fn/lambda/node-4/bootstrap.js +++ b/fn/lambda/node-4/bootstrap.js @@ -1,6 +1,11 @@ 'use strict'; var fs = require('fs'); +var net = require('net'); +// var http = require('http'); +var http_common = require('_http_common'); +var events = require('events'); +var HTTPParser = process.binding('http_parser').HTTPParser; var oldlog = console.log console.log = console.error @@ -239,11 +244,48 @@ var setEnvFromHeader = function () { } } +// for http hot functions +function freeParser(parser){ + if (parser) { + parser.onIncoming = null; + parser.socket = null; + http_common.parsers.free(parser); + parser = null; + } +}; + +// parses http requests +function parse(socket){ + var emitter = new events.EventEmitter(); + var parser = http_common.parsers.alloc(); + + parser.reinitialize(HTTPParser.REQUEST); + parser.socket = socket; + parser.maxHeaderPairs = 2000; + + parser.onIncoming = function(req){ + emitter.emit('request', req); + }; + + socket.on('data', function(buffer){ + var ret = parser.execute(buffer, 0, buffer.length); + if(ret instanceof Error){ + emitter.emit('error'); + freeParser(parser); + } + }); + + socket.once('close', function(){ + freeParser(parser); + }); + + return emitter; +}; function run() { + setEnvFromHeader(); - // FIXME(nikhil): Check for file existence and allow non-payload. - var path = process.env["PAYLOAD_FILE"]; + var path = process.env["PAYLOAD_FILE"]; // if we allow a mounted file, this is used. Could safely be removed. var stream = process.stdin; if (path) { try { @@ -254,67 +296,99 @@ function run() { } } - var input = ""; - stream.setEncoding('utf8'); - stream.on('data', function(chunk) { - input += chunk; - }); + // First, check format (ie: hot functions) + var format = process.env["FORMAT"]; + console.error("format", format); + if (format == "http"){ + // var parser = httpSocketSetup(stream) + // init parser + var parser = parse(stream); + let i = 0; + parser.on('request', function(req){ + // Got parsed HTTP object + // console.error("REQUEST", req) + i++; + console.error("REQUEST:", i) + handleRequest(req); + }); - stream.on('error', function(err) { - console.error("bootstrap: Error reading payload stream", err); - process.exit(1); - }); + parser.on('error', function(e){ + // Not HTTP data + console.error("INVALID HTTP DATA!", e) + }); - stream.on('end', function() { - var payload = {} - try { - if (input.length > 0) { - payload = JSON.parse(input); - } - } catch(e) { - console.error("bootstrap: Error parsing JSON", e); - process.exit(1); - } - - if (process.argv.length > 2) { - var handler = process.argv[2]; - var parts = handler.split('.'); - // FIXME(nikhil): Error checking. - var script = parts[0]; - var entry = parts[1]; - var started = false; - try { - var mod = require('./'+script); - var func = mod[entry]; - if (func === undefined) { - oldlog("Handler '" + entry + "' missing on module '" + script + "'"); - return; - } - - if (typeof func !== 'function') { - throw "TypeError: " + (typeof func) + " is not a function"; - } - started = true; - var cback - // RUN THE FUNCTION: - mod[entry](payload, makeCtx(), functionCallback) - } catch(e) { - if (typeof e === 'string') { - oldlog(e) - } else { - oldlog(e.message) - } - if (!started) { - oldlog("Process exited before completing request\n") - } - } - } else { - console.error("bootstrap: No script specified") - process.exit(1); - } - }) + } else { + // default + handleRequest(stream); + } } +function handleRequest(stream) { + var input = ""; + stream.setEncoding('utf8'); + stream.on('data', function(chunk) { + input += chunk; + }); + stream.on('error', function(err) { + console.error("bootstrap: Error reading payload stream", err); + process.exit(1); + }); + stream.on('end', function() { + var payload = {} + try { + if (input.length > 0) { + payload = JSON.parse(input); + } + } catch(e) { + console.error("bootstrap: Error parsing JSON", e); + process.exit(1); + } + + handlePayload(payload) + }) +} + +function handlePayload(payload) { + if (process.argv.length > 2) { + var handler = process.argv[2]; + var parts = handler.split('.'); + // FIXME(nikhil): Error checking. + var script = parts[0]; + var entry = parts[1]; + var started = false; + try { + var mod = require('./'+script); + var func = mod[entry]; + if (func === undefined) { + oldlog("Handler '" + entry + "' missing on module '" + script + "'"); + return; + } + + if (typeof func !== 'function') { + throw "TypeError: " + (typeof func) + " is not a function"; + } + started = true; + var cback + // RUN THE FUNCTION: + mod[entry](payload, makeCtx(), functionCallback) + } catch(e) { + if (typeof e === 'string') { + oldlog(e) + } else { + oldlog(e.message) + } + if (!started) { + oldlog("Process exited before completing request\n") + } + } + } else { + console.error("bootstrap: No script specified") + process.exit(1); + } +} + + + function functionCallback(err, result) { if (err != null) { // then user returned error and we should respond with error diff --git a/fn/lambda/node-6/bootstrap.js b/fn/lambda/node-6/bootstrap.js deleted file mode 100644 index e51c74643..000000000 --- a/fn/lambda/node-6/bootstrap.js +++ /dev/null @@ -1,330 +0,0 @@ -'use strict'; - -var fs = require('fs'); - -var oldlog = console.log -console.log = console.error - -// Some notes on the semantics of the succeed(), fail() and done() methods. -// Tests are the source of truth! -// First call wins in terms of deciding the result of the function. BUT, -// subsequent calls also log. Further, code execution does not stop, even where -// for done(), the docs say that the "function terminates". It seems though -// that further cycles of the event loop do not run. For example: -// index.handler = function(event, context) { -// context.fail("FAIL") -// process.nextTick(function() { -// console.log("This does not get logged") -// }) -// console.log("This does get logged") -// } -// on the other hand: -// index.handler = function(event, context) { -// process.nextTick(function() { -// console.log("This also gets logged") -// context.fail("FAIL") -// }) -// console.log("This does get logged") -// } -// -// The same is true for context.succeed() and done() captures the semantics of -// both. It seems this is implemented simply by having process.nextTick() cause -// process.exit() or similar, because the following: -// exports.handler = function(event, context) { -// process.nextTick(function() {console.log("This gets logged")}) -// process.nextTick(function() {console.log("This also gets logged")}) -// context.succeed("END") -// process.nextTick(function() {console.log("This does not get logged")}) -// }; -// -// So the context object needs to have some sort of hidden boolean that is only -// flipped once, by the first call, and dictates the behavior on the next tick. -// -// In addition, the response behaviour depends on the invocation type. If we -// are to only support the async type, succeed() must return a 202 response -// code, not sure how to do this. -// -// Only the first 256kb, followed by a truncation message, should be logged. -// -// Also, the error log is always in a json literal -// { "errorMessage": "" } -var Context = function() { - var concluded = false; - - var contextSelf = this; - - // The succeed, fail and done functions are public, but access a private - // member (concluded). Hence this ugly nested definition. - this.succeed = function(result) { - if (concluded) { - return - } - - // We have to process the result before we can conclude, because otherwise - // we have to fail. This means NO EARLY RETURNS from this function without - // review! - if (result === undefined) { - result = null - } - - var failed = false; - try { - // Output result to log - oldlog(JSON.stringify(result)); - } catch(e) { - // Set X-Amz-Function-Error: Unhandled header - console.log("Unable to stringify body as json: " + e); - failed = true; - } - - // FIXME(nikhil): Return 202 or 200 based on invocation type and set response - // to result. Should probably be handled externally by the runner/swapi. - - // OK, everything good. - concluded = true; - process.nextTick(function() { process.exit(failed ? 1 : 0) }) - } - - this.fail = function(error) { - if (concluded) { - return - } - - concluded = true - process.nextTick(function() { process.exit(1) }) - - if (error === undefined) { - error = null - } - - // FIXME(nikhil): Truncated log of error, plus non-truncated response body - var errstr = "fail() called with argument but a problem was encountered while converting it to a to string"; - - // The semantics of fail() are weird. If the error is something that can be - // converted to a string, the log output wraps the string in a JSON literal - // with key "errorMessage". If toString() fails, then the output is only - // the error string. - try { - if (error === null) { - errstr = null - } else { - errstr = error.toString() - } - oldlog(JSON.stringify({"errorMessage": errstr })) - } catch(e) { - // Set X-Amz-Function-Error: Unhandled header - oldlog(errstr) - } - } - - this.done = function() { - var error = arguments[0]; - var result = arguments[1]; - if (error) { - contextSelf.fail(error) - } else { - contextSelf.succeed(result) - } - } - - var plannedEnd = Date.now() + (getTimeoutInSeconds() * 1000); - this.getRemainingTimeInMillis = function() { - return Math.max(plannedEnd - Date.now(), 0); - } -} - -function getTimeoutInSeconds() { - var t = parseInt(getEnv("TASK_TIMEOUT")); - if (Number.isNaN(t)) { - return 3600; - } - - return t; -} - -var getEnv = function(name) { - return process.env[name] || ""; -} - -var makeCtx = function() { - var fnname = getEnv("AWS_LAMBDA_FUNCTION_NAME"); - // FIXME(nikhil): Generate UUID. - var taskID = getEnv("TASK_ID"); - - var mem = getEnv("TASK_MAXRAM").toLowerCase(); - var bytes = 300 * 1024 * 1024; - - var scale = { 'b': 1, 'k': 1024, 'm': 1024*1024, 'g': 1024*1024*1024 }; - // We don't bother validating too much, if the last character is not a number - // and not in the scale table we just return a default value. - // We use slice instead of indexing so that we always get an empty string, - // instead of undefined. - if (mem.slice(-1).match(/[0-9]/)) { - var a = parseInt(mem); - if (!Number.isNaN(a)) { - bytes = a; - } - } else { - var rem = parseInt(mem.slice(0, -1)); - if (!Number.isNaN(rem)) { - var multiplier = scale[mem.slice(-1)]; - if (multiplier) { - bytes = rem * multiplier - } - } - } - - var memoryMB = bytes / (1024 * 1024); - - var ctx = new Context(); - Object.defineProperties(ctx, { - "functionName": { - value: fnname, - enumerable: true, - }, - "functionVersion": { - value: "$LATEST", - enumerable: true, - }, - "invokedFunctionArn": { - // FIXME(nikhil): Should be filled in. - value: "", - enumerable: true, - }, - "memoryLimitInMB": { - // Sigh, yes it is a string. - value: ""+memoryMB, - enumerable: true, - }, - "awsRequestId": { - value: taskID, - enumerable: true, - }, - "logGroupName": { - // FIXME(nikhil): Should be filled in. - value: "", - enumerable: true, - }, - "logStreamName": { - // FIXME(nikhil): Should be filled in. - value: "", - enumerable: true, - }, - "identity": { - // FIXME(nikhil): Should be filled in. - value: null, - enumerable: true, - }, - "clientContext": { - // FIXME(nikhil): Should be filled in. - value: null, - enumerable: true, - }, - }); - - return ctx; -} - -var setEnvFromHeader = function () { - var headerPrefix = "CONFIG_"; - var newEnvVars = {}; - for (var key in process.env) { - if (key.indexOf(headerPrefix) == 0) { - newEnvVars[key.slice(headerPrefix.length)] = process.env[key]; - } - } - - for (var key in newEnvVars) { - process.env[key] = newEnvVars[key]; - } -} - - -function run() { - setEnvFromHeader(); - // FIXME(nikhil): Check for file existence and allow non-payload. - var path = process.env["PAYLOAD_FILE"]; - var stream = process.stdin; - if (path) { - try { - stream = fs.createReadStream(path); - } catch(e) { - console.error("bootstrap: Error opening payload file", e) - process.exit(1); - } - } - - var input = ""; - stream.setEncoding('utf8'); - stream.on('data', function(chunk) { - input += chunk; - }); - - stream.on('error', function(err) { - console.error("bootstrap: Error reading payload stream", err); - process.exit(1); - }); - - stream.on('end', function() { - var payload = {} - try { - if (input.length > 0) { - payload = JSON.parse(input); - } - } catch(e) { - console.error("bootstrap: Error parsing JSON", e); - process.exit(1); - } - - if (process.argv.length > 2) { - var handler = process.argv[2]; - var parts = handler.split('.'); - // FIXME(nikhil): Error checking. - var script = parts[0]; - var entry = parts[1]; - var started = false; - try { - var mod = require('./'+script); - var func = mod[entry]; - if (func === undefined) { - oldlog("Handler '" + entry + "' missing on module '" + script + "'"); - return; - } - - if (typeof func !== 'function') { - throw "TypeError: " + (typeof func) + " is not a function"; - } - started = true; - var cback - // RUN THE FUNCTION: - mod[entry](payload, makeCtx(), functionCallback) - } catch(e) { - if (typeof e === 'string') { - oldlog(e) - } else { - oldlog(e.message) - } - if (!started) { - oldlog("Process exited before completing request\n") - } - } - } else { - console.error("bootstrap: No script specified") - process.exit(1); - } - }) -} - -function functionCallback(err, result) { - if (err != null) { - // then user returned error and we should respond with error - // http://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-mode-exceptions.html - oldlog(JSON.stringify({"errorMessage": errstr })) - return - } - if (result != null) { - oldlog(JSON.stringify(result)) - } -} - -run() \ No newline at end of file diff --git a/fn/run.go b/fn/run.go index a58c67ed7..7ee69f8bc 100644 --- a/fn/run.go +++ b/fn/run.go @@ -1,9 +1,12 @@ package main import ( + "bytes" "errors" "fmt" "io" + "io/ioutil" + "net/http" "os" "os/exec" "strings" @@ -11,6 +14,12 @@ import ( "github.com/urfave/cli" ) +const ( + DefaultFormat = "default" + HttpFormat = "http" + LocalTestURL = "http://localhost:8080/myapp/hello" +) + func run() cli.Command { r := runCmd{} @@ -28,7 +37,7 @@ type runCmd struct{} func runflags() []cli.Flag { return []cli.Flag{ cli.StringSliceFlag{ - Name: "e", + Name: "env, e", Usage: "select environment variables to be sent to function", }, cli.StringSliceFlag{ @@ -39,6 +48,14 @@ func runflags() []cli.Flag { Name: "method", Usage: "http method for function", }, + cli.StringFlag{ + Name: "format", + Usage: "format to use. `default` and `http` (hot) formats currently supported.", + }, + cli.IntFlag{ + Name: "runs", + Usage: "for hot functions only, will call the function `runs` times in a row.", + }, } } @@ -65,11 +82,11 @@ func (r *runCmd) run(c *cli.Context) error { } } - return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link")) + return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"), c.String("format"), c.Int("runs")) } -// TODO: THIS SHOULD USE THE RUNNER DRIVERS FROM THE SERVER SO IT'S ESSENTIALLY THE SAME PROCESS (MINUS DATABASE AND ALL THAT) -func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error { +// TODO: share all this stuff with the Docker driver in server or better yet, actually use the Docker driver +func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string, format string, runs int) error { sh := []string{"docker", "run", "--rm", "-i"} var env []string // env for the shelled out docker run command @@ -82,11 +99,16 @@ func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method strin method = "POST" } } + if format == "" { + format = DefaultFormat + } // Add expected env vars that service will add runEnv = append(runEnv, kvEq("METHOD", method)) - runEnv = append(runEnv, kvEq("REQUEST_URL", "http://localhost:8080/myapp/hello")) + runEnv = append(runEnv, kvEq("REQUEST_URL", LocalTestURL)) runEnv = append(runEnv, kvEq("APP_NAME", "myapp")) runEnv = append(runEnv, kvEq("ROUTE", "/hello")) // TODO: should we change this to PATH ? + runEnv = append(runEnv, kvEq("FORMAT", format)) + // add user defined envs runEnv = append(runEnv, envVars...) @@ -103,11 +125,41 @@ func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method strin sh = append(sh, "-e", e) } + if runs <= 0 { + runs = 1 + } + if ff.Type != nil && *ff.Type == "async" { // if async, we'll run this in a separate thread and wait for it to complete // reqID := id.New().String() // I'm starting to think maybe `fn run` locally should work the same whether sync or async? Or how would we allow to test the output? } + body := "" // used for hot functions + if format == HttpFormat && stdin != nil { + // let's swap out stdin for http formatted message + input, err := ioutil.ReadAll(stdin) + if err != nil { + return fmt.Errorf("error reading from stdin: %v", err) + } + + var b bytes.Buffer + for i := 0; i < runs; i++ { + // making new request each time since Write closes the body + req, err := http.NewRequest(method, LocalTestURL, strings.NewReader(string(input))) + if err != nil { + return fmt.Errorf("error creating http request: %v", err) + } + err = req.Write(&b) + b.Write([]byte("\n")) + } + + if err != nil { + return fmt.Errorf("error writing to byte buffer: %v", err) + } + body = b.String() + // fmt.Println("body:", s) + stdin = strings.NewReader(body) + } sh = append(sh, ff.FullName()) cmd := exec.Command(sh[0], sh[1:]...) diff --git a/fn/testfn.go b/fn/testfn.go index 899302517..1c2322995 100644 --- a/fn/testfn.go +++ b/fn/testfn.go @@ -136,7 +136,7 @@ func runlocaltest(target string, in, expectedOut, expectedErr *string, env map[s } ff := &funcfile{Name: target} - if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil { + if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil, DefaultFormat, 1); err != nil { return fmt.Errorf("%v\nstdout:%s\nstderr:%s\n", err, stdout.String(), stderr.String()) }