1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cast/streaming/session_messager.h"
6
7 #include "absl/strings/ascii.h"
8 #include "cast/common/public/message_port.h"
9 #include "cast/streaming/message_fields.h"
10 #include "util/json/json_helpers.h"
11 #include "util/json/json_serialization.h"
12 #include "util/osp_logging.h"
13
14 namespace openscreen {
15 namespace cast {
16
17 namespace {
18
ReplyIfTimedOut(int sequence_number,ReceiverMessage::Type reply_type,std::vector<std::pair<int,SenderSessionMessager::ReplyCallback>> * replies)19 void ReplyIfTimedOut(
20 int sequence_number,
21 ReceiverMessage::Type reply_type,
22 std::vector<std::pair<int, SenderSessionMessager::ReplyCallback>>*
23 replies) {
24 auto it = replies->begin();
25 for (; it != replies->end(); ++it) {
26 if (it->first == sequence_number) {
27 OSP_DVLOG
28 << "Replying with empty message due to timeout for sequence number: "
29 << sequence_number;
30 it->second(ReceiverMessage{reply_type, sequence_number});
31 replies->erase(it);
32 break;
33 }
34 }
35 }
36
37 } // namespace
38
SessionMessager(MessagePort * message_port,std::string source_id,ErrorCallback cb)39 SessionMessager::SessionMessager(MessagePort* message_port,
40 std::string source_id,
41 ErrorCallback cb)
42 : message_port_(message_port), error_callback_(std::move(cb)) {
43 OSP_DCHECK(message_port_);
44 OSP_DCHECK(!source_id.empty());
45 message_port_->SetClient(this, source_id);
46 }
47
~SessionMessager()48 SessionMessager::~SessionMessager() {
49 message_port_->ResetClient();
50 }
51
SendMessage(const std::string & destination_id,const std::string & namespace_,const Json::Value & message_root)52 Error SessionMessager::SendMessage(const std::string& destination_id,
53 const std::string& namespace_,
54 const Json::Value& message_root) {
55 OSP_DCHECK(namespace_ == kCastRemotingNamespace ||
56 namespace_ == kCastWebrtcNamespace);
57 auto body_or_error = json::Stringify(message_root);
58 if (body_or_error.is_error()) {
59 return std::move(body_or_error.error());
60 }
61 OSP_DVLOG << "Sending message: DESTINATION[" << destination_id
62 << "], NAMESPACE[" << namespace_ << "], BODY:\n"
63 << body_or_error.value();
64 message_port_->PostMessage(destination_id, namespace_, body_or_error.value());
65 return Error::None();
66 }
67
ReportError(Error error)68 void SessionMessager::ReportError(Error error) {
69 error_callback_(std::move(error));
70 }
71
SenderSessionMessager(MessagePort * message_port,std::string source_id,std::string receiver_id,ErrorCallback cb,TaskRunner * task_runner)72 SenderSessionMessager::SenderSessionMessager(MessagePort* message_port,
73 std::string source_id,
74 std::string receiver_id,
75 ErrorCallback cb,
76 TaskRunner* task_runner)
77 : SessionMessager(message_port, std::move(source_id), std::move(cb)),
78 task_runner_(task_runner),
79 receiver_id_(std::move(receiver_id)) {}
80
SetHandler(ReceiverMessage::Type type,ReplyCallback cb)81 void SenderSessionMessager::SetHandler(ReceiverMessage::Type type,
82 ReplyCallback cb) {
83 // Currently the only handler allowed is for RPC messages.
84 OSP_DCHECK(type == ReceiverMessage::Type::kRpc);
85 rpc_callback_ = std::move(cb);
86 }
87
SendOutboundMessage(SenderMessage message)88 Error SenderSessionMessager::SendOutboundMessage(SenderMessage message) {
89 const auto namespace_ = (message.type == SenderMessage::Type::kRpc)
90 ? kCastRemotingNamespace
91 : kCastWebrtcNamespace;
92
93 ErrorOr<Json::Value> jsonified = message.ToJson();
94 OSP_CHECK(jsonified.is_value()) << "Tried to send an invalid message";
95 return SessionMessager::SendMessage(receiver_id_, namespace_,
96 jsonified.value());
97 }
98
SendRequest(SenderMessage message,ReceiverMessage::Type reply_type,ReplyCallback cb)99 Error SenderSessionMessager::SendRequest(SenderMessage message,
100 ReceiverMessage::Type reply_type,
101 ReplyCallback cb) {
102 static constexpr std::chrono::milliseconds kReplyTimeout{4000};
103 // RPC messages are not meant to be request/reply.
104 OSP_DCHECK(reply_type != ReceiverMessage::Type::kRpc);
105
106 const Error error = SendOutboundMessage(message);
107 if (!error.ok()) {
108 return error;
109 }
110
111 awaiting_replies_.emplace_back(message.sequence_number, std::move(cb));
112 task_runner_->PostTaskWithDelay(
113 [self = weak_factory_.GetWeakPtr(), reply_type,
114 seq_num = message.sequence_number] {
115 if (self) {
116 ReplyIfTimedOut(seq_num, reply_type, &self->awaiting_replies_);
117 }
118 },
119 kReplyTimeout);
120
121 return Error::None();
122 }
123
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)124 void SenderSessionMessager::OnMessage(const std::string& source_id,
125 const std::string& message_namespace,
126 const std::string& message) {
127 if (source_id != receiver_id_) {
128 OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
129 "expected id \""
130 << receiver_id_ << "\", got \"" << source_id << "\"";
131 return;
132 }
133
134 if (message_namespace != kCastWebrtcNamespace &&
135 message_namespace != kCastRemotingNamespace) {
136 OSP_DLOG_WARN << "Received message from unknown namespace: "
137 << message_namespace;
138 return;
139 }
140
141 ErrorOr<Json::Value> message_body = json::Parse(message);
142 if (!message_body) {
143 ReportError(message_body.error());
144 OSP_DLOG_WARN << "Received an invalid message: " << message;
145 return;
146 }
147
148 int sequence_number;
149 if (!json::ParseAndValidateInt(message_body.value()[kSequenceNumber],
150 &sequence_number)) {
151 OSP_DLOG_WARN << "Received a message without a sequence number";
152 return;
153 }
154
155 // If the message is valid JSON and we don't understand it, there are two
156 // options: (1) it's an unknown type, or (2) the receiver filled out the
157 // message incorrectly. In the first case we can drop it, it's likely just
158 // unsupported. In the second case we might need it, so worth warning the
159 // client.
160 ErrorOr<ReceiverMessage> receiver_message =
161 ReceiverMessage::Parse(message_body.value());
162 if (receiver_message.is_error()) {
163 ReportError(receiver_message.error());
164 OSP_DLOG_WARN << "Received an invalid receiver message: "
165 << receiver_message.error();
166 }
167
168 if (receiver_message.value().type == ReceiverMessage::Type::kRpc) {
169 if (rpc_callback_) {
170 rpc_callback_(receiver_message.value({}));
171 } else {
172 OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
173 }
174 } else {
175 auto it = awaiting_replies_.find(sequence_number);
176 if (it == awaiting_replies_.end()) {
177 OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "
178 << sequence_number;
179 return;
180 }
181
182 it->second(receiver_message.value({}));
183 awaiting_replies_.erase(it);
184 }
185 }
186
OnError(Error error)187 void SenderSessionMessager::OnError(Error error) {
188 OSP_DLOG_WARN << "Received an error in the session messager: " << error;
189 }
190
ReceiverSessionMessager(MessagePort * message_port,std::string source_id,ErrorCallback cb)191 ReceiverSessionMessager::ReceiverSessionMessager(MessagePort* message_port,
192 std::string source_id,
193 ErrorCallback cb)
194 : SessionMessager(message_port, std::move(source_id), std::move(cb)) {}
195
SetHandler(SenderMessage::Type type,RequestCallback cb)196 void ReceiverSessionMessager::SetHandler(SenderMessage::Type type,
197 RequestCallback cb) {
198 OSP_DCHECK(callbacks_.find(type) == callbacks_.end());
199 callbacks_.emplace_back(type, std::move(cb));
200 }
201
SendMessage(ReceiverMessage message)202 Error ReceiverSessionMessager::SendMessage(ReceiverMessage message) {
203 if (sender_session_id_.empty()) {
204 return Error(Error::Code::kInitializationFailure,
205 "Tried to send a message without receving one first");
206 }
207
208 const auto namespace_ = (message.type == ReceiverMessage::Type::kRpc)
209 ? kCastRemotingNamespace
210 : kCastWebrtcNamespace;
211
212 ErrorOr<Json::Value> message_json = message.ToJson();
213 OSP_CHECK(message_json.is_value()) << "Tried to send an invalid message";
214 return SessionMessager::SendMessage(sender_session_id_, namespace_,
215 message_json.value());
216 }
217
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)218 void ReceiverSessionMessager::OnMessage(const std::string& source_id,
219 const std::string& message_namespace,
220 const std::string& message) {
221 // We assume we are connected to the first sender_id we receive.
222 if (sender_session_id_.empty()) {
223 sender_session_id_ = source_id;
224 } else if (source_id != sender_session_id_) {
225 OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
226 "id \""
227 << sender_session_id_ << "\", got \"" << source_id << "\"";
228 return;
229 }
230
231 if (message_namespace != kCastWebrtcNamespace &&
232 message_namespace != kCastRemotingNamespace) {
233 OSP_DLOG_WARN << "Received message from unknown namespace: "
234 << message_namespace;
235 return;
236 }
237
238 // If the message is bad JSON, the sender is in a funky state so we
239 // report an error.
240 ErrorOr<Json::Value> message_body = json::Parse(message);
241 if (message_body.is_error()) {
242 ReportError(message_body.error());
243 return;
244 }
245
246 // If the message is valid JSON and we don't understand it, there are two
247 // options: (1) it's an unknown type, or (2) the sender filled out the message
248 // incorrectly. In the first case we can drop it, it's likely just
249 // unsupported. In the second case we might need it, so worth warning the
250 // client.
251 ErrorOr<SenderMessage> sender_message =
252 SenderMessage::Parse(message_body.value());
253 if (sender_message.is_error()) {
254 ReportError(sender_message.error());
255 OSP_DLOG_WARN << "Received an invalid sender message: "
256 << sender_message.error();
257 return;
258 }
259
260 auto it = callbacks_.find(sender_message.value().type);
261 if (it == callbacks_.end()) {
262 OSP_DLOG_INFO << "Received message without a callback, dropping";
263 } else {
264 it->second(sender_message.value());
265 }
266 }
267
OnError(Error error)268 void ReceiverSessionMessager::OnError(Error error) {
269 OSP_DLOG_WARN << "Received an error in the session messager: " << error;
270 }
271
272 } // namespace cast
273 } // namespace openscreen
274