mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Bugfix/grpc consume eof (#912)
* GRPC streams end with an EOF The client should ensure that the final packet is followed by a GRPC EOF. This has the benefit of permitting the client code to clean up resources. * Don't require an entire HTTP request in RunnerCall TryExec needs a handle on an incoming ReadCloser containing the body of a request; however, everything else will already have been extracted from the HTTP request in the case of lbAgent use. (The point of this change is to simplify the interface for other uses.) * Return error from GRPC layer explicitly As per review
This commit is contained in:
@@ -285,8 +285,8 @@ func (c *call) SlotDeadline() time.Time {
|
|||||||
return c.slotDeadline
|
return c.slotDeadline
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) Request() *http.Request {
|
func (c *call) RequestBody() io.ReadCloser {
|
||||||
return c.req
|
return c.req.Body
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) ResponseWriter() http.ResponseWriter {
|
func (c *call) ResponseWriter() http.ResponseWriter {
|
||||||
|
|||||||
@@ -128,15 +128,18 @@ func (c *mockRunnerCall) SlotDeadline() time.Time {
|
|||||||
return c.slotDeadline
|
return c.slotDeadline
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockRunnerCall) Request() *http.Request {
|
func (c *mockRunnerCall) RequestBody() io.ReadCloser {
|
||||||
return c.r
|
return c.r.Body
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockRunnerCall) ResponseWriter() http.ResponseWriter {
|
func (c *mockRunnerCall) ResponseWriter() http.ResponseWriter {
|
||||||
return c.rw
|
return c.rw
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockRunnerCall) StdErr() io.ReadWriteCloser {
|
func (c *mockRunnerCall) StdErr() io.ReadWriteCloser {
|
||||||
return c.stdErr
|
return c.stdErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockRunnerCall) Model() *models.Call {
|
func (c *mockRunnerCall) Model() *models.Call {
|
||||||
return c.model
|
return c.model
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -135,7 +135,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error {
|
func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error {
|
||||||
bodyReader := call.Request().Body
|
bodyReader := call.RequestBody()
|
||||||
writeBufferSize := 10 * 1024 // 10KB
|
writeBufferSize := 10 * 1024 // 10KB
|
||||||
writeBuffer := make([]byte, writeBufferSize)
|
writeBuffer := make([]byte, writeBufferSize)
|
||||||
for {
|
for {
|
||||||
@@ -199,7 +199,12 @@ func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.Run
|
|||||||
if body.Finished.Success {
|
if body.Finished.Success {
|
||||||
logrus.Infof("Call finished successfully: %v", body.Finished.Details)
|
logrus.Infof("Call finished successfully: %v", body.Finished.Details)
|
||||||
} else {
|
} else {
|
||||||
logrus.Infof("Call finish unsuccessfully:: %v", body.Finished.Details)
|
logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details)
|
||||||
|
}
|
||||||
|
// There should be an EOF following the last packet
|
||||||
|
if _, err := protocolClient.Recv(); err != io.EOF {
|
||||||
|
logrus.WithError(err).Error("Did not receive expected EOF from runner stream")
|
||||||
|
done <- err
|
||||||
}
|
}
|
||||||
close(done)
|
close(done)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ type Runner interface {
|
|||||||
// processed by a RunnerPool
|
// processed by a RunnerPool
|
||||||
type RunnerCall interface {
|
type RunnerCall interface {
|
||||||
SlotDeadline() time.Time
|
SlotDeadline() time.Time
|
||||||
Request() *http.Request
|
RequestBody() io.ReadCloser
|
||||||
ResponseWriter() http.ResponseWriter
|
ResponseWriter() http.ResponseWriter
|
||||||
StdErr() io.ReadWriteCloser
|
StdErr() io.ReadWriteCloser
|
||||||
Model() *models.Call
|
Model() *models.Call
|
||||||
|
|||||||
Reference in New Issue
Block a user