• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import <XCTest/XCTest.h>
20
21#include <sstream>
22
23#include <grpc/grpc.h>
24#include <grpc/support/time.h>
25#include <grpcpp/channel.h>
26#include <grpcpp/client_context.h>
27#include <grpcpp/create_channel.h>
28#include <grpcpp/generic/async_generic_service.h>
29#include <grpcpp/generic/generic_stub.h>
30#include <grpcpp/server.h>
31#include <grpcpp/server_builder.h>
32#include <grpcpp/server_context.h>
33#include <grpcpp/support/slice.h>
34
35#include "src/core/util/thd.h"
36#include "test/core/test_util/port.h"
37#include "test/core/test_util/test_config.h"
38
39using std::chrono::system_clock;
40using namespace grpc;
41
42void* tag(int i) { return (void*)(intptr_t)i; }
43
44static grpc_slice merge_slices(grpc_slice* slices, size_t nslices) {
45  size_t i;
46  size_t len = 0;
47  uint8_t* cursor;
48  grpc_slice out;
49
50  for (i = 0; i < nslices; i++) {
51    len += GRPC_SLICE_LENGTH(slices[i]);
52  }
53
54  out = grpc_slice_malloc(len);
55  cursor = GRPC_SLICE_START_PTR(out);
56
57  for (i = 0; i < nslices; i++) {
58    memcpy(cursor, GRPC_SLICE_START_PTR(slices[i]), GRPC_SLICE_LENGTH(slices[i]));
59    cursor += GRPC_SLICE_LENGTH(slices[i]);
60  }
61
62  return out;
63}
64
65int byte_buffer_eq_string(ByteBuffer* bb, const char* str) {
66  int res;
67
68  std::vector<Slice> slices;
69  bb->Dump(&slices);
70  grpc_slice* c_slices = new grpc_slice[slices.size()];
71  for (int i = 0; i < slices.size(); i++) {
72    c_slices[i] = slices[i].c_slice();
73  }
74  grpc_slice a = merge_slices(c_slices, slices.size());
75  grpc_slice b = grpc_slice_from_copied_string(str);
76  res = (GRPC_SLICE_LENGTH(a) == GRPC_SLICE_LENGTH(b)) &&
77        (0 == memcmp(GRPC_SLICE_START_PTR(a), GRPC_SLICE_START_PTR(b), GRPC_SLICE_LENGTH(a)));
78  grpc_slice_unref(a);
79  grpc_slice_unref(b);
80  for (int i = 0; i < slices.size(); i++) {
81    grpc_slice_unref(c_slices[i]);
82  }
83  delete[] c_slices;
84
85  return res;
86}
87
88@interface GenericTest : XCTestCase
89
90@end
91
92@implementation GenericTest {
93  std::string server_host_;
94  CompletionQueue cli_cq_;
95  std::unique_ptr<ServerCompletionQueue> srv_cq_;
96  std::unique_ptr<GenericStub> generic_stub_;
97  std::unique_ptr<Server> server_;
98  AsyncGenericService generic_service_;
99  std::ostringstream server_address_;
100}
101
102- (void)verify_ok:(grpc::CompletionQueue*)cq i:(int)i expect_ok:(bool)expect_ok {
103  bool ok;
104  void* got_tag;
105  XCTAssertTrue(cq->Next(&got_tag, &ok));
106  XCTAssertEqual(expect_ok, ok);
107  XCTAssertEqual(tag(i), got_tag);
108}
109
110- (void)server_ok:(int)i {
111  [self verify_ok:srv_cq_.get() i:i expect_ok:true];
112}
113- (void)client_ok:(int)i {
114  [self verify_ok:&cli_cq_ i:i expect_ok:true];
115}
116- (void)server_fail:(int)i {
117  [self verify_ok:srv_cq_.get() i:i expect_ok:false];
118}
119- (void)client_fail:(int)i {
120  [self verify_ok:&cli_cq_ i:i expect_ok:false];
121}
122
123- (void)setUp {
124  [super setUp];
125
126  server_host_ = "localhost";
127  int port = grpc_pick_unused_port_or_die();
128  server_address_ << server_host_ << ":" << port;
129  // Setup server
130  ServerBuilder builder;
131  builder.AddListeningPort(server_address_.str(), InsecureServerCredentials());
132  builder.RegisterAsyncGenericService(&generic_service_);
133  // Include a second call to RegisterAsyncGenericService to make sure that
134  // we get an error in the log, since it is not allowed to have 2 async
135  // generic services
136  builder.RegisterAsyncGenericService(&generic_service_);
137  srv_cq_ = builder.AddCompletionQueue();
138  server_ = builder.BuildAndStart();
139}
140
141- (void)tearDown {
142  // Put teardown code here. This method is called after the invocation of each test method in the
143  // class.
144  server_->Shutdown();
145  void* ignored_tag;
146  bool ignored_ok;
147  cli_cq_.Shutdown();
148  srv_cq_->Shutdown();
149  while (cli_cq_.Next(&ignored_tag, &ignored_ok));
150  while (srv_cq_->Next(&ignored_tag, &ignored_ok));
151  [super tearDown];
152}
153
154- (void)ResetStub {
155  std::shared_ptr<Channel> channel =
156      CreateChannel(server_address_.str(), InsecureChannelCredentials());
157  generic_stub_.reset(new GenericStub(channel));
158}
159
160- (void)SendRpc:(int)num_rpcs {
161  [self SendRpc:num_rpcs check_deadline:false deadline:gpr_inf_future(GPR_CLOCK_MONOTONIC)];
162}
163
164- (void)SendRpc:(int)num_rpcs check_deadline:(bool)check_deadline deadline:(gpr_timespec)deadline {
165  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
166  for (int i = 0; i < num_rpcs; i++) {
167    Status recv_status;
168
169    ClientContext cli_ctx;
170    GenericServerContext srv_ctx;
171    GenericServerAsyncReaderWriter stream(&srv_ctx);
172
173    // The string needs to be long enough to test heap-based slice.
174    /*send_request.set_message("Hello world. Hello world. Hello world.");*/
175
176    if (check_deadline) {
177      cli_ctx.set_deadline(deadline);
178    }
179
180    std::unique_ptr<GenericClientAsyncReaderWriter> call =
181        generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
182    [self client_ok:1];
183    Slice send_slice = Slice("hello world", 11);
184    std::unique_ptr<ByteBuffer> send_buffer =
185        std::unique_ptr<ByteBuffer>(new ByteBuffer(&send_slice, 1));
186    call->Write(*send_buffer, tag(2));
187    // Send ByteBuffer can be destroyed after calling Write.
188    send_buffer.reset();
189    [self client_ok:2];
190    call->WritesDone(tag(3));
191    [self client_ok:3];
192
193    generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4));
194
195    [self verify_ok:srv_cq_.get() i:4 expect_ok:true];
196    XCTAssertEqual(server_host_, srv_ctx.host().substr(0, server_host_.length()));
197    XCTAssertEqual(kMethodName, srv_ctx.method());
198
199    if (check_deadline) {
200      XCTAssertTrue(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
201                                     gpr_time_from_millis(1000, GPR_TIMESPAN)));
202    }
203
204    ByteBuffer recv_buffer;
205    stream.Read(&recv_buffer, tag(5));
206    [self server_ok:5];
207    XCTAssertTrue(byte_buffer_eq_string(&recv_buffer, "hello world"));
208
209    send_buffer = std::unique_ptr<ByteBuffer>(new ByteBuffer(recv_buffer));
210    stream.Write(*send_buffer, tag(6));
211    send_buffer.reset();
212    [self server_ok:6];
213
214    stream.Finish(Status::OK, tag(7));
215    [self server_ok:7];
216
217    recv_buffer.Clear();
218    call->Read(&recv_buffer, tag(8));
219    [self client_ok:8];
220    XCTAssertTrue(byte_buffer_eq_string(&recv_buffer, "hello world"));
221
222    call->Finish(&recv_status, tag(9));
223    [self client_ok:9];
224
225    XCTAssertTrue(recv_status.ok());
226  }
227}
228
229- (void)testSimpleRpc {
230  [self ResetStub];
231  [self SendRpc:1];
232}
233
234- (void)testSequentialRpcs {
235  [self ResetStub];
236  [self SendRpc:10];
237}
238
239+ (void)setUp {
240  grpc_test_init(NULL, NULL);
241}
242
243@end
244