• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package channelz defines APIs for enabling channelz service, entry
20// registration/deletion, and accessing channelz data. It also defines channelz
21// metric struct formats.
22//
23// All APIs in this package are experimental.
24package channelz
25
26import (
27	"sort"
28	"sync"
29	"sync/atomic"
30
31	"google.golang.org/grpc/grpclog"
32)
33
34var (
35	db    dbWrapper
36	idGen idGenerator
37	// EntryPerPage defines the number of channelz entries to be shown on a web page.
38	EntryPerPage = 50
39	curState     int32
40)
41
42// TurnOn turns on channelz data collection.
43func TurnOn() {
44	if !IsOn() {
45		NewChannelzStorage()
46		atomic.StoreInt32(&curState, 1)
47	}
48}
49
50// IsOn returns whether channelz data collection is on.
51func IsOn() bool {
52	return atomic.CompareAndSwapInt32(&curState, 1, 1)
53}
54
55// dbWarpper wraps around a reference to internal channelz data storage, and
56// provide synchronized functionality to set and get the reference.
57type dbWrapper struct {
58	mu sync.RWMutex
59	DB *channelMap
60}
61
62func (d *dbWrapper) set(db *channelMap) {
63	d.mu.Lock()
64	d.DB = db
65	d.mu.Unlock()
66}
67
68func (d *dbWrapper) get() *channelMap {
69	d.mu.RLock()
70	defer d.mu.RUnlock()
71	return d.DB
72}
73
74// NewChannelzStorage initializes channelz data storage and id generator.
75//
76// Note: This function is exported for testing purpose only. User should not call
77// it in most cases.
78func NewChannelzStorage() {
79	db.set(&channelMap{
80		topLevelChannels: make(map[int64]struct{}),
81		channels:         make(map[int64]*channel),
82		listenSockets:    make(map[int64]*listenSocket),
83		normalSockets:    make(map[int64]*normalSocket),
84		servers:          make(map[int64]*server),
85		subChannels:      make(map[int64]*subChannel),
86	})
87	idGen.reset()
88}
89
90// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
91// boolean indicating whether there's more top channels to be queried for.
92//
93// The arg id specifies that only top channel with id at or above it will be included
94// in the result. The returned slice is up to a length of EntryPerPage, and is
95// sorted in ascending id order.
96func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
97	return db.get().GetTopChannels(id)
98}
99
100// GetServers returns a slice of server's ServerMetric, along with a
101// boolean indicating whether there's more servers to be queried for.
102//
103// The arg id specifies that only server with id at or above it will be included
104// in the result. The returned slice is up to a length of EntryPerPage, and is
105// sorted in ascending id order.
106func GetServers(id int64) ([]*ServerMetric, bool) {
107	return db.get().GetServers(id)
108}
109
110// GetServerSockets returns a slice of server's (identified by id) normal socket's
111// SocketMetric, along with a boolean indicating whether there's more sockets to
112// be queried for.
113//
114// The arg startID specifies that only sockets with id at or above it will be
115// included in the result. The returned slice is up to a length of EntryPerPage,
116// and is sorted in ascending id order.
117func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
118	return db.get().GetServerSockets(id, startID)
119}
120
121// GetChannel returns the ChannelMetric for the channel (identified by id).
122func GetChannel(id int64) *ChannelMetric {
123	return db.get().GetChannel(id)
124}
125
126// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
127func GetSubChannel(id int64) *SubChannelMetric {
128	return db.get().GetSubChannel(id)
129}
130
131// GetSocket returns the SocketInternalMetric for the socket (identified by id).
132func GetSocket(id int64) *SocketMetric {
133	return db.get().GetSocket(id)
134}
135
136// RegisterChannel registers the given channel c in channelz database with ref
137// as its reference name, and add it to the child list of its parent (identified
138// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
139// assigned to this channel.
140func RegisterChannel(c Channel, pid int64, ref string) int64 {
141	id := idGen.genID()
142	cn := &channel{
143		refName:     ref,
144		c:           c,
145		subChans:    make(map[int64]string),
146		nestedChans: make(map[int64]string),
147		id:          id,
148		pid:         pid,
149	}
150	if pid == 0 {
151		db.get().addChannel(id, cn, true, pid, ref)
152	} else {
153		db.get().addChannel(id, cn, false, pid, ref)
154	}
155	return id
156}
157
158// RegisterSubChannel registers the given channel c in channelz database with ref
159// as its reference name, and add it to the child list of its parent (identified
160// by pid). It returns the unique channelz tracking id assigned to this subchannel.
161func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
162	if pid == 0 {
163		grpclog.Error("a SubChannel's parent id cannot be 0")
164		return 0
165	}
166	id := idGen.genID()
167	sc := &subChannel{
168		refName: ref,
169		c:       c,
170		sockets: make(map[int64]string),
171		id:      id,
172		pid:     pid,
173	}
174	db.get().addSubChannel(id, sc, pid, ref)
175	return id
176}
177
178// RegisterServer registers the given server s in channelz database. It returns
179// the unique channelz tracking id assigned to this server.
180func RegisterServer(s Server, ref string) int64 {
181	id := idGen.genID()
182	svr := &server{
183		refName:       ref,
184		s:             s,
185		sockets:       make(map[int64]string),
186		listenSockets: make(map[int64]string),
187		id:            id,
188	}
189	db.get().addServer(id, svr)
190	return id
191}
192
193// RegisterListenSocket registers the given listen socket s in channelz database
194// with ref as its reference name, and add it to the child list of its parent
195// (identified by pid). It returns the unique channelz tracking id assigned to
196// this listen socket.
197func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
198	if pid == 0 {
199		grpclog.Error("a ListenSocket's parent id cannot be 0")
200		return 0
201	}
202	id := idGen.genID()
203	ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
204	db.get().addListenSocket(id, ls, pid, ref)
205	return id
206}
207
208// RegisterNormalSocket registers the given normal socket s in channelz database
209// with ref as its reference name, and add it to the child list of its parent
210// (identified by pid). It returns the unique channelz tracking id assigned to
211// this normal socket.
212func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
213	if pid == 0 {
214		grpclog.Error("a NormalSocket's parent id cannot be 0")
215		return 0
216	}
217	id := idGen.genID()
218	ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
219	db.get().addNormalSocket(id, ns, pid, ref)
220	return id
221}
222
223// RemoveEntry removes an entry with unique channelz trakcing id to be id from
224// channelz database.
225func RemoveEntry(id int64) {
226	db.get().removeEntry(id)
227}
228
229// channelMap is the storage data structure for channelz.
230// Methods of channelMap can be divided in two two categories with respect to locking.
231// 1. Methods acquire the global lock.
232// 2. Methods that can only be called when global lock is held.
233// A second type of method need always to be called inside a first type of method.
234type channelMap struct {
235	mu               sync.RWMutex
236	topLevelChannels map[int64]struct{}
237	servers          map[int64]*server
238	channels         map[int64]*channel
239	subChannels      map[int64]*subChannel
240	listenSockets    map[int64]*listenSocket
241	normalSockets    map[int64]*normalSocket
242}
243
244func (c *channelMap) addServer(id int64, s *server) {
245	c.mu.Lock()
246	s.cm = c
247	c.servers[id] = s
248	c.mu.Unlock()
249}
250
251func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
252	c.mu.Lock()
253	cn.cm = c
254	c.channels[id] = cn
255	if isTopChannel {
256		c.topLevelChannels[id] = struct{}{}
257	} else {
258		c.findEntry(pid).addChild(id, cn)
259	}
260	c.mu.Unlock()
261}
262
263func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
264	c.mu.Lock()
265	sc.cm = c
266	c.subChannels[id] = sc
267	c.findEntry(pid).addChild(id, sc)
268	c.mu.Unlock()
269}
270
271func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
272	c.mu.Lock()
273	ls.cm = c
274	c.listenSockets[id] = ls
275	c.findEntry(pid).addChild(id, ls)
276	c.mu.Unlock()
277}
278
279func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
280	c.mu.Lock()
281	ns.cm = c
282	c.normalSockets[id] = ns
283	c.findEntry(pid).addChild(id, ns)
284	c.mu.Unlock()
285}
286
287// removeEntry triggers the removal of an entry, which may not indeed delete the
288// entry, if it has to wait on the deletion of its children, or may lead to a chain
289// of entry deletion. For example, deleting the last socket of a gracefully shutting
290// down server will lead to the server being also deleted.
291func (c *channelMap) removeEntry(id int64) {
292	c.mu.Lock()
293	c.findEntry(id).triggerDelete()
294	c.mu.Unlock()
295}
296
297// c.mu must be held by the caller.
298func (c *channelMap) findEntry(id int64) entry {
299	var v entry
300	var ok bool
301	if v, ok = c.channels[id]; ok {
302		return v
303	}
304	if v, ok = c.subChannels[id]; ok {
305		return v
306	}
307	if v, ok = c.servers[id]; ok {
308		return v
309	}
310	if v, ok = c.listenSockets[id]; ok {
311		return v
312	}
313	if v, ok = c.normalSockets[id]; ok {
314		return v
315	}
316	return &dummyEntry{idNotFound: id}
317}
318
319// c.mu must be held by the caller
320// deleteEntry simply deletes an entry from the channelMap. Before calling this
321// method, caller must check this entry is ready to be deleted, i.e removeEntry()
322// has been called on it, and no children still exist.
323// Conditionals are ordered by the expected frequency of deletion of each entity
324// type, in order to optimize performance.
325func (c *channelMap) deleteEntry(id int64) {
326	var ok bool
327	if _, ok = c.normalSockets[id]; ok {
328		delete(c.normalSockets, id)
329		return
330	}
331	if _, ok = c.subChannels[id]; ok {
332		delete(c.subChannels, id)
333		return
334	}
335	if _, ok = c.channels[id]; ok {
336		delete(c.channels, id)
337		delete(c.topLevelChannels, id)
338		return
339	}
340	if _, ok = c.listenSockets[id]; ok {
341		delete(c.listenSockets, id)
342		return
343	}
344	if _, ok = c.servers[id]; ok {
345		delete(c.servers, id)
346		return
347	}
348}
349
350type int64Slice []int64
351
352func (s int64Slice) Len() int           { return len(s) }
353func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
354func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
355
356func copyMap(m map[int64]string) map[int64]string {
357	n := make(map[int64]string)
358	for k, v := range m {
359		n[k] = v
360	}
361	return n
362}
363
364func min(a, b int) int {
365	if a < b {
366		return a
367	}
368	return b
369}
370
371func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
372	c.mu.RLock()
373	l := len(c.topLevelChannels)
374	ids := make([]int64, 0, l)
375	cns := make([]*channel, 0, min(l, EntryPerPage))
376
377	for k := range c.topLevelChannels {
378		ids = append(ids, k)
379	}
380	sort.Sort(int64Slice(ids))
381	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
382	count := 0
383	var end bool
384	var t []*ChannelMetric
385	for i, v := range ids[idx:] {
386		if count == EntryPerPage {
387			break
388		}
389		if cn, ok := c.channels[v]; ok {
390			cns = append(cns, cn)
391			t = append(t, &ChannelMetric{
392				NestedChans: copyMap(cn.nestedChans),
393				SubChans:    copyMap(cn.subChans),
394			})
395			count++
396		}
397		if i == len(ids[idx:])-1 {
398			end = true
399			break
400		}
401	}
402	c.mu.RUnlock()
403	if count == 0 {
404		end = true
405	}
406
407	for i, cn := range cns {
408		t[i].ChannelData = cn.c.ChannelzMetric()
409		t[i].ID = cn.id
410		t[i].RefName = cn.refName
411	}
412	return t, end
413}
414
415func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
416	c.mu.RLock()
417	l := len(c.servers)
418	ids := make([]int64, 0, l)
419	ss := make([]*server, 0, min(l, EntryPerPage))
420	for k := range c.servers {
421		ids = append(ids, k)
422	}
423	sort.Sort(int64Slice(ids))
424	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
425	count := 0
426	var end bool
427	var s []*ServerMetric
428	for i, v := range ids[idx:] {
429		if count == EntryPerPage {
430			break
431		}
432		if svr, ok := c.servers[v]; ok {
433			ss = append(ss, svr)
434			s = append(s, &ServerMetric{
435				ListenSockets: copyMap(svr.listenSockets),
436			})
437			count++
438		}
439		if i == len(ids[idx:])-1 {
440			end = true
441			break
442		}
443	}
444	c.mu.RUnlock()
445	if count == 0 {
446		end = true
447	}
448
449	for i, svr := range ss {
450		s[i].ServerData = svr.s.ChannelzMetric()
451		s[i].ID = svr.id
452		s[i].RefName = svr.refName
453	}
454	return s, end
455}
456
457func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
458	var svr *server
459	var ok bool
460	c.mu.RLock()
461	if svr, ok = c.servers[id]; !ok {
462		// server with id doesn't exist.
463		c.mu.RUnlock()
464		return nil, true
465	}
466	svrskts := svr.sockets
467	l := len(svrskts)
468	ids := make([]int64, 0, l)
469	sks := make([]*normalSocket, 0, min(l, EntryPerPage))
470	for k := range svrskts {
471		ids = append(ids, k)
472	}
473	sort.Sort((int64Slice(ids)))
474	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
475	count := 0
476	var end bool
477	for i, v := range ids[idx:] {
478		if count == EntryPerPage {
479			break
480		}
481		if ns, ok := c.normalSockets[v]; ok {
482			sks = append(sks, ns)
483			count++
484		}
485		if i == len(ids[idx:])-1 {
486			end = true
487			break
488		}
489	}
490	c.mu.RUnlock()
491	if count == 0 {
492		end = true
493	}
494	var s []*SocketMetric
495	for _, ns := range sks {
496		sm := &SocketMetric{}
497		sm.SocketData = ns.s.ChannelzMetric()
498		sm.ID = ns.id
499		sm.RefName = ns.refName
500		s = append(s, sm)
501	}
502	return s, end
503}
504
505func (c *channelMap) GetChannel(id int64) *ChannelMetric {
506	cm := &ChannelMetric{}
507	var cn *channel
508	var ok bool
509	c.mu.RLock()
510	if cn, ok = c.channels[id]; !ok {
511		// channel with id doesn't exist.
512		c.mu.RUnlock()
513		return nil
514	}
515	cm.NestedChans = copyMap(cn.nestedChans)
516	cm.SubChans = copyMap(cn.subChans)
517	c.mu.RUnlock()
518	cm.ChannelData = cn.c.ChannelzMetric()
519	cm.ID = cn.id
520	cm.RefName = cn.refName
521	return cm
522}
523
524func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
525	cm := &SubChannelMetric{}
526	var sc *subChannel
527	var ok bool
528	c.mu.RLock()
529	if sc, ok = c.subChannels[id]; !ok {
530		// subchannel with id doesn't exist.
531		c.mu.RUnlock()
532		return nil
533	}
534	cm.Sockets = copyMap(sc.sockets)
535	c.mu.RUnlock()
536	cm.ChannelData = sc.c.ChannelzMetric()
537	cm.ID = sc.id
538	cm.RefName = sc.refName
539	return cm
540}
541
542func (c *channelMap) GetSocket(id int64) *SocketMetric {
543	sm := &SocketMetric{}
544	c.mu.RLock()
545	if ls, ok := c.listenSockets[id]; ok {
546		c.mu.RUnlock()
547		sm.SocketData = ls.s.ChannelzMetric()
548		sm.ID = ls.id
549		sm.RefName = ls.refName
550		return sm
551	}
552	if ns, ok := c.normalSockets[id]; ok {
553		c.mu.RUnlock()
554		sm.SocketData = ns.s.ChannelzMetric()
555		sm.ID = ns.id
556		sm.RefName = ns.refName
557		return sm
558	}
559	c.mu.RUnlock()
560	return nil
561}
562
563type idGenerator struct {
564	id int64
565}
566
567func (i *idGenerator) reset() {
568	atomic.StoreInt64(&i.id, 0)
569}
570
571func (i *idGenerator) genID() int64 {
572	return atomic.AddInt64(&i.id, 1)
573}
574