• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/receiver_session.h"
6 
7 #include <algorithm>
8 #include <chrono>
9 #include <string>
10 #include <utility>
11 
12 #include "absl/strings/match.h"
13 #include "absl/strings/numbers.h"
14 #include "cast/common/channel/message_util.h"
15 #include "cast/common/public/message_port.h"
16 #include "cast/streaming/environment.h"
17 #include "cast/streaming/message_fields.h"
18 #include "cast/streaming/offer_messages.h"
19 #include "cast/streaming/receiver.h"
20 #include "cast/streaming/sender_message.h"
21 #include "util/json/json_helpers.h"
22 #include "util/osp_logging.h"
23 
24 namespace openscreen {
25 namespace cast {
26 
27 // Using statements for constructor readability.
28 using Preferences = ReceiverSession::Preferences;
29 using ConfiguredReceivers = ReceiverSession::ConfiguredReceivers;
30 
31 namespace {
32 
33 template <typename Stream, typename Codec>
SelectStream(const std::vector<Codec> & preferred_codecs,const std::vector<Stream> & offered_streams)34 std::unique_ptr<Stream> SelectStream(
35     const std::vector<Codec>& preferred_codecs,
36     const std::vector<Stream>& offered_streams) {
37   for (auto codec : preferred_codecs) {
38     for (const Stream& offered_stream : offered_streams) {
39       if (offered_stream.codec == codec) {
40         OSP_DVLOG << "Selected " << CodecToString(codec)
41                   << " as codec for streaming";
42         return std::make_unique<Stream>(offered_stream);
43       }
44     }
45   }
46   return nullptr;
47 }
48 
ToDisplayResolution(const Resolution & resolution)49 DisplayResolution ToDisplayResolution(const Resolution& resolution) {
50   return DisplayResolution{resolution.width, resolution.height};
51 }
52 
53 }  // namespace
54 
55 ReceiverSession::Client::~Client() = default;
56 
57 Preferences::Preferences() = default;
Preferences(std::vector<VideoCodec> video_codecs,std::vector<AudioCodec> audio_codecs)58 Preferences::Preferences(std::vector<VideoCodec> video_codecs,
59                          std::vector<AudioCodec> audio_codecs)
60     : Preferences(video_codecs, audio_codecs, nullptr, nullptr) {}
61 
Preferences(std::vector<VideoCodec> video_codecs,std::vector<AudioCodec> audio_codecs,std::unique_ptr<Constraints> constraints,std::unique_ptr<DisplayDescription> description)62 Preferences::Preferences(std::vector<VideoCodec> video_codecs,
63                          std::vector<AudioCodec> audio_codecs,
64                          std::unique_ptr<Constraints> constraints,
65                          std::unique_ptr<DisplayDescription> description)
66     : video_codecs(std::move(video_codecs)),
67       audio_codecs(std::move(audio_codecs)),
68       constraints(std::move(constraints)),
69       display_description(std::move(description)) {}
70 
71 Preferences::Preferences(Preferences&&) noexcept = default;
72 Preferences& Preferences::operator=(Preferences&&) noexcept = default;
73 
ReceiverSession(Client * const client,Environment * environment,MessagePort * message_port,Preferences preferences)74 ReceiverSession::ReceiverSession(Client* const client,
75                                  Environment* environment,
76                                  MessagePort* message_port,
77                                  Preferences preferences)
78     : client_(client),
79       environment_(environment),
80       preferences_(std::move(preferences)),
81       session_id_(MakeUniqueSessionId("streaming_receiver")),
82       messager_(message_port,
83                 session_id_,
84                 [this](Error error) {
85                   OSP_DLOG_WARN << "Got a session messager error: " << error;
86                   client_->OnError(this, error);
87                 }),
88       packet_router_(environment_) {
89   OSP_DCHECK(client_);
90   OSP_DCHECK(environment_);
91 
92   messager_.SetHandler(
93       SenderMessage::Type::kOffer,
__anona59fcdb80302(SenderMessage message) 94       [this](SenderMessage message) { OnOffer(std::move(message)); });
95   environment_->SetSocketSubscriber(this);
96 }
97 
~ReceiverSession()98 ReceiverSession::~ReceiverSession() {
99   ResetReceivers(Client::kEndOfSession);
100 }
101 
OnSocketReady()102 void ReceiverSession::OnSocketReady() {
103   if (pending_session_) {
104     InitializeSession(*pending_session_);
105     pending_session_.reset();
106   }
107 }
108 
OnSocketInvalid(Error error)109 void ReceiverSession::OnSocketInvalid(Error error) {
110   if (pending_session_) {
111     SendErrorAnswerReply(pending_session_->sequence_number,
112                          "Failed to bind UDP socket");
113     pending_session_.reset();
114   }
115 
116   client_->OnError(this,
117                    Error(Error::Code::kSocketFailure,
118                          "The environment is invalid and should be replaced."));
119 }
120 
IsValid() const121 bool ReceiverSession::SessionProperties::IsValid() const {
122   return (selected_audio || selected_video) && sequence_number >= 0;
123 }
124 
OnOffer(SenderMessage message)125 void ReceiverSession::OnOffer(SenderMessage message) {
126   // We just drop offers we can't respond to. Note that libcast senders will
127   // always send a strictly positive sequence numbers, but zero is permitted
128   // by the spec.
129   if (message.sequence_number < 0) {
130     OSP_DLOG_WARN
131         << "Dropping offer with missing sequence number, can't respond";
132     return;
133   }
134 
135   if (!message.valid) {
136     SendErrorAnswerReply(message.sequence_number,
137                          "Failed to parse malformed OFFER");
138     client_->OnError(this, Error(Error::Code::kParameterInvalid,
139                                  "Received invalid OFFER message"));
140     return;
141   }
142 
143   auto properties = std::make_unique<SessionProperties>();
144   properties->sequence_number = message.sequence_number;
145 
146   const Offer& offer = absl::get<Offer>(message.body);
147   if (!offer.audio_streams.empty() && !preferences_.audio_codecs.empty()) {
148     properties->selected_audio =
149         SelectStream(preferences_.audio_codecs, offer.audio_streams);
150   }
151 
152   if (!offer.video_streams.empty() && !preferences_.video_codecs.empty()) {
153     properties->selected_video =
154         SelectStream(preferences_.video_codecs, offer.video_streams);
155   }
156 
157   if (!properties->IsValid()) {
158     SendErrorAnswerReply(message.sequence_number,
159                          "Failed to select any streams from OFFER");
160     return;
161   }
162 
163   switch (environment_->socket_state()) {
164     // If the environment is ready or in a bad state, we can respond
165     // immediately.
166     case Environment::SocketState::kInvalid:
167       SendErrorAnswerReply(message.sequence_number,
168                            "UDP socket is closed, likely due to a bind error.");
169       break;
170 
171     case Environment::SocketState::kReady:
172       InitializeSession(*properties);
173       break;
174 
175     // Else we need to store the properties we just created until we get a
176     // ready or error event.
177     case Environment::SocketState::kStarting:
178       pending_session_ = std::move(properties);
179       break;
180   }
181 }
182 
InitializeSession(const SessionProperties & properties)183 void ReceiverSession::InitializeSession(const SessionProperties& properties) {
184   Answer answer = ConstructAnswer(properties);
185   if (!answer.IsValid()) {
186     // If the answer message is invalid, there is no point in setting up a
187     // negotiation because the sender won't be able to connect to it.
188     SendErrorAnswerReply(properties.sequence_number,
189                          "Failed to construct an ANSWER message");
190     return;
191   }
192 
193   // Only spawn receivers if we know we have a valid answer message.
194   ConfiguredReceivers receivers = SpawnReceivers(properties);
195   client_->OnMirroringNegotiated(this, std::move(receivers));
196   const Error result = messager_.SendMessage(ReceiverMessage{
197       ReceiverMessage::Type::kAnswer, properties.sequence_number,
198       true /* valid */, std::move(answer)});
199   if (!result.ok()) {
200     client_->OnError(this, std::move(result));
201   }
202 }
203 
ConstructReceiver(const Stream & stream)204 std::unique_ptr<Receiver> ReceiverSession::ConstructReceiver(
205     const Stream& stream) {
206   // Session config is currently only for mirroring.
207   SessionConfig config = {stream.ssrc,         stream.ssrc + 1,
208                           stream.rtp_timebase, stream.channels,
209                           stream.target_delay, stream.aes_key,
210                           stream.aes_iv_mask,  /* is_pli_enabled */ true};
211   return std::make_unique<Receiver>(environment_, &packet_router_,
212                                     std::move(config));
213 }
214 
SpawnReceivers(const SessionProperties & properties)215 ConfiguredReceivers ReceiverSession::SpawnReceivers(
216     const SessionProperties& properties) {
217   OSP_DCHECK(properties.IsValid());
218   ResetReceivers(Client::kRenegotiated);
219 
220   AudioCaptureConfig audio_config;
221   if (properties.selected_audio) {
222     current_audio_receiver_ =
223         ConstructReceiver(properties.selected_audio->stream);
224     audio_config =
225         AudioCaptureConfig{properties.selected_audio->codec,
226                            properties.selected_audio->stream.channels,
227                            properties.selected_audio->bit_rate,
228                            properties.selected_audio->stream.rtp_timebase,
229                            properties.selected_audio->stream.target_delay};
230   }
231 
232   VideoCaptureConfig video_config;
233   if (properties.selected_video) {
234     current_video_receiver_ =
235         ConstructReceiver(properties.selected_video->stream);
236     std::vector<DisplayResolution> display_resolutions;
237     std::transform(properties.selected_video->resolutions.begin(),
238                    properties.selected_video->resolutions.end(),
239                    std::back_inserter(display_resolutions),
240                    ToDisplayResolution);
241     video_config = VideoCaptureConfig{
242         properties.selected_video->codec,
243         FrameRate{properties.selected_video->max_frame_rate.numerator,
244                   properties.selected_video->max_frame_rate.denominator},
245         properties.selected_video->max_bit_rate, std::move(display_resolutions),
246         properties.selected_video->stream.target_delay};
247   }
248 
249   return ConfiguredReceivers{
250       current_audio_receiver_.get(), std::move(audio_config),
251       current_video_receiver_.get(), std::move(video_config)};
252 }
253 
ResetReceivers(Client::ReceiversDestroyingReason reason)254 void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
255   if (current_video_receiver_ || current_audio_receiver_) {
256     client_->OnReceiversDestroying(this, reason);
257     current_audio_receiver_.reset();
258     current_video_receiver_.reset();
259   }
260 }
261 
ConstructAnswer(const SessionProperties & properties)262 Answer ReceiverSession::ConstructAnswer(const SessionProperties& properties) {
263   OSP_DCHECK(properties.IsValid());
264 
265   std::vector<int> stream_indexes;
266   std::vector<Ssrc> stream_ssrcs;
267   if (properties.selected_audio) {
268     stream_indexes.push_back(properties.selected_audio->stream.index);
269     stream_ssrcs.push_back(properties.selected_audio->stream.ssrc + 1);
270   }
271 
272   if (properties.selected_video) {
273     stream_indexes.push_back(properties.selected_video->stream.index);
274     stream_ssrcs.push_back(properties.selected_video->stream.ssrc + 1);
275   }
276 
277   absl::optional<Constraints> constraints;
278   if (preferences_.constraints) {
279     constraints = absl::optional<Constraints>(*preferences_.constraints);
280   }
281 
282   absl::optional<DisplayDescription> display;
283   if (preferences_.display_description) {
284     display =
285         absl::optional<DisplayDescription>(*preferences_.display_description);
286   }
287 
288   return Answer{environment_->GetBoundLocalEndpoint().port,
289                 std::move(stream_indexes),
290                 std::move(stream_ssrcs),
291                 std::move(constraints),
292                 std::move(display),
293                 std::vector<int>{},  // receiver_rtcp_event_log
294                 std::vector<int>{},  // receiver_rtcp_dscp
295                 supports_wifi_status_reporting_};
296 }
297 
SendErrorAnswerReply(int sequence_number,const char * message)298 void ReceiverSession::SendErrorAnswerReply(int sequence_number,
299                                            const char* message) {
300   const Error error(Error::Code::kParseError, message);
301   OSP_DLOG_WARN << message;
302   const Error result = messager_.SendMessage(ReceiverMessage{
303       ReceiverMessage::Type::kAnswer, sequence_number, false /* valid */,
304       ReceiverError{static_cast<int>(Error::Code::kParseError), message}});
305   if (!result.ok()) {
306     client_->OnError(this, std::move(result));
307   }
308 }
309 
310 }  // namespace cast
311 }  // namespace openscreen
312