• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <type_traits>
17 
18 #include "pw_assert/light.h"
19 #include "pw_bytes/span.h"
20 #include "pw_containers/vector.h"
21 #include "pw_rpc/channel.h"
22 #include "pw_rpc/internal/hash.h"
23 #include "pw_rpc/internal/method_lookup.h"
24 #include "pw_rpc/internal/packet.h"
25 #include "pw_rpc/internal/raw_method.h"
26 #include "pw_rpc/internal/server.h"
27 
28 namespace pw::rpc {
29 
// Declares a context object that may be used to invoke a raw RPC method in a
// test. The context type is named with the service class that implements the
// method and the method to invoke. The RPC is then invoked through the
// context's call() function.
//
// For a unary RPC, context.call(request) returns the method's StatusWithSize,
// and the encoded response bytes can be accessed via context.response().
//
//   PW_RAW_TEST_METHOD_CONTEXT(my::CoolService, TheMethod) context;
//   EXPECT_EQ(OkStatus(), context.call(encoded_request).status());
//   EXPECT_EQ(0,
//             std::memcmp(encoded_response,
//                         context.response().data(),
//                         sizeof(encoded_response)));
//
// For a server streaming RPC, context.call(request) invokes the method. As in a
// normal RPC, the method completes when the ServerWriter's Finish method is
// called (or it goes out of scope).
//
//   PW_RAW_TEST_METHOD_CONTEXT(my::CoolService, TheStreamingMethod) context;
//   context.call(encoded_request);
//
//   EXPECT_TRUE(context.done());  // Check that the RPC completed
//   EXPECT_EQ(OkStatus(), context.status());  // Check the status
//
//   EXPECT_EQ(3u, context.responses().size());
//   const ByteSpan& response = context.responses()[0];  // one response
//
//   for (const ByteSpan& response : context.responses()) {
//     // iterate over the responses
//   }
//
// PW_RAW_TEST_METHOD_CONTEXT forwards its constructor arguments to the
// underlying service. For example:
//
//   PW_RAW_TEST_METHOD_CONTEXT(MyService, Go) context(service, args);
//
// PW_RAW_TEST_METHOD_CONTEXT takes two optional arguments:
//
//   size_t kMaxResponse: maximum responses to store; ignored unless streaming
//   size_t kOutputSizeBytes: buffer size; must be large enough for a packet
//
// Example:
//
//   PW_RAW_TEST_METHOD_CONTEXT(MyService, BestMethod, 3, 256) context;
//   ASSERT_EQ(3u, context.responses().max_size());
//
// The method ID handed to RawTestMethodContext is the hash of the method name
// exactly as it is spelled in the macro invocation. The optional arguments are
// appended to the template argument list as-is.
#define PW_RAW_TEST_METHOD_CONTEXT(service, method, ...) \
  ::pw::rpc::RawTestMethodContext<                       \
      service,                                           \
      &service::method,                                  \
      ::pw::rpc::internal::Hash(#method),                \
      ##__VA_ARGS__>
// Forward declaration of the test context class. The defaults given here are
// what PW_RAW_TEST_METHOD_CONTEXT uses when its optional arguments are omitted:
// up to 4 stored responses and a 128-byte output buffer.
template <typename Service, auto method, uint32_t kMethodId,
          size_t kMaxResponse = 4, size_t kOutputSizeBytes = 128>
class RawTestMethodContext;
87 
88 // Internal classes that implement RawTestMethodContext.
89 namespace internal::test::raw {
90 
91 // A ChannelOutput implementation that stores the outgoing payloads and status.
92 template <size_t kOutputSize>
93 class MessageOutput final : public ChannelOutput {
94  public:
95   using ResponseBuffer = std::array<std::byte, kOutputSize>;
96 
MessageOutput(Vector<ByteSpan> & responses,Vector<ResponseBuffer> & buffers,ByteSpan packet_buffer)97   MessageOutput(Vector<ByteSpan>& responses,
98                 Vector<ResponseBuffer>& buffers,
99                 ByteSpan packet_buffer)
100       : ChannelOutput("internal::test::raw::MessageOutput"),
101         responses_(responses),
102         buffers_(buffers),
103         packet_buffer_(packet_buffer) {
104     clear();
105   }
106 
last_status()107   Status last_status() const { return last_status_; }
set_last_status(Status status)108   void set_last_status(Status status) { last_status_ = status; }
109 
total_responses()110   size_t total_responses() const { return total_responses_; }
111 
stream_ended()112   bool stream_ended() const { return stream_ended_; }
113 
clear()114   void clear() {
115     responses_.clear();
116     buffers_.clear();
117     total_responses_ = 0;
118     stream_ended_ = false;
119     last_status_ = Status::Unknown();
120   }
121 
122  private:
AcquireBuffer()123   ByteSpan AcquireBuffer() override { return packet_buffer_; }
124 
125   Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
126 
127   Vector<ByteSpan>& responses_;
128   Vector<ResponseBuffer>& buffers_;
129   ByteSpan packet_buffer_;
130   size_t total_responses_;
131   bool stream_ended_;
132   Status last_status_;
133 };
134 
135 // Collects everything needed to invoke a particular RPC.
136 template <typename Service,
137           uint32_t kMethodId,
138           size_t kMaxResponse,
139           size_t kOutputSize>
140 struct InvocationContext {
141   template <typename... Args>
InvocationContextInvocationContext142   InvocationContext(Args&&... args)
143       : output(responses, buffers, packet_buffer),
144         channel(Channel::Create<123>(&output)),
145         server(std::span(&channel, 1)),
146         service(std::forward<Args>(args)...),
147         call(static_cast<internal::Server&>(server),
148              static_cast<internal::Channel&>(channel),
149              service,
150              MethodLookup::GetRawMethod<Service, kMethodId>()) {}
151 
152   using ResponseBuffer = std::array<std::byte, kOutputSize>;
153 
154   MessageOutput<kOutputSize> output;
155   rpc::Channel channel;
156   rpc::Server server;
157   Service service;
158   Vector<ByteSpan, kMaxResponse> responses;
159   Vector<ResponseBuffer, kMaxResponse> buffers;
160   std::array<std::byte, kOutputSize> packet_buffer = {};
161   internal::ServerCall call;
162 };
163 
164 // Method invocation context for a unary RPC. Returns the status in call() and
165 // provides the response through the response() method.
166 template <typename Service, auto method, uint32_t kMethodId, size_t kOutputSize>
167 class UnaryContext {
168  private:
169   using Context = InvocationContext<Service, kMethodId, 1, kOutputSize>;
170   Context ctx_;
171 
172  public:
173   template <typename... Args>
UnaryContext(Args &&...args)174   UnaryContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
175 
service()176   Service& service() { return ctx_.service; }
177 
178   // Invokes the RPC with the provided request. Returns RPC's StatusWithSize.
call(ConstByteSpan request)179   StatusWithSize call(ConstByteSpan request) {
180     ctx_.output.clear();
181     ctx_.buffers.emplace_back();
182     ctx_.buffers.back() = {};
183     ctx_.responses.emplace_back();
184     auto& response = ctx_.responses.back();
185     response = {ctx_.buffers.back().data(), ctx_.buffers.back().size()};
186     auto sws = CallMethodImplFunction<method>(ctx_.call, request, response);
187     response = response.first(sws.size());
188     return sws;
189   }
190 
191   // Gives access to the RPC's response.
response()192   ConstByteSpan response() const {
193     PW_ASSERT(ctx_.responses.size() > 0u);
194     return ctx_.responses.back();
195   }
196 };
197 
198 // Method invocation context for a server streaming RPC.
199 template <typename Service,
200           auto method,
201           uint32_t kMethodId,
202           size_t kMaxResponse,
203           size_t kOutputSize>
204 class ServerStreamingContext {
205  private:
206   using Context =
207       InvocationContext<Service, kMethodId, kMaxResponse, kOutputSize>;
208   Context ctx_;
209 
210  public:
211   template <typename... Args>
ServerStreamingContext(Args &&...args)212   ServerStreamingContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
213 
service()214   Service& service() { return ctx_.service; }
215 
216   // Invokes the RPC with the provided request.
call(ConstByteSpan request)217   void call(ConstByteSpan request) {
218     ctx_.output.clear();
219     BaseServerWriter server_writer(ctx_.call);
220     return CallMethodImplFunction<method>(
221         ctx_.call, request, static_cast<RawServerWriter&>(server_writer));
222   }
223 
224   // Returns a server writer which writes responses into the context's buffer.
225   // This should not be called alongside call(); use one or the other.
writer()226   RawServerWriter writer() {
227     ctx_.output.clear();
228     BaseServerWriter server_writer(ctx_.call);
229     return std::move(static_cast<RawServerWriter&>(server_writer));
230   }
231 
232   // Returns the responses that have been recorded. The maximum number of
233   // responses is responses().max_size(). responses().back() is always the most
234   // recent response, even if total_responses() > responses().max_size().
responses()235   const Vector<ByteSpan>& responses() const { return ctx_.responses; }
236 
237   // The total number of responses sent, which may be larger than
238   // responses.max_size().
total_responses()239   size_t total_responses() const { return ctx_.output.total_responses(); }
240 
241   // True if the stream has terminated.
done()242   bool done() const { return ctx_.output.stream_ended(); }
243 
244   // The status of the stream. Only valid if done() is true.
status()245   Status status() const {
246     PW_ASSERT(done());
247     return ctx_.output.last_status();
248   }
249 };
250 
251 // Alias to select the type of the context object to use based on which type of
252 // RPC it is for.
253 template <typename Service,
254           auto method,
255           uint32_t kMethodId,
256           size_t kMaxResponse,
257           size_t kOutputSize>
258 using Context = std::tuple_element_t<
259     static_cast<size_t>(MethodTraits<decltype(method)>::kType),
260     std::tuple<UnaryContext<Service, method, kMethodId, kOutputSize>,
261                ServerStreamingContext<Service,
262                                       method,
263                                       kMethodId,
264                                       kMaxResponse,
265                                       kOutputSize>
266                // TODO(hepler): Support client and bidi streaming
267                >>;
268 
269 template <size_t kOutputSize>
SendAndReleaseBuffer(std::span<const std::byte> buffer)270 Status MessageOutput<kOutputSize>::SendAndReleaseBuffer(
271     std::span<const std::byte> buffer) {
272   PW_ASSERT(!stream_ended_);
273   PW_ASSERT(buffer.data() == packet_buffer_.data());
274 
275   if (buffer.empty()) {
276     return OkStatus();
277   }
278 
279   Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
280   PW_ASSERT(result.ok());
281 
282   last_status_ = result.value().status();
283 
284   switch (result.value().type()) {
285     case internal::PacketType::RESPONSE: {
286       // If we run out of space, the back message is always the most recent.
287       buffers_.emplace_back();
288       buffers_.back() = {};
289       auto response = result.value().payload();
290       std::memcpy(&buffers_.back(), response.data(), response.size());
291       responses_.emplace_back();
292       responses_.back() = {buffers_.back().data(), response.size()};
293       total_responses_ += 1;
294       break;
295     }
296     case internal::PacketType::SERVER_STREAM_END:
297       stream_ended_ = true;
298       break;
299     default:
300       PW_CRASH("Unhandled PacketType");
301   }
302   return OkStatus();
303 }
304 
305 }  // namespace internal::test::raw
306 
307 template <typename Service,
308           auto method,
309           uint32_t kMethodId,
310           size_t kMaxResponse,
311           size_t kOutputSizeBytes>
312 class RawTestMethodContext
313     : public internal::test::raw::
314           Context<Service, method, kMethodId, kMaxResponse, kOutputSizeBytes> {
315  public:
316   // Forwards constructor arguments to the service class.
317   template <typename... ServiceArgs>
RawTestMethodContext(ServiceArgs &&...service_args)318   RawTestMethodContext(ServiceArgs&&... service_args)
319       : internal::test::raw::
320             Context<Service, method, kMethodId, kMaxResponse, kOutputSizeBytes>(
321                 std::forward<ServiceArgs>(service_args)...) {}
322 };
323 
324 }  // namespace pw::rpc
325