• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
19 
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/event_engine/memory_allocator.h>
22 #include <grpc/slice.h>
23 #include <grpc/status.h>
24 #include <grpc/support/port_platform.h>
25 
26 #include <atomic>
27 #include <memory>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "src/core/client_channel/subchannel.h"
34 #include "src/core/lib/iomgr/call_combiner.h"
35 #include "src/core/lib/iomgr/closure.h"
36 #include "src/core/lib/iomgr/error.h"
37 #include "src/core/lib/iomgr/iomgr_fwd.h"
38 #include "src/core/lib/iomgr/polling_entity.h"
39 #include "src/core/lib/resource_quota/arena.h"
40 #include "src/core/lib/resource_quota/memory_quota.h"
41 #include "src/core/lib/slice/slice.h"
42 #include "src/core/lib/slice/slice_buffer.h"
43 #include "src/core/lib/transport/metadata_batch.h"
44 #include "src/core/lib/transport/transport.h"
45 #include "src/core/util/backoff.h"
46 #include "src/core/util/orphanable.h"
47 #include "src/core/util/ref_counted_ptr.h"
48 #include "src/core/util/sync.h"
49 
50 namespace grpc_core {
51 
52 // Represents a streaming call on a subchannel that should be maintained
53 // open at all times.
54 // If the call fails with UNIMPLEMENTED, no further attempts are made.
55 // If the call fails with any other status (including OK), we retry the
56 // call with appropriate backoff.
57 // The backoff state is reset when we receive a message on a stream.
58 //
59 // Currently, this assumes server-side streaming, but it could be extended
60 // to support full bidi streaming if there is a need in the future.
//
// Everything that is specific to a particular streaming RPC (the path, the
// request payload, and how responses are interpreted) is supplied by the
// caller through the CallEventHandler interface below.  The state of an
// individual call attempt is kept in the private CallState class.
61 class SubchannelStreamClient final
62     : public InternallyRefCounted<SubchannelStreamClient> {
63  public:
64   // Interface implemented by caller.  Thread safety is provided for the
65   // implementation; only one method will be called by any thread at any
66   // one time (including destruction).
67   //
68   // The address of the SubchannelStreamClient object is passed to most
69   // methods for logging purposes.
  //
  // Every pure-virtual method below is annotated as requiring
  // SubchannelStreamClient::mu_ to be held exclusively by the caller, which
  // is why the method names carry the "Locked" suffix.
70   class CallEventHandler {
71    public:
72     virtual ~CallEventHandler() = default;
73 
74     // Returns the path for the streaming call.
75     virtual Slice GetPathLocked()
76         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
77     // Called when a new call attempt is being started.
78     virtual void OnCallStartLocked(SubchannelStreamClient* client)
79         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
80     // Called when a previous call attempt has failed and the retry
81     // timer is started before the next attempt.
82     virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client)
83         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
84     // Returns the message payload to send from the client.
    // NOTE(review): this returns a raw grpc_slice rather than a Slice, so
    // the reference presumably passes to the caller -- confirm in the
    // implementation file.
85     virtual grpc_slice EncodeSendMessageLocked()
86         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
87     // Called whenever a message is received from the server.
    // serialized_message is a non-owning view of the received payload.
    // NOTE(review): how a non-OK return status is acted upon is decided by
    // the implementation file, not visible in this header -- confirm there.
88     virtual absl::Status RecvMessageReadyLocked(
89         SubchannelStreamClient* client, absl::string_view serialized_message)
90         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
91     // Called when a stream fails.
    // status is the status code reported for the stream that ended.
92     virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
93                                                  grpc_status_code status)
94         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
95   };
96 
97   // If tracer is non-null, it enables trace logging, with the specified
98   // string being the first part of the log message.
99   // Does not take ownership of interested_parties; the caller is responsible
100   // for ensuring that it will outlive the SubchannelStreamClient.
  // Takes ownership of event_handler (passed as std::unique_ptr by value) and
  // takes a ref to connected_subchannel (passed as RefCountedPtr by value).
101   SubchannelStreamClient(
102       RefCountedPtr<ConnectedSubchannel> connected_subchannel,
103       grpc_pollset_set* interested_parties,
104       std::unique_ptr<CallEventHandler> event_handler, const char* tracer);
105 
106   ~SubchannelStreamClient() override;
107 
  // InternallyRefCounted/Orphanable hook.
  // NOTE(review): presumably shuts down the current call and any pending
  // retry timer before dropping the caller's ref -- confirm in the .cc file.
108   void Orphan() override;
109 
110  private:
111   // Contains a call to the backend and all the data related to the call.
  // One CallState exists per call attempt; it keeps the owning
  // SubchannelStreamClient alive through subchannel_stream_client_.
