From 88074a42c08b364746b7c18bbe833e587dfa56c8 Mon Sep 17 00:00:00 2001 From: jan grant <3430517+jan-g@users.noreply.github.com> Date: Tue, 3 Apr 2018 15:04:21 +0100 Subject: [PATCH] Bugfix/grpc consume eof (#912) * GRPC streams end with an EOF The client should ensure that the final packet is followed by a GRPC EOF. This has the benefit of permitting the client code to clean up resources. * Don't require an entire HTTP request in RunnerCall TryExec needs a handle on an incoming ReadCloser containing the body of a request; however, everything else will already have been extracted from the HTTP request in the case of lbAgent use. (The point of this change is to simplify the interface for other uses.) * Return error from GRPC layer explicitly As per review --- api/agent/call.go | 4 ++-- api/agent/lb_agent_test.go | 7 +++++-- api/agent/runner_client.go | 9 +++++++-- api/runnerpool/runner_pool.go | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/api/agent/call.go b/api/agent/call.go index 1fc64af80..5d3b78760 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -285,8 +285,8 @@ func (c *call) SlotDeadline() time.Time { return c.slotDeadline } -func (c *call) Request() *http.Request { - return c.req +func (c *call) RequestBody() io.ReadCloser { + return c.req.Body } func (c *call) ResponseWriter() http.ResponseWriter { diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 5c39cb6bb..4ded29829 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -128,15 +128,18 @@ func (c *mockRunnerCall) SlotDeadline() time.Time { return c.slotDeadline } -func (c *mockRunnerCall) Request() *http.Request { - return c.r +func (c *mockRunnerCall) RequestBody() io.ReadCloser { + return c.r.Body } + func (c *mockRunnerCall) ResponseWriter() http.ResponseWriter { return c.rw } + func (c *mockRunnerCall) StdErr() io.ReadWriteCloser { return c.stdErr } + func (c *mockRunnerCall) Model() *models.Call { return c.model } diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index 1f3b482df..257c38d03 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -135,7 +135,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e } func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error { - bodyReader := call.Request().Body + bodyReader := call.RequestBody() writeBufferSize := 10 * 1024 // 10KB writeBuffer := make([]byte, writeBufferSize) for { @@ -199,7 +199,12 @@ func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.Run if body.Finished.Success { logrus.Infof("Call finished successfully: %v", body.Finished.Details) } else { - logrus.Infof("Call finish unsuccessfully:: %v", body.Finished.Details) + logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details) + } + // There should be an EOF following the last packet + if _, err := protocolClient.Recv(); err != io.EOF { + logrus.WithError(err).Error("Did not receive expected EOF from runner stream") + done <- err } close(done) return diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index fc82dec16..9f49a528d 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -42,7 +42,7 @@ type Runner interface { // processed by a RunnerPool type RunnerCall interface { SlotDeadline() time.Time - Request() *http.Request + RequestBody() io.ReadCloser ResponseWriter() http.ResponseWriter StdErr() io.ReadWriteCloser Model() *models.Call