• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/core/test_util/mock_endpoint.h"
20 
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/sync.h>
24 
25 #include <memory>
26 
27 #include "absl/log/check.h"
28 #include "absl/status/status.h"
29 #include "absl/strings/string_view.h"
30 #include "src/core/lib/event_engine/tcp_socket_utils.h"
31 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
32 #include "src/core/util/down_cast.h"
33 
34 namespace grpc_event_engine {
35 namespace experimental {
36 
MockEndpoint()37 MockEndpoint::MockEndpoint()
38     : peer_addr_(URIToResolvedAddress("ipv4:127.0.0.1:12345").value()),
39       local_addr_(URIToResolvedAddress("ipv4:127.0.0.1:6789").value()) {}
40 
~MockEndpointController()41 MockEndpointController::~MockEndpointController() {
42   grpc_core::MutexLock lock(&mu_);
43   if (on_read_) {
44     engine_->Run([cb = std::move(on_read_)]() mutable {
45       cb(absl::InternalError("Endpoint Shutdown"));
46     });
47     on_read_ = nullptr;
48   }
49 }
50 
Create(std::shared_ptr<EventEngine> engine)51 std::shared_ptr<MockEndpointController> MockEndpointController::Create(
52     std::shared_ptr<EventEngine> engine) {
53   return std::shared_ptr<MockEndpointController>(
54       new MockEndpointController(std::move(engine)));
55 }
56 
MockEndpointController(std::shared_ptr<EventEngine> engine)57 MockEndpointController::MockEndpointController(
58     std::shared_ptr<EventEngine> engine)
59     : engine_(std::move(engine)),
60       mock_grpc_endpoint_(grpc_event_engine_endpoint_create(
61           std::make_unique<grpc_event_engine::experimental::MockEndpoint>())) {}
62 
TriggerReadEvent(Slice read_data)63 void MockEndpointController::TriggerReadEvent(Slice read_data) {
64   grpc_core::MutexLock lock(&mu_);
65   CHECK(!reads_done_)
66       << "Cannot trigger a read event after NoMoreReads has been called.";
67   if (on_read_) {
68     on_read_slice_buffer_->Append(std::move(read_data));
69     engine_->Run(
70         [cb = std::move(on_read_)]() mutable { cb(absl::OkStatus()); });
71     on_read_ = nullptr;
72     on_read_slice_buffer_ = nullptr;
73   } else {
74     read_buffer_.Append(std::move(read_data));
75   }
76 }
77 
NoMoreReads()78 void MockEndpointController::NoMoreReads() {
79   grpc_core::MutexLock lock(&mu_);
80   CHECK(!std::exchange(reads_done_, true))
81       << "NoMoreReads() can only be called once";
82 }
83 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer)84 void MockEndpointController::Read(
85     absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) {
86   grpc_core::MutexLock lock(&mu_);
87   if (read_buffer_.Count() > 0) {
88     CHECK(buffer->Count() == 0);
89     CHECK(!on_read_);
90     read_buffer_.Swap(*buffer);
91     engine_->Run([cb = std::move(on_read)]() mutable { cb(absl::OkStatus()); });
92   } else if (reads_done_) {
93     engine_->Run([cb = std::move(on_read)]() mutable {
94       cb(absl::UnavailableError("reads done"));
95     });
96   } else {
97     on_read_ = std::move(on_read);
98     on_read_slice_buffer_ = buffer;
99   }
100 }
101 
TakeCEndpoint()102 grpc_endpoint* MockEndpointController::TakeCEndpoint() {
103   CHECK_NE(mock_grpc_endpoint_, nullptr)
104       << "The endpoint has already been taken";
105   grpc_core::DownCast<MockEndpoint*>(
106       grpc_get_wrapped_event_engine_endpoint(mock_grpc_endpoint_))
107       ->SetController(shared_from_this());
108   auto ret = mock_grpc_endpoint_;
109   mock_grpc_endpoint_ = nullptr;
110   return ret;
111 }
112 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)113 bool MockEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
114                         SliceBuffer* buffer, const ReadArgs* /* args */) {
115   endpoint_control_->Read(std::move(on_read), buffer);
116   return false;
117 }
118 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)119 bool MockEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
120                          SliceBuffer* data, const WriteArgs* /* args */) {
121   // No-op implementation. Nothing was using it.
122   data->Clear();
123   endpoint_control_->engine()->Run(
124       [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
125   return false;
126 }
127 
GetPeerAddress() const128 const EventEngine::ResolvedAddress& MockEndpoint::GetPeerAddress() const {
129   return peer_addr_;
130 }
131 
GetLocalAddress() const132 const EventEngine::ResolvedAddress& MockEndpoint::GetLocalAddress() const {
133   return local_addr_;
134 }
135 
136 }  // namespace experimental
137 }  // namespace grpc_event_engine
138