Revert "safe responsewriter usage in TryExec (#1490)" (#1522)

This reverts commit 1fb78ed836.
This commit is contained in:
Srinidhi Chokkadi Puranik
2019-07-04 01:51:06 -07:00
committed by Andrea Rosa
parent e73e351eef
commit bb84ed35de
5 changed files with 9 additions and 93 deletions

View File

@@ -70,9 +70,9 @@ test: checkfmt pull-images test-basic test-middleware test-extensions test-syste
.PHONY: test-system
test-system:
./system_test.sh sqlite3 $(run)
./system_test.sh mysql $(run)
./system_test.sh postgres $(run)
./system_test.sh sqlite3
./system_test.sh mysql
./system_test.sh postgres
.PHONY: img-busybox
img-busybox:

View File

@@ -28,18 +28,14 @@ type lbAgent struct {
callOpts []CallOpt
}
// DetachedResponseWriter implements http.ResponseWriter without allowing
// writes to the body or writing the headers from a call to Write or
// WriteHeader, it is only intended to allow writing the status code in and
// being able to fetch it later from Status()
type DetachedResponseWriter struct {
headers http.Header
Headers http.Header
status int
acked chan struct{}
}
func (w *DetachedResponseWriter) Header() http.Header {
return w.headers
return w.Headers
}
func (w *DetachedResponseWriter) Write(data []byte) (int, error) {
@@ -55,9 +51,9 @@ func (w *DetachedResponseWriter) Status() int {
return w.status
}
func NewDetachedResponseWriter(statusCode int) *DetachedResponseWriter {
func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter {
return &DetachedResponseWriter{
headers: make(http.Header),
Headers: h,
status: statusCode,
acked: make(chan struct{}, 1),
}

View File

@@ -1,7 +1,6 @@
package agent
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
@@ -162,76 +161,6 @@ func (r *gRPCRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) {
// implements Runner
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
// we need to get a response writer that is safe for us to use. the only reason we need this,
// ostensibly, is because io operations can't be timed out so we need to do receiveFromRunner
// in a goroutine, and we may return from here from a timeout while recv is running, leading
// to a race when a placer calls TryExec with the same ResponseWriter we have here. blech.
respBuffer := bufPool.Get().(*bytes.Buffer)
respBuffer.Reset()
defer func() {
if ctx.Err() == nil { // this is only safe if we don't time out (receiveFromRunner returned/not called)
bufPool.Put(respBuffer)
}
}()
writer := syncResponseWriter{
headers: make(http.Header),
Buffer: respBuffer,
}
safeCall := callWithResponseWriter(call, &writer)
placed, err := r.tryExec(ctx, safeCall)
if err != nil || !placed {
return placed, err
}
// now we can write to the actual response writer from this thread (so long
// as the caller waits, we're playing jenga ofc)
rw := call.ResponseWriter()
copyHeaders(rw.Header(), writer.Header())
rw.WriteHeader(writer.status)
io.Copy(rw, writer) // TODO(reed): this is also a buffer->buffer operation :( but it means no errors
return true, nil
}
func callWithResponseWriter(call pool.RunnerCall, rw http.ResponseWriter) pool.RunnerCall {
return &wrapCall{call, rw}
}
// wrapCall implements pool.RunnerCall but bypasses the embedded RunnerCall's ResponseWriter() method
// TODO this is the worst thing we've tried, except for all the other things we've tried.
type wrapCall struct {
pool.RunnerCall
rw http.ResponseWriter
}
func (c *wrapCall) ResponseWriter() http.ResponseWriter {
return c.rw
}
func copyHeaders(dst, src http.Header) {
for k, vs := range src {
for _, v := range vs {
dst.Add(k, v)
}
}
}
// TODO(reed): this is copied. and not only does it make sense in both places for different
// reasons, it makes sense in another place too. need to reconsider ifaces
type syncResponseWriter struct {
headers http.Header
status int
*bytes.Buffer
}
var _ http.ResponseWriter = new(syncResponseWriter) // nice compiler errors
func (s *syncResponseWriter) Header() http.Header { return s.headers }
func (s *syncResponseWriter) WriteHeader(code int) { s.status = code }
func (r *gRPCRunner) tryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
log := common.Logger(ctx).WithField("runner_addr", r.address)
log.Debug("Attempting to place call")
@@ -413,7 +342,6 @@ func recordFinishStats(ctx context.Context, msg *pb.CallFinished, c pool.RunnerC
statsLBAgentRunnerSchedLatency(ctx, runnerSchedLatency)
statsLBAgentRunnerExecLatency(ctx, runnerExecLatency)
// TODO: this is not safe to be called from within receiveFromRunner, it may get called each retry (data race, but also incorrect)
c.AddUserExecutionTime(runnerExecLatency)
}
}

View File

@@ -96,7 +96,7 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
isDetached := req.Header.Get("Fn-Invoke-Type") == models.TypeDetached
if isDetached {
writer = agent.NewDetachedResponseWriter(202)
writer = agent.NewDetachedResponseWriter(resp.Header(), 202)
} else {
writer = &syncResponseWriter{
headers: resp.Header(),

View File

@@ -6,7 +6,6 @@ source ./helpers.sh
remove_containers ${CONTEXT}
DB_NAME=$1
shift # later usage
export FN_DB_URL=$(spawn_${DB_NAME} ${CONTEXT})
# avoid port conflicts with api_test.sh which are run in parallel
@@ -24,15 +23,8 @@ export FN_LOG_LEVEL=debug
#
export SYSTEM_TEST_PROMETHEUS_FILE=./prometheus.${DB_NAME}.txt
run="$@"
if [ ! -z "$run" ]
then
run="-run $run"
fi
cd test/fn-system-tests
go test $run -v ./...
go test -v ./...
cd ../../
remove_containers ${CONTEXT}