1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include "test/core/util/passthru_endpoint.h"
26
27 #include <inttypes.h>
28 #include <string.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
33
34 #include "src/core/lib/slice/slice_internal.h"
35
36 typedef struct passthru_endpoint passthru_endpoint;
37
38 typedef struct {
39 grpc_endpoint base;
40 passthru_endpoint* parent;
41 grpc_slice_buffer read_buffer;
42 grpc_slice_buffer* on_read_out;
43 grpc_closure* on_read;
44 grpc_resource_user* resource_user;
45 } half;
46
47 struct passthru_endpoint {
48 gpr_mu mu;
49 int halves;
50 grpc_passthru_endpoint_stats* stats;
51 bool shutdown;
52 half client;
53 half server;
54 };
55
me_read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb)56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57 grpc_closure* cb) {
58 half* m = reinterpret_cast<half*>(ep);
59 gpr_mu_lock(&m->parent->mu);
60 if (m->parent->shutdown) {
61 GRPC_CLOSURE_SCHED(
62 cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
63 } else if (m->read_buffer.count > 0) {
64 grpc_slice_buffer_swap(&m->read_buffer, slices);
65 GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
66 } else {
67 m->on_read = cb;
68 m->on_read_out = slices;
69 }
70 gpr_mu_unlock(&m->parent->mu);
71 }
72
other_half(half * h)73 static half* other_half(half* h) {
74 if (h == &h->parent->client) return &h->parent->server;
75 return &h->parent->client;
76 }
77
me_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)78 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
79 grpc_closure* cb, void* arg) {
80 half* m = other_half(reinterpret_cast<half*>(ep));
81 gpr_mu_lock(&m->parent->mu);
82 grpc_error* error = GRPC_ERROR_NONE;
83 gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
84 if (m->parent->shutdown) {
85 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
86 } else if (m->on_read != nullptr) {
87 for (size_t i = 0; i < slices->count; i++) {
88 grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
89 }
90 GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
91 m->on_read = nullptr;
92 } else {
93 for (size_t i = 0; i < slices->count; i++) {
94 grpc_slice_buffer_add(&m->read_buffer,
95 grpc_slice_copy(slices->slices[i]));
96 }
97 }
98 gpr_mu_unlock(&m->parent->mu);
99 GRPC_CLOSURE_SCHED(cb, error);
100 }
101
me_add_to_pollset(grpc_endpoint * ep,grpc_pollset * pollset)102 static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
103
me_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)104 static void me_add_to_pollset_set(grpc_endpoint* ep,
105 grpc_pollset_set* pollset) {}
106
me_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)107 static void me_delete_from_pollset_set(grpc_endpoint* ep,
108 grpc_pollset_set* pollset) {}
109
me_shutdown(grpc_endpoint * ep,grpc_error * why)110 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
111 half* m = reinterpret_cast<half*>(ep);
112 gpr_mu_lock(&m->parent->mu);
113 m->parent->shutdown = true;
114 if (m->on_read) {
115 GRPC_CLOSURE_SCHED(
116 m->on_read,
117 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
118 m->on_read = nullptr;
119 }
120 m = other_half(m);
121 if (m->on_read) {
122 GRPC_CLOSURE_SCHED(
123 m->on_read,
124 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
125 m->on_read = nullptr;
126 }
127 gpr_mu_unlock(&m->parent->mu);
128 grpc_resource_user_shutdown(m->resource_user);
129 GRPC_ERROR_UNREF(why);
130 }
131
me_destroy(grpc_endpoint * ep)132 static void me_destroy(grpc_endpoint* ep) {
133 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
134 gpr_mu_lock(&p->mu);
135 if (0 == --p->halves) {
136 gpr_mu_unlock(&p->mu);
137 gpr_mu_destroy(&p->mu);
138 grpc_passthru_endpoint_stats_destroy(p->stats);
139 grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
140 grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
141 grpc_resource_user_unref(p->client.resource_user);
142 grpc_resource_user_unref(p->server.resource_user);
143 gpr_free(p);
144 } else {
145 gpr_mu_unlock(&p->mu);
146 }
147 }
148
me_get_peer(grpc_endpoint * ep)149 static char* me_get_peer(grpc_endpoint* ep) {
150 passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
151 return (reinterpret_cast<half*>(ep)) == &p->client
152 ? gpr_strdup("fake:mock_client_endpoint")
153 : gpr_strdup("fake:mock_server_endpoint");
154 }
155
me_get_fd(grpc_endpoint * ep)156 static int me_get_fd(grpc_endpoint* ep) { return -1; }
157
me_get_resource_user(grpc_endpoint * ep)158 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
159 half* m = reinterpret_cast<half*>(ep);
160 return m->resource_user;
161 }
162
163 static const grpc_endpoint_vtable vtable = {
164 me_read,
165 me_write,
166 me_add_to_pollset,
167 me_add_to_pollset_set,
168 me_delete_from_pollset_set,
169 me_shutdown,
170 me_destroy,
171 me_get_resource_user,
172 me_get_peer,
173 me_get_fd,
174 };
175
half_init(half * m,passthru_endpoint * parent,grpc_resource_quota * resource_quota,const char * half_name)176 static void half_init(half* m, passthru_endpoint* parent,
177 grpc_resource_quota* resource_quota,
178 const char* half_name) {
179 m->base.vtable = &vtable;
180 m->parent = parent;
181 grpc_slice_buffer_init(&m->read_buffer);
182 m->on_read = nullptr;
183 char* name;
184 gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
185 (intptr_t)parent);
186 m->resource_user = grpc_resource_user_create(resource_quota, name);
187 gpr_free(name);
188 }
189
grpc_passthru_endpoint_create(grpc_endpoint ** client,grpc_endpoint ** server,grpc_resource_quota * resource_quota,grpc_passthru_endpoint_stats * stats)190 void grpc_passthru_endpoint_create(grpc_endpoint** client,
191 grpc_endpoint** server,
192 grpc_resource_quota* resource_quota,
193 grpc_passthru_endpoint_stats* stats) {
194 passthru_endpoint* m =
195 static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
196 m->halves = 2;
197 m->shutdown = 0;
198 if (stats == nullptr) {
199 m->stats = grpc_passthru_endpoint_stats_create();
200 } else {
201 gpr_ref(&stats->refs);
202 m->stats = stats;
203 }
204 half_init(&m->client, m, resource_quota, "client");
205 half_init(&m->server, m, resource_quota, "server");
206 gpr_mu_init(&m->mu);
207 *client = &m->client.base;
208 *server = &m->server.base;
209 }
210
grpc_passthru_endpoint_stats_create()211 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
212 grpc_passthru_endpoint_stats* stats =
213 static_cast<grpc_passthru_endpoint_stats*>(
214 gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
215 memset(stats, 0, sizeof(*stats));
216 gpr_ref_init(&stats->refs, 1);
217 return stats;
218 }
219
grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats * stats)220 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
221 if (gpr_unref(&stats->refs)) {
222 gpr_free(stats);
223 }
224 }
225