mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* add DateTime sans mgo * change all uses of strfmt.DateTime to common.DateTime, remove test strfmt usage * remove api tests, system-test dep on api test multiple reasons to remove the api tests: * awkward dependency with fn_go meant generating bindings on a branched fn to vendor those to test new stuff. this is at a minimum not at all intuitive, worth it, nor a fun way to spend the finite amount of time we have to live. * api tests only tested a subset of functionality that the server/ api tests already test, and we risk having tests where one tests some thing and the other doesn't. let's not. we have too many test suites as it is, and these pretty much only test that we updated the fn_go bindings, which is actually a hassle as noted above and the cli will pretty quickly figure out anyway. * fn_go relies on openapi, which relies on mgo, which is deprecated and we'd like to remove as a dependency. openapi is a _huge_ dep built in a NIH fashion, that cannot simply remove the mgo dep as users may be using it. we've now stolen their date time and otherwise killed usage of it in fn core, for fn_go it still exists but that's less of a problem. * update deps removals: * easyjson * mgo * go-openapi * mapstructure * fn_go * purell * go-validator also, had to lock docker. we shouldn't use docker on master anyway, they strongly advise against that. had no luck with latest version rev, so i locked it to what we were using before. until next time. the rest is just playing dep roulette, those end up removing a ton tho * fix exec test to work * account for john le cache
574 lines
15 KiB
Go
574 lines
15 KiB
Go
/*
|
|
*
|
|
* Copyright 2018 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
// Package channelz defines APIs for enabling channelz service, entry
|
|
// registration/deletion, and accessing channelz data. It also defines channelz
|
|
// metric struct formats.
|
|
//
|
|
// All APIs in this package are experimental.
|
|
package channelz
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"google.golang.org/grpc/grpclog"
|
|
)
|
|
|
|
var (
|
|
db dbWrapper
|
|
idGen idGenerator
|
|
// EntryPerPage defines the number of channelz entries to be shown on a web page.
|
|
EntryPerPage = 50
|
|
curState int32
|
|
)
|
|
|
|
// TurnOn turns on channelz data collection.
|
|
func TurnOn() {
|
|
if !IsOn() {
|
|
NewChannelzStorage()
|
|
atomic.StoreInt32(&curState, 1)
|
|
}
|
|
}
|
|
|
|
// IsOn returns whether channelz data collection is on.
|
|
func IsOn() bool {
|
|
return atomic.CompareAndSwapInt32(&curState, 1, 1)
|
|
}
|
|
|
|
// dbWarpper wraps around a reference to internal channelz data storage, and
|
|
// provide synchronized functionality to set and get the reference.
|
|
type dbWrapper struct {
|
|
mu sync.RWMutex
|
|
DB *channelMap
|
|
}
|
|
|
|
func (d *dbWrapper) set(db *channelMap) {
|
|
d.mu.Lock()
|
|
d.DB = db
|
|
d.mu.Unlock()
|
|
}
|
|
|
|
func (d *dbWrapper) get() *channelMap {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
return d.DB
|
|
}
|
|
|
|
// NewChannelzStorage initializes channelz data storage and id generator.
|
|
//
|
|
// Note: This function is exported for testing purpose only. User should not call
|
|
// it in most cases.
|
|
func NewChannelzStorage() {
|
|
db.set(&channelMap{
|
|
topLevelChannels: make(map[int64]struct{}),
|
|
channels: make(map[int64]*channel),
|
|
listenSockets: make(map[int64]*listenSocket),
|
|
normalSockets: make(map[int64]*normalSocket),
|
|
servers: make(map[int64]*server),
|
|
subChannels: make(map[int64]*subChannel),
|
|
})
|
|
idGen.reset()
|
|
}
|
|
|
|
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
|
|
// boolean indicating whether there's more top channels to be queried for.
|
|
//
|
|
// The arg id specifies that only top channel with id at or above it will be included
|
|
// in the result. The returned slice is up to a length of EntryPerPage, and is
|
|
// sorted in ascending id order.
|
|
func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
|
|
return db.get().GetTopChannels(id)
|
|
}
|
|
|
|
// GetServers returns a slice of server's ServerMetric, along with a
|
|
// boolean indicating whether there's more servers to be queried for.
|
|
//
|
|
// The arg id specifies that only server with id at or above it will be included
|
|
// in the result. The returned slice is up to a length of EntryPerPage, and is
|
|
// sorted in ascending id order.
|
|
func GetServers(id int64) ([]*ServerMetric, bool) {
|
|
return db.get().GetServers(id)
|
|
}
|
|
|
|
// GetServerSockets returns a slice of server's (identified by id) normal socket's
|
|
// SocketMetric, along with a boolean indicating whether there's more sockets to
|
|
// be queried for.
|
|
//
|
|
// The arg startID specifies that only sockets with id at or above it will be
|
|
// included in the result. The returned slice is up to a length of EntryPerPage,
|
|
// and is sorted in ascending id order.
|
|
func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
|
|
return db.get().GetServerSockets(id, startID)
|
|
}
|
|
|
|
// GetChannel returns the ChannelMetric for the channel (identified by id).
|
|
func GetChannel(id int64) *ChannelMetric {
|
|
return db.get().GetChannel(id)
|
|
}
|
|
|
|
// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
|
|
func GetSubChannel(id int64) *SubChannelMetric {
|
|
return db.get().GetSubChannel(id)
|
|
}
|
|
|
|
// GetSocket returns the SocketInternalMetric for the socket (identified by id).
|
|
func GetSocket(id int64) *SocketMetric {
|
|
return db.get().GetSocket(id)
|
|
}
|
|
|
|
// RegisterChannel registers the given channel c in channelz database with ref
|
|
// as its reference name, and add it to the child list of its parent (identified
|
|
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
|
|
// assigned to this channel.
|
|
func RegisterChannel(c Channel, pid int64, ref string) int64 {
|
|
id := idGen.genID()
|
|
cn := &channel{
|
|
refName: ref,
|
|
c: c,
|
|
subChans: make(map[int64]string),
|
|
nestedChans: make(map[int64]string),
|
|
id: id,
|
|
pid: pid,
|
|
}
|
|
if pid == 0 {
|
|
db.get().addChannel(id, cn, true, pid, ref)
|
|
} else {
|
|
db.get().addChannel(id, cn, false, pid, ref)
|
|
}
|
|
return id
|
|
}
|
|
|
|
// RegisterSubChannel registers the given channel c in channelz database with ref
|
|
// as its reference name, and add it to the child list of its parent (identified
|
|
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
|
|
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
|
|
if pid == 0 {
|
|
grpclog.Error("a SubChannel's parent id cannot be 0")
|
|
return 0
|
|
}
|
|
id := idGen.genID()
|
|
sc := &subChannel{
|
|
refName: ref,
|
|
c: c,
|
|
sockets: make(map[int64]string),
|
|
id: id,
|
|
pid: pid,
|
|
}
|
|
db.get().addSubChannel(id, sc, pid, ref)
|
|
return id
|
|
}
|
|
|
|
// RegisterServer registers the given server s in channelz database. It returns
|
|
// the unique channelz tracking id assigned to this server.
|
|
func RegisterServer(s Server, ref string) int64 {
|
|
id := idGen.genID()
|
|
svr := &server{
|
|
refName: ref,
|
|
s: s,
|
|
sockets: make(map[int64]string),
|
|
listenSockets: make(map[int64]string),
|
|
id: id,
|
|
}
|
|
db.get().addServer(id, svr)
|
|
return id
|
|
}
|
|
|
|
// RegisterListenSocket registers the given listen socket s in channelz database
|
|
// with ref as its reference name, and add it to the child list of its parent
|
|
// (identified by pid). It returns the unique channelz tracking id assigned to
|
|
// this listen socket.
|
|
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
|
|
if pid == 0 {
|
|
grpclog.Error("a ListenSocket's parent id cannot be 0")
|
|
return 0
|
|
}
|
|
id := idGen.genID()
|
|
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
|
|
db.get().addListenSocket(id, ls, pid, ref)
|
|
return id
|
|
}
|
|
|
|
// RegisterNormalSocket registers the given normal socket s in channelz database
|
|
// with ref as its reference name, and add it to the child list of its parent
|
|
// (identified by pid). It returns the unique channelz tracking id assigned to
|
|
// this normal socket.
|
|
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
|
|
if pid == 0 {
|
|
grpclog.Error("a NormalSocket's parent id cannot be 0")
|
|
return 0
|
|
}
|
|
id := idGen.genID()
|
|
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
|
|
db.get().addNormalSocket(id, ns, pid, ref)
|
|
return id
|
|
}
|
|
|
|
// RemoveEntry removes an entry with unique channelz trakcing id to be id from
|
|
// channelz database.
|
|
func RemoveEntry(id int64) {
|
|
db.get().removeEntry(id)
|
|
}
|
|
|
|
// channelMap is the storage data structure for channelz.
|
|
// Methods of channelMap can be divided in two two categories with respect to locking.
|
|
// 1. Methods acquire the global lock.
|
|
// 2. Methods that can only be called when global lock is held.
|
|
// A second type of method need always to be called inside a first type of method.
|
|
type channelMap struct {
|
|
mu sync.RWMutex
|
|
topLevelChannels map[int64]struct{}
|
|
servers map[int64]*server
|
|
channels map[int64]*channel
|
|
subChannels map[int64]*subChannel
|
|
listenSockets map[int64]*listenSocket
|
|
normalSockets map[int64]*normalSocket
|
|
}
|
|
|
|
func (c *channelMap) addServer(id int64, s *server) {
|
|
c.mu.Lock()
|
|
s.cm = c
|
|
c.servers[id] = s
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
|
|
c.mu.Lock()
|
|
cn.cm = c
|
|
c.channels[id] = cn
|
|
if isTopChannel {
|
|
c.topLevelChannels[id] = struct{}{}
|
|
} else {
|
|
c.findEntry(pid).addChild(id, cn)
|
|
}
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
|
|
c.mu.Lock()
|
|
sc.cm = c
|
|
c.subChannels[id] = sc
|
|
c.findEntry(pid).addChild(id, sc)
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
|
|
c.mu.Lock()
|
|
ls.cm = c
|
|
c.listenSockets[id] = ls
|
|
c.findEntry(pid).addChild(id, ls)
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
|
|
c.mu.Lock()
|
|
ns.cm = c
|
|
c.normalSockets[id] = ns
|
|
c.findEntry(pid).addChild(id, ns)
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// removeEntry triggers the removal of an entry, which may not indeed delete the
|
|
// entry, if it has to wait on the deletion of its children, or may lead to a chain
|
|
// of entry deletion. For example, deleting the last socket of a gracefully shutting
|
|
// down server will lead to the server being also deleted.
|
|
func (c *channelMap) removeEntry(id int64) {
|
|
c.mu.Lock()
|
|
c.findEntry(id).triggerDelete()
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// c.mu must be held by the caller.
|
|
func (c *channelMap) findEntry(id int64) entry {
|
|
var v entry
|
|
var ok bool
|
|
if v, ok = c.channels[id]; ok {
|
|
return v
|
|
}
|
|
if v, ok = c.subChannels[id]; ok {
|
|
return v
|
|
}
|
|
if v, ok = c.servers[id]; ok {
|
|
return v
|
|
}
|
|
if v, ok = c.listenSockets[id]; ok {
|
|
return v
|
|
}
|
|
if v, ok = c.normalSockets[id]; ok {
|
|
return v
|
|
}
|
|
return &dummyEntry{idNotFound: id}
|
|
}
|
|
|
|
// c.mu must be held by the caller
|
|
// deleteEntry simply deletes an entry from the channelMap. Before calling this
|
|
// method, caller must check this entry is ready to be deleted, i.e removeEntry()
|
|
// has been called on it, and no children still exist.
|
|
// Conditionals are ordered by the expected frequency of deletion of each entity
|
|
// type, in order to optimize performance.
|
|
func (c *channelMap) deleteEntry(id int64) {
|
|
var ok bool
|
|
if _, ok = c.normalSockets[id]; ok {
|
|
delete(c.normalSockets, id)
|
|
return
|
|
}
|
|
if _, ok = c.subChannels[id]; ok {
|
|
delete(c.subChannels, id)
|
|
return
|
|
}
|
|
if _, ok = c.channels[id]; ok {
|
|
delete(c.channels, id)
|
|
delete(c.topLevelChannels, id)
|
|
return
|
|
}
|
|
if _, ok = c.listenSockets[id]; ok {
|
|
delete(c.listenSockets, id)
|
|
return
|
|
}
|
|
if _, ok = c.servers[id]; ok {
|
|
delete(c.servers, id)
|
|
return
|
|
}
|
|
}
|
|
|
|
type int64Slice []int64
|
|
|
|
func (s int64Slice) Len() int { return len(s) }
|
|
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
|
|
|
|
func copyMap(m map[int64]string) map[int64]string {
|
|
n := make(map[int64]string)
|
|
for k, v := range m {
|
|
n[k] = v
|
|
}
|
|
return n
|
|
}
|
|
|
|
func min(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
|
|
c.mu.RLock()
|
|
l := len(c.topLevelChannels)
|
|
ids := make([]int64, 0, l)
|
|
cns := make([]*channel, 0, min(l, EntryPerPage))
|
|
|
|
for k := range c.topLevelChannels {
|
|
ids = append(ids, k)
|
|
}
|
|
sort.Sort(int64Slice(ids))
|
|
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
|
|
count := 0
|
|
var end bool
|
|
var t []*ChannelMetric
|
|
for i, v := range ids[idx:] {
|
|
if count == EntryPerPage {
|
|
break
|
|
}
|
|
if cn, ok := c.channels[v]; ok {
|
|
cns = append(cns, cn)
|
|
t = append(t, &ChannelMetric{
|
|
NestedChans: copyMap(cn.nestedChans),
|
|
SubChans: copyMap(cn.subChans),
|
|
})
|
|
count++
|
|
}
|
|
if i == len(ids[idx:])-1 {
|
|
end = true
|
|
break
|
|
}
|
|
}
|
|
c.mu.RUnlock()
|
|
if count == 0 {
|
|
end = true
|
|
}
|
|
|
|
for i, cn := range cns {
|
|
t[i].ChannelData = cn.c.ChannelzMetric()
|
|
t[i].ID = cn.id
|
|
t[i].RefName = cn.refName
|
|
}
|
|
return t, end
|
|
}
|
|
|
|
func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
|
|
c.mu.RLock()
|
|
l := len(c.servers)
|
|
ids := make([]int64, 0, l)
|
|
ss := make([]*server, 0, min(l, EntryPerPage))
|
|
for k := range c.servers {
|
|
ids = append(ids, k)
|
|
}
|
|
sort.Sort(int64Slice(ids))
|
|
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
|
|
count := 0
|
|
var end bool
|
|
var s []*ServerMetric
|
|
for i, v := range ids[idx:] {
|
|
if count == EntryPerPage {
|
|
break
|
|
}
|
|
if svr, ok := c.servers[v]; ok {
|
|
ss = append(ss, svr)
|
|
s = append(s, &ServerMetric{
|
|
ListenSockets: copyMap(svr.listenSockets),
|
|
})
|
|
count++
|
|
}
|
|
if i == len(ids[idx:])-1 {
|
|
end = true
|
|
break
|
|
}
|
|
}
|
|
c.mu.RUnlock()
|
|
if count == 0 {
|
|
end = true
|
|
}
|
|
|
|
for i, svr := range ss {
|
|
s[i].ServerData = svr.s.ChannelzMetric()
|
|
s[i].ID = svr.id
|
|
s[i].RefName = svr.refName
|
|
}
|
|
return s, end
|
|
}
|
|
|
|
func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
|
|
var svr *server
|
|
var ok bool
|
|
c.mu.RLock()
|
|
if svr, ok = c.servers[id]; !ok {
|
|
// server with id doesn't exist.
|
|
c.mu.RUnlock()
|
|
return nil, true
|
|
}
|
|
svrskts := svr.sockets
|
|
l := len(svrskts)
|
|
ids := make([]int64, 0, l)
|
|
sks := make([]*normalSocket, 0, min(l, EntryPerPage))
|
|
for k := range svrskts {
|
|
ids = append(ids, k)
|
|
}
|
|
sort.Sort((int64Slice(ids)))
|
|
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
|
|
count := 0
|
|
var end bool
|
|
for i, v := range ids[idx:] {
|
|
if count == EntryPerPage {
|
|
break
|
|
}
|
|
if ns, ok := c.normalSockets[v]; ok {
|
|
sks = append(sks, ns)
|
|
count++
|
|
}
|
|
if i == len(ids[idx:])-1 {
|
|
end = true
|
|
break
|
|
}
|
|
}
|
|
c.mu.RUnlock()
|
|
if count == 0 {
|
|
end = true
|
|
}
|
|
var s []*SocketMetric
|
|
for _, ns := range sks {
|
|
sm := &SocketMetric{}
|
|
sm.SocketData = ns.s.ChannelzMetric()
|
|
sm.ID = ns.id
|
|
sm.RefName = ns.refName
|
|
s = append(s, sm)
|
|
}
|
|
return s, end
|
|
}
|
|
|
|
func (c *channelMap) GetChannel(id int64) *ChannelMetric {
|
|
cm := &ChannelMetric{}
|
|
var cn *channel
|
|
var ok bool
|
|
c.mu.RLock()
|
|
if cn, ok = c.channels[id]; !ok {
|
|
// channel with id doesn't exist.
|
|
c.mu.RUnlock()
|
|
return nil
|
|
}
|
|
cm.NestedChans = copyMap(cn.nestedChans)
|
|
cm.SubChans = copyMap(cn.subChans)
|
|
c.mu.RUnlock()
|
|
cm.ChannelData = cn.c.ChannelzMetric()
|
|
cm.ID = cn.id
|
|
cm.RefName = cn.refName
|
|
return cm
|
|
}
|
|
|
|
func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
|
|
cm := &SubChannelMetric{}
|
|
var sc *subChannel
|
|
var ok bool
|
|
c.mu.RLock()
|
|
if sc, ok = c.subChannels[id]; !ok {
|
|
// subchannel with id doesn't exist.
|
|
c.mu.RUnlock()
|
|
return nil
|
|
}
|
|
cm.Sockets = copyMap(sc.sockets)
|
|
c.mu.RUnlock()
|
|
cm.ChannelData = sc.c.ChannelzMetric()
|
|
cm.ID = sc.id
|
|
cm.RefName = sc.refName
|
|
return cm
|
|
}
|
|
|
|
func (c *channelMap) GetSocket(id int64) *SocketMetric {
|
|
sm := &SocketMetric{}
|
|
c.mu.RLock()
|
|
if ls, ok := c.listenSockets[id]; ok {
|
|
c.mu.RUnlock()
|
|
sm.SocketData = ls.s.ChannelzMetric()
|
|
sm.ID = ls.id
|
|
sm.RefName = ls.refName
|
|
return sm
|
|
}
|
|
if ns, ok := c.normalSockets[id]; ok {
|
|
c.mu.RUnlock()
|
|
sm.SocketData = ns.s.ChannelzMetric()
|
|
sm.ID = ns.id
|
|
sm.RefName = ns.refName
|
|
return sm
|
|
}
|
|
c.mu.RUnlock()
|
|
return nil
|
|
}
|
|
|
|
type idGenerator struct {
|
|
id int64
|
|
}
|
|
|
|
func (i *idGenerator) reset() {
|
|
atomic.StoreInt64(&i.id, 0)
|
|
}
|
|
|
|
func (i *idGenerator) genID() int64 {
|
|
return atomic.AddInt64(&i.id, 1)
|
|
}
|