Got it actually working with workers and a remote router. Boom.

Travis
2012-12-30 21:14:27 -08:00
parent 0b01415b76
commit 9795fb207d
7 changed files with 240 additions and 15 deletions

.gitignore

@@ -1,2 +1,2 @@
.idea/*
.idea/
*.sublime*

README.md

@@ -1,15 +1,25 @@
## What's going on?
- worker.rb connects to the router and adds routes.
- client.rb connects to the router, which checks the routing table, proxies the request to one of the destinations, and returns the response.
The idea here is that the IronWorker backend can tell the router that it has started a process and that the router should start routing requests to it.
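For illustration, here's roughly what that decision could look like on the router side. This is a sketch only, not the actual router.go (which isn't fully shown in this commit): the `routingTable` map, the `routeOrProxy` handler name, the assumption that the worker's POST body arrives as form fields, and the use of the stock `httputil.NewSingleHostReverseProxy` (instead of the copy in reverser.go) are all just for the example.

```go
// Sketch only: in-memory routing keyed on the Host header.
package main

import (
	"fmt"
	"math/rand"
	"net/http"
	"net/http/httputil"
	"net/url"
	"sync"
)

var (
	mu           sync.Mutex
	routingTable = map[string][]string{} // virtual host -> worker "host:port" destinations
)

func routeOrProxy(w http.ResponseWriter, req *http.Request) {
	if req.Header.Get("Iron-Router") != "" {
		// A worker is registering itself (worker.rb POSTs host + dest).
		// Assumes the body is form-encoded; the real wire format may differ.
		req.ParseForm()
		host, dest := req.FormValue("host"), req.FormValue("dest")
		mu.Lock()
		routingTable[host] = append(routingTable[host], dest)
		mu.Unlock()
		fmt.Fprintln(w, "route added for", host, "->", dest)
		return
	}

	// Normal request: look up the virtual host and proxy to one of its workers.
	mu.Lock()
	dests := routingTable[req.Host]
	mu.Unlock()
	if len(dests) == 0 {
		http.Error(w, "no route for "+req.Host, http.StatusServiceUnavailable)
		return
	}
	target, err := url.Parse("http://" + dests[rand.Intn(len(dests))])
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, req)
}

func main() {
	http.HandleFunc("/", routeOrProxy)
	http.ListenAndServe(":8080", nil)
}
```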
## Todo
This is just a simple prototype. To get to production it would need:
- Routing table in central storage (MongoDB or IronCache) so all routers can write to it and read from it to get updates.
- Update the routing table from the central store every X minutes.
- Remove failed routes and start new workers when one fails.
- Expire routes after 55 minutes or so (see the expiry sketch after this list).
- Ability to start new workers if none are running.
- Ability to always keep a minimum number of workers running, e.g. at least one (or none on a free account?).
- Ability to start new workers based on some auto-scaling scheme.
- Authentication (same as always).
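None of that exists yet. As a rough sketch of the expiry idea (routes held in process memory for now rather than in Mongo/IronCache, and with made-up names — this isn't code from router.go), each route could carry its registration time and get pruned once it's older than ~55 minutes:

```go
// Sketch: routes expire ~55 minutes after the worker registers, matching the
// "expire routes after 55 minutes or so" todo above. Names are illustrative.
package main

import (
	"fmt"
	"sync"
	"time"
)

const routeTTL = 55 * time.Minute

type route struct {
	dest       string    // worker "host:port"
	registered time.Time // when the worker added itself
}

type routeTable struct {
	mu     sync.Mutex
	routes map[string][]route // virtual host -> routes
}

// liveDests returns destinations registered within the TTL and drops the rest.
func (t *routeTable) liveDests(host string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	var keep []route
	var dests []string
	for _, r := range t.routes[host] {
		if time.Since(r.registered) < routeTTL {
			keep = append(keep, r)
			dests = append(dests, r.dest)
		}
	}
	t.routes[host] = keep
	return dests
}

func main() {
	t := &routeTable{routes: map[string][]route{}}
	t.routes["routertest.irondns.info"] = []route{
		{dest: "ec2-host-1:50001", registered: time.Now()},
		{dest: "ec2-host-2:50002", registered: time.Now().Add(-2 * time.Hour)}, // stale
	}
	fmt.Println(t.liveDests("routertest.irondns.info")) // only the fresh route survives
}
```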
## Testing
- start helloserver.go
@@ -17,10 +27,10 @@ This is just a simple prototype. To get to production would need:
- ruby worker.rb a couple times
- ruby client.rb
What's going on?
- worker.rb connects to router and adds routes.
- client.rb connects to router which checks the routing table, proxies the request to one of the destinations and returns the response.
The idea here is that the IronWorker backend can tell the router that it started a process and to start routing requests to it. The endpoint should only be cached for 55 minutes or so.
## Testing for reals
- start router.go on a remote server (there's a test project on SD already)
- iron_worker upload app_worker
- iron_worker queue app_worker
- ruby client.rb
- BOOM!

app_worker.worker (new file)

@@ -0,0 +1,6 @@
runtime "ruby"
gem 'rest'
gem 'sinatra'
exec 'worker.rb'

client.rb

@@ -2,7 +2,10 @@ require 'rest'
rest = Rest::Client.new
rest.logger.level = Logger::DEBUG
response = rest.get("http://localhost:8080/") # "http://www.github.com")
response = rest.get(
  # "http://localhost:8080/"
  "http://routertest.irondns.info/"
)
puts "body:"
puts response.body

src/router/reverser.go (new file)

@@ -0,0 +1,184 @@
// I wanted to do some stuff to this so had to make a copy. Namely change the Host handling for virtual hosts.

// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// HTTP reverse proxy handler
package main

import (
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)

// onExitFlushLoop is a callback set by tests to detect the state of the
// flushLoop() goroutine.
var onExitFlushLoop func()

// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
type ReverseProxy struct {
	// Director must be a function which modifies
	// the request into a new request to be sent
	// using Transport. Its response is then copied
	// back to the original client unmodified.
	Director func(*http.Request)

	// The transport used to perform proxy requests.
	// If nil, http.DefaultTransport is used.
	Transport http.RoundTripper

	// FlushInterval specifies the flush interval
	// to flush to the client while copying the
	// response body.
	// If zero, no periodic flushing is done.
	FlushInterval time.Duration
}

func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}

// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
	targetQuery := target.RawQuery
	director := func(req *http.Request) {
		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
	}
	return &ReverseProxy{Director: director}
}

func copyHeader(dst, src http.Header) {
	for k, vv := range src {
		for _, v := range vv {
			dst.Add(k, v)
		}
	}
}

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	transport := p.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}

	outreq := new(http.Request)
	*outreq = *req // includes shallow copies of maps, but okay

	p.Director(outreq)
	outreq.Proto = "HTTP/1.1"
	outreq.ProtoMajor = 1
	outreq.ProtoMinor = 1
	outreq.Close = false

	// Remove the connection header to the backend. We want a
	// persistent connection, regardless of what the client sent
	// to us. This is modifying the same underlying map from req
	// (shallow copied above) so we only copy it if necessary.
	if outreq.Header.Get("Connection") != "" {
		outreq.Header = make(http.Header)
		copyHeader(outreq.Header, req.Header)
		outreq.Header.Del("Connection")
	}

	if clientIp, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
		outreq.Header.Set("X-Forwarded-For", clientIp)
	}

	res, err := transport.RoundTrip(outreq)
	if err != nil {
		log.Printf("http: proxy error: %v", err)
		rw.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer res.Body.Close()

	copyHeader(rw.Header(), res.Header)

	rw.WriteHeader(res.StatusCode)

	p.copyResponse(rw, res.Body)
}

func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) {
	if p.FlushInterval != 0 {
		if wf, ok := dst.(writeFlusher); ok {
			mlw := &maxLatencyWriter{
				dst:     wf,
				latency: p.FlushInterval,
				done:    make(chan bool),
			}
			go mlw.flushLoop()
			defer mlw.stop()
			dst = mlw
		}
	}
	io.Copy(dst, src)
}

type writeFlusher interface {
	io.Writer
	http.Flusher
}

type maxLatencyWriter struct {
	dst     writeFlusher
	latency time.Duration

	lk   sync.Mutex // protects Write + Flush
	done chan bool
}

func (m *maxLatencyWriter) Write(p []byte) (int, error) {
	m.lk.Lock()
	defer m.lk.Unlock()
	return m.dst.Write(p)
}

func (m *maxLatencyWriter) flushLoop() {
	t := time.NewTicker(m.latency)
	defer t.Stop()
	for {
		select {
		case <-m.done:
			if onExitFlushLoop != nil {
				onExitFlushLoop()
			}
			return
		case <-t.C:
			m.lk.Lock()
			m.dst.Flush()
			m.lk.Unlock()
		}
	}
	panic("unreached")
}

func (m *maxLatencyWriter) stop() { m.done <- true }
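One usage note on the copy above: because `outreq` is a shallow copy of the incoming request, the client's original Host header rides through to the backend while `req.URL.Host` gets pointed at the worker, which is presumably why this is handy for virtual-host routing. A minimal way to build a streaming-friendly proxy from it — illustrative only, not the actual call site in router.go, and assuming it's dropped into reverser.go itself so the imports are already there — could be:

```go
// Sketch: build a proxy to one worker backend, flushing periodically so long
// responses stream instead of buffering. streamingProxyTo is a made-up name.
func streamingProxyTo(dest string) (*ReverseProxy, error) {
	target, err := url.Parse("http://" + dest) // dest is "host:port" from the routing table
	if err != nil {
		return nil, err
	}
	p := NewSingleHostReverseProxy(target)
	p.FlushInterval = 100 * time.Millisecond // engages maxLatencyWriter above
	return p, nil
}
```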

src/router/router.go

@@ -63,6 +63,7 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
	destUrls = "http://" + destUrls
	destUrl, err := url.Parse(destUrls)
	if err != nil {
		fmt.Println("error!", err)
		panic(err)
	}
	fmt.Println("proxying to", destUrl)

worker.rb

@@ -1,10 +1,31 @@
require 'rest'
require 'sinatra'
rest = Rest::Client.new
rest.logger.level = Logger::DEBUG
response = rest.post("http://localhost:8080/",
public_dns = rest.get("http://169.254.169.254/latest/meta-data/public-hostname").body
puts "public dns name: #{public_dns}"
port = rand(50000..55000)
puts "port: #{port}"
response = rest.post(
  # "http://localhost:8080/",
  "http://router.irondns.info/",
  headers: {"Iron-Router"=>"YES!"},
  body: {"host"=>"localhost", "dest"=>"localhost:8082"})
  body: {"host"=>"routertest.irondns.info", "dest"=>"#{public_dns}:#{port}"})
puts "body:"
puts response.body
STDOUT.flush
ENV['PORT'] = port.to_s # for sinatra
my_app = Sinatra.new do
  set :port, port
  get('/') { "hi" }
  get('/*') { "you passed in #{params[:splat].inspect}" }
end
my_app.run!