• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/session_messenger.h"
6 
7 #include "absl/strings/ascii.h"
8 #include "cast/common/public/message_port.h"
9 #include "cast/streaming/message_fields.h"
10 #include "util/json/json_helpers.h"
11 #include "util/json/json_serialization.h"
12 #include "util/osp_logging.h"
13 
14 namespace openscreen {
15 namespace cast {
16 
17 namespace {
18 
ReplyIfTimedOut(int sequence_number,ReceiverMessage::Type reply_type,std::vector<std::pair<int,SenderSessionMessenger::ReplyCallback>> * replies)19 void ReplyIfTimedOut(
20     int sequence_number,
21     ReceiverMessage::Type reply_type,
22     std::vector<std::pair<int, SenderSessionMessenger::ReplyCallback>>*
23         replies) {
24   for (auto it = replies->begin(); it != replies->end(); ++it) {
25     if (it->first == sequence_number) {
26       OSP_VLOG
27           << "Replying with empty message due to timeout for sequence number: "
28           << sequence_number;
29       it->second(ReceiverMessage{reply_type, sequence_number});
30       replies->erase(it);
31       break;
32     }
33   }
34 }
35 
36 }  // namespace
37 
SessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)38 SessionMessenger::SessionMessenger(MessagePort* message_port,
39                                    std::string source_id,
40                                    ErrorCallback cb)
41     : message_port_(message_port), error_callback_(std::move(cb)) {
42   OSP_DCHECK(message_port_);
43   OSP_DCHECK(!source_id.empty());
44   message_port_->SetClient(this, source_id);
45 }
46 
~SessionMessenger()47 SessionMessenger::~SessionMessenger() {
48   message_port_->ResetClient();
49 }
50 
SendMessage(const std::string & destination_id,const std::string & namespace_,const Json::Value & message_root)51 Error SessionMessenger::SendMessage(const std::string& destination_id,
52                                     const std::string& namespace_,
53                                     const Json::Value& message_root) {
54   OSP_DCHECK(namespace_ == kCastRemotingNamespace ||
55              namespace_ == kCastWebrtcNamespace);
56   auto body_or_error = json::Stringify(message_root);
57   if (body_or_error.is_error()) {
58     return std::move(body_or_error.error());
59   }
60   OSP_VLOG << "Sending message: DESTINATION[" << destination_id
61            << "], NAMESPACE[" << namespace_ << "], BODY:\n"
62            << body_or_error.value();
63   message_port_->PostMessage(destination_id, namespace_, body_or_error.value());
64   return Error::None();
65 }
66 
ReportError(Error error)67 void SessionMessenger::ReportError(Error error) {
68   error_callback_(std::move(error));
69 }
70 
SenderSessionMessenger(MessagePort * message_port,std::string source_id,std::string receiver_id,ErrorCallback cb,TaskRunner * task_runner)71 SenderSessionMessenger::SenderSessionMessenger(MessagePort* message_port,
72                                                std::string source_id,
73                                                std::string receiver_id,
74                                                ErrorCallback cb,
75                                                TaskRunner* task_runner)
76     : SessionMessenger(message_port, std::move(source_id), std::move(cb)),
77       task_runner_(task_runner),
78       receiver_id_(std::move(receiver_id)) {}
79 
SetHandler(ReceiverMessage::Type type,ReplyCallback cb)80 void SenderSessionMessenger::SetHandler(ReceiverMessage::Type type,
81                                         ReplyCallback cb) {
82   // Currently the only handler allowed is for RPC messages.
83   OSP_DCHECK(type == ReceiverMessage::Type::kRpc);
84   rpc_callback_ = std::move(cb);
85 }
86 
SendOutboundMessage(SenderMessage message)87 Error SenderSessionMessenger::SendOutboundMessage(SenderMessage message) {
88   const auto namespace_ = (message.type == SenderMessage::Type::kRpc)
89                               ? kCastRemotingNamespace
90                               : kCastWebrtcNamespace;
91 
92   ErrorOr<Json::Value> jsonified = message.ToJson();
93   OSP_CHECK(jsonified.is_value()) << "Tried to send an invalid message";
94   return SessionMessenger::SendMessage(receiver_id_, namespace_,
95                                        jsonified.value());
96 }
97 
SendRequest(SenderMessage message,ReceiverMessage::Type reply_type,ReplyCallback cb)98 Error SenderSessionMessenger::SendRequest(SenderMessage message,
99                                           ReceiverMessage::Type reply_type,
100                                           ReplyCallback cb) {
101   static constexpr std::chrono::milliseconds kReplyTimeout{4000};
102   // RPC messages are not meant to be request/reply.
103   OSP_DCHECK(reply_type != ReceiverMessage::Type::kRpc);
104 
105   const Error error = SendOutboundMessage(message);
106   if (!error.ok()) {
107     return error;
108   }
109 
110   OSP_DCHECK(awaiting_replies_.find(message.sequence_number) ==
111              awaiting_replies_.end());
112   awaiting_replies_.emplace_back(message.sequence_number, std::move(cb));
113   task_runner_->PostTaskWithDelay(
114       [self = weak_factory_.GetWeakPtr(), reply_type,
115        seq_num = message.sequence_number] {
116         if (self) {
117           ReplyIfTimedOut(seq_num, reply_type, &self->awaiting_replies_);
118         }
119       },
120       kReplyTimeout);
121 
122   return Error::None();
123 }
124 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)125 void SenderSessionMessenger::OnMessage(const std::string& source_id,
126                                        const std::string& message_namespace,
127                                        const std::string& message) {
128   if (source_id != receiver_id_) {
129     OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
130                      "expected id \""
131                   << receiver_id_ << "\", got \"" << source_id << "\"";
132     return;
133   }
134 
135   if (message_namespace != kCastWebrtcNamespace &&
136       message_namespace != kCastRemotingNamespace) {
137     OSP_DLOG_WARN << "Received message from unknown namespace: "
138                   << message_namespace;
139     return;
140   }
141 
142   ErrorOr<Json::Value> message_body = json::Parse(message);
143   if (!message_body) {
144     ReportError(message_body.error());
145     OSP_DLOG_WARN << "Received an invalid message: " << message;
146     return;
147   }
148 
149   // If the message is valid JSON and we don't understand it, there are two
150   // options: (1) it's an unknown type, or (2) the receiver filled out the
151   // message incorrectly. In the first case we can drop it, it's likely just
152   // unsupported. In the second case we might need it, so worth warning the
153   // client.
154   ErrorOr<ReceiverMessage> receiver_message =
155       ReceiverMessage::Parse(message_body.value());
156   if (receiver_message.is_error()) {
157     ReportError(receiver_message.error());
158     OSP_DLOG_WARN << "Received an invalid receiver message: "
159                   << receiver_message.error();
160   }
161 
162   if (receiver_message.value().type == ReceiverMessage::Type::kRpc) {
163     if (rpc_callback_) {
164       rpc_callback_(receiver_message.value({}));
165     } else {
166       OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
167     }
168   } else {
169     int sequence_number;
170     if (!json::TryParseInt(message_body.value()[kSequenceNumber],
171                            &sequence_number)) {
172       OSP_DLOG_WARN << "Received a message without a sequence number";
173       return;
174     }
175 
176     auto it = awaiting_replies_.find(sequence_number);
177     if (it == awaiting_replies_.end()) {
178       OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "
179                     << sequence_number;
180       return;
181     }
182 
183     it->second(std::move(receiver_message.value({})));
184 
185     // Calling the function callback may result in the checksum of the pointed
186     // to object to change, so calling erase() on the iterator after executing
187     // second() may result in a segfault.
188     awaiting_replies_.erase_key(sequence_number);
189   }
190 }
191 
OnError(Error error)192 void SenderSessionMessenger::OnError(Error error) {
193   OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
194 }
195 
ReceiverSessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)196 ReceiverSessionMessenger::ReceiverSessionMessenger(MessagePort* message_port,
197                                                    std::string source_id,
198                                                    ErrorCallback cb)
199     : SessionMessenger(message_port, std::move(source_id), std::move(cb)) {}
200 
SetHandler(SenderMessage::Type type,RequestCallback cb)201 void ReceiverSessionMessenger::SetHandler(SenderMessage::Type type,
202                                           RequestCallback cb) {
203   OSP_DCHECK(callbacks_.find(type) == callbacks_.end());
204   callbacks_.emplace_back(type, std::move(cb));
205 }
206 
SendMessage(ReceiverMessage message)207 Error ReceiverSessionMessenger::SendMessage(ReceiverMessage message) {
208   if (sender_session_id_.empty()) {
209     return Error(Error::Code::kInitializationFailure,
210                  "Tried to send a message without receiving one first");
211   }
212 
213   const auto namespace_ = (message.type == ReceiverMessage::Type::kRpc)
214                               ? kCastRemotingNamespace
215                               : kCastWebrtcNamespace;
216 
217   ErrorOr<Json::Value> message_json = message.ToJson();
218   OSP_CHECK(message_json.is_value()) << "Tried to send an invalid message";
219   return SessionMessenger::SendMessage(sender_session_id_, namespace_,
220                                        message_json.value());
221 }
222 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)223 void ReceiverSessionMessenger::OnMessage(const std::string& source_id,
224                                          const std::string& message_namespace,
225                                          const std::string& message) {
226   // We assume we are connected to the first sender_id we receive.
227   if (sender_session_id_.empty()) {
228     sender_session_id_ = source_id;
229   } else if (source_id != sender_session_id_) {
230     OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
231                      "id \""
232                   << sender_session_id_ << "\", got \"" << source_id << "\"";
233     return;
234   }
235 
236   if (message_namespace != kCastWebrtcNamespace &&
237       message_namespace != kCastRemotingNamespace) {
238     OSP_DLOG_WARN << "Received message from unknown namespace: "
239                   << message_namespace;
240     return;
241   }
242 
243   // If the message is bad JSON, the sender is in a funky state so we
244   // report an error.
245   ErrorOr<Json::Value> message_body = json::Parse(message);
246   if (message_body.is_error()) {
247     ReportError(message_body.error());
248     return;
249   }
250 
251   // If the message is valid JSON and we don't understand it, there are two
252   // options: (1) it's an unknown type, or (2) the sender filled out the message
253   // incorrectly. In the first case we can drop it, it's likely just
254   // unsupported. In the second case we might need it, so worth warning the
255   // client.
256   ErrorOr<SenderMessage> sender_message =
257       SenderMessage::Parse(message_body.value());
258   if (sender_message.is_error()) {
259     ReportError(sender_message.error());
260     OSP_DLOG_WARN << "Received an invalid sender message: "
261                   << sender_message.error();
262     return;
263   }
264 
265   auto it = callbacks_.find(sender_message.value().type);
266   if (it == callbacks_.end()) {
267     OSP_DLOG_INFO << "Received message without a callback, dropping";
268   } else {
269     it->second(sender_message.value());
270   }
271 }
272 
OnError(Error error)273 void ReceiverSessionMessenger::OnError(Error error) {
274   OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
275 }
276 
277 }  // namespace cast
278 }  // namespace openscreen
279