mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Bunch of cleanup and reorg
This commit is contained in:
204
api/api.go
Normal file
204
api/api.go
Normal file
@@ -0,0 +1,204 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/iron-io/iron_go/cache"
|
||||
)
|
||||
|
||||
var icache = cache.New("routing-table")
|
||||
var config *Config
|
||||
|
||||
func New(conf *Config) *Api {
|
||||
config = conf
|
||||
api := &Api{}
|
||||
return api
|
||||
}
|
||||
|
||||
type Api struct {
|
||||
}
|
||||
|
||||
func (api *Api) Start() {
|
||||
|
||||
log.Infoln("Starting up router version", Version)
|
||||
|
||||
r := mux.NewRouter()
|
||||
|
||||
// dev:
|
||||
s := r.PathPrefix("/api").Subrouter()
|
||||
// production:
|
||||
// s := r.Host("router.irondns.info").Subrouter()
|
||||
// s.Handle("/1/projects/{project_id}/register", &Register{})
|
||||
s.Handle("/v1/apps", &NewApp{})
|
||||
s.HandleFunc("/v1/apps/{app_name}/routes", NewRoute)
|
||||
s.HandleFunc("/ping", Ping)
|
||||
s.HandleFunc("/version", VersionHandler)
|
||||
// s.Handle("/addworker", &WorkerHandler{})
|
||||
s.HandleFunc("/", Ping)
|
||||
|
||||
r.HandleFunc("/elb-ping-router", Ping) // for ELB health check
|
||||
|
||||
// for testing app responses, pass in app name, can use localhost
|
||||
s4 := r.Queries("app", "").Subrouter()
|
||||
// s4.HandleFunc("/appsr", Ping)
|
||||
s4.HandleFunc("/{rest:.*}", Run)
|
||||
s4.NotFoundHandler = http.HandlerFunc(Run)
|
||||
|
||||
// s3 := r.Queries("rhost", "").Subrouter()
|
||||
// s3.HandleFunc("/", ProxyFunc2)
|
||||
|
||||
// This is where all the main incoming traffic goes
|
||||
r.NotFoundHandler = http.HandlerFunc(Run)
|
||||
|
||||
http.Handle("/", r)
|
||||
port := 8080
|
||||
log.Infoln("Router started, listening and serving on port", port)
|
||||
log.Fatal(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%v", port), nil))
|
||||
}
|
||||
|
||||
type NewApp struct{}
|
||||
|
||||
// This registers a new host
|
||||
func (r *NewApp) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
log.Println("NewApp called!")
|
||||
|
||||
vars := mux.Vars(req)
|
||||
projectId := vars["project_id"]
|
||||
// token := common.GetToken(req)
|
||||
log.Infoln("project_id:", projectId)
|
||||
|
||||
app := App{}
|
||||
if !ReadJSON(w, req, &app) {
|
||||
return
|
||||
}
|
||||
log.Infoln("body read into app:", app)
|
||||
app.ProjectId = projectId
|
||||
|
||||
_, err := getApp(app.Name)
|
||||
if err == nil {
|
||||
SendError(w, 400, fmt.Sprintln("An app with this name already exists.", err))
|
||||
return
|
||||
}
|
||||
|
||||
app.Routes = make(map[string]*Route3)
|
||||
|
||||
// create dns entry
|
||||
// TODO: Add project id to this. eg: appname.projectid.iron.computer
|
||||
log.Debug("Creating dns entry.")
|
||||
regOk := registerHost(w, req, &app)
|
||||
if !regOk {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: do we need to close body?
|
||||
err = putApp(&app)
|
||||
if err != nil {
|
||||
log.Infoln("couldn't create app:", err)
|
||||
SendError(w, 400, fmt.Sprintln("Could not create app!", err))
|
||||
return
|
||||
}
|
||||
log.Infoln("registered app:", app)
|
||||
v := map[string]interface{}{"app": app}
|
||||
SendSuccess(w, "App created successfully.", v)
|
||||
}
|
||||
|
||||
func NewRoute(w http.ResponseWriter, req *http.Request) {
|
||||
fmt.Println("NewRoute")
|
||||
vars := mux.Vars(req)
|
||||
projectId := vars["project_id"]
|
||||
appName := vars["app_name"]
|
||||
log.Infoln("project_id: ", projectId, "app: ", appName)
|
||||
|
||||
route := &Route3{}
|
||||
if !ReadJSON(w, req, &route) {
|
||||
return
|
||||
}
|
||||
log.Infoln("body read into route:", route)
|
||||
|
||||
// TODO: validate route
|
||||
|
||||
app, err := getApp(appName)
|
||||
if err != nil {
|
||||
SendError(w, 400, fmt.Sprintln("This app does not exist. Please create app first.", err))
|
||||
return
|
||||
}
|
||||
|
||||
if route.Type == "" {
|
||||
route.Type = "run"
|
||||
}
|
||||
|
||||
// app.Routes = append(app.Routes, route)
|
||||
app.Routes[route.Path] = route
|
||||
err = putApp(app)
|
||||
if err != nil {
|
||||
log.Errorln("Couldn't create route!:", err)
|
||||
SendError(w, 400, fmt.Sprintln("Could not create route!", err))
|
||||
return
|
||||
}
|
||||
log.Infoln("Route created:", route)
|
||||
fmt.Fprintln(w, "Route created successfully.")
|
||||
}
|
||||
|
||||
func getRoute(host string) (*Route, error) {
|
||||
log.Infoln("getRoute for host:", host)
|
||||
rx, err := icache.Get(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rx2 := []byte(rx.(string))
|
||||
route := Route{}
|
||||
err = json.Unmarshal(rx2, &route)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &route, err
|
||||
}
|
||||
|
||||
func putRoute(route *Route) error {
|
||||
item := cache.Item{}
|
||||
v, err := json.Marshal(route)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
item.Value = string(v)
|
||||
err = icache.Put(route.Host, &item)
|
||||
return err
|
||||
}
|
||||
|
||||
func getApp(name string) (*App, error) {
|
||||
log.Infoln("getapp:", name)
|
||||
rx, err := icache.Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rx2 := []byte(rx.(string))
|
||||
app := App{}
|
||||
err = json.Unmarshal(rx2, &app)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &app, err
|
||||
}
|
||||
|
||||
func putApp(app *App) error {
|
||||
item := cache.Item{}
|
||||
v, err := json.Marshal(app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
item.Value = string(v)
|
||||
err = icache.Put(app.Name, &item)
|
||||
return err
|
||||
}
|
||||
|
||||
func Ping(w http.ResponseWriter, req *http.Request) {
|
||||
fmt.Fprintln(w, "pong")
|
||||
}
|
||||
|
||||
func VersionHandler(w http.ResponseWriter, req *http.Request) {
|
||||
fmt.Fprintln(w, Version)
|
||||
}
|
||||
31
api/config.go
Normal file
31
api/config.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package api
|
||||
|
||||
type Config struct {
|
||||
CloudFlare struct {
|
||||
Email string `json:"email"`
|
||||
ApiKey string `json:"api_key"`
|
||||
ZoneId string `json:"zone_id"`
|
||||
} `json:"cloudflare"`
|
||||
Cache struct {
|
||||
Host string `json:"host"`
|
||||
Token string `json:"token"`
|
||||
ProjectId string `json:"project_id"`
|
||||
}
|
||||
Iron struct {
|
||||
Token string `json:"token"`
|
||||
ProjectId string `json:"project_id"`
|
||||
SuperToken string `json:"super_token"`
|
||||
WorkerHost string `json:"worker_host"`
|
||||
AuthHost string `json:"auth_host"`
|
||||
} `json:"iron"`
|
||||
Logging struct {
|
||||
To string `json:"to"`
|
||||
Level string `json:"level"`
|
||||
Prefix string `json:"prefix"`
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) Validate() error {
|
||||
// TODO:
|
||||
return nil
|
||||
}
|
||||
80
api/dns.go
Normal file
80
api/dns.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
type CloudFlareResult struct {
|
||||
Id string `json:"id"`
|
||||
}
|
||||
type CloudFlareResponse struct {
|
||||
Result CloudFlareResult `json:"result"`
|
||||
Success bool
|
||||
}
|
||||
|
||||
/*
|
||||
This function registers the host name provided (dns host) in IronCache which is used for
|
||||
the routing table. Backend runners know this too.
|
||||
|
||||
This will also create a dns entry for the worker in iron.computer.
|
||||
*/
|
||||
func registerHost(w http.ResponseWriter, r *http.Request, app *App) bool {
|
||||
// Give it an iron.computer entry with format: APP_NAME.PROJECT_ID.ironfunctions.com
|
||||
dnsHost := fmt.Sprintf("%v.%v.ironfunctions.com", app.Name, 123)
|
||||
app.Dns = dnsHost
|
||||
|
||||
if app.CloudFlareId == "" {
|
||||
// Tad hacky, but their go lib is pretty confusing.
|
||||
cfMethod := "POST"
|
||||
cfUrl := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%v/dns_records", config.CloudFlare.ZoneId)
|
||||
if app.CloudFlareId != "" {
|
||||
// Have this here in case we need to support updating the entry. If we do this, is how:
|
||||
cfMethod = "PUT"
|
||||
cfUrl = cfUrl + "/" + app.CloudFlareId
|
||||
}
|
||||
log.Info("registering dns: ", "dnsname: ", dnsHost, " url: ", cfUrl)
|
||||
|
||||
cfbody := "{\"type\":\"CNAME\",\"name\":\"" + dnsHost + "\",\"content\":\"api.ironfunctions.com\",\"ttl\":120}"
|
||||
client := &http.Client{} // todo: is default client fine?
|
||||
req, err := http.NewRequest(
|
||||
cfMethod,
|
||||
cfUrl,
|
||||
strings.NewReader(cfbody),
|
||||
)
|
||||
req.Header.Set("X-Auth-Email", config.CloudFlare.Email)
|
||||
req.Header.Set("X-Auth-Key", config.CloudFlare.ApiKey)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Error("Could not register dns entry.", "err", err)
|
||||
SendError(w, 500, fmt.Sprint("Could not register dns entry.", err))
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
// todo: get error message from body for bad status code
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if resp.StatusCode != 200 {
|
||||
log.Error("Could not register dns entry 2.", "code", resp.StatusCode, "body", string(body))
|
||||
SendError(w, 500, fmt.Sprint("Could not register dns entry 2. ", resp.StatusCode))
|
||||
return false
|
||||
}
|
||||
cfResult := CloudFlareResponse{}
|
||||
err = json.Unmarshal(body, &cfResult)
|
||||
if err != nil {
|
||||
log.Error("Could not parse DNS response.", "err", err, "code", resp.StatusCode, "body", string(body))
|
||||
SendError(w, 500, fmt.Sprint("Could not parse DNS response. ", resp.StatusCode))
|
||||
return false
|
||||
}
|
||||
fmt.Println("cfresult:", cfResult)
|
||||
app.CloudFlareId = cfResult.Result.Id
|
||||
}
|
||||
|
||||
log.Info("host registered successfully with cloudflare", "app", app)
|
||||
return true
|
||||
}
|
||||
266
api/helpers.go
Normal file
266
api/helpers.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gopkg.in/inconshreveable/log15.v2"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
func NotFound(w http.ResponseWriter, r *http.Request) {
|
||||
SendError(w, http.StatusNotFound, "Not found")
|
||||
}
|
||||
|
||||
func EndpointNotFound(w http.ResponseWriter, r *http.Request) {
|
||||
SendError(w, http.StatusNotFound, "Endpoint not found")
|
||||
}
|
||||
|
||||
func InternalError(w http.ResponseWriter, err error) {
|
||||
logrus.Error("internal server error response", "err", err, "stack", string(debug.Stack()))
|
||||
SendError(w, http.StatusInternalServerError, "internal error")
|
||||
}
|
||||
|
||||
func InternalErrorDetailed(w http.ResponseWriter, r *http.Request, err error) {
|
||||
logrus.Error("internal server error response", "err", err, "endpoint", r.URL.String(), "token", GetTokenString(r), "stack", string(debug.Stack()))
|
||||
SendError(w, http.StatusInternalServerError, "internal error")
|
||||
}
|
||||
|
||||
type HTTPError interface {
|
||||
error
|
||||
StatusCode() int
|
||||
}
|
||||
|
||||
type response struct {
|
||||
Msg string `json:"msg"`
|
||||
}
|
||||
|
||||
func SendError(w http.ResponseWriter, code int, msg string) {
|
||||
logrus.Debug("HTTP error", "status_code", code, "msg", msg)
|
||||
resp := response{Msg: msg}
|
||||
RespondCode(w, nil, code, &resp)
|
||||
}
|
||||
|
||||
func SendSuccess(w http.ResponseWriter, msg string, params map[string]interface{}) {
|
||||
var v interface{}
|
||||
if params == nil {
|
||||
v = &response{Msg: msg}
|
||||
} else {
|
||||
v = params
|
||||
}
|
||||
RespondCode(w, nil, http.StatusOK, v)
|
||||
}
|
||||
|
||||
func Respond(w http.ResponseWriter, r *http.Request, v interface{}) {
|
||||
RespondCode(w, r, http.StatusOK, v)
|
||||
}
|
||||
|
||||
func RespondCode(w http.ResponseWriter, r *http.Request, code int, v interface{}) {
|
||||
bytes, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
logrus.Error("error marshalling HTTP response", "value", v, "type", reflect.TypeOf(v), "err", err)
|
||||
InternalError(w, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(bytes)))
|
||||
w.WriteHeader(code)
|
||||
if _, err := w.Write(bytes); err != nil {
|
||||
// older callers don't pass a request
|
||||
if r != nil {
|
||||
logrus.Error("unable to write HTTP response", "err", err)
|
||||
} else {
|
||||
logrus.Error("unable to write HTTP response", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetTokenString returns the token string from either the Authorization header or
|
||||
// the oauth query parameter.
|
||||
func GetTokenString(r *http.Request) string {
|
||||
tok, _ := GetTokenStringType(r)
|
||||
return tok
|
||||
}
|
||||
|
||||
func GetTokenStringType(r *http.Request) (tok string, jwt bool) {
|
||||
tokenStr := r.URL.Query().Get("oauth")
|
||||
if tokenStr != "" {
|
||||
return tokenStr, false
|
||||
}
|
||||
tokenStr = r.URL.Query().Get("jwt")
|
||||
jwt = tokenStr != ""
|
||||
if tokenStr == "" {
|
||||
authHeader := r.Header.Get("Authorization")
|
||||
authFields := strings.Fields(authHeader)
|
||||
if len(authFields) == 2 && (authFields[0] == "OAuth" || authFields[0] == "JWT") {
|
||||
jwt = authFields[0] == "JWT"
|
||||
tokenStr = authFields[1]
|
||||
}
|
||||
}
|
||||
return tokenStr, jwt
|
||||
}
|
||||
|
||||
func ReadJSONSize(w http.ResponseWriter, r *http.Request, v interface{}, n int64) (success bool) {
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
i := strings.IndexByte(contentType, ';')
|
||||
if i < 0 {
|
||||
i = len(contentType)
|
||||
}
|
||||
if strings.TrimRight(contentType[:i], " ") != "application/json" {
|
||||
SendError(w, http.StatusBadRequest, "Bad Content-Type.")
|
||||
return false
|
||||
}
|
||||
if i < len(contentType) {
|
||||
param := strings.Trim(contentType[i+1:], " ")
|
||||
split := strings.SplitN(param, "=", 2)
|
||||
if len(split) != 2 || strings.Trim(split[0], " ") != "charset" {
|
||||
SendError(w, http.StatusBadRequest, "Invalid Content-Type parameter.")
|
||||
return false
|
||||
}
|
||||
value := strings.Trim(split[1], " ")
|
||||
if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' {
|
||||
// quoted string
|
||||
value = value[1 : len(value)-1]
|
||||
}
|
||||
if !strings.EqualFold(value, "utf-8") {
|
||||
SendError(w, http.StatusBadRequest, "Unsupported charset. JSON is always UTF-8 encoded.")
|
||||
return false
|
||||
}
|
||||
}
|
||||
if r.ContentLength > n {
|
||||
SendError(w, http.StatusBadRequest, fmt.Sprint("Content-Length greater than", n, "bytes"))
|
||||
return false
|
||||
}
|
||||
|
||||
err := json.NewDecoder(&LimitedReader{r.Body, n}).Decode(v)
|
||||
if err != nil {
|
||||
jsonError(w, err)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func ReadJSON(w http.ResponseWriter, r *http.Request, v interface{}) bool {
|
||||
return ReadJSONSize(w, r, v, 100*0xffff)
|
||||
}
|
||||
|
||||
// Same as io.LimitedReader, but returns limitReached so we can
|
||||
// distinguish between the limit being reached and actual EOF.
|
||||
type LimitedReader struct {
|
||||
R io.Reader
|
||||
N int64
|
||||
}
|
||||
|
||||
func (l *LimitedReader) Read(p []byte) (n int, err error) {
|
||||
if l.N <= 0 {
|
||||
return 0, LimitReached(l.N)
|
||||
}
|
||||
if int64(len(p)) > l.N {
|
||||
p = p[:l.N]
|
||||
}
|
||||
n, err = l.R.Read(p)
|
||||
l.N -= int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
// LimitedWriter writes until n bytes are written, then writes
|
||||
// an overage line and skips any further writes.
|
||||
type LimitedWriter struct {
|
||||
W io.Writer
|
||||
N int64
|
||||
}
|
||||
|
||||
func (l *LimitedWriter) Write(p []byte) (n int, err error) {
|
||||
var overrage = []byte("maximum log file size exceeded")
|
||||
|
||||
// we expect there may be concurrent writers, so to be safe..
|
||||
left := atomic.LoadInt64(&l.N)
|
||||
if left <= 0 {
|
||||
return 0, io.EOF // TODO EOF? really? does it matter?
|
||||
}
|
||||
n, err = l.W.Write(p)
|
||||
left = atomic.AddInt64(&l.N, -int64(n))
|
||||
if left <= 0 {
|
||||
l.W.Write(overrage)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
type JSONError string
|
||||
|
||||
func (e JSONError) Error() string { return string(e) }
|
||||
|
||||
func jsonType(t reflect.Type) string {
|
||||
switch t.Kind() {
|
||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32,
|
||||
reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16,
|
||||
reflect.Uint32, reflect.Uint64, reflect.Uintptr:
|
||||
return "integer"
|
||||
case reflect.Float32, reflect.Float64:
|
||||
return "number"
|
||||
case reflect.Map, reflect.Struct:
|
||||
return "object"
|
||||
case reflect.Slice, reflect.Array:
|
||||
return "array"
|
||||
case reflect.Ptr:
|
||||
return jsonType(t.Elem())
|
||||
}
|
||||
// bool, string, other cases not covered
|
||||
return t.String()
|
||||
}
|
||||
|
||||
func jsonError(w http.ResponseWriter, err error) {
|
||||
var msg string
|
||||
switch err := err.(type) {
|
||||
case *json.InvalidUTF8Error:
|
||||
msg = "Invalid UTF-8 in JSON: " + err.S
|
||||
|
||||
case *json.InvalidUnmarshalError, *json.UnmarshalFieldError,
|
||||
*json.UnsupportedTypeError:
|
||||
// should never happen
|
||||
InternalError(w, err)
|
||||
return
|
||||
|
||||
case *json.SyntaxError:
|
||||
msg = fmt.Sprintf("In JSON, %v at position %v.", err, err.Offset)
|
||||
|
||||
case *json.UnmarshalTypeError:
|
||||
msg = fmt.Sprintf("In JSON, cannot use %v as %v", err.Value, jsonType(err.Type))
|
||||
|
||||
case *time.ParseError:
|
||||
msg = "Time strings must be in RFC 3339 format."
|
||||
|
||||
case LimitReached:
|
||||
msg = err.Error()
|
||||
|
||||
case JSONError:
|
||||
msg = string(err)
|
||||
|
||||
default:
|
||||
if err != io.EOF {
|
||||
log15.Error("unhandled json.Unmarshal error", "type", reflect.TypeOf(err), "err", err)
|
||||
}
|
||||
msg = "Failed to decode JSON."
|
||||
}
|
||||
SendError(w, http.StatusBadRequest, msg)
|
||||
}
|
||||
|
||||
type sizer interface {
|
||||
Size() int64
|
||||
}
|
||||
|
||||
type LimitReached int64
|
||||
|
||||
func (e LimitReached) Error() string {
|
||||
return fmt.Sprint("Request body greater than", int64(e), "bytes")
|
||||
}
|
||||
35
api/models.go
Normal file
35
api/models.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package api
|
||||
|
||||
type Route struct {
|
||||
// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
|
||||
Host string `json:"host"`
|
||||
Destinations []string `json:"destinations"`
|
||||
ProjectId string `json:"project_id"`
|
||||
Token string `json:"token"` // store this so we can queue up new workers on demand
|
||||
CodeName string `json:"code_name"`
|
||||
}
|
||||
|
||||
// for adding new hosts
|
||||
type Route2 struct {
|
||||
Host string `json:"host"`
|
||||
Dest string `json:"dest"`
|
||||
}
|
||||
|
||||
// An app is that base object for api gateway
|
||||
type App struct {
|
||||
Name string `json:"name"`
|
||||
ProjectId string `json:"project_id"`
|
||||
CloudFlareId string `json:"-"`
|
||||
Dns string `json:"dns"`
|
||||
Routes map[string]*Route3 `json:"routes"`
|
||||
}
|
||||
|
||||
// this is for the new api gateway
|
||||
type Route3 struct {
|
||||
Path string `json:"path"` // run/app
|
||||
Image string `json:"image"`
|
||||
Type string `json:"type"`
|
||||
ContainerPath string `json:"cpath"`
|
||||
// maybe user a header map for whatever user wants to send?
|
||||
ContentType string `json:"content_type"`
|
||||
}
|
||||
199
api/runner.go
Normal file
199
api/runner.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/iron-io/go/common"
|
||||
)
|
||||
|
||||
type RunningApp struct {
|
||||
Route *Route3
|
||||
Port int
|
||||
ContainerName string
|
||||
}
|
||||
|
||||
var runningImages map[string]*RunningApp
|
||||
|
||||
func init() {
|
||||
runningImages = make(map[string]*RunningApp)
|
||||
fmt.Println("ENV:", os.Environ())
|
||||
}
|
||||
|
||||
func Run(w http.ResponseWriter, req *http.Request) {
|
||||
fmt.Println("RUN!!!!")
|
||||
log.Infoln("HOST:", req.Host)
|
||||
rUrl := req.URL
|
||||
appName := rUrl.Query().Get("app")
|
||||
log.Infoln("app_name", appName, "path:", req.URL.Path)
|
||||
if appName != "" {
|
||||
// passed in the name
|
||||
} else {
|
||||
host := strings.Split(req.Host, ":")[0]
|
||||
appName = strings.Split(host, ".")[0]
|
||||
log.Infoln("app_name from host", appName)
|
||||
}
|
||||
|
||||
app, err := getApp(appName)
|
||||
if err != nil {
|
||||
common.SendError(w, 400, fmt.Sprintln("This app does not exist. Please create app first.", err))
|
||||
return
|
||||
}
|
||||
log.Infoln("app", app)
|
||||
|
||||
// find route
|
||||
for _, el := range app.Routes {
|
||||
// TODO: copy/use gorilla's pattern matching here
|
||||
if el.Path == req.URL.Path {
|
||||
// Boom, run it!
|
||||
err = checkAndPull(el.Image)
|
||||
if err != nil {
|
||||
common.SendError(w, 404, fmt.Sprintln("The image could not be pulled:", err))
|
||||
return
|
||||
}
|
||||
if el.Type == "app" {
|
||||
DockerHost(el, w)
|
||||
return
|
||||
} else { // "run"
|
||||
// TODO: timeout 59 seconds
|
||||
DockerRun(el, w, req)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
common.SendError(w, 404, fmt.Sprintln("The requested endpoint does not exist."))
|
||||
}
|
||||
|
||||
// TODO: use Docker utils from docker-job for this and a few others in here
|
||||
func DockerRun(route *Route3, w http.ResponseWriter, req *http.Request) {
|
||||
log.Infoln("route:", route)
|
||||
image := route.Image
|
||||
payload, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error reading request body")
|
||||
return
|
||||
}
|
||||
log.WithField("payload", "---"+string(payload)+"---").Infoln("incoming request")
|
||||
log.WithField("image", image).Infoln("About to run using this image")
|
||||
|
||||
// TODO: swap all this out with Titan's running via API
|
||||
cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", string(payload)), image)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var b bytes.Buffer
|
||||
buff := bufio.NewWriter(&b)
|
||||
|
||||
go io.Copy(buff, stdout)
|
||||
go io.Copy(buff, stderr)
|
||||
|
||||
log.Printf("Waiting for command to finish...")
|
||||
if err = cmd.Wait(); err != nil {
|
||||
// job failed
|
||||
log.Infoln("job finished with err:", err)
|
||||
log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run")
|
||||
// TODO: wrap error in json "error": buff
|
||||
} else {
|
||||
log.Infoln("Docker ran successfully:", b.String())
|
||||
// print
|
||||
log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run")
|
||||
}
|
||||
log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran")
|
||||
buff.Flush()
|
||||
if route.ContentType != "" {
|
||||
w.Header().Set("Content-Type", route.ContentType)
|
||||
}
|
||||
fmt.Fprintln(w, string(bytes.Trim(b.Bytes(), "\x00")))
|
||||
}
|
||||
|
||||
func DockerHost(el *Route3, w http.ResponseWriter) {
|
||||
ra := runningImages[el.Image]
|
||||
if ra == nil {
|
||||
ra = &RunningApp{}
|
||||
ra.Route = el
|
||||
ra.Port = rand.Intn(9999-9000) + 9000
|
||||
ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000))
|
||||
runningImages[el.Image] = ra
|
||||
// TODO: timeout 59 minutes. Mark it in ra as terminated.
|
||||
cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image)
|
||||
// TODO: What should we do with the output here? Store it? Send it to a log service?
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
// TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatal(err)
|
||||
// TODO: What if the app fails to start? Don't want to keep starting the container
|
||||
}
|
||||
} else {
|
||||
// TODO: check if it's still running?
|
||||
// TODO: if ra.terminated, then start new container?
|
||||
}
|
||||
fmt.Println("RunningApp:", ra)
|
||||
// TODO: if connection fails, check if container still running? If not, start it again
|
||||
resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath))
|
||||
if err != nil {
|
||||
common.SendError(w, 404, fmt.Sprintln("The requested app endpoint does not exist.", err))
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
common.SendError(w, 500, fmt.Sprintln("Error reading response body", err))
|
||||
return
|
||||
}
|
||||
fmt.Fprintln(w, string(body))
|
||||
}
|
||||
|
||||
func checkAndPull(image string) error {
|
||||
err := execAndPrint("docker", []string{"inspect", image})
|
||||
if err != nil {
|
||||
// image does not exist, so let's pull
|
||||
fmt.Println("Image not found locally, will pull.", err)
|
||||
err = execAndPrint("docker", []string{"pull", image})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func execAndPrint(cmdstr string, args []string) error {
|
||||
var bout bytes.Buffer
|
||||
buffout := bufio.NewWriter(&bout)
|
||||
var berr bytes.Buffer
|
||||
bufferr := bufio.NewWriter(&berr)
|
||||
cmd := exec.Command(cmdstr, args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
go io.Copy(buffout, stdout)
|
||||
go io.Copy(bufferr, stderr)
|
||||
|
||||
log.Printf("Waiting for cmd to finish...")
|
||||
err = cmd.Wait()
|
||||
fmt.Println("stderr:", berr.String())
|
||||
fmt.Println("stdout:", bout.String())
|
||||
return err
|
||||
}
|
||||
3
api/version.go
Normal file
3
api/version.go
Normal file
@@ -0,0 +1,3 @@
|
||||
package api
|
||||
|
||||
const Version = "0.0.26"
|
||||
Reference in New Issue
Block a user