1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/lib/surface/channel.h"
18
19 #include <grpc/compression.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/port_platform.h>
24
25 #include "absl/log/check.h"
26 #include "src/core/channelz/channel_trace.h"
27 #include "src/core/channelz/channelz.h"
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/compression/compression_internal.h"
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/telemetry/stats.h"
33 #include "src/core/telemetry/stats_data.h"
34
35 namespace grpc_core {
36
37 //
38 // Channel::RegisteredCall
39 //
40
RegisteredCall(const char * method_arg,const char * host_arg)41 Channel::RegisteredCall::RegisteredCall(const char* method_arg,
42 const char* host_arg) {
43 path = Slice::FromCopiedString(method_arg);
44 if (host_arg != nullptr && host_arg[0] != 0) {
45 authority = Slice::FromCopiedString(host_arg);
46 }
47 }
48
RegisteredCall(const RegisteredCall & other)49 Channel::RegisteredCall::RegisteredCall(const RegisteredCall& other)
50 : path(other.path.Ref()) {
51 if (other.authority.has_value()) {
52 authority = other.authority->Ref();
53 }
54 }
55
~RegisteredCall()56 Channel::RegisteredCall::~RegisteredCall() {}
57
58 //
59 // Channel
60 //
61
Channel(std::string target,const ChannelArgs & channel_args)62 Channel::Channel(std::string target, const ChannelArgs& channel_args)
63 : target_(std::move(target)),
64 channelz_node_(channel_args.GetObjectRef<channelz::ChannelNode>()),
65 compression_options_(CompressionOptionsFromChannelArgs(channel_args)),
66 call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
67 channel_args.GetObject<ResourceQuota>()
68 ->memory_quota()
69 ->CreateMemoryOwner(),
70 1024)) {}
71
RegisterCall(const char * method,const char * host)72 Channel::RegisteredCall* Channel::RegisterCall(const char* method,
73 const char* host) {
74 MutexLock lock(&mu_);
75 auto key = std::make_pair(std::string(host != nullptr ? host : ""),
76 std::string(method != nullptr ? method : ""));
77 auto rc_posn = registration_table_.find(key);
78 if (rc_posn != registration_table_.end()) {
79 return &rc_posn->second;
80 }
81 auto insertion_result = registration_table_.insert(
82 {std::move(key), RegisteredCall(method, host)});
83 return &insertion_result.first->second;
84 }
85
86 } // namespace grpc_core
87
88 //
89 // C-core API
90 //
91
grpc_channel_destroy(grpc_channel * channel)92 void grpc_channel_destroy(grpc_channel* channel) {
93 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
94 grpc_core::ExecCtx exec_ctx;
95 GRPC_TRACE_LOG(api, INFO)
96 << "grpc_channel_destroy(channel=" << channel << ")";
97 grpc_channel_destroy_internal(channel);
98 }
99
grpc_channel_create_call(grpc_channel * channel,grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * completion_queue,grpc_slice method,const grpc_slice * host,gpr_timespec deadline,void * reserved)100 grpc_call* grpc_channel_create_call(grpc_channel* channel,
101 grpc_call* parent_call,
102 uint32_t propagation_mask,
103 grpc_completion_queue* completion_queue,
104 grpc_slice method, const grpc_slice* host,
105 gpr_timespec deadline, void* reserved) {
106 CHECK(!reserved);
107 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
108 grpc_core::ExecCtx exec_ctx;
109 return grpc_core::Channel::FromC(channel)->CreateCall(
110 parent_call, propagation_mask, completion_queue, nullptr,
111 grpc_core::Slice(grpc_core::CSliceRef(method)),
112 host != nullptr
113 ? absl::optional<grpc_core::Slice>(grpc_core::CSliceRef(*host))
114 : absl::nullopt,
115 grpc_core::Timestamp::FromTimespecRoundUp(deadline),
116 /*registered_method=*/false);
117 }
118
grpc_channel_register_call(grpc_channel * channel,const char * method,const char * host,void * reserved)119 void* grpc_channel_register_call(grpc_channel* channel, const char* method,
120 const char* host, void* reserved) {
121 GRPC_TRACE_LOG(api, INFO) << "grpc_channel_register_call(channel=" << channel
122 << ", method=" << method << ", host=" << host
123 << ", reserved=" << reserved << ")";
124 CHECK(!reserved);
125 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
126 grpc_core::ExecCtx exec_ctx;
127 return grpc_core::Channel::FromC(channel)->RegisterCall(method, host);
128 }
129
grpc_channel_create_registered_call(grpc_channel * channel,grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * completion_queue,void * registered_call_handle,gpr_timespec deadline,void * reserved)130 grpc_call* grpc_channel_create_registered_call(
131 grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
132 grpc_completion_queue* completion_queue, void* registered_call_handle,
133 gpr_timespec deadline, void* reserved) {
134 auto* rc =
135 static_cast<grpc_core::Channel::RegisteredCall*>(registered_call_handle);
136 GRPC_TRACE_LOG(api, INFO)
137 << "grpc_channel_create_registered_call(channel=" << channel
138 << ", parent_call=" << parent_call
139 << ", propagation_mask=" << (unsigned)propagation_mask
140 << ", completion_queue=" << completion_queue
141 << ", registered_call_handle=" << registered_call_handle
142 << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
143 << ", tv_nsec: " << deadline.tv_nsec
144 << ", clock_type: " << (int)deadline.clock_type
145 << " }, reserved=" << reserved << ")";
146 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
147 grpc_core::ExecCtx exec_ctx;
148 return grpc_core::Channel::FromC(channel)->CreateCall(
149 parent_call, propagation_mask, completion_queue, nullptr, rc->path.Ref(),
150 rc->authority.has_value()
151 ? absl::optional<grpc_core::Slice>(rc->authority->Ref())
152 : absl::nullopt,
153 grpc_core::Timestamp::FromTimespecRoundUp(deadline),
154 /*registered_method=*/true);
155 }
156
grpc_channel_get_target(grpc_channel * channel)157 char* grpc_channel_get_target(grpc_channel* channel) {
158 GRPC_TRACE_LOG(api, INFO)
159 << "grpc_channel_get_target(channel=" << channel << ")";
160 auto target = grpc_core::Channel::FromC(channel)->target();
161 char* buffer = static_cast<char*>(gpr_zalloc(target.size() + 1));
162 memcpy(buffer, target.data(), target.size());
163 return buffer;
164 }
165
grpc_channel_get_info(grpc_channel * channel,const grpc_channel_info * channel_info)166 void grpc_channel_get_info(grpc_channel* channel,
167 const grpc_channel_info* channel_info) {
168 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
169 grpc_core::ExecCtx exec_ctx;
170 grpc_core::Channel::FromC(channel)->GetInfo(channel_info);
171 }
172
grpc_channel_reset_connect_backoff(grpc_channel * channel)173 void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
174 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
175 grpc_core::ExecCtx exec_ctx;
176 GRPC_TRACE_LOG(api, INFO)
177 << "grpc_channel_reset_connect_backoff(channel=" << channel << ")";
178 grpc_core::Channel::FromC(channel)->ResetConnectionBackoff();
179 }
180
grpc_channel_support_connectivity_watcher(grpc_channel * channel)181 int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
182 return grpc_core::Channel::FromC(channel)->SupportsConnectivityWatcher();
183 }
184
grpc_channel_check_connectivity_state(grpc_channel * channel,int try_to_connect)185 grpc_connectivity_state grpc_channel_check_connectivity_state(
186 grpc_channel* channel, int try_to_connect) {
187 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
188 grpc_core::ExecCtx exec_ctx;
189 GRPC_TRACE_LOG(api, INFO)
190 << "grpc_channel_check_connectivity_state(channel=" << channel
191 << ", try_to_connect=" << try_to_connect << ")";
192 return grpc_core::Channel::FromC(channel)->CheckConnectivityState(
193 try_to_connect);
194 }
195
grpc_channel_watch_connectivity_state(grpc_channel * channel,grpc_connectivity_state last_observed_state,gpr_timespec deadline,grpc_completion_queue * cq,void * tag)196 void grpc_channel_watch_connectivity_state(
197 grpc_channel* channel, grpc_connectivity_state last_observed_state,
198 gpr_timespec deadline, grpc_completion_queue* cq, void* tag) {
199 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
200 grpc_core::ExecCtx exec_ctx;
201 GRPC_TRACE_LOG(api, INFO)
202 << "grpc_channel_watch_connectivity_state(channel=" << channel
203 << ", last_observed_state=" << (int)last_observed_state
204 << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
205 << ", tv_nsec: " << deadline.tv_nsec
206 << ", clock_type: " << (int)deadline.clock_type << " }, cq=" << cq
207 << ", tag=" << tag << ")";
208 return grpc_core::Channel::FromC(channel)->WatchConnectivityState(
209 last_observed_state, grpc_core::Timestamp::FromTimespecRoundUp(deadline),
210 cq, tag);
211 }
212
grpc_channel_ping(grpc_channel * channel,grpc_completion_queue * cq,void * tag,void * reserved)213 void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
214 void* tag, void* reserved) {
215 grpc_core::ExecCtx exec_ctx;
216 GRPC_TRACE_LOG(api, INFO)
217 << "grpc_channel_ping(channel=" << channel << ", cq=" << cq
218 << ", tag=" << tag << ", reserved=" << reserved << ")";
219 CHECK_EQ(reserved, nullptr);
220 grpc_core::Channel::FromC(channel)->Ping(cq, tag);
221 }
222