112   class CallState final : public Orphanable {
113    public:
114     CallState(RefCountedPtr<SubchannelStreamClient> client,
115               grpc_pollset_set* interested_parties);
116     ~CallState() override;
117 
118     void Orphan() override;
119 
    // Starts the call.  Must be invoked with the owning client's mu_ held.
120     void StartCallLocked()
121         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_);
122 
123    private:
124     void Cancel();
125 
126     void StartBatch(grpc_transport_stream_op_batch* batch);
127     static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);
128 
    // The annotation below names the mutex through the member pointer; it is
    // the mu_ of the SubchannelStreamClient that owns this CallState.
    // NOTE(review): `retry` presumably selects whether a new attempt is
    // scheduled after this call ends -- confirm in the .cc file.
129     void CallEndedLocked(bool retry)
130         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_->mu_);
131 
132     void RecvMessageReady();
133 
    // Static callbacks with the (void* arg, grpc_error_handle) closure
    // signature.  NOTE(review): arg is presumably the CallState (or, for the
    // batch helpers, the batch) -- confirm against where the closures are
    // initialized.
134     static void OnComplete(void* arg, grpc_error_handle error);
135     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
136     static void RecvMessageReady(void* arg, grpc_error_handle error);
137     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
138     static void StartCancel(void* arg, grpc_error_handle error);
139     static void OnCancelComplete(void* arg, grpc_error_handle error);
140 
141     static void AfterCallStackDestruction(void* arg, grpc_error_handle error);
142 
143     RefCountedPtr<SubchannelStreamClient> subchannel_stream_client_;
144     grpc_polling_entity pollent_;
145 
146     RefCountedPtr<Arena> arena_;
147     CallCombiner call_combiner_;
148 
149     // The streaming call to the backend. Always non-null.
150     // Refs are tracked manually; when the last ref is released, the
151     // CallState object will be automatically destroyed.
    // NOTE(review): there is no in-class initializer, so "always non-null"
    // depends on the implementation assigning call_ before any use --
    // confirm in the .cc file.
152     SubchannelCall* call_;
153 
154     grpc_transport_stream_op_batch_payload payload_;
155     grpc_transport_stream_op_batch batch_;
156     grpc_transport_stream_op_batch recv_message_batch_;
157     grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
158 
159     grpc_closure on_complete_;
160 
161     // send_initial_metadata
162     grpc_metadata_batch send_initial_metadata_;
163 
164     // send_message
165     SliceBuffer send_message_;
166 
167     // send_trailing_metadata
168     grpc_metadata_batch send_trailing_metadata_;
169 
170     // recv_initial_metadata
171     grpc_metadata_batch recv_initial_metadata_;
172     grpc_closure recv_initial_metadata_ready_;
173 
174     // recv_message
175     absl::optional<SliceBuffer> recv_message_;
176     grpc_closure recv_message_ready_;
    // Starts out false.  NOTE(review): presumably set once a message has
    // been received on this call -- confirm in the .cc file.
177     std::atomic<bool> seen_response_{false};
178 
179     // True if the cancel_stream batch has been started.
180     std::atomic<bool> cancelled_{false};
181 
182     // recv_trailing_metadata
183     grpc_metadata_batch recv_trailing_metadata_;
184     grpc_transport_stream_stats collect_stats_;
185     grpc_closure recv_trailing_metadata_ready_;
186 
187     // Closure for call stack destruction.
188     grpc_closure after_call_stack_destruction_;
189   };
190 
  // StartCallLocked() requires mu_ to be held by the caller.
  // NOTE(review): the unannotated StartCall() presumably acquires mu_ itself
  // before delegating -- confirm in the .cc file.
191   void StartCall();
192   void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
193 
  // StartRetryTimerLocked() requires mu_ to be held; OnRetryTimer() must be
  // called without mu_ held (it is annotated as excluding the lock).
194   void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
195   void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
196 
  // The members down to call_allocator_ are declared before mu_ and are not
  // annotated as guarded by it.
197   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
198   grpc_pollset_set* interested_parties_;  // Do not own.
  // Trace-log prefix passed to the constructor; null disables trace logging.
199   const char* tracer_;
200   RefCountedPtr<CallArenaAllocator> call_allocator_;
201 
  // Protects all members annotated ABSL_GUARDED_BY(mu_) below and is the
  // lock required by the CallEventHandler callbacks.
202   Mutex mu_;
203   std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_);
204 
205   // The data associated with the current call attempt.  It holds a ref
206   // to this SubchannelStreamClient object.
207   OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);
208 
209   // Call retry state.
  // NOTE(review): retry_timer_handle_ presumably holds a value only while a
  // retry timer is pending -- confirm in the .cc file.
210   BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
211   absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
212       retry_timer_handle_ ABSL_GUARDED_BY(mu_);
213   // A raw pointer will suffice since connected_subchannel_ holds a copy of the
214   // ChannelArgs which holds an std::shared_ptr of the EventEngine.
215   grpc_event_engine::experimental::EventEngine* event_engine_
216       ABSL_GUARDED_BY(mu_);
217 };
218 
219 }  // namespace grpc_core
220 
221 #endif  // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
222