1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/copresence/rpc/rpc_handler.h"
6
7 #include <map>
8
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14
15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17 // we fix this with an #undef.
18 #include "base/time/time.h"
19 #if defined(OS_WIN)
20 #undef DeviceCapabilities
21 #endif
22
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_delegate.h"
29 #include "net/http/http_status_code.h"
30
31 // TODO(ckehoe): Return error messages for bad requests.
32
33 namespace copresence {
34
35 using google::protobuf::MessageLite;
36 using google::protobuf::RepeatedPtrField;
37
// RPC name that SendReportRequest() passes to SendServerRequest().
const char RpcHandler::kReportRequestRpcName[] = "report";
39
40 namespace {
41
// Returns the URL-safe form of |token|: every '+' is replaced by '-' and
// every '/' is replaced by '_'.
// TODO(rkc): Move this to the wrapper.
std::string ToUrlSafe(std::string token) {
  // |token| is already a private copy, so it can be patched in place.
  for (std::string::iterator ch = token.begin(); ch != token.end(); ++ch) {
    if (*ch == '+')
      *ch = '-';
    else if (*ch == '/')
      *ch = '_';
  }
  return token;
}
50
// Expiry time and size limit handed to |invalid_audio_token_cache_| by the
// RpcHandler constructor.
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
const int kMaxInvalidTokens = 10000;
// RPC name used by RpcHandler::Initialize() for the device registration
// request.
const char kRegisterDeviceRpcName[] = "registerdevice";
// Server URL used by SendHttpPost() unless the switches::kCopresenceServer
// command-line switch is present.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
56
57 // Logging
58
59 // Checks for a copresence error. If there is one, logs it and returns true.
CopresenceErrorLogged(const Status & status)60 bool CopresenceErrorLogged(const Status& status) {
61 if (status.code() != OK) {
62 LOG(ERROR) << "Copresence error code " << status.code()
63 << (status.message().empty() ? std::string() :
64 ": " + status.message());
65 }
66 return status.code() != OK;
67 }
68
LogIfErrorStatus(const util::error::Code & code,const std::string & context)69 void LogIfErrorStatus(const util::error::Code& code,
70 const std::string& context) {
71 LOG_IF(ERROR, code != util::error::OK)
72 << context << " error " << code << ". See "
73 << "cs/google3/util/task/codes.proto for more info.";
74 }
75
76 // If any errors occurred, logs them and returns true.
ReportErrorLogged(const ReportResponse & response)77 bool ReportErrorLogged(const ReportResponse& response) {
78 bool result = CopresenceErrorLogged(response.header().status());
79
80 // The Report fails or succeeds as a unit. If any responses had errors,
81 // the header will too. Thus we don't need to propagate individual errors.
82 if (response.has_update_signals_response())
83 LogIfErrorStatus(response.update_signals_response().status(), "Update");
84 if (response.has_manage_messages_response())
85 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
86 if (response.has_manage_subscriptions_response()) {
87 LogIfErrorStatus(response.manage_subscriptions_response().status(),
88 "Subscribe");
89 }
90
91 return result;
92 }
93
94 // Request construction
95 // TODO(ckehoe): Move these into a separate file?
96
97 template <typename T>
GetBroadcastScanConfig(const T & msg)98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
99 if (msg.has_token_exchange_strategy() &&
100 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
101 return msg.token_exchange_strategy().broadcast_scan_configuration();
102 }
103 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
104 }
105
GetDeviceCapabilities(const ReportRequest & request)106 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
107 scoped_ptr<DeviceState> state(new DeviceState);
108
109 TokenTechnology* ultrasound =
110 state->mutable_capabilities()->add_token_technology();
111 ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
112 ultrasound->add_instruction_type(TRANSMIT);
113 ultrasound->add_instruction_type(RECEIVE);
114
115 TokenTechnology* audible =
116 state->mutable_capabilities()->add_token_technology();
117 audible->set_medium(AUDIO_AUDIBLE_DTMF);
118 audible->add_instruction_type(TRANSMIT);
119 audible->add_instruction_type(RECEIVE);
120
121 return state.Pass();
122 }
123
124 // TODO(ckehoe): We're keeping this code in a separate function for now
125 // because we get a version string from Chrome, but the proto expects
126 // an int64 version. We should probably change the version proto
127 // to handle a more detailed version.
CreateVersion(const std::string & client,const std::string & version_name)128 ClientVersion* CreateVersion(const std::string& client,
129 const std::string& version_name) {
130 ClientVersion* version = new ClientVersion;
131
132 version->set_client(client);
133 version->set_version_name(version_name);
134
135 return version;
136 }
137
AddTokenToRequest(ReportRequest * request,const AudioToken & token)138 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
139 TokenObservation* token_observation =
140 request->mutable_update_signals_request()->add_token_observation();
141 token_observation->set_token_id(ToUrlSafe(token.token));
142
143 TokenSignals* signals = token_observation->add_signals();
144 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
145 : AUDIO_ULTRASOUND_PASSBAND);
146 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
147 }
148
149 } // namespace
150
151 // Public methods
152
// Stores |delegate|, sets up the invalid-token cache with its expiry time and
// size limit, and points |server_post_callback_| at SendHttpPost().
RpcHandler::RpcHandler(CopresenceDelegate* delegate)
    : delegate_(delegate),
      invalid_audio_token_cache_(
          base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
          kMaxInvalidTokens),
      // Unretained: the callback is stored in a member of this object.
      server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
                                       base::Unretained(this))) {}
