1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cast/streaming/session_messenger.h"
6
7 #include "absl/strings/ascii.h"
8 #include "cast/common/public/message_port.h"
9 #include "cast/streaming/message_fields.h"
10 #include "util/json/json_helpers.h"
11 #include "util/json/json_serialization.h"
12 #include "util/osp_logging.h"
13
14 namespace openscreen {
15 namespace cast {
16
17 namespace {
18
ReplyIfTimedOut(int sequence_number,ReceiverMessage::Type reply_type,std::vector<std::pair<int,SenderSessionMessenger::ReplyCallback>> * replies)19 void ReplyIfTimedOut(
20 int sequence_number,
21 ReceiverMessage::Type reply_type,
22 std::vector<std::pair<int, SenderSessionMessenger::ReplyCallback>>*
23 replies) {
24 for (auto it = replies->begin(); it != replies->end(); ++it) {
25 if (it->first == sequence_number) {
26 OSP_VLOG
27 << "Replying with empty message due to timeout for sequence number: "
28 << sequence_number;
29 it->second(ReceiverMessage{reply_type, sequence_number});
30 replies->erase(it);
31 break;
32 }
33 }
34 }
35
36 } // namespace
37
SessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)38 SessionMessenger::SessionMessenger(MessagePort* message_port,
39 std::string source_id,
40 ErrorCallback cb)
41 : message_port_(message_port), error_callback_(std::move(cb)) {
42 OSP_DCHECK(message_port_);
43 OSP_DCHECK(!source_id.empty());
44 message_port_->SetClient(this, source_id);
45 }
46
~SessionMessenger()47 SessionMessenger::~SessionMessenger() {
48 message_port_->ResetClient();
49 }
50
SendMessage(const std::string & destination_id,const std::string & namespace_,const Json::Value & message_root)51 Error SessionMessenger::SendMessage(const std::string& destination_id,
52 const std::string& namespace_,
53 const Json::Value& message_root) {
54 OSP_DCHECK(namespace_ == kCastRemotingNamespace ||
55 namespace_ == kCastWebrtcNamespace);
56 auto body_or_error = json::Stringify(message_root);
57 if (body_or_error.is_error()) {
58 return std::move(body_or_error.error());
59 }
60 OSP_VLOG << "Sending message: DESTINATION[" << destination_id
61 << "], NAMESPACE[" << namespace_ << "], BODY:\n"
62 << body_or_error.value();
63 message_port_->PostMessage(destination_id, namespace_, body_or_error.value());
64 return Error::None();
65 }
66
ReportError(Error error)67 void SessionMessenger::ReportError(Error error) {
68 error_callback_(std::move(error));
69 }
70
SenderSessionMessenger(MessagePort * message_port,std::string source_id,std::string receiver_id,ErrorCallback cb,TaskRunner * task_runner)71 SenderSessionMessenger::SenderSessionMessenger(MessagePort* message_port,
72 std::string source_id,
73 std::string receiver_id,
74 ErrorCallback cb,
75 TaskRunner* task_runner)
76 : SessionMessenger(message_port, std::move(source_id), std::move(cb)),
77 task_runner_(task_runner),
78 receiver_id_(std::move(receiver_id)) {}
79
SetHandler(ReceiverMessage::Type type,ReplyCallback cb)80 void SenderSessionMessenger::SetHandler(ReceiverMessage::Type type,
81 ReplyCallback cb) {
82 // Currently the only handler allowed is for RPC messages.
83 OSP_DCHECK(type == ReceiverMessage::Type::kRpc);
84 rpc_callback_ = std::move(cb);
85 }
86
SendOutboundMessage(SenderMessage message)87 Error SenderSessionMessenger::SendOutboundMessage(SenderMessage message) {
88 const auto namespace_ = (message.type == SenderMessage::Type::kRpc)
89 ? kCastRemotingNamespace
90 : kCastWebrtcNamespace;
91
92 ErrorOr<Json::Value> jsonified = message.ToJson();
93 OSP_CHECK(jsonified.is_value()) << "Tried to send an invalid message";
94 return SessionMessenger::SendMessage(receiver_id_, namespace_,
95 jsonified.value());
96 }
97
SendRequest(SenderMessage message,ReceiverMessage::Type reply_type,ReplyCallback cb)98 Error SenderSessionMessenger::SendRequest(SenderMessage message,
99 ReceiverMessage::Type reply_type,
100 ReplyCallback cb) {
101 static constexpr std::chrono::milliseconds kReplyTimeout{4000};
102 // RPC messages are not meant to be request/reply.
103 OSP_DCHECK(reply_type != ReceiverMessage::Type::kRpc);
104
105 const Error error = SendOutboundMessage(message);
106 if (!error.ok()) {
107 return error;
108 }
109
110 OSP_DCHECK(awaiting_replies_.find(message.sequence_number) ==
111 awaiting_replies_.end());
112 awaiting_replies_.emplace_back(message.sequence_number, std::move(cb));
113 task_runner_->PostTaskWithDelay(
114 [self = weak_factory_.GetWeakPtr(), reply_type,
115 seq_num = message.sequence_number] {
116 if (self) {
117 ReplyIfTimedOut(seq_num, reply_type, &self->awaiting_replies_);
118 }
119 },
120 kReplyTimeout);
121
122 return Error::None();
123 }
124
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)125 void SenderSessionMessenger::OnMessage(const std::string& source_id,
126 const std::string& message_namespace,
127 const std::string& message) {
128 if (source_id != receiver_id_) {
129 OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
130 "expected id \""
131 << receiver_id_ << "\", got \"" << source_id << "\"";
132 return;
133 }
134
135 if (message_namespace != kCastWebrtcNamespace &&
136 message_namespace != kCastRemotingNamespace) {
137 OSP_DLOG_WARN << "Received message from unknown namespace: "
138 << message_namespace;
139 return;
140 }
141
142 ErrorOr<Json::Value> message_body = json::Parse(message);
143 if (!message_body) {
144 ReportError(message_body.error());
145 OSP_DLOG_WARN << "Received an invalid message: " << message;
146 return;
147 }
148
149 // If the message is valid JSON and we don't understand it, there are two
150 // options: (1) it's an unknown type, or (2) the receiver filled out the
151 // message incorrectly. In the first case we can drop it, it's likely just
152 // unsupported. In the second case we might need it, so worth warning the
153 // client.
154 ErrorOr<ReceiverMessage> receiver_message =
155 ReceiverMessage::Parse(message_body.value());
156 if (receiver_message.is_error()) {
157 ReportError(receiver_message.error());
158 OSP_DLOG_WARN << "Received an invalid receiver message: "
159 << receiver_message.error();
160 }
161
162 if (receiver_message.value().type == ReceiverMessage::Type::kRpc) {
163 if (rpc_callback_) {
164 rpc_callback_(receiver_message.value({}));
165 } else {
166 OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
167 }
168 } else {
169 int sequence_number;
170 if (!json::TryParseInt(message_body.value()[kSequenceNumber],
171 &sequence_number)) {
172 OSP_DLOG_WARN << "Received a message without a sequence number";
173 return;
174 }
175
176 auto it = awaiting_replies_.find(sequence_number);
177 if (it == awaiting_replies_.end()) {
178 OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "
179 << sequence_number;
180 return;
181 }
182
183 it->second(std::move(receiver_message.value({})));
184
185 // Calling the function callback may result in the checksum of the pointed
186 // to object to change, so calling erase() on the iterator after executing
187 // second() may result in a segfault.
188 awaiting_replies_.erase_key(sequence_number);
189 }
190 }
191
OnError(Error error)192 void SenderSessionMessenger::OnError(Error error) {
193 OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
194 }
195
ReceiverSessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)196 ReceiverSessionMessenger::ReceiverSessionMessenger(MessagePort* message_port,
197 std::string source_id,
198 ErrorCallback cb)
199 : SessionMessenger(message_port, std::move(source_id), std::move(cb)) {}
200
SetHandler(SenderMessage::Type type,RequestCallback cb)201 void ReceiverSessionMessenger::SetHandler(SenderMessage::Type type,
202 RequestCallback cb) {
203 OSP_DCHECK(callbacks_.find(type) == callbacks_.end());
204 callbacks_.emplace_back(type, std::move(cb));
205 }
206
SendMessage(ReceiverMessage message)207 Error ReceiverSessionMessenger::SendMessage(ReceiverMessage message) {
208 if (sender_session_id_.empty()) {
209 return Error(Error::Code::kInitializationFailure,
210 "Tried to send a message without receiving one first");
211 }
212
213 const auto namespace_ = (message.type == ReceiverMessage::Type::kRpc)
214 ? kCastRemotingNamespace
215 : kCastWebrtcNamespace;
216
217 ErrorOr<Json::Value> message_json = message.ToJson();
218 OSP_CHECK(message_json.is_value()) << "Tried to send an invalid message";
219 return SessionMessenger::SendMessage(sender_session_id_, namespace_,
220 message_json.value());
221 }
222
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)223 void ReceiverSessionMessenger::OnMessage(const std::string& source_id,
224 const std::string& message_namespace,
225 const std::string& message) {
226 // We assume we are connected to the first sender_id we receive.
227 if (sender_session_id_.empty()) {
228 sender_session_id_ = source_id;
229 } else if (source_id != sender_session_id_) {
230 OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
231 "id \""
232 << sender_session_id_ << "\", got \"" << source_id << "\"";
233 return;
234 }
235
236 if (message_namespace != kCastWebrtcNamespace &&
237 message_namespace != kCastRemotingNamespace) {
238 OSP_DLOG_WARN << "Received message from unknown namespace: "
239 << message_namespace;
240 return;
241 }
242
243 // If the message is bad JSON, the sender is in a funky state so we
244 // report an error.
245 ErrorOr<Json::Value> message_body = json::Parse(message);
246 if (message_body.is_error()) {
247 ReportError(message_body.error());
248 return;
249 }
250
251 // If the message is valid JSON and we don't understand it, there are two
252 // options: (1) it's an unknown type, or (2) the sender filled out the message
253 // incorrectly. In the first case we can drop it, it's likely just
254 // unsupported. In the second case we might need it, so worth warning the
255 // client.
256 ErrorOr<SenderMessage> sender_message =
257 SenderMessage::Parse(message_body.value());
258 if (sender_message.is_error()) {
259 ReportError(sender_message.error());
260 OSP_DLOG_WARN << "Received an invalid sender message: "
261 << sender_message.error();
262 return;
263 }
264
265 auto it = callbacks_.find(sender_message.value().type);
266 if (it == callbacks_.end()) {
267 OSP_DLOG_INFO << "Received message without a callback, dropping";
268 } else {
269 it->second(sender_message.value());
270 }
271 }
272
OnError(Error error)273 void ReceiverSessionMessenger::OnError(Error error) {
274 OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
275 }
276
277 } // namespace cast
278 } // namespace openscreen
279