1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/internal/call.h"
16
17 #include "pw_assert/check.h"
18 #include "pw_rpc/client.h"
19 #include "pw_rpc/internal/endpoint.h"
20 #include "pw_rpc/internal/method.h"
21 #include "pw_rpc/server.h"
22
23 namespace pw::rpc::internal {
24
25 // Creates an active client-side call, assigning it a new ID.
Call(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type)26 Call::Call(Endpoint& client,
27 uint32_t channel_id,
28 uint32_t service_id,
29 uint32_t method_id,
30 MethodType type)
31 : Call(client,
32 client.NewCallId(),
33 channel_id,
34 service_id,
35 method_id,
36 type,
37 kClientCall) {}
38
Call(Endpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,CallType call_type)39 Call::Call(Endpoint& endpoint_ref,
40 uint32_t call_id,
41 uint32_t channel_id,
42 uint32_t service_id,
43 uint32_t method_id,
44 MethodType type,
45 CallType call_type)
46 : endpoint_(&endpoint_ref),
47 channel_id_(channel_id),
48 id_(call_id),
49 service_id_(service_id),
50 method_id_(method_id),
51 rpc_state_(kActive),
52 type_(type),
53 call_type_(call_type),
54 client_stream_state_(HasClientStream(type) ? kClientStreamActive
55 : kClientStreamInactive) {
56 endpoint().RegisterCall(*this);
57 }
58
MoveFrom(Call & other)59 void Call::MoveFrom(Call& other) {
60 PW_DCHECK(!active_locked());
61
62 if (!other.active_locked()) {
63 return; // Nothing else to do; this call is already closed.
64 }
65
66 // Copy all members from the other call.
67 endpoint_ = other.endpoint_;
68 channel_id_ = other.channel_id_;
69 id_ = other.id_;
70 service_id_ = other.service_id_;
71 method_id_ = other.method_id_;
72
73 rpc_state_ = other.rpc_state_;
74 type_ = other.type_;
75 call_type_ = other.call_type_;
76 client_stream_state_ = other.client_stream_state_;
77
78 on_error_ = std::move(other.on_error_);
79 on_next_ = std::move(other.on_next_);
80
81 // Mark the other call inactive, unregister it, and register this one.
82 other.rpc_state_ = kInactive;
83 other.client_stream_state_ = kClientStreamInactive;
84
85 endpoint().UnregisterCall(other);
86 endpoint().RegisterUniqueCall(*this);
87 }
88
SendPacket(PacketType type,ConstByteSpan payload,Status status)89 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
90 if (!active_locked()) {
91 return Status::FailedPrecondition();
92 }
93
94 Channel* channel = endpoint_->GetInternalChannel(channel_id_);
95 if (channel == nullptr) {
96 return Status::Unavailable();
97 }
98 return channel->Send(MakePacket(type, payload, status));
99 }
100
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)101 Status Call::CloseAndSendFinalPacketLocked(PacketType type,
102 ConstByteSpan response,
103 Status status) {
104 const Status send_status = SendPacket(type, response, status);
105 UnregisterAndMarkClosed();
106 return send_status;
107 }
108
WriteLocked(ConstByteSpan payload)109 Status Call::WriteLocked(ConstByteSpan payload) {
110 return SendPacket(call_type_ == kServerCall ? PacketType::SERVER_STREAM
111 : PacketType::CLIENT_STREAM,
112 payload);
113 }
114
UnregisterAndMarkClosed()115 void Call::UnregisterAndMarkClosed() {
116 if (active_locked()) {
117 endpoint().UnregisterCall(*this);
118 MarkClosed();
119 }
120 }
121
122 } // namespace pw::rpc::internal
123