• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/internal_api/public/attachments/attachment_uploader_impl.h"
6 
7 #include "base/bind.h"
8 #include "base/memory/weak_ptr.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/threading/non_thread_safe.h"
11 #include "google_apis/gaia/gaia_constants.h"
12 #include "net/base/load_flags.h"
13 #include "net/http/http_status_code.h"
14 #include "net/url_request/url_fetcher.h"
15 #include "net/url_request/url_fetcher_delegate.h"
16 #include "sync/api/attachments/attachment.h"
17 #include "sync/protocol/sync.pb.h"
18 
namespace {

// Path component appended to the sync service URL's path, in front of the
// attachment's unique id, when building an upload URL.
const char kAttachments[] = "attachments/";

// Content type passed to URLFetcher::SetUploadData for the attachment body.
const char kContentType[] = "application/octet-stream";

}  // namespace
25 
26 namespace syncer {
27 
28 // Encapsulates all the state associated with a single upload.
29 class AttachmentUploaderImpl::UploadState : public net::URLFetcherDelegate,
30                                             public OAuth2TokenService::Consumer,
31                                             public base::NonThreadSafe {
32  public:
33   // Construct an UploadState.
34   //
35   // UploadState encapsulates the state associated with a single upload.  When
36   // the upload completes, the UploadState object becomes "stopped".
37   //
38   // |owner| is a pointer to the object that owns this UploadState.  Upon
39   // completion this object will PostTask to owner's OnUploadStateStopped
40   // method.
41   UploadState(
42       const GURL& upload_url,
43       const scoped_refptr<net::URLRequestContextGetter>&
44           url_request_context_getter,
45       const Attachment& attachment,
46       const UploadCallback& user_callback,
47       const std::string& account_id,
48       const OAuth2TokenService::ScopeSet& scopes,
49       OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
50       const base::WeakPtr<AttachmentUploaderImpl>& owner);
51 
52   virtual ~UploadState();
53 
54   // Returns true if this object is stopped.  Once stopped, this object is
55   // effectively dead and can be destroyed.
56   bool IsStopped() const;
57 
58   // Add |user_callback| to the list of callbacks to be invoked when this upload
59   // completed.
60   //
61   // It is an error to call |AddUserCallback| on a stopped UploadState (see
62   // |IsStopped|).
63   void AddUserCallback(const UploadCallback& user_callback);
64 
65   // Return the Attachment this object is uploading.
66   const Attachment& GetAttachment();
67 
68   // URLFetcher implementation.
69   virtual void OnURLFetchComplete(const net::URLFetcher* source) OVERRIDE;
70 
71   // OAuth2TokenService::Consumer.
72   virtual void OnGetTokenSuccess(const OAuth2TokenService::Request* request,
73                                  const std::string& access_token,
74                                  const base::Time& expiration_time) OVERRIDE;
75   virtual void OnGetTokenFailure(const OAuth2TokenService::Request* request,
76                                  const GoogleServiceAuthError& error) OVERRIDE;
77 
78  private:
79   typedef std::vector<UploadCallback> UploadCallbackList;
80 
81   void GetToken();
82 
83   void StopAndReportResult(const UploadResult& result,
84                            const AttachmentId& attachment_id);
85 
86   bool is_stopped_;
87   GURL upload_url_;
88   const scoped_refptr<net::URLRequestContextGetter>&
89       url_request_context_getter_;
90   Attachment attachment_;
91   UploadCallbackList user_callbacks_;
92   scoped_ptr<net::URLFetcher> fetcher_;
93   std::string account_id_;
94   OAuth2TokenService::ScopeSet scopes_;
95   std::string access_token_;
96   OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider_;
97   // Pointer to the AttachmentUploaderImpl that owns this object.
98   base::WeakPtr<AttachmentUploaderImpl> owner_;
99   scoped_ptr<OAuth2TokenServiceRequest> access_token_request_;
100 
101   DISALLOW_COPY_AND_ASSIGN(UploadState);
102 };
103 
UploadState(const GURL & upload_url,const scoped_refptr<net::URLRequestContextGetter> & url_request_context_getter,const Attachment & attachment,const UploadCallback & user_callback,const std::string & account_id,const OAuth2TokenService::ScopeSet & scopes,OAuth2TokenServiceRequest::TokenServiceProvider * token_service_provider,const base::WeakPtr<AttachmentUploaderImpl> & owner)104 AttachmentUploaderImpl::UploadState::UploadState(
105     const GURL& upload_url,
106     const scoped_refptr<net::URLRequestContextGetter>&
107         url_request_context_getter,
108     const Attachment& attachment,
109     const UploadCallback& user_callback,
110     const std::string& account_id,
111     const OAuth2TokenService::ScopeSet& scopes,
112     OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
113     const base::WeakPtr<AttachmentUploaderImpl>& owner)
114     : OAuth2TokenService::Consumer("attachment-uploader-impl"),
115       is_stopped_(false),
116       upload_url_(upload_url),
117       url_request_context_getter_(url_request_context_getter),
118       attachment_(attachment),
119       user_callbacks_(1, user_callback),
120       account_id_(account_id),
121       scopes_(scopes),
122       token_service_provider_(token_service_provider),
123       owner_(owner) {
124   DCHECK(upload_url_.is_valid());
125   DCHECK(url_request_context_getter_.get());
126   DCHECK(!account_id_.empty());
127   DCHECK(!scopes_.empty());
128   DCHECK(token_service_provider_);
129   GetToken();
130 }
131 
~UploadState()132 AttachmentUploaderImpl::UploadState::~UploadState() {
133 }
134 
IsStopped() const135 bool AttachmentUploaderImpl::UploadState::IsStopped() const {
136   DCHECK(CalledOnValidThread());
137   return is_stopped_;
138 }
139 
AddUserCallback(const UploadCallback & user_callback)140 void AttachmentUploaderImpl::UploadState::AddUserCallback(
141     const UploadCallback& user_callback) {
142   DCHECK(CalledOnValidThread());
143   DCHECK(!is_stopped_);
144   user_callbacks_.push_back(user_callback);
145 }
146 
GetAttachment()147 const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() {
148   DCHECK(CalledOnValidThread());
149   return attachment_;
150 }
151 
OnURLFetchComplete(const net::URLFetcher * source)152 void AttachmentUploaderImpl::UploadState::OnURLFetchComplete(
153     const net::URLFetcher* source) {
154   DCHECK(CalledOnValidThread());
155   if (is_stopped_) {
156     return;
157   }
158 
159   UploadResult result = UPLOAD_TRANSIENT_ERROR;
160   AttachmentId attachment_id = attachment_.GetId();
161   const int response_code = source->GetResponseCode();
162   if (response_code == net::HTTP_OK) {
163     result = UPLOAD_SUCCESS;
164   } else if (response_code == net::HTTP_UNAUTHORIZED) {
165     // Server tells us we've got a bad token so invalidate it.
166     OAuth2TokenServiceRequest::InvalidateToken(
167         token_service_provider_, account_id_, scopes_, access_token_);
168     // Fail the request, but indicate that it may be successful if retried.
169     result = UPLOAD_TRANSIENT_ERROR;
170   } else if (response_code == net::HTTP_FORBIDDEN) {
171     // User is not allowed to use attachments.  Retrying won't help.
172     result = UPLOAD_UNSPECIFIED_ERROR;
173   } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
174     result = UPLOAD_TRANSIENT_ERROR;
175   }
176   StopAndReportResult(result, attachment_id);
177 }
178 
OnGetTokenSuccess(const OAuth2TokenService::Request * request,const std::string & access_token,const base::Time & expiration_time)179 void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess(
180     const OAuth2TokenService::Request* request,
181     const std::string& access_token,
182     const base::Time& expiration_time) {
183   DCHECK(CalledOnValidThread());
184   if (is_stopped_) {
185     return;
186   }
187 
188   DCHECK_EQ(access_token_request_.get(), request);
189   access_token_request_.reset();
190   access_token_ = access_token;
191   fetcher_.reset(
192       net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this));
193   fetcher_->SetAutomaticallyRetryOn5xx(false);
194   fetcher_->SetRequestContext(url_request_context_getter_.get());
195   // TODO(maniscalco): Is there a better way?  Copying the attachment data into
196   // a string feels wrong given how large attachments may be (several MBs).  If
197   // we may end up switching from URLFetcher to URLRequest, this copy won't be
198   // necessary.
199   scoped_refptr<base::RefCountedMemory> memory = attachment_.GetData();
200   const std::string upload_content(memory->front_as<char>(), memory->size());
201   fetcher_->SetUploadData(kContentType, upload_content);
202   const std::string auth_header("Authorization: Bearer " + access_token_);
203   fetcher_->AddExtraRequestHeader(auth_header);
204   fetcher_->SetLoadFlags(net::LOAD_DO_NOT_SAVE_COOKIES |
205                          net::LOAD_DO_NOT_SEND_COOKIES |
206                          net::LOAD_DISABLE_CACHE);
207   // TODO(maniscalco): Set an appropriate headers (User-Agent, Content-type, and
208   // Content-length) on the request and include the content's MD5,
209   // AttachmentId's unique_id and the "sync birthday" (bug 371521).
210   fetcher_->Start();
211 }
212 
OnGetTokenFailure(const OAuth2TokenService::Request * request,const GoogleServiceAuthError & error)213 void AttachmentUploaderImpl::UploadState::OnGetTokenFailure(
214     const OAuth2TokenService::Request* request,
215     const GoogleServiceAuthError& error) {
216   DCHECK(CalledOnValidThread());
217   if (is_stopped_) {
218     return;
219   }
220 
221   DCHECK_EQ(access_token_request_.get(), request);
222   access_token_request_.reset();
223   // TODO(maniscalco): We treat this as a transient error, but it may in fact be
224   // a very long lived error and require user action.  Consider differentiating
225   // between the causes of GetToken failure and act accordingly.  Think about
226   // the causes of GetToken failure. Are there (bug 412802).
227   StopAndReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId());
228 }
229 
GetToken()230 void AttachmentUploaderImpl::UploadState::GetToken() {
231   access_token_request_ = OAuth2TokenServiceRequest::CreateAndStart(
232       token_service_provider_, account_id_, scopes_, this);
233 }
234 
StopAndReportResult(const UploadResult & result,const AttachmentId & attachment_id)235 void AttachmentUploaderImpl::UploadState::StopAndReportResult(
236     const UploadResult& result,
237     const AttachmentId& attachment_id) {
238   DCHECK(!is_stopped_);
239   is_stopped_ = true;
240   UploadCallbackList::const_iterator iter = user_callbacks_.begin();
241   UploadCallbackList::const_iterator end = user_callbacks_.end();
242   for (; iter != end; ++iter) {
243     base::MessageLoop::current()->PostTask(
244         FROM_HERE, base::Bind(*iter, result, attachment_id));
245   }
246   base::MessageLoop::current()->PostTask(
247       FROM_HERE,
248       base::Bind(&AttachmentUploaderImpl::OnUploadStateStopped,
249                  owner_,
250                  attachment_id.GetProto().unique_id()));
251 }
252 
AttachmentUploaderImpl(const GURL & sync_service_url,const scoped_refptr<net::URLRequestContextGetter> & url_request_context_getter,const std::string & account_id,const OAuth2TokenService::ScopeSet & scopes,const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider> & token_service_provider)253 AttachmentUploaderImpl::AttachmentUploaderImpl(
254     const GURL& sync_service_url,
255     const scoped_refptr<net::URLRequestContextGetter>&
256         url_request_context_getter,
257     const std::string& account_id,
258     const OAuth2TokenService::ScopeSet& scopes,
259     const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider>&
260         token_service_provider)
261     : sync_service_url_(sync_service_url),
262       url_request_context_getter_(url_request_context_getter),
263       account_id_(account_id),
264       scopes_(scopes),
265       token_service_provider_(token_service_provider),
266       weak_ptr_factory_(this) {
267   DCHECK(CalledOnValidThread());
268   DCHECK(!account_id.empty());
269   DCHECK(!scopes.empty());
270   DCHECK(token_service_provider_.get());
271 }
272 
~AttachmentUploaderImpl()273 AttachmentUploaderImpl::~AttachmentUploaderImpl() {
274   DCHECK(CalledOnValidThread());
275 }
276 
UploadAttachment(const Attachment & attachment,const UploadCallback & callback)277 void AttachmentUploaderImpl::UploadAttachment(const Attachment& attachment,
278                                               const UploadCallback& callback) {
279   DCHECK(CalledOnValidThread());
280   const AttachmentId attachment_id = attachment.GetId();
281   const std::string unique_id = attachment_id.GetProto().unique_id();
282   DCHECK(!unique_id.empty());
283   StateMap::iterator iter = state_map_.find(unique_id);
284   if (iter != state_map_.end()) {
285     // We have an old upload request for this attachment...
286     if (!iter->second->IsStopped()) {
287       // "join" to it.
288       DCHECK(attachment.GetData()
289                  ->Equals(iter->second->GetAttachment().GetData()));
290       iter->second->AddUserCallback(callback);
291       return;
292     } else {
293       // It's stopped so we can't use it.  Delete it.
294       state_map_.erase(iter);
295     }
296   }
297 
298   const GURL url = GetURLForAttachmentId(sync_service_url_, attachment_id);
299   scoped_ptr<UploadState> upload_state(
300       new UploadState(url,
301                       url_request_context_getter_,
302                       attachment,
303                       callback,
304                       account_id_,
305                       scopes_,
306                       token_service_provider_.get(),
307                       weak_ptr_factory_.GetWeakPtr()));
308   state_map_.add(unique_id, upload_state.Pass());
309 }
310 
311 // Static.
GetURLForAttachmentId(const GURL & sync_service_url,const AttachmentId & attachment_id)312 GURL AttachmentUploaderImpl::GetURLForAttachmentId(
313     const GURL& sync_service_url,
314     const AttachmentId& attachment_id) {
315   std::string path = sync_service_url.path();
316   if (path.empty() || *path.rbegin() != '/') {
317     path += '/';
318   }
319   path += kAttachments;
320   path += attachment_id.GetProto().unique_id();
321   GURL::Replacements replacements;
322   replacements.SetPathStr(path);
323   return sync_service_url.ReplaceComponents(replacements);
324 }
325 
OnUploadStateStopped(const UniqueId & unique_id)326 void AttachmentUploaderImpl::OnUploadStateStopped(const UniqueId& unique_id) {
327   StateMap::iterator iter = state_map_.find(unique_id);
328   // Only erase if stopped.  Because this method is called asynchronously, it's
329   // possible that a new request for this same id arrived after the UploadState
330   // stopped, but before this method was invoked.  In that case the UploadState
331   // in the map might be a new one.
332   if (iter != state_map_.end() && iter->second->IsStopped()) {
333     state_map_.erase(iter);
334   }
335 }
336 
337 }  // namespace syncer
338