• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H
18 #define GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H
19 
20 #include <grpc/grpc.h>
21 #include <grpc/slice.h>
22 #include <grpc/status.h>
23 #include <grpc/support/port_platform.h>
24 
25 #include <functional>
26 #include <memory>
27 #include <string>
28 
29 #include "absl/container/flat_hash_map.h"
30 #include "absl/status/status.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/lib/iomgr/iomgr_fwd.h"
35 #include "src/core/lib/surface/channel.h"
36 #include "src/core/util/orphanable.h"
37 #include "src/core/util/ref_counted_ptr.h"
38 #include "src/core/util/sync.h"
39 #include "src/core/xds/xds_client/xds_bootstrap.h"
40 #include "src/core/xds/xds_client/xds_transport.h"
41 
42 namespace grpc_core {
43 
// XdsTransportFactory implementation declared in this header.
//
// Holds a ChannelArgs value, a grpc_pollset_set pointer, and a
// mutex-guarded map from a string key to GrpcXdsTransport pointers.
// NOTE(review): presumably the map lets GetTransport() hand back the same
// transport for the same xDS server -- confirm against the .cc file.
44 class GrpcXdsTransportFactory final : public XdsTransportFactory {
45  public:
  // Concrete transport type; defined later in this header.
46   class GrpcXdsTransport;
47 
  // NOTE(review): `args` is presumably copied into args_ -- confirm in the
  // constructor definition.
48   explicit GrpcXdsTransportFactory(const ChannelArgs& args);
49   ~GrpcXdsTransportFactory() override;
50 
  // Overrides the base-class hook with an empty body, i.e. a no-op here.
Orphaned()51   void Orphaned() override {}
52 
  // Returns a transport for `server`. `status` is an out-parameter.
  // NOTE(review): the exact contract for `status` (when it is written, and
  // whether it may be null) is not visible in this header -- confirm.
53   RefCountedPtr<XdsTransport> GetTransport(
54       const XdsBootstrap::XdsServer& server, absl::Status* status) override;
55 
  // Accessor for the stored pollset-set pointer; does not transfer ownership
  // through the signature (a raw pointer is returned as-is).
interested_parties()56   grpc_pollset_set* interested_parties() const { return interested_parties_; }
57 
58  private:
59   ChannelArgs args_;
  // NOTE(review): no in-class initializer; presumably created in the
  // constructor and released in the destructor -- confirm.
60   grpc_pollset_set* interested_parties_;
61 
  // Guards transports_ (see the ABSL_GUARDED_BY annotation below).
62   Mutex mu_;
  // Raw pointers, so the map itself holds no reference on the transports.
  // NOTE(review): entries presumably have to be removed before the pointed-to
  // transport is destroyed -- verify where erasure happens.
63   absl::flat_hash_map<std::string /*XdsServer key*/, GrpcXdsTransport*>
64       transports_ ABSL_GUARDED_BY(&mu_);
65 };
66 
// XdsTransport implementation nested in GrpcXdsTransportFactory.
//
// Holds a weak reference to the factory, a string key, a ref-counted
// Channel, and a mutex-guarded map of connectivity-failure watchers.
67 class GrpcXdsTransportFactory::GrpcXdsTransport final
68     : public XdsTransportFactory::XdsTransport {
69  public:
  // Streaming-call type created by this transport; defined later in this
  // header.
70   class GrpcStreamingCall;
71 
  // `status` is an out-parameter.
  // NOTE(review): presumably reports a channel-creation failure for
  // `server` -- confirm in the constructor definition.
72   GrpcXdsTransport(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,
73                    const XdsBootstrap::XdsServer& server, absl::Status* status);
74   ~GrpcXdsTransport() override;
75 
  // Declared here, defined out of line.
76   void Orphaned() override;
77 
  // Start/stop take the same watcher handle type; watchers_ below is keyed
  // by that handle.
78   void StartConnectivityFailureWatch(
79       RefCountedPtr<ConnectivityFailureWatcher> watcher) override;
80   void StopConnectivityFailureWatch(
81       const RefCountedPtr<ConnectivityFailureWatcher>& watcher) override;
82 
  // Takes ownership of `event_handler` (passed as std::unique_ptr by value)
  // and returns the new call wrapped in an OrphanablePtr.
  // NOTE(review): `method` is a raw C string; its required lifetime is not
  // visible here -- confirm.
83   OrphanablePtr<StreamingCall> CreateStreamingCall(
84       const char* method,
85       std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
86 
87   void ResetBackoff() override;
88 
89  private:
  // Defined elsewhere (not in this header).
90   class StateWatcher;
91 
92   WeakRefCountedPtr<GrpcXdsTransportFactory> factory_;
  // NOTE(review): presumably the same key the factory uses in its
  // transports_ map -- confirm.
93   std::string key_;
94   RefCountedPtr<Channel> channel_;
95 
  // Guards watchers_ (see the ABSL_GUARDED_BY annotation below).
96   Mutex mu_;
  // The key holds a reference on the caller's watcher; the mapped value is a
  // raw StateWatcher pointer.
  // NOTE(review): ownership of the StateWatcher is not visible here --
  // confirm.
97   absl::flat_hash_map<RefCountedPtr<ConnectivityFailureWatcher>, StateWatcher*>
98       watchers_ ABSL_GUARDED_BY(&mu_);
99 };
100 
// StreamingCall implementation nested in GrpcXdsTransport.
//
// Wraps a grpc_call together with the per-operation state (buffers, metadata
// arrays, closures) declared in the private section below.
101 class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall final
102     : public XdsTransportFactory::XdsTransport::StreamingCall {
103  public:
  // Takes ownership of `event_handler` (std::unique_ptr by value). `channel`
  // is a raw pointer.
  // NOTE(review): `channel` is presumably only used during construction to
  // create call_ -- confirm.
104   GrpcStreamingCall(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,
105                     Channel* channel, const char* method,
106                     std::unique_ptr<StreamingCall::EventHandler> event_handler);
107   ~GrpcStreamingCall() override;
108 
109   void Orphan() override;
110 
  // `payload` is taken by value.
111   void SendMessage(std::string payload) override;
112 
113   void StartRecvMessage() override;
114 
115  private:
  // Static callbacks with the (void* arg, grpc_error_handle) signature. The
  // matching grpc_closure members are declared below.
  // NOTE(review): `arg` is presumably the GrpcStreamingCall instance --
  // confirm. Only OnRequestSent names its error parameter; the others
  // comment the name out, so they presumably ignore it.
116   static void OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/);
117   static void OnRequestSent(void* arg, grpc_error_handle error);
118   static void OnResponseReceived(void* arg, grpc_error_handle /*error*/);
119   static void OnStatusReceived(void* arg, grpc_error_handle /*error*/);
120 
121   WeakRefCountedPtr<GrpcXdsTransportFactory> factory_;
122 
123   std::unique_ptr<StreamingCall::EventHandler> event_handler_;
124 
  // NOTE(review): call_, the metadata arrays, status_code_ and
  // status_details_ have no in-class initializers (unlike the two byte-buffer
  // pointers, which default to nullptr). They must be set by the constructor
  // before use -- verify in the .cc file.
125   // Always non-NULL.
126   grpc_call* call_;
127 
128   // recv_initial_metadata
129   grpc_metadata_array initial_metadata_recv_;
130   grpc_closure on_recv_initial_metadata_;
131 
132   // send_message
133   grpc_byte_buffer* send_message_payload_ = nullptr;
134   grpc_closure on_request_sent_;
135 
136   // recv_message
137   grpc_byte_buffer* recv_message_payload_ = nullptr;
138   grpc_closure on_response_received_;
139 
140   // recv_trailing_metadata
141   grpc_metadata_array trailing_metadata_recv_;
142   grpc_status_code status_code_;
143   grpc_slice status_details_;
144   grpc_closure on_status_received_;
145 };
146 
147 }  // namespace grpc_core
148 
149 #endif  // GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H
150