Got optional hot functions working for Lambda.

This commit is contained in:
Travis Reeder
2017-07-05 23:34:19 -07:00
parent 1f3218f8dd
commit 5ec0eadff8
6 changed files with 156 additions and 420 deletions

View File

@@ -162,6 +162,9 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
if route.Format == "" {
route.Format = "default"
}
envVars := map[string]string{
"METHOD": c.Request.Method,
"APP_NAME": appName,

View File

@@ -2,7 +2,8 @@
var fs = require('fs');
var net = require('net');
var http = require('http');
// var http = require('http');
var http_common = require('_http_common');
var events = require('events');
var HTTPParser = process.binding('http_parser').HTTPParser;
@@ -248,7 +249,7 @@ function freeParser(parser){
if (parser) {
parser.onIncoming = null;
parser.socket = null;
http.parsers.free(parser);
http_common.parsers.free(parser);
parser = null;
}
};
@@ -256,7 +257,7 @@ function freeParser(parser){
// parses http requests
function parse(socket){
var emitter = new events.EventEmitter();
var parser = http.parsers.alloc();
var parser = http_common.parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
parser.socket = socket;
@@ -270,7 +271,6 @@ function parse(socket){
var ret = parser.execute(buffer, 0, buffer.length);
if(ret instanceof Error){
emitter.emit('error');
freeParser(parser);
}
});
@@ -284,29 +284,8 @@ function parse(socket){
function run() {
// First, check format (ie: hot functions)
var format = process.env.FORMAT;
if (format == "http"){
// init parser
var parser = http.parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
parser.socket = process.stdin;
parser.maxHeaderPairs = 2000;
parser.onIncoming = function(req){
emitter.emit('request', req);
};
}
var parser = parse(socket);
setEnvFromHeader();
// FIXME(nikhil): Check for file existence and allow non-payload.
var path = process.env["PAYLOAD_FILE"];
var path = process.env["PAYLOAD_FILE"]; // if we allow a mounted file, this is used. Could safely be removed.
var stream = process.stdin;
if (path) {
try {
@@ -317,67 +296,99 @@ function run() {
}
}
var input = "";
stream.setEncoding('utf8');
stream.on('data', function(chunk) {
input += chunk;
});
// First, check format (ie: hot functions)
var format = process.env["FORMAT"];
console.error("format", format);
if (format == "http"){
// var parser = httpSocketSetup(stream)
// init parser
var parser = parse(stream);
let i = 0;
parser.on('request', function(req){
// Got parsed HTTP object
// console.error("REQUEST", req)
i++;
console.error("REQUEST:", i)
handleRequest(req);
});
stream.on('error', function(err) {
console.error("bootstrap: Error reading payload stream", err);
process.exit(1);
});
parser.on('error', function(e){
// Not HTTP data
console.error("INVALID HTTP DATA!", e)
});
stream.on('end', function() {
var payload = {}
try {
if (input.length > 0) {
payload = JSON.parse(input);
}
} catch(e) {
console.error("bootstrap: Error parsing JSON", e);
process.exit(1);
}
if (process.argv.length > 2) {
var handler = process.argv[2];
var parts = handler.split('.');
// FIXME(nikhil): Error checking.
var script = parts[0];
var entry = parts[1];
var started = false;
try {
var mod = require('./'+script);
var func = mod[entry];
if (func === undefined) {
oldlog("Handler '" + entry + "' missing on module '" + script + "'");
return;
}
if (typeof func !== 'function') {
throw "TypeError: " + (typeof func) + " is not a function";
}
started = true;
var cback
// RUN THE FUNCTION:
mod[entry](payload, makeCtx(), functionCallback)
} catch(e) {
if (typeof e === 'string') {
oldlog(e)
} else {
oldlog(e.message)
}
if (!started) {
oldlog("Process exited before completing request\n")
}
}
} else {
console.error("bootstrap: No script specified")
process.exit(1);
}
})
} else {
// default
handleRequest(stream);
}
}
function handleRequest(stream) {
var input = "";
stream.setEncoding('utf8');
stream.on('data', function(chunk) {
input += chunk;
});
stream.on('error', function(err) {
console.error("bootstrap: Error reading payload stream", err);
process.exit(1);
});
stream.on('end', function() {
var payload = {}
try {
if (input.length > 0) {
payload = JSON.parse(input);
}
} catch(e) {
console.error("bootstrap: Error parsing JSON", e);
process.exit(1);
}
handlePayload(payload)
})
}
function handlePayload(payload) {
if (process.argv.length > 2) {
var handler = process.argv[2];
var parts = handler.split('.');
// FIXME(nikhil): Error checking.
var script = parts[0];
var entry = parts[1];
var started = false;
try {
var mod = require('./'+script);
var func = mod[entry];
if (func === undefined) {
oldlog("Handler '" + entry + "' missing on module '" + script + "'");
return;
}
if (typeof func !== 'function') {
throw "TypeError: " + (typeof func) + " is not a function";
}
started = true;
var cback
// RUN THE FUNCTION:
mod[entry](payload, makeCtx(), functionCallback)
} catch(e) {
if (typeof e === 'string') {
oldlog(e)
} else {
oldlog(e.message)
}
if (!started) {
oldlog("Process exited before completing request\n")
}
}
} else {
console.error("bootstrap: No script specified")
process.exit(1);
}
}
function functionCallback(err, result) {
if (err != null) {
// then user returned error and we should respond with error

View File

@@ -1,330 +0,0 @@
'use strict';
var fs = require('fs');
var oldlog = console.log
console.log = console.error
// Some notes on the semantics of the succeed(), fail() and done() methods.
// Tests are the source of truth!
// First call wins in terms of deciding the result of the function. BUT,
// subsequent calls also log. Further, code execution does not stop, even where
// for done(), the docs say that the "function terminates". It seems though
// that further cycles of the event loop do not run. For example:
// index.handler = function(event, context) {
// context.fail("FAIL")
// process.nextTick(function() {
// console.log("This does not get logged")
// })
// console.log("This does get logged")
// }
// on the other hand:
// index.handler = function(event, context) {
// process.nextTick(function() {
// console.log("This also gets logged")
// context.fail("FAIL")
// })
// console.log("This does get logged")
// }
//
// The same is true for context.succeed() and done() captures the semantics of
// both. It seems this is implemented simply by having process.nextTick() cause
// process.exit() or similar, because the following:
// exports.handler = function(event, context) {
// process.nextTick(function() {console.log("This gets logged")})
// process.nextTick(function() {console.log("This also gets logged")})
// context.succeed("END")
// process.nextTick(function() {console.log("This does not get logged")})
// };
//
// So the context object needs to have some sort of hidden boolean that is only
// flipped once, by the first call, and dictates the behavior on the next tick.
//
// In addition, the response behaviour depends on the invocation type. If we
// are to only support the async type, succeed() must return a 202 response
// code, not sure how to do this.
//
// Only the first 256kb, followed by a truncation message, should be logged.
//
// Also, the error log is always in a json literal
// { "errorMessage": "<message>" }
var Context = function() {
var concluded = false;
var contextSelf = this;
// The succeed, fail and done functions are public, but access a private
// member (concluded). Hence this ugly nested definition.
this.succeed = function(result) {
if (concluded) {
return
}
// We have to process the result before we can conclude, because otherwise
// we have to fail. This means NO EARLY RETURNS from this function without
// review!
if (result === undefined) {
result = null
}
var failed = false;
try {
// Output result to log
oldlog(JSON.stringify(result));
} catch(e) {
// Set X-Amz-Function-Error: Unhandled header
console.log("Unable to stringify body as json: " + e);
failed = true;
}
// FIXME(nikhil): Return 202 or 200 based on invocation type and set response
// to result. Should probably be handled externally by the runner/swapi.
// OK, everything good.
concluded = true;
process.nextTick(function() { process.exit(failed ? 1 : 0) })
}
this.fail = function(error) {
if (concluded) {
return
}
concluded = true
process.nextTick(function() { process.exit(1) })
if (error === undefined) {
error = null
}
// FIXME(nikhil): Truncated log of error, plus non-truncated response body
var errstr = "fail() called with argument but a problem was encountered while converting it to a to string";
// The semantics of fail() are weird. If the error is something that can be
// converted to a string, the log output wraps the string in a JSON literal
// with key "errorMessage". If toString() fails, then the output is only
// the error string.
try {
if (error === null) {
errstr = null
} else {
errstr = error.toString()
}
oldlog(JSON.stringify({"errorMessage": errstr }))
} catch(e) {
// Set X-Amz-Function-Error: Unhandled header
oldlog(errstr)
}
}
this.done = function() {
var error = arguments[0];
var result = arguments[1];
if (error) {
contextSelf.fail(error)
} else {
contextSelf.succeed(result)
}
}
var plannedEnd = Date.now() + (getTimeoutInSeconds() * 1000);
this.getRemainingTimeInMillis = function() {
return Math.max(plannedEnd - Date.now(), 0);
}
}
function getTimeoutInSeconds() {
var t = parseInt(getEnv("TASK_TIMEOUT"));
if (Number.isNaN(t)) {
return 3600;
}
return t;
}
var getEnv = function(name) {
return process.env[name] || "";
}
var makeCtx = function() {
var fnname = getEnv("AWS_LAMBDA_FUNCTION_NAME");
// FIXME(nikhil): Generate UUID.
var taskID = getEnv("TASK_ID");
var mem = getEnv("TASK_MAXRAM").toLowerCase();
var bytes = 300 * 1024 * 1024;
var scale = { 'b': 1, 'k': 1024, 'm': 1024*1024, 'g': 1024*1024*1024 };
// We don't bother validating too much, if the last character is not a number
// and not in the scale table we just return a default value.
// We use slice instead of indexing so that we always get an empty string,
// instead of undefined.
if (mem.slice(-1).match(/[0-9]/)) {
var a = parseInt(mem);
if (!Number.isNaN(a)) {
bytes = a;
}
} else {
var rem = parseInt(mem.slice(0, -1));
if (!Number.isNaN(rem)) {
var multiplier = scale[mem.slice(-1)];
if (multiplier) {
bytes = rem * multiplier
}
}
}
var memoryMB = bytes / (1024 * 1024);
var ctx = new Context();
Object.defineProperties(ctx, {
"functionName": {
value: fnname,
enumerable: true,
},
"functionVersion": {
value: "$LATEST",
enumerable: true,
},
"invokedFunctionArn": {
// FIXME(nikhil): Should be filled in.
value: "",
enumerable: true,
},
"memoryLimitInMB": {
// Sigh, yes it is a string.
value: ""+memoryMB,
enumerable: true,
},
"awsRequestId": {
value: taskID,
enumerable: true,
},
"logGroupName": {
// FIXME(nikhil): Should be filled in.
value: "",
enumerable: true,
},
"logStreamName": {
// FIXME(nikhil): Should be filled in.
value: "",
enumerable: true,
},
"identity": {
// FIXME(nikhil): Should be filled in.
value: null,
enumerable: true,
},
"clientContext": {
// FIXME(nikhil): Should be filled in.
value: null,
enumerable: true,
},
});
return ctx;
}
var setEnvFromHeader = function () {
var headerPrefix = "CONFIG_";
var newEnvVars = {};
for (var key in process.env) {
if (key.indexOf(headerPrefix) == 0) {
newEnvVars[key.slice(headerPrefix.length)] = process.env[key];
}
}
for (var key in newEnvVars) {
process.env[key] = newEnvVars[key];
}
}
function run() {
setEnvFromHeader();
// FIXME(nikhil): Check for file existence and allow non-payload.
var path = process.env["PAYLOAD_FILE"];
var stream = process.stdin;
if (path) {
try {
stream = fs.createReadStream(path);
} catch(e) {
console.error("bootstrap: Error opening payload file", e)
process.exit(1);
}
}
var input = "";
stream.setEncoding('utf8');
stream.on('data', function(chunk) {
input += chunk;
});
stream.on('error', function(err) {
console.error("bootstrap: Error reading payload stream", err);
process.exit(1);
});
stream.on('end', function() {
var payload = {}
try {
if (input.length > 0) {
payload = JSON.parse(input);
}
} catch(e) {
console.error("bootstrap: Error parsing JSON", e);
process.exit(1);
}
if (process.argv.length > 2) {
var handler = process.argv[2];
var parts = handler.split('.');
// FIXME(nikhil): Error checking.
var script = parts[0];
var entry = parts[1];
var started = false;
try {
var mod = require('./'+script);
var func = mod[entry];
if (func === undefined) {
oldlog("Handler '" + entry + "' missing on module '" + script + "'");
return;
}
if (typeof func !== 'function') {
throw "TypeError: " + (typeof func) + " is not a function";
}
started = true;
var cback
// RUN THE FUNCTION:
mod[entry](payload, makeCtx(), functionCallback)
} catch(e) {
if (typeof e === 'string') {
oldlog(e)
} else {
oldlog(e.message)
}
if (!started) {
oldlog("Process exited before completing request\n")
}
}
} else {
console.error("bootstrap: No script specified")
process.exit(1);
}
})
}
function functionCallback(err, result) {
if (err != null) {
// then user returned error and we should respond with error
// http://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-mode-exceptions.html
oldlog(JSON.stringify({"errorMessage": errstr }))
return
}
if (result != null) {
oldlog(JSON.stringify(result))
}
}
run()

View File

@@ -299,7 +299,7 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error {
if rt.Path == "" && ff.Path != "" {
rt.Path = ff.Path
}
if rt.Type == nil && ff.Type != nil && *ff.Type != "" {
if rt.Type == "" && ff.Type != nil && *ff.Type != "" {
rt.Type = *ff.Type
}

View File

@@ -1,9 +1,12 @@
package main
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strings"
@@ -11,6 +14,12 @@ import (
"github.com/urfave/cli"
)
const (
DefaultFormat = "default"
HttpFormat = "http"
LocalTestURL = "http://localhost:8080/myapp/hello"
)
func run() cli.Command {
r := runCmd{}
@@ -28,7 +37,7 @@ type runCmd struct{}
func runflags() []cli.Flag {
return []cli.Flag{
cli.StringSliceFlag{
Name: "e",
Name: "env, e",
Usage: "select environment variables to be sent to function",
},
cli.StringSliceFlag{
@@ -39,6 +48,14 @@ func runflags() []cli.Flag {
Name: "method",
Usage: "http method for function",
},
cli.StringFlag{
Name: "format",
Usage: "format to use. `default` and `http` (hot) formats currently supported.",
},
cli.IntFlag{
Name: "runs",
Usage: "for hot functions only, will call the function `runs` times in a row.",
},
}
}
@@ -65,11 +82,11 @@ func (r *runCmd) run(c *cli.Context) error {
}
}
return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"))
return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"), c.String("format"), c.Int("runs"))
}
// TODO: THIS SHOULD USE THE RUNNER DRIVERS FROM THE SERVER SO IT'S ESSENTIALLY THE SAME PROCESS (MINUS DATABASE AND ALL THAT)
func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error {
// TODO: share all this stuff with the Docker driver in server or better yet, actually use the Docker driver
func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string, format string, runs int) error {
sh := []string{"docker", "run", "--rm", "-i"}
var env []string // env for the shelled out docker run command
@@ -82,11 +99,16 @@ func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method strin
method = "POST"
}
}
if format == "" {
format = DefaultFormat
}
// Add expected env vars that service will add
runEnv = append(runEnv, kvEq("METHOD", method))
runEnv = append(runEnv, kvEq("REQUEST_URL", "http://localhost:8080/myapp/hello"))
runEnv = append(runEnv, kvEq("REQUEST_URL", LocalTestURL))
runEnv = append(runEnv, kvEq("APP_NAME", "myapp"))
runEnv = append(runEnv, kvEq("ROUTE", "/hello")) // TODO: should we change this to PATH ?
runEnv = append(runEnv, kvEq("FORMAT", format))
// add user defined envs
runEnv = append(runEnv, envVars...)
@@ -103,11 +125,41 @@ func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method strin
sh = append(sh, "-e", e)
}
if runs <= 0 {
runs = 1
}
if ff.Type != nil && *ff.Type == "async" {
// if async, we'll run this in a separate thread and wait for it to complete
// reqID := id.New().String()
// I'm starting to think maybe `fn run` locally should work the same whether sync or async? Or how would we allow to test the output?
}
body := "" // used for hot functions
if format == HttpFormat && stdin != nil {
// let's swap out stdin for http formatted message
input, err := ioutil.ReadAll(stdin)
if err != nil {
return fmt.Errorf("error reading from stdin: %v", err)
}
var b bytes.Buffer
for i := 0; i < runs; i++ {
// making new request each time since Write closes the body
req, err := http.NewRequest(method, LocalTestURL, strings.NewReader(string(input)))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}
err = req.Write(&b)
b.Write([]byte("\n"))
}
if err != nil {
return fmt.Errorf("error writing to byte buffer: %v", err)
}
body = b.String()
// fmt.Println("body:", s)
stdin = strings.NewReader(body)
}
sh = append(sh, ff.FullName())
cmd := exec.Command(sh[0], sh[1:]...)

View File

@@ -135,7 +135,7 @@ func runlocaltest(target string, in, expectedOut, expectedErr *string, env map[s
}
ff := &funcfile{Name: target}
if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil {
if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil, DefaultFormat, 1); err != nil {
return fmt.Errorf("%v\nstdout:%s\nstderr:%s\n", err, stdout.String(), stderr.String())
}