• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/status.h>
20 
21 #include <memory>
22 
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "gtest/gtest.h"
26 #include "src/core/util/time.h"
27 #include "test/core/end2end/cq_verifier.h"
28 #include "test/core/end2end/end2end_tests.h"
29 #include "test/core/end2end/fixtures/h2_tls_common.h"
30 
31 namespace grpc_core {
32 namespace {
33 
34 // Client requests status along with the initial metadata. Server streams
35 // messages and ends with a non-OK status. Client reads after server is done
36 // writing, and expects to get the status after the messages.
ServerStreaming(CoreEnd2endTest & test,int num_messages)37 void ServerStreaming(CoreEnd2endTest& test, int num_messages) {
38   auto c = test.NewClientCall("/foo").Timeout(Duration::Minutes(1)).Create();
39   IncomingMetadata server_initial_metadata;
40   IncomingStatusOnClient server_status;
41   c.NewBatch(1)
42       .SendInitialMetadata({})
43       .RecvInitialMetadata(server_initial_metadata)
44       // Client requests status early but should not receive status till all the
45       // messages are received.
46       .RecvStatusOnClient(server_status);
47   // Client sends close early
48   c.NewBatch(3).SendCloseFromClient();
49   test.Expect(3, true);
50   test.Step();
51   auto s = test.RequestCall(100);
52   test.Expect(100, true);
53   test.Step();
54   s.NewBatch(101).SendInitialMetadata({});
55   test.Expect(101, true);
56   test.Step();
57   // Server writes bunch of messages
58   for (int i = 0; i < num_messages; i++) {
59     s.NewBatch(103).SendMessage("hello world");
60     test.Expect(103, true);
61     test.Step();
62   }
63   // Server sends status
64   IncomingCloseOnServer client_close;
65   s.NewBatch(104)
66       .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
67       .RecvCloseOnServer(client_close);
68   bool seen_status = false;
69   test.Expect(1, CoreEnd2endTest::Maybe{&seen_status});
70   test.Expect(104, true);
71   test.Step();
72 
73   VLOG(2) << "SEEN_STATUS:" << seen_status;
74 
75   // Client keeps reading messages till it gets the status
76   int num_messages_received = 0;
77   while (true) {
78     IncomingMessage server_message;
79     c.NewBatch(102).RecvMessage(server_message);
80     test.Expect(1, CqVerifier::Maybe{&seen_status});
81     test.Expect(102, true);
82     test.Step();
83     if (server_message.is_end_of_stream()) {
84       // The transport has received the trailing metadata.
85       break;
86     }
87     EXPECT_EQ(server_message.payload(), "hello world");
88     num_messages_received++;
89   }
90   CHECK_EQ(num_messages_received, num_messages);
91   if (!seen_status) {
92     test.Expect(1, true);
93     test.Step();
94   }
95   EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
96   EXPECT_EQ(server_status.message(), "xyz");
97 }
98 
CORE_END2END_TEST(Http2Test,ServerStreaming)99 CORE_END2END_TEST(Http2Test, ServerStreaming) { ServerStreaming(*this, 1); }
100 
CORE_END2END_TEST(Http2Test,ServerStreamingEmptyStream)101 CORE_END2END_TEST(Http2Test, ServerStreamingEmptyStream) {
102   ServerStreaming(*this, 0);
103 }
104 
CORE_END2END_TEST(Http2Test,ServerStreaming10Messages)105 CORE_END2END_TEST(Http2Test, ServerStreaming10Messages) {
106   // TODO(yashykt): Remove this once b/376283636 is fixed.
107   ConfigVars::Overrides overrides;
108   overrides.default_ssl_roots_file_path = CA_CERT_PATH;
109   overrides.trace =
110       "call,channel,client_channel,client_channel_call,client_channel_lb_call,"
111       "http";
112   ConfigVars::SetOverrides(overrides);
113   ServerStreaming(*this, 10);
114 }
115 
116 }  // namespace
117 }  // namespace grpc_core
118