diff --git a/.gitignore b/.gitignore index d0741d053..0e7a152db 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -.idea/* +.idea/ *.sublime* diff --git a/README.md b/README.md index 7f39e68eb..b4fe6e983 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,25 @@ + +## What's going on? + +- worker.rb connects to router and adds routes. +- client.rb connects to router which checks the routing table, proxies the request to one of the destinations and returns the response. + +The idea here is that IronWorker backend can tell the router that it started a process and to start routing requests. + +## Todo + This is just a simple prototype. To get to production would need: - Routing table in central storage (mongo or IronCache) so all routers can write to it and read to get updates. - Update routing table from central store every X minutes. -- Remove failed routes and start new workers if it failed. +- Remove failed routes and start new workers if it failed. +- Expire routes after 55 minutes or so. - Ability to start new workers if none are running. - Ability to always keep a minimum number running at all times, like at least one (or not if on free account?). - Ability to start new workers based on some auto scaling scheme. - Authentication (same as always). - ## Testing - start helloserver.go @@ -17,10 +27,10 @@ This is just a simple prototype. To get to production would need: - ruby worker.rb a couple times - ruby client.rb -What's going on? - -- worker.rb connects to router and adds routes. -- client.rb connects to router which checks the routing table, proxies the request to one of the destinations and returns the response. - -The idea here is that IronWorker backend can tell the router that it started a process and to start routing requests. The endpoint should only be cached for 55 minutes or so. +## Testing for reals +- start router.go on remote server (there's a test project on SD already) +- iron_worker upload app_worker +- iron_worker queue app_worker +- ruby client.rb +- BOOM! diff --git a/app_worker.worker b/app_worker.worker new file mode 100644 index 000000000..296e0fe33 --- /dev/null +++ b/app_worker.worker @@ -0,0 +1,6 @@ +runtime "ruby" + +gem 'rest' +gem 'sinatra' + +exec 'worker.rb' diff --git a/client.rb b/client.rb index d73e35ab0..d54e19675 100644 --- a/client.rb +++ b/client.rb @@ -2,7 +2,10 @@ require 'rest' rest = Rest::Client.new rest.logger.level = Logger::DEBUG -response = rest.get("http://localhost:8080/") # "http://www.github.com") +response = rest.get( +#"http://localhost:8080/" + "http://routertest.irondns.info/" +) puts "body:" puts response.body diff --git a/src/router/reverser.go b/src/router/reverser.go new file mode 100644 index 000000000..071df4576 --- /dev/null +++ b/src/router/reverser.go @@ -0,0 +1,184 @@ +// I wanted to do some stuff to this so had to make a copy. Namely change the Host handling for virtual hosts. + +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// HTTP reverse proxy handler + +package main + +import ( + "io" + "log" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +// onExitFlushLoop is a callback set by tests to detect the state of the +// flushLoop() goroutine. +var onExitFlushLoop func() + +// ReverseProxy is an HTTP Handler that takes an incoming request and +// sends it to another server, proxying the response back to the +// client. +type ReverseProxy struct { + // Director must be a function which modifies + // the request into a new request to be sent + // using Transport. Its response is then copied + // back to the original client unmodified. + Director func(*http.Request) + + // The transport used to perform proxy requests. + // If nil, http.DefaultTransport is used. + Transport http.RoundTripper + + // FlushInterval specifies the flush interval + // to flush to the client while copying the + // response body. + // If zero, no periodic flushing is done. + FlushInterval time.Duration +} + +func singleJoiningSlash(a, b string) string { + aslash := strings.HasSuffix(a, "/") + bslash := strings.HasPrefix(b, "/") + switch { + case aslash && bslash: + return a + b[1:] + case !aslash && !bslash: + return a + "/" + b + } + return a + b +} + +// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites +// URLs to the scheme, host, and base path provided in target. If the +// target's path is "/base" and the incoming request was for "/dir", +// the target request will be for /base/dir. +func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { + targetQuery := target.RawQuery + director := func(req *http.Request) { + req.URL.Scheme = target.Scheme + req.URL.Host = target.Host + req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) + if targetQuery == "" || req.URL.RawQuery == "" { + req.URL.RawQuery = targetQuery + req.URL.RawQuery + } else { + req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery + } + } + return &ReverseProxy{Director: director} +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + transport := p.Transport + if transport == nil { + transport = http.DefaultTransport + } + + outreq := new(http.Request) + *outreq = *req // includes shallow copies of maps, but okay + + p.Director(outreq) + outreq.Proto = "HTTP/1.1" + outreq.ProtoMajor = 1 + outreq.ProtoMinor = 1 + outreq.Close = false + + // Remove the connection header to the backend. We want a + // persistent connection, regardless of what the client sent + // to us. This is modifying the same underlying map from req + // (shallow copied above) so we only copy it if necessary. + if outreq.Header.Get("Connection") != "" { + outreq.Header = make(http.Header) + copyHeader(outreq.Header, req.Header) + outreq.Header.Del("Connection") + } + + if clientIp, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { + outreq.Header.Set("X-Forwarded-For", clientIp) + } + + res, err := transport.RoundTrip(outreq) + if err != nil { + log.Printf("http: proxy error: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + defer res.Body.Close() + + copyHeader(rw.Header(), res.Header) + + rw.WriteHeader(res.StatusCode) + p.copyResponse(rw, res.Body) +} + +func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) { + if p.FlushInterval != 0 { + if wf, ok := dst.(writeFlusher); ok { + mlw := &maxLatencyWriter{ + dst: wf, + latency: p.FlushInterval, + done: make(chan bool), + } + go mlw.flushLoop() + defer mlw.stop() + dst = mlw + } + } + + io.Copy(dst, src) +} + +type writeFlusher interface { +io.Writer +http.Flusher +} + +type maxLatencyWriter struct { + dst writeFlusher + latency time.Duration + + lk sync.Mutex // protects Write + Flush + done chan bool +} + +func (m *maxLatencyWriter) Write(p []byte) (int, error) { + m.lk.Lock() + defer m.lk.Unlock() + return m.dst.Write(p) +} + +func (m *maxLatencyWriter) flushLoop() { + t := time.NewTicker(m.latency) + defer t.Stop() + for { + select { + case <-m.done: + if onExitFlushLoop != nil { + onExitFlushLoop() + } + return + case <-t.C: + m.lk.Lock() + m.dst.Flush() + m.lk.Unlock() + } + } + panic("unreached") +} + +func (m *maxLatencyWriter) stop() { m.done <- true } diff --git a/src/router/router.go b/src/router/router.go index df1539109..5e5947a49 100644 --- a/src/router/router.go +++ b/src/router/router.go @@ -44,8 +44,8 @@ func main() { r.HandleFunc("/", ProxyFunc) http.Handle("/", r) -port := 80 -fmt.Println("listening and serving on port", port) + port := 80 + fmt.Println("listening and serving on port", port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", port), nil)) } @@ -63,9 +63,10 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) { destUrls = "http://" + destUrls destUrl, err := url.Parse(destUrls) if err != nil { + fmt.Println("error!", err) panic(err) } -fmt.Println("proxying to", destUrl) + fmt.Println("proxying to", destUrl) proxy := httputil.NewSingleHostReverseProxy(destUrl) proxy.ServeHTTP(w, req) // todo: how to handle destination failures. I got this in log output when testing a bad endpoint: diff --git a/worker.rb b/worker.rb index 20cbc9700..a77942a41 100644 --- a/worker.rb +++ b/worker.rb @@ -1,10 +1,31 @@ require 'rest' +require 'sinatra' rest = Rest::Client.new rest.logger.level = Logger::DEBUG -response = rest.post("http://localhost:8080/", + +public_dns = rest.get("http://169.254.169.254/latest/meta-data/public-hostname").body + +puts "public dns name: #{public_dns}" +port = rand(50000..55000) +puts "port: #{port}" + + +response = rest.post( +# "http://localhost:8080/", + "http://router.irondns.info/", headers: {"Iron-Router"=>"YES!"}, - body: {"host"=>"localhost", "dest"=>"localhost:8082"}) + body: {"host"=>"routertest.irondns.info", "dest"=>"#{public_dns}:#{port}"}) puts "body:" puts response.body +STDOUT.flush + +ENV['PORT'] = port.to_s # for sinatra +my_app = Sinatra.new do + set :port, port + get('/') { "hi" } + get('/*') { "you passed in #{params[:splat].inspect}" } +end +my_app.run! +