• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/session_messager.h"
6 
7 #include "absl/strings/ascii.h"
8 #include "cast/common/public/message_port.h"
9 #include "cast/streaming/message_fields.h"
10 #include "util/json/json_helpers.h"
11 #include "util/json/json_serialization.h"
12 #include "util/osp_logging.h"
13 
14 namespace openscreen {
15 namespace cast {
16 
17 namespace {
18 
ReplyIfTimedOut(int sequence_number,ReceiverMessage::Type reply_type,std::vector<std::pair<int,SenderSessionMessager::ReplyCallback>> * replies)19 void ReplyIfTimedOut(
20     int sequence_number,
21     ReceiverMessage::Type reply_type,
22     std::vector<std::pair<int, SenderSessionMessager::ReplyCallback>>*
23         replies) {
24   auto it = replies->begin();
25   for (; it != replies->end(); ++it) {
26     if (it->first == sequence_number) {
27       OSP_DVLOG
28           << "Replying with empty message due to timeout for sequence number: "
29           << sequence_number;
30       it->second(ReceiverMessage{reply_type, sequence_number});
31       replies->erase(it);
32       break;
33     }
34   }
35 }
36 
37 }  // namespace
38 
SessionMessager(MessagePort * message_port,std::string source_id,ErrorCallback cb)39 SessionMessager::SessionMessager(MessagePort* message_port,
40                                  std::string source_id,
41                                  ErrorCallback cb)
42     : message_port_(message_port), error_callback_(std::move(cb)) {
43   OSP_DCHECK(message_port_);
44   OSP_DCHECK(!source_id.empty());
45   message_port_->SetClient(this, source_id);
46 }
47 
~SessionMessager()48 SessionMessager::~SessionMessager() {
49   message_port_->ResetClient();
50 }
51 
SendMessage(const std::string & destination_id,const std::string & namespace_,const Json::Value & message_root)52 Error SessionMessager::SendMessage(const std::string& destination_id,
53                                    const std::string& namespace_,
54                                    const Json::Value& message_root) {
55   OSP_DCHECK(namespace_ == kCastRemotingNamespace ||
56              namespace_ == kCastWebrtcNamespace);
57   auto body_or_error = json::Stringify(message_root);
58   if (body_or_error.is_error()) {
59     return std::move(body_or_error.error());
60   }
61   OSP_DVLOG << "Sending message: DESTINATION[" << destination_id
62             << "], NAMESPACE[" << namespace_ << "], BODY:\n"
63             << body_or_error.value();
64   message_port_->PostMessage(destination_id, namespace_, body_or_error.value());
65   return Error::None();
66 }
67 
ReportError(Error error)68 void SessionMessager::ReportError(Error error) {
69   error_callback_(std::move(error));
70 }
71 
SenderSessionMessager(MessagePort * message_port,std::string source_id,std::string receiver_id,ErrorCallback cb,TaskRunner * task_runner)72 SenderSessionMessager::SenderSessionMessager(MessagePort* message_port,
73                                              std::string source_id,
74                                              std::string receiver_id,
75                                              ErrorCallback cb,
76                                              TaskRunner* task_runner)
77     : SessionMessager(message_port, std::move(source_id), std::move(cb)),
78       task_runner_(task_runner),
79       receiver_id_(std::move(receiver_id)) {}
80 
SetHandler(ReceiverMessage::Type type,ReplyCallback cb)81 void SenderSessionMessager::SetHandler(ReceiverMessage::Type type,
82                                        ReplyCallback cb) {
83   // Currently the only handler allowed is for RPC messages.
84   OSP_DCHECK(type == ReceiverMessage::Type::kRpc);
85   rpc_callback_ = std::move(cb);
86 }
87 
SendOutboundMessage(SenderMessage message)88 Error SenderSessionMessager::SendOutboundMessage(SenderMessage message) {
89   const auto namespace_ = (message.type == SenderMessage::Type::kRpc)
90                               ? kCastRemotingNamespace
91                               : kCastWebrtcNamespace;
92 
93   ErrorOr<Json::Value> jsonified = message.ToJson();
94   OSP_CHECK(jsonified.is_value()) << "Tried to send an invalid message";
95   return SessionMessager::SendMessage(receiver_id_, namespace_,
96                                       jsonified.value());
97 }
98 
SendRequest(SenderMessage message,ReceiverMessage::Type reply_type,ReplyCallback cb)99 Error SenderSessionMessager::SendRequest(SenderMessage message,
100                                          ReceiverMessage::Type reply_type,
101                                          ReplyCallback cb) {
102   static constexpr std::chrono::milliseconds kReplyTimeout{4000};
103   // RPC messages are not meant to be request/reply.
104   OSP_DCHECK(reply_type != ReceiverMessage::Type::kRpc);
105 
106   const Error error = SendOutboundMessage(message);
107   if (!error.ok()) {
108     return error;
109   }
110 
111   awaiting_replies_.emplace_back(message.sequence_number, std::move(cb));
112   task_runner_->PostTaskWithDelay(
113       [self = weak_factory_.GetWeakPtr(), reply_type,
114        seq_num = message.sequence_number] {
115         if (self) {
116           ReplyIfTimedOut(seq_num, reply_type, &self->awaiting_replies_);
117         }
118       },
119       kReplyTimeout);
120 
121   return Error::None();
122 }
123 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)124 void SenderSessionMessager::OnMessage(const std::string& source_id,
125                                       const std::string& message_namespace,
126                                       const std::string& message) {
127   if (source_id != receiver_id_) {
128     OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
129                      "expected id \""
130                   << receiver_id_ << "\", got \"" << source_id << "\"";
131     return;
132   }
133 
134   if (message_namespace != kCastWebrtcNamespace &&
135       message_namespace != kCastRemotingNamespace) {
136     OSP_DLOG_WARN << "Received message from unknown namespace: "
137                   << message_namespace;
138     return;
139   }
140 
141   ErrorOr<Json::Value> message_body = json::Parse(message);
142   if (!message_body) {
143     ReportError(message_body.error());
144     OSP_DLOG_WARN << "Received an invalid message: " << message;
145     return;
146   }
147 
148   int sequence_number;
149   if (!json::ParseAndValidateInt(message_body.value()[kSequenceNumber],
150                                  &sequence_number)) {
151     OSP_DLOG_WARN << "Received a message without a sequence number";
152     return;
153   }
154 
155   // If the message is valid JSON and we don't understand it, there are two
156   // options: (1) it's an unknown type, or (2) the receiver filled out the
157   // message incorrectly. In the first case we can drop it, it's likely just
158   // unsupported. In the second case we might need it, so worth warning the
159   // client.
160   ErrorOr<ReceiverMessage> receiver_message =
161       ReceiverMessage::Parse(message_body.value());
162   if (receiver_message.is_error()) {
163     ReportError(receiver_message.error());
164     OSP_DLOG_WARN << "Received an invalid receiver message: "
165                   << receiver_message.error();
166   }
167 
168   if (receiver_message.value().type == ReceiverMessage::Type::kRpc) {
169     if (rpc_callback_) {
170       rpc_callback_(receiver_message.value({}));
171     } else {
172       OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
173     }
174   } else {
175     auto it = awaiting_replies_.find(sequence_number);
176     if (it == awaiting_replies_.end()) {
177       OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "
178                     << sequence_number;
179       return;
180     }
181 
182     it->second(receiver_message.value({}));
183     awaiting_replies_.erase(it);
184   }
185 }
186 
OnError(Error error)187 void SenderSessionMessager::OnError(Error error) {
188   OSP_DLOG_WARN << "Received an error in the session messager: " << error;
189 }
190 
ReceiverSessionMessager(MessagePort * message_port,std::string source_id,ErrorCallback cb)191 ReceiverSessionMessager::ReceiverSessionMessager(MessagePort* message_port,
192                                                  std::string source_id,
193                                                  ErrorCallback cb)
194     : SessionMessager(message_port, std::move(source_id), std::move(cb)) {}
195 
SetHandler(SenderMessage::Type type,RequestCallback cb)196 void ReceiverSessionMessager::SetHandler(SenderMessage::Type type,
197                                          RequestCallback cb) {
198   OSP_DCHECK(callbacks_.find(type) == callbacks_.end());
199   callbacks_.emplace_back(type, std::move(cb));
200 }
201 
SendMessage(ReceiverMessage message)202 Error ReceiverSessionMessager::SendMessage(ReceiverMessage message) {
203   if (sender_session_id_.empty()) {
204     return Error(Error::Code::kInitializationFailure,
205                  "Tried to send a message without receving one first");
206   }
207 
208   const auto namespace_ = (message.type == ReceiverMessage::Type::kRpc)
209                               ? kCastRemotingNamespace
210                               : kCastWebrtcNamespace;
211 
212   ErrorOr<Json::Value> message_json = message.ToJson();
213   OSP_CHECK(message_json.is_value()) << "Tried to send an invalid message";
214   return SessionMessager::SendMessage(sender_session_id_, namespace_,
215                                       message_json.value());
216 }
217 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)218 void ReceiverSessionMessager::OnMessage(const std::string& source_id,
219                                         const std::string& message_namespace,
220                                         const std::string& message) {
221   // We assume we are connected to the first sender_id we receive.
222   if (sender_session_id_.empty()) {
223     sender_session_id_ = source_id;
224   } else if (source_id != sender_session_id_) {
225     OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
226                      "id \""
227                   << sender_session_id_ << "\", got \"" << source_id << "\"";
228     return;
229   }
230 
231   if (message_namespace != kCastWebrtcNamespace &&
232       message_namespace != kCastRemotingNamespace) {
233     OSP_DLOG_WARN << "Received message from unknown namespace: "
234                   << message_namespace;
235     return;
236   }
237 
238   // If the message is bad JSON, the sender is in a funky state so we
239   // report an error.
240   ErrorOr<Json::Value> message_body = json::Parse(message);
241   if (message_body.is_error()) {
242     ReportError(message_body.error());
243     return;
244   }
245 
246   // If the message is valid JSON and we don't understand it, there are two
247   // options: (1) it's an unknown type, or (2) the sender filled out the message
248   // incorrectly. In the first case we can drop it, it's likely just
249   // unsupported. In the second case we might need it, so worth warning the
250   // client.
251   ErrorOr<SenderMessage> sender_message =
252       SenderMessage::Parse(message_body.value());
253   if (sender_message.is_error()) {
254     ReportError(sender_message.error());
255     OSP_DLOG_WARN << "Received an invalid sender message: "
256                   << sender_message.error();
257     return;
258   }
259 
260   auto it = callbacks_.find(sender_message.value().type);
261   if (it == callbacks_.end()) {
262     OSP_DLOG_INFO << "Received message without a callback, dropping";
263   } else {
264     it->second(sender_message.value());
265   }
266 }
267 
OnError(Error error)268 void ReceiverSessionMessager::OnError(Error error) {
269   OSP_DLOG_WARN << "Received an error in the session messager: " << error;
270 }
271 
272 }  // namespace cast
273 }  // namespace openscreen
274