From 01b8e8679da78d0ce0bb30a8e4187a9447a08753 Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Wed, 26 Sep 2018 05:25:48 -0700 Subject: [PATCH] HTTP trigger http-stream tests (#1241) --- Gopkg.lock | 212 ++---------------- Makefile | 7 +- api/agent/agent.go | 7 +- api/agent/call.go | 62 +---- api/agent/lb_agent.go | 1 - api/common/io_utils.go | 4 +- api/server/runner_fninvoke.go | 129 ++++++----- api/server/runner_fninvoke_test.go | 2 +- api/server/runner_httptrigger.go | 117 ++++++---- api/server/runner_httptrigger_test.go | 106 ++++++--- docs/definitions.md | 4 +- images/fn-test-utils/Gopkg.lock | 2 +- images/fn-test-utils/fn-test-utils.go | 41 +++- vendor/github.com/fnproject/fdk-go/circle.yml | 33 +++ .../fdk-go/examples/hello/Gopkg.lock | 2 +- .../fnproject/fdk-go/examples/hello/func.go | 17 +- .../fnproject/fdk-go/examples/hello/func.yaml | 6 + vendor/github.com/fnproject/fdk-go/fdk.go | 16 +- .../fnproject/fdk-go/utils/httpstream.go | 168 ++++++++++++++ .../fnproject/fdk-go/utils/utils.go | 15 +- 20 files changed, 548 insertions(+), 403 deletions(-) create mode 100644 vendor/github.com/fnproject/fdk-go/circle.yml create mode 100644 vendor/github.com/fnproject/fdk-go/examples/hello/func.yaml create mode 100644 vendor/github.com/fnproject/fdk-go/utils/httpstream.go diff --git a/Gopkg.lock b/Gopkg.lock index 6bdb2af85..8e7862e8a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,23 +2,18 @@ [[projects]] - digest = "1:4af4d05ade1fe949a30dc3a2e712943cf096a16c7e68b386c93952c1b71ce704" name = "git.apache.org/thrift.git" packages = ["lib/go/thrift"] - pruneopts = "" revision = "272470790ad6db791bd6f9db399b2cd2d5879f74" source = "github.com/apache/thrift" [[projects]] branch = "master" - digest = "1:3721a10686511b80c052323423f0de17a8c06d417dbdd3b392b1578432a33aae" name = "github.com/Nvveen/Gotty" packages = ["."] - pruneopts = "" revision = "cd527374f1e5bff4938207604a14f2e38a9cf512" [[projects]] - digest = "1:63776251fbaa60062742412a9d2fa1e4b7cd24bcf5527fa9656b314ea8d60913" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -48,53 +43,41 @@ "service/s3", "service/s3/s3iface", "service/s3/s3manager", - "service/sts", + "service/sts" ] - pruneopts = "" revision = "bff41fb23b7550368282029f6478819d6a99ae0f" version = "v1.12.79" [[projects]] branch = "master" - digest = "1:c0bec5f9b98d0bc872ff5e834fac186b807b656683bd29cb82fb207a1513fabb" name = "github.com/beorn7/perks" packages = ["quantile"] - pruneopts = "" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" [[projects]] - digest = "1:8f4ae450cb47d944eb7fc75edcca20449915b7c0f5ddfa2e7d145ba07f9fdceb" name = "github.com/boltdb/bolt" packages = ["."] - pruneopts = "" revision = "fa5367d20c994db73282594be0146ab221657943" [[projects]] branch = "master" - digest = "1:54f28fa28bb8fc90169e37ae25182cb0e7d06e38fa8530a90bedb497410fed0d" name = "github.com/containerd/continuity" packages = ["pathdriver"] - pruneopts = "" revision = "246e49050efdf45e8f17fbbcf1547ee376f9939e" [[projects]] - digest = "1:3c3f68ebab415344aef64363d23471e953a4715645115604aaf57923ae904f5e" name = "github.com/coreos/go-semver" packages = ["semver"] - pruneopts = "" revision = "8ab6407b697782a06568d4b7f1db25550ec2e4c6" version = "v0.2.0" [[projects]] - digest = "1:437eb8b3aff726370e97dddbea5366eff786728582cc785033a274c1491f2710" name = "github.com/dchest/siphash" packages = ["."] - pruneopts = "" revision = "4ebf1de738443ea7f45f02dc394c4df1942a126d" version = "v1.1.0" [[projects]] - digest = "1:5bd14ec4eacea32533d819153f9a972f219c7004a995763c4f3e0d9da6841855" name = "github.com/docker/docker" packages = [ "api/types", @@ -118,129 +101,101 @@ "pkg/mount", "pkg/pools", "pkg/stdcopy", - "pkg/system", + "pkg/system" ] - pruneopts = "" revision = "c3e32938430e03a316311f9e4fbdb743e492a07e" [[projects]] - digest = "1:a5ecc2e70260a87aa263811281465a5effcfae8a54bac319cee87c4625f04d63" name = "github.com/docker/go-connections" packages = ["nat"] - pruneopts = "" revision = "3ede32e2033de7505e6500d6c868c2b9ed9f169d" version = "v0.3.0" [[projects]] - digest = "1:582d54fcb7233da8dde1dfd2210a5b9675d0685f84246a8d317b07d680c18b1b" name = "github.com/docker/go-units" packages = ["."] - pruneopts = "" revision = "47565b4f722fb6ceae66b95f853feed578a4a51c" version = "v0.3.3" [[projects]] - digest = "1:6a9b61ab4f59a90724b309215446a880a1f29c327f67bd54de5101e1125fe6dc" name = "github.com/docker/libnetwork" packages = ["ipamutils"] - pruneopts = "" revision = "0ae9b6f38f24f65567d4b46602502b33c95cf57a" [[projects]] branch = "master" - digest = "1:209f213526a6c3f2d6ef431bb4303bd97fa4af8350e67b5b4d43b53fd469a005" name = "github.com/fnproject/fdk-go" packages = [ ".", - "utils", + "utils" ] - pruneopts = "" - revision = "1eb29530716f262bad5b83eb9a5b3f7483636949" + revision = "19c8e175e8563669762773c3c3f68e6bf995b16a" [[projects]] - digest = "1:eb53021a8aa3f599d29c7102e65026242bdedce998a54837dc67f14b6a97c5fd" name = "github.com/fsnotify/fsnotify" packages = ["."] - pruneopts = "" revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" version = "v1.4.7" [[projects]] - digest = "1:fb02a20784a57cb6229a8097d97546fdc4a14f3506463ae8cc73d5727f326532" name = "github.com/fsouza/go-dockerclient" packages = [ ".", "internal/archive", "internal/jsonmessage", - "internal/term", + "internal/term" ] - pruneopts = "" revision = "1b33efcb40bef3494af8bc6a1f73600f37a3c9d1" [[projects]] - digest = "1:9525d0e79ccf382e32edeef466b9a91f16eb0eebdca5971a03fad1bb3be9cd89" name = "github.com/garyburd/redigo" packages = [ "internal", - "redis", + "redis" ] - pruneopts = "" revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e" version = "v1.6.0" [[projects]] - digest = "1:629d5458c77f7df2e78f011d535da90aaf117cee238e44407952d65691a954a0" name = "github.com/gin-contrib/cors" packages = ["."] - pruneopts = "" revision = "cf4846e6a636a76237a28d9286f163c132e841bc" version = "v1.2" [[projects]] branch = "master" - digest = "1:1120f960f5c334f0f94bad29eefaf73d52d226893369693686148f66c1993f15" name = "github.com/gin-contrib/sse" packages = ["."] - pruneopts = "" revision = "22d885f9ecc78bf4ee5d72b937e4bbcdc58e8cae" [[projects]] - digest = "1:348ceb76f2ac958e541e4ba3190484b68df28c38ac9720ed4ef8d36af69ce52e" name = "github.com/gin-gonic/gin" packages = [ ".", "binding", - "render", + "render" ] - pruneopts = "" revision = "d459835d2b077e44f7c9b453505ee29881d5d12d" version = "v1.2" [[projects]] - digest = "1:617b3e0f5989d4ff866a1820480990c65dfc9257eb080da749a45e2d76681b02" name = "github.com/go-ini/ini" packages = ["."] - pruneopts = "" revision = "06f5f3d67269ccec1fe5fe4134ba6e982984f7f5" version = "v1.37.0" [[projects]] - digest = "1:54e2545b223c001f4265c38a66a5ed631536801f6cd869a63e35c7ba6f14b188" name = "github.com/go-sql-driver/mysql" packages = ["."] - pruneopts = "" revision = "21d7e97c9f760ca685a01ecea202e1c84276daa1" [[projects]] - digest = "1:0a3f6a0c68ab8f3d455f8892295503b179e571b7fefe47cc6c556405d1f83411" name = "github.com/gogo/protobuf" packages = ["proto"] - pruneopts = "" revision = "1adfc126b41513cc696b209667c8656ea7aac67c" version = "v1.0.0" [[projects]] - digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b" name = "github.com/golang/protobuf" packages = [ "proto", @@ -248,220 +203,174 @@ "ptypes/any", "ptypes/duration", "ptypes/empty", - "ptypes/timestamp", + "ptypes/timestamp" ] - pruneopts = "" revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" version = "v1.1.0" [[projects]] branch = "master" - digest = "1:be28c0531a755f2178acf1e327e6f5a8a3968feb5f2567cdc968064253141751" name = "github.com/google/btree" packages = ["."] - pruneopts = "" revision = "e89373fe6b4a7413d7acd6da1725b83ef713e6e4" [[projects]] - digest = "1:9a0b2dd1f882668a3d7fbcd424eed269c383a16f1faa3a03d14e0dd5fba571b1" name = "github.com/grpc-ecosystem/go-grpc-middleware" packages = ["."] - pruneopts = "" revision = "c250d6563d4d4c20252cd865923440e829844f4e" version = "v1.0.0" [[projects]] - digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" name = "github.com/jmespath/go-jmespath" packages = ["."] - pruneopts = "" revision = "0b12d6b5" [[projects]] branch = "master" - digest = "1:617ee2434b77e911fa26b678730be9a617f75243b194eadc8201c8ac860844aa" name = "github.com/jmoiron/sqlx" packages = [ ".", - "reflectx", + "reflectx" ] - pruneopts = "" revision = "0dae4fefe7c0e190f7b5a78dac28a1c82cc8d849" [[projects]] - digest = "1:b352d4ebd13c046f4585b0538b095ce7f8ffcf2cca7b87c6a86083782aac1325" name = "github.com/leanovate/gopter" packages = [ ".", "gen", - "prop", + "prop" ] - pruneopts = "" revision = "1f4d0ba27bd5df5390eb622bcb458f8f7ac2573c" version = "v0.2.2" [[projects]] branch = "master" - digest = "1:09792d732b079867772cdbabdf7dc54ef9f9d04c998a9ce6226657151fccbb94" name = "github.com/lib/pq" packages = [ ".", - "oid", + "oid" ] - pruneopts = "" revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8" [[projects]] - digest = "1:78229b46ddb7434f881390029bd1af7661294af31f6802e0e1bedaad4ab0af3c" name = "github.com/mattn/go-isatty" packages = ["."] - pruneopts = "" revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39" version = "v0.0.3" [[projects]] - digest = "1:bc03901fc8f0965ccba8bc453eae21a9b04f95999eab664c7de6dc7290f4e8f4" name = "github.com/mattn/go-sqlite3" packages = ["."] - pruneopts = "" revision = "25ecb14adfc7543176f7d85291ec7dba82c6f7e4" version = "v1.9.0" [[projects]] - digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" name = "github.com/matttproud/golang_protobuf_extensions" packages = ["pbutil"] - pruneopts = "" revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" version = "v1.0.1" [[projects]] - digest = "1:5d9b668b0b4581a978f07e7d2e3314af18eb27b3fb5d19b70185b7c575723d11" name = "github.com/opencontainers/go-digest" packages = ["."] - pruneopts = "" revision = "279bed98673dd5bef374d3b6e4b09e2af76183bf" version = "v1.0.0-rc1" [[projects]] - digest = "1:f26c8670b11e29a49c8e45f7ec7f2d5bac62e8fd4e3c0ae1662baa4a697f984a" name = "github.com/opencontainers/image-spec" packages = [ "specs-go", - "specs-go/v1", + "specs-go/v1" ] - pruneopts = "" revision = "d60099175f88c47cd379c4738d158884749ed235" version = "v1.0.1" [[projects]] - digest = "1:fa19ddee0e5ee5951a6a450a4b6ec635a42957f86bfc87d9d778eeee04ad2036" name = "github.com/opencontainers/runc" packages = ["libcontainer/user"] - pruneopts = "" revision = "baf6536d6259209c3edfa2b22237af82942d3dfa" version = "v0.1.1" [[projects]] - digest = "1:aec3ca6ffc2c0dd856e858b2f3d3bcc291123fff7ed1456e0a1a6dac8a986b75" name = "github.com/openzipkin/zipkin-go" packages = [ "model", "reporter", - "reporter/http", + "reporter/http" ] - pruneopts = "" revision = "f197ec29e729f226d23370ea60f0e49b8f44ccf4" version = "v0.1.0" [[projects]] - digest = "1:4c0404dc03d974acd5fcd8b8d3ce687b13bd169db032b89275e8b9d77b98ce8c" name = "github.com/patrickmn/go-cache" packages = ["."] - pruneopts = "" revision = "a3647f8e31d79543b2d0f0ae2fe5c379d72cedc0" version = "v2.1.0" [[projects]] branch = "master" - digest = "1:c24598ffeadd2762552269271b3b1510df2d83ee6696c1e543a0ff653af494bc" name = "github.com/petar/GoLLRB" packages = ["llrb"] - pruneopts = "" revision = "53be0d36a84c2a886ca057d34b6aa4468df9ccb4" [[projects]] - digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" name = "github.com/pkg/errors" packages = ["."] - pruneopts = "" revision = "645ef00459ed84a119197bfb8d8205042c6df63d" version = "v0.8.0" [[projects]] - digest = "1:4142d94383572e74b42352273652c62afec5b23f325222ed09198f46009022d1" name = "github.com/prometheus/client_golang" packages = [ "prometheus", - "prometheus/promhttp", + "prometheus/promhttp" ] - pruneopts = "" revision = "c5b7fccd204277076155f10851dad72b76a49317" version = "v0.8.0" [[projects]] branch = "master" - digest = "1:60aca47f4eeeb972f1b9da7e7db51dee15ff6c59f7b401c1588b8e6771ba15ef" name = "github.com/prometheus/client_model" packages = ["go"] - pruneopts = "" revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" [[projects]] branch = "master" - digest = "1:bfbc121ef802d245ef67421cff206615357d9202337a3d492b8f668906b485a8" name = "github.com/prometheus/common" packages = [ "expfmt", "internal/bitbucket.org/ww/goautoneg", - "model", + "model" ] - pruneopts = "" revision = "7600349dcfe1abd18d72d3a1770870d9800a7801" [[projects]] branch = "master" - digest = "1:cccf925b20ad15bfad909bdfed67a0d9a29c7e2264597199858067e7eeada232" name = "github.com/prometheus/procfs" packages = [ ".", "internal/util", "nfs", - "xfs", + "xfs" ] - pruneopts = "" revision = "7d6f385de8bea29190f15ba9931442a0eaef9af7" [[projects]] - digest = "1:d85b68f829b376e76d36b3f7841fb544495d96b8999ce6ebf7317b74386ca602" name = "github.com/sirupsen/logrus" packages = [ ".", - "hooks/syslog", + "hooks/syslog" ] - pruneopts = "" revision = "89742aefa4b206dcf400792f3bd35b542998eb3b" [[projects]] - digest = "1:2e7f653483e51243b6cd6de60ce39bde0d6927d10a3c24295ab0f82cb1efeae2" name = "github.com/ugorji/go" packages = ["codec"] - pruneopts = "" revision = "b4c50a2b199d93b13dc15e78929cfb23bfdf21ab" version = "v1.1.1" [[projects]] - digest = "1:8778190ba18941ba540ea3a065bfe3e6c4126ab16aa2f8a240076ef51af9ab75" name = "go.opencensus.io" packages = [ ".", @@ -479,23 +388,19 @@ "tag", "trace", "trace/internal", - "trace/propagation", + "trace/propagation" ] - pruneopts = "" revision = "7b558058b7cc960667590e5413ef55157b06652e" version = "v0.15.0" [[projects]] branch = "master" - digest = "1:2b051135019435c0cf95756a3ff5366b87ddc7e06e91571506c6264ce721784f" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - pruneopts = "" revision = "7f39a6fea4fe9364fb61e1def6a268a51b4f3a06" [[projects]] branch = "master" - digest = "1:5dc6753986b9eeba4abdf05dedc5ba06bb52dad43cc8aad35ffb42bb7adfa68f" name = "golang.org/x/net" packages = [ "context", @@ -504,32 +409,26 @@ "http2/hpack", "idna", "internal/timeseries", - "trace", + "trace" ] - pruneopts = "" revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" [[projects]] branch = "master" - digest = "1:b2ea75de0ccb2db2ac79356407f8a4cd8f798fe15d41b381c00abf3ae8e55ed1" name = "golang.org/x/sync" packages = ["semaphore"] - pruneopts = "" revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca" [[projects]] branch = "master" - digest = "1:a6a50b355ef54a65de2aa58cf9fa8f4554159e42f1881a30f76541e43090d217" name = "golang.org/x/sys" packages = [ "unix", - "windows", + "windows" ] - pruneopts = "" revision = "ad87a3a340fa7f3bed189293fbfa7a9b7e021ae1" [[projects]] - digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" name = "golang.org/x/text" packages = [ "collate", @@ -545,38 +444,30 @@ "unicode/bidi", "unicode/cldr", "unicode/norm", - "unicode/rangetable", + "unicode/rangetable" ] - pruneopts = "" revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" version = "v0.3.0" [[projects]] branch = "master" - digest = "1:55a681cb66f28755765fa5fa5104cbd8dc85c55c02d206f9f89566451e3fe1aa" name = "golang.org/x/time" packages = ["rate"] - pruneopts = "" revision = "fbb02b2291d28baffd63558aa44b4b56f178d650" [[projects]] branch = "master" - digest = "1:7d15746ff4df12481c89fd953a28122fa75368fb1fb1bb1fed918a78647b3c3a" name = "google.golang.org/api" packages = ["support/bundler"] - pruneopts = "" revision = "2eea9ba0a3d94f6ab46508083e299a00bbbc65f6" [[projects]] branch = "master" - digest = "1:d88c2eb6750028f5707971ed1340a02407d5f0967af85bf6ac70c031533f6c15" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] - pruneopts = "" revision = "32ee49c4dd805befd833990acba36cb75042378c" [[projects]] - digest = "1:5f31b45ee9da7a87f140bef3ed0a7ca34ea2a6d38eb888123b8e28170e8aa4f2" name = "google.golang.org/grpc" packages = [ ".", @@ -603,83 +494,26 @@ "stats", "status", "tap", - "transport", + "transport" ] - pruneopts = "" revision = "168a6198bcb0ef175f7dacec0b8691fc141dc9b8" version = "v1.13.0" [[projects]] - digest = "1:dd549e360e5a8f982a28c2bcbe667307ceffe538ed9afc7c965524f1ac285b3f" name = "gopkg.in/go-playground/validator.v8" packages = ["."] - pruneopts = "" revision = "5f1438d3fca68893a817e4a66806cea46a9e4ebf" version = "v8.18.2" [[projects]] - digest = "1:f0620375dd1f6251d9973b5f2596228cc8042e887cd7f827e4220bc1ce8c30e2" name = "gopkg.in/yaml.v2" packages = ["."] - pruneopts = "" revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" version = "v2.2.1" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - input-imports = [ - "github.com/aws/aws-sdk-go/aws", - "github.com/aws/aws-sdk-go/aws/awserr", - "github.com/aws/aws-sdk-go/aws/credentials", - "github.com/aws/aws-sdk-go/aws/session", - "github.com/aws/aws-sdk-go/service/s3", - "github.com/aws/aws-sdk-go/service/s3/s3manager", - "github.com/boltdb/bolt", - "github.com/coreos/go-semver/semver", - "github.com/dchest/siphash", - "github.com/fnproject/fdk-go", - "github.com/fnproject/fdk-go/utils", - "github.com/fsnotify/fsnotify", - "github.com/fsouza/go-dockerclient", - "github.com/garyburd/redigo/redis", - "github.com/gin-contrib/cors", - "github.com/gin-gonic/gin", - "github.com/go-sql-driver/mysql", - "github.com/golang/protobuf/proto", - "github.com/golang/protobuf/ptypes/empty", - "github.com/google/btree", - "github.com/grpc-ecosystem/go-grpc-middleware", - "github.com/jmoiron/sqlx", - "github.com/leanovate/gopter", - "github.com/leanovate/gopter/gen", - "github.com/leanovate/gopter/prop", - "github.com/lib/pq", - "github.com/mattn/go-sqlite3", - "github.com/openzipkin/zipkin-go/reporter/http", - "github.com/patrickmn/go-cache", - "github.com/prometheus/client_golang/prometheus", - "github.com/sirupsen/logrus", - "github.com/sirupsen/logrus/hooks/syslog", - "go.opencensus.io/exporter/jaeger", - "go.opencensus.io/exporter/prometheus", - "go.opencensus.io/exporter/zipkin", - "go.opencensus.io/plugin/ochttp", - "go.opencensus.io/plugin/ochttp/propagation/b3", - "go.opencensus.io/stats", - "go.opencensus.io/stats/view", - "go.opencensus.io/tag", - "go.opencensus.io/trace", - "golang.org/x/net/context", - "golang.org/x/net/trace", - "golang.org/x/sys/unix", - "golang.org/x/time/rate", - "google.golang.org/grpc", - "google.golang.org/grpc/codes", - "google.golang.org/grpc/credentials", - "google.golang.org/grpc/metadata", - "google.golang.org/grpc/peer", - "google.golang.org/grpc/status", - ] + inputs-digest = "caf9da7776a8bba243fa98193ff57c7fff65abe07019dd4115c228ae54f19770" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 5531e7c4c..466928f02 100644 --- a/Makefile +++ b/Makefile @@ -130,7 +130,12 @@ docker-test: -v ${CURDIR}:/go/src/github.com/fnproject/fn \ -w /go/src/github.com/fnproject/fn \ fnproject/go:dev go test \ - -v $(shell docker run --rm -ti -v ${CURDIR}:/go/src/github.com/fnproject/fn -w /go/src/github.com/fnproject/fn -e GOPATH=/go golang:alpine sh -c 'go list ./... | grep -v vendor | grep -v examples | grep -v tool | grep -v fn') + -v $(shell docker run --rm -ti -v ${CURDIR}:/go/src/github.com/fnproject/fn -w /go/src/github.com/fnproject/fn -e GOPATH=/go golang:alpine sh -c 'go list ./... | \ + grep -v vendor | \ + grep -v examples | \ + grep -v test/fn-api-tests | \ + grep -v test/fn-system-tests | \ + grep -v images/fn-test-utils') .PHONY: all all: dep generate build diff --git a/api/agent/agent.go b/api/agent/agent.go index bc2842dd1..8e8f4c20e 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -755,13 +755,18 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error { } go func() { + // TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still + // TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it + swapBack := s.container.swap(nil, call.stderr, call.stderr, &call.Stats) + defer swapBack() + resp, err := s.udsClient.Do(req) if err != nil { common.Logger(ctx).WithError(err).Error("Got error from UDS socket") errApp <- models.NewAPIError(http.StatusBadGateway, errors.New("error receiving function response")) return } - common.Logger(ctx).WithField("status", resp.StatusCode).Debug("Got resp from UDS socket") + common.Logger(ctx).WithField("resp", resp).Debug("Got resp from UDS socket") defer resp.Body.Close() diff --git a/api/agent/call.go b/api/agent/call.go index 065799a69..f8990f61d 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -11,15 +11,12 @@ import ( "strings" "time" - "go.opencensus.io/trace" - - "net/textproto" - "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) type Call interface { @@ -55,44 +52,6 @@ const ( invokePath = "/invoke" ) -var skipTriggerHeaders = map[string]bool{ - "Connection": true, - "Keep-Alive": true, - "Trailer": true, - "Transfer-Encoding": true, - "TE": true, - "Upgrade": true, -} - -// Sets up a call from an http trigger request -func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trigger, req *http.Request) CallOpt { - return func(c *call) error { - // transpose trigger headers into HTTP - headers := make(http.Header) - for k, vs := range req.Header { - // should be generally unnecessary but to be doubly sure. - k = textproto.CanonicalMIMEHeaderKey(k) - if skipTriggerHeaders[k] { - continue - } - rewriteKey := fmt.Sprintf("Fn-Http-H-%s", k) - for _, v := range vs { - headers.Add(rewriteKey, v) - } - } - requestUrl := reqURL(req) - - headers.Set("Fn-Http-Method", req.Method) - headers.Set("Fn-Http-Request-Url", requestUrl) - headers.Set("Fn-Intent", "httprequest") - req.Header = headers - - err := FromHTTPFnRequest(app, fn, req)(c) - c.Model().TriggerID = trigger.ID - return err - } -} - // Sets up a call from an http trigger request func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt { return func(c *call) error { @@ -235,7 +194,17 @@ func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt { } } -// WithWriter sets the writier that the call uses to send its output message to +// WithTrigger adds trigger specific bits to a call. +// TODO consider removal, this is from a shuffle +func WithTrigger(t *models.Trigger) CallOpt { + return func(c *call) error { + // right now just set the trigger id + c.TriggerID = t.ID + return nil + } +} + +// WithWriter sets the writer that the call uses to send its output message to // TODO this should be required func WithWriter(w io.Writer) CallOpt { return func(c *call) error { @@ -330,7 +299,6 @@ type call struct { requestState RequestState containerState ContainerState slotHashId string - isLB bool // LB & Pure Runner Extra Config extensions map[string]string @@ -378,12 +346,6 @@ func (c *call) Start(ctx context.Context) error { c.StartedAt = common.DateTime(time.Now()) c.Status = "running" - if !c.isLB { - if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in - rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) - } - } - if c.Type == models.TypeAsync { // XXX (reed): make sure MQ reservation is lengthy. to skirt MQ semantics, // we could add a new message to MQ w/ delay of call.Timeout and delete the diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 1aa489c15..d34f1e283 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -119,7 +119,6 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { setupCtx(&c) - c.isLB = true c.handler = a.cda c.ct = a c.stderr = &nullReadWriter{} diff --git a/api/common/io_utils.go b/api/common/io_utils.go index 1b0338034..10dafcb05 100644 --- a/api/common/io_utils.go +++ b/api/common/io_utils.go @@ -107,7 +107,7 @@ func (g *ghostWriter) awaitRealWriter() (io.Writer, bool) { g.cond.L.Unlock() return nil, false } - if _, ok := g.inner.(*waitWriter); ok { + if _, ok := g.inner.(*waitWriter); ok || g.inner == nil { g.cond.Wait() } else { break @@ -180,7 +180,7 @@ func (g *ghostReader) awaitRealReader() (io.Reader, bool) { g.cond.L.Unlock() return nil, false } - if _, ok := g.inner.(*waitReader); ok { + if _, ok := g.inner.(*waitReader); ok || g.inner == nil { g.cond.Wait() } else { break diff --git a/api/server/runner_fninvoke.go b/api/server/runner_fninvoke.go index 0c9ea984f..395c0d4af 100644 --- a/api/server/runner_fninvoke.go +++ b/api/server/runner_fninvoke.go @@ -6,7 +6,6 @@ import ( "net/http" "strconv" "sync" - "time" "github.com/fnproject/fn/api" "github.com/fnproject/fn/api/agent" @@ -20,16 +19,6 @@ var ( bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} ) -type ResponseBufferingWriter interface { - http.ResponseWriter - io.Reader - Status() int - GetBuffer() *bytes.Buffer - SetBuffer(*bytes.Buffer) -} - -var _ ResponseBufferingWriter = new(syncResponseWriter) - // implements http.ResponseWriter // this little guy buffers responses from user containers and lets them still // set headers and such without us risking writing partial output [as much, the @@ -41,13 +30,10 @@ type syncResponseWriter struct { *bytes.Buffer } -func (s *syncResponseWriter) Header() http.Header { return s.headers } +var _ http.ResponseWriter = new(syncResponseWriter) // nice compiler errors -// By storing the status here, we effectively buffer the response -func (s *syncResponseWriter) WriteHeader(code int) { s.status = code } -func (s *syncResponseWriter) Status() int { return s.status } -func (s *syncResponseWriter) GetBuffer() *bytes.Buffer { return s.Buffer } -func (s *syncResponseWriter) SetBuffer(buf *bytes.Buffer) { s.Buffer = buf } +func (s *syncResponseWriter) Header() http.Header { return s.headers } +func (s *syncResponseWriter) WriteHeader(code int) { s.status = code } // handleFnInvokeCall executes the function, for router handlers func (s *Server) handleFnInvokeCall(c *gin.Context) { @@ -79,71 +65,84 @@ func (s *Server) handleFnInvokeCall2(c *gin.Context) error { } func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) error { - writer := &syncResponseWriter{ - headers: c.Writer.Header(), + return s.fnInvoke(c.Writer, c.Request, app, fn, nil) +} + +func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *models.App, fn *models.Fn, trig *models.Trigger) error { + // TODO: we should get rid of the buffers, and stream back (saves memory (+splice), faster (splice), allows streaming, don't have to cap resp size) + // buffer the response before writing it out to client to prevent partials from trying to stream + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + bufWriter := syncResponseWriter{ + headers: resp.Header(), + status: 200, + Buffer: buf, } - call, err := s.agent.GetCall(agent.WithWriter(writer), // XXX (reed): order matters [for now] - agent.FromHTTPFnRequest(app, fn, c.Request)) + var writer http.ResponseWriter = &bufWriter + writer = &jsonContentTypeTrapper{ResponseWriter: writer} + opts := []agent.CallOpt{ + agent.WithWriter(writer), // XXX (reed): order matters [for now] + agent.FromHTTPFnRequest(app, fn, req), + } + if trig != nil { + opts = append(opts, agent.WithTrigger(trig)) + } + + call, err := s.agent.GetCall(opts...) if err != nil { return err } - return s.fnInvoke(c, app, fn, writer, call) + err = s.agent.Submit(call) + if err != nil { + return err + } + + // because we can... + writer.Header().Set("Content-Length", strconv.Itoa(int(buf.Len()))) + + // buffered response writer traps status (so we can add headers), we need to write it still + if bufWriter.status > 0 { + resp.WriteHeader(bufWriter.status) + } + + io.Copy(resp, buf) + bufPool.Put(buf) // at this point, submit returned without timing out, so we can re-use this one + return nil } -func (s *Server) fnInvoke(c *gin.Context, app *models.App, fn *models.Fn, writer ResponseBufferingWriter, call agent.Call) error { - // TODO: we should get rid of the buffers, and stream back (saves memory (+splice), faster (splice), allows streaming, don't have to cap resp size) - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() - var submitErr error - defer func() { - if buf.Len() == 0 && submitErr == nil { - bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch? - } - }() - writer.SetBuffer(buf) +// TODO kill this thing after removing tests for http/json/default formats +type jsonContentTypeTrapper struct { + http.ResponseWriter + committed bool +} - model := call.Model() - { // scope this, to disallow ctx use outside of this scope. add id for handleV1ErrorResponse logger - ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID}) - c.Request = c.Request.WithContext(ctx) - } +var _ http.ResponseWriter = new(jsonContentTypeTrapper) // nice compiler errors - submitErr = s.agent.Submit(call) - if submitErr != nil { - // NOTE if they cancel the request then it will stop the call (kind of cool), - // we could filter that error out here too as right now it yells a little - if submitErr == models.ErrCallTimeoutServerBusy || submitErr == models.ErrCallTimeout { - // TODO maneuver - // add this, since it means that start may not have been called [and it's relevant] - c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) - } - return submitErr +func (j *jsonContentTypeTrapper) Write(b []byte) (int, error) { + if !j.committed { + // override default content type detection behavior to add json + j.detectContentType(b) } - // if they don't set a content-type - detect it - // TODO: remove this after removing all the formats (too many tests to scrub til then) - if writer.Header().Get("Content-Type") == "" { - // see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to? + j.committed = true + + // write inner + return j.ResponseWriter.Write(b) +} + +func (j *jsonContentTypeTrapper) detectContentType(b []byte) { + if j.Header().Get("Content-Type") == "" { + // see http.DetectContentType var contentType string jsonPrefix := [1]byte{'{'} // stack allocated - if bytes.HasPrefix(writer.GetBuffer().Bytes(), jsonPrefix[:]) { + if bytes.HasPrefix(b, jsonPrefix[:]) { // try to detect json, since DetectContentType isn't a hipster. contentType = "application/json; charset=utf-8" } else { - contentType = http.DetectContentType(writer.GetBuffer().Bytes()) + contentType = http.DetectContentType(b) } - writer.Header().Set("Content-Type", contentType) + j.Header().Set("Content-Type", contentType) } - - writer.Header().Set("Content-Length", strconv.Itoa(int(writer.GetBuffer().Len()))) - - if writer.Status() > 0 { - c.Writer.WriteHeader(writer.Status()) - } - - io.Copy(c.Writer, writer) - - return nil } diff --git a/api/server/runner_fninvoke_test.go b/api/server/runner_fninvoke_test.go index a252ea7fd..8227f3330 100644 --- a/api/server/runner_fninvoke_test.go +++ b/api/server/runner_fninvoke_test.go @@ -261,7 +261,7 @@ func TestFnInvokeRunnerExecution(t *testing.T) { maxBody = 1024 } - callIds[i] = rec.Header().Get("Fn_call_id") + callIds[i] = rec.Header().Get("Fn-Call-Id") cid := callIds[i] if rec.Code != test.expectedCode { diff --git a/api/server/runner_httptrigger.go b/api/server/runner_httptrigger.go index 885cb3b59..5adba92a6 100644 --- a/api/server/runner_httptrigger.go +++ b/api/server/runner_httptrigger.go @@ -1,13 +1,14 @@ package server import ( + "fmt" "net/http" + "net/textproto" "strconv" "strings" "github.com/fnproject/fn/api" - "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/models" "github.com/gin-gonic/gin" ) @@ -60,21 +61,19 @@ func (s *Server) handleTriggerHTTPFunctionCall2(c *gin.Context) error { } type triggerResponseWriter struct { - syncResponseWriter + inner http.ResponseWriter committed bool } -var _ ResponseBufferingWriter = new(triggerResponseWriter) - func (trw *triggerResponseWriter) Header() http.Header { - return trw.headers + return trw.inner.Header() } func (trw *triggerResponseWriter) Write(b []byte) (int, error) { if !trw.committed { trw.WriteHeader(http.StatusOK) } - return trw.GetBuffer().Write(b) + return trw.inner.Write(b) } func (trw *triggerResponseWriter) WriteHeader(statusCode int) { @@ -83,56 +82,98 @@ func (trw *triggerResponseWriter) WriteHeader(statusCode int) { } trw.committed = true - for k, vs := range trw.Header() { - if strings.HasPrefix(k, "Fn-Http-H-") { - // TODO strip out content-length and stuff here. - realHeader := strings.TrimPrefix(k, "Fn-Http-H-") - if realHeader != "" { // case where header is exactly the prefix - for _, v := range vs { - trw.Header().Del(k) - trw.Header().Add(realHeader, v) + var fnStatus int + realHeaders := trw.Header() + gwHeaders := make(http.Header, len(realHeaders)) + for k, vs := range realHeaders { + switch { + case strings.HasPrefix(k, "Fn-Http-H-"): + gwHeader := strings.TrimPrefix(k, "Fn-Http-H-") + if gwHeader != "" { // case where header is exactly the prefix + gwHeaders[gwHeader] = vs + } + case k == "Fn-Http-Status": + if len(vs) > 0 { + statusInt, err := strconv.Atoi(vs[0]) + if err == nil { + fnStatus = statusInt } } + case k == "Content-Type", k == "Fn-Call-Id": + gwHeaders[k] = vs } } - gatewayStatus := 200 + // XXX(reed): this is O(3n)... yes sorry for making it work without making it perfect first + for k := range realHeaders { + realHeaders.Del(k) + } + for k, vs := range gwHeaders { + realHeaders[k] = vs + } + // XXX(reed): simplify / add tests for these behaviors... + gatewayStatus := 200 if statusCode >= 400 { gatewayStatus = 502 + } else if fnStatus > 0 { + gatewayStatus = fnStatus } - status := trw.Header().Get("Fn-Http-Status") - if status != "" { - statusInt, err := strconv.Atoi(status) - if err == nil { - gatewayStatus = statusInt + trw.inner.WriteHeader(gatewayStatus) +} + +var skipTriggerHeaders = map[string]bool{ + "Connection": true, + "Keep-Alive": true, + "Trailer": true, + "Transfer-Encoding": true, + "TE": true, + "Upgrade": true, +} + +func reqURL(req *http.Request) string { + if req.URL.Scheme == "" { + if req.TLS == nil { + req.URL.Scheme = "http" + } else { + req.URL.Scheme = "https" } } - - trw.WriteHeader(gatewayStatus) + if req.URL.Host == "" { + req.URL.Host = req.Host + } + return req.URL.String() } -//ServeHTTPTr igger serves an HTTP trigger for a given app/fn/trigger based on the current request +// ServeHTTPTrigger serves an HTTP trigger for a given app/fn/trigger based on the current request // This is exported to allow extensions to handle their own trigger naming and publishing func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn, trigger *models.Trigger) error { - triggerWriter := &triggerResponseWriter{ - syncResponseWriter{ - headers: c.Writer.Header()}, - false, + // transpose trigger headers into the request + req := c.Request + headers := make(http.Header, len(req.Header)) + for k, vs := range req.Header { + // should be generally unnecessary but to be doubly sure. + k = textproto.CanonicalMIMEHeaderKey(k) + if skipTriggerHeaders[k] { + continue + } + switch k { + case "Content-Type": + default: + k = fmt.Sprintf("Fn-Http-H-%s", k) + } + headers[k] = vs } - // GetCall can mod headers, assign an id, look up the route/app (cached), - // strip params, etc. - // this should happen ASAP to turn app name to app ID + requestURL := reqURL(req) - // GetCall can mod headers, assign an id, look up the route/app (cached), - // strip params, etc. + headers.Set("Fn-Http-Method", req.Method) + headers.Set("Fn-Http-Request-Url", requestURL) + headers.Set("Fn-Intent", "httprequest") + req.Header = headers - call, err := s.agent.GetCall(agent.WithWriter(triggerWriter), // XXX (reed): order matters [for now] - agent.FromHTTPTriggerRequest(app, fn, trigger, c.Request)) + // trap the headers and rewrite them for http trigger + rw := &triggerResponseWriter{inner: c.Writer} - if err != nil { - return err - } - return s.fnInvoke(c, app, fn, triggerWriter, call) + return s.fnInvoke(rw, req, app, fn, trigger) } diff --git a/api/server/runner_httptrigger_test.go b/api/server/runner_httptrigger_test.go index 299c896ae..1e8659022 100644 --- a/api/server/runner_httptrigger_test.go +++ b/api/server/runner_httptrigger_test.go @@ -16,6 +16,7 @@ import ( "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/mqs" + "reflect" ) func envTweaker(name, value string) func() { @@ -273,10 +274,11 @@ func TestTriggerRunnerExecution(t *testing.T) { httpDneRegistryFn := &models.Fn{ID: "http_dnereg_fn_id", Name: "http_dnereg_fn", AppID: app.ID, Image: rImgBs2, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} jsonFn := &models.Fn{ID: "json_fn_id", Name: "json_fn", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} oomFn := &models.Fn{ID: "http_oom_fn_id", Name: "http_fn", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 8, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + httpStreamFn := &models.Fn{ID: "http_stream_fn_id", Name: "http_stream_fn", AppID: app.ID, Image: rImg, Format: "http-stream", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} ds := datastore.NewMockInit( []*models.App{app}, - []*models.Fn{defaultFn, defaultDneFn, httpDneRegistryFn, oomFn, httpFn, jsonFn, httpDneFn}, + []*models.Fn{defaultFn, defaultDneFn, httpDneRegistryFn, oomFn, httpFn, jsonFn, httpDneFn, httpStreamFn}, []*models.Trigger{ {ID: "1", Name: "1", Source: "/", Type: "http", AppID: app.ID, FnID: defaultFn.ID}, {ID: "2", Name: "2", Source: "/myhot", Type: "http", AppID: app.ID, FnID: httpFn.ID}, @@ -290,6 +292,7 @@ func TestTriggerRunnerExecution(t *testing.T) { {ID: "10", Name: "10", Source: "/mybigoutputcold", Type: "http", AppID: app.ID, FnID: defaultFn.ID}, {ID: "11", Name: "11", Source: "/mybigoutputhttp", Type: "http", AppID: app.ID, FnID: httpFn.ID}, {ID: "12", Name: "12", Source: "/mybigoutputjson", Type: "http", AppID: app.ID, FnID: jsonFn.ID}, + {ID: "13", Name: "13", Source: "/httpstream", Type: "http", AppID: app.ID, FnID: httpStreamFn.ID}, }, ) ls := logs.NewMock() @@ -307,20 +310,38 @@ func TestTriggerRunnerExecution(t *testing.T) { multiLogExpectCold := []string{"BeginOfLogs", "EndOfLogs"} multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */} - crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container - oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB - badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output - ok := `{"echoContent": "_TRX_ID_", "isDebug": true}` // good response / ok - respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar - respTypeJason := `{"echoContent": "_TRX_ID_", "jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar + crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container + oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB + badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output + ok := `{"echoContent": "_TRX_ID_", "responseContentType": "application/json; charset=utf-8", "isDebug": true}` // good response / ok + respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar + respTypeJason := `{"echoContent": "_TRX_ID_", "jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar // sleep between logs and with debug enabled, fn-test-utils will log header/footer below: multiLog := `{"echoContent": "_TRX_ID_", "sleepTime": 1000, "isDebug": true}` bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K + statusChecker := `{"echoContent": "_TRX_ID_", "isDebug": true, "responseCode":202, "responseContentType": "application/json; charset=utf-8"}` + + fooHeader := map[string][]string{"Content-Type": {"application/hateson"}, "Test-Header": {"foo"}} + expFooHeaders := map[string][]string{"Content-Type": {"application/hateson"}, "Return-Header": {"foo", "bar"}} + expFooHeadersBody := `{"echoContent": "_TRX_ID_", + "expectHeaders": { + "Content-Type":["application/hateson"], + "Fn-Http-H-Test-Header":["foo"], + "Fn-Http-Method":["POST"], + "Fn-Http-Request-Url":["http://127.0.0.1:8080/t/myapp/httpstream"] + }, + "returnHeaders": { + "Return-Header":["foo","bar"] + }, + "responseContentType":"application/hateson", + "isDebug": true}` + testCases := []struct { path string + headers map[string][]string body string method string expectedCode int @@ -328,34 +349,47 @@ func TestTriggerRunnerExecution(t *testing.T) { expectedErrSubStr string expectedLogsSubStr []string }{ - {"/t/myapp/", ok, "GET", http.StatusOK, expHeaders, "", nil}, + {"/t/myapp/", nil, ok, "GET", http.StatusOK, expHeaders, "", nil}, - {"/t/myapp/myhot", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid http response", nil}, + {"/t/myapp/myhot", nil, badHot, "GET", http.StatusBadGateway, expHeaders, "invalid http response", nil}, // hot container now back to normal: - {"/t/myapp/myhot", ok, "GET", http.StatusOK, expHeaders, "", nil}, + {"/t/myapp/myhot", nil, ok, "GET", http.StatusOK, expHeaders, "", nil}, - {"/t/myapp/myhotjason", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid json response", nil}, + {"/t/myapp/myhotjason", nil, badHot, "GET", http.StatusBadGateway, expHeaders, "invalid json response", nil}, // hot container now back to normal: - {"/t/myapp/myhotjason", ok, "GET", http.StatusOK, expHeaders, "", nil}, + {"/t/myapp/myhotjason", nil, ok, "GET", http.StatusOK, expHeaders, "", nil}, - {"/t/myapp/myhot", respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, - {"/t/myapp/myhotjason", respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, - {"/t/myapp/myhotjason", respTypeJason, "GET", http.StatusOK, expCTHeaders, "", nil}, + {"/t/myapp/myhot", nil, respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, + {"/t/myapp/myhotjason", nil, respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, + {"/t/myapp/myhotjason", nil, respTypeJason, "GET", http.StatusOK, expCTHeaders, "", nil}, - {"/t/myapp/myroute", ok, "GET", http.StatusOK, expHeaders, "", nil}, - {"/t/myapp/myerror", crasher, "GET", http.StatusBadGateway, expHeaders, "container exit code 1", nil}, - {"/t/myapp/mydne", ``, "GET", http.StatusNotFound, nil, "pull access denied", nil}, - {"/t/myapp/mydnehot", ``, "GET", http.StatusNotFound, nil, "pull access denied", nil}, - {"/t/myapp/mydneregistry", ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil}, - {"/t/myapp/myoom", oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil}, - {"/t/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectHot}, - {"/t/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectCold}, - {"/t/myapp/mybigoutputjson", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil}, - {"/t/myapp/mybigoutputjson", smalloutput, "GET", http.StatusOK, nil, "", nil}, - {"/t/myapp/mybigoutputhttp", bigoutput, "GET", http.StatusBadGateway, nil, "", nil}, - {"/t/myapp/mybigoutputhttp", smalloutput, "GET", http.StatusOK, nil, "", nil}, - {"/t/myapp/mybigoutputcold", bigoutput, "GET", http.StatusBadGateway, nil, "", nil}, - {"/t/myapp/mybigoutputcold", smalloutput, "GET", http.StatusOK, nil, "", nil}, + // XXX(reed): we test a lot of stuff in invoke, we really only need to test headers / status code here dude... + {"/t/myapp/httpstream", nil, ok, "POST", http.StatusOK, expHeaders, "", nil}, + {"/t/myapp/httpstream", nil, statusChecker, "POST", 202, expHeaders, "", nil}, + {"/t/myapp/httpstream", fooHeader, expFooHeadersBody, "POST", http.StatusOK, expFooHeaders, "", nil}, + // NOTE: we can't test bad response framing anymore easily (eg invalid http response), should we even worry about it? + {"/t/myapp/httpstream", nil, respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil}, + //{"/t/myapp/httpstream", nil, crasher, "POST", http.StatusBadGateway, expHeaders, "error receiving function response", nil}, + //// XXX(reed): we could stop buffering function responses so that we can stream things? + //{"/t/myapp/httpstream", nil, bigoutput, "POST", http.StatusBadGateway, nil, "function response too large", nil}, + //{"/t/myapp/httpstream", nil, smalloutput, "POST", http.StatusOK, expHeaders, "", nil}, + //// XXX(reed): meh we really should try to get oom out, but maybe it's better left to the logs? + //{"/t/myapp/httpstream", nil, oomer, "POST", http.StatusBadGateway, nil, "error receiving function response", nil}, + + {"/t/myapp/myroute", nil, ok, "GET", http.StatusOK, expHeaders, "", nil}, + {"/t/myapp/myerror", nil, crasher, "GET", http.StatusBadGateway, expHeaders, "container exit code 1", nil}, + {"/t/myapp/mydne", nil, ``, "GET", http.StatusNotFound, nil, "pull access denied", nil}, + {"/t/myapp/mydnehot", nil, ``, "GET", http.StatusNotFound, nil, "pull access denied", nil}, + {"/t/myapp/mydneregistry", nil, ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil}, + {"/t/myapp/myoom", nil, oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil}, + {"/t/myapp/myhot", nil, multiLog, "GET", http.StatusOK, nil, "", multiLogExpectHot}, + {"/t/myapp/", nil, multiLog, "GET", http.StatusOK, nil, "", multiLogExpectCold}, + {"/t/myapp/mybigoutputjson", nil, bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil}, + {"/t/myapp/mybigoutputjson", nil, smalloutput, "GET", http.StatusOK, nil, "", nil}, + {"/t/myapp/mybigoutputhttp", nil, bigoutput, "GET", http.StatusBadGateway, nil, "", nil}, + {"/t/myapp/mybigoutputhttp", nil, smalloutput, "GET", http.StatusOK, nil, "", nil}, + {"/t/myapp/mybigoutputcold", nil, bigoutput, "GET", http.StatusBadGateway, nil, "", nil}, + {"/t/myapp/mybigoutputcold", nil, smalloutput, "GET", http.StatusOK, nil, "", nil}, } callIds := make([]string, len(testCases)) @@ -364,7 +398,11 @@ func TestTriggerRunnerExecution(t *testing.T) { t.Run(fmt.Sprintf("Test_%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { trx := fmt.Sprintf("_trx_%d_", i) body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1)) - _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + req := createRequest(t, test.method, test.path, body) + if test.headers != nil { + req.Header = test.headers + } + _, rec := routerRequest2(t, srv.Router, req) respBytes, _ := ioutil.ReadAll(rec.Body) respBody := string(respBytes) maxBody := len(respBody) @@ -372,7 +410,7 @@ func TestTriggerRunnerExecution(t *testing.T) { maxBody = 1024 } - callIds[i] = rec.Header().Get("Fn_call_id") + callIds[i] = rec.Header().Get("Fn-Call-Id") if rec.Code != test.expectedCode { isFailure = true @@ -396,10 +434,10 @@ func TestTriggerRunnerExecution(t *testing.T) { if test.expectedHeaders != nil { for name, header := range test.expectedHeaders { - if header[0] != rec.Header().Get(name) { + if !reflect.DeepEqual(header, rec.Header()[name]) { isFailure = true - t.Errorf("Test %d: Expected header `%s` to be `%s` but was `%s`. body: `%s`", - i, name, header[0], rec.Header().Get(name), respBody) + t.Errorf("Test %d: Expected header `%s` to be `%v` but was `%v`. body: `%s`", + i, name, header, rec.Header()[name], respBody) } } } diff --git a/docs/definitions.md b/docs/definitions.md index 3cf04a505..e63fb8764 100644 --- a/docs/definitions.md +++ b/docs/definitions.md @@ -204,14 +204,14 @@ curl -v localhost:8080/r/newapp/envsync > Accept: */* > < HTTP/1.1 200 OK -< Fn_call_id: f5621e8b-725a-4ba9-8323-b8cdc02ce37e +< Fn-Call-Id: f5621e8b-725a-4ba9-8323-b8cdc02ce37e < Date: Fri, 02 Jun 2017 12:31:30 GMT < Content-Length: 489 < Content-Type: text/plain; charset=utf-8 < ... ``` -Corresponding HTTP header is `Fn_call_id`. +Corresponding HTTP header is `Fn-Call-Id`. ### Per-route calls diff --git a/images/fn-test-utils/Gopkg.lock b/images/fn-test-utils/Gopkg.lock index f62b32a57..9d1c0a972 100644 --- a/images/fn-test-utils/Gopkg.lock +++ b/images/fn-test-utils/Gopkg.lock @@ -8,7 +8,7 @@ ".", "utils" ] - revision = "d1fa834929bb1579d770eded860a3a54e1f3502b" + revision = "bd03f2c17b4d9e43525829d194174b35b5c47b54" [solve-meta] analyzer-name = "dep" diff --git a/images/fn-test-utils/fn-test-utils.go b/images/fn-test-utils/fn-test-utils.go index 09bceb5b3..44a4161d8 100644 --- a/images/fn-test-utils/fn-test-utils.go +++ b/images/fn-test-utils/fn-test-utils.go @@ -65,6 +65,11 @@ type AppRequest struct { PostErrGarbage string `json:"postErrGarbage,omitempty"` // test empty body IsEmptyBody bool `json:"isEmptyBody,omitempty"` + // test headers that come into function + ExpectHeaders map[string][]string `json:"expectHeaders,omitempty"` + // send some headers out explicitly + ReturnHeaders map[string][]string `json:"returnHeaders,omitempty"` + // TODO: simulate slow read/slow write // TODO: simulate partial IO write/read // TODO: simulate high cpu usage (async and sync) @@ -119,11 +124,6 @@ func AppHandler(ctx context.Context, in io.Reader, out io.Writer) { } func finalizeRequest(out io.Writer, req *AppRequest, resp *AppResponse) { - // custom response code - if req.ResponseCode != 0 { - fdk.WriteStatus(out, req.ResponseCode) - } - // custom content type if req.ResponseContentType != "" { fdk.SetHeader(out, "Content-Type", req.ResponseContentType) @@ -137,6 +137,19 @@ func finalizeRequest(out io.Writer, req *AppRequest, resp *AppResponse) { fdk.SetHeader(out, "Content-Type", req.JasonContentType) } + if req.ReturnHeaders != nil { + for k, vs := range req.ReturnHeaders { + for _, v := range vs { + fdk.AddHeader(out, k, v) + } + } + } + + // custom response code + if req.ResponseCode != 0 { + fdk.WriteStatus(out, req.ResponseCode) + } + if !req.IsEmptyBody { json.NewEncoder(out).Encode(resp) } @@ -155,6 +168,7 @@ func processRequest(ctx context.Context, in io.Reader) (*AppRequest, *AppRespons log.Printf("Received format %v", format) log.Printf("Received request %#v", request) log.Printf("Received headers %v", fnctx.Header) + log.Printf("Received http headers %v", fnctx.HTTPHeader) log.Printf("Received config %v", fnctx.Config) } @@ -219,6 +233,23 @@ func processRequest(ctx context.Context, in io.Reader) (*AppRequest, *AppRespons log.Fatalln("Crash requested") } + if request.ExpectHeaders != nil { + for name, header := range request.ExpectHeaders { + if strings.HasPrefix(name, "Fn-Http-H-") { + // if it's an http header, make sure our other bucket works. + // idk this seems like a weird good idea, maybe we should only test/expose one or the other... + if h2 := fnctx.HTTPHeader.Get(strings.TrimPrefix(name, "Fn-Http-H-")); header[0] != h2 { + log.Fatalf("Expected http header `%s` to be `%s` but was `%s`.", + name, header[0], h2) + } + } + if h2 := fnctx.Header.Get(name); header[0] != h2 { + log.Fatalf("Expected header `%s` to be `%s` but was `%s`", + name, header[0], h2) + } + } + } + resp := AppResponse{ Data: data, Request: request, diff --git a/vendor/github.com/fnproject/fdk-go/circle.yml b/vendor/github.com/fnproject/fdk-go/circle.yml new file mode 100644 index 000000000..02359f40c --- /dev/null +++ b/vendor/github.com/fnproject/fdk-go/circle.yml @@ -0,0 +1,33 @@ +version: 2 +jobs: + build: + docker: + - image: circleci/golang:1.11.0 + working_directory: ~/fdk-go + steps: + - checkout + - setup_remote_docker: + docker_layer_caching: true + - run: docker version + - run: docker pull fnproject/fnserver + # installing Fn CLI and starting the Fn server + - run: + command: | + curl -LSs https://raw.githubusercontent.com/fnproject/cli/master/install | sh + - run: + command: fn build + working_directory: examples/hello + - deploy: + command: | + if [[ "${CIRCLE_BRANCH}" == "master" && -z "${CIRCLE_PR_REPONAME}" ]]; then + func_version=$(awk '/^version:/ { print $2; }' func.yaml) + printenv DOCKER_PASS | docker login -u ${DOCKER_USER} --password-stdin + git config --global user.email "ci@fnproject.com" + git config --global user.name "CI" + git branch --set-upstream-to=origin/${CIRCLE_BRANCH} ${CIRCLE_BRANCH} + docker tag "hello:${func_version}" "fnproject/fdk-go-hello:${func_version}" + docker tag "hello:${func_version}" "fnproject/fdk-go-hello:latest" + docker push "fnproject/fdk-go-hello:${func_version}" + docker push "fnproject/fdk-go-hello:latest" + fi + working_directory: examples/hello diff --git a/vendor/github.com/fnproject/fdk-go/examples/hello/Gopkg.lock b/vendor/github.com/fnproject/fdk-go/examples/hello/Gopkg.lock index 1e5cae97b..a3ab0e1ba 100644 --- a/vendor/github.com/fnproject/fdk-go/examples/hello/Gopkg.lock +++ b/vendor/github.com/fnproject/fdk-go/examples/hello/Gopkg.lock @@ -8,7 +8,7 @@ ".", "utils" ] - revision = "5d768b2006f11737b6a69a758ddd6d2fac04923e" + revision = "c6ce6afbce0935ffdf0e228471406d0f966736a5" [solve-meta] analyzer-name = "dep" diff --git a/vendor/github.com/fnproject/fdk-go/examples/hello/func.go b/vendor/github.com/fnproject/fdk-go/examples/hello/func.go index 93f22c89a..6cea35095 100644 --- a/vendor/github.com/fnproject/fdk-go/examples/hello/func.go +++ b/vendor/github.com/fnproject/fdk-go/examples/hello/func.go @@ -13,20 +13,17 @@ func main() { fdk.Handle(fdk.HandlerFunc(myHandler)) } -func myHandler(ctx context.Context, in io.Reader, out io.Writer) { - var person struct { - Name string `json:"name"` - } - json.NewDecoder(in).Decode(&person) - if person.Name == "" { - person.Name = "World" - } +type Person struct { + Name string `json:"name"` +} +func myHandler(ctx context.Context, in io.Reader, out io.Writer) { + p := &Person{Name: "World"} + json.NewDecoder(in).Decode(p) msg := struct { Msg string `json:"message"` }{ - Msg: fmt.Sprintf("Hello %s", person.Name), + Msg: fmt.Sprintf("Hello %s", p.Name), } - json.NewEncoder(out).Encode(&msg) } diff --git a/vendor/github.com/fnproject/fdk-go/examples/hello/func.yaml b/vendor/github.com/fnproject/fdk-go/examples/hello/func.yaml new file mode 100644 index 000000000..a708e12de --- /dev/null +++ b/vendor/github.com/fnproject/fdk-go/examples/hello/func.yaml @@ -0,0 +1,6 @@ +schema_version: 20180708 +name: hello +version: 0.0.1 +runtime: go +entrypoint: ./func +format: http-stream diff --git a/vendor/github.com/fnproject/fdk-go/fdk.go b/vendor/github.com/fnproject/fdk-go/fdk.go index 42e0ed807..571bc6478 100644 --- a/vendor/github.com/fnproject/fdk-go/fdk.go +++ b/vendor/github.com/fnproject/fdk-go/fdk.go @@ -24,6 +24,7 @@ func (f HandlerFunc) Serve(ctx context.Context, in io.Reader, out io.Writer) { func Context(ctx context.Context) *Ctx { utilsCtx := utils.Context(ctx) return &Ctx{ + HTTPHeader: utilsCtx.HTTPHeader, Header: utilsCtx.Header, Config: utilsCtx.Config, RequestURL: utilsCtx.RequestURL, @@ -33,6 +34,7 @@ func Context(ctx context.Context) *Ctx { func WithContext(ctx context.Context, fnctx *Ctx) context.Context { utilsCtx := &utils.Ctx{ + HTTPHeader: fnctx.HTTPHeader, Header: fnctx.Header, Config: fnctx.Config, RequestURL: fnctx.RequestURL, @@ -43,7 +45,12 @@ func WithContext(ctx context.Context, fnctx *Ctx) context.Context { // Ctx provides access to Config and Headers from fn. type Ctx struct { - Header http.Header + // Header are the unmodified headers as sent to the container, see + // HTTPHeader for specific trigger headers + Header http.Header + // HTTPHeader are the request headers as they appear on the original HTTP request, + // for an http trigger. + HTTPHeader http.Header Config map[string]string RequestURL string Method string @@ -78,5 +85,12 @@ func WriteStatus(out io.Writer, status int) { // function and fn server via any of the supported formats. func Handle(handler Handler) { format, _ := os.LookupEnv("FN_FORMAT") + + path := os.Getenv("FN_LISTENER") + if path != "" { + utils.StartHTTPServer(handler, path, format) + return + } + utils.Do(handler, format, os.Stdin, os.Stdout) } diff --git a/vendor/github.com/fnproject/fdk-go/utils/httpstream.go b/vendor/github.com/fnproject/fdk-go/utils/httpstream.go new file mode 100644 index 000000000..dc41a8685 --- /dev/null +++ b/vendor/github.com/fnproject/fdk-go/utils/httpstream.go @@ -0,0 +1,168 @@ +package utils + +import ( + "bytes" + "context" + "io" + "log" + "net" + "net/http" + "net/url" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" +) + +// in case we go over the timeout, need to use a pool since prev buffer may not be freed +var bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} + +type HTTPHandler struct { + handler Handler +} + +func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + + resp := Response{ + Writer: buf, + Status: 200, + Header: make(http.Header), // XXX(reed): pool these too + } + + ctx := WithContext(r.Context(), &Ctx{ + Config: BuildConfig(), + }) + + ctx, cancel := decapHeaders(ctx, r) + defer cancel() + + h.handler.Serve(ctx, r.Body, &resp) + + encapHeaders(w, resp) + + // XXX(reed): 504 if ctx is past due / handle errors with 5xx? just 200 for now + // copy response from user back up now with headers in place... + io.Copy(w, buf) + + // XXX(reed): handle streaming, we have to intercept headers but not necessarily body (ie no buffer) +} + +func encapHeaders(fn http.ResponseWriter, user Response) { + fnh := fn.Header() + fnh.Set("Fn-Http-Status", strconv.Itoa(user.Status)) + + for k, vs := range user.Header { + switch k { + case "Content-Type": + // don't modify this one... + default: + // prepend this guy + k = "Fn-Http-H-" + k + } + + for _, v := range vs { + fnh.Add(k, v) + } + } +} + +// TODO can make this the primary means of context construction +func decapHeaders(ctx context.Context, r *http.Request) (_ context.Context, cancel func()) { + rctx := Context(ctx) + var deadline string + + // copy the original headers in then reduce for http headers + rctx.Header = r.Header + rctx.HTTPHeader = make(http.Header, len(r.Header)) // XXX(reed): oversized, esp if not http + + // find things we need, and for http headers add them to the httph bucket + + for k, vs := range r.Header { + switch k { + case "Fn-Deadline": + deadline = vs[0] + case "Fn-Call-Id": + rctx.callId = vs[0] + case "Content-Type": + // just leave this one instead of deleting + default: + continue + } + + if !strings.HasPrefix(k, "Fn-Http-") { + // XXX(reed): we need 2 header buckets on ctx, one for these and one for the 'original req' headers + // for now just nuke so the headers are clean... + continue + } + + switch { + case k == "Fn-Http-Request-Url": + rctx.RequestURL = vs[0] + case k == "Fn-Http-Method": + rctx.Method = vs[0] + case strings.HasPrefix(k, "Fn-Http-H-"): + for _, v := range vs { + rctx.HTTPHeader.Add(strings.TrimPrefix(k, "Fn-Http-H-"), v) + } + default: + // XXX(reed): just delete it? how is it here? maybe log/panic + } + } + + return CtxWithDeadline(ctx, deadline) +} + +func StartHTTPServer(handler Handler, path, format string) { + + uri, err := url.Parse(path) + if err != nil { + log.Fatalln("url parse error: ", path, err) + } + + server := http.Server{ + Handler: &HTTPHandler{ + handler: handler, + }, + } + + // try to remove pre-existing UDS: ignore errors here + phonySock := filepath.Join(filepath.Dir(uri.Path), "phony"+filepath.Base(uri.Path)) + if uri.Scheme == "unix" { + os.Remove(phonySock) + os.Remove(uri.Path) + } + + listener, err := net.Listen(uri.Scheme, phonySock) + if err != nil { + log.Fatalln("net.Listen error: ", err) + } + + if uri.Scheme == "unix" { + sockPerm(phonySock, uri.Path) + } + + err = server.Serve(listener) + if err != nil && err != http.ErrServerClosed { + log.Fatalln("serve error: ", err) + } +} + +func sockPerm(phonySock, realSock string) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // somehow this is the best way to get a permissioned sock file, don't ask questions, life is sad and meaningless + err := os.Chmod(phonySock, 0666) + if err != nil { + log.Fatalln("error giving sock file a perm", err) + } + + err = os.Link(phonySock, realSock) + if err != nil { + log.Fatalln("error linking fake sock to real sock", err) + } +} diff --git a/vendor/github.com/fnproject/fdk-go/utils/utils.go b/vendor/github.com/fnproject/fdk-go/utils/utils.go index b73580377..ed6d88e6b 100644 --- a/vendor/github.com/fnproject/fdk-go/utils/utils.go +++ b/vendor/github.com/fnproject/fdk-go/utils/utils.go @@ -26,12 +26,25 @@ func WithContext(ctx context.Context, fnctx *Ctx) context.Context { // Ctx provides access to Config and Headers from fn. type Ctx struct { - Header http.Header + // Header are the unmodified headers as sent to the container, see + // HTTPHeader for specific trigger headers + Header http.Header + + // HTTPHeader are the request headers as they appear on the original HTTP request, + // for an http trigger. + HTTPHeader http.Header Config map[string]string RequestURL string Method string + + // XXX(reed): should turn this whole mess into some kind of event that we can + // morph into another type of an event after http/json/default die + // XXX(reed): should strip out eg FN_APP_NAME, etc as fields so Config is actually the config not config + fn's env vars + callId string } +func (c Ctx) CallId() string { return c.callId } + type key struct{} var ctxKey = new(key)