1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <string>
31
32 #include "absl/strings/str_format.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/string_util.h>
36 #include "src/core/lib/iomgr/sockaddr.h"
37
38 #include "src/core/lib/slice/slice_internal.h"
39
40 typedef struct passthru_endpoint passthru_endpoint;
41
42 typedef struct {
43 grpc_endpoint base;
44 passthru_endpoint* parent;
45 grpc_slice_buffer read_buffer;
46 grpc_slice_buffer* on_read_out;
47 grpc_closure* on_read;
48 grpc_resource_user* resource_user;
49 } half;
50
51 struct passthru_endpoint {
52 gpr_mu mu;
53 int halves;
54 grpc_passthru_endpoint_stats* stats;
55 bool shutdown;
56 half client;
57 half server;
58 };
59
me_read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool)60 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
61 grpc_closure* cb, bool /*urgent*/) {
62 half* m = reinterpret_cast<half*>(ep);
63 gpr_mu_lock(&m->parent->mu);
64 if (m->parent->shutdown) {
65 grpc_core::ExecCtx::Run(
66 DEBUG_LOCATION, cb,
67 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
68 } else if (m->read_buffer.count > 0) {
69 grpc_slice_buffer_swap(&m->read_buffer, slices);
70 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
71 } else {
72 m->on_read = cb;
73 m->on_read_out = slices;
74 }
75 gpr_mu_unlock(&m->parent->mu);
76 }
77
other_half(half * h)78 static half* other_half(half* h) {
79 if (h == &h->parent->client) return &h->parent->server;
80 return &h->parent->client;
81 }
82
me_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *)83 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
84 grpc_closure* cb, void* /*arg*/) {
85 half* m = other_half(reinterpret_cast<half*>(ep));
86 gpr_mu_lock(&m->parent->mu);
87 grpc_error* error = GRPC_ERROR_NONE;
88 gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
89 if (m->parent->shutdown) {
90 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
91 } else if (m->on_read != nullptr) {
92 for (size_t i = 0; i < slices->count; i++) {
93 grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
94 }
95 grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
96 m->on_read = nullptr;
97 } else {
98 for (size_t i = 0; i < slices->count; i++) {
99 grpc_slice_buffer_add(&m->read_buffer,
100 grpc_slice_copy(slices->slices[i]));
101 }
102 }
103 gpr_mu_unlock(&m->parent->mu);
104 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
105 }
106
me_add_to_pollset(grpc_endpoint *,grpc_pollset *)107 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
108 grpc_pollset* /*pollset*/) {}
109
me_add_to_pollset_set(grpc_endpoint *,grpc_pollset_set *)110 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
111 grpc_pollset_set* /*pollset*/) {}
112
me_delete_from_pollset_set(grpc_endpoint *,grpc_pollset_set *)113 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
114 grpc_pollset_set* /*pollset*/) {}
115
me_shutdown(grpc_endpoint * ep,grpc_error * why)116 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
117 half* m = reinterpret_cast<half*>(ep);
118 gpr_mu_lock(&m->parent->mu);
119 m->parent->shutdown = true;
120 if (m->on_read) {
121 grpc_core::ExecCtx::Run(
122 DEBUG_LOCATION, m->on_read,
123 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
124 m->on_read = nullptr;
125 }
126 m = other_half(m);
127 if (m->on_read) {
128 grpc_core::ExecCtx::Run(
129 DEBUG_LOCATION, m->on_read,
130 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
131 m->on_read = nullptr;
132 }
133 gpr_mu_unlock(&m->parent->mu);
134 grpc_resource_user_shutdown(m->resource_user);
135 GRPC_ERROR_UNREF(why);
136 }
137
me_destroy(grpc_endpoint * ep)138 static void me_destroy(grpc_endpoint* ep) {
139 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
140 gpr_mu_lock(&p->mu);
141 if (0 == --p->halves) {
142 gpr_mu_unlock(&p->mu);
143 gpr_mu_destroy(&p->mu);
144 grpc_passthru_endpoint_stats_destroy(p->stats);
145 grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
146 grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
147 grpc_resource_user_unref(p->client.resource_user);
148 grpc_resource_user_unref(p->server.resource_user);
149 gpr_free(p);
150 } else {
151 gpr_mu_unlock(&p->mu);
152 }
153 }
154
me_get_peer(grpc_endpoint * ep)155 static char* me_get_peer(grpc_endpoint* ep) {
156 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
157 return (reinterpret_cast<half*>(ep)) == &p->client
158 ? gpr_strdup("fake:mock_client_endpoint")
159 : gpr_strdup("fake:mock_server_endpoint");
160 }
161
me_get_fd(grpc_endpoint *)162 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
163
me_can_track_err(grpc_endpoint *)164 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
165
me_get_resource_user(grpc_endpoint * ep)166 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
167 half* m = reinterpret_cast<half*>(ep);
168 return m->resource_user;
169 }
170
171 static const grpc_endpoint_vtable vtable = {
172 me_read,
173 me_write,
174 me_add_to_pollset,
175 me_add_to_pollset_set,
176 me_delete_from_pollset_set,
177 me_shutdown,
178 me_destroy,
179 me_get_resource_user,
180 me_get_peer,
181 me_get_fd,
182 me_can_track_err,
183 };
184
half_init(half * m,passthru_endpoint * parent,grpc_resource_quota * resource_quota,const char * half_name)185 static void half_init(half* m, passthru_endpoint* parent,
186 grpc_resource_quota* resource_quota,
187 const char* half_name) {
188 m->base.vtable = &vtable;
189 m->parent = parent;
190 grpc_slice_buffer_init(&m->read_buffer);
191 m->on_read = nullptr;
192 std::string name = absl::StrFormat("passthru_endpoint_%s_%" PRIxPTR,
193 half_name, (intptr_t)parent);
194 m->resource_user = grpc_resource_user_create(resource_quota, name.c_str());
195 }
196
grpc_passthru_endpoint_create(grpc_endpoint ** client,grpc_endpoint ** server,grpc_resource_quota * resource_quota,grpc_passthru_endpoint_stats * stats)197 void grpc_passthru_endpoint_create(grpc_endpoint** client,
198 grpc_endpoint** server,
199 grpc_resource_quota* resource_quota,
200 grpc_passthru_endpoint_stats* stats) {
201 passthru_endpoint* m =
202 static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
203 m->halves = 2;
204 m->shutdown = 0;
205 if (stats == nullptr) {
206 m->stats = grpc_passthru_endpoint_stats_create();
207 } else {
208 gpr_ref(&stats->refs);
209 m->stats = stats;
210 }
211 half_init(&m->client, m, resource_quota, "client");
212 half_init(&m->server, m, resource_quota, "server");
213 gpr_mu_init(&m->mu);
214 *client = &m->client.base;
215 *server = &m->server.base;
216 }
217
grpc_passthru_endpoint_stats_create()218 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
219 grpc_passthru_endpoint_stats* stats =
220 static_cast<grpc_passthru_endpoint_stats*>(
221 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
222 memset(stats, 0, sizeof(*stats));
223 gpr_ref_init(&stats->refs, 1);
224 return stats;
225 }
226
grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats * stats)227 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
228 if (gpr_unref(&stats->refs)) {
229 gpr_free(stats);
230 }
231 }
232