1 // Copyright 2020 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef CAST_STREAMING_SESSION_MESSENGER_H_ 6 #define CAST_STREAMING_SESSION_MESSENGER_H_ 7 8 #include <functional> 9 #include <string> 10 #include <utility> 11 #include <vector> 12 13 #include "absl/types/optional.h" 14 #include "absl/types/variant.h" 15 #include "cast/common/public/message_port.h" 16 #include "cast/streaming/answer_messages.h" 17 #include "cast/streaming/offer_messages.h" 18 #include "cast/streaming/receiver_message.h" 19 #include "cast/streaming/sender_message.h" 20 #include "json/value.h" 21 #include "platform/api/task_runner.h" 22 #include "util/flat_map.h" 23 #include "util/weak_ptr.h" 24 25 namespace openscreen { 26 namespace cast { 27 28 // A message port interface designed specifically for use by the Receiver 29 // and Sender session classes. 30 class SessionMessenger : public MessagePort::Client { 31 public: 32 using ErrorCallback = std::function<void(Error)>; 33 34 SessionMessenger(MessagePort* message_port, 35 std::string source_id, 36 ErrorCallback cb); 37 ~SessionMessenger() override; 38 39 protected: 40 // Barebones message sending method shared by both children. 41 [[nodiscard]] Error SendMessage(const std::string& destination_id, 42 const std::string& namespace_, 43 const Json::Value& message_root); 44 45 // Used to report errors in subclasses. 46 void ReportError(Error error); 47 48 private: 49 MessagePort* const message_port_; 50 ErrorCallback error_callback_; 51 }; 52 53 class SenderSessionMessenger final : public SessionMessenger { 54 public: 55 using ReplyCallback = std::function<void(ReceiverMessage)>; 56 57 SenderSessionMessenger(MessagePort* message_port, 58 std::string source_id, 59 std::string receiver_id, 60 ErrorCallback cb, 61 TaskRunner* task_runner); 62 63 // Set receiver message handler. Note that this should only be 64 // applied for messages that don't have sequence numbers, like RPC 65 // and status messages. 66 void SetHandler(ReceiverMessage::Type type, ReplyCallback cb); 67 68 // Send a request (with optional reply callback). 69 [[nodiscard]] Error SendOutboundMessage(SenderMessage message); 70 [[nodiscard]] Error SendRequest(SenderMessage message, 71 ReceiverMessage::Type reply_type, 72 ReplyCallback cb); 73 74 // MessagePort::Client overrides 75 void OnMessage(const std::string& source_id, 76 const std::string& message_namespace, 77 const std::string& message) override; 78 void OnError(Error error) override; 79 80 private: 81 TaskRunner* const task_runner_; 82 83 // This messenger should only be connected to one receiver, so |receiver_id_| 84 // should not change. 85 const std::string receiver_id_; 86 87 // We keep a list here of replies we are expecting--if the reply is 88 // received for this sequence number, we call its respective callback, 89 // otherwise it is called after an internally specified timeout. 90 FlatMap<int, ReplyCallback> awaiting_replies_; 91 92 // Currently we can only set a handler for RPC messages, so no need for 93 // a flatmap here. 94 ReplyCallback rpc_callback_; 95 96 WeakPtrFactory<SenderSessionMessenger> weak_factory_{this}; 97 }; 98 99 class ReceiverSessionMessenger final : public SessionMessenger { 100 public: 101 using RequestCallback = std::function<void(SenderMessage)>; 102 ReceiverSessionMessenger(MessagePort* message_port, 103 std::string source_id, 104 ErrorCallback cb); 105 106 // Set sender message handler. 107 void SetHandler(SenderMessage::Type type, RequestCallback cb); 108 109 // Send a JSON message. 110 [[nodiscard]] Error SendMessage(ReceiverMessage message); 111 112 // MessagePort::Client overrides 113 void OnMessage(const std::string& source_id, 114 const std::string& message_namespace, 115 const std::string& message) override; 116 void OnError(Error error) override; 117 118 private: 119 // The sender ID of the SenderSession we are connected to. Set on the 120 // first message we receive. 121 std::string sender_session_id_; 122 FlatMap<SenderMessage::Type, RequestCallback> callbacks_; 123 }; 124 125 } // namespace cast 126 } // namespace openscreen 127 128 #endif // CAST_STREAMING_SESSION_MESSENGER_H_ 129