• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 
15 // clang-format off
16 #include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
17 
18 #include "pw_rpc/server.h"
19 // clang-format on
20 
21 #include <algorithm>
22 
23 #include "pw_log/log.h"
24 #include "pw_rpc/internal/endpoint.h"
25 #include "pw_rpc/internal/packet.h"
26 
27 namespace pw::rpc {
28 namespace {
29 
30 using internal::Packet;
31 using internal::PacketType;
32 
33 }  // namespace
34 
ProcessPacket(ConstByteSpan packet_data,ChannelOutput * interface)35 Status Server::ProcessPacket(ConstByteSpan packet_data,
36                              ChannelOutput* interface) {
37   PW_TRY_ASSIGN(Packet packet,
38                 Endpoint::ProcessPacket(packet_data, Packet::kServer));
39 
40   internal::rpc_lock().lock();
41   internal::ServerCall* const call =
42       static_cast<internal::ServerCall*>(FindCall(packet));
43 
44   // Verbose log for debugging.
45   // PW_LOG_DEBUG("RPC server received packet type %u for %u:%08x/%08x",
46   //              static_cast<unsigned>(packet.type()),
47   //              static_cast<unsigned>(packet.channel_id()),
48   //              static_cast<unsigned>(packet.service_id()),
49   //              static_cast<unsigned>(packet.method_id()));
50 
51   internal::Channel* channel = GetInternalChannel(packet.channel_id());
52   if (channel == nullptr) {
53     // If an interface was provided, respond with a SERVER_ERROR to indicate
54     // that the channel is not available on this server. Don't send responses to
55     // error messages, though, to avoid potential infinite cycles.
56     if (interface != nullptr && packet.type() != PacketType::CLIENT_ERROR) {
57       internal::Channel(packet.channel_id(), interface)
58           .Send(Packet::ServerError(packet, Status::Unavailable()))
59           .IgnoreError();
60     }
61 
62     internal::rpc_lock().unlock();
63     PW_LOG_WARN("RPC server received packet for unknown channel %u",
64                 static_cast<unsigned>(packet.channel_id()));
65     return Status::Unavailable();
66   }
67 
68   const auto [service, method] = FindMethod(packet);
69 
70   if (method == nullptr) {
71     // Don't send responses to errors to avoid infinite error cycles.
72     if (packet.type() != PacketType::CLIENT_ERROR) {
73       channel->Send(Packet::ServerError(packet, Status::NotFound()))
74           .IgnoreError();
75     }
76     internal::rpc_lock().unlock();
77     return OkStatus();  // OK since the packet was handled.
78   }
79 
80   switch (packet.type()) {
81     case PacketType::REQUEST: {
82       // If the REQUEST is for an ongoing RPC, the existing call will be
83       // cancelled when the new call object is created.
84       const internal::CallContext context(
85           *this, channel->id(), *service, *method, packet.call_id());
86       method->Invoke(context, packet);
87       break;
88     }
89     case PacketType::CLIENT_STREAM:
90       HandleClientStreamPacket(packet, *channel, call);
91       break;
92     case PacketType::CLIENT_ERROR:
93     case PacketType::DEPRECATED_CANCEL:
94       if (call != nullptr && call->id() == packet.call_id()) {
95         call->HandleError(packet.status());
96       } else {
97         internal::rpc_lock().unlock();
98       }
99       break;
100     case PacketType::CLIENT_STREAM_END:
101       HandleClientStreamPacket(packet, *channel, call);
102       break;
103     default:
104       internal::rpc_lock().unlock();
105       PW_LOG_WARN("pw_rpc server unable to handle packet of type %u",
106                   unsigned(packet.type()));
107   }
108 
109   return OkStatus();  // OK since the packet was handled
110 }
111 
FindMethod(const internal::Packet & packet)112 std::tuple<Service*, const internal::Method*> Server::FindMethod(
113     const internal::Packet& packet) {
114   // Packets always include service and method IDs.
115   auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
116     return s.id() == packet.service_id();
117   });
118 
119   if (service == services_.end()) {
120     return {};
121   }
122 
123   return {&(*service), service->FindMethod(packet.method_id())};
124 }
125 
HandleClientStreamPacket(const internal::Packet & packet,internal::Channel & channel,internal::ServerCall * call) const126 void Server::HandleClientStreamPacket(const internal::Packet& packet,
127                                       internal::Channel& channel,
128                                       internal::ServerCall* call) const {
129   if (call == nullptr || call->id() != packet.call_id()) {
130     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
131         .IgnoreError();  // Errors are logged in Channel::Send.
132     internal::rpc_lock().unlock();
133     PW_LOG_DEBUG(
134         "Received client stream packet for %u:%08x/%08x, which is not pending",
135         static_cast<unsigned>(packet.channel_id()),
136         static_cast<unsigned>(packet.service_id()),
137         static_cast<unsigned>(packet.method_id()));
138     return;
139   }
140 
141   if (!call->has_client_stream()) {
142     channel.Send(Packet::ServerError(packet, Status::InvalidArgument()))
143         .IgnoreError();  // Errors are logged in Channel::Send.
144     internal::rpc_lock().unlock();
145     return;
146   }
147 
148   if (!call->client_stream_open()) {
149     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
150         .IgnoreError();  // Errors are logged in Channel::Send.
151     internal::rpc_lock().unlock();
152     return;
153   }
154 
155   if (packet.type() == PacketType::CLIENT_STREAM) {
156     call->HandlePayload(packet.payload());
157   } else {  // Handle PacketType::CLIENT_STREAM_END.
158     call->HandleClientStreamEnd();
159   }
160 }
161 
162 }  // namespace pw::rpc
163