From 77086ecc248682037d59faf6b0ae9a28ac3bceb3 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Thu, 17 May 2018 15:02:15 -0700 Subject: [PATCH] fn: lb-agent & runner gRPC updates (#1005) Breaking changes: *) Removed unused ACK/NACK definitions *) Extended Finished messages with error code/str --- api/agent/grpc/runner.pb.go | 214 +++++++++++++----------------------- api/agent/grpc/runner.proto | 17 +-- api/agent/pure_runner.go | 24 ++-- api/agent/runner_client.go | 23 ++-- 4 files changed, 104 insertions(+), 174 deletions(-) diff --git a/api/agent/grpc/runner.pb.go b/api/agent/grpc/runner.pb.go index 7abb1953f..548f7bad1 100644 --- a/api/agent/grpc/runner.pb.go +++ b/api/agent/grpc/runner.pb.go @@ -9,7 +9,6 @@ It is generated from these files: It has these top-level messages: TryCall - CallAcknowledged DataFrame HttpHeader HttpRespMeta @@ -59,39 +58,6 @@ func (m *TryCall) GetModelsCallJson() string { return "" } -// Call has been accepted and a slot allocated, or it's been rejected -type CallAcknowledged struct { - Committed bool `protobuf:"varint,1,opt,name=committed" json:"committed,omitempty"` - Details string `protobuf:"bytes,2,opt,name=details" json:"details,omitempty"` - SlotAllocationLatency string `protobuf:"bytes,3,opt,name=slot_allocation_latency,json=slotAllocationLatency" json:"slot_allocation_latency,omitempty"` -} - -func (m *CallAcknowledged) Reset() { *m = CallAcknowledged{} } -func (m *CallAcknowledged) String() string { return proto.CompactTextString(m) } -func (*CallAcknowledged) ProtoMessage() {} -func (*CallAcknowledged) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -func (m *CallAcknowledged) GetCommitted() bool { - if m != nil { - return m.Committed - } - return false -} - -func (m *CallAcknowledged) GetDetails() string { - if m != nil { - return m.Details - } - return "" -} - -func (m *CallAcknowledged) GetSlotAllocationLatency() string { - if m != nil { - return m.SlotAllocationLatency - } - return "" -} - // Data sent C2S and S2C - as soon as the runner sees the first of these it // will start running. If empty content, there must be one of these with eof. // The runner will send these for the body of the response, AFTER it has sent @@ -104,7 +70,7 @@ type DataFrame struct { func (m *DataFrame) Reset() { *m = DataFrame{} } func (m *DataFrame) String() string { return proto.CompactTextString(m) } func (*DataFrame) ProtoMessage() {} -func (*DataFrame) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*DataFrame) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (m *DataFrame) GetData() []byte { if m != nil { @@ -128,7 +94,7 @@ type HttpHeader struct { func (m *HttpHeader) Reset() { *m = HttpHeader{} } func (m *HttpHeader) String() string { return proto.CompactTextString(m) } func (*HttpHeader) ProtoMessage() {} -func (*HttpHeader) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*HttpHeader) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } func (m *HttpHeader) GetKey() string { if m != nil { @@ -152,7 +118,7 @@ type HttpRespMeta struct { func (m *HttpRespMeta) Reset() { *m = HttpRespMeta{} } func (m *HttpRespMeta) String() string { return proto.CompactTextString(m) } func (*HttpRespMeta) ProtoMessage() {} -func (*HttpRespMeta) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*HttpRespMeta) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } func (m *HttpRespMeta) GetStatusCode() int32 { if m != nil { @@ -179,11 +145,9 @@ type CallResultStart struct { func (m *CallResultStart) Reset() { *m = CallResultStart{} } func (m *CallResultStart) String() string { return proto.CompactTextString(m) } func (*CallResultStart) ProtoMessage() {} -func (*CallResultStart) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*CallResultStart) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } -type isCallResultStart_Meta interface { - isCallResultStart_Meta() -} +type isCallResultStart_Meta interface{ isCallResultStart_Meta() } type CallResultStart_Http struct { Http *HttpRespMeta `protobuf:"bytes,100,opt,name=http,oneof"` @@ -262,14 +226,16 @@ func _CallResultStart_OneofSizer(msg proto.Message) (n int) { // Call has really finished, it might have completed or crashed type CallFinished struct { - Success bool `protobuf:"varint,1,opt,name=success" json:"success,omitempty"` - Details string `protobuf:"bytes,2,opt,name=details" json:"details,omitempty"` + Success bool `protobuf:"varint,1,opt,name=success" json:"success,omitempty"` + Details string `protobuf:"bytes,2,opt,name=details" json:"details,omitempty"` + ErrorCode int32 `protobuf:"varint,3,opt,name=errorCode" json:"errorCode,omitempty"` + ErrorStr string `protobuf:"bytes,4,opt,name=errorStr" json:"errorStr,omitempty"` } func (m *CallFinished) Reset() { *m = CallFinished{} } func (m *CallFinished) String() string { return proto.CompactTextString(m) } func (*CallFinished) ProtoMessage() {} -func (*CallFinished) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*CallFinished) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } func (m *CallFinished) GetSuccess() bool { if m != nil { @@ -285,6 +251,20 @@ func (m *CallFinished) GetDetails() string { return "" } +func (m *CallFinished) GetErrorCode() int32 { + if m != nil { + return m.ErrorCode + } + return 0 +} + +func (m *CallFinished) GetErrorStr() string { + if m != nil { + return m.ErrorStr + } + return "" +} + type ClientMsg struct { // Types that are valid to be assigned to Body: // *ClientMsg_Try @@ -295,11 +275,9 @@ type ClientMsg struct { func (m *ClientMsg) Reset() { *m = ClientMsg{} } func (m *ClientMsg) String() string { return proto.CompactTextString(m) } func (*ClientMsg) ProtoMessage() {} -func (*ClientMsg) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*ClientMsg) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } -type isClientMsg_Body interface { - isClientMsg_Body() -} +type isClientMsg_Body interface{ isClientMsg_Body() } type ClientMsg_Try struct { Try *TryCall `protobuf:"bytes,1,opt,name=try,oneof"` @@ -408,7 +386,6 @@ func _ClientMsg_OneofSizer(msg proto.Message) (n int) { type RunnerMsg struct { // Types that are valid to be assigned to Body: - // *RunnerMsg_Acknowledged // *RunnerMsg_ResultStart // *RunnerMsg_Data // *RunnerMsg_Finished @@ -418,29 +395,23 @@ type RunnerMsg struct { func (m *RunnerMsg) Reset() { *m = RunnerMsg{} } func (m *RunnerMsg) String() string { return proto.CompactTextString(m) } func (*RunnerMsg) ProtoMessage() {} -func (*RunnerMsg) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } +func (*RunnerMsg) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } -type isRunnerMsg_Body interface { - isRunnerMsg_Body() -} +type isRunnerMsg_Body interface{ isRunnerMsg_Body() } -type RunnerMsg_Acknowledged struct { - Acknowledged *CallAcknowledged `protobuf:"bytes,1,opt,name=acknowledged,oneof"` -} type RunnerMsg_ResultStart struct { - ResultStart *CallResultStart `protobuf:"bytes,2,opt,name=result_start,json=resultStart,oneof"` + ResultStart *CallResultStart `protobuf:"bytes,1,opt,name=result_start,json=resultStart,oneof"` } type RunnerMsg_Data struct { - Data *DataFrame `protobuf:"bytes,3,opt,name=data,oneof"` + Data *DataFrame `protobuf:"bytes,2,opt,name=data,oneof"` } type RunnerMsg_Finished struct { - Finished *CallFinished `protobuf:"bytes,4,opt,name=finished,oneof"` + Finished *CallFinished `protobuf:"bytes,3,opt,name=finished,oneof"` } -func (*RunnerMsg_Acknowledged) isRunnerMsg_Body() {} -func (*RunnerMsg_ResultStart) isRunnerMsg_Body() {} -func (*RunnerMsg_Data) isRunnerMsg_Body() {} -func (*RunnerMsg_Finished) isRunnerMsg_Body() {} +func (*RunnerMsg_ResultStart) isRunnerMsg_Body() {} +func (*RunnerMsg_Data) isRunnerMsg_Body() {} +func (*RunnerMsg_Finished) isRunnerMsg_Body() {} func (m *RunnerMsg) GetBody() isRunnerMsg_Body { if m != nil { @@ -449,13 +420,6 @@ func (m *RunnerMsg) GetBody() isRunnerMsg_Body { return nil } -func (m *RunnerMsg) GetAcknowledged() *CallAcknowledged { - if x, ok := m.GetBody().(*RunnerMsg_Acknowledged); ok { - return x.Acknowledged - } - return nil -} - func (m *RunnerMsg) GetResultStart() *CallResultStart { if x, ok := m.GetBody().(*RunnerMsg_ResultStart); ok { return x.ResultStart @@ -480,7 +444,6 @@ func (m *RunnerMsg) GetFinished() *CallFinished { // XXX_OneofFuncs is for the internal use of the proto package. func (*RunnerMsg) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _RunnerMsg_OneofMarshaler, _RunnerMsg_OneofUnmarshaler, _RunnerMsg_OneofSizer, []interface{}{ - (*RunnerMsg_Acknowledged)(nil), (*RunnerMsg_ResultStart)(nil), (*RunnerMsg_Data)(nil), (*RunnerMsg_Finished)(nil), @@ -491,23 +454,18 @@ func _RunnerMsg_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { m := msg.(*RunnerMsg) // body switch x := m.Body.(type) { - case *RunnerMsg_Acknowledged: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.Acknowledged); err != nil { - return err - } case *RunnerMsg_ResultStart: - b.EncodeVarint(2<<3 | proto.WireBytes) + b.EncodeVarint(1<<3 | proto.WireBytes) if err := b.EncodeMessage(x.ResultStart); err != nil { return err } case *RunnerMsg_Data: - b.EncodeVarint(3<<3 | proto.WireBytes) + b.EncodeVarint(2<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Data); err != nil { return err } case *RunnerMsg_Finished: - b.EncodeVarint(4<<3 | proto.WireBytes) + b.EncodeVarint(3<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Finished); err != nil { return err } @@ -521,15 +479,7 @@ func _RunnerMsg_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { func _RunnerMsg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*RunnerMsg) switch tag { - case 1: // body.acknowledged - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(CallAcknowledged) - err := b.DecodeMessage(msg) - m.Body = &RunnerMsg_Acknowledged{msg} - return true, err - case 2: // body.result_start + case 1: // body.result_start if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -537,7 +487,7 @@ func _RunnerMsg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buff err := b.DecodeMessage(msg) m.Body = &RunnerMsg_ResultStart{msg} return true, err - case 3: // body.data + case 2: // body.data if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -545,7 +495,7 @@ func _RunnerMsg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buff err := b.DecodeMessage(msg) m.Body = &RunnerMsg_Data{msg} return true, err - case 4: // body.finished + case 3: // body.finished if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -562,24 +512,19 @@ func _RunnerMsg_OneofSizer(msg proto.Message) (n int) { m := msg.(*RunnerMsg) // body switch x := m.Body.(type) { - case *RunnerMsg_Acknowledged: - s := proto.Size(x.Acknowledged) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s case *RunnerMsg_ResultStart: s := proto.Size(x.ResultStart) - n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(1<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s case *RunnerMsg_Data: s := proto.Size(x.Data) - n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(2<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s case *RunnerMsg_Finished: s := proto.Size(x.Finished) - n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(3<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s case nil: @@ -596,7 +541,7 @@ type RunnerStatus struct { func (m *RunnerStatus) Reset() { *m = RunnerStatus{} } func (m *RunnerStatus) String() string { return proto.CompactTextString(m) } func (*RunnerStatus) ProtoMessage() {} -func (*RunnerStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } +func (*RunnerStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } func (m *RunnerStatus) GetActive() int32 { if m != nil { @@ -607,7 +552,6 @@ func (m *RunnerStatus) GetActive() int32 { func init() { proto.RegisterType((*TryCall)(nil), "TryCall") - proto.RegisterType((*CallAcknowledged)(nil), "CallAcknowledged") proto.RegisterType((*DataFrame)(nil), "DataFrame") proto.RegisterType((*HttpHeader)(nil), "HttpHeader") proto.RegisterType((*HttpRespMeta)(nil), "HttpRespMeta") @@ -761,41 +705,37 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("runner.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 566 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0x59, 0x6b, 0xdb, 0x40, - 0x10, 0xc7, 0xad, 0xd8, 0x71, 0xec, 0xb1, 0x92, 0xba, 0x4b, 0x0f, 0x93, 0x06, 0x1a, 0xd4, 0x03, - 0x43, 0x61, 0xd3, 0x3a, 0x3d, 0xde, 0x0a, 0x49, 0x9a, 0x20, 0x4a, 0x03, 0x65, 0x53, 0xfa, 0x6a, - 0x36, 0xd2, 0x44, 0x56, 0xb3, 0xd2, 0x1a, 0xed, 0x28, 0xc5, 0xaf, 0xfd, 0x88, 0xfd, 0x44, 0x65, - 0x57, 0x47, 0xdc, 0x40, 0xde, 0x34, 0xfb, 0x9f, 0xf3, 0xa7, 0x19, 0xf0, 0x8b, 0x32, 0xcf, 0xb1, - 0xe0, 0xcb, 0x42, 0x93, 0xde, 0x7d, 0x96, 0x68, 0x9d, 0x28, 0x3c, 0x70, 0xd6, 0x65, 0x79, 0x75, - 0x80, 0xd9, 0x92, 0x56, 0x95, 0x18, 0x1c, 0xc2, 0xd6, 0x8f, 0x62, 0x75, 0x22, 0x95, 0x62, 0x53, - 0x18, 0x67, 0x3a, 0x46, 0x65, 0xe6, 0x91, 0x54, 0x6a, 0xfe, 0xcb, 0xe8, 0x7c, 0xe2, 0xed, 0x7b, - 0xd3, 0xa1, 0xd8, 0xa9, 0xde, 0xad, 0xd7, 0x57, 0xa3, 0xf3, 0xe0, 0x8f, 0x07, 0x63, 0x6b, 0x1c, - 0x45, 0xd7, 0xb9, 0xfe, 0xad, 0x30, 0x4e, 0x30, 0x66, 0x7b, 0x30, 0x8c, 0x74, 0x96, 0xa5, 0x44, - 0x18, 0xbb, 0xb8, 0x81, 0xb8, 0x7d, 0x60, 0x13, 0xd8, 0x8a, 0x91, 0x64, 0xaa, 0xcc, 0x64, 0xc3, - 0xe5, 0x6c, 0x4c, 0xf6, 0x11, 0x9e, 0x1a, 0xa5, 0x69, 0x2e, 0x95, 0xd2, 0x91, 0xa4, 0x54, 0xe7, - 0x73, 0x25, 0x09, 0xf3, 0x68, 0x35, 0xe9, 0x3a, 0xcf, 0xc7, 0x56, 0x3e, 0x6a, 0xd5, 0x6f, 0x95, - 0x18, 0xbc, 0x83, 0xe1, 0x17, 0x49, 0xf2, 0xac, 0x90, 0x19, 0x32, 0x06, 0xbd, 0x58, 0x92, 0x74, - 0x75, 0x7d, 0xe1, 0xbe, 0xd9, 0x18, 0xba, 0xa8, 0xaf, 0x5c, 0xb9, 0x81, 0xb0, 0x9f, 0xc1, 0x7b, - 0x80, 0x90, 0x68, 0x19, 0xa2, 0x8c, 0xb1, 0xb0, 0xfa, 0x35, 0xae, 0xea, 0x11, 0xed, 0x27, 0x7b, - 0x04, 0x9b, 0x37, 0x52, 0x95, 0x58, 0xb7, 0x58, 0x19, 0xc1, 0x4f, 0xf0, 0x6d, 0x94, 0x40, 0xb3, - 0x3c, 0x47, 0x92, 0xec, 0x39, 0x8c, 0x0c, 0x49, 0x2a, 0xcd, 0x3c, 0xd2, 0x31, 0xba, 0xf8, 0x4d, - 0x01, 0xd5, 0xd3, 0x89, 0x8e, 0x91, 0xbd, 0x82, 0xad, 0x85, 0x2b, 0x61, 0x67, 0xed, 0x4e, 0x47, - 0xb3, 0x11, 0xbf, 0x2d, 0x2b, 0x1a, 0x2d, 0xf8, 0x0c, 0x0f, 0x2c, 0x44, 0x81, 0xa6, 0x54, 0x74, - 0x41, 0xb2, 0x20, 0xf6, 0x02, 0x7a, 0x0b, 0xa2, 0xe5, 0x24, 0xde, 0xf7, 0xa6, 0xa3, 0xd9, 0x36, - 0x5f, 0xaf, 0x1b, 0x76, 0x84, 0x13, 0x8f, 0xfb, 0xd0, 0xcb, 0x90, 0x64, 0x70, 0x0c, 0xbe, 0x8d, - 0x3f, 0x4b, 0xf3, 0xd4, 0x2c, 0x2a, 0xc4, 0xa6, 0x8c, 0x22, 0x34, 0xa6, 0xc6, 0xdf, 0x98, 0xf7, - 0xc3, 0x0f, 0x2e, 0x60, 0x78, 0xa2, 0x52, 0xcc, 0xe9, 0xdc, 0x24, 0x6c, 0x0f, 0xba, 0x54, 0x54, - 0x40, 0x46, 0xb3, 0x01, 0xaf, 0xf7, 0x22, 0xec, 0x08, 0xfb, 0xcc, 0xf6, 0x6b, 0xc4, 0x1b, 0x4e, - 0x06, 0xde, 0xc2, 0xb7, 0x8d, 0x59, 0xc5, 0x36, 0x76, 0xa9, 0xe3, 0x55, 0xf0, 0xd7, 0x83, 0xa1, - 0x70, 0x1b, 0x68, 0xb3, 0x7e, 0x02, 0x5f, 0xae, 0xed, 0x49, 0x9d, 0xfe, 0x21, 0xbf, 0xbb, 0x40, - 0x61, 0x47, 0xfc, 0xe7, 0xc8, 0x3e, 0x80, 0x5f, 0x38, 0x36, 0x73, 0x63, 0xe1, 0xd4, 0x85, 0xc7, - 0xfc, 0x0e, 0xb4, 0xb0, 0x23, 0x46, 0xc5, 0x1a, 0xc3, 0xa6, 0xcf, 0xee, 0x7d, 0x7d, 0xb2, 0x37, - 0x30, 0xb8, 0xaa, 0xa1, 0x4d, 0x7a, 0x35, 0xe9, 0x75, 0x92, 0x61, 0x47, 0xb4, 0x0e, 0xed, 0x50, - 0xaf, 0xc1, 0xaf, 0x66, 0xba, 0x70, 0x3f, 0x9a, 0x3d, 0x81, 0xbe, 0x8c, 0x28, 0xbd, 0xa9, 0x96, - 0x65, 0x53, 0xd4, 0xd6, 0x2c, 0x81, 0x9d, 0xca, 0xef, 0xbb, 0xbd, 0xaf, 0x48, 0x2b, 0xf6, 0x12, - 0xfa, 0xa7, 0x79, 0x22, 0x13, 0x64, 0xc0, 0x5b, 0xd8, 0xbb, 0xc0, 0x5b, 0x44, 0x53, 0xef, 0xad, - 0xc7, 0x0e, 0xa0, 0xdf, 0x64, 0xe6, 0xd5, 0xc1, 0xf2, 0xe6, 0x60, 0xf9, 0xa9, 0x3d, 0xd8, 0xdd, - 0x6d, 0xbe, 0xde, 0xc0, 0x65, 0xdf, 0xc9, 0x87, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x05, 0xe4, - 0x7b, 0x85, 0xed, 0x03, 0x00, 0x00, + // 511 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x5d, 0x8b, 0xd3, 0x40, + 0x14, 0x6d, 0xb6, 0x9f, 0xb9, 0xc9, 0xae, 0x65, 0x10, 0x09, 0x75, 0xc1, 0x12, 0x3f, 0x08, 0x08, + 0x53, 0xed, 0xea, 0xab, 0x0f, 0xd6, 0x5d, 0x82, 0xb0, 0x20, 0x53, 0xf1, 0xb5, 0x4c, 0x93, 0xdb, + 0xb4, 0x3a, 0xcd, 0x94, 0x99, 0xc9, 0x42, 0xc1, 0x3f, 0xe2, 0xbf, 0x95, 0x99, 0xa4, 0xd9, 0xe2, + 0x8b, 0x6f, 0x73, 0xe6, 0xdc, 0xaf, 0x73, 0xee, 0x0c, 0x84, 0xaa, 0x2a, 0x4b, 0x54, 0xf4, 0xa0, + 0xa4, 0x91, 0x93, 0xe7, 0x85, 0x94, 0x85, 0xc0, 0x99, 0x43, 0xeb, 0x6a, 0x33, 0xc3, 0xfd, 0xc1, + 0x1c, 0x6b, 0x32, 0xbe, 0x81, 0xe1, 0x77, 0x75, 0x5c, 0x70, 0x21, 0x48, 0x02, 0xe3, 0xbd, 0xcc, + 0x51, 0xe8, 0x55, 0xc6, 0x85, 0x58, 0xfd, 0xd4, 0xb2, 0x8c, 0xbc, 0xa9, 0x97, 0xf8, 0xec, 0xaa, + 0xbe, 0xb7, 0x51, 0x5f, 0xb5, 0x2c, 0xe3, 0xf7, 0xe0, 0x7f, 0xe1, 0x86, 0xdf, 0x29, 0xbe, 0x47, + 0x42, 0xa0, 0x97, 0x73, 0xc3, 0x5d, 0x68, 0xc8, 0xdc, 0x99, 0x8c, 0xa1, 0x8b, 0x72, 0x13, 0x5d, + 0x4c, 0xbd, 0x64, 0xc4, 0xec, 0x31, 0xfe, 0x00, 0x90, 0x1a, 0x73, 0x48, 0x91, 0xe7, 0xa8, 0x2c, + 0xff, 0x0b, 0x8f, 0x4d, 0x75, 0x7b, 0x24, 0x4f, 0xa1, 0xff, 0xc0, 0x45, 0x85, 0x2e, 0xc7, 0x67, + 0x35, 0x88, 0x7f, 0x40, 0x68, 0xb3, 0x18, 0xea, 0xc3, 0x3d, 0x1a, 0x4e, 0x5e, 0x40, 0xa0, 0x0d, + 0x37, 0x95, 0x5e, 0x65, 0x32, 0x47, 0x97, 0xdf, 0x67, 0x50, 0x5f, 0x2d, 0x64, 0x8e, 0xe4, 0x35, + 0x0c, 0xb7, 0xae, 0x85, 0x8e, 0x2e, 0xa6, 0xdd, 0x24, 0x98, 0x07, 0xf4, 0xb1, 0x2d, 0x3b, 0x71, + 0xf1, 0x27, 0x78, 0x62, 0xc5, 0x30, 0xd4, 0x95, 0x30, 0x4b, 0xc3, 0x95, 0x21, 0x2f, 0xa1, 0xb7, + 0x35, 0xe6, 0x10, 0xe5, 0x53, 0x2f, 0x09, 0xe6, 0x97, 0xf4, 0xbc, 0x6f, 0xda, 0x61, 0x8e, 0xfc, + 0x3c, 0x80, 0xde, 0x1e, 0x0d, 0x8f, 0x7f, 0x43, 0x68, 0xf3, 0xef, 0x76, 0xe5, 0x4e, 0x6f, 0x31, + 0x27, 0x11, 0x0c, 0x75, 0x95, 0x65, 0xa8, 0xb5, 0x9b, 0x69, 0xc4, 0x4e, 0xd0, 0x32, 0x39, 0x1a, + 0xbe, 0x13, 0xba, 0x51, 0x76, 0x82, 0xe4, 0x1a, 0x7c, 0x54, 0x4a, 0x2a, 0x3b, 0x77, 0xd4, 0x75, + 0x4a, 0x1e, 0x2f, 0xc8, 0x04, 0x46, 0x0e, 0x2c, 0x8d, 0x8a, 0x7a, 0x2e, 0xb1, 0xc5, 0xf1, 0x12, + 0xfc, 0x85, 0xd8, 0x61, 0x69, 0xee, 0x75, 0x41, 0xae, 0xa1, 0x6b, 0x54, 0x6d, 0x65, 0x30, 0x1f, + 0xd1, 0x66, 0x99, 0x69, 0x87, 0xd9, 0x6b, 0x32, 0x6d, 0x96, 0x73, 0xe1, 0x68, 0xa0, 0xed, 0xda, + 0xac, 0x24, 0xcb, 0x58, 0x49, 0x6b, 0x99, 0x1f, 0xe3, 0x3f, 0x1e, 0xf8, 0xcc, 0x3d, 0x1b, 0x5b, + 0xf5, 0x23, 0x84, 0xca, 0x99, 0xb3, 0xd2, 0xd6, 0x9d, 0xa6, 0xfc, 0x98, 0xfe, 0xe3, 0x5a, 0xda, + 0x61, 0x81, 0x3a, 0x33, 0xf1, 0xbf, 0xed, 0xc8, 0x5b, 0x18, 0x6d, 0x1a, 0xd7, 0x9c, 0x68, 0x6b, + 0xf5, 0xb9, 0x95, 0x69, 0x87, 0xb5, 0x01, 0xed, 0x6c, 0x6f, 0x20, 0xac, 0x47, 0x5b, 0xba, 0x4d, + 0x93, 0x67, 0x30, 0xe0, 0x99, 0xd9, 0x3d, 0xd4, 0xaf, 0xa5, 0xcf, 0x1a, 0x34, 0x2f, 0xe0, 0xaa, + 0x8e, 0xfb, 0x66, 0xdf, 0x76, 0x26, 0x05, 0x79, 0x05, 0x83, 0xdb, 0xb2, 0xe0, 0x05, 0x12, 0xa0, + 0xad, 0x67, 0x13, 0xa0, 0xad, 0xd2, 0xc4, 0x7b, 0xe7, 0x91, 0x19, 0x0c, 0x4e, 0x95, 0x69, 0xfd, + 0x59, 0xe8, 0xe9, 0xb3, 0xd0, 0x5b, 0xfb, 0x59, 0x26, 0x97, 0xf4, 0x7c, 0x80, 0xf5, 0xc0, 0xd1, + 0x37, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xdc, 0x15, 0x71, 0xf0, 0x69, 0x03, 0x00, 0x00, } diff --git a/api/agent/grpc/runner.proto b/api/agent/grpc/runner.proto index e6e18db1d..3511bca03 100644 --- a/api/agent/grpc/runner.proto +++ b/api/agent/grpc/runner.proto @@ -7,13 +7,6 @@ message TryCall { string models_call_json = 1; } -// Call has been accepted and a slot allocated, or it's been rejected -message CallAcknowledged { - bool committed = 1; - string details = 2; - string slot_allocation_latency = 3; -} - // Data sent C2S and S2C - as soon as the runner sees the first of these it // will start running. If empty content, there must be one of these with eof. // The runner will send these for the body of the response, AFTER it has sent @@ -45,9 +38,10 @@ message CallResultStart { message CallFinished { bool success = 1; string details = 2; + int32 errorCode = 3; + string errorStr = 4; } - message ClientMsg { oneof body { TryCall try = 1; @@ -57,10 +51,9 @@ message ClientMsg { message RunnerMsg { oneof body { - CallAcknowledged acknowledged = 1; - CallResultStart result_start = 2; - DataFrame data = 3; - CallFinished finished = 4; + CallResultStart result_start = 1; + DataFrame data = 2; + CallFinished finished = 3; } } diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 233f4ebe4..25ed11b9a 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -199,19 +199,19 @@ func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error { return err } -func convertError(err error) string { - code := models.GetAPIErrorCode(err) - return fmt.Sprintf("%d:%s", code, err.Error()) -} - // enqueueCallResponse enqueues a Submit() response to the LB // and initiates a graceful shutdown of the session. func (ch *callHandle) enqueueCallResponse(err error) { var details string + var errCode int + var errStr string if err != nil { - details = convertError(err) - } else if ch.c != nil { + errCode = models.GetAPIErrorCode(err) + errStr = err.Error() + } + + if ch.c != nil { details = ch.c.Model().ID } @@ -219,18 +219,20 @@ func (ch *callHandle) enqueueCallResponse(err error) { errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{ Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{ - Success: err == nil, - Details: details, + Success: err == nil, + Details: details, + ErrorCode: int32(errCode), + ErrorStr: errStr, }}}) if errTmp != nil { - logrus.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v", details) + logrus.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr) return } errTmp = ch.finalize() if errTmp != nil { - logrus.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v", details) + logrus.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr) } } diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index aec5871dd..2dcb7ec63 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -5,8 +5,6 @@ import ( "encoding/json" "errors" "io" - "strconv" - "strings" "time" "google.golang.org/grpc" @@ -188,19 +186,16 @@ func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.Runne } } -func parseError(details string) error { - tokens := strings.SplitN(details, ":", 2) - if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" { - return errors.New(details) +func parseError(msg *pb.CallFinished) error { + if msg.GetSuccess() { + return nil } - code, err := strconv.ParseInt(tokens[0], 10, 64) - if err != nil { - return errors.New(details) + eCode := msg.GetErrorCode() + eStr := msg.GetErrorStr() + if eStr == "" { + eStr = "Unknown Error From Pure Runner" } - if code != 0 { - return models.NewAPIError(int(code), errors.New(tokens[1])) - } - return errors.New(tokens[1]) + return models.NewAPIError(int(eCode), errors.New(eStr)) } func tryQueueError(err error, done chan error) { @@ -263,7 +258,7 @@ DataLoop: case *pb.RunnerMsg_Finished: logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details) if !body.Finished.Success { - err := parseError(body.Finished.GetDetails()) + err := parseError(body.Finished) tryQueueError(err, done) } break DataLoop