• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "components/copresence/rpc/rpc_handler.h"
6 
7 #include <map>
8 
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 
15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17 // we fix this with an #undef.
18 #include "base/time/time.h"
19 #if defined(OS_WIN)
20 #undef DeviceCapabilities
21 #endif
22 
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_delegate.h"
29 #include "net/http/http_status_code.h"
30 
31 // TODO(ckehoe): Return error messages for bad requests.
32 
33 namespace copresence {
34 
35 using google::protobuf::MessageLite;
36 using google::protobuf::RepeatedPtrField;
37 
// RPC name used for report requests; SendReportRequest() passes it to
// SendServerRequest() as the name of the RPC to call.
38 const char RpcHandler::kReportRequestRpcName[] = "report";
39 
40 namespace {
41 
42 // UrlSafe is defined as:
43 // '/' represented by a '_' and '+' represented by a '-'
44 // TODO(rkc): Move this to the wrapper.
ToUrlSafe(std::string token)45 std::string ToUrlSafe(std::string token) {
46   base::ReplaceChars(token, "+", "-", &token);
47   base::ReplaceChars(token, "/", "_", &token);
48   return token;
49 }
50 
// RPC name used for the device registration request.
const char kRegisterDeviceRpcName[] = "registerdevice";

// Server URL used when the switches::kCopresenceServer command-line switch
// is not present.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";

// Arguments passed to the constructor of the invalid audio token cache:
// an expiry time of 10 minutes (in milliseconds) and a size bound.
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;
const int kMaxInvalidTokens = 10000;
56 
57 // Logging
58 
59 // Checks for a copresence error. If there is one, logs it and returns true.
CopresenceErrorLogged(const Status & status)60 bool CopresenceErrorLogged(const Status& status) {
61   if (status.code() != OK) {
62     LOG(ERROR) << "Copresence error code " << status.code()
63                << (status.message().empty() ? std::string() :
64                   ": " + status.message());
65   }
66   return status.code() != OK;
67 }
68 
LogIfErrorStatus(const util::error::Code & code,const std::string & context)69 void LogIfErrorStatus(const util::error::Code& code,
70                       const std::string& context) {
71   LOG_IF(ERROR, code != util::error::OK)
72       << context << " error " << code << ". See "
73       << "cs/google3/util/task/codes.proto for more info.";
74 }
75 
76 // If any errors occurred, logs them and returns true.
ReportErrorLogged(const ReportResponse & response)77 bool ReportErrorLogged(const ReportResponse& response) {
78   bool result = CopresenceErrorLogged(response.header().status());
79 
80   // The Report fails or succeeds as a unit. If any responses had errors,
81   // the header will too. Thus we don't need to propagate individual errors.
82   if (response.has_update_signals_response())
83     LogIfErrorStatus(response.update_signals_response().status(), "Update");
84   if (response.has_manage_messages_response())
85     LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
86   if (response.has_manage_subscriptions_response()) {
87     LogIfErrorStatus(response.manage_subscriptions_response().status(),
88                      "Subscribe");
89   }
90 
91   return result;
92 }
93 
94 // Request construction
95 // TODO(ckehoe): Move these into a separate file?
96 
97 template <typename T>
GetBroadcastScanConfig(const T & msg)98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
99   if (msg.has_token_exchange_strategy() &&
100       msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
101     return msg.token_exchange_strategy().broadcast_scan_configuration();
102   }
103   return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
104 }
105 
GetDeviceCapabilities(const ReportRequest & request)106 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
107   scoped_ptr<DeviceState> state(new DeviceState);
108 
109   TokenTechnology* ultrasound =
110       state->mutable_capabilities()->add_token_technology();
111   ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
112   ultrasound->add_instruction_type(TRANSMIT);
113   ultrasound->add_instruction_type(RECEIVE);
114 
115   TokenTechnology* audible =
116       state->mutable_capabilities()->add_token_technology();
117   audible->set_medium(AUDIO_AUDIBLE_DTMF);
118   audible->add_instruction_type(TRANSMIT);
119   audible->add_instruction_type(RECEIVE);
120 
121   return state.Pass();
122 }
123 
124 // TODO(ckehoe): We're keeping this code in a separate function for now
125 // because we get a version string from Chrome, but the proto expects
126 // an int64 version. We should probably change the version proto
127 // to handle a more detailed version.
CreateVersion(const std::string & client,const std::string & version_name)128 ClientVersion* CreateVersion(const std::string& client,
129                              const std::string& version_name) {
130   ClientVersion* version = new ClientVersion;
131 
132   version->set_client(client);
133   version->set_version_name(version_name);
134 
135   return version;
136 }
137 
AddTokenToRequest(ReportRequest * request,const AudioToken & token)138 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
139   TokenObservation* token_observation =
140       request->mutable_update_signals_request()->add_token_observation();
141   token_observation->set_token_id(ToUrlSafe(token.token));
142 
143   TokenSignals* signals = token_observation->add_signals();
144   signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
145                                     : AUDIO_ULTRASOUND_PASSBAND);
146   signals->set_observed_time_millis(base::Time::Now().ToJsTime());
147 }
148 
149 }  // namespace
150 
151 // Public methods
152 
RpcHandler(CopresenceDelegate * delegate)153 RpcHandler::RpcHandler(CopresenceDelegate* delegate)
154     : delegate_(delegate),
155       invalid_audio_token_cache_(
156           base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
157           kMaxInvalidTokens),
158       server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
159                                        base::Unretained(this))) {}
160 
~RpcHandler()161 RpcHandler::~RpcHandler() {
162   for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
163        post != pending_posts_.end(); ++post) {
164     delete *post;
165   }
166 
167   if (delegate_ && delegate_->GetWhispernetClient()) {
168     delegate_->GetWhispernetClient()->RegisterTokensCallback(
169         WhispernetClient::TokensCallback());
170   }
171 }
172 
Initialize(const SuccessCallback & init_done_callback)173 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
174   scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
175   DCHECK(device_id_.empty());
176 
177   request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
178   Identity* identity =
179       request->mutable_device_identifiers()->mutable_registrant();
180   identity->set_type(CHROME);
181   identity->set_chrome_id(base::GenerateGUID());
182   SendServerRequest(
183       kRegisterDeviceRpcName,
184       std::string(),
185       request.Pass(),
186       base::Bind(&RpcHandler::RegisterResponseHandler,
187                  // On destruction, this request will be cancelled.
188                  base::Unretained(this),
189                  init_done_callback));
190 }
191 
SendReportRequest(scoped_ptr<ReportRequest> request)192 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
193   SendReportRequest(request.Pass(), std::string(), StatusCallback());
194 }
195 
SendReportRequest(scoped_ptr<ReportRequest> request,const std::string & app_id,const StatusCallback & status_callback)196 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
197                                    const std::string& app_id,
198                                    const StatusCallback& status_callback) {
199   DCHECK(request.get());
200   DCHECK(!device_id_.empty())
201       << "RpcHandler::Initialize() must complete successfully "
202       << "before other RpcHandler methods are called.";
203 
204   DVLOG(3) << "Sending report request to server.";
205 
206   // If we are unpublishing or unsubscribing, we need to stop those publish or
207   // subscribes right away, we don't need to wait for the server to tell us.
208   ProcessRemovedOperations(*request);
209 
210   request->mutable_update_signals_request()->set_allocated_state(
211       GetDeviceCapabilities(*request).release());
212 
213   AddPlayingTokens(request.get());
214 
215   SendServerRequest(kReportRequestRpcName,
216                     app_id,
217                     request.Pass(),
218                     // On destruction, this request will be cancelled.
219                     base::Bind(&RpcHandler::ReportResponseHandler,
220                                base::Unretained(this),
221                                status_callback));
222 }
223 
ReportTokens(const std::vector<AudioToken> & tokens)224 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
225   DCHECK(!tokens.empty());
226 
227   scoped_ptr<ReportRequest> request(new ReportRequest);
228   for (size_t i = 0; i < tokens.size(); ++i) {
229     if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
230       continue;
231     DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
232     AddTokenToRequest(request.get(), tokens[i]);
233   }
234   SendReportRequest(request.Pass());
235 }
236 
ConnectToWhispernet()237 void RpcHandler::ConnectToWhispernet() {
238   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
239 
240   // |directive_handler_| will be destructed with us, so unretained is safe.
241   directive_handler_.reset(new DirectiveHandler);
242   directive_handler_->Initialize(
243       base::Bind(&WhispernetClient::DecodeSamples,
244                  base::Unretained(whispernet_client)),
245       base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
246                  base::Unretained(this)));
247 
248   whispernet_client->RegisterTokensCallback(
249       base::Bind(&RpcHandler::ReportTokens,
250                  // On destruction, this callback will be disconnected.
251                  base::Unretained(this)));
252 }
253 
254 // Private methods
255 
RegisterResponseHandler(const SuccessCallback & init_done_callback,HttpPost * completed_post,int http_status_code,const std::string & response_data)256 void RpcHandler::RegisterResponseHandler(
257     const SuccessCallback& init_done_callback,
258     HttpPost* completed_post,
259     int http_status_code,
260     const std::string& response_data) {
261   if (completed_post) {
262     int elements_erased = pending_posts_.erase(completed_post);
263     DCHECK(elements_erased);
264     delete completed_post;
265   }
266 
267   if (http_status_code != net::HTTP_OK) {
268     init_done_callback.Run(false);
269     return;
270   }
271 
272   RegisterDeviceResponse response;
273   if (!response.ParseFromString(response_data)) {
274     LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
275     init_done_callback.Run(false);
276     return;
277   }
278 
279   if (CopresenceErrorLogged(response.header().status()))
280     return;
281   device_id_ = response.registered_device_id();
282   DCHECK(!device_id_.empty());
283   DVLOG(2) << "Device registration successful: id " << device_id_;
284   init_done_callback.Run(true);
285 }
286 
ReportResponseHandler(const StatusCallback & status_callback,HttpPost * completed_post,int http_status_code,const std::string & response_data)287 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
288                                        HttpPost* completed_post,
289                                        int http_status_code,
290                                        const std::string& response_data) {
291   if (completed_post) {
292     int elements_erased = pending_posts_.erase(completed_post);
293     DCHECK(elements_erased);
294     delete completed_post;
295   }
296 
297   if (http_status_code != net::HTTP_OK) {
298     if (!status_callback.is_null())
299       status_callback.Run(FAIL);
300     return;
301   }
302 
303   DVLOG(3) << "Received ReportResponse.";
304   ReportResponse response;
305   if (!response.ParseFromString(response_data)) {
306     LOG(ERROR) << "Invalid ReportResponse";
307     if (!status_callback.is_null())
308       status_callback.Run(FAIL);
309     return;
310   }
311 
312   if (ReportErrorLogged(response)) {
313     if (!status_callback.is_null())
314       status_callback.Run(FAIL);
315     return;
316   }
317 
318   const RepeatedPtrField<MessageResult>& message_results =
319       response.manage_messages_response().published_message_result();
320   for (int i = 0; i < message_results.size(); ++i) {
321     DVLOG(2) << "Published message with id "
322              << message_results.Get(i).published_message_id();
323   }
324 
325   const RepeatedPtrField<SubscriptionResult>& subscription_results =
326       response.manage_subscriptions_response().subscription_result();
327   for (int i = 0; i < subscription_results.size(); ++i) {
328     DVLOG(2) << "Created subscription with id "
329              << subscription_results.Get(i).subscription_id();
330   }
331 
332   if (response.has_update_signals_response()) {
333     const UpdateSignalsResponse& update_response =
334         response.update_signals_response();
335     DispatchMessages(update_response.message());
336 
337     if (directive_handler_.get()) {
338       for (int i = 0; i < update_response.directive_size(); ++i)
339         directive_handler_->AddDirective(update_response.directive(i));
340     } else {
341       DVLOG(1) << "No directive handler.";
342     }
343 
344     const RepeatedPtrField<Token>& tokens = update_response.token();
345     for (int i = 0; i < tokens.size(); ++i) {
346       switch (tokens.Get(i).status()) {
347         case VALID:
348           // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
349           // short TTL (like 10s) and send it up with every report request.
350           // Then we'll still get messages while we're waiting to hear it again.
351           VLOG(1) << "Got valid token " << tokens.Get(i).id();
352           break;
353         case INVALID:
354           DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
355           invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
356           break;
357         default:
358           DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
359                    << tokens.Get(i).status();
360       }
361     }
362   }
363 
364   // TODO(ckehoe): Return a more detailed status response.
365   if (!status_callback.is_null())
366     status_callback.Run(SUCCESS);
367 }
368 
ProcessRemovedOperations(const ReportRequest & request)369 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
370   // Remove unpublishes.
371   if (request.has_manage_messages_request()) {
372     const RepeatedPtrField<std::string>& unpublishes =
373         request.manage_messages_request().id_to_unpublish();
374     for (int i = 0; i < unpublishes.size(); ++i)
375       directive_handler_->RemoveDirectives(unpublishes.Get(i));
376   }
377 
378   // Remove unsubscribes.
379   if (request.has_manage_subscriptions_request()) {
380     const RepeatedPtrField<std::string>& unsubscribes =
381         request.manage_subscriptions_request().id_to_unsubscribe();
382     for (int i = 0; i < unsubscribes.size(); ++i)
383       directive_handler_->RemoveDirectives(unsubscribes.Get(i));
384   }
385 }
386 
AddPlayingTokens(ReportRequest * request)387 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
388   if (!directive_handler_)
389     return;
390 
391   const std::string& audible_token = directive_handler_->CurrentAudibleToken();
392   const std::string& inaudible_token =
393       directive_handler_->CurrentInaudibleToken();
394 
395   if (!audible_token.empty())
396     AddTokenToRequest(request, AudioToken(audible_token, true));
397   if (!inaudible_token.empty())
398     AddTokenToRequest(request, AudioToken(inaudible_token, false));
399 }
400 
DispatchMessages(const RepeatedPtrField<SubscribedMessage> & messages)401 void RpcHandler::DispatchMessages(
402     const RepeatedPtrField<SubscribedMessage>& messages) {
403   if (messages.size() == 0)
404     return;
405 
406   // Index the messages by subscription id.
407   std::map<std::string, std::vector<Message>> messages_by_subscription;
408   DVLOG(3) << "Dispatching " << messages.size() << " messages";
409   for (int m = 0; m < messages.size(); ++m) {
410     const RepeatedPtrField<std::string>& subscription_ids =
411         messages.Get(m).subscription_id();
412     for (int s = 0; s < subscription_ids.size(); ++s) {
413       messages_by_subscription[subscription_ids.Get(s)].push_back(
414           messages.Get(m).published_message());
415     }
416   }
417 
418   // Send the messages for each subscription.
419   for (std::map<std::string, std::vector<Message>>::const_iterator
420            subscription = messages_by_subscription.begin();
421        subscription != messages_by_subscription.end();
422        ++subscription) {
423     // TODO(ckehoe): Once we have the app ID from the server, we need to pass
424     // it in here and get rid of the app id registry from the main API class.
425     delegate_->HandleMessages("", subscription->first, subscription->second);
426   }
427 }
428 
CreateRequestHeader(const std::string & client_name) const429 RequestHeader* RpcHandler::CreateRequestHeader(
430     const std::string& client_name) const {
431   RequestHeader* header = new RequestHeader;
432 
433   header->set_allocated_framework_version(CreateVersion(
434       "Chrome", delegate_->GetPlatformVersionString()));
435   if (!client_name.empty()) {
436     header->set_allocated_client_version(
437         CreateVersion(client_name, std::string()));
438   }
439   header->set_current_time_millis(base::Time::Now().ToJsTime());
440   header->set_registered_device_id(device_id_);
441 
442   DeviceFingerprint* fingerprint = new DeviceFingerprint;
443   fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
444   fingerprint->set_type(CHROME_PLATFORM_TYPE);
445   header->set_allocated_device_fingerprint(fingerprint);
446 
447   return header;
448 }
449 
450 template <class T>
SendServerRequest(const std::string & rpc_name,const std::string & app_id,scoped_ptr<T> request,const PostCleanupCallback & response_handler)451 void RpcHandler::SendServerRequest(
452     const std::string& rpc_name,
453     const std::string& app_id,
454     scoped_ptr<T> request,
455     const PostCleanupCallback& response_handler) {
456   request->set_allocated_header(CreateRequestHeader(app_id));
457   server_post_callback_.Run(delegate_->GetRequestContext(),
458                             rpc_name,
459                             make_scoped_ptr<MessageLite>(request.release()),
460                             response_handler);
461 }
462 
SendHttpPost(net::URLRequestContextGetter * url_context_getter,const std::string & rpc_name,scoped_ptr<MessageLite> request_proto,const PostCleanupCallback & callback)463 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
464                               const std::string& rpc_name,
465                               scoped_ptr<MessageLite> request_proto,
466                               const PostCleanupCallback& callback) {
467   // Create the base URL to call.
468   CommandLine* command_line = CommandLine::ForCurrentProcess();
469   const std::string copresence_server_host =
470       command_line->HasSwitch(switches::kCopresenceServer) ?
471       command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
472       kDefaultCopresenceServer;
473 
474   // Create the request and keep a pointer until it completes.
475   HttpPost* http_post = new HttpPost(
476       url_context_getter,
477       copresence_server_host,
478       rpc_name,
479       command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
480       delegate_->GetAPIKey(),
481       *request_proto);
482 
483   http_post->Start(base::Bind(callback, http_post));
484   pending_posts_.insert(http_post);
485 }
486 
AudioDirectiveListToWhispernetConnector(const std::string & token,bool audible,const WhispernetClient::SamplesCallback & samples_callback)487 void RpcHandler::AudioDirectiveListToWhispernetConnector(
488     const std::string& token,
489     bool audible,
490     const WhispernetClient::SamplesCallback& samples_callback) {
491   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
492   if (whispernet_client) {
493     whispernet_client->RegisterSamplesCallback(samples_callback);
494     whispernet_client->EncodeToken(token, audible);
495   }
496 }
497 
498 }  // namespace copresence
499