From db7cbf73e2bbd33fb392ce6685a1dbc75450c6f3 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 20 Jul 2018 16:00:02 -0700 Subject: [PATCH] fn: add requests received/handled in Status responses (#1132) This is useful as additional data to inflight requests. Callers can determine request arrival and processing rate. --- api/agent/grpc/runner.pb.go | 124 +++++++++++++++++++--------------- api/agent/grpc/runner.proto | 2 + api/agent/pure_runner.go | 71 +++++++++++-------- api/agent/runner_client.go | 2 + api/runnerpool/runner_pool.go | 2 + 5 files changed, 119 insertions(+), 82 deletions(-) diff --git a/api/agent/grpc/runner.pb.go b/api/agent/grpc/runner.pb.go index 40973d29b..2d3c6c1f1 100644 --- a/api/agent/grpc/runner.pb.go +++ b/api/agent/grpc/runner.pb.go @@ -575,16 +575,18 @@ func _RunnerMsg_OneofSizer(msg proto.Message) (n int) { } type RunnerStatus struct { - Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"` - Failed bool `protobuf:"varint,3,opt,name=failed" json:"failed,omitempty"` - Id string `protobuf:"bytes,4,opt,name=id" json:"id,omitempty"` - Details string `protobuf:"bytes,5,opt,name=details" json:"details,omitempty"` - ErrorCode int32 `protobuf:"varint,6,opt,name=errorCode" json:"errorCode,omitempty"` - ErrorStr string `protobuf:"bytes,7,opt,name=errorStr" json:"errorStr,omitempty"` - CreatedAt string `protobuf:"bytes,8,opt,name=createdAt" json:"createdAt,omitempty"` - StartedAt string `protobuf:"bytes,9,opt,name=startedAt" json:"startedAt,omitempty"` - CompletedAt string `protobuf:"bytes,10,opt,name=completedAt" json:"completedAt,omitempty"` - Cached bool `protobuf:"varint,11,opt,name=cached" json:"cached,omitempty"` + Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"` + Failed bool `protobuf:"varint,3,opt,name=failed" json:"failed,omitempty"` + Id string `protobuf:"bytes,4,opt,name=id" json:"id,omitempty"` + Details string `protobuf:"bytes,5,opt,name=details" json:"details,omitempty"` + ErrorCode int32 `protobuf:"varint,6,opt,name=errorCode" json:"errorCode,omitempty"` + ErrorStr string `protobuf:"bytes,7,opt,name=errorStr" json:"errorStr,omitempty"` + CreatedAt string `protobuf:"bytes,8,opt,name=createdAt" json:"createdAt,omitempty"` + StartedAt string `protobuf:"bytes,9,opt,name=startedAt" json:"startedAt,omitempty"` + CompletedAt string `protobuf:"bytes,10,opt,name=completedAt" json:"completedAt,omitempty"` + Cached bool `protobuf:"varint,11,opt,name=cached" json:"cached,omitempty"` + RequestsReceived uint64 `protobuf:"varint,12,opt,name=requestsReceived" json:"requestsReceived,omitempty"` + RequestsHandled uint64 `protobuf:"varint,13,opt,name=requestsHandled" json:"requestsHandled,omitempty"` } func (m *RunnerStatus) Reset() { *m = RunnerStatus{} } @@ -662,6 +664,20 @@ func (m *RunnerStatus) GetCached() bool { return false } +func (m *RunnerStatus) GetRequestsReceived() uint64 { + if m != nil { + return m.RequestsReceived + } + return 0 +} + +func (m *RunnerStatus) GetRequestsHandled() uint64 { + if m != nil { + return m.RequestsHandled + } + return 0 +} + func init() { proto.RegisterType((*TryCall)(nil), "TryCall") proto.RegisterType((*DataFrame)(nil), "DataFrame") @@ -817,47 +833,49 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("runner.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 665 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xff, 0x8a, 0xd3, 0x40, - 0x10, 0x6e, 0xd2, 0x36, 0x4d, 0x27, 0xb9, 0x1f, 0x2c, 0x22, 0xa1, 0x1e, 0x58, 0xa2, 0x42, 0x41, - 0xc8, 0x69, 0x55, 0x38, 0x04, 0x05, 0x3d, 0x7b, 0x44, 0xe1, 0x40, 0xb6, 0xe2, 0xbf, 0x65, 0x2f, - 0xbb, 0x6d, 0xa3, 0xdb, 0x6c, 0xd9, 0xdd, 0x1e, 0xf6, 0x4d, 0xf4, 0xa5, 0x7c, 0x00, 0x9f, 0x46, - 0x76, 0x93, 0xe6, 0xc2, 0x09, 0x3d, 0xff, 0xcb, 0x7c, 0xdf, 0xce, 0xce, 0xcc, 0xf7, 0x4d, 0x16, - 0x42, 0xb9, 0x29, 0x0a, 0x26, 0x93, 0xb5, 0x14, 0x5a, 0x0c, 0x1e, 0x2c, 0x84, 0x58, 0x70, 0x76, - 0x6a, 0xa3, 0xab, 0xcd, 0xfc, 0x94, 0xad, 0xd6, 0x7a, 0x5b, 0x92, 0xf1, 0x6f, 0x07, 0x7a, 0x5f, - 0xe4, 0xf6, 0x9c, 0x70, 0x8e, 0x46, 0x70, 0xbc, 0x12, 0x94, 0x71, 0x35, 0xcb, 0x08, 0xe7, 0xb3, - 0x6f, 0x4a, 0x14, 0x91, 0x33, 0x74, 0x46, 0x7d, 0x7c, 0x58, 0xe2, 0xe6, 0xd4, 0x27, 0x25, 0x0a, - 0x34, 0x84, 0x50, 0x71, 0xa1, 0x67, 0x4b, 0xa2, 0x96, 0xb3, 0x9c, 0x46, 0xae, 0x3d, 0x05, 0x06, - 0x4b, 0x89, 0x5a, 0x7e, 0xa4, 0xe8, 0x0c, 0x80, 0xfd, 0xd0, 0xac, 0x50, 0xb9, 0x28, 0x54, 0xd4, - 0x1e, 0xb6, 0x47, 0xc1, 0x38, 0x4a, 0xaa, 0x4a, 0xc9, 0xa4, 0xa6, 0x26, 0x85, 0x96, 0x5b, 0xdc, - 0x38, 0x3b, 0x78, 0x03, 0x47, 0xb7, 0x68, 0x74, 0x0c, 0xed, 0xef, 0x6c, 0x5b, 0xf5, 0x62, 0x3e, - 0xd1, 0x3d, 0xe8, 0x5e, 0x13, 0xbe, 0x61, 0x55, 0xe5, 0x32, 0x78, 0xed, 0x9e, 0x39, 0xf1, 0x73, - 0xe8, 0x7f, 0x20, 0x9a, 0x5c, 0x48, 0xb2, 0x62, 0x08, 0x41, 0x87, 0x12, 0x4d, 0x6c, 0x66, 0x88, - 0xed, 0xb7, 0xb9, 0x8c, 0x89, 0xb9, 0x4d, 0xf4, 0xb1, 0xf9, 0x8c, 0x5f, 0x02, 0xa4, 0x5a, 0xaf, - 0x53, 0x46, 0x28, 0x93, 0xff, 0x5b, 0x2c, 0xfe, 0x0a, 0xa1, 0xc9, 0xc2, 0x4c, 0xad, 0x2f, 0x99, - 0x26, 0xe8, 0x21, 0x04, 0x4a, 0x13, 0xbd, 0x51, 0xb3, 0x4c, 0x50, 0x66, 0xf3, 0xbb, 0x18, 0x4a, - 0xe8, 0x5c, 0x50, 0x86, 0x9e, 0x40, 0x6f, 0x69, 0x4b, 0xa8, 0xc8, 0xb5, 0x7a, 0x04, 0xc9, 0x4d, - 0x59, 0xbc, 0xe3, 0xe2, 0xb7, 0x70, 0x64, 0x34, 0xc2, 0x4c, 0x6d, 0xb8, 0x9e, 0x6a, 0x22, 0x35, - 0x7a, 0x04, 0x9d, 0xa5, 0xd6, 0xeb, 0x88, 0x0e, 0x9d, 0x51, 0x30, 0x3e, 0x48, 0x9a, 0x75, 0xd3, - 0x16, 0xb6, 0xe4, 0x7b, 0x0f, 0x3a, 0x2b, 0xa6, 0x49, 0xfc, 0xc7, 0x81, 0xd0, 0x5c, 0x70, 0x91, - 0x17, 0xb9, 0x5a, 0x32, 0x8a, 0x22, 0xe8, 0xa9, 0x4d, 0x96, 0x31, 0xa5, 0x6c, 0x53, 0x3e, 0xde, - 0x85, 0x86, 0xa1, 0x4c, 0x93, 0x9c, 0xab, 0x6a, 0xb4, 0x5d, 0x88, 0x4e, 0xa0, 0xcf, 0xa4, 0x14, - 0xd2, 0x34, 0x1e, 0xb5, 0xed, 0x28, 0x37, 0x00, 0x1a, 0x80, 0x6f, 0x83, 0xa9, 0x96, 0x51, 0xc7, - 0x26, 0xd6, 0xb1, 0xc9, 0xcc, 0x24, 0x23, 0x9a, 0xd1, 0x77, 0x3a, 0xea, 0x5a, 0xf2, 0x06, 0x30, - 0xac, 0x32, 0x23, 0x59, 0xd6, 0x2b, 0xd9, 0x1a, 0x40, 0x43, 0x08, 0x32, 0xb1, 0x5a, 0x73, 0x56, - 0xf2, 0x3d, 0xcb, 0x37, 0xa1, 0x78, 0x0a, 0xfd, 0x73, 0x9e, 0xb3, 0x42, 0x5f, 0xaa, 0x05, 0x3a, - 0x81, 0xb6, 0x96, 0xa5, 0x53, 0xc1, 0xd8, 0xdf, 0x2d, 0x57, 0xda, 0xc2, 0x06, 0x46, 0xc3, 0xca, - 0x7b, 0xd7, 0xd2, 0x90, 0xd4, 0x5b, 0x61, 0x14, 0x33, 0x8c, 0x51, 0xec, 0x4a, 0xd0, 0x6d, 0xfc, - 0xcb, 0x81, 0x3e, 0xb6, 0x7f, 0x8c, 0xb9, 0xf5, 0x15, 0x84, 0xd2, 0x6a, 0x3f, 0xb3, 0x8d, 0x55, - 0xd7, 0x1f, 0x27, 0xb7, 0x4c, 0x49, 0x5b, 0x38, 0x90, 0x0d, 0x8f, 0xee, 0x2c, 0x87, 0x9e, 0x82, - 0x3f, 0xaf, 0x3c, 0xb1, 0x92, 0x1a, 0x27, 0x9b, 0x46, 0xa5, 0x2d, 0x5c, 0x1f, 0xa8, 0x7b, 0xfb, - 0xe9, 0x42, 0x58, 0xf6, 0x36, 0xb5, 0x9b, 0x84, 0xee, 0x83, 0x47, 0x32, 0x9d, 0x5f, 0x97, 0xdb, - 0xd8, 0xc5, 0x55, 0x64, 0xf0, 0x39, 0xc9, 0x79, 0x75, 0xb7, 0x8f, 0xab, 0x08, 0x1d, 0x82, 0x9b, - 0xd3, 0xca, 0x25, 0x37, 0xa7, 0x4d, 0xcf, 0xbb, 0x7b, 0x3c, 0xf7, 0xf6, 0x79, 0xde, 0xdb, 0xe7, - 0xb9, 0xbf, 0xd7, 0xf3, 0xfe, 0x1d, 0x9e, 0xc3, 0x3f, 0x9e, 0x9b, 0xc9, 0x32, 0x92, 0x19, 0xd5, - 0x82, 0x72, 0xb2, 0x32, 0x1a, 0x2f, 0xe0, 0xb0, 0x54, 0xe6, 0xb3, 0x79, 0xc9, 0x32, 0xc1, 0xd1, - 0x63, 0xf0, 0x26, 0xc5, 0x82, 0x2c, 0x18, 0x82, 0xa4, 0x5e, 0x93, 0x01, 0x24, 0xb5, 0xb9, 0x23, - 0xe7, 0x99, 0x83, 0x4e, 0xc1, 0xdb, 0x69, 0x99, 0x94, 0x4f, 0x63, 0xb2, 0x7b, 0x1a, 0x93, 0x89, - 0x79, 0x1a, 0x07, 0x07, 0x49, 0x53, 0xf2, 0x2b, 0xcf, 0xd2, 0x2f, 0xfe, 0x06, 0x00, 0x00, 0xff, - 0xff, 0x17, 0xde, 0x8c, 0x60, 0x57, 0x05, 0x00, 0x00, + // 701 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xed, 0x8a, 0xd3, 0x4c, + 0x14, 0x6e, 0xfa, 0xdd, 0x93, 0xec, 0x6e, 0x19, 0x5e, 0x5e, 0x42, 0x5d, 0xb0, 0x44, 0x85, 0xa2, + 0x90, 0xd5, 0xaa, 0xb0, 0x08, 0x0a, 0xba, 0x76, 0x89, 0xc2, 0x82, 0x4c, 0xc5, 0xbf, 0x65, 0x36, + 0x73, 0xda, 0x46, 0xd3, 0x4c, 0x9d, 0x99, 0x2e, 0xf6, 0x52, 0xbc, 0x29, 0x2f, 0xc0, 0x8b, 0xf0, + 0x1a, 0x64, 0x26, 0x69, 0x36, 0xec, 0x42, 0xd7, 0x7f, 0x39, 0xcf, 0x73, 0xbe, 0x9f, 0x93, 0x01, + 0x4f, 0x6e, 0xb2, 0x0c, 0x65, 0xb8, 0x96, 0x42, 0x8b, 0xc1, 0xbd, 0x85, 0x10, 0x8b, 0x14, 0x4f, + 0xac, 0x75, 0xb9, 0x99, 0x9f, 0xe0, 0x6a, 0xad, 0xb7, 0x39, 0x19, 0xfc, 0x72, 0xa0, 0xf3, 0x59, + 0x6e, 0xcf, 0x58, 0x9a, 0x92, 0x11, 0xf4, 0x57, 0x82, 0x63, 0xaa, 0x66, 0x31, 0x4b, 0xd3, 0xd9, + 0x57, 0x25, 0x32, 0xdf, 0x19, 0x3a, 0xa3, 0x1e, 0x3d, 0xcc, 0x71, 0xe3, 0xf5, 0x51, 0x89, 0x8c, + 0x0c, 0xc1, 0x53, 0xa9, 0xd0, 0xb3, 0x25, 0x53, 0xcb, 0x59, 0xc2, 0xfd, 0xba, 0xf5, 0x02, 0x83, + 0x45, 0x4c, 0x2d, 0x3f, 0x70, 0x72, 0x0a, 0x80, 0x3f, 0x34, 0x66, 0x2a, 0x11, 0x99, 0xf2, 0x1b, + 0xc3, 0xc6, 0xc8, 0x1d, 0xfb, 0x61, 0x51, 0x29, 0x9c, 0x94, 0xd4, 0x24, 0xd3, 0x72, 0x4b, 0x2b, + 0xbe, 0x83, 0xd7, 0x70, 0x74, 0x83, 0x26, 0x7d, 0x68, 0x7c, 0xc3, 0x6d, 0xd1, 0x8b, 0xf9, 0x24, + 0xff, 0x41, 0xeb, 0x8a, 0xa5, 0x1b, 0x2c, 0x2a, 0xe7, 0xc6, 0xab, 0xfa, 0xa9, 0x13, 0x3c, 0x83, + 0xde, 0x7b, 0xa6, 0xd9, 0xb9, 0x64, 0x2b, 0x24, 0x04, 0x9a, 0x9c, 0x69, 0x66, 0x23, 0x3d, 0x6a, + 0xbf, 0x4d, 0x32, 0x14, 0x73, 0x1b, 0xd8, 0xa5, 0xe6, 0x33, 0x78, 0x01, 0x10, 0x69, 0xbd, 0x8e, + 0x90, 0x71, 0x94, 0xff, 0x5a, 0x2c, 0xf8, 0x02, 0x9e, 0x89, 0xa2, 0xa8, 0xd6, 0x17, 0xa8, 0x19, + 0xb9, 0x0f, 0xae, 0xd2, 0x4c, 0x6f, 0xd4, 0x2c, 0x16, 0x1c, 0x6d, 0x7c, 0x8b, 0x42, 0x0e, 0x9d, + 0x09, 0x8e, 0xe4, 0x11, 0x74, 0x96, 0xb6, 0x84, 0xf2, 0xeb, 0x76, 0x1f, 0x6e, 0x78, 0x5d, 0x96, + 0xee, 0xb8, 0xe0, 0x0d, 0x1c, 0x99, 0x1d, 0x51, 0x54, 0x9b, 0x54, 0x4f, 0x35, 0x93, 0x9a, 0x3c, + 0x80, 0xe6, 0x52, 0xeb, 0xb5, 0xcf, 0x87, 0xce, 0xc8, 0x1d, 0x1f, 0x84, 0xd5, 0xba, 0x51, 0x8d, + 0x5a, 0xf2, 0x5d, 0x1b, 0x9a, 0x2b, 0xd4, 0x2c, 0xf8, 0xed, 0x80, 0x67, 0x12, 0x9c, 0x27, 0x59, + 0xa2, 0x96, 0xc8, 0x89, 0x0f, 0x1d, 0xb5, 0x89, 0x63, 0x54, 0xca, 0x36, 0xd5, 0xa5, 0x3b, 0xd3, + 0x30, 0x1c, 0x35, 0x4b, 0x52, 0x55, 0x8c, 0xb6, 0x33, 0xc9, 0x31, 0xf4, 0x50, 0x4a, 0x21, 0x4d, + 0xe3, 0x7e, 0xc3, 0x8e, 0x72, 0x0d, 0x90, 0x01, 0x74, 0xad, 0x31, 0xd5, 0xd2, 0x6f, 0xda, 0xc0, + 0xd2, 0x36, 0x91, 0xb1, 0x44, 0xa6, 0x91, 0xbf, 0xd5, 0x7e, 0xcb, 0x92, 0xd7, 0x80, 0x61, 0x95, + 0x19, 0xc9, 0xb2, 0xed, 0x9c, 0x2d, 0x01, 0x32, 0x04, 0x37, 0x16, 0xab, 0x75, 0x8a, 0x39, 0xdf, + 0xb1, 0x7c, 0x15, 0x0a, 0xa6, 0xd0, 0x3b, 0x4b, 0x13, 0xcc, 0xf4, 0x85, 0x5a, 0x90, 0x63, 0x68, + 0x68, 0x99, 0x2b, 0xe5, 0x8e, 0xbb, 0xbb, 0xe3, 0x8a, 0x6a, 0xd4, 0xc0, 0x64, 0x58, 0x68, 0x5f, + 0xb7, 0x34, 0x84, 0xe5, 0x55, 0x98, 0x8d, 0x19, 0xc6, 0x6c, 0xec, 0x52, 0xf0, 0x6d, 0xf0, 0xd3, + 0x81, 0x1e, 0xb5, 0x7f, 0x8c, 0xc9, 0xfa, 0x12, 0x3c, 0x69, 0x77, 0x3f, 0xb3, 0x8d, 0x15, 0xe9, + 0xfb, 0xe1, 0x0d, 0x51, 0xa2, 0x1a, 0x75, 0x65, 0x45, 0xa3, 0x3b, 0xcb, 0x91, 0x27, 0xd0, 0x9d, + 0x17, 0x9a, 0xd8, 0x95, 0x1a, 0x25, 0xab, 0x42, 0x45, 0x35, 0x5a, 0x3a, 0x94, 0xbd, 0xfd, 0xa9, + 0x83, 0x97, 0xf7, 0x36, 0xb5, 0x97, 0x44, 0xfe, 0x87, 0x36, 0x8b, 0x75, 0x72, 0x95, 0x5f, 0x63, + 0x8b, 0x16, 0x96, 0xc1, 0xe7, 0x2c, 0x49, 0x8b, 0xdc, 0x5d, 0x5a, 0x58, 0xe4, 0x10, 0xea, 0x09, + 0x2f, 0x54, 0xaa, 0x27, 0xbc, 0xaa, 0x79, 0x6b, 0x8f, 0xe6, 0xed, 0x7d, 0x9a, 0x77, 0xf6, 0x69, + 0xde, 0xdd, 0xab, 0x79, 0xef, 0x0e, 0xcd, 0xe1, 0x96, 0xe6, 0x66, 0xb2, 0x98, 0xc5, 0x66, 0x6b, + 0x6e, 0x3e, 0x59, 0x6e, 0x91, 0xc7, 0xd0, 0x97, 0xf8, 0x7d, 0x83, 0x4a, 0x2b, 0x8a, 0x31, 0x26, + 0x57, 0xc8, 0x7d, 0x6f, 0xe8, 0x8c, 0x9a, 0xf4, 0x16, 0x4e, 0x46, 0x70, 0xb4, 0xc3, 0x22, 0x96, + 0x71, 0xb3, 0xa6, 0x03, 0xeb, 0x7a, 0x13, 0x1e, 0x2f, 0xe0, 0x30, 0xdf, 0xf7, 0x27, 0xf3, 0x3e, + 0xc6, 0x22, 0x25, 0x0f, 0xa1, 0x3d, 0xc9, 0x16, 0x6c, 0x81, 0x04, 0xc2, 0xf2, 0xf8, 0x06, 0x10, + 0x96, 0x27, 0x33, 0x72, 0x9e, 0x3a, 0xe4, 0x04, 0xda, 0x3b, 0x85, 0xc2, 0xfc, 0xc1, 0x0d, 0x77, + 0x0f, 0x6e, 0x38, 0x31, 0x0f, 0xee, 0xe0, 0x20, 0xac, 0x0a, 0x79, 0xd9, 0xb6, 0xf4, 0xf3, 0xbf, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x90, 0xc2, 0xda, 0x0f, 0xad, 0x05, 0x00, 0x00, } diff --git a/api/agent/grpc/runner.proto b/api/agent/grpc/runner.proto index e884f8c40..122cf5615 100644 --- a/api/agent/grpc/runner.proto +++ b/api/agent/grpc/runner.proto @@ -73,6 +73,8 @@ message RunnerStatus { string startedAt = 9; // call latency details: start time in container string completedAt = 10; // call latency details: end time bool cached = 11; // true if status response was provided from cache + uint64 requestsReceived = 12; // number of requests received by runner + uint64 requestsHandled = 13; // number of requests processed by runner without NACK } service RunnerProtocol { diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 89a43b000..ecd31c5c3 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -509,8 +509,10 @@ const ( // statusTracker maintains cache data/state/locks for Status Call invocations. type statusTracker struct { - inflight int32 - imageName string + inflight int32 + requestsReceived uint64 + requestsHandled uint64 + imageName string // lock protects expiry/cache/wait fields below. RunnerStatus ptr itself // stored every time status image is executed. Cache fetches use a shallow @@ -624,7 +626,7 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro log := common.Logger(ctx) // Keep lightweight tabs on what this runner is doing: for draindown tests atomic.AddInt32(&pr.status.inflight, 1) - defer atomic.AddInt32(&pr.status.inflight, -1) + atomic.AddUint64(&pr.status.requestsReceived, 1) pv, ok := peer.FromContext(ctx) log.Debug("Starting engagement") @@ -638,37 +640,42 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro state := NewCallHandle(engagement) tryMsg := state.getTryMsg() - if tryMsg == nil { - return state.waitError() - } + if tryMsg != nil { + errTry := pr.handleTryCall(tryMsg, state) + if errTry == nil { + dataFeed := state.spawnPipeToFn() - errTry := pr.handleTryCall(tryMsg, state) - if errTry != nil { - return state.waitError() - } + DataLoop: + for { + dataMsg := state.getDataMsg() + if dataMsg == nil { + break + } - dataFeed := state.spawnPipeToFn() - -DataLoop: - for { - dataMsg := state.getDataMsg() - if dataMsg == nil { - break - } - - select { - case dataFeed <- dataMsg: - if dataMsg.Eof { - break DataLoop + select { + case dataFeed <- dataMsg: + if dataMsg.Eof { + break DataLoop + } + case <-state.doneQueue: + break DataLoop + case <-state.ctx.Done(): + break DataLoop + } } - case <-state.doneQueue: - break DataLoop - case <-state.ctx.Done(): - break DataLoop } } - return state.waitError() + err := state.waitError() + + // if we didn't respond with TooBusy, then this means the request + // was processed. + if err != models.ErrCallTimeoutServerBusy { + atomic.AddUint64(&pr.status.requestsHandled, 1) + } + + atomic.AddInt32(&pr.status.inflight, -1) + return err } // Runs a status call using status image with baked in parameters. @@ -820,11 +827,15 @@ func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatu cacheObj.Cached = true cacheObj.Active = atomic.LoadInt32(&pr.status.inflight) + cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived) + cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled) return &cacheObj, nil } cachePtr := pr.runStatusCall(ctx) cachePtr.Active = atomic.LoadInt32(&pr.status.inflight) + cachePtr.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived) + cachePtr.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled) now = time.Now() // Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow @@ -847,7 +858,9 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.Runne // Status using image name is disabled. We return inflight request count only if pr.status.imageName == "" { return &runner.RunnerStatus{ - Active: atomic.LoadInt32(&pr.status.inflight), + Active: atomic.LoadInt32(&pr.status.inflight), + RequestsReceived: atomic.LoadUint64(&pr.status.requestsReceived), + RequestsHandled: atomic.LoadUint64(&pr.status.requestsHandled), }, nil } return pr.handleStatusCall(ctx) diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index b6a156297..d03fdde70 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -117,6 +117,8 @@ func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStat return &pool.RunnerStatus{ ActiveRequestCount: status.Active, + RequestsReceived: status.RequestsReceived, + RequestsHandled: status.RequestsHandled, StatusFailed: status.Failed, Cached: status.Cached, StatusId: status.Id, diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 6605e8f34..06a8ed50e 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -35,6 +35,8 @@ type MTLSRunnerFactory func(addr, certCommonName string, pki *PKIData) (Runner, // RunnerStatus is general information on Runner health as returned by Runner::Status() call type RunnerStatus struct { ActiveRequestCount int32 // Number of active running requests on Runner + RequestsReceived uint64 // Number of requests received by Runner + RequestsHandled uint64 // Number of requests handled without NACK by Runner StatusFailed bool // True if Status execution failed Cached bool // True if Status was provided from cache StatusId string // Call ID for Status