• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import <XCTest/XCTest.h>
20
21#include <sstream>
22
23#include <grpc/grpc.h>
24#include <grpc/support/time.h>
25#include <grpcpp/channel.h>
26#include <grpcpp/client_context.h>
27#include <grpcpp/create_channel.h>
28#include <grpcpp/generic/async_generic_service.h>
29#include <grpcpp/generic/generic_stub.h>
30#include <grpcpp/server.h>
31#include <grpcpp/server_builder.h>
32#include <grpcpp/server_context.h>
33#include <grpcpp/support/slice.h>
34
35#include "src/core/lib/gprpp/thd.h"
36#include "test/core/util/port.h"
37#include "test/core/util/test_config.h"
38
39using std::chrono::system_clock;
40using namespace grpc;
41
42void* tag(int i) { return (void*)(intptr_t)i; }
43
44static grpc_slice merge_slices(grpc_slice* slices, size_t nslices) {
45  size_t i;
46  size_t len = 0;
47  uint8_t* cursor;
48  grpc_slice out;
49
50  for (i = 0; i < nslices; i++) {
51    len += GRPC_SLICE_LENGTH(slices[i]);
52  }
53
54  out = grpc_slice_malloc(len);
55  cursor = GRPC_SLICE_START_PTR(out);
56
57  for (i = 0; i < nslices; i++) {
58    memcpy(cursor, GRPC_SLICE_START_PTR(slices[i]), GRPC_SLICE_LENGTH(slices[i]));
59    cursor += GRPC_SLICE_LENGTH(slices[i]);
60  }
61
62  return out;
63}
64
65int byte_buffer_eq_string(ByteBuffer* bb, const char* str) {
66  int res;
67
68  std::vector<Slice> slices;
69  bb->Dump(&slices);
70  grpc_slice* c_slices = new grpc_slice[slices.size()];
71  for (int i = 0; i < slices.size(); i++) {
72    c_slices[i] = slices[i].c_slice();
73  }
74  grpc_slice a = merge_slices(c_slices, slices.size());
75  grpc_slice b = grpc_slice_from_copied_string(str);
76  res = (GRPC_SLICE_LENGTH(a) == GRPC_SLICE_LENGTH(b)) &&
77        (0 == memcmp(GRPC_SLICE_START_PTR(a), GRPC_SLICE_START_PTR(b), GRPC_SLICE_LENGTH(a)));
78  grpc_slice_unref(a);
79  grpc_slice_unref(b);
80  for (int i = 0; i < slices.size(); i++) {
81    grpc_slice_unref(c_slices[i]);
82  }
83  delete[] c_slices;
84
85  return res;
86}
87
88@interface GenericTest : XCTestCase
89
90@end
91
92@implementation GenericTest {
93  grpc::string server_host_;
94  CompletionQueue cli_cq_;
95  std::unique_ptr<ServerCompletionQueue> srv_cq_;
96  std::unique_ptr<GenericStub> generic_stub_;
97  std::unique_ptr<Server> server_;
98  AsyncGenericService generic_service_;
99  std::ostringstream server_address_;
100}
101
102- (void)verify_ok:(grpc::CompletionQueue*)cq i:(int)i expect_ok:(bool)expect_ok {
103  bool ok;
104  void* got_tag;
105  XCTAssertTrue(cq->Next(&got_tag, &ok));
106  XCTAssertEqual(expect_ok, ok);
107  XCTAssertEqual(tag(i), got_tag);
108}
109
110- (void)server_ok:(int)i {
111  [self verify_ok:srv_cq_.get() i:i expect_ok:true];
112}
113- (void)client_ok:(int)i {
114  [self verify_ok:&cli_cq_ i:i expect_ok:true];
115}
116- (void)server_fail:(int)i {
117  [self verify_ok:srv_cq_.get() i:i expect_ok:false];
118}
119- (void)client_fail:(int)i {
120  [self verify_ok:&cli_cq_ i:i expect_ok:false];
121}
122
123- (void)setUp {
124  [super setUp];
125
126  server_host_ = "localhost";
127  int port = grpc_pick_unused_port_or_die();
128  server_address_ << server_host_ << ":" << port;
129  // Setup server
130  ServerBuilder builder;
131  builder.AddListeningPort(server_address_.str(), InsecureServerCredentials());
132  builder.RegisterAsyncGenericService(&generic_service_);
133  // Include a second call to RegisterAsyncGenericService to make sure that
134  // we get an error in the log, since it is not allowed to have 2 async
135  // generic services
136  builder.RegisterAsyncGenericService(&generic_service_);
137  srv_cq_ = builder.AddCompletionQueue();
138  server_ = builder.BuildAndStart();
139}
140
141- (void)tearDown {
142  // Put teardown code here. This method is called after the invocation of each test method in the
143  // class.
144  server_->Shutdown();
145  void* ignored_tag;
146  bool ignored_ok;
147  cli_cq_.Shutdown();
148  srv_cq_->Shutdown();
149  while (cli_cq_.Next(&ignored_tag, &ignored_ok))
150    ;
151  while (srv_cq_->Next(&ignored_tag, &ignored_ok))
152    ;
153  [super tearDown];
154}
155
156- (void)ResetStub {
157  std::shared_ptr<Channel> channel =
158      CreateChannel(server_address_.str(), InsecureChannelCredentials());
159  generic_stub_.reset(new GenericStub(channel));
160}
161
162- (void)SendRpc:(int)num_rpcs {
163  [self SendRpc:num_rpcs check_deadline:false deadline:gpr_inf_future(GPR_CLOCK_MONOTONIC)];
164}
165
166- (void)SendRpc:(int)num_rpcs check_deadline:(bool)check_deadline deadline:(gpr_timespec)deadline {
167  const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
168  for (int i = 0; i < num_rpcs; i++) {
169    Status recv_status;
170
171    ClientContext cli_ctx;
172    GenericServerContext srv_ctx;
173    GenericServerAsyncReaderWriter stream(&srv_ctx);
174
175    // The string needs to be long enough to test heap-based slice.
176    /*send_request.set_message("Hello world. Hello world. Hello world.");*/
177
178    if (check_deadline) {
179      cli_ctx.set_deadline(deadline);
180    }
181
182    std::unique_ptr<GenericClientAsyncReaderWriter> call =
183        generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
184    [self client_ok:1];
185    Slice send_slice = Slice("hello world", 11);
186    std::unique_ptr<ByteBuffer> send_buffer =
187        std::unique_ptr<ByteBuffer>(new ByteBuffer(&send_slice, 1));
188    call->Write(*send_buffer, tag(2));
189    // Send ByteBuffer can be destroyed after calling Write.
190    send_buffer.reset();
191    [self client_ok:2];
192    call->WritesDone(tag(3));
193    [self client_ok:3];
194
195    generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4));
196
197    [self verify_ok:srv_cq_.get() i:4 expect_ok:true];
198    XCTAssertEqual(server_host_, srv_ctx.host().substr(0, server_host_.length()));
199    XCTAssertEqual(kMethodName, srv_ctx.method());
200
201    if (check_deadline) {
202      XCTAssertTrue(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
203                                     gpr_time_from_millis(1000, GPR_TIMESPAN)));
204    }
205
206    ByteBuffer recv_buffer;
207    stream.Read(&recv_buffer, tag(5));
208    [self server_ok:5];
209    XCTAssertTrue(byte_buffer_eq_string(&recv_buffer, "hello world"));
210
211    send_buffer = std::unique_ptr<ByteBuffer>(new ByteBuffer(recv_buffer));
212    stream.Write(*send_buffer, tag(6));
213    send_buffer.reset();
214    [self server_ok:6];
215
216    stream.Finish(Status::OK, tag(7));
217    [self server_ok:7];
218
219    recv_buffer.Clear();
220    call->Read(&recv_buffer, tag(8));
221    [self client_ok:8];
222    XCTAssertTrue(byte_buffer_eq_string(&recv_buffer, "hello world"));
223
224    call->Finish(&recv_status, tag(9));
225    [self client_ok:9];
226
227    XCTAssertTrue(recv_status.ok());
228  }
229}
230
231- (void)testSimpleRpc {
232  [self ResetStub];
233  [self SendRpc:1];
234}
235
236- (void)testSequentialRpcs {
237  [self ResetStub];
238  [self SendRpc:10];
239}
240
241+ (void)setUp {
242  grpc_test_init(0, NULL);
243}
244
245@end
246