1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/status.h>
20
21 #include <memory>
22
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "gtest/gtest.h"
26 #include "src/core/util/time.h"
27 #include "test/core/end2end/cq_verifier.h"
28 #include "test/core/end2end/end2end_tests.h"
29 #include "test/core/end2end/fixtures/h2_tls_common.h"
30
31 namespace grpc_core {
32 namespace {
33
34 // Client requests status along with the initial metadata. Server streams
35 // messages and ends with a non-OK status. Client reads after server is done
36 // writing, and expects to get the status after the messages.
ServerStreaming(CoreEnd2endTest & test,int num_messages)37 void ServerStreaming(CoreEnd2endTest& test, int num_messages) {
38 auto c = test.NewClientCall("/foo").Timeout(Duration::Minutes(1)).Create();
39 IncomingMetadata server_initial_metadata;
40 IncomingStatusOnClient server_status;
41 c.NewBatch(1)
42 .SendInitialMetadata({})
43 .RecvInitialMetadata(server_initial_metadata)
44 // Client requests status early but should not receive status till all the
45 // messages are received.
46 .RecvStatusOnClient(server_status);
47 // Client sends close early
48 c.NewBatch(3).SendCloseFromClient();
49 test.Expect(3, true);
50 test.Step();
51 auto s = test.RequestCall(100);
52 test.Expect(100, true);
53 test.Step();
54 s.NewBatch(101).SendInitialMetadata({});
55 test.Expect(101, true);
56 test.Step();
57 // Server writes bunch of messages
58 for (int i = 0; i < num_messages; i++) {
59 s.NewBatch(103).SendMessage("hello world");
60 test.Expect(103, true);
61 test.Step();
62 }
63 // Server sends status
64 IncomingCloseOnServer client_close;
65 s.NewBatch(104)
66 .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
67 .RecvCloseOnServer(client_close);
68 bool seen_status = false;
69 test.Expect(1, CoreEnd2endTest::Maybe{&seen_status});
70 test.Expect(104, true);
71 test.Step();
72
73 VLOG(2) << "SEEN_STATUS:" << seen_status;
74
75 // Client keeps reading messages till it gets the status
76 int num_messages_received = 0;
77 while (true) {
78 IncomingMessage server_message;
79 c.NewBatch(102).RecvMessage(server_message);
80 test.Expect(1, CqVerifier::Maybe{&seen_status});
81 test.Expect(102, true);
82 test.Step();
83 if (server_message.is_end_of_stream()) {
84 // The transport has received the trailing metadata.
85 break;
86 }
87 EXPECT_EQ(server_message.payload(), "hello world");
88 num_messages_received++;
89 }
90 CHECK_EQ(num_messages_received, num_messages);
91 if (!seen_status) {
92 test.Expect(1, true);
93 test.Step();
94 }
95 EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
96 EXPECT_EQ(server_status.message(), "xyz");
97 }
98
CORE_END2END_TEST(Http2Test,ServerStreaming)99 CORE_END2END_TEST(Http2Test, ServerStreaming) { ServerStreaming(*this, 1); }
100
CORE_END2END_TEST(Http2Test,ServerStreamingEmptyStream)101 CORE_END2END_TEST(Http2Test, ServerStreamingEmptyStream) {
102 ServerStreaming(*this, 0);
103 }
104
CORE_END2END_TEST(Http2Test,ServerStreaming10Messages)105 CORE_END2END_TEST(Http2Test, ServerStreaming10Messages) {
106 // TODO(yashykt): Remove this once b/376283636 is fixed.
107 ConfigVars::Overrides overrides;
108 overrides.default_ssl_roots_file_path = CA_CERT_PATH;
109 overrides.trace =
110 "call,channel,client_channel,client_channel_call,client_channel_lb_call,"
111 "http";
112 ConfigVars::SetOverrides(overrides);
113 ServerStreaming(*this, 10);
114 }
115
116 } // namespace
117 } // namespace grpc_core
118