mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Added IronCache (not used yet).
Using copy of reverse proxy so can get error response.
This commit is contained in:
@@ -1,4 +1,9 @@
|
|||||||
// I wanted to do some stuff to this so had to make a copy. Namely change the Host handling for virtual hosts.
|
/* I wanted to do some stuff to this so had to make a copy. Namely:
|
||||||
|
- change the Host handling for virtual hosts.
|
||||||
|
- get errors if the proxy request fails
|
||||||
|
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
// Copyright 2011 The Go Authors. All rights reserved.
|
// Copyright 2011 The Go Authors. All rights reserved.
|
||||||
// Use of this source code is governed by a BSD-style
|
// Use of this source code is governed by a BSD-style
|
||||||
@@ -83,7 +88,7 @@ func copyHeader(dst, src http.Header) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) error {
|
||||||
transport := p.Transport
|
transport := p.Transport
|
||||||
if transport == nil {
|
if transport == nil {
|
||||||
transport = http.DefaultTransport
|
transport = http.DefaultTransport
|
||||||
@@ -115,8 +120,8 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||||||
res, err := transport.RoundTrip(outreq)
|
res, err := transport.RoundTrip(outreq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("http: proxy error: %v", err)
|
log.Printf("http: proxy error: %v", err)
|
||||||
rw.WriteHeader(http.StatusInternalServerError)
|
// rw.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
|
|
||||||
@@ -124,6 +129,9 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
rw.WriteHeader(res.StatusCode)
|
rw.WriteHeader(res.StatusCode)
|
||||||
p.copyResponse(rw, res.Body)
|
p.copyResponse(rw, res.Body)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) {
|
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) {
|
||||||
|
|||||||
@@ -11,19 +11,28 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/iron-io/iron_go/cache"
|
||||||
|
// "github.com/iron-io/iron_go/config"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
var routingTable = map[string]Route{}
|
var routingTable = map[string]Route{}
|
||||||
|
var icache = cache.New("routertest")
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
icache.Settings.UseConfigMap(map[string]interface{}{"token": "MWx0VfngzsCu0W8NAYw7S2lNrgo", "project_id": "50e227be8e7d14359b001373"})
|
||||||
|
}
|
||||||
|
|
||||||
type Route struct {
|
type Route struct {
|
||||||
// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
|
// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
|
||||||
Destinations []string
|
Destinations []string
|
||||||
|
ProjectId string
|
||||||
|
Token string // store this so we can queue up new workers on demand
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// for adding new hosts
|
// for adding new hosts
|
||||||
@@ -52,6 +61,11 @@ func main() {
|
|||||||
func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||||
fmt.Println("HOST:", req.Host)
|
fmt.Println("HOST:", req.Host)
|
||||||
host := strings.Split(req.Host, ":")[0]
|
host := strings.Split(req.Host, ":")[0]
|
||||||
|
|
||||||
|
// We look up the destinations in the routing table and there can be 3 possible scenarios:
|
||||||
|
// 1) This host was never registered so we return 404
|
||||||
|
// 2) This host has active workers so we do the proxy
|
||||||
|
// 3) This host has no active workers so we queue one (or more) up and return a 503 or something with message that says "try again in a minute"
|
||||||
route := routingTable[host]
|
route := routingTable[host]
|
||||||
// choose random dest
|
// choose random dest
|
||||||
if len(route.Destinations) == 0 {
|
if len(route.Destinations) == 0 {
|
||||||
@@ -67,14 +81,28 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
fmt.Println("proxying to", destUrl)
|
fmt.Println("proxying to", destUrl)
|
||||||
proxy := httputil.NewSingleHostReverseProxy(destUrl)
|
proxy := NewSingleHostReverseProxy(destUrl)
|
||||||
proxy.ServeHTTP(w, req)
|
err = proxy.ServeHTTP(w, req)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error proxying!", err)
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
// start new worker if it's a connection error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Println("Served!")
|
||||||
// todo: how to handle destination failures. I got this in log output when testing a bad endpoint:
|
// todo: how to handle destination failures. I got this in log output when testing a bad endpoint:
|
||||||
// 2012/12/26 23:22:08 http: proxy error: dial tcp 127.0.0.1:8082: connection refused
|
// 2012/12/26 23:22:08 http: proxy error: dial tcp 127.0.0.1:8082: connection refused
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When a worker starts up, it calls this
|
||||||
func AddWorker(w http.ResponseWriter, req *http.Request) {
|
func AddWorker(w http.ResponseWriter, req *http.Request) {
|
||||||
log.Println("AddWorker called!")
|
log.Println("AddWorker called!")
|
||||||
|
|
||||||
|
// get project id and token
|
||||||
|
projectId := req.FormValue("project_id")
|
||||||
|
token := req.FormValue("token")
|
||||||
|
fmt.Println("project_id:", projectId, "token:", token)
|
||||||
|
|
||||||
r2 := Route2{}
|
r2 := Route2{}
|
||||||
decoder := json.NewDecoder(req.Body)
|
decoder := json.NewDecoder(req.Body)
|
||||||
decoder.Decode(&r2)
|
decoder.Decode(&r2)
|
||||||
@@ -82,9 +110,12 @@ func AddWorker(w http.ResponseWriter, req *http.Request) {
|
|||||||
fmt.Println("DECODED:", r2)
|
fmt.Println("DECODED:", r2)
|
||||||
|
|
||||||
// todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it.
|
// todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it.
|
||||||
|
// todo: one cache entry per host domain
|
||||||
route := routingTable[r2.Host]
|
route := routingTable[r2.Host]
|
||||||
fmt.Println("ROUTE:", route)
|
fmt.Println("ROUTE:", route)
|
||||||
route.Destinations = append(route.Destinations, r2.Dest)
|
route.Destinations = append(route.Destinations, r2.Dest)
|
||||||
|
route.ProjectId = projectId
|
||||||
|
route.Token = token
|
||||||
fmt.Println("ROUTE:", route)
|
fmt.Println("ROUTE:", route)
|
||||||
routingTable[r2.Host] = route
|
routingTable[r2.Host] = route
|
||||||
fmt.Println("New routing table:", routingTable)
|
fmt.Println("New routing table:", routingTable)
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
require 'rest'
|
require 'rest'
|
||||||
require 'sinatra'
|
require 'sinatra'
|
||||||
|
|
||||||
|
# The backend would do this part before execute the worker
|
||||||
|
########################################################
|
||||||
rest = Rest::Client.new
|
rest = Rest::Client.new
|
||||||
rest.logger.level = Logger::DEBUG
|
rest.logger.level = Logger::DEBUG
|
||||||
|
|
||||||
@@ -10,10 +12,9 @@ puts "public dns name: #{public_dns}"
|
|||||||
port = rand(50000..55000)
|
port = rand(50000..55000)
|
||||||
puts "port: #{port}"
|
puts "port: #{port}"
|
||||||
|
|
||||||
|
|
||||||
response = rest.post(
|
response = rest.post(
|
||||||
# "http://localhost:8080/",
|
# "http://localhost:8080/",
|
||||||
"http://router.irondns.info/",
|
"http://router.irondns.info/?project_id=#{params[:project_id]}&token=#{params[:token]}",
|
||||||
headers: {"Iron-Router"=>"YES!"},
|
headers: {"Iron-Router"=>"YES!"},
|
||||||
body: {"host"=>"routertest.irondns.info", "dest"=>"#{public_dns}:#{port}"})
|
body: {"host"=>"routertest.irondns.info", "dest"=>"#{public_dns}:#{port}"})
|
||||||
puts "body:"
|
puts "body:"
|
||||||
@@ -21,6 +22,9 @@ puts response.body
|
|||||||
|
|
||||||
STDOUT.flush
|
STDOUT.flush
|
||||||
|
|
||||||
|
# Now we start the actual worker
|
||||||
|
##################################################################3
|
||||||
|
|
||||||
ENV['PORT'] = port.to_s # for sinatra
|
ENV['PORT'] = port.to_s # for sinatra
|
||||||
my_app = Sinatra.new do
|
my_app = Sinatra.new do
|
||||||
set :port, port
|
set :port, port
|
||||||
|
|||||||
Reference in New Issue
Block a user