• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_UV)
22 
23 #include "absl/strings/str_format.h"
24 
25 #include <ares.h>
26 #include <uv.h>
27 
28 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
29 
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 #include <grpc/support/time.h>
34 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 
38 namespace grpc_core {
39 
40 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events);
41 
ares_uv_poll_close_cb(uv_handle_t * handle)42 void ares_uv_poll_close_cb(uv_handle_t* handle) { delete handle; }
43 
44 class GrpcPolledFdLibuv : public GrpcPolledFd {
45  public:
GrpcPolledFdLibuv(ares_socket_t as,std::shared_ptr<WorkSerializer> work_serializer)46   GrpcPolledFdLibuv(ares_socket_t as,
47                     std::shared_ptr<WorkSerializer> work_serializer)
48       : name_(absl::StrFormat("c-ares socket: %" PRIdPTR, (intptr_t)as)),
49         as_(as),
50         work_serializer_(std::move(work_serializer)) {
51     handle_ = new uv_poll_t();
52     uv_poll_init_socket(uv_default_loop(), handle_, as);
53     handle_->data = this;
54   }
55 
RegisterForOnReadableLocked(grpc_closure * read_closure)56   void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
57     GPR_ASSERT(read_closure_ == nullptr);
58     GPR_ASSERT((poll_events_ & UV_READABLE) == 0);
59     read_closure_ = read_closure;
60     poll_events_ |= UV_READABLE;
61     uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
62   }
63 
RegisterForOnWriteableLocked(grpc_closure * write_closure)64   void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
65     GPR_ASSERT(write_closure_ == nullptr);
66     GPR_ASSERT((poll_events_ & UV_WRITABLE) == 0);
67     write_closure_ = write_closure;
68     poll_events_ |= UV_WRITABLE;
69     uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
70   }
71 
IsFdStillReadableLocked()72   bool IsFdStillReadableLocked() override {
73     /* uv_poll_t is based on poll, which is level triggered. So, if cares
74      * leaves some data unread, the event will trigger again. */
75     return false;
76   }
77 
ShutdownInternalLocked(grpc_error * error)78   void ShutdownInternalLocked(grpc_error* error) {
79     uv_poll_stop(handle_);
80     uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
81     if (read_closure_ != nullptr) {
82       grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_,
83                               GRPC_ERROR_CANCELLED);
84     }
85     if (write_closure_ != nullptr) {
86       grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_,
87                               GRPC_ERROR_CANCELLED);
88     }
89   }
90 
ShutdownLocked(grpc_error * error)91   void ShutdownLocked(grpc_error* error) override {
92     if (grpc_core::ExecCtx::Get() == nullptr) {
93       grpc_core::ExecCtx exec_ctx;
94       ShutdownInternalLocked(error);
95     } else {
96       ShutdownInternalLocked(error);
97     }
98   }
99 
GetWrappedAresSocketLocked()100   ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
101 
GetName()102   const char* GetName() override { return name_.c_str(); }
103 
104   // TODO(apolcyn): Data members should be private.
105   std::string name_;
106   ares_socket_t as_;
107   uv_poll_t* handle_;
108   grpc_closure* read_closure_ = nullptr;
109   grpc_closure* write_closure_ = nullptr;
110   int poll_events_ = 0;
111   std::shared_ptr<WorkSerializer> work_serializer_;
112 };
113 
114 struct AresUvPollCbArg {
AresUvPollCbArggrpc_core::AresUvPollCbArg115   AresUvPollCbArg(uv_poll_t* handle, int status, int events)
116       : handle(handle), status(status), events(events) {}
117 
118   uv_poll_t* handle;
119   int status;
120   int events;
121 };
122 
ares_uv_poll_cb_locked(AresUvPollCbArg * arg)123 static void ares_uv_poll_cb_locked(AresUvPollCbArg* arg) {
124   std::unique_ptr<AresUvPollCbArg> arg_struct(arg);
125   uv_poll_t* handle = arg_struct->handle;
126   int status = arg_struct->status;
127   int events = arg_struct->events;
128   GrpcPolledFdLibuv* polled_fd =
129       reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
130   grpc_error* error = GRPC_ERROR_NONE;
131   if (status < 0) {
132     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("cares polling error");
133     error =
134         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
135                            grpc_slice_from_static_string(uv_strerror(status)));
136   }
137   if (events & UV_READABLE) {
138     GPR_ASSERT(polled_fd->read_closure_ != nullptr);
139     grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->read_closure_, error);
140     polled_fd->read_closure_ = nullptr;
141     polled_fd->poll_events_ &= ~UV_READABLE;
142   }
143   if (events & UV_WRITABLE) {
144     GPR_ASSERT(polled_fd->write_closure_ != nullptr);
145     grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->write_closure_, error);
146     polled_fd->write_closure_ = nullptr;
147     polled_fd->poll_events_ &= ~UV_WRITABLE;
148   }
149   uv_poll_start(handle, polled_fd->poll_events_, ares_uv_poll_cb);
150 }
151 
ares_uv_poll_cb(uv_poll_t * handle,int status,int events)152 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) {
153   grpc_core::ExecCtx exec_ctx;
154   GrpcPolledFdLibuv* polled_fd =
155       reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
156   AresUvPollCbArg* arg = new AresUvPollCbArg(handle, status, events);
157   polled_fd->work_serializer_->Run([arg]() { ares_uv_poll_cb_locked(arg); },
158                                    DEBUG_LOCATION);
159 }
160 
161 class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
162  public:
NewGrpcPolledFdLocked(ares_socket_t as,grpc_pollset_set * driver_pollset_set,std::shared_ptr<WorkSerializer> work_serializer)163   GrpcPolledFd* NewGrpcPolledFdLocked(
164       ares_socket_t as, grpc_pollset_set* driver_pollset_set,
165       std::shared_ptr<WorkSerializer> work_serializer) override {
166     return new GrpcPolledFdLibuv(as, std::move(work_serializer));
167   }
168 
ConfigureAresChannelLocked(ares_channel channel)169   void ConfigureAresChannelLocked(ares_channel channel) override {}
170 };
171 
NewGrpcPolledFdFactory(std::shared_ptr<WorkSerializer> work_serializer)172 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
173     std::shared_ptr<WorkSerializer> work_serializer) {
174   return absl::make_unique<GrpcPolledFdFactoryLibuv>();
175 }
176 
177 }  // namespace grpc_core
178 
179 #endif /* GRPC_ARES == 1 && defined(GRPC_UV) */
180