1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/channelz/channelz.h"
20
21 #include <grpc/support/json.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24
25 #include <algorithm>
26 #include <atomic>
27 #include <cstdint>
28
29 #include "absl/log/check.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/escaping.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/strip.h"
34 #include "src/core/channelz/channelz_registry.h"
35 #include "src/core/lib/address_utils/parse_address.h"
36 #include "src/core/lib/address_utils/sockaddr_utils.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/iomgr/resolved_address.h"
39 #include "src/core/lib/transport/connectivity_state.h"
40 #include "src/core/util/json/json_writer.h"
41 #include "src/core/util/string.h"
42 #include "src/core/util/uri.h"
43 #include "src/core/util/useful.h"
44
45 namespace grpc_core {
46 namespace channelz {
47
48 //
49 // BaseNode
50 //
51
BaseNode(EntityType type,std::string name)52 BaseNode::BaseNode(EntityType type, std::string name)
53 : type_(type), uuid_(-1), name_(std::move(name)) {
54 // The registry will set uuid_ under its lock.
55 ChannelzRegistry::Register(this);
56 }
57
~BaseNode()58 BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
59
RenderJsonString()60 std::string BaseNode::RenderJsonString() {
61 Json json = RenderJson();
62 return JsonDump(json);
63 }
64
65 //
66 // CallCountingHelper
67 //
68
RecordCallStarted()69 void CallCountingHelper::RecordCallStarted() {
70 calls_started_.fetch_add(1, std::memory_order_relaxed);
71 last_call_started_cycle_.store(gpr_get_cycle_counter(),
72 std::memory_order_relaxed);
73 }
74
RecordCallFailed()75 void CallCountingHelper::RecordCallFailed() {
76 calls_failed_.fetch_add(1, std::memory_order_relaxed);
77 }
78
RecordCallSucceeded()79 void CallCountingHelper::RecordCallSucceeded() {
80 calls_succeeded_.fetch_add(1, std::memory_order_relaxed);
81 }
82
PopulateCallCounts(Json::Object * json)83 void CallCountingHelper::PopulateCallCounts(Json::Object* json) {
84 auto calls_started = calls_started_.load(std::memory_order_relaxed);
85 auto calls_succeeded = calls_succeeded_.load(std::memory_order_relaxed);
86 auto calls_failed = calls_failed_.load(std::memory_order_relaxed);
87 auto last_call_started_cycle =
88 last_call_started_cycle_.load(std::memory_order_relaxed);
89 if (calls_started != 0) {
90 (*json)["callsStarted"] = Json::FromString(absl::StrCat(calls_started));
91 gpr_timespec ts = gpr_convert_clock_type(
92 gpr_cycle_counter_to_time(last_call_started_cycle), GPR_CLOCK_REALTIME);
93 (*json)["lastCallStartedTimestamp"] =
94 Json::FromString(gpr_format_timespec(ts));
95 }
96 if (calls_succeeded != 0) {
97 (*json)["callsSucceeded"] = Json::FromString(absl::StrCat(calls_succeeded));
98 }
99 if (calls_failed != 0) {
100 (*json)["callsFailed"] = Json::FromString(absl::StrCat(calls_failed));
101 }
102 }
103
104 //
105 // PerCpuCallCountingHelper
106 //
107
RecordCallStarted()108 void PerCpuCallCountingHelper::RecordCallStarted() {
109 auto& data = per_cpu_data_.this_cpu();
110 data.calls_started.fetch_add(1, std::memory_order_relaxed);
111 data.last_call_started_cycle.store(gpr_get_cycle_counter(),
112 std::memory_order_relaxed);
113 }
114
RecordCallFailed()115 void PerCpuCallCountingHelper::RecordCallFailed() {
116 per_cpu_data_.this_cpu().calls_failed.fetch_add(1, std::memory_order_relaxed);
117 }
118
RecordCallSucceeded()119 void PerCpuCallCountingHelper::RecordCallSucceeded() {
120 per_cpu_data_.this_cpu().calls_succeeded.fetch_add(1,
121 std::memory_order_relaxed);
122 }
123
PopulateCallCounts(Json::Object * json)124 void PerCpuCallCountingHelper::PopulateCallCounts(Json::Object* json) {
125 int64_t calls_started = 0;
126 int64_t calls_succeeded = 0;
127 int64_t calls_failed = 0;
128 gpr_cycle_counter last_call_started_cycle = 0;
129 for (const auto& cpu : per_cpu_data_) {
130 calls_started += cpu.calls_started.load(std::memory_order_relaxed);
131 calls_succeeded += cpu.calls_succeeded.load(std::memory_order_relaxed);
132 calls_failed += cpu.calls_failed.load(std::memory_order_relaxed);
133 last_call_started_cycle =
134 std::max(last_call_started_cycle,
135 cpu.last_call_started_cycle.load(std::memory_order_relaxed));
136 }
137
138 if (calls_started != 0) {
139 (*json)["callsStarted"] = Json::FromString(absl::StrCat(calls_started));
140 gpr_timespec ts = gpr_convert_clock_type(
141 gpr_cycle_counter_to_time(last_call_started_cycle), GPR_CLOCK_REALTIME);
142 (*json)["lastCallStartedTimestamp"] =
143 Json::FromString(gpr_format_timespec(ts));
144 }
145 if (calls_succeeded != 0) {
146 (*json)["callsSucceeded"] = Json::FromString(absl::StrCat(calls_succeeded));
147 }
148 if (calls_failed != 0) {
149 (*json)["callsFailed"] = Json::FromString(absl::StrCat(calls_failed));
150 }
151 }
152
153 //
154 // ChannelNode
155 //
156
ChannelNode(std::string target,size_t channel_tracer_max_nodes,bool is_internal_channel)157 ChannelNode::ChannelNode(std::string target, size_t channel_tracer_max_nodes,
158 bool is_internal_channel)
159 : BaseNode(is_internal_channel ? EntityType::kInternalChannel
160 : EntityType::kTopLevelChannel,
161 target),
162 target_(std::move(target)),
163 trace_(channel_tracer_max_nodes) {}
164
GetChannelConnectivityStateChangeString(grpc_connectivity_state state)165 const char* ChannelNode::GetChannelConnectivityStateChangeString(
166 grpc_connectivity_state state) {
167 switch (state) {
168 case GRPC_CHANNEL_IDLE:
169 return "Channel state change to IDLE";
170 case GRPC_CHANNEL_CONNECTING:
171 return "Channel state change to CONNECTING";
172 case GRPC_CHANNEL_READY:
173 return "Channel state change to READY";
174 case GRPC_CHANNEL_TRANSIENT_FAILURE:
175 return "Channel state change to TRANSIENT_FAILURE";
176 case GRPC_CHANNEL_SHUTDOWN:
177 return "Channel state change to SHUTDOWN";
178 }
179 GPR_UNREACHABLE_CODE(return "UNKNOWN");
180 }
181
RenderJson()182 Json ChannelNode::RenderJson() {
183 Json::Object data = {
184 {"target", Json::FromString(target_)},
185 };
186 // Connectivity state.
187 // If low-order bit is on, then the field is set.
188 int state_field = connectivity_state_.load(std::memory_order_relaxed);
189 if ((state_field & 1) != 0) {
190 grpc_connectivity_state state =
191 static_cast<grpc_connectivity_state>(state_field >> 1);
192 data["state"] = Json::FromObject({
193 {"state", Json::FromString(ConnectivityStateName(state))},
194 });
195 }
196 // Fill in the channel trace if applicable.
197 Json trace_json = trace_.RenderJson();
198 if (trace_json.type() != Json::Type::kNull) {
199 data["trace"] = std::move(trace_json);
200 }
201 // Ask CallCountingHelper to populate call count data.
202 call_counter_.PopulateCallCounts(&data);
203 // Construct outer object.
204 Json::Object json = {
205 {"ref", Json::FromObject({
206 {"channelId", Json::FromString(absl::StrCat(uuid()))},
207 })},
208 {"data", Json::FromObject(std::move(data))},
209 };
210 // Template method. Child classes may override this to add their specific
211 // functionality.
212 PopulateChildRefs(&json);
213 return Json::FromObject(std::move(json));
214 }
215
PopulateChildRefs(Json::Object * json)216 void ChannelNode::PopulateChildRefs(Json::Object* json) {
217 MutexLock lock(&child_mu_);
218 if (!child_subchannels_.empty()) {
219 Json::Array array;
220 for (intptr_t subchannel_uuid : child_subchannels_) {
221 array.emplace_back(Json::FromObject({
222 {"subchannelId", Json::FromString(absl::StrCat(subchannel_uuid))},
223 }));
224 }
225 (*json)["subchannelRef"] = Json::FromArray(std::move(array));
226 }
227 if (!child_channels_.empty()) {
228 Json::Array array;
229 for (intptr_t channel_uuid : child_channels_) {
230 array.emplace_back(Json::FromObject({
231 {"channelId", Json::FromString(absl::StrCat(channel_uuid))},
232 }));
233 }
234 (*json)["channelRef"] = Json::FromArray(std::move(array));
235 }
236 }
237
SetConnectivityState(grpc_connectivity_state state)238 void ChannelNode::SetConnectivityState(grpc_connectivity_state state) {
239 // Store with low-order bit set to indicate that the field is set.
240 int state_field = (state << 1) + 1;
241 connectivity_state_.store(state_field, std::memory_order_relaxed);
242 }
243
AddChildChannel(intptr_t child_uuid)244 void ChannelNode::AddChildChannel(intptr_t child_uuid) {
245 MutexLock lock(&child_mu_);
246 child_channels_.insert(child_uuid);
247 }
248
RemoveChildChannel(intptr_t child_uuid)249 void ChannelNode::RemoveChildChannel(intptr_t child_uuid) {
250 MutexLock lock(&child_mu_);
251 child_channels_.erase(child_uuid);
252 }
253
AddChildSubchannel(intptr_t child_uuid)254 void ChannelNode::AddChildSubchannel(intptr_t child_uuid) {
255 MutexLock lock(&child_mu_);
256 child_subchannels_.insert(child_uuid);
257 }
258
RemoveChildSubchannel(intptr_t child_uuid)259 void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {
260 MutexLock lock(&child_mu_);
261 child_subchannels_.erase(child_uuid);
262 }
263
264 //
265 // SubchannelNode
266 //
267
SubchannelNode(std::string target_address,size_t channel_tracer_max_nodes)268 SubchannelNode::SubchannelNode(std::string target_address,
269 size_t channel_tracer_max_nodes)
270 : BaseNode(EntityType::kSubchannel, target_address),
271 target_(std::move(target_address)),
272 trace_(channel_tracer_max_nodes) {}
273
~SubchannelNode()274 SubchannelNode::~SubchannelNode() {}
275
UpdateConnectivityState(grpc_connectivity_state state)276 void SubchannelNode::UpdateConnectivityState(grpc_connectivity_state state) {
277 connectivity_state_.store(state, std::memory_order_relaxed);
278 }
279
SetChildSocket(RefCountedPtr<SocketNode> socket)280 void SubchannelNode::SetChildSocket(RefCountedPtr<SocketNode> socket) {
281 MutexLock lock(&socket_mu_);
282 child_socket_ = std::move(socket);
283 }
284
RenderJson()285 Json SubchannelNode::RenderJson() {
286 // Create and fill the data child.
287 grpc_connectivity_state state =
288 connectivity_state_.load(std::memory_order_relaxed);
289 Json::Object data = {
290 {"state", Json::FromObject({
291 {"state", Json::FromString(ConnectivityStateName(state))},
292 })},
293 {"target", Json::FromString(target_)},
294 };
295 // Fill in the channel trace if applicable
296 Json trace_json = trace_.RenderJson();
297 if (trace_json.type() != Json::Type::kNull) {
298 data["trace"] = std::move(trace_json);
299 }
300 // Ask CallCountingHelper to populate call count data.
301 call_counter_.PopulateCallCounts(&data);
302 // Construct top-level object.
303 Json::Object object{
304 {"ref", Json::FromObject({
305 {"subchannelId", Json::FromString(absl::StrCat(uuid()))},
306 })},
307 {"data", Json::FromObject(std::move(data))},
308 };
309 // Populate the child socket.
310 RefCountedPtr<SocketNode> child_socket;
311 {
312 MutexLock lock(&socket_mu_);
313 child_socket = child_socket_;
314 }
315 if (child_socket != nullptr && child_socket->uuid() != 0) {
316 object["socketRef"] = Json::FromArray({
317 Json::FromObject({
318 {"socketId", Json::FromString(absl::StrCat(child_socket->uuid()))},
319 {"name", Json::FromString(child_socket->name())},
320 }),
321 });
322 }
323 return Json::FromObject(object);
324 }
325
326 //
327 // ServerNode
328 //
329
ServerNode(size_t channel_tracer_max_nodes)330 ServerNode::ServerNode(size_t channel_tracer_max_nodes)
331 : BaseNode(EntityType::kServer, ""), trace_(channel_tracer_max_nodes) {}
332
~ServerNode()333 ServerNode::~ServerNode() {}
334
AddChildSocket(RefCountedPtr<SocketNode> node)335 void ServerNode::AddChildSocket(RefCountedPtr<SocketNode> node) {
336 MutexLock lock(&child_mu_);
337 child_sockets_.insert(std::make_pair(node->uuid(), std::move(node)));
338 }
339
RemoveChildSocket(intptr_t child_uuid)340 void ServerNode::RemoveChildSocket(intptr_t child_uuid) {
341 MutexLock lock(&child_mu_);
342 child_sockets_.erase(child_uuid);
343 }
344
AddChildListenSocket(RefCountedPtr<ListenSocketNode> node)345 void ServerNode::AddChildListenSocket(RefCountedPtr<ListenSocketNode> node) {
346 MutexLock lock(&child_mu_);
347 child_listen_sockets_.insert(std::make_pair(node->uuid(), std::move(node)));
348 }
349
RemoveChildListenSocket(intptr_t child_uuid)350 void ServerNode::RemoveChildListenSocket(intptr_t child_uuid) {
351 MutexLock lock(&child_mu_);
352 child_listen_sockets_.erase(child_uuid);
353 }
354
RenderServerSockets(intptr_t start_socket_id,intptr_t max_results)355 std::string ServerNode::RenderServerSockets(intptr_t start_socket_id,
356 intptr_t max_results) {
357 CHECK_GE(start_socket_id, 0);
358 CHECK_GE(max_results, 0);
359 // If user does not set max_results, we choose 500.
360 size_t pagination_limit = max_results == 0 ? 500 : max_results;
361 Json::Object object;
362 {
363 MutexLock lock(&child_mu_);
364 size_t sockets_rendered = 0;
365 // Create list of socket refs.
366 Json::Array array;
367 auto it = child_sockets_.lower_bound(start_socket_id);
368 for (; it != child_sockets_.end() && sockets_rendered < pagination_limit;
369 ++it, ++sockets_rendered) {
370 array.emplace_back(Json::FromObject({
371 {"socketId", Json::FromString(absl::StrCat(it->first))},
372 {"name", Json::FromString(it->second->name())},
373 }));
374 }
375 object["socketRef"] = Json::FromArray(std::move(array));
376 if (it == child_sockets_.end()) {
377 object["end"] = Json::FromBool(true);
378 }
379 }
380 return JsonDump(Json::FromObject(std::move(object)));
381 }
382
RenderJson()383 Json ServerNode::RenderJson() {
384 Json::Object data;
385 // Fill in the channel trace if applicable.
386 Json trace_json = trace_.RenderJson();
387 if (trace_json.type() != Json::Type::kNull) {
388 data["trace"] = std::move(trace_json);
389 }
390 // Ask CallCountingHelper to populate call count data.
391 call_counter_.PopulateCallCounts(&data);
392 // Construct top-level object.
393 Json::Object object = {
394 {"ref", Json::FromObject({
395 {"serverId", Json::FromString(absl::StrCat(uuid()))},
396 })},
397 {"data", Json::FromObject(std::move(data))},
398 };
399 // Render listen sockets.
400 {
401 MutexLock lock(&child_mu_);
402 if (!child_listen_sockets_.empty()) {
403 Json::Array array;
404 for (const auto& it : child_listen_sockets_) {
405 array.emplace_back(Json::FromObject({
406 {"socketId", Json::FromString(absl::StrCat(it.first))},
407 {"name", Json::FromString(it.second->name())},
408 }));
409 }
410 object["listenSocket"] = Json::FromArray(std::move(array));
411 }
412 }
413 return Json::FromObject(std::move(object));
414 }
415
416 //
417 // SocketNode::Security::Tls
418 //
419
RenderJson()420 Json SocketNode::Security::Tls::RenderJson() {
421 Json::Object data;
422 if (type == NameType::kStandardName) {
423 data["standard_name"] = Json::FromString(name);
424 } else if (type == NameType::kOtherName) {
425 data["other_name"] = Json::FromString(name);
426 }
427 if (!local_certificate.empty()) {
428 data["local_certificate"] =
429 Json::FromString(absl::Base64Escape(local_certificate));
430 }
431 if (!remote_certificate.empty()) {
432 data["remote_certificate"] =
433 Json::FromString(absl::Base64Escape(remote_certificate));
434 }
435 return Json::FromObject(std::move(data));
436 }
437
438 //
439 // SocketNode::Security
440 //
441
RenderJson()442 Json SocketNode::Security::RenderJson() {
443 Json::Object data;
444 switch (type) {
445 case ModelType::kUnset:
446 break;
447 case ModelType::kTls:
448 if (tls) {
449 data["tls"] = tls->RenderJson();
450 }
451 break;
452 case ModelType::kOther:
453 if (other.has_value()) {
454 data["other"] = *other;
455 }
456 break;
457 }
458 return Json::FromObject(std::move(data));
459 }
460
461 namespace {
462
SecurityArgCopy(void * p)463 void* SecurityArgCopy(void* p) {
464 SocketNode::Security* xds_certificate_provider =
465 static_cast<SocketNode::Security*>(p);
466 return xds_certificate_provider->Ref().release();
467 }
468
SecurityArgDestroy(void * p)469 void SecurityArgDestroy(void* p) {
470 SocketNode::Security* xds_certificate_provider =
471 static_cast<SocketNode::Security*>(p);
472 xds_certificate_provider->Unref();
473 }
474
SecurityArgCmp(void * p,void * q)475 int SecurityArgCmp(void* p, void* q) { return QsortCompare(p, q); }
476
477 const grpc_arg_pointer_vtable kChannelArgVtable = {
478 SecurityArgCopy, SecurityArgDestroy, SecurityArgCmp};
479
480 } // namespace
481
MakeChannelArg() const482 grpc_arg SocketNode::Security::MakeChannelArg() const {
483 return grpc_channel_arg_pointer_create(
484 const_cast<char*>(GRPC_ARG_CHANNELZ_SECURITY),
485 const_cast<SocketNode::Security*>(this), &kChannelArgVtable);
486 }
487
GetFromChannelArgs(const grpc_channel_args * args)488 RefCountedPtr<SocketNode::Security> SocketNode::Security::GetFromChannelArgs(
489 const grpc_channel_args* args) {
490 Security* security = grpc_channel_args_find_pointer<Security>(
491 args, GRPC_ARG_CHANNELZ_SECURITY);
492 return security != nullptr ? security->Ref() : nullptr;
493 }
494
495 //
496 // SocketNode
497 //
498
499 namespace {
500
PopulateSocketAddressJson(Json::Object * json,const char * name,const char * addr_str)501 void PopulateSocketAddressJson(Json::Object* json, const char* name,
502 const char* addr_str) {
503 if (addr_str == nullptr) return;
504 absl::StatusOr<URI> uri = URI::Parse(addr_str);
505 if (uri.ok()) {
506 if (uri->scheme() == "ipv4" || uri->scheme() == "ipv6") {
507 auto address = StringToSockaddr(absl::StripPrefix(uri->path(), "/"));
508 if (address.ok()) {
509 std::string packed_host = grpc_sockaddr_get_packed_host(&*address);
510 (*json)[name] = Json::FromObject({
511 {"tcpip_address",
512 Json::FromObject({
513 {"port", Json::FromString(
514 absl::StrCat(grpc_sockaddr_get_port(&*address)))},
515 {"ip_address",
516 Json::FromString(absl::Base64Escape(packed_host))},
517 })},
518 });
519 return;
520 }
521 } else if (uri->scheme() == "unix") {
522 (*json)[name] = Json::FromObject({
523 {"uds_address", Json::FromObject({
524 {"filename", Json::FromString(uri->path())},
525 })},
526 });
527 return;
528 }
529 }
530 // Unknown address type.
531 (*json)[name] = Json::FromObject({
532 {"other_address", Json::FromObject({
533 {"name", Json::FromString(addr_str)},
534 })},
535 });
536 }
537
538 } // namespace
539
SocketNode(std::string local,std::string remote,std::string name,RefCountedPtr<Security> security)540 SocketNode::SocketNode(std::string local, std::string remote, std::string name,
541 RefCountedPtr<Security> security)
542 : BaseNode(EntityType::kSocket, std::move(name)),
543 local_(std::move(local)),
544 remote_(std::move(remote)),
545 security_(std::move(security)) {}
546
RecordStreamStartedFromLocal()547 void SocketNode::RecordStreamStartedFromLocal() {
548 streams_started_.fetch_add(1, std::memory_order_relaxed);
549 last_local_stream_created_cycle_.store(gpr_get_cycle_counter(),
550 std::memory_order_relaxed);
551 }
552
RecordStreamStartedFromRemote()553 void SocketNode::RecordStreamStartedFromRemote() {
554 streams_started_.fetch_add(1, std::memory_order_relaxed);
555 last_remote_stream_created_cycle_.store(gpr_get_cycle_counter(),
556 std::memory_order_relaxed);
557 }
558
RecordMessagesSent(uint32_t num_sent)559 void SocketNode::RecordMessagesSent(uint32_t num_sent) {
560 messages_sent_.fetch_add(num_sent, std::memory_order_relaxed);
561 last_message_sent_cycle_.store(gpr_get_cycle_counter(),
562 std::memory_order_relaxed);
563 }
564
RecordMessageReceived()565 void SocketNode::RecordMessageReceived() {
566 messages_received_.fetch_add(1, std::memory_order_relaxed);
567 last_message_received_cycle_.store(gpr_get_cycle_counter(),
568 std::memory_order_relaxed);
569 }
570
RenderJson()571 Json SocketNode::RenderJson() {
572 // Create and fill the data child.
573 Json::Object data;
574 gpr_timespec ts;
575 int64_t streams_started = streams_started_.load(std::memory_order_relaxed);
576 if (streams_started != 0) {
577 data["streamsStarted"] = Json::FromString(absl::StrCat(streams_started));
578 gpr_cycle_counter last_local_stream_created_cycle =
579 last_local_stream_created_cycle_.load(std::memory_order_relaxed);
580 if (last_local_stream_created_cycle != 0) {
581 ts = gpr_convert_clock_type(
582 gpr_cycle_counter_to_time(last_local_stream_created_cycle),
583 GPR_CLOCK_REALTIME);
584 data["lastLocalStreamCreatedTimestamp"] =
585 Json::FromString(gpr_format_timespec(ts));
586 }
587 gpr_cycle_counter last_remote_stream_created_cycle =
588 last_remote_stream_created_cycle_.load(std::memory_order_relaxed);
589 if (last_remote_stream_created_cycle != 0) {
590 ts = gpr_convert_clock_type(
591 gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
592 GPR_CLOCK_REALTIME);
593 data["lastRemoteStreamCreatedTimestamp"] =
594 Json::FromString(gpr_format_timespec(ts));
595 }
596 }
597 int64_t streams_succeeded =
598 streams_succeeded_.load(std::memory_order_relaxed);
599 if (streams_succeeded != 0) {
600 data["streamsSucceeded"] =
601 Json::FromString(absl::StrCat(streams_succeeded));
602 }
603 int64_t streams_failed = streams_failed_.load(std::memory_order_relaxed);
604 if (streams_failed != 0) {
605 data["streamsFailed"] = Json::FromString(absl::StrCat(streams_failed));
606 }
607 int64_t messages_sent = messages_sent_.load(std::memory_order_relaxed);
608 if (messages_sent != 0) {
609 data["messagesSent"] = Json::FromString(absl::StrCat(messages_sent));
610 ts = gpr_convert_clock_type(
611 gpr_cycle_counter_to_time(
612 last_message_sent_cycle_.load(std::memory_order_relaxed)),
613 GPR_CLOCK_REALTIME);
614 data["lastMessageSentTimestamp"] =
615 Json::FromString(gpr_format_timespec(ts));
616 }
617 int64_t messages_received =
618 messages_received_.load(std::memory_order_relaxed);
619 if (messages_received != 0) {
620 data["messagesReceived"] =
621 Json::FromString(absl::StrCat(messages_received));
622 ts = gpr_convert_clock_type(
623 gpr_cycle_counter_to_time(
624 last_message_received_cycle_.load(std::memory_order_relaxed)),
625 GPR_CLOCK_REALTIME);
626 data["lastMessageReceivedTimestamp"] =
627 Json::FromString(gpr_format_timespec(ts));
628 }
629 int64_t keepalives_sent = keepalives_sent_.load(std::memory_order_relaxed);
630 if (keepalives_sent != 0) {
631 data["keepAlivesSent"] = Json::FromString(absl::StrCat(keepalives_sent));
632 }
633 // Create and fill the parent object.
634 Json::Object object = {
635 {"ref", Json::FromObject({
636 {"socketId", Json::FromString(absl::StrCat(uuid()))},
637 {"name", Json::FromString(name())},
638 })},
639 {"data", Json::FromObject(std::move(data))},
640 };
641 if (security_ != nullptr &&
642 security_->type != SocketNode::Security::ModelType::kUnset) {
643 object["security"] = security_->RenderJson();
644 }
645 PopulateSocketAddressJson(&object, "remote", remote_.c_str());
646 PopulateSocketAddressJson(&object, "local", local_.c_str());
647 return Json::FromObject(std::move(object));
648 }
649
650 //
651 // ListenSocketNode
652 //
653
ListenSocketNode(std::string local_addr,std::string name)654 ListenSocketNode::ListenSocketNode(std::string local_addr, std::string name)
655 : BaseNode(EntityType::kSocket, std::move(name)),
656 local_addr_(std::move(local_addr)) {}
657
RenderJson()658 Json ListenSocketNode::RenderJson() {
659 Json::Object object = {
660 {"ref", Json::FromObject({
661 {"socketId", Json::FromString(absl::StrCat(uuid()))},
662 {"name", Json::FromString(name())},
663 })},
664 };
665 PopulateSocketAddressJson(&object, "local", local_addr_.c_str());
666 return Json::FromObject(std::move(object));
667 }
668
669 } // namespace channelz
670 } // namespace grpc_core
671