1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package channelz defines APIs for enabling channelz service, entry 20// registration/deletion, and accessing channelz data. It also defines channelz 21// metric struct formats. 22// 23// All APIs in this package are experimental. 24package channelz 25 26import ( 27 "sort" 28 "sync" 29 "sync/atomic" 30 31 "google.golang.org/grpc/grpclog" 32) 33 34var ( 35 db dbWrapper 36 idGen idGenerator 37 // EntryPerPage defines the number of channelz entries to be shown on a web page. 38 EntryPerPage = 50 39 curState int32 40) 41 42// TurnOn turns on channelz data collection. 43func TurnOn() { 44 if !IsOn() { 45 NewChannelzStorage() 46 atomic.StoreInt32(&curState, 1) 47 } 48} 49 50// IsOn returns whether channelz data collection is on. 51func IsOn() bool { 52 return atomic.CompareAndSwapInt32(&curState, 1, 1) 53} 54 55// dbWarpper wraps around a reference to internal channelz data storage, and 56// provide synchronized functionality to set and get the reference. 57type dbWrapper struct { 58 mu sync.RWMutex 59 DB *channelMap 60} 61 62func (d *dbWrapper) set(db *channelMap) { 63 d.mu.Lock() 64 d.DB = db 65 d.mu.Unlock() 66} 67 68func (d *dbWrapper) get() *channelMap { 69 d.mu.RLock() 70 defer d.mu.RUnlock() 71 return d.DB 72} 73 74// NewChannelzStorage initializes channelz data storage and id generator. 75// 76// Note: This function is exported for testing purpose only. User should not call 77// it in most cases. 78func NewChannelzStorage() { 79 db.set(&channelMap{ 80 topLevelChannels: make(map[int64]struct{}), 81 channels: make(map[int64]*channel), 82 listenSockets: make(map[int64]*listenSocket), 83 normalSockets: make(map[int64]*normalSocket), 84 servers: make(map[int64]*server), 85 subChannels: make(map[int64]*subChannel), 86 }) 87 idGen.reset() 88} 89 90// GetTopChannels returns a slice of top channel's ChannelMetric, along with a 91// boolean indicating whether there's more top channels to be queried for. 92// 93// The arg id specifies that only top channel with id at or above it will be included 94// in the result. The returned slice is up to a length of EntryPerPage, and is 95// sorted in ascending id order. 96func GetTopChannels(id int64) ([]*ChannelMetric, bool) { 97 return db.get().GetTopChannels(id) 98} 99 100// GetServers returns a slice of server's ServerMetric, along with a 101// boolean indicating whether there's more servers to be queried for. 102// 103// The arg id specifies that only server with id at or above it will be included 104// in the result. The returned slice is up to a length of EntryPerPage, and is 105// sorted in ascending id order. 106func GetServers(id int64) ([]*ServerMetric, bool) { 107 return db.get().GetServers(id) 108} 109 110// GetServerSockets returns a slice of server's (identified by id) normal socket's 111// SocketMetric, along with a boolean indicating whether there's more sockets to 112// be queried for. 113// 114// The arg startID specifies that only sockets with id at or above it will be 115// included in the result. The returned slice is up to a length of EntryPerPage, 116// and is sorted in ascending id order. 117func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { 118 return db.get().GetServerSockets(id, startID) 119} 120 121// GetChannel returns the ChannelMetric for the channel (identified by id). 122func GetChannel(id int64) *ChannelMetric { 123 return db.get().GetChannel(id) 124} 125 126// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). 127func GetSubChannel(id int64) *SubChannelMetric { 128 return db.get().GetSubChannel(id) 129} 130 131// GetSocket returns the SocketInternalMetric for the socket (identified by id). 132func GetSocket(id int64) *SocketMetric { 133 return db.get().GetSocket(id) 134} 135 136// RegisterChannel registers the given channel c in channelz database with ref 137// as its reference name, and add it to the child list of its parent (identified 138// by pid). pid = 0 means no parent. It returns the unique channelz tracking id 139// assigned to this channel. 140func RegisterChannel(c Channel, pid int64, ref string) int64 { 141 id := idGen.genID() 142 cn := &channel{ 143 refName: ref, 144 c: c, 145 subChans: make(map[int64]string), 146 nestedChans: make(map[int64]string), 147 id: id, 148 pid: pid, 149 } 150 if pid == 0 { 151 db.get().addChannel(id, cn, true, pid, ref) 152 } else { 153 db.get().addChannel(id, cn, false, pid, ref) 154 } 155 return id 156} 157 158// RegisterSubChannel registers the given channel c in channelz database with ref 159// as its reference name, and add it to the child list of its parent (identified 160// by pid). It returns the unique channelz tracking id assigned to this subchannel. 161func RegisterSubChannel(c Channel, pid int64, ref string) int64 { 162 if pid == 0 { 163 grpclog.Error("a SubChannel's parent id cannot be 0") 164 return 0 165 } 166 id := idGen.genID() 167 sc := &subChannel{ 168 refName: ref, 169 c: c, 170 sockets: make(map[int64]string), 171 id: id, 172 pid: pid, 173 } 174 db.get().addSubChannel(id, sc, pid, ref) 175 return id 176} 177 178// RegisterServer registers the given server s in channelz database. It returns 179// the unique channelz tracking id assigned to this server. 180func RegisterServer(s Server, ref string) int64 { 181 id := idGen.genID() 182 svr := &server{ 183 refName: ref, 184 s: s, 185 sockets: make(map[int64]string), 186 listenSockets: make(map[int64]string), 187 id: id, 188 } 189 db.get().addServer(id, svr) 190 return id 191} 192 193// RegisterListenSocket registers the given listen socket s in channelz database 194// with ref as its reference name, and add it to the child list of its parent 195// (identified by pid). It returns the unique channelz tracking id assigned to 196// this listen socket. 197func RegisterListenSocket(s Socket, pid int64, ref string) int64 { 198 if pid == 0 { 199 grpclog.Error("a ListenSocket's parent id cannot be 0") 200 return 0 201 } 202 id := idGen.genID() 203 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid} 204 db.get().addListenSocket(id, ls, pid, ref) 205 return id 206} 207 208// RegisterNormalSocket registers the given normal socket s in channelz database 209// with ref as its reference name, and add it to the child list of its parent 210// (identified by pid). It returns the unique channelz tracking id assigned to 211// this normal socket. 212func RegisterNormalSocket(s Socket, pid int64, ref string) int64 { 213 if pid == 0 { 214 grpclog.Error("a NormalSocket's parent id cannot be 0") 215 return 0 216 } 217 id := idGen.genID() 218 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid} 219 db.get().addNormalSocket(id, ns, pid, ref) 220 return id 221} 222 223// RemoveEntry removes an entry with unique channelz trakcing id to be id from 224// channelz database. 225func RemoveEntry(id int64) { 226 db.get().removeEntry(id) 227} 228 229// channelMap is the storage data structure for channelz. 230// Methods of channelMap can be divided in two two categories with respect to locking. 231// 1. Methods acquire the global lock. 232// 2. Methods that can only be called when global lock is held. 233// A second type of method need always to be called inside a first type of method. 234type channelMap struct { 235 mu sync.RWMutex 236 topLevelChannels map[int64]struct{} 237 servers map[int64]*server 238 channels map[int64]*channel 239 subChannels map[int64]*subChannel 240 listenSockets map[int64]*listenSocket 241 normalSockets map[int64]*normalSocket 242} 243 244func (c *channelMap) addServer(id int64, s *server) { 245 c.mu.Lock() 246 s.cm = c 247 c.servers[id] = s 248 c.mu.Unlock() 249} 250 251func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) { 252 c.mu.Lock() 253 cn.cm = c 254 c.channels[id] = cn 255 if isTopChannel { 256 c.topLevelChannels[id] = struct{}{} 257 } else { 258 c.findEntry(pid).addChild(id, cn) 259 } 260 c.mu.Unlock() 261} 262 263func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) { 264 c.mu.Lock() 265 sc.cm = c 266 c.subChannels[id] = sc 267 c.findEntry(pid).addChild(id, sc) 268 c.mu.Unlock() 269} 270 271func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) { 272 c.mu.Lock() 273 ls.cm = c 274 c.listenSockets[id] = ls 275 c.findEntry(pid).addChild(id, ls) 276 c.mu.Unlock() 277} 278 279func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) { 280 c.mu.Lock() 281 ns.cm = c 282 c.normalSockets[id] = ns 283 c.findEntry(pid).addChild(id, ns) 284 c.mu.Unlock() 285} 286 287// removeEntry triggers the removal of an entry, which may not indeed delete the 288// entry, if it has to wait on the deletion of its children, or may lead to a chain 289// of entry deletion. For example, deleting the last socket of a gracefully shutting 290// down server will lead to the server being also deleted. 291func (c *channelMap) removeEntry(id int64) { 292 c.mu.Lock() 293 c.findEntry(id).triggerDelete() 294 c.mu.Unlock() 295} 296 297// c.mu must be held by the caller. 298func (c *channelMap) findEntry(id int64) entry { 299 var v entry 300 var ok bool 301 if v, ok = c.channels[id]; ok { 302 return v 303 } 304 if v, ok = c.subChannels[id]; ok { 305 return v 306 } 307 if v, ok = c.servers[id]; ok { 308 return v 309 } 310 if v, ok = c.listenSockets[id]; ok { 311 return v 312 } 313 if v, ok = c.normalSockets[id]; ok { 314 return v 315 } 316 return &dummyEntry{idNotFound: id} 317} 318 319// c.mu must be held by the caller 320// deleteEntry simply deletes an entry from the channelMap. Before calling this 321// method, caller must check this entry is ready to be deleted, i.e removeEntry() 322// has been called on it, and no children still exist. 323// Conditionals are ordered by the expected frequency of deletion of each entity 324// type, in order to optimize performance. 325func (c *channelMap) deleteEntry(id int64) { 326 var ok bool 327 if _, ok = c.normalSockets[id]; ok { 328 delete(c.normalSockets, id) 329 return 330 } 331 if _, ok = c.subChannels[id]; ok { 332 delete(c.subChannels, id) 333 return 334 } 335 if _, ok = c.channels[id]; ok { 336 delete(c.channels, id) 337 delete(c.topLevelChannels, id) 338 return 339 } 340 if _, ok = c.listenSockets[id]; ok { 341 delete(c.listenSockets, id) 342 return 343 } 344 if _, ok = c.servers[id]; ok { 345 delete(c.servers, id) 346 return 347 } 348} 349 350type int64Slice []int64 351 352func (s int64Slice) Len() int { return len(s) } 353func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 354func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } 355 356func copyMap(m map[int64]string) map[int64]string { 357 n := make(map[int64]string) 358 for k, v := range m { 359 n[k] = v 360 } 361 return n 362} 363 364func min(a, b int) int { 365 if a < b { 366 return a 367 } 368 return b 369} 370 371func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) { 372 c.mu.RLock() 373 l := len(c.topLevelChannels) 374 ids := make([]int64, 0, l) 375 cns := make([]*channel, 0, min(l, EntryPerPage)) 376 377 for k := range c.topLevelChannels { 378 ids = append(ids, k) 379 } 380 sort.Sort(int64Slice(ids)) 381 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 382 count := 0 383 var end bool 384 var t []*ChannelMetric 385 for i, v := range ids[idx:] { 386 if count == EntryPerPage { 387 break 388 } 389 if cn, ok := c.channels[v]; ok { 390 cns = append(cns, cn) 391 t = append(t, &ChannelMetric{ 392 NestedChans: copyMap(cn.nestedChans), 393 SubChans: copyMap(cn.subChans), 394 }) 395 count++ 396 } 397 if i == len(ids[idx:])-1 { 398 end = true 399 break 400 } 401 } 402 c.mu.RUnlock() 403 if count == 0 { 404 end = true 405 } 406 407 for i, cn := range cns { 408 t[i].ChannelData = cn.c.ChannelzMetric() 409 t[i].ID = cn.id 410 t[i].RefName = cn.refName 411 } 412 return t, end 413} 414 415func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) { 416 c.mu.RLock() 417 l := len(c.servers) 418 ids := make([]int64, 0, l) 419 ss := make([]*server, 0, min(l, EntryPerPage)) 420 for k := range c.servers { 421 ids = append(ids, k) 422 } 423 sort.Sort(int64Slice(ids)) 424 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 425 count := 0 426 var end bool 427 var s []*ServerMetric 428 for i, v := range ids[idx:] { 429 if count == EntryPerPage { 430 break 431 } 432 if svr, ok := c.servers[v]; ok { 433 ss = append(ss, svr) 434 s = append(s, &ServerMetric{ 435 ListenSockets: copyMap(svr.listenSockets), 436 }) 437 count++ 438 } 439 if i == len(ids[idx:])-1 { 440 end = true 441 break 442 } 443 } 444 c.mu.RUnlock() 445 if count == 0 { 446 end = true 447 } 448 449 for i, svr := range ss { 450 s[i].ServerData = svr.s.ChannelzMetric() 451 s[i].ID = svr.id 452 s[i].RefName = svr.refName 453 } 454 return s, end 455} 456 457func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { 458 var svr *server 459 var ok bool 460 c.mu.RLock() 461 if svr, ok = c.servers[id]; !ok { 462 // server with id doesn't exist. 463 c.mu.RUnlock() 464 return nil, true 465 } 466 svrskts := svr.sockets 467 l := len(svrskts) 468 ids := make([]int64, 0, l) 469 sks := make([]*normalSocket, 0, min(l, EntryPerPage)) 470 for k := range svrskts { 471 ids = append(ids, k) 472 } 473 sort.Sort((int64Slice(ids))) 474 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 475 count := 0 476 var end bool 477 for i, v := range ids[idx:] { 478 if count == EntryPerPage { 479 break 480 } 481 if ns, ok := c.normalSockets[v]; ok { 482 sks = append(sks, ns) 483 count++ 484 } 485 if i == len(ids[idx:])-1 { 486 end = true 487 break 488 } 489 } 490 c.mu.RUnlock() 491 if count == 0 { 492 end = true 493 } 494 var s []*SocketMetric 495 for _, ns := range sks { 496 sm := &SocketMetric{} 497 sm.SocketData = ns.s.ChannelzMetric() 498 sm.ID = ns.id 499 sm.RefName = ns.refName 500 s = append(s, sm) 501 } 502 return s, end 503} 504 505func (c *channelMap) GetChannel(id int64) *ChannelMetric { 506 cm := &ChannelMetric{} 507 var cn *channel 508 var ok bool 509 c.mu.RLock() 510 if cn, ok = c.channels[id]; !ok { 511 // channel with id doesn't exist. 512 c.mu.RUnlock() 513 return nil 514 } 515 cm.NestedChans = copyMap(cn.nestedChans) 516 cm.SubChans = copyMap(cn.subChans) 517 c.mu.RUnlock() 518 cm.ChannelData = cn.c.ChannelzMetric() 519 cm.ID = cn.id 520 cm.RefName = cn.refName 521 return cm 522} 523 524func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { 525 cm := &SubChannelMetric{} 526 var sc *subChannel 527 var ok bool 528 c.mu.RLock() 529 if sc, ok = c.subChannels[id]; !ok { 530 // subchannel with id doesn't exist. 531 c.mu.RUnlock() 532 return nil 533 } 534 cm.Sockets = copyMap(sc.sockets) 535 c.mu.RUnlock() 536 cm.ChannelData = sc.c.ChannelzMetric() 537 cm.ID = sc.id 538 cm.RefName = sc.refName 539 return cm 540} 541 542func (c *channelMap) GetSocket(id int64) *SocketMetric { 543 sm := &SocketMetric{} 544 c.mu.RLock() 545 if ls, ok := c.listenSockets[id]; ok { 546 c.mu.RUnlock() 547 sm.SocketData = ls.s.ChannelzMetric() 548 sm.ID = ls.id 549 sm.RefName = ls.refName 550 return sm 551 } 552 if ns, ok := c.normalSockets[id]; ok { 553 c.mu.RUnlock() 554 sm.SocketData = ns.s.ChannelzMetric() 555 sm.ID = ns.id 556 sm.RefName = ns.refName 557 return sm 558 } 559 c.mu.RUnlock() 560 return nil 561} 562 563type idGenerator struct { 564 id int64 565} 566 567func (i *idGenerator) reset() { 568 atomic.StoreInt64(&i.id, 0) 569} 570 571func (i *idGenerator) genID() int64 { 572 return atomic.AddInt64(&i.id, 1) 573} 574