1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/public/attachments/attachment_uploader_impl.h"
6
7 #include "base/bind.h"
8 #include "base/memory/weak_ptr.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/threading/non_thread_safe.h"
11 #include "google_apis/gaia/gaia_constants.h"
12 #include "net/base/load_flags.h"
13 #include "net/http/http_status_code.h"
14 #include "net/url_request/url_fetcher.h"
15 #include "net/url_request/url_fetcher_delegate.h"
16 #include "sync/api/attachments/attachment.h"
17 #include "sync/protocol/sync.pb.h"
18
19 namespace {
20
21 const char kContentType[] = "application/octet-stream";
22 const char kAttachments[] = "attachments/";
23
24 } // namespace
25
26 namespace syncer {
27
28 // Encapsulates all the state associated with a single upload.
29 class AttachmentUploaderImpl::UploadState : public net::URLFetcherDelegate,
30 public OAuth2TokenService::Consumer,
31 public base::NonThreadSafe {
32 public:
33 // Construct an UploadState.
34 //
35 // UploadState encapsulates the state associated with a single upload. When
36 // the upload completes, the UploadState object becomes "stopped".
37 //
38 // |owner| is a pointer to the object that owns this UploadState. Upon
39 // completion this object will PostTask to owner's OnUploadStateStopped
40 // method.
41 UploadState(
42 const GURL& upload_url,
43 const scoped_refptr<net::URLRequestContextGetter>&
44 url_request_context_getter,
45 const Attachment& attachment,
46 const UploadCallback& user_callback,
47 const std::string& account_id,
48 const OAuth2TokenService::ScopeSet& scopes,
49 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
50 const base::WeakPtr<AttachmentUploaderImpl>& owner);
51
52 virtual ~UploadState();
53
54 // Returns true if this object is stopped. Once stopped, this object is
55 // effectively dead and can be destroyed.
56 bool IsStopped() const;
57
58 // Add |user_callback| to the list of callbacks to be invoked when this upload
59 // completed.
60 //
61 // It is an error to call |AddUserCallback| on a stopped UploadState (see
62 // |IsStopped|).
63 void AddUserCallback(const UploadCallback& user_callback);
64
65 // Return the Attachment this object is uploading.
66 const Attachment& GetAttachment();
67
68 // URLFetcher implementation.
69 virtual void OnURLFetchComplete(const net::URLFetcher* source) OVERRIDE;
70
71 // OAuth2TokenService::Consumer.
72 virtual void OnGetTokenSuccess(const OAuth2TokenService::Request* request,
73 const std::string& access_token,
74 const base::Time& expiration_time) OVERRIDE;
75 virtual void OnGetTokenFailure(const OAuth2TokenService::Request* request,
76 const GoogleServiceAuthError& error) OVERRIDE;
77
78 private:
79 typedef std::vector<UploadCallback> UploadCallbackList;
80
81 void GetToken();
82
83 void StopAndReportResult(const UploadResult& result,
84 const AttachmentId& attachment_id);
85
86 bool is_stopped_;
87 GURL upload_url_;
88 const scoped_refptr<net::URLRequestContextGetter>&
89 url_request_context_getter_;
90 Attachment attachment_;
91 UploadCallbackList user_callbacks_;
92 scoped_ptr<net::URLFetcher> fetcher_;
93 std::string account_id_;
94 OAuth2TokenService::ScopeSet scopes_;
95 std::string access_token_;
96 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider_;
97 // Pointer to the AttachmentUploaderImpl that owns this object.
98 base::WeakPtr<AttachmentUploaderImpl> owner_;
99 scoped_ptr<OAuth2TokenServiceRequest> access_token_request_;
100
101 DISALLOW_COPY_AND_ASSIGN(UploadState);
102 };
103
UploadState(const GURL & upload_url,const scoped_refptr<net::URLRequestContextGetter> & url_request_context_getter,const Attachment & attachment,const UploadCallback & user_callback,const std::string & account_id,const OAuth2TokenService::ScopeSet & scopes,OAuth2TokenServiceRequest::TokenServiceProvider * token_service_provider,const base::WeakPtr<AttachmentUploaderImpl> & owner)104 AttachmentUploaderImpl::UploadState::UploadState(
105 const GURL& upload_url,
106 const scoped_refptr<net::URLRequestContextGetter>&
107 url_request_context_getter,
108 const Attachment& attachment,
109 const UploadCallback& user_callback,
110 const std::string& account_id,
111 const OAuth2TokenService::ScopeSet& scopes,
112 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
113 const base::WeakPtr<AttachmentUploaderImpl>& owner)
114 : OAuth2TokenService::Consumer("attachment-uploader-impl"),
115 is_stopped_(false),
116 upload_url_(upload_url),
117 url_request_context_getter_(url_request_context_getter),
118 attachment_(attachment),
119 user_callbacks_(1, user_callback),
120 account_id_(account_id),
121 scopes_(scopes),
122 token_service_provider_(token_service_provider),
123 owner_(owner) {
124 DCHECK(upload_url_.is_valid());
125 DCHECK(url_request_context_getter_.get());
126 DCHECK(!account_id_.empty());
127 DCHECK(!scopes_.empty());
128 DCHECK(token_service_provider_);
129 GetToken();
130 }
131
~UploadState()132 AttachmentUploaderImpl::UploadState::~UploadState() {
133 }
134
IsStopped() const135 bool AttachmentUploaderImpl::UploadState::IsStopped() const {
136 DCHECK(CalledOnValidThread());
137 return is_stopped_;
138 }
139
AddUserCallback(const UploadCallback & user_callback)140 void AttachmentUploaderImpl::UploadState::AddUserCallback(
141 const UploadCallback& user_callback) {
142 DCHECK(CalledOnValidThread());
143 DCHECK(!is_stopped_);
144 user_callbacks_.push_back(user_callback);
145 }
146
GetAttachment()147 const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() {
148 DCHECK(CalledOnValidThread());
149 return attachment_;
150 }
151
OnURLFetchComplete(const net::URLFetcher * source)152 void AttachmentUploaderImpl::UploadState::OnURLFetchComplete(
153 const net::URLFetcher* source) {
154 DCHECK(CalledOnValidThread());
155 if (is_stopped_) {
156 return;
157 }
158
159 UploadResult result = UPLOAD_TRANSIENT_ERROR;
160 AttachmentId attachment_id = attachment_.GetId();
161 const int response_code = source->GetResponseCode();
162 if (response_code == net::HTTP_OK) {
163 result = UPLOAD_SUCCESS;
164 } else if (response_code == net::HTTP_UNAUTHORIZED) {
165 // Server tells us we've got a bad token so invalidate it.
166 OAuth2TokenServiceRequest::InvalidateToken(
167 token_service_provider_, account_id_, scopes_, access_token_);
168 // Fail the request, but indicate that it may be successful if retried.
169 result = UPLOAD_TRANSIENT_ERROR;
170 } else if (response_code == net::HTTP_FORBIDDEN) {
171 // User is not allowed to use attachments. Retrying won't help.
172 result = UPLOAD_UNSPECIFIED_ERROR;
173 } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
174 result = UPLOAD_TRANSIENT_ERROR;
175 }
176 StopAndReportResult(result, attachment_id);
177 }
178
OnGetTokenSuccess(const OAuth2TokenService::Request * request,const std::string & access_token,const base::Time & expiration_time)179 void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess(
180 const OAuth2TokenService::Request* request,
181 const std::string& access_token,
182 const base::Time& expiration_time) {
183 DCHECK(CalledOnValidThread());
184 if (is_stopped_) {
185 return;
186 }
187
188 DCHECK_EQ(access_token_request_.get(), request);
189 access_token_request_.reset();
190 access_token_ = access_token;
191 fetcher_.reset(
192 net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this));
193 fetcher_->SetAutomaticallyRetryOn5xx(false);
194 fetcher_->SetRequestContext(url_request_context_getter_.get());
195 // TODO(maniscalco): Is there a better way? Copying the attachment data into
196 // a string feels wrong given how large attachments may be (several MBs). If
197 // we may end up switching from URLFetcher to URLRequest, this copy won't be
198 // necessary.
199 scoped_refptr<base::RefCountedMemory> memory = attachment_.GetData();
200 const std::string upload_content(memory->front_as<char>(), memory->size());
201 fetcher_->SetUploadData(kContentType, upload_content);
202 const std::string auth_header("Authorization: Bearer " + access_token_);
203 fetcher_->AddExtraRequestHeader(auth_header);
204 fetcher_->SetLoadFlags(net::LOAD_DO_NOT_SAVE_COOKIES |
205 net::LOAD_DO_NOT_SEND_COOKIES |
206 net::LOAD_DISABLE_CACHE);
207 // TODO(maniscalco): Set an appropriate headers (User-Agent, Content-type, and
208 // Content-length) on the request and include the content's MD5,
209 // AttachmentId's unique_id and the "sync birthday" (bug 371521).
210 fetcher_->Start();
211 }
212
OnGetTokenFailure(const OAuth2TokenService::Request * request,const GoogleServiceAuthError & error)213 void AttachmentUploaderImpl::UploadState::OnGetTokenFailure(
214 const OAuth2TokenService::Request* request,
215 const GoogleServiceAuthError& error) {
216 DCHECK(CalledOnValidThread());
217 if (is_stopped_) {
218 return;
219 }
220
221 DCHECK_EQ(access_token_request_.get(), request);
222 access_token_request_.reset();
223 // TODO(maniscalco): We treat this as a transient error, but it may in fact be
224 // a very long lived error and require user action. Consider differentiating
225 // between the causes of GetToken failure and act accordingly. Think about
226 // the causes of GetToken failure. Are there (bug 412802).
227 StopAndReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId());
228 }
229
GetToken()230 void AttachmentUploaderImpl::UploadState::GetToken() {
231 access_token_request_ = OAuth2TokenServiceRequest::CreateAndStart(
232 token_service_provider_, account_id_, scopes_, this);
233 }
234
StopAndReportResult(const UploadResult & result,const AttachmentId & attachment_id)235 void AttachmentUploaderImpl::UploadState::StopAndReportResult(
236 const UploadResult& result,
237 const AttachmentId& attachment_id) {
238 DCHECK(!is_stopped_);
239 is_stopped_ = true;
240 UploadCallbackList::const_iterator iter = user_callbacks_.begin();
241 UploadCallbackList::const_iterator end = user_callbacks_.end();
242 for (; iter != end; ++iter) {
243 base::MessageLoop::current()->PostTask(
244 FROM_HERE, base::Bind(*iter, result, attachment_id));
245 }
246 base::MessageLoop::current()->PostTask(
247 FROM_HERE,
248 base::Bind(&AttachmentUploaderImpl::OnUploadStateStopped,
249 owner_,
250 attachment_id.GetProto().unique_id()));
251 }
252
AttachmentUploaderImpl(const GURL & sync_service_url,const scoped_refptr<net::URLRequestContextGetter> & url_request_context_getter,const std::string & account_id,const OAuth2TokenService::ScopeSet & scopes,const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider> & token_service_provider)253 AttachmentUploaderImpl::AttachmentUploaderImpl(
254 const GURL& sync_service_url,
255 const scoped_refptr<net::URLRequestContextGetter>&
256 url_request_context_getter,
257 const std::string& account_id,
258 const OAuth2TokenService::ScopeSet& scopes,
259 const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider>&
260 token_service_provider)
261 : sync_service_url_(sync_service_url),
262 url_request_context_getter_(url_request_context_getter),
263 account_id_(account_id),
264 scopes_(scopes),
265 token_service_provider_(token_service_provider),
266 weak_ptr_factory_(this) {
267 DCHECK(CalledOnValidThread());
268 DCHECK(!account_id.empty());
269 DCHECK(!scopes.empty());
270 DCHECK(token_service_provider_.get());
271 }
272
~AttachmentUploaderImpl()273 AttachmentUploaderImpl::~AttachmentUploaderImpl() {
274 DCHECK(CalledOnValidThread());
275 }
276
UploadAttachment(const Attachment & attachment,const UploadCallback & callback)277 void AttachmentUploaderImpl::UploadAttachment(const Attachment& attachment,
278 const UploadCallback& callback) {
279 DCHECK(CalledOnValidThread());
280 const AttachmentId attachment_id = attachment.GetId();
281 const std::string unique_id = attachment_id.GetProto().unique_id();
282 DCHECK(!unique_id.empty());
283 StateMap::iterator iter = state_map_.find(unique_id);
284 if (iter != state_map_.end()) {
285 // We have an old upload request for this attachment...
286 if (!iter->second->IsStopped()) {
287 // "join" to it.
288 DCHECK(attachment.GetData()
289 ->Equals(iter->second->GetAttachment().GetData()));
290 iter->second->AddUserCallback(callback);
291 return;
292 } else {
293 // It's stopped so we can't use it. Delete it.
294 state_map_.erase(iter);
295 }
296 }
297
298 const GURL url = GetURLForAttachmentId(sync_service_url_, attachment_id);
299 scoped_ptr<UploadState> upload_state(
300 new UploadState(url,
301 url_request_context_getter_,
302 attachment,
303 callback,
304 account_id_,
305 scopes_,
306 token_service_provider_.get(),
307 weak_ptr_factory_.GetWeakPtr()));
308 state_map_.add(unique_id, upload_state.Pass());
309 }
310
311 // Static.
GetURLForAttachmentId(const GURL & sync_service_url,const AttachmentId & attachment_id)312 GURL AttachmentUploaderImpl::GetURLForAttachmentId(
313 const GURL& sync_service_url,
314 const AttachmentId& attachment_id) {
315 std::string path = sync_service_url.path();
316 if (path.empty() || *path.rbegin() != '/') {
317 path += '/';
318 }
319 path += kAttachments;
320 path += attachment_id.GetProto().unique_id();
321 GURL::Replacements replacements;
322 replacements.SetPathStr(path);
323 return sync_service_url.ReplaceComponents(replacements);
324 }
325
OnUploadStateStopped(const UniqueId & unique_id)326 void AttachmentUploaderImpl::OnUploadStateStopped(const UniqueId& unique_id) {
327 StateMap::iterator iter = state_map_.find(unique_id);
328 // Only erase if stopped. Because this method is called asynchronously, it's
329 // possible that a new request for this same id arrived after the UploadState
330 // stopped, but before this method was invoked. In that case the UploadState
331 // in the map might be a new one.
332 if (iter != state_map_.end() && iter->second->IsStopped()) {
333 state_map_.erase(iter);
334 }
335 }
336
337 } // namespace syncer
338