1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cast/streaming/rpc_broker.h"
6
7 #include <utility>
8
9 #include "util/osp_logging.h"
10
11 namespace openscreen {
12 namespace cast {
13
14 namespace {
15
operator <<(std::ostream & out,const RpcMessage & message)16 std::ostream& operator<<(std::ostream& out, const RpcMessage& message) {
17 out << "handle=" << message.handle() << ", proc=" << message.proc();
18 switch (message.rpc_oneof_case()) {
19 case RpcMessage::kIntegerValue:
20 return out << ", integer_value=" << message.integer_value();
21 case RpcMessage::kInteger64Value:
22 return out << ", integer64_value=" << message.integer64_value();
23 case RpcMessage::kDoubleValue:
24 return out << ", double_value=" << message.double_value();
25 case RpcMessage::kBooleanValue:
26 return out << ", boolean_value=" << message.boolean_value();
27 case RpcMessage::kStringValue:
28 return out << ", string_value=" << message.string_value();
29 default:
30 return out << ", rpc_oneof=" << message.rpc_oneof_case();
31 }
32
33 OSP_NOTREACHED();
34 }
35
36 } // namespace
37
RpcBroker(SendMessageCallback send_message_cb)38 RpcBroker::RpcBroker(SendMessageCallback send_message_cb)
39 : next_handle_(kFirstHandle),
40 send_message_cb_(std::move(send_message_cb)) {}
41
~RpcBroker()42 RpcBroker::~RpcBroker() {
43 receive_callbacks_.clear();
44 }
45
GetUniqueHandle()46 RpcBroker::Handle RpcBroker::GetUniqueHandle() {
47 return next_handle_++;
48 }
49
RegisterMessageReceiverCallback(RpcBroker::Handle handle,ReceiveMessageCallback callback)50 void RpcBroker::RegisterMessageReceiverCallback(
51 RpcBroker::Handle handle,
52 ReceiveMessageCallback callback) {
53 OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end())
54 << "must deregister before re-registering";
55 OSP_DVLOG << "registering handle: " << handle;
56 receive_callbacks_.emplace_back(handle, std::move(callback));
57 }
58
UnregisterMessageReceiverCallback(RpcBroker::Handle handle)59 void RpcBroker::UnregisterMessageReceiverCallback(RpcBroker::Handle handle) {
60 OSP_DVLOG << "unregistering handle: " << handle;
61 receive_callbacks_.erase_key(handle);
62 }
63
ProcessMessageFromRemote(const RpcMessage & message)64 void RpcBroker::ProcessMessageFromRemote(const RpcMessage& message) {
65 OSP_DVLOG << "received message: " << message;
66 const auto entry = receive_callbacks_.find(message.handle());
67 if (entry == receive_callbacks_.end()) {
68 OSP_DVLOG << "unregistered handle: " << message.handle();
69 return;
70 }
71 entry->second(message);
72 }
73
SendMessageToRemote(const RpcMessage & message)74 void RpcBroker::SendMessageToRemote(const RpcMessage& message) {
75 OSP_DVLOG << "sending message message: " << message;
76 std::vector<uint8_t> serialized_message(message.ByteSizeLong());
77 OSP_CHECK(message.SerializeToArray(serialized_message.data(),
78 serialized_message.size()));
79 send_message_cb_(std::move(serialized_message));
80 }
81
IsRegisteredForTesting(RpcBroker::Handle handle)82 bool RpcBroker::IsRegisteredForTesting(RpcBroker::Handle handle) {
83 return receive_callbacks_.find(handle) != receive_callbacks_.end();
84 }
85
86 } // namespace cast
87 } // namespace openscreen
88