• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 
25 #include "test/core/util/passthru_endpoint.h"
26 
27 #include <inttypes.h>
28 #include <string.h>
29 
30 #include <string>
31 
32 #include "absl/strings/str_format.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/string_util.h>
36 #include "src/core/lib/iomgr/sockaddr.h"
37 
38 #include "src/core/lib/slice/slice_internal.h"
39 
40 typedef struct passthru_endpoint passthru_endpoint;
41 
42 typedef struct {
43   grpc_endpoint base;
44   passthru_endpoint* parent;
45   grpc_slice_buffer read_buffer;
46   grpc_slice_buffer* on_read_out;
47   grpc_closure* on_read;
48   grpc_resource_user* resource_user;
49 } half;
50 
51 struct passthru_endpoint {
52   gpr_mu mu;
53   int halves;
54   grpc_passthru_endpoint_stats* stats;
55   bool shutdown;
56   half client;
57   half server;
58 };
59 
me_read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool)60 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
61                     grpc_closure* cb, bool /*urgent*/) {
62   half* m = reinterpret_cast<half*>(ep);
63   gpr_mu_lock(&m->parent->mu);
64   if (m->parent->shutdown) {
65     grpc_core::ExecCtx::Run(
66         DEBUG_LOCATION, cb,
67         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
68   } else if (m->read_buffer.count > 0) {
69     grpc_slice_buffer_swap(&m->read_buffer, slices);
70     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
71   } else {
72     m->on_read = cb;
73     m->on_read_out = slices;
74   }
75   gpr_mu_unlock(&m->parent->mu);
76 }
77 
other_half(half * h)78 static half* other_half(half* h) {
79   if (h == &h->parent->client) return &h->parent->server;
80   return &h->parent->client;
81 }
82 
me_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *)83 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
84                      grpc_closure* cb, void* /*arg*/) {
85   half* m = other_half(reinterpret_cast<half*>(ep));
86   gpr_mu_lock(&m->parent->mu);
87   grpc_error* error = GRPC_ERROR_NONE;
88   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
89   if (m->parent->shutdown) {
90     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
91   } else if (m->on_read != nullptr) {
92     for (size_t i = 0; i < slices->count; i++) {
93       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
94     }
95     grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
96     m->on_read = nullptr;
97   } else {
98     for (size_t i = 0; i < slices->count; i++) {
99       grpc_slice_buffer_add(&m->read_buffer,
100                             grpc_slice_copy(slices->slices[i]));
101     }
102   }
103   gpr_mu_unlock(&m->parent->mu);
104   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
105 }
106 
me_add_to_pollset(grpc_endpoint *,grpc_pollset *)107 static void me_add_to_pollset(grpc_endpoint* /*ep*/,
108                               grpc_pollset* /*pollset*/) {}
109 
me_add_to_pollset_set(grpc_endpoint *,grpc_pollset_set *)110 static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
111                                   grpc_pollset_set* /*pollset*/) {}
112 
me_delete_from_pollset_set(grpc_endpoint *,grpc_pollset_set *)113 static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
114                                        grpc_pollset_set* /*pollset*/) {}
115 
me_shutdown(grpc_endpoint * ep,grpc_error * why)116 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
117   half* m = reinterpret_cast<half*>(ep);
118   gpr_mu_lock(&m->parent->mu);
119   m->parent->shutdown = true;
120   if (m->on_read) {
121     grpc_core::ExecCtx::Run(
122         DEBUG_LOCATION, m->on_read,
123         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
124     m->on_read = nullptr;
125   }
126   m = other_half(m);
127   if (m->on_read) {
128     grpc_core::ExecCtx::Run(
129         DEBUG_LOCATION, m->on_read,
130         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
131     m->on_read = nullptr;
132   }
133   gpr_mu_unlock(&m->parent->mu);
134   grpc_resource_user_shutdown(m->resource_user);
135   GRPC_ERROR_UNREF(why);
136 }
137 
me_destroy(grpc_endpoint * ep)138 static void me_destroy(grpc_endpoint* ep) {
139   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
140   gpr_mu_lock(&p->mu);
141   if (0 == --p->halves) {
142     gpr_mu_unlock(&p->mu);
143     gpr_mu_destroy(&p->mu);
144     grpc_passthru_endpoint_stats_destroy(p->stats);
145     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
146     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
147     grpc_resource_user_unref(p->client.resource_user);
148     grpc_resource_user_unref(p->server.resource_user);
149     gpr_free(p);
150   } else {
151     gpr_mu_unlock(&p->mu);
152   }
153 }
154 
me_get_peer(grpc_endpoint * ep)155 static char* me_get_peer(grpc_endpoint* ep) {
156   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
157   return (reinterpret_cast<half*>(ep)) == &p->client
158              ? gpr_strdup("fake:mock_client_endpoint")
159              : gpr_strdup("fake:mock_server_endpoint");
160 }
161 
me_get_fd(grpc_endpoint *)162 static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
163 
me_can_track_err(grpc_endpoint *)164 static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
165 
me_get_resource_user(grpc_endpoint * ep)166 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
167   half* m = reinterpret_cast<half*>(ep);
168   return m->resource_user;
169 }
170 
171 static const grpc_endpoint_vtable vtable = {
172     me_read,
173     me_write,
174     me_add_to_pollset,
175     me_add_to_pollset_set,
176     me_delete_from_pollset_set,
177     me_shutdown,
178     me_destroy,
179     me_get_resource_user,
180     me_get_peer,
181     me_get_fd,
182     me_can_track_err,
183 };
184 
half_init(half * m,passthru_endpoint * parent,grpc_resource_quota * resource_quota,const char * half_name)185 static void half_init(half* m, passthru_endpoint* parent,
186                       grpc_resource_quota* resource_quota,
187                       const char* half_name) {
188   m->base.vtable = &vtable;
189   m->parent = parent;
190   grpc_slice_buffer_init(&m->read_buffer);
191   m->on_read = nullptr;
192   std::string name = absl::StrFormat("passthru_endpoint_%s_%" PRIxPTR,
193                                      half_name, (intptr_t)parent);
194   m->resource_user = grpc_resource_user_create(resource_quota, name.c_str());
195 }
196 
grpc_passthru_endpoint_create(grpc_endpoint ** client,grpc_endpoint ** server,grpc_resource_quota * resource_quota,grpc_passthru_endpoint_stats * stats)197 void grpc_passthru_endpoint_create(grpc_endpoint** client,
198                                    grpc_endpoint** server,
199                                    grpc_resource_quota* resource_quota,
200                                    grpc_passthru_endpoint_stats* stats) {
201   passthru_endpoint* m =
202       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
203   m->halves = 2;
204   m->shutdown = 0;
205   if (stats == nullptr) {
206     m->stats = grpc_passthru_endpoint_stats_create();
207   } else {
208     gpr_ref(&stats->refs);
209     m->stats = stats;
210   }
211   half_init(&m->client, m, resource_quota, "client");
212   half_init(&m->server, m, resource_quota, "server");
213   gpr_mu_init(&m->mu);
214   *client = &m->client.base;
215   *server = &m->server.base;
216 }
217 
grpc_passthru_endpoint_stats_create()218 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
219   grpc_passthru_endpoint_stats* stats =
220       static_cast<grpc_passthru_endpoint_stats*>(
221           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
222   memset(stats, 0, sizeof(*stats));
223   gpr_ref_init(&stats->refs, 1);
224   return stats;
225 }
226 
grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats * stats)227 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
228   if (gpr_unref(&stats->refs)) {
229     gpr_free(stats);
230   }
231 }
232