fn: add requests received/handled in Status responses (#1132)

This is useful as additional data to inflight requests.
Callers can determine request arrival and processing
rate.
This commit is contained in:
Tolga Ceylan
2018-07-20 16:00:02 -07:00
committed by GitHub
parent c0bb60fd67
commit db7cbf73e2
5 changed files with 119 additions and 82 deletions

View File

@@ -575,16 +575,18 @@ func _RunnerMsg_OneofSizer(msg proto.Message) (n int) {
} }
type RunnerStatus struct { type RunnerStatus struct {
Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"` Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"`
Failed bool `protobuf:"varint,3,opt,name=failed" json:"failed,omitempty"` Failed bool `protobuf:"varint,3,opt,name=failed" json:"failed,omitempty"`
Id string `protobuf:"bytes,4,opt,name=id" json:"id,omitempty"` Id string `protobuf:"bytes,4,opt,name=id" json:"id,omitempty"`
Details string `protobuf:"bytes,5,opt,name=details" json:"details,omitempty"` Details string `protobuf:"bytes,5,opt,name=details" json:"details,omitempty"`
ErrorCode int32 `protobuf:"varint,6,opt,name=errorCode" json:"errorCode,omitempty"` ErrorCode int32 `protobuf:"varint,6,opt,name=errorCode" json:"errorCode,omitempty"`
ErrorStr string `protobuf:"bytes,7,opt,name=errorStr" json:"errorStr,omitempty"` ErrorStr string `protobuf:"bytes,7,opt,name=errorStr" json:"errorStr,omitempty"`
CreatedAt string `protobuf:"bytes,8,opt,name=createdAt" json:"createdAt,omitempty"` CreatedAt string `protobuf:"bytes,8,opt,name=createdAt" json:"createdAt,omitempty"`
StartedAt string `protobuf:"bytes,9,opt,name=startedAt" json:"startedAt,omitempty"` StartedAt string `protobuf:"bytes,9,opt,name=startedAt" json:"startedAt,omitempty"`
CompletedAt string `protobuf:"bytes,10,opt,name=completedAt" json:"completedAt,omitempty"` CompletedAt string `protobuf:"bytes,10,opt,name=completedAt" json:"completedAt,omitempty"`
Cached bool `protobuf:"varint,11,opt,name=cached" json:"cached,omitempty"` Cached bool `protobuf:"varint,11,opt,name=cached" json:"cached,omitempty"`
RequestsReceived uint64 `protobuf:"varint,12,opt,name=requestsReceived" json:"requestsReceived,omitempty"`
RequestsHandled uint64 `protobuf:"varint,13,opt,name=requestsHandled" json:"requestsHandled,omitempty"`
} }
func (m *RunnerStatus) Reset() { *m = RunnerStatus{} } func (m *RunnerStatus) Reset() { *m = RunnerStatus{} }
@@ -662,6 +664,20 @@ func (m *RunnerStatus) GetCached() bool {
return false return false
} }
func (m *RunnerStatus) GetRequestsReceived() uint64 {
if m != nil {
return m.RequestsReceived
}
return 0
}
func (m *RunnerStatus) GetRequestsHandled() uint64 {
if m != nil {
return m.RequestsHandled
}
return 0
}
func init() { func init() {
proto.RegisterType((*TryCall)(nil), "TryCall") proto.RegisterType((*TryCall)(nil), "TryCall")
proto.RegisterType((*DataFrame)(nil), "DataFrame") proto.RegisterType((*DataFrame)(nil), "DataFrame")
@@ -817,47 +833,49 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("runner.proto", fileDescriptor0) } func init() { proto.RegisterFile("runner.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 665 bytes of a gzipped FileDescriptorProto // 701 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xff, 0x8a, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xed, 0x8a, 0xd3, 0x4c,
0x10, 0x6e, 0xd2, 0x36, 0x4d, 0x27, 0xb9, 0x1f, 0x2c, 0x22, 0xa1, 0x1e, 0x58, 0xa2, 0x42, 0x41, 0x14, 0x6e, 0xfa, 0xdd, 0x93, 0xec, 0x6e, 0x19, 0x5e, 0x5e, 0x42, 0x5d, 0xb0, 0x44, 0x85, 0xa2,
0xc8, 0x69, 0x55, 0x38, 0x04, 0x05, 0x3d, 0x7b, 0x44, 0xe1, 0x40, 0xb6, 0xe2, 0xbf, 0x65, 0x2f, 0x90, 0xd5, 0xaa, 0xb0, 0x08, 0x0a, 0xba, 0x76, 0x89, 0xc2, 0x82, 0x4c, 0xc5, 0xbf, 0x65, 0x36,
0xbb, 0x6d, 0xa3, 0xdb, 0x6c, 0xd9, 0xdd, 0x1e, 0xf6, 0x4d, 0xf4, 0xa5, 0x7c, 0x00, 0x9f, 0x46, 0x73, 0xda, 0x46, 0xd3, 0x4c, 0x9d, 0x99, 0x2e, 0xf6, 0x52, 0xbc, 0x29, 0x2f, 0xc0, 0x8b, 0xf0,
0x76, 0x93, 0xe6, 0xc2, 0x09, 0x3d, 0xff, 0xcb, 0x7c, 0xdf, 0xce, 0xce, 0xcc, 0xf7, 0x4d, 0x16, 0x1a, 0x64, 0x26, 0x69, 0x36, 0xec, 0x42, 0xd7, 0x7f, 0x39, 0xcf, 0x73, 0xbe, 0x9f, 0x93, 0x01,
0x42, 0xb9, 0x29, 0x0a, 0x26, 0x93, 0xb5, 0x14, 0x5a, 0x0c, 0x1e, 0x2c, 0x84, 0x58, 0x70, 0x76, 0x4f, 0x6e, 0xb2, 0x0c, 0x65, 0xb8, 0x96, 0x42, 0x8b, 0xc1, 0xbd, 0x85, 0x10, 0x8b, 0x14, 0x4f,
0x6a, 0xa3, 0xab, 0xcd, 0xfc, 0x94, 0xad, 0xd6, 0x7a, 0x5b, 0x92, 0xf1, 0x6f, 0x07, 0x7a, 0x5f, 0xac, 0x75, 0xb9, 0x99, 0x9f, 0xe0, 0x6a, 0xad, 0xb7, 0x39, 0x19, 0xfc, 0x72, 0xa0, 0xf3, 0x59,
0xe4, 0xf6, 0x9c, 0x70, 0x8e, 0x46, 0x70, 0xbc, 0x12, 0x94, 0x71, 0x35, 0xcb, 0x08, 0xe7, 0xb3, 0x6e, 0xcf, 0x58, 0x9a, 0x92, 0x11, 0xf4, 0x57, 0x82, 0x63, 0xaa, 0x66, 0x31, 0x4b, 0xd3, 0xd9,
0x6f, 0x4a, 0x14, 0x91, 0x33, 0x74, 0x46, 0x7d, 0x7c, 0x58, 0xe2, 0xe6, 0xd4, 0x27, 0x25, 0x0a, 0x57, 0x25, 0x32, 0xdf, 0x19, 0x3a, 0xa3, 0x1e, 0x3d, 0xcc, 0x71, 0xe3, 0xf5, 0x51, 0x89, 0x8c,
0x34, 0x84, 0x50, 0x71, 0xa1, 0x67, 0x4b, 0xa2, 0x96, 0xb3, 0x9c, 0x46, 0xae, 0x3d, 0x05, 0x06, 0x0c, 0xc1, 0x53, 0xa9, 0xd0, 0xb3, 0x25, 0x53, 0xcb, 0x59, 0xc2, 0xfd, 0xba, 0xf5, 0x02, 0x83,
0x4b, 0x89, 0x5a, 0x7e, 0xa4, 0xe8, 0x0c, 0x80, 0xfd, 0xd0, 0xac, 0x50, 0xb9, 0x28, 0x54, 0xd4, 0x45, 0x4c, 0x2d, 0x3f, 0x70, 0x72, 0x0a, 0x80, 0x3f, 0x34, 0x66, 0x2a, 0x11, 0x99, 0xf2, 0x1b,
0x1e, 0xb6, 0x47, 0xc1, 0x38, 0x4a, 0xaa, 0x4a, 0xc9, 0xa4, 0xa6, 0x26, 0x85, 0x96, 0x5b, 0xdc, 0xc3, 0xc6, 0xc8, 0x1d, 0xfb, 0x61, 0x51, 0x29, 0x9c, 0x94, 0xd4, 0x24, 0xd3, 0x72, 0x4b, 0x2b,
0x38, 0x3b, 0x78, 0x03, 0x47, 0xb7, 0x68, 0x74, 0x0c, 0xed, 0xef, 0x6c, 0x5b, 0xf5, 0x62, 0x3e, 0xbe, 0x83, 0xd7, 0x70, 0x74, 0x83, 0x26, 0x7d, 0x68, 0x7c, 0xc3, 0x6d, 0xd1, 0x8b, 0xf9, 0x24,
0xd1, 0x3d, 0xe8, 0x5e, 0x13, 0xbe, 0x61, 0x55, 0xe5, 0x32, 0x78, 0xed, 0x9e, 0x39, 0xf1, 0x73, 0xff, 0x41, 0xeb, 0x8a, 0xa5, 0x1b, 0x2c, 0x2a, 0xe7, 0xc6, 0xab, 0xfa, 0xa9, 0x13, 0x3c, 0x83,
0xe8, 0x7f, 0x20, 0x9a, 0x5c, 0x48, 0xb2, 0x62, 0x08, 0x41, 0x87, 0x12, 0x4d, 0x6c, 0x66, 0x88, 0xde, 0x7b, 0xa6, 0xd9, 0xb9, 0x64, 0x2b, 0x24, 0x04, 0x9a, 0x9c, 0x69, 0x66, 0x23, 0x3d, 0x6a,
0xed, 0xb7, 0xb9, 0x8c, 0x89, 0xb9, 0x4d, 0xf4, 0xb1, 0xf9, 0x8c, 0x5f, 0x02, 0xa4, 0x5a, 0xaf, 0xbf, 0x4d, 0x32, 0x14, 0x73, 0x1b, 0xd8, 0xa5, 0xe6, 0x33, 0x78, 0x01, 0x10, 0x69, 0xbd, 0x8e,
0x53, 0x46, 0x28, 0x93, 0xff, 0x5b, 0x2c, 0xfe, 0x0a, 0xa1, 0xc9, 0xc2, 0x4c, 0xad, 0x2f, 0x99, 0x90, 0x71, 0x94, 0xff, 0x5a, 0x2c, 0xf8, 0x02, 0x9e, 0x89, 0xa2, 0xa8, 0xd6, 0x17, 0xa8, 0x19,
0x26, 0xe8, 0x21, 0x04, 0x4a, 0x13, 0xbd, 0x51, 0xb3, 0x4c, 0x50, 0x66, 0xf3, 0xbb, 0x18, 0x4a, 0xb9, 0x0f, 0xae, 0xd2, 0x4c, 0x6f, 0xd4, 0x2c, 0x16, 0x1c, 0x6d, 0x7c, 0x8b, 0x42, 0x0e, 0x9d,
0xe8, 0x5c, 0x50, 0x86, 0x9e, 0x40, 0x6f, 0x69, 0x4b, 0xa8, 0xc8, 0xb5, 0x7a, 0x04, 0xc9, 0x4d, 0x09, 0x8e, 0xe4, 0x11, 0x74, 0x96, 0xb6, 0x84, 0xf2, 0xeb, 0x76, 0x1f, 0x6e, 0x78, 0x5d, 0x96,
0x59, 0xbc, 0xe3, 0xe2, 0xb7, 0x70, 0x64, 0x34, 0xc2, 0x4c, 0x6d, 0xb8, 0x9e, 0x6a, 0x22, 0x35, 0xee, 0xb8, 0xe0, 0x0d, 0x1c, 0x99, 0x1d, 0x51, 0x54, 0x9b, 0x54, 0x4f, 0x35, 0x93, 0x9a, 0x3c,
0x7a, 0x04, 0x9d, 0xa5, 0xd6, 0xeb, 0x88, 0x0e, 0x9d, 0x51, 0x30, 0x3e, 0x48, 0x9a, 0x75, 0xd3, 0x80, 0xe6, 0x52, 0xeb, 0xb5, 0xcf, 0x87, 0xce, 0xc8, 0x1d, 0x1f, 0x84, 0xd5, 0xba, 0x51, 0x8d,
0x16, 0xb6, 0xe4, 0x7b, 0x0f, 0x3a, 0x2b, 0xa6, 0x49, 0xfc, 0xc7, 0x81, 0xd0, 0x5c, 0x70, 0x91, 0x5a, 0xf2, 0x5d, 0x1b, 0x9a, 0x2b, 0xd4, 0x2c, 0xf8, 0xed, 0x80, 0x67, 0x12, 0x9c, 0x27, 0x59,
0x17, 0xb9, 0x5a, 0x32, 0x8a, 0x22, 0xe8, 0xa9, 0x4d, 0x96, 0x31, 0xa5, 0x6c, 0x53, 0x3e, 0xde, 0xa2, 0x96, 0xc8, 0x89, 0x0f, 0x1d, 0xb5, 0x89, 0x63, 0x54, 0xca, 0x36, 0xd5, 0xa5, 0x3b, 0xd3,
0x85, 0x86, 0xa1, 0x4c, 0x93, 0x9c, 0xab, 0x6a, 0xb4, 0x5d, 0x88, 0x4e, 0xa0, 0xcf, 0xa4, 0x14, 0x30, 0x1c, 0x35, 0x4b, 0x52, 0x55, 0x8c, 0xb6, 0x33, 0xc9, 0x31, 0xf4, 0x50, 0x4a, 0x21, 0x4d,
0xd2, 0x34, 0x1e, 0xb5, 0xed, 0x28, 0x37, 0x00, 0x1a, 0x80, 0x6f, 0x83, 0xa9, 0x96, 0x51, 0xc7, 0xe3, 0x7e, 0xc3, 0x8e, 0x72, 0x0d, 0x90, 0x01, 0x74, 0xad, 0x31, 0xd5, 0xd2, 0x6f, 0xda, 0xc0,
0x26, 0xd6, 0xb1, 0xc9, 0xcc, 0x24, 0x23, 0x9a, 0xd1, 0x77, 0x3a, 0xea, 0x5a, 0xf2, 0x06, 0x30, 0xd2, 0x36, 0x91, 0xb1, 0x44, 0xa6, 0x91, 0xbf, 0xd5, 0x7e, 0xcb, 0x92, 0xd7, 0x80, 0x61, 0x95,
0xac, 0x32, 0x23, 0x59, 0xd6, 0x2b, 0xd9, 0x1a, 0x40, 0x43, 0x08, 0x32, 0xb1, 0x5a, 0x73, 0x56, 0x19, 0xc9, 0xb2, 0xed, 0x9c, 0x2d, 0x01, 0x32, 0x04, 0x37, 0x16, 0xab, 0x75, 0x8a, 0x39, 0xdf,
0xf2, 0x3d, 0xcb, 0x37, 0xa1, 0x78, 0x0a, 0xfd, 0x73, 0x9e, 0xb3, 0x42, 0x5f, 0xaa, 0x05, 0x3a, 0xb1, 0x7c, 0x15, 0x0a, 0xa6, 0xd0, 0x3b, 0x4b, 0x13, 0xcc, 0xf4, 0x85, 0x5a, 0x90, 0x63, 0x68,
0x81, 0xb6, 0x96, 0xa5, 0x53, 0xc1, 0xd8, 0xdf, 0x2d, 0x57, 0xda, 0xc2, 0x06, 0x46, 0xc3, 0xca, 0x68, 0x99, 0x2b, 0xe5, 0x8e, 0xbb, 0xbb, 0xe3, 0x8a, 0x6a, 0xd4, 0xc0, 0x64, 0x58, 0x68, 0x5f,
0x7b, 0xd7, 0xd2, 0x90, 0xd4, 0x5b, 0x61, 0x14, 0x33, 0x8c, 0x51, 0xec, 0x4a, 0xd0, 0x6d, 0xfc, 0xb7, 0x34, 0x84, 0xe5, 0x55, 0x98, 0x8d, 0x19, 0xc6, 0x6c, 0xec, 0x52, 0xf0, 0x6d, 0xf0, 0xd3,
0xcb, 0x81, 0x3e, 0xb6, 0x7f, 0x8c, 0xb9, 0xf5, 0x15, 0x84, 0xd2, 0x6a, 0x3f, 0xb3, 0x8d, 0x55, 0x81, 0x1e, 0xb5, 0x7f, 0x8c, 0xc9, 0xfa, 0x12, 0x3c, 0x69, 0x77, 0x3f, 0xb3, 0x8d, 0x15, 0xe9,
0xd7, 0x1f, 0x27, 0xb7, 0x4c, 0x49, 0x5b, 0x38, 0x90, 0x0d, 0x8f, 0xee, 0x2c, 0x87, 0x9e, 0x82, 0xfb, 0xe1, 0x0d, 0x51, 0xa2, 0x1a, 0x75, 0x65, 0x45, 0xa3, 0x3b, 0xcb, 0x91, 0x27, 0xd0, 0x9d,
0x3f, 0xaf, 0x3c, 0xb1, 0x92, 0x1a, 0x27, 0x9b, 0x46, 0xa5, 0x2d, 0x5c, 0x1f, 0xa8, 0x7b, 0xfb, 0x17, 0x9a, 0xd8, 0x95, 0x1a, 0x25, 0xab, 0x42, 0x45, 0x35, 0x5a, 0x3a, 0x94, 0xbd, 0xfd, 0xa9,
0xe9, 0x42, 0x58, 0xf6, 0x36, 0xb5, 0x9b, 0x84, 0xee, 0x83, 0x47, 0x32, 0x9d, 0x5f, 0x97, 0xdb, 0x83, 0x97, 0xf7, 0x36, 0xb5, 0x97, 0x44, 0xfe, 0x87, 0x36, 0x8b, 0x75, 0x72, 0x95, 0x5f, 0x63,
0xd8, 0xc5, 0x55, 0x64, 0xf0, 0x39, 0xc9, 0x79, 0x75, 0xb7, 0x8f, 0xab, 0x08, 0x1d, 0x82, 0x9b, 0x8b, 0x16, 0x96, 0xc1, 0xe7, 0x2c, 0x49, 0x8b, 0xdc, 0x5d, 0x5a, 0x58, 0xe4, 0x10, 0xea, 0x09,
0xd3, 0xca, 0x25, 0x37, 0xa7, 0x4d, 0xcf, 0xbb, 0x7b, 0x3c, 0xf7, 0xf6, 0x79, 0xde, 0xdb, 0xe7, 0x2f, 0x54, 0xaa, 0x27, 0xbc, 0xaa, 0x79, 0x6b, 0x8f, 0xe6, 0xed, 0x7d, 0x9a, 0x77, 0xf6, 0x69,
0xb9, 0xbf, 0xd7, 0xf3, 0xfe, 0x1d, 0x9e, 0xc3, 0x3f, 0x9e, 0x9b, 0xc9, 0x32, 0x92, 0x19, 0xd5, 0xde, 0xdd, 0xab, 0x79, 0xef, 0x0e, 0xcd, 0xe1, 0x96, 0xe6, 0x66, 0xb2, 0x98, 0xc5, 0x66, 0x6b,
0x82, 0x72, 0xb2, 0x32, 0x1a, 0x2f, 0xe0, 0xb0, 0x54, 0xe6, 0xb3, 0x79, 0xc9, 0x32, 0xc1, 0xd1, 0x6e, 0x3e, 0x59, 0x6e, 0x91, 0xc7, 0xd0, 0x97, 0xf8, 0x7d, 0x83, 0x4a, 0x2b, 0x8a, 0x31, 0x26,
0x63, 0xf0, 0x26, 0xc5, 0x82, 0x2c, 0x18, 0x82, 0xa4, 0x5e, 0x93, 0x01, 0x24, 0xb5, 0xb9, 0x23, 0x57, 0xc8, 0x7d, 0x6f, 0xe8, 0x8c, 0x9a, 0xf4, 0x16, 0x4e, 0x46, 0x70, 0xb4, 0xc3, 0x22, 0x96,
0xe7, 0x99, 0x83, 0x4e, 0xc1, 0xdb, 0x69, 0x99, 0x94, 0x4f, 0x63, 0xb2, 0x7b, 0x1a, 0x93, 0x89, 0x71, 0xb3, 0xa6, 0x03, 0xeb, 0x7a, 0x13, 0x1e, 0x2f, 0xe0, 0x30, 0xdf, 0xf7, 0x27, 0xf3, 0x3e,
0x79, 0x1a, 0x07, 0x07, 0x49, 0x53, 0xf2, 0x2b, 0xcf, 0xd2, 0x2f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xc6, 0x22, 0x25, 0x0f, 0xa1, 0x3d, 0xc9, 0x16, 0x6c, 0x81, 0x04, 0xc2, 0xf2, 0xf8, 0x06, 0x10,
0xff, 0x17, 0xde, 0x8c, 0x60, 0x57, 0x05, 0x00, 0x00, 0x96, 0x27, 0x33, 0x72, 0x9e, 0x3a, 0xe4, 0x04, 0xda, 0x3b, 0x85, 0xc2, 0xfc, 0xc1, 0x0d, 0x77,
0x0f, 0x6e, 0x38, 0x31, 0x0f, 0xee, 0xe0, 0x20, 0xac, 0x0a, 0x79, 0xd9, 0xb6, 0xf4, 0xf3, 0xbf,
0x01, 0x00, 0x00, 0xff, 0xff, 0x90, 0xc2, 0xda, 0x0f, 0xad, 0x05, 0x00, 0x00,
} }

View File

@@ -73,6 +73,8 @@ message RunnerStatus {
string startedAt = 9; // call latency details: start time in container string startedAt = 9; // call latency details: start time in container
string completedAt = 10; // call latency details: end time string completedAt = 10; // call latency details: end time
bool cached = 11; // true if status response was provided from cache bool cached = 11; // true if status response was provided from cache
uint64 requestsReceived = 12; // number of requests received by runner
uint64 requestsHandled = 13; // number of requests processed by runner without NACK
} }
service RunnerProtocol { service RunnerProtocol {

View File

@@ -509,8 +509,10 @@ const (
// statusTracker maintains cache data/state/locks for Status Call invocations. // statusTracker maintains cache data/state/locks for Status Call invocations.
type statusTracker struct { type statusTracker struct {
inflight int32 inflight int32
imageName string requestsReceived uint64
requestsHandled uint64
imageName string
// lock protects expiry/cache/wait fields below. RunnerStatus ptr itself // lock protects expiry/cache/wait fields below. RunnerStatus ptr itself
// stored every time status image is executed. Cache fetches use a shallow // stored every time status image is executed. Cache fetches use a shallow
@@ -624,7 +626,7 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
log := common.Logger(ctx) log := common.Logger(ctx)
// Keep lightweight tabs on what this runner is doing: for draindown tests // Keep lightweight tabs on what this runner is doing: for draindown tests
atomic.AddInt32(&pr.status.inflight, 1) atomic.AddInt32(&pr.status.inflight, 1)
defer atomic.AddInt32(&pr.status.inflight, -1) atomic.AddUint64(&pr.status.requestsReceived, 1)
pv, ok := peer.FromContext(ctx) pv, ok := peer.FromContext(ctx)
log.Debug("Starting engagement") log.Debug("Starting engagement")
@@ -638,37 +640,42 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
state := NewCallHandle(engagement) state := NewCallHandle(engagement)
tryMsg := state.getTryMsg() tryMsg := state.getTryMsg()
if tryMsg == nil { if tryMsg != nil {
return state.waitError() errTry := pr.handleTryCall(tryMsg, state)
} if errTry == nil {
dataFeed := state.spawnPipeToFn()
errTry := pr.handleTryCall(tryMsg, state) DataLoop:
if errTry != nil { for {
return state.waitError() dataMsg := state.getDataMsg()
} if dataMsg == nil {
break
}
dataFeed := state.spawnPipeToFn() select {
case dataFeed <- dataMsg:
DataLoop: if dataMsg.Eof {
for { break DataLoop
dataMsg := state.getDataMsg() }
if dataMsg == nil { case <-state.doneQueue:
break break DataLoop
} case <-state.ctx.Done():
break DataLoop
select { }
case dataFeed <- dataMsg:
if dataMsg.Eof {
break DataLoop
} }
case <-state.doneQueue:
break DataLoop
case <-state.ctx.Done():
break DataLoop
} }
} }
return state.waitError() err := state.waitError()
// if we didn't respond with TooBusy, then this means the request
// was processed.
if err != models.ErrCallTimeoutServerBusy {
atomic.AddUint64(&pr.status.requestsHandled, 1)
}
atomic.AddInt32(&pr.status.inflight, -1)
return err
} }
// Runs a status call using status image with baked in parameters. // Runs a status call using status image with baked in parameters.
@@ -820,11 +827,15 @@ func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatu
cacheObj.Cached = true cacheObj.Cached = true
cacheObj.Active = atomic.LoadInt32(&pr.status.inflight) cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
return &cacheObj, nil return &cacheObj, nil
} }
cachePtr := pr.runStatusCall(ctx) cachePtr := pr.runStatusCall(ctx)
cachePtr.Active = atomic.LoadInt32(&pr.status.inflight) cachePtr.Active = atomic.LoadInt32(&pr.status.inflight)
cachePtr.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
cachePtr.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
now = time.Now() now = time.Now()
// Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow // Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow
@@ -847,7 +858,9 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.Runne
// Status using image name is disabled. We return inflight request count only // Status using image name is disabled. We return inflight request count only
if pr.status.imageName == "" { if pr.status.imageName == "" {
return &runner.RunnerStatus{ return &runner.RunnerStatus{
Active: atomic.LoadInt32(&pr.status.inflight), Active: atomic.LoadInt32(&pr.status.inflight),
RequestsReceived: atomic.LoadUint64(&pr.status.requestsReceived),
RequestsHandled: atomic.LoadUint64(&pr.status.requestsHandled),
}, nil }, nil
} }
return pr.handleStatusCall(ctx) return pr.handleStatusCall(ctx)

View File

@@ -117,6 +117,8 @@ func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStat
return &pool.RunnerStatus{ return &pool.RunnerStatus{
ActiveRequestCount: status.Active, ActiveRequestCount: status.Active,
RequestsReceived: status.RequestsReceived,
RequestsHandled: status.RequestsHandled,
StatusFailed: status.Failed, StatusFailed: status.Failed,
Cached: status.Cached, Cached: status.Cached,
StatusId: status.Id, StatusId: status.Id,

View File

@@ -35,6 +35,8 @@ type MTLSRunnerFactory func(addr, certCommonName string, pki *PKIData) (Runner,
// RunnerStatus is general information on Runner health as returned by Runner::Status() call // RunnerStatus is general information on Runner health as returned by Runner::Status() call
type RunnerStatus struct { type RunnerStatus struct {
ActiveRequestCount int32 // Number of active running requests on Runner ActiveRequestCount int32 // Number of active running requests on Runner
RequestsReceived uint64 // Number of requests received by Runner
RequestsHandled uint64 // Number of requests handled without NACK by Runner
StatusFailed bool // True if Status execution failed StatusFailed bool // True if Status execution failed
Cached bool // True if Status was provided from cache Cached bool // True if Status was provided from cache
StatusId string // Call ID for Status StatusId string // Call ID for Status