160
~RpcHandler()161 RpcHandler::~RpcHandler() {
162 for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
163 post != pending_posts_.end(); ++post) {
164 delete *post;
165 }
166
167 if (delegate_ && delegate_->GetWhispernetClient()) {
168 delegate_->GetWhispernetClient()->RegisterTokensCallback(
169 WhispernetClient::TokensCallback());
170 }
171 }
172
Initialize(const SuccessCallback & init_done_callback)173 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
174 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
175 DCHECK(device_id_.empty());
176
177 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
178 Identity* identity =
179 request->mutable_device_identifiers()->mutable_registrant();
180 identity->set_type(CHROME);
181 identity->set_chrome_id(base::GenerateGUID());
182 SendServerRequest(
183 kRegisterDeviceRpcName,
184 std::string(),
185 request.Pass(),
186 base::Bind(&RpcHandler::RegisterResponseHandler,
187 // On destruction, this request will be cancelled.
188 base::Unretained(this),
189 init_done_callback));
190 }
191
SendReportRequest(scoped_ptr<ReportRequest> request)192 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
193 SendReportRequest(request.Pass(), std::string(), StatusCallback());
194 }
195
SendReportRequest(scoped_ptr<ReportRequest> request,const std::string & app_id,const StatusCallback & status_callback)196 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
197 const std::string& app_id,
198 const StatusCallback& status_callback) {
199 DCHECK(request.get());
200 DCHECK(!device_id_.empty())
201 << "RpcHandler::Initialize() must complete successfully "
202 << "before other RpcHandler methods are called.";
203
204 DVLOG(3) << "Sending report request to server.";
205
206 // If we are unpublishing or unsubscribing, we need to stop those publish or
207 // subscribes right away, we don't need to wait for the server to tell us.
208 ProcessRemovedOperations(*request);
209
210 request->mutable_update_signals_request()->set_allocated_state(
211 GetDeviceCapabilities(*request).release());
212
213 AddPlayingTokens(request.get());
214
215 SendServerRequest(kReportRequestRpcName,
216 app_id,
217 request.Pass(),
218 // On destruction, this request will be cancelled.
219 base::Bind(&RpcHandler::ReportResponseHandler,
220 base::Unretained(this),
221 status_callback));
222 }
223
ReportTokens(const std::vector<AudioToken> & tokens)224 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
225 DCHECK(!tokens.empty());
226
227 scoped_ptr<ReportRequest> request(new ReportRequest);
228 for (size_t i = 0; i < tokens.size(); ++i) {
229 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
230 continue;
231 DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
232 AddTokenToRequest(request.get(), tokens[i]);
233 }
234 SendReportRequest(request.Pass());
235 }
236
ConnectToWhispernet()237 void RpcHandler::ConnectToWhispernet() {
238 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
239
240 // |directive_handler_| will be destructed with us, so unretained is safe.
241 directive_handler_.reset(new DirectiveHandler);
242 directive_handler_->Initialize(
243 base::Bind(&WhispernetClient::DecodeSamples,
244 base::Unretained(whispernet_client)),
245 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
246 base::Unretained(this)));
247
248 whispernet_client->RegisterTokensCallback(
249 base::Bind(&RpcHandler::ReportTokens,
250 // On destruction, this callback will be disconnected.
251 base::Unretained(this)));
252 }
253
254 // Private methods
255
RegisterResponseHandler(const SuccessCallback & init_done_callback,HttpPost * completed_post,int http_status_code,const std::string & response_data)256 void RpcHandler::RegisterResponseHandler(
257 const SuccessCallback& init_done_callback,
258 HttpPost* completed_post,
259 int http_status_code,
260 const std::string& response_data) {
261 if (completed_post) {
262 int elements_erased = pending_posts_.erase(completed_post);
263 DCHECK(elements_erased);
264 delete completed_post;
265 }
266
267 if (http_status_code != net::HTTP_OK) {
268 init_done_callback.Run(false);
269 return;
270 }
271
272 RegisterDeviceResponse response;
273 if (!response.ParseFromString(response_data)) {
274 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
275 init_done_callback.Run(false);
276 return;
277 }
278
279 if (CopresenceErrorLogged(response.header().status()))
280 return;
281 device_id_ = response.registered_device_id();
282 DCHECK(!device_id_.empty());
283 DVLOG(2) << "Device registration successful: id " << device_id_;
284 init_done_callback.Run(true);
285 }
286
ReportResponseHandler(const StatusCallback & status_callback,HttpPost * completed_post,int http_status_code,const std::string & response_data)287 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
288 HttpPost* completed_post,
289 int http_status_code,
290 const std::string& response_data) {
291 if (completed_post) {
292 int elements_erased = pending_posts_.erase(completed_post);
293 DCHECK(elements_erased);
294 delete completed_post;
295 }
296
297 if (http_status_code != net::HTTP_OK) {
298 if (!status_callback.is_null())
299 status_callback.Run(FAIL);
300 return;
301 }
302
303 DVLOG(3) << "Received ReportResponse.";
304 ReportResponse response;
305 if (!response.ParseFromString(response_data)) {
306 LOG(ERROR) << "Invalid ReportResponse";
307 if (!status_callback.is_null())
308 status_callback.Run(FAIL);
309 return;
310 }
311
312 if (ReportErrorLogged(response)) {
313 if (!status_callback.is_null())
314 status_callback.Run(FAIL);
315 return;
316 }
317
318 const RepeatedPtrField<MessageResult>& message_results =
319 response.manage_messages_response().published_message_result();
320 for (int i = 0; i < message_results.size(); ++i) {
321 DVLOG(2) << "Published message with id "
322 << message_results.Get(i).published_message_id();
323 }
324
325 const RepeatedPtrField<SubscriptionResult>& subscription_results =
326 response.manage_subscriptions_response().subscription_result();
327 for (int i = 0; i < subscription_results.size(); ++i) {
328 DVLOG(2) << "Created subscription with id "
329 << subscription_results.Get(i).subscription_id();
330 }
331
332 if (response.has_update_signals_response()) {
333 const UpdateSignalsResponse& update_response =
334 response.update_signals_response();
335 DispatchMessages(update_response.message());
336
337 if (directive_handler_.get()) {
338 for (int i = 0; i < update_response.directive_size(); ++i)
339 directive_handler_->AddDirective(update_response.directive(i));
340 } else {
341 DVLOG(1) << "No directive handler.";
342 }
343
344 const RepeatedPtrField<Token>& tokens = update_response.token();
345 for (int i = 0; i < tokens.size(); ++i) {
346 switch (tokens.Get(i).status()) {
347 case VALID:
348 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
349 // short TTL (like 10s) and send it up with every report request.
350 // Then we'll still get messages while we're waiting to hear it again.
351 VLOG(1) << "Got valid token " << tokens.Get(i).id();
352 break;
353 case INVALID:
354 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
355 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
356 break;
357 default:
358 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
359 << tokens.Get(i).status();
360 }
361 }
362 }
363
364 // TODO(ckehoe): Return a more detailed status response.
365 if (!status_callback.is_null())
366 status_callback.Run(SUCCESS);
367 }
368
ProcessRemovedOperations(const ReportRequest & request)369 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
370 // Remove unpublishes.
371 if (request.has_manage_messages_request()) {
372 const RepeatedPtrField<std::string>& unpublishes =
373 request.manage_messages_request().id_to_unpublish();
374 for (int i = 0; i < unpublishes.size(); ++i)
375 directive_handler_->RemoveDirectives(unpublishes.Get(i));
376 }
377
378 // Remove unsubscribes.
379 if (request.has_manage_subscriptions_request()) {
380 const RepeatedPtrField<std::string>& unsubscribes =
381 request.manage_subscriptions_request().id_to_unsubscribe();
382 for (int i = 0; i < unsubscribes.size(); ++i)
383 directive_handler_->RemoveDirectives(unsubscribes.Get(i));
384 }
385 }
386
AddPlayingTokens(ReportRequest * request)387 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
388 if (!directive_handler_)
389 return;
390
391 const std::string& audible_token = directive_handler_->CurrentAudibleToken();
392 const std::string& inaudible_token =
393 directive_handler_->CurrentInaudibleToken();
394
395 if (!audible_token.empty())
396 AddTokenToRequest(request, AudioToken(audible_token, true));
397 if (!inaudible_token.empty())
398 AddTokenToRequest(request, AudioToken(inaudible_token, false));
399 }
400
DispatchMessages(const RepeatedPtrField<SubscribedMessage> & messages)401 void RpcHandler::DispatchMessages(
402 const RepeatedPtrField<SubscribedMessage>& messages) {
403 if (messages.size() == 0)
404 return;
405
406 // Index the messages by subscription id.
407 std::map<std::string, std::vector<Message>> messages_by_subscription;
408 DVLOG(3) << "Dispatching " << messages.size() << " messages";
409 for (int m = 0; m < messages.size(); ++m) {
410 const RepeatedPtrField<std::string>& subscription_ids =
411 messages.Get(m).subscription_id();
412 for (int s = 0; s < subscription_ids.size(); ++s) {
413 messages_by_subscription[subscription_ids.Get(s)].push_back(
414 messages.Get(m).published_message());
415 }
416 }
417
418 // Send the messages for each subscription.
419 for (std::map<std::string, std::vector<Message>>::const_iterator
420 subscription = messages_by_subscription.begin();
421 subscription != messages_by_subscription.end();
422 ++subscription) {
423 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
424 // it in here and get rid of the app id registry from the main API class.
425 delegate_->HandleMessages("", subscription->first, subscription->second);
426 }
427 }
428
CreateRequestHeader(const std::string & client_name) const429 RequestHeader* RpcHandler::CreateRequestHeader(
430 const std::string& client_name) const {
431 RequestHeader* header = new RequestHeader;
432
433 header->set_allocated_framework_version(CreateVersion(
434 "Chrome", delegate_->GetPlatformVersionString()));
435 if (!client_name.empty()) {
436 header->set_allocated_client_version(
437 CreateVersion(client_name, std::string()));
438 }
439 header->set_current_time_millis(base::Time::Now().ToJsTime());
440 header->set_registered_device_id(device_id_);
441
442 DeviceFingerprint* fingerprint = new DeviceFingerprint;
443 fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
444 fingerprint->set_type(CHROME_PLATFORM_TYPE);
445 header->set_allocated_device_fingerprint(fingerprint);
446
447 return header;
448 }
449
450 template <class T>
SendServerRequest(const std::string & rpc_name,const std::string & app_id,scoped_ptr<T> request,const PostCleanupCallback & response_handler)451 void RpcHandler::SendServerRequest(
452 const std::string& rpc_name,
453 const std::string& app_id,
454 scoped_ptr<T> request,
455 const PostCleanupCallback& response_handler) {
456 request->set_allocated_header(CreateRequestHeader(app_id));
457 server_post_callback_.Run(delegate_->GetRequestContext(),
458 rpc_name,
459 make_scoped_ptr<MessageLite>(request.release()),
460 response_handler);
461 }
462
SendHttpPost(net::URLRequestContextGetter * url_context_getter,const std::string & rpc_name,scoped_ptr<MessageLite> request_proto,const PostCleanupCallback & callback)463 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
464 const std::string& rpc_name,
465 scoped_ptr<MessageLite> request_proto,
466 const PostCleanupCallback& callback) {
467 // Create the base URL to call.
468 CommandLine* command_line = CommandLine::ForCurrentProcess();
469 const std::string copresence_server_host =
470 command_line->HasSwitch(switches::kCopresenceServer) ?
471 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
472 kDefaultCopresenceServer;
473
474 // Create the request and keep a pointer until it completes.
475 HttpPost* http_post = new HttpPost(
476 url_context_getter,
477 copresence_server_host,
478 rpc_name,
479 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
480 delegate_->GetAPIKey(),
481 *request_proto);
482
483 http_post->Start(base::Bind(callback, http_post));
484 pending_posts_.insert(http_post);
485 }
486
AudioDirectiveListToWhispernetConnector(const std::string & token,bool audible,const WhispernetClient::SamplesCallback & samples_callback)487 void RpcHandler::AudioDirectiveListToWhispernetConnector(
488 const std::string& token,
489 bool audible,
490 const WhispernetClient::SamplesCallback& samples_callback) {
491 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
492 if (whispernet_client) {
493 whispernet_client->RegisterSamplesCallback(samples_callback);
494 whispernet_client->EncodeToken(token, audible);
495 }
496 }
497
498 } // namespace copresence
499